fixed CancelledError on killing server

This commit is contained in:
Devaev Maxim 2019-06-08 06:12:11 +03:00
parent 77a7498731
commit 5181b09db8

View File

@ -87,6 +87,7 @@ class Streamer: # pylint: disable=too-many-instance-attributes
self.__cmd = cmd self.__cmd = cmd
self.__streamer_task: Optional[asyncio.Task] = None self.__streamer_task: Optional[asyncio.Task] = None
self.__streamer_proc: Optional[asyncio.subprocess.Process] = None # pylint: disable=no-member
self.__http_session: Optional[aiohttp.ClientSession] = None self.__http_session: Optional[aiohttp.ClientSession] = None
@ -174,12 +175,13 @@ class Streamer: # pylint: disable=too-many-instance-attributes
async def __inner_start(self) -> None: async def __inner_start(self) -> None:
assert not self.__streamer_task assert not self.__streamer_task
await self.__set_hw_enabled(True) await self.__set_hw_enabled(True)
self.__streamer_task = asyncio.create_task(self.__run_streamer()) self.__streamer_task = asyncio.create_task(self.__streamer_task_loop())
async def __inner_stop(self) -> None: async def __inner_stop(self) -> None:
assert self.__streamer_task assert self.__streamer_task
self.__streamer_task.cancel() self.__streamer_task.cancel()
await asyncio.gather(self.__streamer_task, return_exceptions=True) await asyncio.gather(self.__streamer_task, return_exceptions=True)
await self.__kill_streamer_proc()
await self.__set_hw_enabled(False) await self.__set_hw_enabled(False)
self.__streamer_task = None self.__streamer_task = None
@ -194,23 +196,15 @@ class Streamer: # pylint: disable=too-many-instance-attributes
if enabled: if enabled:
await asyncio.sleep(self.__init_delay) await asyncio.sleep(self.__init_delay)
async def __run_streamer(self) -> None: # pylint: disable=too-many-branches async def __streamer_task_loop(self) -> None: # pylint: disable=too-many-branches
logger = get_logger(0) logger = get_logger(0)
while True: # pylint: disable=too-many-nested-blocks while True: # pylint: disable=too-many-nested-blocks
proc: Optional[asyncio.subprocess.Process] = None # pylint: disable=no-member
try: try:
cmd = self.__make_cmd() await self.__start_streamer_proc()
proc = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
preexec_fn=(lambda: signal.signal(signal.SIGINT, signal.SIG_IGN)),
)
logger.info("Started streamer pid=%d: %s", proc.pid, cmd)
empty = 0 empty = 0
async for line_bytes in proc.stdout: # type: ignore async for line_bytes in self.__streamer_proc.stdout: # type: ignore
line = line_bytes.decode(errors="ignore").strip() line = line_bytes.decode(errors="ignore").strip()
if line: if line:
logger.info("Streamer: %s", line) logger.info("Streamer: %s", line)
@ -226,18 +220,16 @@ class Streamer: # pylint: disable=too-many-instance-attributes
break break
except Exception as err: except Exception as err:
if proc: if self.__streamer_proc:
logger.exception("Unexpected streamer error: pid=%d", proc.pid) logger.exception("Unexpected streamer error: pid=%d", self.__streamer_proc.pid)
else: else:
logger.exception("Can't start streamer: %s", err) logger.exception("Can't start streamer: %s", err)
await self.__kill_streamer_proc()
await asyncio.sleep(1) await asyncio.sleep(1)
finally: async def __start_streamer_proc(self) -> None:
if proc and proc.returncode is None: assert self.__streamer_proc is None
await self.__kill(proc) cmd = [
def __make_cmd(self) -> List[str]:
return [
part.format( part.format(
host=self.__host, host=self.__host,
port=self.__port, port=self.__port,
@ -246,21 +238,35 @@ class Streamer: # pylint: disable=too-many-instance-attributes
) )
for part in self.__cmd for part in self.__cmd
] ]
self.__streamer_proc = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
preexec_fn=(lambda: signal.signal(signal.SIGINT, signal.SIG_IGN)),
)
get_logger(0).info("Started streamer pid=%d: %s", self.__streamer_proc.pid, cmd)
async def __kill(self, proc: asyncio.subprocess.Process) -> None: # pylint: disable=no-member async def __kill_streamer_proc(self) -> None:
try: logger = get_logger(0)
proc.terminate() if self.__streamer_proc and self.__streamer_proc.returncode is None:
await asyncio.sleep(1) try:
if proc.returncode is None: self.__streamer_proc.terminate()
try: await asyncio.sleep(1)
proc.kill() if self.__streamer_proc.returncode is None:
except Exception: try:
if proc.returncode is not None: self.__streamer_proc.kill()
raise except Exception:
await proc.wait() if self.__streamer_proc.returncode is not None:
get_logger().info("Streamer killed: pid=%d; retcode=%d", proc.pid, proc.returncode) raise
except Exception: await self.__streamer_proc.wait()
if proc.returncode is None: logger.info("Streamer killed: pid=%d; retcode=%d",
get_logger().exception("Can't kill streamer pid=%d", proc.pid) self.__streamer_proc.pid, self.__streamer_proc.returncode)
else: except asyncio.CancelledError:
get_logger().info("Streamer killed: pid=%d; retcode=%d", proc.pid, proc.returncode) pass
except Exception:
if self.__streamer_proc.returncode is None:
logger.exception("Can't kill streamer pid=%d", self.__streamer_proc.pid)
else:
logger.info("Streamer killed: pid=%d; retcode=%d",
self.__streamer_proc.pid, self.__streamer_proc.returncode)
self.__streamer_proc = None