proper usage of asyncio.wait() for first completed

This commit is contained in:
Devaev Maxim
2020-03-15 02:42:10 +03:00
parent eb419822cd
commit 5b58af4d6f
3 changed files with 17 additions and 8 deletions

View File

@@ -28,8 +28,11 @@ import types
import typing import typing
from typing import Tuple
from typing import List from typing import List
from typing import Set
from typing import Callable from typing import Callable
from typing import Awaitable
from typing import Coroutine from typing import Coroutine
from typing import Type from typing import Type
from typing import TypeVar from typing import TypeVar
@@ -86,6 +89,10 @@ async def wait_infinite() -> None:
await asyncio.get_event_loop().create_future() await asyncio.get_event_loop().create_future()
async def wait_first(*aws: Awaitable) -> Tuple[Set[asyncio.Future], Set[asyncio.Future]]:
return (await asyncio.wait(list(aws), return_when=asyncio.FIRST_COMPLETED))
# ===== # =====
async def afile_write_now(afile: aiofiles.base.AiofilesContextManager, data: bytes) -> None: async def afile_write_now(afile: aiofiles.base.AiofilesContextManager, data: bytes) -> None:
await afile.write(data) await afile.write(data)

View File

@@ -207,6 +207,7 @@ class Streamer: # pylint: disable=too-many-instance-attributes
get_logger(0).info("Installing SIGUSR2 streamer handler ...") get_logger(0).info("Installing SIGUSR2 streamer handler ...")
asyncio.get_event_loop().add_signal_handler(signal.SIGUSR2, signal_handler) asyncio.get_event_loop().add_signal_handler(signal.SIGUSR2, signal_handler)
waiter_task: Optional[asyncio.Task] = None
prev_state: Dict = {} prev_state: Dict = {}
while True: while True:
state = await self.get_state() state = await self.get_state()
@@ -214,10 +215,10 @@ class Streamer: # pylint: disable=too-many-instance-attributes
yield state yield state
prev_state = state prev_state = state
await asyncio.wait([ if waiter_task is None:
asyncio.sleep(self.__state_poll), waiter_task = asyncio.create_task(notifier.wait())
notifier.wait(), if waiter_task in (await aiotools.wait_first(asyncio.sleep(self.__state_poll), waiter_task))[0]:
], return_when=asyncio.FIRST_COMPLETED) waiter_task = None
async def get_info(self) -> Dict: async def get_info(self) -> Dict:
proc = await asyncio.create_subprocess_exec( proc = await asyncio.create_subprocess_exec(

View File

@@ -206,6 +206,7 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
async def poll_state(self) -> AsyncGenerator[Dict, None]: async def poll_state(self) -> AsyncGenerator[Dict, None]:
inotify_task = asyncio.create_task(self.__watch_inotify()) inotify_task = asyncio.create_task(self.__watch_inotify())
waiter_task: Optional[asyncio.Task] = None
prev_state: Dict = {} prev_state: Dict = {}
try: try:
while True: while True:
@@ -219,10 +220,10 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
yield state yield state
prev_state = state prev_state = state
await asyncio.wait([ if waiter_task is None:
inotify_task, waiter_task = asyncio.create_task(self.__state_notifier.wait())
self.__state_notifier.wait(), if waiter_task in (await aiotools.wait_first(inotify_task, waiter_task))[0]:
], return_when=asyncio.FIRST_COMPLETED) waiter_task = None
finally: finally:
if not inotify_task.done(): if not inotify_task.done():
inotify_task.cancel() inotify_task.cancel()