better atomic ops

This commit is contained in:
Devaev Maxim 2019-06-05 06:30:21 +03:00
parent 234aa8bda4
commit 8aa333ba89
7 changed files with 83 additions and 29 deletions

48
kvmd/aiotools.py Normal file
View File

@ -0,0 +1,48 @@
# ========================================================================== #
# #
# KVMD - The main Pi-KVM daemon. #
# #
# Copyright (C) 2018 Maxim Devaev <mdevaev@gmail.com> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
# ========================================================================== #
import asyncio
import functools
import typing
from typing import Callable
from typing import TypeVar
from typing import Any
# =====
_AtomicF = TypeVar("_AtomicF", bound=Callable[..., Any])
def atomic(method: _AtomicF) -> _AtomicF:
@functools.wraps(method)
async def wrapper(*args: object, **kwargs: object) -> Any:
return (await asyncio.shield(method(*args, **kwargs)))
return typing.cast(_AtomicF, wrapper)
def task(method: Callable[..., Any]) -> Callable[..., asyncio.Task]:
@functools.wraps(method)
async def wrapper(*args: object, **kwargs: object) -> asyncio.Task:
return asyncio.create_task(method(*args, **kwargs))
return typing.cast(Callable[..., asyncio.Task], wrapper)

View File

@ -30,6 +30,7 @@ from typing import Any
from ...logging import get_logger
from ... import aiotools
from ... import aioregion
from ... import gpio
@ -174,14 +175,10 @@ class Atx: # pylint: disable=too-many-instance-attributes
# =====
@aiotools.task
@aiotools.atomic
async def __click(self, pin: int, delay: float) -> None:
self.__region.enter()
asyncio.ensure_future(self.__inner_click(pin, delay))
async def __inner_click(self, pin: int, delay: float) -> None:
try:
for flag in (True, False):
with self.__region:
for flag in [True, False]:
gpio.write(pin, flag)
await asyncio.sleep(delay)
finally:
self.__region.exit()

View File

@ -26,6 +26,8 @@ from typing import List
from typing import Dict
from typing import Optional
from ... import aiotools
from ...logging import get_logger
from ...plugins.auth import BaseAuthService
@ -91,6 +93,7 @@ class AuthManager:
def check(self, token: str) -> Optional[str]:
return self.__tokens.get(token)
@aiotools.atomic
async def cleanup(self) -> None:
await self.__internal_service.cleanup()
if self.__external_service:

View File

@ -40,6 +40,7 @@ import setproctitle
from ...logging import get_logger
from ... import aiotools
from ... import gpio
from ... import keymap
@ -164,6 +165,8 @@ class Hid(multiprocessing.Process): # pylint: disable=too-many-instance-attribu
yield self.get_state()
await asyncio.sleep(self.__state_poll)
@aiotools.task
@aiotools.atomic
async def reset(self) -> None:
async with self.__lock:
gpio.write(self.__reset_pin, True)
@ -187,6 +190,7 @@ class Hid(multiprocessing.Process): # pylint: disable=too-many-instance-attribu
async with self.__lock:
self.__unsafe_clear_events()
@aiotools.atomic
async def cleanup(self) -> None:
async with self.__lock:
if self.is_alive():

View File

@ -42,6 +42,7 @@ import aiofiles.base
from ...logging import get_logger
from ... import aioregion
from ... import aiotools
from ... import gpio
@ -273,6 +274,7 @@ class MassStorageDevice: # pylint: disable=too-many-instance-attributes
else:
await asyncio.sleep(60)
@aiotools.atomic
async def cleanup(self) -> None:
if self._enabled:
await self.__close_device_file()
@ -280,6 +282,7 @@ class MassStorageDevice: # pylint: disable=too-many-instance-attributes
gpio.write(self.__reset_pin, False)
@_msd_working
@aiotools.atomic
async def connect_to_kvm(self, initial: bool=False) -> Dict:
with self.__region:
if self.__device_info:
@ -299,6 +302,7 @@ class MassStorageDevice: # pylint: disable=too-many-instance-attributes
return state
@_msd_working
@aiotools.atomic
async def connect_to_pc(self) -> Dict:
with self.__region:
if not self.__device_info:
@ -311,15 +315,17 @@ class MassStorageDevice: # pylint: disable=too-many-instance-attributes
return state
@_msd_working
@aiotools.task
@aiotools.atomic
async def reset(self) -> None:
with self.__region:
get_logger().info("Mass-storage device reset")
gpio.write(self.__reset_pin, True)
await asyncio.sleep(self.__reset_delay)
gpio.write(self.__reset_pin, False)
await self.__state_queue.put(self.get_state())
@_msd_working
@aiotools.atomic
async def __aenter__(self) -> "MassStorageDevice":
self.__region.enter()
try:
@ -332,6 +338,7 @@ class MassStorageDevice: # pylint: disable=too-many-instance-attributes
await self.__state_queue.put(self.get_state())
self.__region.exit()
@aiotools.atomic
async def write_image_info(self, name: str, complete: bool) -> None:
assert self.__device_file
assert self.__device_info
@ -344,11 +351,13 @@ class MassStorageDevice: # pylint: disable=too-many-instance-attributes
else:
get_logger().error("Can't write image info because device is full")
@aiotools.atomic
async def write_image_chunk(self, chunk: bytes) -> int:
await self.__write_to_device_file(chunk)
self.__written += len(chunk)
return self.__written
@aiotools.atomic
async def __aexit__(
self,
_exc_type: Type[BaseException],
@ -380,6 +389,6 @@ class MassStorageDevice: # pylint: disable=too-many-instance-attributes
await self.__device_file.close()
except Exception:
get_logger().exception("Can't close mass-storage device file")
await self.reset()
await (await self.reset())
self.__device_file = None
self.__written = 0

View File

@ -155,13 +155,7 @@ _HEADER_AUTH_PASSWD = "X-KVMD-Passwd"
_COOKIE_AUTH_TOKEN = "auth_token"
def _atomic(handler: Callable) -> Callable:
async def wrapper(self: "Server", request: aiohttp.web.Request) -> aiohttp.web.Response:
return (await asyncio.shield(handler(self, request)))
return wrapper
def _exposed(http_method: str, path: str, atomic: bool=False, auth_required: bool=True) -> Callable:
def _exposed(http_method: str, path: str, auth_required: bool=True) -> Callable:
def make_wrapper(handler: Callable) -> Callable:
async def wrapper(self: "Server", request: aiohttp.web.Request) -> aiohttp.web.Response:
try:
@ -197,9 +191,6 @@ def _exposed(http_method: str, path: str, atomic: bool=False, auth_required: boo
except ForbiddenError as err:
return _json_exception(err, 403)
if atomic:
wrapper = _atomic(wrapper)
setattr(wrapper, _ATTR_EXPOSED, True)
setattr(wrapper, _ATTR_EXPOSED_METHOD, http_method)
setattr(wrapper, _ATTR_EXPOSED_PATH, path)
@ -311,7 +302,7 @@ class Server: # pylint: disable=too-many-instance-attributes
# ===== AUTH
@_exposed("POST", "/auth/login", atomic=True, auth_required=False)
@_exposed("POST", "/auth/login", auth_required=False)
async def __auth_login_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
credentials = await request.post()
token = await self._auth_manager.login(
@ -429,7 +420,7 @@ class Server: # pylint: disable=too-many-instance-attributes
async def __hid_state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
return _json(self.__hid.get_state())
@_exposed("POST", "/hid/reset", atomic=True)
@_exposed("POST", "/hid/reset")
async def __hid_reset_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
await self.__hid.reset()
return _json()
@ -440,18 +431,18 @@ class Server: # pylint: disable=too-many-instance-attributes
async def __atx_state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
return _json(self.__atx.get_state())
@_exposed("POST", "/atx/power", atomic=True)
@_exposed("POST", "/atx/power")
async def __atx_power_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
action = valid_atx_power_action(request.query.get("action"))
done = await ({
processing = await ({
"on": self.__atx.power_on,
"off": self.__atx.power_off,
"off_hard": self.__atx.power_off_hard,
"reset_hard": self.__atx.power_reset_hard,
}[action])()
return _json({"action": action, "done": done})
return _json({"processing": processing})
@_exposed("POST", "/atx/click", atomic=True)
@_exposed("POST", "/atx/click")
async def __atx_click_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
button = valid_atx_button(request.query.get("button"))
await ({
@ -459,7 +450,7 @@ class Server: # pylint: disable=too-many-instance-attributes
"power_long": self.__atx.click_power_long,
"reset": self.__atx.click_reset,
}[button])()
return _json({"clicked": button})
return _json()
# ===== MSD
@ -467,7 +458,7 @@ class Server: # pylint: disable=too-many-instance-attributes
async def __msd_state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
return _json(self.__msd.get_state())
@_exposed("POST", "/msd/connect", atomic=True)
@_exposed("POST", "/msd/connect")
async def __msd_connect_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
to = valid_kvm_target(request.query.get("to"))
return _json(await ({
@ -500,7 +491,7 @@ class Server: # pylint: disable=too-many-instance-attributes
logger.info("Written %d bytes to mass-storage device", written)
return _json({"written": written})
@_exposed("POST", "/msd/reset", atomic=True)
@_exposed("POST", "/msd/reset")
async def __msd_reset_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
await self.__msd.reset()
return _json()

View File

@ -34,6 +34,7 @@ import aiohttp
from ...logging import get_logger
from ... import aiotools
from ... import gpio
from ... import __version__
@ -152,6 +153,7 @@ class Streamer: # pylint: disable=too-many-instance-attributes
(stdout, _) = await proc.communicate()
return stdout.decode(errors="ignore").strip()
@aiotools.atomic
async def cleanup(self) -> None:
if self.is_running():
await self.stop()