signals handling

This commit is contained in:
Devaev Maxim 2021-05-25 23:26:13 +03:00
parent 98c3956994
commit 6ce07208a1
2 changed files with 40 additions and 28 deletions

View File

@ -21,6 +21,7 @@
import asyncio
import signal
import functools
import types
@ -101,6 +102,20 @@ async def close_writer(writer: asyncio.StreamWriter) -> bool:
return (not closing)
# =====
def run(coro: Coroutine) -> None:
def sigint_handler() -> None:
raise KeyboardInterrupt()
def sigterm_handler() -> None:
raise SystemExit()
loop = asyncio.get_event_loop()
loop.add_signal_handler(signal.SIGINT, sigint_handler)
loop.add_signal_handler(signal.SIGTERM, sigterm_handler)
loop.run_until_complete(coro)
# =====
class AioNotifier:
def __init__(self) -> None:

View File

@ -62,43 +62,38 @@ class JanusRunner: # pylint: disable=too-many-instance-attributes
logger = get_logger(0)
logger.info("Starting Janus Runner ...")
try:
asyncio.run(self.__run())
aiotools.run(self.__run())
except (SystemExit, KeyboardInterrupt):
pass
aiotools.run(self.__stop_janus())
logger.info("Bye-bye")
# =====
async def __run(self) -> None:
logger = get_logger(0)
try:
prev_netcfg: Optional[_Netcfg] = None
while True:
retry = 0
netcfg = _Netcfg()
for retry in range(self.__check_retries):
netcfg = await self.__get_netcfg()
if netcfg.ext_ip:
break
await asyncio.sleep(self.__check_retries_delay)
if retry != 0 and netcfg.ext_ip:
logger.info("I'm fine, continue working ...")
prev_netcfg: Optional[_Netcfg] = None
while True:
retry = 0
netcfg = _Netcfg()
for retry in range(self.__check_retries):
netcfg = await self.__get_netcfg()
if netcfg.ext_ip:
break
await asyncio.sleep(self.__check_retries_delay)
if retry != 0 and netcfg.ext_ip:
logger.info("I'm fine, continue working ...")
if netcfg != prev_netcfg:
logger.info("Got new %s", netcfg)
if netcfg.src_ip and netcfg.ext_ip:
logger.info("Okay, restarting Janus ...")
await self.__stop_janus()
await self.__start_janus(netcfg)
else:
logger.error("Empty src_ip or ext_ip; stopping Janus ...")
await self.__stop_janus()
prev_netcfg = netcfg
if netcfg != prev_netcfg:
logger.info("Got new %s", netcfg)
if netcfg.src_ip and netcfg.ext_ip:
await self.__stop_janus()
await self.__start_janus(netcfg)
else:
logger.error("Empty src_ip or ext_ip; stopping Janus ...")
await self.__stop_janus()
prev_netcfg = netcfg
await asyncio.sleep(self.__check_interval)
except: # noqa: E722
await self.__stop_janus()
raise
await asyncio.sleep(self.__check_interval)
async def __get_netcfg(self) -> _Netcfg:
src_ip = (self.__get_default_ip() or "0.0.0.0")
@ -136,12 +131,14 @@ class JanusRunner: # pylint: disable=too-many-instance-attributes
@aiotools.atomic
async def __start_janus(self, netcfg: _Netcfg) -> None:
get_logger(0).info("Starting Janus ...")
assert not self.__janus_task
self.__janus_task = asyncio.create_task(self.__janus_task_loop(netcfg))
@aiotools.atomic
async def __stop_janus(self) -> None:
if self.__janus_task:
get_logger(0).info("Stopping Janus ...")
self.__janus_task.cancel()
await asyncio.gather(self.__janus_task, return_exceptions=True)
await self.__kill_janus_proc()