many fixes with asyncio

This commit is contained in:
Devaev Maxim 2019-06-19 04:15:43 +03:00
parent 376ab295bd
commit c7a2e445d0
7 changed files with 163 additions and 78 deletions

View File

@ -22,24 +22,50 @@
import asyncio
import functools
import contextlib
import typing
from typing import List
from typing import Callable
from typing import Coroutine
from typing import Generator
from typing import AsyncGenerator
from typing import TypeVar
from typing import Any
from . import aioregion
from .logging import get_logger
# =====
_AtomicF = TypeVar("_AtomicF", bound=Callable[..., Any])
_ATTR_SHORT_TASK = "_aiotools_short_task"
_MethodT = TypeVar("_MethodT", bound=Callable[..., Any])
_RetvalT = TypeVar("_RetvalT")
def atomic(method: _AtomicF) -> _AtomicF:
# =====
def atomic(method: _MethodT) -> _MethodT:
@functools.wraps(method)
async def wrapper(*args: Any, **kwargs: Any) -> Any:
return (await asyncio.shield(method(*args, **kwargs)))
return typing.cast(_AtomicF, wrapper)
return typing.cast(_MethodT, wrapper)
def muted(msg: str) -> Callable[[_MethodT], Callable[..., None]]:
def make_wrapper(method: _MethodT) -> Callable[..., None]:
@functools.wraps(method)
async def wrapper(*args: Any, **kwargs: Any) -> None:
try:
await method(*args, **kwargs)
except asyncio.CancelledError: # pylint: disable=try-except-raise
raise
except Exception:
get_logger(0).exception(msg)
return typing.cast(Callable[..., None], wrapper)
return make_wrapper
def tasked(method: Callable[..., Any]) -> Callable[..., asyncio.Task]:
@ -49,29 +75,46 @@ def tasked(method: Callable[..., Any]) -> Callable[..., asyncio.Task]:
return typing.cast(Callable[..., asyncio.Task], wrapper)
_ATTR_SHORT_TASK = "_aiotools_short_task"
# =====
def create_short_task(coro: Coroutine) -> asyncio.Task:
task = asyncio.create_task(coro)
setattr(task, _ATTR_SHORT_TASK, True)
return task
async def gather_short_tasks() -> None:
await asyncio.gather(*[
def get_short_tasks() -> List[asyncio.Task]:
return [
task
for task in asyncio.Task.all_tasks()
if getattr(task, _ATTR_SHORT_TASK, False)
])
_RetvalT = TypeVar("_RetvalT")
]
# =====
async def run_async(method: Callable[..., _RetvalT], *args: Any) -> _RetvalT:
return (await asyncio.get_running_loop().run_in_executor(None, method, *args))
def run_sync(coro: Coroutine[Any, Any, _RetvalT]) -> _RetvalT:
return asyncio.get_event_loop().run_until_complete(coro)
# =====
@contextlib.contextmanager
def unregion_only_on_exception(region: aioregion.AioExclusiveRegion) -> Generator[None, None, None]:
region.enter()
try:
yield
except: # noqa: E722
region.exit()
raise
@contextlib.asynccontextmanager
async def unlock_only_on_exception(lock: asyncio.Lock) -> AsyncGenerator[None, None]:
await lock.acquire()
try:
yield
except: # noqa: E722
lock.release()
raise

View File

@ -67,4 +67,4 @@ def main(argv: Optional[List[str]]=None) -> None:
streamer=Streamer(**config.streamer._unpack()),
).run(**config.server._unpack())
get_logger().info("Bye-bye")
get_logger(0).info("Bye-bye")

View File

@ -129,6 +129,16 @@ class Atx: # pylint: disable=too-many-instance-attributes
else:
await asyncio.sleep(60)
async def cleanup(self) -> None:
for (name, pin) in [
("power", self.__power_switch_pin),
("reset", self.__reset_switch_pin),
]:
try:
gpio.write(pin, False)
except Exception:
get_logger(0).exception("Can't cleanup %s pin %d", name, pin)
# =====
@_atx_working
@ -163,25 +173,33 @@ class Atx: # pylint: disable=too-many-instance-attributes
@_atx_working
async def click_power(self) -> None:
get_logger().info("Clicking power ...")
await self.__click(self.__power_switch_pin, self.__click_delay)
await self.__click("power", self.__power_switch_pin, self.__click_delay)
@_atx_working
async def click_power_long(self) -> None:
get_logger().info("Clicking power (long press) ...")
await self.__click(self.__power_switch_pin, self.__long_click_delay)
await self.__click("power_long", self.__power_switch_pin, self.__long_click_delay)
@_atx_working
async def click_reset(self) -> None:
get_logger().info("Clicking reset")
await self.__click(self.__reset_switch_pin, self.__click_delay)
await self.__click("reset", self.__reset_switch_pin, self.__click_delay)
# =====
@aiotools.tasked
@aiotools.atomic
async def __click(self, pin: int, delay: float) -> None:
with self.__region:
for flag in [True, False]:
gpio.write(pin, flag)
await asyncio.sleep(delay)
async def __click(self, name: str, pin: int, delay: float) -> None:
with aiotools.unregion_only_on_exception(self.__region):
await self.__inner_click(name, pin, delay)
@aiotools.tasked
@aiotools.muted("Can't perform ATX click or operation was not completed")
async def __inner_click(self, name: str, pin: int, delay: float) -> None:
try:
gpio.write(pin, True)
await asyncio.sleep(delay)
finally:
try:
gpio.write(pin, False)
await asyncio.sleep(1)
finally:
self.__region.exit()
get_logger(0).info("Clicked ATX button %r", name)

View File

@ -169,13 +169,24 @@ class Hid(multiprocessing.Process): # pylint: disable=too-many-instance-attribu
prev_state = state
await asyncio.sleep(self.__state_poll)
@aiotools.tasked
@aiotools.atomic
async def reset(self) -> None:
async with self.__lock:
async with aiotools.unlock_only_on_exception(self.__lock):
await self.__inner_reset()
@aiotools.tasked
@aiotools.muted("Can't reset HID or operation was not completed")
async def __inner_reset(self) -> None:
try:
gpio.write(self.__reset_pin, True)
await asyncio.sleep(self.__reset_delay)
gpio.write(self.__reset_pin, False)
finally:
try:
gpio.write(self.__reset_pin, False)
await asyncio.sleep(1)
finally:
self.__lock.release()
get_logger(0).info("Reset HID performed")
async def send_key_event(self, key: str, state: bool) -> None:
await self.__send_bool_event(_KeyEvent(key, state), self.__pressed_keys)
@ -196,17 +207,20 @@ class Hid(multiprocessing.Process): # pylint: disable=too-many-instance-attribu
@aiotools.atomic
async def cleanup(self) -> None:
logger = get_logger(0)
async with self.__lock:
if self.is_alive():
self.__unsafe_clear_events()
get_logger(0).info("Stopping HID daemon ...")
self.__stop_event.set()
else:
get_logger(0).warning("Emergency cleaning up HID events ...")
self.__emergency_clear_events()
if self.exitcode is not None:
self.join()
gpio.write(self.__reset_pin, False)
try:
if self.is_alive():
self.__unsafe_clear_events()
logger.info("Stopping HID daemon ...")
self.__stop_event.set()
else:
logger.warning("Emergency cleaning up HID events ...")
self.__emergency_clear_events()
if self.exitcode is not None:
self.join()
finally:
gpio.write(self.__reset_pin, False)
async def __send_bool_event(self, event: _BoolEvent, pressed: Set[str]) -> None:
if not self.__stop_event.is_set():

View File

@ -58,27 +58,27 @@ class MsdOperationError(MsdError):
class MsdDisabledError(MsdOperationError):
def __init__(self) -> None:
super().__init__("Mass-storage device is disabled")
super().__init__("MSD is disabled")
class MsdOfflineError(MsdOperationError):
def __init__(self) -> None:
super().__init__("Mass-storage device is not found")
super().__init__("MSD is not found")
class MsdAlreadyOnServerError(MsdOperationError):
def __init__(self) -> None:
super().__init__("Mass-storage is already connected to Server")
super().__init__("MSD is already connected to Server")
class MsdAlreadyOnKvmError(MsdOperationError):
def __init__(self) -> None:
super().__init__("Mass-storage is already connected to KVM")
super().__init__("MSD is already connected to KVM")
class MsdNotOnKvmError(MsdOperationError):
def __init__(self) -> None:
super().__init__("Mass-storage is not connected to KVM")
super().__init__("MSD is not connected to KVM")
class MsdIsBusyError(MsdOperationError, aioregion.RegionIsBusyError):
@ -226,16 +226,16 @@ class MassStorageDevice: # pylint: disable=too-many-instance-attributes
logger = get_logger(0)
if self._enabled:
logger.info("Using %r as mass-storage device", self.__device_path)
logger.info("Using %r as MSD", self.__device_path)
try:
aiotools.run_sync(self.__load_device_info())
if self.__write_meta:
logger.info("Enabled image metadata writing")
except Exception as err:
log = (logger.error if isinstance(err, MsdError) else logger.exception)
log("Mass-storage device is offline: %s", err)
log("MSD is offline: %s", err)
else:
logger.info("Mass-storage device is disabled")
logger.info("MSD is disabled")
def get_state(self) -> Dict:
online = (self._enabled and bool(self._device_info))
@ -282,7 +282,7 @@ class MassStorageDevice: # pylint: disable=too-many-instance-attributes
gpio.write(self.__target_pin, True)
raise
self.__on_kvm = True
get_logger().info("Mass-storage device switched to KVM: %s", self._device_info)
get_logger().info("MSD switched to KVM: %s", self._device_info)
state = self.get_state()
return state
@ -303,7 +303,7 @@ class MassStorageDevice: # pylint: disable=too-many-instance-attributes
gpio.write(self.__target_pin, True)
self.__on_kvm = False
get_logger().info("Mass-storage device switched to Server")
get_logger().info("MSD switched to Server")
state = self.get_state()
return state
@ -311,28 +311,34 @@ class MassStorageDevice: # pylint: disable=too-many-instance-attributes
if notify:
await self.__state_queue.put(state or self.get_state())
@aiotools.tasked
@aiotools.atomic
async def reset(self) -> None:
notify = False
if not self._enabled:
raise MsdDisabledError()
with aiotools.unregion_only_on_exception(self.__region):
await self.__inner_reset()
@aiotools.tasked
@aiotools.muted("Can't reset MSD or operation was not completed")
async def __inner_reset(self) -> None:
try:
with self.__region:
if not self._enabled:
raise MsdDisabledError()
notify = True
gpio.write(self.__reset_pin, True)
await asyncio.sleep(self.__reset_delay)
gpio.write(self.__reset_pin, False)
gpio.write(self.__reset_pin, True)
await asyncio.sleep(self.__reset_delay)
gpio.write(self.__target_pin, False)
self.__on_kvm = True
await asyncio.sleep(self.__reset_delay)
gpio.write(self.__reset_pin, False)
gpio.write(self.__target_pin, False)
self.__on_kvm = True
await self.__load_device_info()
get_logger(0).info("Mass-storage device reset has been successful")
await self.__load_device_info()
get_logger(0).info("MSD reset has been successful")
finally:
if notify:
await self.__state_queue.put(self.get_state())
try:
gpio.write(self.__reset_pin, False)
finally:
try:
await self.__state_queue.put(self.get_state())
finally:
self.__region.exit()
@_msd_working
@aiotools.atomic
@ -392,12 +398,12 @@ class MassStorageDevice: # pylint: disable=too-many-instance-attributes
async def __close_device_file(self) -> None:
try:
if self.__device_file:
get_logger().info("Closing mass-storage device file ...")
get_logger().info("Closing device file ...")
await self.__device_file.close()
except asyncio.CancelledError: # pylint: disable=try-except-raise
raise
except Exception:
get_logger().exception("Can't close mass-storage device file")
get_logger().exception("Can't close device file")
finally:
self.__device_file = None
self.__written = 0

View File

@ -480,7 +480,7 @@ class Server: # pylint: disable=too-many-instance-attributes
data_field = await _get_multipart_field(reader, "image_data")
logger.info("Writing image %r to mass-storage device ...", image_name)
logger.info("Writing image %r to MSD ...", image_name)
await self.__msd.write_image_info(image_name, False)
while True:
chunk = await data_field.read_chunk(self.__msd.chunk_size)
@ -490,7 +490,7 @@ class Server: # pylint: disable=too-many-instance-attributes
await self.__msd.write_image_info(image_name, True)
finally:
if written != 0:
logger.info("Written %d bytes to mass-storage device", written)
logger.info("Written %d bytes to MSD", written)
return _json({"written": written})
@_exposed("POST", "/msd/reset")
@ -549,12 +549,14 @@ class Server: # pylint: disable=too-many-instance-attributes
logger = get_logger(0)
logger.info("Waiting short tasks ...")
await aiotools.gather_short_tasks()
await asyncio.gather(*aiotools.get_short_tasks(), return_exceptions=True)
logger.info("Cancelling system tasks ...")
for task in self.__system_tasks:
task.cancel()
await asyncio.gather(*self.__system_tasks)
logger.info("Waiting system tasks ...")
await asyncio.gather(*self.__system_tasks, return_exceptions=True)
logger.info("Disconnecting clients ...")
for ws in list(self.__sockets):
@ -566,15 +568,14 @@ class Server: # pylint: disable=too-many-instance-attributes
self._auth_manager,
self.__streamer,
self.__msd,
self.__atx,
self.__hid,
]:
logger.info("Cleaning up %s ...", type(obj).__name__)
try:
await obj.cleanup() # type: ignore
except asyncio.CancelledError: # pylint: disable=try-except-raise
raise
except Exception:
logger.exception("Cleanup error")
logger.exception("Cleanup error on %s", type(obj).__name__)
async def __broadcast_event(self, event_type: _Events, event_attrs: Dict) -> None:
if self.__sockets:

View File

@ -162,11 +162,14 @@ class Streamer: # pylint: disable=too-many-instance-attributes
@aiotools.atomic
async def cleanup(self) -> None:
if self.is_running():
await self.stop()
if self.__http_session:
await self.__http_session.close()
self.__http_session = None
try:
if self.is_running():
await self.stop()
if self.__http_session:
await self.__http_session.close()
self.__http_session = None
finally:
await self.__set_hw_enabled(False)
def __ensure_session(self) -> aiohttp.ClientSession:
if not self.__http_session: