Merge branch 'ch9329'

This commit is contained in:
Maxim Devaev 2023-07-31 01:55:05 +03:00
commit cf44668af9
5 changed files with 493 additions and 0 deletions

View File

@ -0,0 +1,227 @@
# ========================================================================== #
# #
# KVMD - The main PiKVM daemon. #
# #
# Copyright (C) 2018-2022 Maxim Devaev <mdevaev@gmail.com> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
# ========================================================================== #
import multiprocessing
import queue
import time
from typing import Iterable
from typing import AsyncGenerator
from ....logging import get_logger
from .... import tools
from .... import aiotools
from .... import aiomulti
from .... import aioproc
from ....yamlconf import Option
from ....validators.basic import valid_float_f01
from ....validators.os import valid_abs_path
from ....validators.hw import valid_tty_speed
from .. import BaseHid
from .chip import ChipResponseError
from .chip import ChipConnection
from .chip import Chip
from .mouse import Mouse
from .keyboard import Keyboard
# =====
class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-instance-attributes
def __init__( # pylint: disable=too-many-arguments,super-init-not-called
self,
device_path: str,
speed: int,
read_timeout: float,
) -> None:
multiprocessing.Process.__init__(self, daemon=True)
self.__device_path = device_path
self.__speed = speed
self.__read_timeout = read_timeout
self.__reset_required_event = multiprocessing.Event()
self.__cmd_queue: "multiprocessing.Queue[bytes]" = multiprocessing.Queue()
self.__notifier = aiomulti.AioProcessNotifier()
self.__state_flags = aiomulti.AioSharedFlags({
"online": 0,
"busy": 0,
"status": 0,
}, self.__notifier, type=int)
self.__stop_event = multiprocessing.Event()
self.__chip = Chip(device_path, speed, read_timeout)
self.__keyboard = Keyboard()
self.__mouse = Mouse()
@classmethod
def get_plugin_options(cls) -> dict:
return {
"device": Option("/dev/kvmd-hid", type=valid_abs_path, unpack_as="device_path"),
"speed": Option(9600, type=valid_tty_speed),
"read_timeout": Option(0.3, type=valid_float_f01),
}
def sysprep(self) -> None:
get_logger(0).info("Starting HID daemon ...")
self.start()
async def get_state(self) -> dict:
state = await self.__state_flags.get()
absolute = self.__mouse.is_absolute()
leds = await self.__keyboard.get_leds()
return {
"online": state["online"],
"busy": False,
"connected": None,
"keyboard": {
"online": state["online"],
"leds": leds,
"outputs": {"available": [], "active": ""},
},
"mouse": {
"online": state["online"],
"absolute": absolute,
"outputs": {
"available": ["usb", "usb_rel"],
"active": ("usb" if absolute else "usb_rel"),
},
},
}
async def poll_state(self) -> AsyncGenerator[dict, None]:
prev_state: dict = {}
while True:
state = await self.get_state()
if state != prev_state:
yield state
prev_state = state
await self.__notifier.wait()
async def reset(self) -> None:
self.__reset_required_event.set()
@aiotools.atomic_fg
async def cleanup(self) -> None:
if self.is_alive():
get_logger(0).info("Stopping HID daemon ...")
self.__stop_event.set()
if self.is_alive() or self.exitcode is not None:
self.join()
# =====
def send_key_events(self, keys: Iterable[tuple[str, bool]]) -> None:
for (key, state) in keys:
self.__queue_cmd(self.__keyboard.process_key(key, state))
def send_mouse_button_event(self, button: str, state: bool) -> None:
self.__queue_cmd(self.__mouse.process_button(button, state))
def send_mouse_move_event(self, to_x: int, to_y: int) -> None:
self.__queue_cmd(self.__mouse.process_move(to_x, to_y))
def send_mouse_wheel_event(self, delta_x: int, delta_y: int) -> None:
self.__queue_cmd(self.__mouse.process_wheel(delta_x, delta_y))
def send_mouse_relative_event(self, delta_x: int, delta_y: int) -> None:
self.__queue_cmd(self.__mouse.process_relative(delta_x, delta_y))
def set_params(self, keyboard_output: (str | None)=None, mouse_output: (str | None)=None) -> None:
if mouse_output is not None:
get_logger(0).info("HID : mouse output = %s", mouse_output)
self.__mouse.set_absolute(mouse_output == "usb")
self.__notifier.notify()
def set_connected(self, connected: bool) -> None:
pass
def clear_events(self) -> None:
tools.clear_queue(self.__cmd_queue)
def __queue_cmd(self, cmd: bytes, clear: bool=False) -> None:
if not self.__stop_event.is_set():
if clear:
# FIXME: Если очистка производится со стороны процесса хида, то возможна гонка между
# очисткой и добавлением нового события. Неприятно, но не смертельно.
# Починить блокировкой после перехода на асинхронные очереди.
tools.clear_queue(self.__cmd_queue)
self.__cmd_queue.put_nowait(cmd)
def run(self) -> None: # pylint: disable=too-many-branches
logger = aioproc.settle("HID", "hid")
while not self.__stop_event.is_set():
try:
self.__hid_loop()
except Exception:
logger.exception("Unexpected error in the run loop")
time.sleep(1)
def __hid_loop(self) -> None:
while not self.__stop_event.is_set():
try:
with self.__chip.connected() as conn:
while not (self.__stop_event.is_set() and self.__cmd_queue.qsize() == 0):
if self.__reset_required_event.is_set():
try:
self.__set_state_busy(True)
# self.__process_request(conn, RESET)
finally:
self.__reset_required_event.clear()
try:
cmd = self.__cmd_queue.get(timeout=0.1)
# get_logger(0).info(f"HID : cmd = {cmd}")
except queue.Empty:
self.__process_cmd(conn, b"")
else:
self.__process_cmd(conn, cmd)
except Exception:
self.clear_events()
get_logger(0).exception("Unexpected error in the HID loop")
time.sleep(2)
def __process_cmd(self, conn: ChipConnection, cmd: bytes) -> bool: # pylint: disable=too-many-branches
try:
led_byte = conn.xfer(cmd)
except ChipResponseError as err:
self.__set_state_online(False)
get_logger(0).info(err)
time.sleep(2)
else:
if led_byte >= 0:
self.__keyboard.set_leds(led_byte)
self.__notifier.notify()
self.__set_state_online(True)
return True
return False
def __set_state_online(self, online: bool) -> None:
self.__state_flags.update(online=int(online))
def __set_state_busy(self, busy: bool) -> None:
self.__state_flags.update(busy=int(busy))

View File

@ -0,0 +1,82 @@
# ========================================================================== #
# #
# KVMD - The main PiKVM daemon. #
# #
# Copyright (C) 2018-2022 Maxim Devaev <mdevaev@gmail.com> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
# ========================================================================== #
import serial
import contextlib
from typing import Generator
# =====
class ChipResponseError(Exception):
pass
# =====
class ChipConnection:
def __init__(self, tty: serial.Serial) -> None:
self.__tty = tty
def xfer(self, cmd: bytes) -> int:
self.__send(cmd)
return self.__recv()
def __send(self, cmd: bytes) -> None:
# RESET = [0x00,0x0F,0x00]
# GET_INFO = [0x00,0x01,0x00]
if len(cmd) == 0:
cmd = b"\x00\x01\x00"
cmd = b"\x57\xAB" + cmd
cmd += self.__make_checksum(cmd).to_bytes(1, "big")
self.__tty.write(cmd)
def __recv(self) -> int:
data = self.__tty.read(5)
if len(data) < 5:
raise ChipResponseError("Too short response, HID might be disconnected")
if data and data[4]:
data += self.__tty.read(data[4] + 1)
if self.__make_checksum(data[:-1]) != data[-1]:
raise ChipResponseError("Invalid response checksum")
if data[4] == 1 and data[5] != 0:
raise ChipResponseError(f"Response error code = {data[5]!r}")
# led_byte (info) response
return (data[7] if data[3] == 0x81 else -1)
def __make_checksum(self, cmd: bytes) -> int:
return (sum(cmd) % 256)
class Chip:
def __init__(self, device_path: str, speed: int, read_timeout: float) -> None:
self.__device_path = device_path
self.__speed = speed
self.__read_timeout = read_timeout
@contextlib.contextmanager
def connected(self) -> Generator[ChipConnection, None, None]: # type: ignore
with serial.Serial(self.__device_path, self.__speed, timeout=self.__read_timeout) as tty:
yield ChipConnection(tty)

View File

@ -0,0 +1,68 @@
# ========================================================================== #
# #
# KVMD - The main PiKVM daemon. #
# #
# Copyright (C) 2018-2022 Maxim Devaev <mdevaev@gmail.com> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
# ========================================================================== #
from .... import aiomulti
from ....keyboard.mappings import KEYMAP
# =====
class Keyboard:
def __init__(self) -> None:
self.__leds = aiomulti.AioSharedFlags({
"num": False,
"caps": False,
"scroll": False,
}, aiomulti.AioProcessNotifier(), bool)
self.__modifiers = 0
self.__active_keys: list[int] = []
def set_leds(self, led_byte: int) -> None:
self.__leds.update(
num=bool(led_byte & 1),
caps=bool((led_byte >> 1) & 1),
scroll=bool((led_byte >> 2) & 1),
)
async def get_leds(self) -> dict[str, bool]:
return (await self.__leds.get())
def process_key(self, key: str, state: bool) -> bytes:
code = KEYMAP[key].usb.code
is_modifier = KEYMAP[key].usb.is_modifier
if state:
if is_modifier:
self.__modifiers |= code
elif len(self.__active_keys) < 6 and code not in self.__active_keys:
self.__active_keys.append(code)
else:
if is_modifier:
self.__modifiers &= ~code
elif code in self.__active_keys:
self.__active_keys.remove(code)
cmd = [
0, 0x02, 0x08, self.__modifiers, 0,
0, 0, 0, 0, 0, 0,
]
for (index, code) in enumerate(self.__active_keys):
cmd[index + 5] = code
return bytes(cmd)

View File

@ -0,0 +1,115 @@
# ========================================================================== #
# #
# KVMD - The main PiKVM daemon. #
# #
# Copyright (C) 2018-2022 Maxim Devaev <mdevaev@gmail.com> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
# ========================================================================== #
import math
from ....mouse import MouseRange
# =====
class Mouse: # pylint: disable=too-many-instance-attributes
def __init__(self) -> None:
self.__absolute = True
self.__buttons = 0
self.__to_x = (0, 0)
self.__to_y = (0, 0)
self.__delta_x = 0
self.__delta_y = 0
self.__wheel_y = 0
def set_absolute(self, flag: bool) -> None:
self.__absolute = flag
def is_absolute(self) -> bool:
return self.__absolute
def process_button(self, button: str, state: bool) -> bytes:
code = 0x00
match button:
case "left":
code = 0x01
case "right":
code = 0x02
case "middle":
code = 0x04
case "up":
code = 0x08
case "down":
code = 0x10
if code:
if state:
self.__buttons |= code
else:
self.__buttons &= ~code
self.__wheel_y = 0
if not self.__absolute:
return self.__make_relative_cmd()
else:
return self.__make_absolute_cmd()
def process_move(self, to_x: int, to_y: int) -> bytes:
self.__to_x = self.__fix_absolute(to_x)
self.__to_y = self.__fix_absolute(to_y)
self.__wheel_y = 0
return self.__make_absolute_cmd()
def __fix_absolute(self, value: int) -> tuple[int, int]:
assert MouseRange.MIN <= value <= MouseRange.MAX
to_fixed = math.ceil(MouseRange.remap(value, 0, MouseRange.MAX) / 8)
return (to_fixed >> 8, to_fixed & 0xFF)
def process_wheel(self, delta_x: int, delta_y: int) -> bytes:
_ = delta_x
assert -127 <= delta_y <= 127
self.__wheel_y = (1 if delta_y > 0 else 255)
if not self.__absolute:
return self.__make_relative_cmd()
else:
return self.__make_absolute_cmd()
def process_relative(self, delta_x: int, delta_y: int) -> bytes:
self.__delta_x = self.__fix_relative(delta_x)
self.__delta_y = self.__fix_relative(delta_y)
self.__wheel_y = 0
return self.__make_relative_cmd()
def __make_absolute_cmd(self) -> bytes:
return bytes([
0, 0x04, 0x07, 0x02,
self.__buttons,
self.__to_x[1], self.__to_x[0],
self.__to_y[1], self.__to_y[0],
self.__wheel_y,
])
def __make_relative_cmd(self) -> bytes:
return bytes([
0, 0x05, 0x05, 0x01,
self.__buttons,
self.__delta_x, self.__delta_y,
self.__wheel_y,
])
def __fix_relative(self, value: int) -> int:
assert -127 <= value <= 127
value = math.ceil(value / 3)
return (value if value >= 0 else (255 + value))

View File

@ -75,6 +75,7 @@ def main() -> None:
"kvmd.plugins.hid._mcu",
"kvmd.plugins.hid.otg",
"kvmd.plugins.hid.bt",
"kvmd.plugins.hid.ch9329",
"kvmd.plugins.atx",
"kvmd.plugins.msd",
"kvmd.plugins.msd.otg",