refactoring

This commit is contained in:
Devaev Maxim
2018-11-06 01:55:13 +03:00
parent 1ac968e924
commit f0ae427d8e
17 changed files with 109 additions and 113 deletions

0
kvmd/apps/__init__.py Normal file
View File

View File

@@ -0,0 +1,40 @@
import os
import subprocess
import time
from ...application import init
from ...logging import get_logger
from ... import gpio
# =====
def main() -> None:
config = init()
logger = get_logger(0)
logger.info("Cleaning up ...")
with gpio.bcm():
for (name, pin) in [
("hid_reset", config["hid"]["pinout"]["reset"]),
("msd_target", config["msd"]["pinout"]["target"]),
("msd_reset", config["msd"]["pinout"]["reset"]),
("atx_power_switch", config["atx"]["pinout"]["power_switch"]),
("atx_reset_switch", config["atx"]["pinout"]["reset_switch"]),
("streamer_cap", config["streamer"]["pinout"]["cap"]),
("streamer_conv", config["streamer"]["pinout"]["conv"]),
]:
if pin > 0:
logger.info("Writing value=0 to pin=%d (%s)", pin, name)
gpio.set_output(pin, initial=False)
streamer = os.path.basename(config["streamer"]["cmd"][0])
logger.info("Trying to find and kill %r ...", streamer)
try:
subprocess.check_output(["killall", streamer], stderr=subprocess.STDOUT)
time.sleep(3)
subprocess.check_output(["killall", "-9", streamer], stderr=subprocess.STDOUT)
except subprocess.CalledProcessError:
pass
logger.info("Bye-bye")

View File

@@ -0,0 +1,2 @@
from . import main
main()

View File

@@ -0,0 +1,81 @@
import asyncio
from ...application import init
from ...logging import get_logger
from ...logging import Log
from ... import gpio
from .hid import Hid
from .atx import Atx
from .msd import MassStorageDevice
from .streamer import Streamer
from .server import Server
# =====
def main() -> None:
config = init()
with gpio.bcm():
loop = asyncio.get_event_loop()
log = Log(
services=list(config["log"]["services"]),
loop=loop,
)
hid = Hid(
reset=int(config["hid"]["pinout"]["reset"]),
device_path=str(config["hid"]["device"]),
speed=int(config["hid"]["speed"]),
reset_delay=float(config["hid"]["reset_delay"]),
)
atx = Atx(
power_led=int(config["atx"]["pinout"]["power_led"]),
hdd_led=int(config["atx"]["pinout"]["hdd_led"]),
power_switch=int(config["atx"]["pinout"]["power_switch"]),
reset_switch=int(config["atx"]["pinout"]["reset_switch"]),
click_delay=float(config["atx"]["click_delay"]),
long_click_delay=float(config["atx"]["long_click_delay"]),
)
msd = MassStorageDevice(
target=int(config["msd"]["pinout"]["target"]),
reset=int(config["msd"]["pinout"]["reset"]),
device_path=str(config["msd"]["device"]),
init_delay=float(config["msd"]["init_delay"]),
reset_delay=float(config["msd"]["reset_delay"]),
write_meta=bool(config["msd"]["write_meta"]),
loop=loop,
)
streamer = Streamer(
cap_power=int(config["streamer"]["pinout"]["cap"]),
conv_power=int(config["streamer"]["pinout"]["conv"]),
sync_delay=float(config["streamer"]["sync_delay"]),
init_delay=float(config["streamer"]["init_delay"]),
init_restart_after=float(config["streamer"]["init_restart_after"]),
quality=int(config["streamer"]["quality"]),
soft_fps=int(config["streamer"]["soft_fps"]),
cmd=list(map(str, config["streamer"]["cmd"])),
loop=loop,
)
Server(
log=log,
hid=hid,
atx=atx,
msd=msd,
streamer=streamer,
heartbeat=float(config["server"]["heartbeat"]),
atx_state_poll=float(config["atx"]["state_poll"]),
streamer_shutdown_delay=float(config["streamer"]["shutdown_delay"]),
msd_chunk_size=int(config["msd"]["chunk_size"]),
loop=loop,
).run(
host=str(config["server"]["host"]),
port=int(config["server"]["port"]),
)
get_logger().info("Bye-bye")

View File

@@ -0,0 +1,2 @@
from . import main
main()

63
kvmd/apps/kvmd/atx.py Normal file
View File

@@ -0,0 +1,63 @@
import asyncio
from typing import Dict
from ...logging import get_logger
from ... import aioregion
from ... import gpio
# =====
class AtxIsBusy(aioregion.RegionIsBusyError):
pass
class Atx:
def __init__(
self,
power_led: int,
hdd_led: int,
power_switch: int,
reset_switch: int,
click_delay: float,
long_click_delay: float,
) -> None:
self.__power_led = gpio.set_input(power_led)
self.__hdd_led = gpio.set_input(hdd_led)
self.__power_switch = gpio.set_output(power_switch)
self.__reset_switch = gpio.set_output(reset_switch)
self.__click_delay = click_delay
self.__long_click_delay = long_click_delay
self.__region = aioregion.AioExclusiveRegion(AtxIsBusy)
def get_state(self) -> Dict:
return {
"busy": self.__region.is_busy(),
"leds": {
"power": (not gpio.read(self.__power_led)),
"hdd": (not gpio.read(self.__hdd_led)),
},
}
async def click_power(self) -> None:
await self.__click(self.__power_switch, self.__click_delay)
get_logger().info("Clicked power")
async def click_power_long(self) -> None:
await self.__click(self.__power_switch, self.__long_click_delay)
get_logger().info("Clicked power (long press)")
async def click_reset(self) -> None:
await self.__click(self.__reset_switch, self.__click_delay)
get_logger().info("Clicked reset")
async def __click(self, pin: int, delay: float) -> None:
with self.__region:
for flag in (True, False):
gpio.write(pin, flag)
await asyncio.sleep(delay)

213
kvmd/apps/kvmd/hid.py Normal file
View File

@@ -0,0 +1,213 @@
import asyncio
import multiprocessing
import multiprocessing.queues
import queue
import struct
import pkgutil
import time
from typing import Dict
from typing import Set
from typing import NamedTuple
import yaml
import serial
import setproctitle
from ...logging import get_logger
from ... import gpio
# =====
def _get_keymap() -> Dict[str, int]:
return yaml.load(pkgutil.get_data("kvmd", "data/keymap.yaml").decode()) # type: ignore
_KEYMAP = _get_keymap()
class _KeyEvent(NamedTuple):
key: str
state: bool
class _MouseMoveEvent(NamedTuple):
to_x: int
to_y: int
class _MouseButtonEvent(NamedTuple):
button: str
state: bool
class _MouseWheelEvent(NamedTuple):
delta_y: int
class Hid(multiprocessing.Process): # pylint: disable=too-many-instance-attributes
def __init__(
self,
reset: int,
device_path: str,
speed: int,
reset_delay: float,
) -> None:
super().__init__(daemon=True)
self.__reset = gpio.set_output(reset)
self.__device_path = device_path
self.__speed = speed
self.__reset_delay = reset_delay
self.__pressed_keys: Set[str] = set()
self.__pressed_mouse_buttons: Set[str] = set()
self.__lock = asyncio.Lock()
self.__queue: multiprocessing.queues.Queue = multiprocessing.Queue()
self.__stop_event = multiprocessing.Event()
def start(self) -> None:
get_logger().info("Starting HID daemon ...")
super().start()
async def reset(self) -> None:
async with self.__lock:
gpio.write(self.__reset, True)
await asyncio.sleep(self.__reset_delay)
gpio.write(self.__reset, False)
async def send_key_event(self, key: str, state: bool) -> None:
if not self.__stop_event.is_set():
async with self.__lock:
if state and key not in self.__pressed_keys:
self.__pressed_keys.add(key)
self.__queue.put(_KeyEvent(key, state))
elif not state and key in self.__pressed_keys:
self.__pressed_keys.remove(key)
self.__queue.put(_KeyEvent(key, state))
async def send_mouse_move_event(self, to_x: int, to_y: int) -> None:
if not self.__stop_event.is_set():
async with self.__lock:
self.__queue.put(_MouseMoveEvent(to_x, to_y))
async def send_mouse_button_event(self, button: str, state: bool) -> None:
if not self.__stop_event.is_set():
async with self.__lock:
if state and button not in self.__pressed_mouse_buttons:
self.__pressed_mouse_buttons.add(button)
self.__queue.put(_MouseButtonEvent(button, state))
elif not state and button in self.__pressed_mouse_buttons:
self.__pressed_mouse_buttons.remove(button)
self.__queue.put(_MouseButtonEvent(button, state))
async def send_mouse_wheel_event(self, delta_y: int) -> None:
if not self.__stop_event.is_set():
async with self.__lock:
self.__queue.put(_MouseWheelEvent(delta_y))
async def clear_events(self) -> None:
if not self.__stop_event.is_set():
async with self.__lock:
self.__unsafe_clear_events()
async def cleanup(self) -> None:
async with self.__lock:
if self.is_alive():
self.__unsafe_clear_events()
get_logger().info("Stopping keyboard daemon ...")
self.__stop_event.set()
self.join()
else:
get_logger().warning("Emergency cleaning up HID events ...")
self.__emergency_clear_events()
gpio.write(self.__reset, False)
def __unsafe_clear_events(self) -> None:
for button in self.__pressed_mouse_buttons:
self.__queue.put(_MouseButtonEvent(button, False))
self.__pressed_mouse_buttons.clear()
for key in self.__pressed_keys:
self.__queue.put(_KeyEvent(key, False))
self.__pressed_keys.clear()
def __emergency_clear_events(self) -> None:
try:
with serial.Serial(self.__device_path, self.__speed) as tty:
self.__send_clear_hid(tty)
except Exception:
get_logger().exception("Can't execute emergency clear HID events")
def run(self) -> None: # pylint: disable=too-many-branches
setproctitle.setproctitle("[hid] " + setproctitle.getproctitle())
try:
with serial.Serial(self.__device_path, self.__speed) as tty:
hid_ready = False
while True:
if hid_ready:
try:
event = self.__queue.get(timeout=0.05)
except queue.Empty:
pass
else:
if isinstance(event, _KeyEvent):
self.__send_key_event(tty, event)
elif isinstance(event, _MouseMoveEvent):
self.__send_mouse_move_event(tty, event)
elif isinstance(event, _MouseButtonEvent):
self.__send_mouse_button_event(tty, event)
elif isinstance(event, _MouseWheelEvent):
self.__send_mouse_wheel_event(tty, event)
else:
raise RuntimeError("Unknown HID event")
hid_ready = False
if tty.in_waiting:
while tty.in_waiting:
tty.read(tty.in_waiting)
hid_ready = True
else:
time.sleep(0.05)
if self.__stop_event.is_set() and self.__queue.qsize() == 0:
break
except Exception:
get_logger().exception("Unhandled exception")
raise
def __send_key_event(self, tty: serial.Serial, event: _KeyEvent) -> None:
code = _KEYMAP.get(event.key)
if code:
key_bytes = bytes([code])
assert len(key_bytes) == 1, (event, key_bytes)
tty.write(
b"\01"
+ key_bytes
+ (b"\01" if event.state else b"\00")
+ b"\00\00"
)
def __send_mouse_move_event(self, tty: serial.Serial, event: _MouseMoveEvent) -> None:
to_x = min(max(-32768, event.to_x), 32767)
to_y = min(max(-32768, event.to_y), 32767)
tty.write(b"\02" + struct.pack(">hh", to_x, to_y))
def __send_mouse_button_event(self, tty: serial.Serial, event: _MouseButtonEvent) -> None:
if event.button == "left":
code = (0b10000000 | (0b00001000 if event.state else 0))
elif event.button == "right":
code = (0b01000000 | (0b00000100 if event.state else 0))
else:
code = 0
if code:
tty.write(b"\03" + bytes([code]) + b"\00\00\00")
def __send_mouse_wheel_event(self, tty: serial.Serial, event: _MouseWheelEvent) -> None:
delta_y = min(max(-128, event.delta_y), 127)
tty.write(b"\04\00" + struct.pack(">b", delta_y) + b"\00\00")
def __send_clear_hid(self, tty: serial.Serial) -> None:
tty.write(b"\00\00\00\00\00")

322
kvmd/apps/kvmd/msd.py Normal file
View File

@@ -0,0 +1,322 @@
import os
import struct
import asyncio
import types
from typing import Dict
from typing import NamedTuple
from typing import Callable
from typing import Type
from typing import Optional
from typing import Any
import pyudev
import aiofiles
import aiofiles.base
from ...logging import get_logger
from ... import aioregion
from ... import gpio
# =====
class MsdError(Exception):
pass
class MsdOperationError(MsdError):
pass
class MsdIsNotOperationalError(MsdOperationError):
def __init__(self) -> None:
super().__init__("Missing path for mass-storage device")
class MsdAlreadyConnectedToPcError(MsdOperationError):
def __init__(self) -> None:
super().__init__("Mass-storage is already connected to Server")
class MsdAlreadyConnectedToKvmError(MsdOperationError):
def __init__(self) -> None:
super().__init__("Mass-storage is already connected to KVM")
class MsdIsNotConnectedToKvmError(MsdOperationError):
def __init__(self) -> None:
super().__init__("Mass-storage is not connected to KVM")
class MsdIsBusyError(MsdOperationError, aioregion.RegionIsBusyError):
pass
# =====
class _HardwareInfo(NamedTuple):
manufacturer: str
product: str
serial: str
class _ImageInfo(NamedTuple):
name: str
size: int
complete: bool
class _MassStorageDeviceInfo(NamedTuple):
path: str
real: str
size: int
hw: Optional[_HardwareInfo]
image: Optional[_ImageInfo]
_IMAGE_INFO_SIZE = 4096
_IMAGE_INFO_MAGIC_SIZE = 16
_IMAGE_INFO_IMAGE_NAME_SIZE = 256
_IMAGE_INFO_PADS_SIZE = _IMAGE_INFO_SIZE - _IMAGE_INFO_IMAGE_NAME_SIZE - 1 - 8 - _IMAGE_INFO_MAGIC_SIZE * 8
_IMAGE_INFO_FORMAT = ">%dL%dc?Q%dx%dL" % (
_IMAGE_INFO_MAGIC_SIZE,
_IMAGE_INFO_IMAGE_NAME_SIZE,
_IMAGE_INFO_PADS_SIZE,
_IMAGE_INFO_MAGIC_SIZE,
)
_IMAGE_INFO_MAGIC = [0x1ACE1ACE] * _IMAGE_INFO_MAGIC_SIZE
def _make_image_info_bytes(name: str, size: int, complete: bool) -> bytes:
return struct.pack(
_IMAGE_INFO_FORMAT,
*_IMAGE_INFO_MAGIC,
*memoryview(( # type: ignore
name.encode("utf-8")
+ b"\x00" * _IMAGE_INFO_IMAGE_NAME_SIZE
)[:_IMAGE_INFO_IMAGE_NAME_SIZE]).cast("c"),
complete,
size,
*_IMAGE_INFO_MAGIC,
)
def _parse_image_info_bytes(data: bytes) -> Optional[_ImageInfo]:
try:
parsed = list(struct.unpack(_IMAGE_INFO_FORMAT, data))
except struct.error:
pass
else:
magic_begin = parsed[:_IMAGE_INFO_MAGIC_SIZE]
magic_end = parsed[-_IMAGE_INFO_MAGIC_SIZE:]
if magic_begin == magic_end == _IMAGE_INFO_MAGIC:
image_name_bytes = b"".join(parsed[_IMAGE_INFO_MAGIC_SIZE:_IMAGE_INFO_MAGIC_SIZE + _IMAGE_INFO_IMAGE_NAME_SIZE])
return _ImageInfo(
name=image_name_bytes.decode("utf-8", errors="ignore").strip("\x00").strip(),
size=parsed[_IMAGE_INFO_MAGIC_SIZE + _IMAGE_INFO_IMAGE_NAME_SIZE + 1],
complete=parsed[_IMAGE_INFO_MAGIC_SIZE + _IMAGE_INFO_IMAGE_NAME_SIZE],
)
return None
def _explore_device(device_path: str) -> Optional[_MassStorageDeviceInfo]:
# udevadm info -a -p $(udevadm info -q path -n /dev/sda)
ctx = pyudev.Context()
device = pyudev.Devices.from_device_file(ctx, device_path)
if device.subsystem != "block":
return None
try:
size = device.attributes.asint("size") * 512
except KeyError:
return None
hw_info: Optional[_HardwareInfo] = None
usb_device = device.find_parent("usb", "usb_device")
if usb_device:
hw_info = _HardwareInfo(**{
attr: usb_device.attributes.asstring(attr).strip()
for attr in ["manufacturer", "product", "serial"]
})
with open(device_path, "rb") as device_file:
device_file.seek(size - _IMAGE_INFO_SIZE)
image_info = _parse_image_info_bytes(device_file.read())
return _MassStorageDeviceInfo(
path=device_path,
real=os.path.realpath(device_path),
size=size,
image=image_info,
hw=hw_info,
)
def _msd_operated(method: Callable) -> Callable:
async def wrap(self: "MassStorageDevice", *args: Any, **kwargs: Any) -> Any:
if not self._device_path: # pylint: disable=protected-access
MsdIsNotOperationalError()
return (await method(self, *args, **kwargs))
return wrap
# =====
class MassStorageDevice: # pylint: disable=too-many-instance-attributes
def __init__(
self,
target: int,
reset: int,
device_path: str,
init_delay: float,
reset_delay: float,
write_meta: bool,
loop: asyncio.AbstractEventLoop,
) -> None:
self.__target = gpio.set_output(target)
self.__reset = gpio.set_output(reset)
self._device_path = device_path
self.__init_delay = init_delay
self.__reset_delay = reset_delay
self.__write_meta = write_meta
self.__loop = loop
self.__device_info: Optional[_MassStorageDeviceInfo] = None
self.__saved_device_info: Optional[_MassStorageDeviceInfo] = None
self.__region = aioregion.AioExclusiveRegion(MsdIsBusyError)
self.__device_file: Optional[aiofiles.base.AiofilesContextManager] = None
self.__written = 0
logger = get_logger(0)
if self._device_path:
logger.info("Using %r as mass-storage device", self._device_path)
try:
logger.info("Enabled image metadata writing")
loop.run_until_complete(self.connect_to_kvm(no_delay=True))
except Exception as err:
if isinstance(err, MsdError):
log = logger.error
else:
log = logger.exception
log("Mass-storage device is not operational: %s", err)
self._device_path = ""
else:
logger.warning("Mass-storage device is not operational")
@_msd_operated
async def connect_to_kvm(self, no_delay: bool=False) -> None:
with self.__region:
if self.__device_info:
raise MsdAlreadyConnectedToKvmError()
gpio.write(self.__target, False)
if not no_delay:
await asyncio.sleep(self.__init_delay)
await self.__load_device_info()
get_logger().info("Mass-storage device switched to KVM: %s", self.__device_info)
@_msd_operated
async def connect_to_pc(self) -> None:
with self.__region:
if not self.__device_info:
raise MsdAlreadyConnectedToPcError()
gpio.write(self.__target, True)
self.__device_info = None
get_logger().info("Mass-storage device switched to Server")
@_msd_operated
async def reset(self) -> None:
with self.__region:
gpio.write(self.__reset, True)
await asyncio.sleep(self.__reset_delay)
gpio.write(self.__reset, False)
def get_state(self) -> Dict:
info = (self.__saved_device_info._asdict() if self.__saved_device_info else None)
if info:
info["hw"] = (info["hw"]._asdict() if info["hw"] else None)
info["image"] = (info["image"]._asdict() if info["image"] else None)
connected_to: Optional[str] = None
if self._device_path:
connected_to = ("kvm" if self.__device_info else "server")
return {
"in_operate": bool(self._device_path),
"connected_to": connected_to,
"busy": bool(self.__device_file),
"written": self.__written,
"info": info,
}
async def cleanup(self) -> None:
await self.__close_device_file()
gpio.write(self.__target, False)
gpio.write(self.__reset, False)
@_msd_operated
async def __aenter__(self) -> "MassStorageDevice":
self.__region.enter()
try:
if not self.__device_info:
raise MsdIsNotConnectedToKvmError()
self.__device_file = await aiofiles.open(self.__device_info.path, mode="w+b", buffering=0)
self.__written = 0
return self
finally:
self.__region.exit()
async def write_image_info(self, name: str, complete: bool) -> None:
assert self.__device_file
assert self.__device_info
if self.__write_meta:
if self.__device_info.size - self.__written > _IMAGE_INFO_SIZE:
await self.__device_file.seek(self.__device_info.size - _IMAGE_INFO_SIZE)
await self.__write_to_device_file(_make_image_info_bytes(name, self.__written, complete))
await self.__device_file.seek(0)
await self.__load_device_info()
else:
get_logger().error("Can't write image info because device is full")
async def write_image_chunk(self, chunk: bytes) -> int:
await self.__write_to_device_file(chunk)
self.__written += len(chunk)
return self.__written
async def __aexit__(
self,
_exc_type: Type[BaseException],
_exc: BaseException,
_tb: types.TracebackType,
) -> None:
try:
await self.__close_device_file()
finally:
self.__region.exit()
async def __write_to_device_file(self, data: bytes) -> None:
assert self.__device_file
await self.__device_file.write(data)
await self.__device_file.flush()
await self.__loop.run_in_executor(None, os.fsync, self.__device_file.fileno())
async def __load_device_info(self) -> None:
device_info = await self.__loop.run_in_executor(None, _explore_device, self._device_path)
if not device_info:
raise MsdError("Can't explore device %r" % (self._device_path))
self.__device_info = self.__saved_device_info = device_info
async def __close_device_file(self) -> None:
try:
if self.__device_file:
get_logger().info("Closing mass-storage device file ...")
await self.__device_file.close()
except Exception:
get_logger().exception("Can't close mass-storage device file")
await self.reset()
self.__device_file = None
self.__written = 0

471
kvmd/apps/kvmd/server.py Normal file
View File

@@ -0,0 +1,471 @@
import os
import signal
import asyncio
import json
import time
from typing import List
from typing import Dict
from typing import Set
from typing import Callable
from typing import Optional
import aiohttp.web
import setproctitle
from ...logging import get_logger
from ...logging import Log
from ...aioregion import RegionIsBusyError
from ... import __version__
from .hid import Hid
from .atx import Atx
from .msd import MsdOperationError
from .msd import MassStorageDevice
from .streamer import Streamer
# =====
def _system_task(method: Callable) -> Callable:
async def wrap(self: "Server") -> None:
try:
await method(self)
except asyncio.CancelledError:
pass
except Exception:
get_logger().exception("Unhandled exception, killing myself ...")
os.kill(os.getpid(), signal.SIGTERM)
return wrap
def _json(result: Optional[Dict]=None, status: int=200) -> aiohttp.web.Response:
return aiohttp.web.Response(
text=json.dumps({
"ok": (True if status == 200 else False),
"result": (result or {}),
}, sort_keys=True, indent=4),
status=status,
content_type="application/json",
)
def _json_exception(msg: str, err: Exception, status: int) -> aiohttp.web.Response:
msg = "%s: %s" % (msg, err)
get_logger().error(msg)
return _json({
"error": type(err).__name__,
"error_msg": msg,
}, status=status)
class BadRequest(Exception):
pass
def _valid_bool(name: str, flag: Optional[str]) -> bool:
flag = str(flag).strip().lower()
if flag in ["1", "true", "yes"]:
return True
elif flag in ["0", "false", "no"]:
return False
raise BadRequest("Invalid param '%s'" % (name))
def _valid_int(name: str, value: Optional[str], min_value: Optional[int]=None, max_value: Optional[int]=None) -> int:
try:
value_int = int(value) # type: ignore
if (
(min_value is not None and value_int < min_value)
or (max_value is not None and value_int > max_value)
):
raise ValueError()
return value_int
except Exception:
raise BadRequest("Invalid param %r" % (name))
def _wrap_exceptions_for_web(msg: str) -> Callable:
def make_wrapper(method: Callable) -> Callable:
async def wrap(self: "Server", request: aiohttp.web.Request) -> aiohttp.web.Response:
try:
return (await method(self, request))
except RegionIsBusyError as err:
return _json_exception(msg, err, 409)
except (BadRequest, MsdOperationError) as err:
return _json_exception(msg, err, 400)
return wrap
return make_wrapper
class Server: # pylint: disable=too-many-instance-attributes
def __init__( # pylint: disable=too-many-arguments
self,
log: Log,
hid: Hid,
atx: Atx,
msd: MassStorageDevice,
streamer: Streamer,
heartbeat: float,
atx_state_poll: float,
streamer_shutdown_delay: float,
msd_chunk_size: int,
loop: asyncio.AbstractEventLoop,
) -> None:
self.__log = log
self.__hid = hid
self.__atx = atx
self.__msd = msd
self.__streamer = streamer
self.__heartbeat = heartbeat
self.__streamer_shutdown_delay = streamer_shutdown_delay
self.__atx_state_poll = atx_state_poll
self.__msd_chunk_size = msd_chunk_size
self.__loop = loop
self.__sockets: Set[aiohttp.web.WebSocketResponse] = set()
self.__sockets_lock = asyncio.Lock()
self.__system_tasks: List[asyncio.Task] = []
self.__reset_streamer = False
self.__streamer_quality = streamer.get_current_quality()
self.__streamer_soft_fps = streamer.get_current_soft_fps()
def run(self, host: str, port: int) -> None:
self.__hid.start()
setproctitle.setproctitle("[main] " + setproctitle.getproctitle())
app = aiohttp.web.Application(loop=self.__loop)
app.router.add_get("/info", self.__info_handler)
app.router.add_get("/log", self.__log_handler)
app.router.add_get("/ws", self.__ws_handler)
app.router.add_post("/hid/reset", self.__hid_reset_handler)
app.router.add_get("/atx", self.__atx_state_handler)
app.router.add_post("/atx/click", self.__atx_click_handler)
app.router.add_get("/msd", self.__msd_state_handler)
app.router.add_post("/msd/connect", self.__msd_connect_handler)
app.router.add_post("/msd/write", self.__msd_write_handler)
app.router.add_post("/msd/reset", self.__msd_reset_handler)
app.router.add_get("/streamer", self.__streamer_state_handler)
app.router.add_post("/streamer/set_params", self.__streamer_set_params_handler)
app.router.add_post("/streamer/reset", self.__streamer_reset_handler)
app.on_shutdown.append(self.__on_shutdown)
app.on_cleanup.append(self.__on_cleanup)
self.__system_tasks.extend([
self.__loop.create_task(self.__hid_watchdog()),
self.__loop.create_task(self.__stream_controller()),
self.__loop.create_task(self.__poll_dead_sockets()),
self.__loop.create_task(self.__poll_atx_state()),
])
aiohttp.web.run_app(app, host=host, port=port, print=self.__run_app_print)
# ===== SYSTEM
async def __info_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
return _json({
"version": {
"kvmd": __version__,
"streamer": await self.__streamer.get_version(),
},
"streamer": self.__streamer.get_app(),
})
@_wrap_exceptions_for_web("Log error")
async def __log_handler(self, request: aiohttp.web.Request) -> aiohttp.web.StreamResponse:
seek = _valid_int("seek", request.query.get("seek", "0"), 0)
follow = _valid_bool("follow", request.query.get("follow", "false"))
response = aiohttp.web.StreamResponse(status=200, reason="OK", headers={"Content-Type": "text/plain"})
await response.prepare(request)
async for record in self.__log.log(seek, follow):
await response.write(("[%s %s] --- %s" % (
record["dt"].strftime("%Y-%m-%d %H:%M:%S"),
record["service"],
record["msg"],
)).encode("utf-8") + b"\r\n")
return response
# ===== WEBSOCKET
async def __ws_handler(self, request: aiohttp.web.Request) -> aiohttp.web.WebSocketResponse:
logger = get_logger(0)
ws = aiohttp.web.WebSocketResponse(heartbeat=self.__heartbeat)
await ws.prepare(request)
await self.__register_socket(ws)
async for msg in ws:
if msg.type == aiohttp.web.WSMsgType.TEXT:
try:
event = json.loads(msg.data)
except Exception as err:
logger.error("Can't parse JSON event from websocket: %s", err)
else:
event_type = event.get("event_type")
if event_type == "ping":
await ws.send_str(json.dumps({"msg_type": "pong"}))
elif event_type == "key":
await self.__handle_ws_key_event(event)
elif event_type == "mouse_move":
await self.__handle_ws_mouse_move_event(event)
elif event_type == "mouse_button":
await self.__handle_ws_mouse_button_event(event)
elif event_type == "mouse_wheel":
await self.__handle_ws_mouse_wheel_event(event)
else:
logger.error("Unknown websocket event: %r", event)
else:
break
return ws
async def __handle_ws_key_event(self, event: Dict) -> None:
key = str(event.get("key", ""))[:64].strip()
state = event.get("state")
if key and state in [True, False]:
await self.__hid.send_key_event(key, state)
async def __handle_ws_mouse_move_event(self, event: Dict) -> None:
try:
to_x = int(event["to"]["x"])
to_y = int(event["to"]["y"])
except Exception:
return
await self.__hid.send_mouse_move_event(to_x, to_y)
async def __handle_ws_mouse_button_event(self, event: Dict) -> None:
button = str(event.get("button", ""))[:64].strip()
state = event.get("state")
if button and state in [True, False]:
await self.__hid.send_mouse_button_event(button, state)
async def __handle_ws_mouse_wheel_event(self, event: Dict) -> None:
try:
delta_y = int(event["delta"]["y"])
except Exception:
return
await self.__hid.send_mouse_wheel_event(delta_y)
# ===== HID
async def __hid_reset_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
await self.__hid.reset()
return _json()
# ===== ATX
async def __atx_state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
return _json(self.__atx.get_state())
@_wrap_exceptions_for_web("Click error")
async def __atx_click_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
button = request.query.get("button")
clicker = {
"power": self.__atx.click_power,
"power_long": self.__atx.click_power_long,
"reset": self.__atx.click_reset,
}.get(button)
if not clicker:
raise BadRequest("Invalid param 'button'")
await self.__broadcast_event("atx_click", button=button) # type: ignore
await clicker()
await self.__broadcast_event("atx_click", button=None) # type: ignore
return _json({"clicked": button})
# ===== MSD
async def __msd_state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
return _json(self.__msd.get_state())
@_wrap_exceptions_for_web("Mass-storage error")
async def __msd_connect_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
to = request.query.get("to")
if to == "kvm":
await self.__msd.connect_to_kvm()
state = self.__msd.get_state()
await self.__broadcast_event("msd_state", **state)
elif to == "server":
await self.__msd.connect_to_pc()
state = self.__msd.get_state()
await self.__broadcast_event("msd_state", **state)
else:
raise BadRequest("Invalid param 'to'")
return _json(state)
@_wrap_exceptions_for_web("Can't write data to mass-storage device")
async def __msd_write_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
logger = get_logger(0)
reader = await request.multipart()
written = 0
try:
field = await reader.next()
if not field or field.name != "image_name":
raise BadRequest("Missing 'image_name' field")
image_name = (await field.read()).decode("utf-8")[:256]
field = await reader.next()
if not field or field.name != "image_data":
raise BadRequest("Missing 'image_data' field")
async with self.__msd:
await self.__broadcast_event("msd_state", **self.__msd.get_state())
logger.info("Writing image %r to mass-storage device ...", image_name)
await self.__msd.write_image_info(image_name, False)
while True:
chunk = await field.read_chunk(self.__msd_chunk_size)
if not chunk:
break
written = await self.__msd.write_image_chunk(chunk)
await self.__msd.write_image_info(image_name, True)
finally:
await self.__broadcast_event("msd_state", **self.__msd.get_state())
if written != 0:
logger.info("Written %d bytes to mass-storage device", written)
return _json({"written": written})
@_wrap_exceptions_for_web("Mass-storage error")
async def __msd_reset_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
await self.__msd.reset()
return _json()
# ===== STREAMER
async def __streamer_state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
return _json(self.__streamer.get_state())
@_wrap_exceptions_for_web("Can't set stream params")
async def __streamer_set_params_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
quality = request.query.get("quality")
if quality:
self.__streamer_quality = _valid_int("quality", quality, 1, 100)
soft_fps = request.query.get("soft_fps")
if soft_fps:
self.__streamer_soft_fps = _valid_int("soft_fps", soft_fps, 1, 30)
return _json()
async def __streamer_reset_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
self.__reset_streamer = True
return _json()
# =====
def __run_app_print(self, text: str) -> None:
logger = get_logger()
for line in text.strip().splitlines():
logger.info(line.strip())
async def __on_shutdown(self, _: aiohttp.web.Application) -> None:
logger = get_logger(0)
logger.info("Cancelling system tasks ...")
for task in self.__system_tasks:
task.cancel()
await asyncio.gather(*self.__system_tasks)
logger.info("Disconnecting clients ...")
for ws in list(self.__sockets):
await self.__remove_socket(ws)
async def __on_cleanup(self, _: aiohttp.web.Application) -> None:
await self.__hid.cleanup()
await self.__streamer.cleanup()
await self.__msd.cleanup()
@_system_task
async def __hid_watchdog(self) -> None:
while self.__hid.is_alive():
await asyncio.sleep(0.1)
raise RuntimeError("HID is dead")
@_system_task
async def __stream_controller(self) -> None:
prev = 0
shutdown_at = 0.0
while True:
cur = len(self.__sockets)
if prev == 0 and cur > 0:
if not self.__streamer.is_running():
await self.__streamer.start(self.__streamer_quality, self.__streamer_soft_fps)
await self.__broadcast_event("streamer_state", **self.__streamer.get_state())
elif prev > 0 and cur == 0:
shutdown_at = time.time() + self.__streamer_shutdown_delay
elif prev == 0 and cur == 0 and time.time() > shutdown_at:
if self.__streamer.is_running():
await self.__streamer.stop()
await self.__broadcast_event("streamer_state", **self.__streamer.get_state())
if (
self.__reset_streamer
or self.__streamer_quality != self.__streamer.get_current_quality()
or self.__streamer_soft_fps != self.__streamer.get_current_soft_fps()
):
if self.__streamer.is_running():
await self.__streamer.stop()
await self.__streamer.start(self.__streamer_quality, self.__streamer_soft_fps, no_init_restart=True)
await self.__broadcast_event("streamer_state", **self.__streamer.get_state())
self.__reset_streamer = False
prev = cur
await asyncio.sleep(0.1)
@_system_task
async def __poll_dead_sockets(self) -> None:
while True:
for ws in list(self.__sockets):
if ws.closed or not ws._req.transport: # pylint: disable=protected-access
await self.__remove_socket(ws)
await asyncio.sleep(0.1)
@_system_task
async def __poll_atx_state(self) -> None:
while True:
if self.__sockets:
await self.__broadcast_event("atx_state", **self.__atx.get_state())
await asyncio.sleep(self.__atx_state_poll)
async def __broadcast_event(self, event: str, **kwargs: Dict) -> None:
await asyncio.gather(*[
ws.send_str(json.dumps({
"msg_type": "event",
"msg": {
"event": event,
"event_attrs": kwargs,
},
}))
for ws in list(self.__sockets)
if not ws.closed and ws._req.transport # pylint: disable=protected-access
], return_exceptions=True)
async def __register_socket(self, ws: aiohttp.web.WebSocketResponse) -> None:
async with self.__sockets_lock:
self.__sockets.add(ws)
get_logger().info("Registered new client socket: remote=%s; id=%d; active=%d",
ws._req.remote, id(ws), len(self.__sockets)) # pylint: disable=protected-access
async def __remove_socket(self, ws: aiohttp.web.WebSocketResponse) -> None:
async with self.__sockets_lock:
await self.__hid.clear_events()
try:
self.__sockets.remove(ws)
get_logger().info("Removed client socket: remote=%s; id=%d; active=%d",
ws._req.remote, id(ws), len(self.__sockets)) # pylint: disable=protected-access
await ws.close()
except Exception:
pass

175
kvmd/apps/kvmd/streamer.py Normal file
View File

@@ -0,0 +1,175 @@
import os
import asyncio
import asyncio.subprocess
from typing import List
from typing import Dict
from typing import Optional
from ...logging import get_logger
from ... import gpio
# =====
class Streamer: # pylint: disable=too-many-instance-attributes
def __init__(
self,
cap_power: int,
conv_power: int,
sync_delay: float,
init_delay: float,
init_restart_after: float,
quality: int,
soft_fps: int,
cmd: List[str],
loop: asyncio.AbstractEventLoop,
) -> None:
self.__cap_power = (gpio.set_output(cap_power) if cap_power > 0 else cap_power)
self.__conv_power = (gpio.set_output(conv_power) if conv_power > 0 else conv_power)
self.__sync_delay = sync_delay
self.__init_delay = init_delay
self.__init_restart_after = init_restart_after
self.__quality = quality
self.__soft_fps = soft_fps
self.__cmd = cmd
self.__loop = loop
self.__proc_task: Optional[asyncio.Task] = None
async def start(self, quality: int, soft_fps: int, no_init_restart: bool=False) -> None:
logger = get_logger()
logger.info("Starting streamer ...")
assert 1 <= quality <= 100
self.__quality = quality
assert 1 <= soft_fps <= 30
self.__soft_fps = soft_fps
await self.__inner_start()
if self.__init_restart_after > 0.0 and not no_init_restart:
logger.info("Stopping streamer to restart ...")
await self.__inner_stop()
logger.info("Starting again ...")
await self.__inner_start()
async def stop(self) -> None:
get_logger().info("Stopping streamer ...")
await self.__inner_stop()
def is_running(self) -> bool:
return bool(self.__proc_task)
def get_current_quality(self) -> int:
return self.__quality
def get_current_soft_fps(self) -> int:
return self.__soft_fps
def get_state(self) -> Dict:
return {
"is_running": self.is_running(),
"quality": self.__quality,
"soft_fps": self.__soft_fps,
}
def get_app(self) -> str:
return os.path.basename(self.__cmd[0])
async def get_version(self) -> str:
proc = await asyncio.create_subprocess_exec(
*[self.__cmd[0], "--version"],
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.DEVNULL,
)
(stdout, _) = await proc.communicate()
return stdout.decode(errors="ignore").strip()
async def cleanup(self) -> None:
if self.is_running():
await self.stop()
async def __inner_start(self) -> None:
assert not self.__proc_task
await self.__set_hw_enabled(True)
self.__proc_task = self.__loop.create_task(self.__process())
async def __inner_stop(self) -> None:
assert self.__proc_task
self.__proc_task.cancel()
await asyncio.gather(self.__proc_task, return_exceptions=True)
await self.__set_hw_enabled(False)
self.__proc_task = None
async def __set_hw_enabled(self, enabled: bool) -> None:
# XXX: This sequence is very important to enable converter and cap board
if self.__cap_power > 0:
gpio.write(self.__cap_power, enabled)
if self.__conv_power > 0:
if enabled:
await asyncio.sleep(self.__sync_delay)
gpio.write(self.__conv_power, enabled)
if enabled:
await asyncio.sleep(self.__init_delay)
async def __process(self) -> None: # pylint: disable=too-many-branches
logger = get_logger(0)
while True: # pylint: disable=too-many-nested-blocks
proc: Optional[asyncio.subprocess.Process] = None # pylint: disable=no-member
try:
cmd = [part.format(quality=self.__quality, soft_fps=self.__soft_fps) for part in self.__cmd]
proc = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
)
logger.info("Started streamer pid=%d: %s", proc.pid, cmd)
empty = 0
async for line_bytes in proc.stdout: # type: ignore
line = line_bytes.decode(errors="ignore").strip()
if line:
logger.info("Streamer: %s", line)
empty = 0
else:
empty += 1
if empty == 100: # asyncio bug
raise RuntimeError("Streamer/asyncio: too many empty lines")
raise RuntimeError("Streamer unexpectedly died")
except asyncio.CancelledError:
break
except Exception as err:
if proc:
logger.exception("Unexpected streamer error: pid=%d", proc.pid)
else:
logger.exception("Can't start streamer: %s", err)
await asyncio.sleep(1)
finally:
if proc and proc.returncode is None:
await self.__kill(proc)
async def __kill(self, proc: asyncio.subprocess.Process) -> None: # pylint: disable=no-member
try:
proc.terminate()
await asyncio.sleep(1)
if proc.returncode is None:
try:
proc.kill()
except Exception:
if proc.returncode is not None:
raise
await proc.wait()
get_logger().info("Streamer killed: pid=%d; retcode=%d", proc.pid, proc.returncode)
except Exception:
if proc.returncode is None:
get_logger().exception("Can't kill streamer pid=%d", proc.pid)
else:
get_logger().info("Streamer killed: pid=%d; retcode=%d", proc.pid, proc.returncode)

View File

@@ -0,0 +1,50 @@
#!/usr/bin/env python3
import sys
import signal
import asyncio
import argparse
import time
import aiohttp
# =====
async def _run_client(loop: asyncio.AbstractEventLoop, url: str) -> None:
def stdin_callback() -> None:
line = sys.stdin.buffer.readline().decode()
if line:
asyncio.ensure_future(ws.send_str(line), loop=loop)
else:
loop.stop()
loop.add_reader(sys.stdin.fileno(), stdin_callback)
async def dispatch() -> None:
while True:
msg = await ws.receive()
if msg.type == aiohttp.WSMsgType.TEXT:
print("[%.5f] Received: %s" % (time.time(), msg.data.strip()))
else:
if msg.type == aiohttp.WSMsgType.CLOSE:
await ws.close()
elif msg.type == aiohttp.WSMsgType.ERROR:
print("[%.5f] Error during receive: %s" % (time.time(), ws.exception()))
elif msg.type == aiohttp.WSMsgType.CLOSED:
pass
break
async with aiohttp.ClientSession().ws_connect(url) as ws:
await dispatch()
def main() -> None:
parser = argparse.ArgumentParser()
parser.add_argument("-u", "--url", default="http://127.0.0.1:8081/ws")
options = parser.parse_args()
loop = asyncio.get_event_loop()
loop.add_signal_handler(signal.SIGINT, loop.stop)
loop.create_task(_run_client(loop, options.url))
loop.run_forever()

View File

@@ -0,0 +1,2 @@
from . import main
main()