test implementation of mass-storage device

This commit is contained in:
Devaev Maxim 2018-07-02 09:09:25 +03:00
parent 61603c4f7b
commit 682a93e757
12 changed files with 336 additions and 20 deletions

View File

@ -13,6 +13,8 @@ depends=(
"python"
"python-yaml"
"python-aiohttp"
"python-aiofiles"
"python-pyudev"
"python-raspberry-gpio"
)
makedepends=("python-setuptools" "wget")

View File

@ -1,5 +1,7 @@
git+git://github.com/willbuckner/rpi-gpio-development-mock@master#egg=rpi
aiohttp
aiofiles
pyudev
pyyaml
bumpversion
tox

View File

@ -3,9 +3,10 @@ import asyncio
from .application import init
from .logging import get_logger
from .atx import Atx
from .streamer import Streamer
from .ps2 import Ps2Keyboard
from .atx import Atx
from .msd import MassStorageDevice
from .streamer import Streamer
from .server import Server
from . import gpio
@ -17,6 +18,12 @@ def main() -> None:
with gpio.bcm():
loop = asyncio.get_event_loop()
keyboard = Ps2Keyboard(
clock=int(config["keyboard"]["pinout"]["clock"]),
data=int(config["keyboard"]["pinout"]["data"]),
pulse=float(config["keyboard"]["pulse"]),
)
atx = Atx(
power_led=int(config["atx"]["leds"]["pinout"]["power"]),
hdd_led=int(config["atx"]["leds"]["pinout"]["hdd"]),
@ -26,6 +33,12 @@ def main() -> None:
long_click_delay=float(config["atx"]["switches"]["long_click_delay"]),
)
msd = MassStorageDevice(
bind=str(config["msd"]["bind"]),
init_delay=float(config["msd"]["init_delay"]),
loop=loop,
)
streamer = Streamer(
cap_power=int(config["video"]["pinout"]["cap"]),
conv_power=int(config["video"]["pinout"]["conv"]),
@ -34,19 +47,15 @@ def main() -> None:
loop=loop,
)
keyboard = Ps2Keyboard(
clock=int(config["keyboard"]["pinout"]["clock"]),
data=int(config["keyboard"]["pinout"]["data"]),
pulse=float(config["keyboard"]["pulse"]),
)
Server(
atx=atx,
streamer=streamer,
keyboard=keyboard,
atx=atx,
msd=msd,
streamer=streamer,
heartbeat=float(config["server"]["heartbeat"]),
atx_leds_poll=float(config["atx"]["leds"]["poll"]),
video_shutdown_delay=float(config["video"]["shutdown_delay"]),
msd_chunk_size=int(config["msd"]["chunk_size"]),
loop=loop,
).run(
host=str(config["server"]["host"]),

View File

@ -0,0 +1,20 @@
import argparse
from ...msd import explore_device
from ...msd import locate_by_bind
# =====
def main() -> None:
parser = argparse.ArgumentParser()
parser.add_argument("-d", "--device", default="/dev/sda")
options = parser.parse_args()
info = explore_device(options.device)
print("Path:", info.path)
print("Bind:", info.bind)
print("Size:", info.size)
print("Manufacturer:", info.manufacturer)
print("Product:", info.product)
print("Serial:", info.serial)
assert locate_by_bind(info.bind), "WTF?! Can't locate device file using bind %r" % (info.bind)

View File

@ -0,0 +1,2 @@
from . import main
main()

197
kvmd/kvmd/msd.py Normal file
View File

@ -0,0 +1,197 @@
import os
import asyncio
import types
from typing import Dict
from typing import NamedTuple
from typing import Callable
from typing import Type
from typing import Optional
from typing import Any
import pyudev
import aiofiles
import aiofiles.base
from .logging import get_logger
# =====
class MassStorageError(Exception):
pass
class MassStorageIsNotOperationalError(MassStorageError):
def __init__(self) -> None:
super().__init__("Missing bind for mass-storage device")
class MassStorageAlreadyConnectedToPcError(MassStorageError):
def __init__(self) -> None:
super().__init__("Mass-storage is already connected to PC")
class MassStorageAlreadyConnectedToKvmError(MassStorageError):
def __init__(self) -> None:
super().__init__("Mass-storage is already connected to KVM")
class MassStorageIsNotConnectedToKvmError(MassStorageError):
def __init__(self) -> None:
super().__init__("Mass-storage is not connected to KVM")
class MassStorageIsBusyError(MassStorageError):
def __init__(self) -> None:
super().__init__("Mass-storage is busy (write in progress)")
class DeviceInfo(NamedTuple):
path: str
bind: str
size: int
manufacturer: str
product: str
serial: str
def explore_device(path: str) -> DeviceInfo:
# udevadm info -a -p $(udevadm info -q path -n /dev/sda)
ctx = pyudev.Context()
block_device = pyudev.Devices.from_device_file(ctx, path)
size = block_device.attributes.asint("size") * 512
storage_device = block_device.find_parent("usb", "usb_interface")
assert storage_device.driver == "usb-storage", (storage_device.driver, storage_device)
usb_device = block_device.find_parent("usb", "usb_device")
assert usb_device.driver == "usb", (usb_device.driver, usb_device)
return DeviceInfo(
path=path,
bind=storage_device.sys_name,
size=size,
manufacturer=usb_device.attributes.asstring("manufacturer").strip(),
product=usb_device.attributes.asstring("product").strip(),
serial=usb_device.attributes.asstring("serial").strip(),
)
def locate_by_bind(bind: str) -> str:
ctx = pyudev.Context()
for device in ctx.list_devices(subsystem="block"):
storage_device = device.find_parent("usb", "usb_interface")
if storage_device:
try:
device.attributes.asint("partititon")
except KeyError:
if storage_device.sys_name == bind:
return os.path.join("/dev", device.sys_name)
return ""
def _operated_and_locked(method: Callable) -> Callable:
async def wrap(self: "MassStorageDevice", *args: Any, **kwargs: Any) -> Any:
if self._device_file: # pylint: disable=protected-access
raise MassStorageIsBusyError()
if not self._bind: # pylint: disable=protected-access
MassStorageIsNotOperationalError()
async with self._lock: # pylint: disable=protected-access
return (await method(self, *args, **kwargs))
return wrap
class MassStorageDevice:
def __init__(self, bind: str, init_delay: float, loop: asyncio.AbstractEventLoop) -> None:
self._bind = bind
self.__init_delay = init_delay
self.__device_info: Optional[DeviceInfo] = None
self._lock = asyncio.Lock()
self._device_file: Optional[aiofiles.base.AiofilesContextManager] = None
self.__writed = 0
if self._bind:
get_logger().info("Using bind %r as mass-storage device", self._bind)
try:
loop.run_until_complete(self.connect_to_kvm(no_delay=True))
except Exception:
get_logger().exception("Mass-storage device is not operational")
self._bind = ""
else:
get_logger().warning("Missing bind; mass-storage device is not operational")
@_operated_and_locked
async def connect_to_kvm(self, no_delay: bool=False) -> None:
if self.__device_info:
raise MassStorageAlreadyConnectedToKvmError()
# TODO: disable gpio
if not no_delay:
await asyncio.sleep(self.__init_delay)
path = locate_by_bind(self._bind)
if not path:
raise RuntimeError("Can't locate device by bind %r" % (self._bind))
self.__device_info = explore_device(path)
get_logger().info("Mass-storage device switched to KVM: %s", self.__device_info)
@_operated_and_locked
async def connect_to_pc(self) -> None:
if not self.__device_info:
raise MassStorageAlreadyConnectedToPcError()
# TODO: enable gpio
self.__device_info = None
get_logger().info("Mass-storage device switched to PC")
def get_state(self) -> Dict:
return {
"in_operate": bool(self._bind),
"connected_to": ("kvm" if self.__device_info else "pc"),
"is_busy": bool(self._device_file),
"writed": self.__writed,
"info": (self.__device_info._asdict() if self.__device_info else None),
}
async def cleanup(self) -> None:
async with self._lock:
await self.__close_file()
# TODO: disable gpio
@_operated_and_locked
async def __aenter__(self) -> "MassStorageDevice":
if not self.__device_info:
raise MassStorageIsNotConnectedToKvmError()
self._device_file = await aiofiles.open(self.__device_info.path, mode="wb", buffering=0)
self.__writed = 0
return self
async def write(self, data: bytes) -> int:
async with self._lock:
assert self._device_file
size = len(data)
await self._device_file.write(data)
await self._device_file.flush()
os.fsync(self._device_file.fileno())
self.__writed += size
return self.__writed
async def __aexit__(
self,
_exc_type: Type[BaseException],
_exc: BaseException,
_tb: types.TracebackType,
) -> None:
async with self._lock:
await self.__close_file()
async def __close_file(self) -> None:
try:
if self._device_file:
get_logger().info("Closing device file ...")
await self._device_file.close()
except Exception:
get_logger().exception("Can't close device file")
# TODO: reset device file
self._device_file = None
self.__writed = 0

View File

@ -32,6 +32,10 @@ class Ps2Keyboard(multiprocessing.Process):
def send_byte(self, code: int) -> None:
self.__queue.put(code)
def cleanup(self) -> None:
if self.is_alive():
self.stop()
def run(self) -> None:
with gpio.bcm():
try:

View File

@ -7,13 +7,19 @@ from typing import List
from typing import Set
from typing import Callable
from typing import Optional
from typing import Type
import aiohttp.web
from .atx import Atx
from .streamer import Streamer
from .ps2 import Ps2Keyboard
from .atx import Atx
from .msd import MassStorageError
from .msd import MassStorageDevice
from .streamer import Streamer
from .logging import get_logger
@ -30,24 +36,43 @@ def _system_task(method: Callable) -> Callable:
return wrap
def _exceptions_as_400(msg: str, exceptions: List[Type[Exception]]) -> Callable:
def make_wrapper(method: Callable) -> Callable:
async def wrap(self: "Server", request: aiohttp.web.Request) -> aiohttp.web.WebSocketResponse:
try:
return (await method(self, request))
except tuple(exceptions) as err: # pylint: disable=catching-non-exception
get_logger().exception(msg)
return aiohttp.web.json_response({
"error": type(err).__name__,
"error_msg": str(err),
}, status=400)
return wrap
return make_wrapper
class Server: # pylint: disable=too-many-instance-attributes
def __init__(
self,
atx: Atx,
streamer: Streamer,
keyboard: Ps2Keyboard,
atx: Atx,
msd: MassStorageDevice,
streamer: Streamer,
heartbeat: float,
atx_leds_poll: float,
video_shutdown_delay: float,
msd_chunk_size: int,
loop: asyncio.AbstractEventLoop,
) -> None:
self.__keyboard = keyboard
self.__atx = atx
self.__msd = msd
self.__streamer = streamer
self.__heartbeat = heartbeat
self.__keyboard = keyboard
self.__video_shutdown_delay = video_shutdown_delay
self.__atx_leds_poll = atx_leds_poll
self.__msd_chunk_size = msd_chunk_size
self.__loop = loop
self.__sockets: Set[aiohttp.web.WebSocketResponse] = set()
@ -63,6 +88,9 @@ class Server: # pylint: disable=too-many-instance-attributes
app = aiohttp.web.Application(loop=self.__loop)
app.router.add_get("/", self.__root_handler)
app.router.add_get("/ws", self.__ws_handler)
app.router.add_get("/msd", self.__msd_state_handler)
app.router.add_post("/msd/connect", self.__msd_connect_handler)
app.router.add_post("/msd/write", self.__msd_write_handler)
app.on_shutdown.append(self.__on_shutdown)
app.on_cleanup.append(self.__on_cleanup)
@ -91,6 +119,46 @@ class Server: # pylint: disable=too-many-instance-attributes
break
return ws
async def __msd_state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
return aiohttp.web.json_response(self.__msd.get_state())
@_exceptions_as_400("Mass-storage error", [MassStorageError, RuntimeError])
async def __msd_connect_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
to = request.query.get("to")
if to == "kvm":
await self.__msd.connect_to_kvm()
await self.__broadcast("EVENT msd connected_to_kvm")
elif to == "pc":
await self.__msd.connect_to_pc()
await self.__broadcast("EVENT msd connected_to_pc")
else:
raise RuntimeError("Missing or invalid 'to=%s'" % (to))
return aiohttp.web.json_response(self.__msd.get_state())
@_exceptions_as_400("Can't write image to mass-storage device", [MassStorageError, RuntimeError, OSError])
async def __msd_write_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
logger = get_logger(0)
reader = await request.multipart()
writed = 0
try:
field = await reader.next()
if field.name != "image":
raise RuntimeError("Missing 'data' field")
async with self.__msd:
await self.__broadcast("EVENT msd busy")
logger.info("Writing image to mass-storage device ...")
while True:
chunk = await field.read_chunk(self.__msd_chunk_size)
if not chunk:
break
writed = await self.__msd.write(chunk)
await self.__broadcast("EVENT msd free")
finally:
if writed != 0:
logger.info("Writed %d bytes to mass-storage device", writed)
return aiohttp.web.json_response({"writed": writed})
def __run_app_print(self, text: str) -> None:
logger = get_logger()
for line in text.strip().splitlines():
@ -109,10 +177,9 @@ class Server: # pylint: disable=too-many-instance-attributes
await self.__remove_socket(ws)
async def __on_cleanup(self, _: aiohttp.web.Application) -> None:
if self.__keyboard.is_alive():
self.__keyboard.stop()
if self.__streamer.is_running():
await self.__streamer.stop()
self.__keyboard.cleanup()
await self.__streamer.cleanup()
await self.__msd.cleanup()
@_system_task
async def __keyboard_watchdog(self) -> None:

View File

@ -48,6 +48,10 @@ class Streamer: # pylint: disable=too-many-instance-attributes
def is_running(self) -> bool:
return bool(self.__proc_task)
async def cleanup(self) -> None:
if self.is_running():
await self.stop()
async def __set_hw_enabled(self, enabled: bool) -> None:
# XXX: This sequence is very important to enable converter and cap board
gpio.write(self.__cap_power, enabled)

View File

@ -1,3 +1,5 @@
RPi.GPIO
aiohttp
aiofiles
pyudev
pyyaml

View File

@ -24,6 +24,7 @@ def main() -> None:
"kvmd.extras",
"kvmd.extras.cleanup",
"kvmd.extras.wscli",
"kvmd.extras.exploremsd",
],
entry_points={
@ -31,6 +32,7 @@ def main() -> None:
"kvmd = kvmd:main",
"kvmd-cleanup = kvmd.extras.cleanup:main",
"kvmd-wscli = kvmd.extras.wscli:main",
"kvmd-exploremsd = kvmd.extras.exploremsd:main",
],
},

View File

@ -24,12 +24,17 @@ kvmd:
click_delay: 0.1
long_click_delay: 5.5
msd:
# FIXME: It's for laptop lol
bind: "1-2:1.0"
init_delay: 2.0
chunk_size: 512
video:
pinout:
cap: 21
conv: 25
sync_delay: 1.0
shutdown_delay: 10.0
cmd: