refactoring

This commit is contained in:
Maxim Devaev 2025-02-10 00:55:33 +02:00
parent 6ffaa8d6bd
commit 4c9c98c6ab
2 changed files with 18 additions and 18 deletions

View File

@ -66,22 +66,22 @@ async def _run_process(cmd: list[str], data_path: str) -> asyncio.subprocess.Pro
async def _run_cmd_ws(cmd: list[str], ws: aiohttp.ClientWebSocketResponse) -> int: # pylint: disable=too-many-branches async def _run_cmd_ws(cmd: list[str], ws: aiohttp.ClientWebSocketResponse) -> int: # pylint: disable=too-many-branches
logger = get_logger(0) logger = get_logger(0)
receive_task: (asyncio.Task | None) = None recv_task: (asyncio.Task | None) = None
proc_task: (asyncio.Task | None) = None proc_task: (asyncio.Task | None) = None
proc: (asyncio.subprocess.Process | None) = None # pylint: disable=no-member proc: (asyncio.subprocess.Process | None) = None # pylint: disable=no-member
try: # pylint: disable=too-many-nested-blocks try: # pylint: disable=too-many-nested-blocks
while True: while True:
if receive_task is None: if recv_task is None:
receive_task = asyncio.create_task(ws.receive()) recv_task = asyncio.create_task(ws.receive())
if proc_task is None and proc is not None: if proc_task is None and proc is not None:
proc_task = asyncio.create_task(proc.wait()) proc_task = asyncio.create_task(proc.wait())
tasks = list(filter(None, [receive_task, proc_task])) tasks = list(filter(None, [recv_task, proc_task]))
done = (await aiotools.wait_first(*tasks))[0] done = (await aiotools.wait_first(*tasks))[0]
if receive_task in done: if recv_task in done:
msg = receive_task.result() msg = recv_task.result()
if msg.type == aiohttp.WSMsgType.TEXT: if msg.type == aiohttp.WSMsgType.TEXT:
(event_type, event) = htserver.parse_ws_event(msg.data) (event_type, event) = htserver.parse_ws_event(msg.data)
if event_type == "storage": if event_type == "storage":
@ -98,15 +98,15 @@ async def _run_cmd_ws(cmd: list[str], ws: aiohttp.ClientWebSocketResponse) -> in
else: else:
logger.error("Unknown PST message type: %r", msg) logger.error("Unknown PST message type: %r", msg)
break break
receive_task = None recv_task = None
if proc_task in done: if proc_task in done:
break break
except Exception: except Exception:
logger.exception("Unhandled exception") logger.exception("Unhandled exception")
if receive_task is not None: if recv_task is not None:
receive_task.cancel() recv_task.cancel()
if proc_task is not None: if proc_task is not None:
proc_task.cancel() proc_task.cancel()
if proc is not None: if proc is not None:

View File

@ -140,19 +140,19 @@ class KvmdClientWs:
async def communicate(self) -> AsyncGenerator[tuple[str, dict], None]: # pylint: disable=too-many-branches async def communicate(self) -> AsyncGenerator[tuple[str, dict], None]: # pylint: disable=too-many-branches
assert not self.__communicated assert not self.__communicated
self.__communicated = True self.__communicated = True
receive_task: (asyncio.Task | None) = None recv_task: (asyncio.Task | None) = None
writer_task: (asyncio.Task | None) = None writer_task: (asyncio.Task | None) = None
try: try:
while True: while True:
if receive_task is None: if recv_task is None:
receive_task = asyncio.create_task(self.__ws.receive()) recv_task = asyncio.create_task(self.__ws.receive())
if writer_task is None: if writer_task is None:
writer_task = asyncio.create_task(self.__writer_queue.get()) writer_task = asyncio.create_task(self.__writer_queue.get())
done = (await aiotools.wait_first(receive_task, writer_task))[0] done = (await aiotools.wait_first(recv_task, writer_task))[0]
if receive_task in done: if recv_task in done:
msg = receive_task.result() msg = recv_task.result()
if msg.type == aiohttp.WSMsgType.TEXT: if msg.type == aiohttp.WSMsgType.TEXT:
yield htserver.parse_ws_event(msg.data) yield htserver.parse_ws_event(msg.data)
elif msg.type == aiohttp.WSMsgType.CLOSE: elif msg.type == aiohttp.WSMsgType.CLOSE:
@ -161,7 +161,7 @@ class KvmdClientWs:
break break
else: else:
raise RuntimeError(f"Unhandled WS message type: {msg!r}") raise RuntimeError(f"Unhandled WS message type: {msg!r}")
receive_task = None recv_task = None
if writer_task in done: if writer_task in done:
payload = writer_task.result() payload = writer_task.result()
@ -171,8 +171,8 @@ class KvmdClientWs:
await htserver.send_ws_event(self.__ws, *payload) await htserver.send_ws_event(self.__ws, *payload)
writer_task = None writer_task = None
finally: finally:
if receive_task: if recv_task:
receive_task.cancel() recv_task.cancel()
if writer_task: if writer_task:
writer_task.cancel() writer_task.cancel()
try: try: