atomic fixes, removed tasked and muted

This commit is contained in:
Devaev Maxim 2020-03-03 23:48:53 +03:00
parent 3b16242cfa
commit 552bb93212
6 changed files with 124 additions and 153 deletions

View File

@ -24,7 +24,6 @@ import os
import asyncio import asyncio
import asyncio.queues import asyncio.queues
import functools import functools
import contextlib
import types import types
import typing import typing
@ -32,7 +31,6 @@ import typing
from typing import List from typing import List
from typing import Callable from typing import Callable
from typing import Coroutine from typing import Coroutine
from typing import AsyncGenerator
from typing import Type from typing import Type
from typing import TypeVar from typing import TypeVar
from typing import Optional from typing import Optional
@ -59,27 +57,6 @@ def atomic(method: _MethodT) -> _MethodT:
return typing.cast(_MethodT, wrapper) return typing.cast(_MethodT, wrapper)
def muted(msg: str) -> Callable[[_MethodT], Callable[..., None]]:
def make_wrapper(method: _MethodT) -> Callable[..., None]:
@functools.wraps(method)
async def wrapper(*args: Any, **kwargs: Any) -> None:
try:
await method(*args, **kwargs)
except asyncio.CancelledError: # pylint: disable=try-except-raise
raise
except Exception:
get_logger(0).exception(msg)
return typing.cast(Callable[..., None], wrapper)
return make_wrapper
def tasked(method: Callable[..., Any]) -> Callable[..., asyncio.Task]:
@functools.wraps(method)
async def wrapper(*args: Any, **kwargs: Any) -> asyncio.Task:
return create_short_task(method(*args, **kwargs))
return typing.cast(Callable[..., asyncio.Task], wrapper)
# ===== # =====
def create_short_task(coro: Coroutine) -> asyncio.Task: def create_short_task(coro: Coroutine) -> asyncio.Task:
task = asyncio.create_task(coro) task = asyncio.create_task(coro)
@ -109,17 +86,6 @@ async def wait_infinite() -> None:
await asyncio.get_event_loop().create_future() await asyncio.get_event_loop().create_future()
# =====
@contextlib.asynccontextmanager
async def unlock_only_on_exception(lock: asyncio.Lock) -> AsyncGenerator[None, None]:
await lock.acquire()
try:
yield
except: # noqa: E722
lock.release()
raise
# ===== # =====
async def afile_write_now(afile: aiofiles.base.AiofilesContextManager, data: bytes) -> None: async def afile_write_now(afile: aiofiles.base.AiofilesContextManager, data: bytes) -> None:
await afile.write(data) await afile.write(data)
@ -154,6 +120,9 @@ class AioExclusiveRegion:
self.__busy = False self.__busy = False
def get_exc_type(self) -> Type[Exception]:
return self.__exc_type
def is_busy(self) -> bool: def is_busy(self) -> bool:
return self.__busy return self.__busy
@ -174,15 +143,6 @@ class AioExclusiveRegion:
if self.__notifier: if self.__notifier:
await self.__notifier.notify() await self.__notifier.notify()
@contextlib.asynccontextmanager
async def exit_only_on_exception(self) -> AsyncGenerator[None, None]:
await self.enter()
try:
yield
except: # noqa: E722
await self.exit()
raise
async def __aenter__(self) -> None: async def __aenter__(self) -> None:
await self.enter() await self.enter()
@ -194,3 +154,34 @@ class AioExclusiveRegion:
) -> None: ) -> None:
await self.exit() await self.exit()
async def run_region_task(
msg: str,
region: AioExclusiveRegion,
method: Callable[..., Coroutine[Any, Any, None]],
*args: Any,
**kwargs: Any,
) -> None:
entered = asyncio.Future() # type: ignore
async def wrapper() -> None:
try:
async with region:
entered.set_result(None)
await method(*args, **kwargs)
except asyncio.CancelledError: # pylint: disable=try-except-raise
raise
except region.get_exc_type():
raise
except Exception:
get_logger(0).exception(msg)
task = create_short_task(wrapper())
await asyncio.wait([entered, task], return_when=asyncio.FIRST_COMPLETED)
if entered.done():
return
if (exc := task.exception()) is not None: # noqa: E203,E231
raise exc

View File

@ -64,13 +64,10 @@ class WakeOnLan:
async def wakeup(self) -> None: async def wakeup(self) -> None:
if not self.__magic: if not self.__magic:
raise WolDisabledError() raise WolDisabledError()
await self.__inner_wakeup()
@aiotools.tasked
@aiotools.muted("Can't perform Wake-on-LAN or operation was not completed")
async def __inner_wakeup(self) -> None:
logger = get_logger(0) logger = get_logger(0)
logger.info("Waking up %s (%s:%s) using Wake-on-LAN ...", self.__mac, self.__ip, self.__port) logger.info("Waking up %s (%s:%s) using Wake-on-LAN ...", self.__mac, self.__ip, self.__port)
sock: Optional[socket.socket] = None sock: Optional[socket.socket] = None
try: try:
# TODO: IPv6 support: http://lists.cluenet.de/pipermail/ipv6-ops/2014-September/010139.html # TODO: IPv6 support: http://lists.cluenet.de/pipermail/ipv6-ops/2014-September/010139.html

View File

@ -162,19 +162,17 @@ class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes
@aiotools.atomic @aiotools.atomic
async def __click(self, name: str, pin: int, delay: float) -> None: async def __click(self, name: str, pin: int, delay: float) -> None:
async with self.__region.exit_only_on_exception(): await aiotools.run_region_task(
await self.__inner_click(name, pin, delay) "Can't perform ATX click or operation was not completed",
self.__region, self.__inner_click, name, pin, delay,
)
@aiotools.tasked @aiotools.atomic
@aiotools.muted("Can't perform ATX click or operation was not completed")
async def __inner_click(self, name: str, pin: int, delay: float) -> None: async def __inner_click(self, name: str, pin: int, delay: float) -> None:
try: try:
gpio.write(pin, True) gpio.write(pin, True)
await asyncio.sleep(delay) await asyncio.sleep(delay)
finally: finally:
try:
gpio.write(pin, False) gpio.write(pin, False)
await asyncio.sleep(1) await asyncio.sleep(1)
finally:
await self.__region.exit()
get_logger(0).info("Clicked ATX button %r", name) get_logger(0).info("Clicked ATX button %r", name)

View File

@ -157,7 +157,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
self.__retries_delay = retries_delay self.__retries_delay = retries_delay
self.__noop = noop self.__noop = noop
self.__lock = asyncio.Lock() self.__reset_wip = False
self.__events_queue: multiprocessing.queues.Queue = multiprocessing.Queue() self.__events_queue: multiprocessing.queues.Queue = multiprocessing.Queue()
@ -216,13 +216,9 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
@aiotools.atomic @aiotools.atomic
async def reset(self) -> None: async def reset(self) -> None:
async with aiotools.unlock_only_on_exception(self.__lock): if not self.__reset_wip:
await self.__inner_reset()
@aiotools.tasked
@aiotools.muted("Can't reset HID or operation was not completed")
async def __inner_reset(self) -> None:
try: try:
self.__reset_wip = True
gpio.write(self.__reset_pin, True) gpio.write(self.__reset_pin, True)
await asyncio.sleep(self.__reset_delay) await asyncio.sleep(self.__reset_delay)
finally: finally:
@ -230,13 +226,14 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
gpio.write(self.__reset_pin, False) gpio.write(self.__reset_pin, False)
await asyncio.sleep(1) await asyncio.sleep(1)
finally: finally:
self.__lock.release() self.__reset_wip = False
get_logger(0).info("Reset HID performed") get_logger().info("Reset HID performed")
else:
get_logger().info("Another reset HID in progress")
@aiotools.atomic @aiotools.atomic
async def cleanup(self) -> None: async def cleanup(self) -> None:
logger = get_logger(0) logger = get_logger(0)
async with self.__lock:
try: try:
if self.is_alive(): if self.is_alive():
logger.info("Stopping HID daemon ...") logger.info("Stopping HID daemon ...")
@ -272,8 +269,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
async def __queue_event(self, event: _BaseEvent) -> None: async def __queue_event(self, event: _BaseEvent) -> None:
if not self.__stop_event.is_set(): if not self.__stop_event.is_set():
async with self.__lock: self.__events_queue.put_nowait(event)
self.__events_queue.put(event)
def run(self) -> None: # pylint: disable=too-many-branches def run(self) -> None: # pylint: disable=too-many-branches
logger = get_logger(0) logger = get_logger(0)

View File

@ -352,6 +352,7 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
await self.__state_notifier.notify() await self.__state_notifier.notify()
return self.__new_file_written return self.__new_file_written
@aiotools.atomic
async def remove(self, name: str) -> None: async def remove(self, name: str) -> None:
async with self.__state.busy(): async with self.__state.busy():
assert self.__state.storage assert self.__state.storage

View File

@ -25,7 +25,6 @@ import stat
import fcntl import fcntl
import struct import struct
import asyncio import asyncio
import asyncio.queues
import contextlib import contextlib
import dataclasses import dataclasses
@ -173,15 +172,14 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
self.__init_retries = init_retries self.__init_retries = init_retries
self.__reset_delay = reset_delay self.__reset_delay = reset_delay
self.__region = aiotools.AioExclusiveRegion(MsdIsBusyError)
self.__device_info: Optional[_DeviceInfo] = None self.__device_info: Optional[_DeviceInfo] = None
self.__connected = False self.__connected = False
self.__device_file: Optional[aiofiles.base.AiofilesContextManager] = None self.__device_file: Optional[aiofiles.base.AiofilesContextManager] = None
self.__written = 0 self.__written = 0
self.__state_queue: asyncio.queues.Queue = asyncio.Queue() self.__state_notifier = aiotools.AioNotifier()
self.__region = aiotools.AioExclusiveRegion(MsdIsBusyError, self.__state_notifier)
logger = get_logger(0) logger = get_logger(0)
logger.info("Using %r as MSD", self.__device_path) logger.info("Using %r as MSD", self.__device_path)
@ -229,16 +227,22 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
} }
async def poll_state(self) -> AsyncGenerator[Dict, None]: async def poll_state(self) -> AsyncGenerator[Dict, None]:
prev_state: Dict = {}
while True: while True:
yield (await self.__state_queue.get()) state = await self.get_state()
if state != prev_state:
yield state
prev_state = state
await self.__state_notifier.wait()
@aiotools.atomic @aiotools.atomic
async def reset(self) -> None: async def reset(self) -> None:
async with self.__region.exit_only_on_exception(): await aiotools.run_region_task(
await self.__inner_reset() "Can't reset MSD or operation was not completed",
self.__region, self.__inner_reset,
)
@aiotools.tasked @aiotools.atomic
@aiotools.muted("Can't reset MSD or operation was not completed")
async def __inner_reset(self) -> None: async def __inner_reset(self) -> None:
try: try:
gpio.write(self.__reset_pin, True) gpio.write(self.__reset_pin, True)
@ -251,20 +255,19 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
await self.__load_device_info() await self.__load_device_info()
get_logger(0).info("MSD reset has been successful") get_logger(0).info("MSD reset has been successful")
finally: finally:
try:
gpio.write(self.__reset_pin, False) gpio.write(self.__reset_pin, False)
finally:
await self.__region.exit()
await self.__state_queue.put(await self.get_state())
@aiotools.atomic @aiotools.atomic
async def cleanup(self) -> None: async def cleanup(self) -> None:
try:
await self.__close_device_file() await self.__close_device_file()
finally:
gpio.write(self.__target_pin, False) gpio.write(self.__target_pin, False)
gpio.write(self.__reset_pin, False) gpio.write(self.__reset_pin, False)
# ===== # =====
@aiotools.atomic
async def set_params(self, name: Optional[str]=None, cdrom: Optional[bool]=None) -> None: async def set_params(self, name: Optional[str]=None, cdrom: Optional[bool]=None) -> None:
async with self.__working(): async with self.__working():
if name is not None: if name is not None:
@ -275,29 +278,20 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
@aiotools.atomic @aiotools.atomic
async def connect(self) -> None: async def connect(self) -> None:
async with self.__working(): async with self.__working():
notify = False
try:
async with self.__region: async with self.__region:
if self.__connected: if self.__connected:
raise MsdConnectedError() raise MsdConnectedError()
notify = True
gpio.write(self.__target_pin, True) gpio.write(self.__target_pin, True)
self.__connected = True self.__connected = True
get_logger(0).info("MSD switched to Server") get_logger(0).info("MSD switched to Server")
finally:
if notify:
await self.__state_queue.put(await self.get_state())
@aiotools.atomic @aiotools.atomic
async def disconnect(self) -> None: async def disconnect(self) -> None:
async with self.__working(): async with self.__working():
notify = False
try:
async with self.__region: async with self.__region:
if not self.__connected: if not self.__connected:
raise MsdDisconnectedError() raise MsdDisconnectedError()
notify = True
gpio.write(self.__target_pin, False) gpio.write(self.__target_pin, False)
try: try:
@ -308,14 +302,11 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
raise raise
self.__connected = False self.__connected = False
get_logger(0).info("MSD switched to KVM: %s", self.__device_info) get_logger(0).info("MSD switched to KVM: %s", self.__device_info)
finally:
if notify:
await self.__state_queue.put(await self.get_state())
@contextlib.asynccontextmanager @contextlib.asynccontextmanager
async def write_image(self, name: str) -> AsyncGenerator[None, None]: async def write_image(self, name: str) -> AsyncGenerator[None, None]:
async with self.__working(): async with self.__working():
await self.__region.enter() async with self.__region:
try: try:
assert self.__device_info assert self.__device_info
if self.__connected: if self.__connected:
@ -325,16 +316,12 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
self.__written = 0 self.__written = 0
await self.__write_image_info(name, complete=False) await self.__write_image_info(name, complete=False)
await self.__state_queue.put(await self.get_state()) await self.__state_notifier.notify()
yield yield
await self.__write_image_info(name, complete=True) await self.__write_image_info(name, complete=True)
finally: finally:
try:
await self.__close_device_file() await self.__close_device_file()
await self.__load_device_info() await self.__load_device_info()
finally:
await self.__region.exit()
await self.__state_queue.put(await self.get_state())
async def write_image_chunk(self, chunk: bytes) -> int: async def write_image_chunk(self, chunk: bytes) -> int:
assert self.__device_file assert self.__device_file
@ -342,6 +329,7 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
self.__written += len(chunk) self.__written += len(chunk)
return self.__written return self.__written
@aiotools.atomic
async def remove(self, name: str) -> None: async def remove(self, name: str) -> None:
async with self.__working(): async with self.__working():
raise MsdMultiNotSupported() raise MsdMultiNotSupported()