exclusive region-based access for soma apis

This commit is contained in:
Devaev Maxim 2018-07-13 17:54:15 +00:00
parent 9e1f9ae853
commit e118d270df
5 changed files with 153 additions and 109 deletions

33
kvmd/kvmd/aioregion.py Normal file
View File

@ -0,0 +1,33 @@
import types
from typing import Type
# =====
class RegionIsBusyError(Exception):
pass
class AioExclusiveRegion:
def __init__(self) -> None:
self.__busy = False
def enter(self) -> None:
if not self.__busy:
self.__busy = True
return
raise RegionIsBusyError()
def exit(self) -> None:
self.__busy = False
def __enter__(self) -> None:
self.enter()
def __exit__(
self,
_exc_type: Type[BaseException],
_exc: BaseException,
_tb: types.TracebackType,
) -> None:
self.exit()

View File

@ -4,6 +4,7 @@ from typing import Dict
from .logging import get_logger
from . import aioregion
from . import gpio
@ -28,7 +29,7 @@ class Atx:
self.__click_delay = click_delay
self.__long_click_delay = long_click_delay
self.__lock = asyncio.Lock()
self.__region = aioregion.AioExclusiveRegion()
def get_state(self) -> Dict:
return {
@ -39,22 +40,19 @@ class Atx:
}
async def click_power(self) -> None:
if (await self.__click(self.__power_switch, self.__click_delay)):
get_logger().info("Clicked power")
await self.__click(self.__power_switch, self.__click_delay)
get_logger().info("Clicked power")
async def click_power_long(self) -> None:
if (await self.__click(self.__power_switch, self.__long_click_delay)):
get_logger().info("Clicked power (long press)")
await self.__click(self.__power_switch, self.__long_click_delay)
get_logger().info("Clicked power (long press)")
async def click_reset(self) -> None:
if (await self.__click(self.__reset_switch, self.__click_delay)):
get_logger().info("Clicked reset")
await self.__click(self.__reset_switch, self.__click_delay)
get_logger().info("Clicked reset")
async def __click(self, pin: int, delay: float) -> bool:
if not self.__lock.locked():
async with self.__lock:
for flag in (True, False):
gpio.write(pin, flag)
await asyncio.sleep(delay)
return True
return False
async def __click(self, pin: int, delay: float) -> None:
with self.__region:
for flag in (True, False):
gpio.write(pin, flag)
await asyncio.sleep(delay)

View File

@ -15,6 +15,8 @@ import pyudev
import aiofiles
import aiofiles.base
from . import aioregion
from .logging import get_logger
@ -151,14 +153,13 @@ def _explore_device(device_path: str) -> Optional[_MassStorageDeviceInfo]:
)
def _operated_and_locked(method: Callable) -> Callable:
def _msd_operated(method: Callable) -> Callable:
async def wrap(self: "MassStorageDevice", *args: Any, **kwargs: Any) -> Any:
if self._device_file: # pylint: disable=protected-access
raise IsBusyError()
if not self._device_path: # pylint: disable=protected-access
IsNotOperationalError()
async with self._lock: # pylint: disable=protected-access
return (await method(self, *args, **kwargs))
return (await method(self, *args, **kwargs))
return wrap
@ -178,7 +179,7 @@ class MassStorageDevice: # pylint: disable=too-many-instance-attributes
self.__loop = loop
self.__device_info: Optional[_MassStorageDeviceInfo] = None
self._lock = asyncio.Lock()
self.__region = aioregion.AioExclusiveRegion()
self._device_file: Optional[aiofiles.base.AiofilesContextManager] = None
self.__written = 0
@ -198,23 +199,25 @@ class MassStorageDevice: # pylint: disable=too-many-instance-attributes
else:
logger.warning("Mass-storage device is not operational")
@_operated_and_locked
@_msd_operated
async def connect_to_kvm(self, no_delay: bool=False) -> None:
if self.__device_info:
raise AlreadyConnectedToKvmError()
# TODO: disable gpio
if not no_delay:
await asyncio.sleep(self.__init_delay)
await self.__load_device_info()
get_logger().info("Mass-storage device switched to KVM: %s", self.__device_info)
with self.__region:
if self.__device_info:
raise AlreadyConnectedToKvmError()
# TODO: disable gpio
if not no_delay:
await asyncio.sleep(self.__init_delay)
await self.__load_device_info()
get_logger().info("Mass-storage device switched to KVM: %s", self.__device_info)
@_operated_and_locked
@_msd_operated
async def connect_to_pc(self) -> None:
if not self.__device_info:
raise AlreadyConnectedToPcError()
# TODO: enable gpio
self.__device_info = None
get_logger().info("Mass-storage device switched to Server")
with self.__region:
if not self.__device_info:
raise AlreadyConnectedToPcError()
# TODO: enable gpio
self.__device_info = None
get_logger().info("Mass-storage device switched to Server")
def get_state(self) -> Dict:
info = (self.__device_info._asdict() if self.__device_info else None)
@ -230,36 +233,34 @@ class MassStorageDevice: # pylint: disable=too-many-instance-attributes
}
async def cleanup(self) -> None:
async with self._lock:
await self.__close_device_file()
# TODO: disable gpio
await self.__close_device_file()
# TODO: disable gpio
@_operated_and_locked
@_msd_operated
async def __aenter__(self) -> "MassStorageDevice":
if not self.__device_info:
raise IsNotConnectedToKvmError()
self._device_file = await aiofiles.open(self.__device_info.path, mode="w+b", buffering=0)
self.__written = 0
self.__region.enter()
return self
async def write_image_info(self, name: str, complete: bool) -> None:
async with self._lock:
assert self._device_file
assert self.__device_info
if self.__write_meta:
if self.__device_info.size - self.__written > _IMAGE_INFO_SIZE:
await self._device_file.seek(self.__device_info.size - _IMAGE_INFO_SIZE)
await self.__write_to_device_file(_make_image_info_bytes(name, self.__written, complete))
await self._device_file.seek(0)
await self.__load_device_info()
else:
get_logger().error("Can't write image info because device is full")
assert self._device_file
assert self.__device_info
if self.__write_meta:
if self.__device_info.size - self.__written > _IMAGE_INFO_SIZE:
await self._device_file.seek(self.__device_info.size - _IMAGE_INFO_SIZE)
await self.__write_to_device_file(_make_image_info_bytes(name, self.__written, complete))
await self._device_file.seek(0)
await self.__load_device_info()
else:
get_logger().error("Can't write image info because device is full")
async def write_image_chunk(self, chunk: bytes) -> int:
async with self._lock:
await self.__write_to_device_file(chunk)
self.__written += len(chunk)
return self.__written
await self.__write_to_device_file(chunk)
self.__written += len(chunk)
return self.__written
async def __aexit__(
self,
@ -267,8 +268,10 @@ class MassStorageDevice: # pylint: disable=too-many-instance-attributes
_exc: BaseException,
_tb: types.TracebackType,
) -> None:
async with self._lock:
try:
await self.__close_device_file()
finally:
self.__region.exit()
async def __write_to_device_file(self, data: bytes) -> None:
assert self._device_file

View File

@ -9,10 +9,11 @@ from typing import Dict
from typing import Set
from typing import Callable
from typing import Optional
from typing import Type
import aiohttp.web
from .aioregion import RegionIsBusyError
from .hid import Hid
from .atx import Atx
@ -38,35 +39,46 @@ def _system_task(method: Callable) -> Callable:
return wrap
class _BadRequest(Exception):
def _json(result: Optional[Dict]=None, status: int=200) -> aiohttp.web.Response:
return aiohttp.web.json_response({
"ok": (True if status == 200 else False),
"result": (result or {}),
}, status=status)
def _json_exception(msg: str, err: Exception, status: int) -> aiohttp.web.Response:
get_logger().error("%s: %s", msg, err)
return _json({
"error": type(err).__name__,
"error_msg": str(err),
}, status=status)
class BadRequest(Exception):
pass
def _exceptions_as_400(msg: str, exceptions: List[Type[Exception]]) -> Callable:
class PerformingAnotherOperation(Exception):
def __init__(self) -> None:
super().__init__("Performing another operation, please try again later")
def _wrap_exceptions_for_web(msg: str) -> Callable:
def make_wrapper(method: Callable) -> Callable:
async def wrap(self: "Server", request: aiohttp.web.Request) -> aiohttp.web.Response:
try:
return (await method(self, request))
except tuple(exceptions) as err: # pylint: disable=catching-non-exception
get_logger().error(msg)
return aiohttp.web.json_response({
"ok": False,
"result": {
"error": type(err).__name__,
"error_msg": str(err),
},
}, status=400)
try:
return (await method(self, request))
except RegionIsBusyError:
raise PerformingAnotherOperation()
except (BadRequest, MassStorageOperationError) as err:
return _json_exception(msg, err, 400)
except PerformingAnotherOperation as err:
return _json_exception(msg, err, 409)
return wrap
return make_wrapper
def _json_200(result: Optional[Dict]=None) -> aiohttp.web.Response:
return aiohttp.web.json_response({
"ok": True,
"result": (result or {}),
})
class Server: # pylint: disable=too-many-instance-attributes
def __init__(
self,
@ -159,9 +171,9 @@ class Server: # pylint: disable=too-many-instance-attributes
return ws
async def __atx_state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
return _json_200(self.__atx.get_state())
return _json(self.__atx.get_state())
@_exceptions_as_400("Click error", [_BadRequest])
@_wrap_exceptions_for_web("Click error")
async def __atx_click_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
button = request.query.get("button")
if button == "power":
@ -171,13 +183,13 @@ class Server: # pylint: disable=too-many-instance-attributes
elif button == "reset":
await self.__atx.click_reset()
else:
raise _BadRequest("Missing or invalid 'button=%s'" % (button))
return _json_200({"clicked": button})
raise BadRequest("Missing or invalid 'button=%s'" % (button))
return _json({"clicked": button})
async def __msd_state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
return _json_200(self.__msd.get_state())
return _json(self.__msd.get_state())
@_exceptions_as_400("Mass-storage error", [MassStorageOperationError, _BadRequest])
@_wrap_exceptions_for_web("Mass-storage error")
async def __msd_connect_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
to = request.query.get("to")
if to == "kvm":
@ -187,10 +199,10 @@ class Server: # pylint: disable=too-many-instance-attributes
await self.__msd.connect_to_pc()
await self.__broadcast_event("msd_state", state="connected_to_server") # type: ignore
else:
raise _BadRequest("Missing or invalid 'to=%s'" % (to))
return _json_200(self.__msd.get_state())
raise BadRequest("Missing or invalid 'to=%s'" % (to))
return _json(self.__msd.get_state())
@_exceptions_as_400("Can't write data to mass-storage device", [MassStorageOperationError, _BadRequest])
@_wrap_exceptions_for_web("Can't write data to mass-storage device")
async def __msd_write_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
logger = get_logger(0)
reader = await request.multipart()
@ -198,12 +210,12 @@ class Server: # pylint: disable=too-many-instance-attributes
try:
field = await reader.next()
if not field or field.name != "image_name":
raise _BadRequest("Missing 'image_name' field")
raise BadRequest("Missing 'image_name' field")
image_name = (await field.read()).decode("utf-8")[:256]
field = await reader.next()
if not field or field.name != "image_data":
raise _BadRequest("Missing 'image_data' field")
raise BadRequest("Missing 'image_data' field")
async with self.__msd:
await self.__broadcast_event("msd_state", state="busy") # type: ignore
@ -219,14 +231,14 @@ class Server: # pylint: disable=too-many-instance-attributes
finally:
if written != 0:
logger.info("written %d bytes to mass-storage device", written)
return _json_200({"written": written})
return _json({"written": written})
async def __streamer_state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
return _json_200(self.__streamer.get_state())
return _json(self.__streamer.get_state())
async def __streamer_reset_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
self.__reset_streamer = True
return _json_200()
return _json()
def __run_app_print(self, text: str) -> None:
logger = get_logger()

View File

@ -154,17 +154,16 @@ function clickAtxButton(el_button) {
if (button && confirm(confirm_msg)) {
__setAtxButtonsBusy(true);
var http = new XMLHttpRequest();
http.open("POST", "/kvmd/atx/click?button=" + button, true);
http.onreadystatechange = function() {
var http = __request("POST", "/kvmd/atx/click?button=" + button, function() {
if (http.readyState == 4) {
if (http.status != 200) {
if (http.status == 409) {
alert("Performing another ATX operation for other client, please try again later");
} else if (http.status != 200) {
alert("Click error: " + http.responseText);
}
__setAtxButtonsBusy(false);
__setAtxButtonsBusy(false);
}
}
http.send();
});
}
}
@ -181,9 +180,7 @@ function __setAtxButtonsBusy(busy) {
// -----------------------------------------------------------------------------
function pollStreamer() {
var http = new XMLHttpRequest();
http.open("GET", "/streamer/?action=snapshot", true);
http.onreadystatechange = function() {
var http = __request("GET", "/streamer/?action=snapshot", function() {
if (http.readyState == 2 || http.readyState == 4) {
var status = http.status;
http.onreadystatechange = null;
@ -198,16 +195,13 @@ function pollStreamer() {
pollStreamer.last = true;
}
}
}
http.send();
});
setTimeout(pollStreamer, 2000);
}
pollStreamer.last = false;
function __refreshStreamer() {
var http = new XMLHttpRequest();
http.open("GET", "/kvmd/streamer", true);
http.onreadystatechange = function() {
var http = __request("GET", "/kvmd/streamer", function() {
if (http.readyState == 4 && http.status == 200) {
size = JSON.parse(http.responseText).result.size;
el_stream_box = document.getElementById("stream-image");
@ -215,27 +209,31 @@ function __refreshStreamer() {
el_stream_box.style.height = size.height + "px";
document.getElementById("stream-image").src = "/streamer/?action=stream&time=" + new Date().getTime();
}
}
http.send();
});
}
function clickResetStreamerButton(el_button) {
__setButtonBusy(el_button, true);
var http = new XMLHttpRequest();
http.open("POST", "/kvmd/streamer/reset", true);
http.onreadystatechange = function() {
var http = __request("POST", "/kvmd/streamer/reset", function() {
if (http.readyState == 4) {
if (http.status != 200) {
alert("Can't reset streamer: " + http.responseText);
}
__setButtonBusy(el_button, false);
}
}
http.send();
});
}
// -----------------------------------------------------------------------------
function __request(method, url, callback) {
var http = new XMLHttpRequest();
http.open(method, url, true)
http.onreadystatechange = callback;
http.send();
return http;
}
function __setButtonBusy(el_button, busy) {
el_button.disabled = busy;
el_button.style.cursor = (busy ? "wait" : "default");