wait short tasks

This commit is contained in:
Devaev Maxim 2019-06-05 20:56:46 +03:00
parent 8aa333ba89
commit 6d7996924f
6 changed files with 32 additions and 9 deletions

View File

@ -26,6 +26,7 @@ import functools
import typing import typing
from typing import Callable from typing import Callable
from typing import Coroutine
from typing import TypeVar from typing import TypeVar
from typing import Any from typing import Any
@ -41,8 +42,25 @@ def atomic(method: _AtomicF) -> _AtomicF:
return typing.cast(_AtomicF, wrapper) return typing.cast(_AtomicF, wrapper)
def task(method: Callable[..., Any]) -> Callable[..., asyncio.Task]: def tasked(method: Callable[..., Any]) -> Callable[..., asyncio.Task]:
@functools.wraps(method) @functools.wraps(method)
async def wrapper(*args: object, **kwargs: object) -> asyncio.Task: async def wrapper(*args: object, **kwargs: object) -> asyncio.Task:
return asyncio.create_task(method(*args, **kwargs)) return create_short_task(method(*args, **kwargs))
return typing.cast(Callable[..., asyncio.Task], wrapper) return typing.cast(Callable[..., asyncio.Task], wrapper)
_ATTR_SHORT_TASK = "_aiotools_short_task"
def create_short_task(coro: Coroutine) -> asyncio.Task:
task = asyncio.create_task(coro)
setattr(task, _ATTR_SHORT_TASK, True)
return task
async def gather_short_tasks() -> None:
await asyncio.gather(*[
task
for task in asyncio.Task.all_tasks()
if getattr(task, _ATTR_SHORT_TASK, False)
])

View File

@ -175,7 +175,7 @@ class Atx: # pylint: disable=too-many-instance-attributes
# ===== # =====
@aiotools.task @aiotools.tasked
@aiotools.atomic @aiotools.atomic
async def __click(self, pin: int, delay: float) -> None: async def __click(self, pin: int, delay: float) -> None:
with self.__region: with self.__region:

View File

@ -165,7 +165,7 @@ class Hid(multiprocessing.Process): # pylint: disable=too-many-instance-attribu
yield self.get_state() yield self.get_state()
await asyncio.sleep(self.__state_poll) await asyncio.sleep(self.__state_poll)
@aiotools.task @aiotools.tasked
@aiotools.atomic @aiotools.atomic
async def reset(self) -> None: async def reset(self) -> None:
async with self.__lock: async with self.__lock:

View File

@ -315,7 +315,7 @@ class MassStorageDevice: # pylint: disable=too-many-instance-attributes
return state return state
@_msd_working @_msd_working
@aiotools.task @aiotools.tasked
@aiotools.atomic @aiotools.atomic
async def reset(self) -> None: async def reset(self) -> None:
with self.__region: with self.__region:

View File

@ -40,6 +40,8 @@ import aiohttp
import aiohttp.web import aiohttp.web
import setproctitle import setproctitle
from ... import aiotools
from ...logging import get_logger from ...logging import get_logger
from ...aioregion import RegionIsBusyError from ...aioregion import RegionIsBusyError
@ -144,9 +146,9 @@ async def _get_multipart_field(reader: aiohttp.MultipartReader, name: str) -> ai
return field return field
_ATTR_EXPOSED = "exposed" _ATTR_EXPOSED = "_server_exposed"
_ATTR_EXPOSED_METHOD = "exposed_method" _ATTR_EXPOSED_METHOD = "_server_exposed_method"
_ATTR_EXPOSED_PATH = "exposed_path" _ATTR_EXPOSED_PATH = "_server_exposed_path"
_ATTR_SYSTEM_TASK = "system_task" _ATTR_SYSTEM_TASK = "system_task"
_HEADER_AUTH_USER = "X-KVMD-User" _HEADER_AUTH_USER = "X-KVMD-User"
@ -549,6 +551,9 @@ class Server: # pylint: disable=too-many-instance-attributes
async def __on_shutdown(self, _: aiohttp.web.Application) -> None: async def __on_shutdown(self, _: aiohttp.web.Application) -> None:
logger = get_logger(0) logger = get_logger(0)
logger.info("Waiting short tasks ...")
await aiotools.gather_short_tasks()
logger.info("Cancelling system tasks ...") logger.info("Cancelling system tasks ...")
for task in self.__system_tasks: for task in self.__system_tasks:
task.cancel() task.cancel()

View File

@ -172,7 +172,7 @@ class Streamer: # pylint: disable=too-many-instance-attributes
async def __inner_start(self) -> None: async def __inner_start(self) -> None:
assert not self.__streamer_task assert not self.__streamer_task
await self.__set_hw_enabled(True) await self.__set_hw_enabled(True)
self.__streamer_task = asyncio.get_running_loop().create_task(self.__run_streamer()) self.__streamer_task = asyncio.create_task(self.__run_streamer())
async def __inner_stop(self) -> None: async def __inner_stop(self) -> None:
assert self.__streamer_task assert self.__streamer_task