mirror of
https://github.com/mofeng-git/One-KVM.git
synced 2025-12-12 01:00:29 +08:00
202 lines
7.0 KiB
Python
202 lines
7.0 KiB
Python
import asyncio
|
|
import multiprocessing
|
|
import multiprocessing.queues
|
|
import queue
|
|
import struct
|
|
import pkgutil
|
|
import time
|
|
|
|
from typing import Dict
|
|
from typing import Set
|
|
from typing import NamedTuple
|
|
|
|
import yaml
|
|
import serial
|
|
import setproctitle
|
|
|
|
from .logging import get_logger
|
|
|
|
|
|
# =====
|
|
def _get_keymap() -> Dict[str, int]:
|
|
return yaml.load(pkgutil.get_data(__name__, "data/keymap.yaml").decode()) # type: ignore
|
|
|
|
|
|
_KEYMAP = _get_keymap()
|
|
|
|
|
|
class _KeyEvent(NamedTuple):
|
|
key: str
|
|
state: bool
|
|
|
|
|
|
class _MouseMoveEvent(NamedTuple):
|
|
to_x: int
|
|
to_y: int
|
|
|
|
|
|
class _MouseButtonEvent(NamedTuple):
|
|
button: str
|
|
state: bool
|
|
|
|
|
|
class _MouseWheelEvent(NamedTuple):
|
|
delta_y: int
|
|
|
|
|
|
class Hid(multiprocessing.Process):
|
|
def __init__(
|
|
self,
|
|
device_path: str,
|
|
speed: int,
|
|
) -> None:
|
|
|
|
super().__init__(daemon=True)
|
|
|
|
self.__device_path = device_path
|
|
self.__speed = speed
|
|
|
|
self.__pressed_keys: Set[str] = set()
|
|
self.__pressed_mouse_buttons: Set[str] = set()
|
|
self.__lock = asyncio.Lock()
|
|
self.__queue: multiprocessing.queues.Queue = multiprocessing.Queue()
|
|
|
|
self.__stop_event = multiprocessing.Event()
|
|
|
|
def start(self) -> None:
|
|
get_logger().info("Starting HID daemon ...")
|
|
super().start()
|
|
|
|
# TODO: add reset or power switching
|
|
|
|
async def send_key_event(self, key: str, state: bool) -> None:
|
|
if not self.__stop_event.is_set():
|
|
async with self.__lock:
|
|
if state and key not in self.__pressed_keys:
|
|
self.__pressed_keys.add(key)
|
|
self.__queue.put(_KeyEvent(key, state))
|
|
elif not state and key in self.__pressed_keys:
|
|
self.__pressed_keys.remove(key)
|
|
self.__queue.put(_KeyEvent(key, state))
|
|
|
|
async def send_mouse_move_event(self, to_x: int, to_y: int) -> None:
|
|
if not self.__stop_event.is_set():
|
|
async with self.__lock:
|
|
self.__queue.put(_MouseMoveEvent(to_x, to_y))
|
|
|
|
async def send_mouse_button_event(self, button: str, state: bool) -> None:
|
|
if not self.__stop_event.is_set():
|
|
async with self.__lock:
|
|
if state and button not in self.__pressed_mouse_buttons:
|
|
self.__pressed_mouse_buttons.add(button)
|
|
self.__queue.put(_MouseButtonEvent(button, state))
|
|
elif not state and button in self.__pressed_mouse_buttons:
|
|
self.__pressed_mouse_buttons.remove(button)
|
|
self.__queue.put(_MouseButtonEvent(button, state))
|
|
|
|
async def send_mouse_wheel_event(self, delta_y: int) -> None:
|
|
if not self.__stop_event.is_set():
|
|
async with self.__lock:
|
|
self.__queue.put(_MouseWheelEvent(delta_y))
|
|
|
|
async def clear_events(self) -> None:
|
|
if not self.__stop_event.is_set():
|
|
async with self.__lock:
|
|
self.__unsafe_clear_events()
|
|
|
|
async def cleanup(self) -> None:
|
|
async with self.__lock:
|
|
if self.is_alive():
|
|
self.__unsafe_clear_events()
|
|
get_logger().info("Stopping keyboard daemon ...")
|
|
self.__stop_event.set()
|
|
self.join()
|
|
else:
|
|
get_logger().warning("Emergency cleaning up HID events ...")
|
|
self.__emergency_clear_events()
|
|
|
|
def __unsafe_clear_events(self) -> None:
|
|
for button in self.__pressed_mouse_buttons:
|
|
self.__queue.put(_MouseButtonEvent(button, False))
|
|
self.__pressed_mouse_buttons.clear()
|
|
for key in self.__pressed_keys:
|
|
self.__queue.put(_KeyEvent(key, False))
|
|
self.__pressed_keys.clear()
|
|
|
|
def __emergency_clear_events(self) -> None:
|
|
try:
|
|
with serial.Serial(self.__device_path, self.__speed) as tty:
|
|
self.__send_clear_hid(tty)
|
|
except Exception:
|
|
get_logger().exception("Can't execute emergency clear HID events")
|
|
|
|
def run(self) -> None: # pylint: disable=too-many-branches
|
|
setproctitle.setproctitle("[hid] " + setproctitle.getproctitle())
|
|
try:
|
|
with serial.Serial(self.__device_path, self.__speed) as tty:
|
|
hid_ready = False
|
|
while True:
|
|
if hid_ready:
|
|
try:
|
|
event = self.__queue.get(timeout=0.05)
|
|
except queue.Empty:
|
|
pass
|
|
else:
|
|
if isinstance(event, _KeyEvent):
|
|
self.__send_key_event(tty, event)
|
|
elif isinstance(event, _MouseMoveEvent):
|
|
self.__send_mouse_move_event(tty, event)
|
|
elif isinstance(event, _MouseButtonEvent):
|
|
self.__send_mouse_button_event(tty, event)
|
|
elif isinstance(event, _MouseWheelEvent):
|
|
self.__send_mouse_wheel_event(tty, event)
|
|
else:
|
|
raise RuntimeError("Unknown HID event")
|
|
hid_ready = False
|
|
else:
|
|
if tty.in_waiting:
|
|
while tty.in_waiting:
|
|
tty.read(tty.in_waiting)
|
|
hid_ready = True
|
|
else:
|
|
time.sleep(0.05)
|
|
if self.__stop_event.is_set() and self.__queue.qsize() == 0:
|
|
break
|
|
except Exception:
|
|
get_logger().exception("Unhandled exception")
|
|
raise
|
|
|
|
def __send_key_event(self, tty: serial.Serial, event: _KeyEvent) -> None:
|
|
code = _KEYMAP.get(event.key)
|
|
if code:
|
|
key_bytes = bytes([code])
|
|
assert len(key_bytes) == 1, (event, key_bytes)
|
|
tty.write(
|
|
b"\01"
|
|
+ key_bytes
|
|
+ (b"\01" if event.state else b"\00")
|
|
+ b"\00\00"
|
|
)
|
|
|
|
def __send_mouse_move_event(self, tty: serial.Serial, event: _MouseMoveEvent) -> None:
|
|
to_x = min(max(-32768, event.to_x), 32767)
|
|
to_y = min(max(-32768, event.to_y), 32767)
|
|
tty.write(b"\02" + struct.pack(">hh", to_x, to_y))
|
|
|
|
def __send_mouse_button_event(self, tty: serial.Serial, event: _MouseButtonEvent) -> None:
|
|
if event.button == "left":
|
|
code = (0b10000000 | (0b00001000 if event.state else 0))
|
|
elif event.button == "right":
|
|
code = (0b01000000 | (0b00000100 if event.state else 0))
|
|
else:
|
|
code = 0
|
|
if code:
|
|
tty.write(b"\03" + bytes([code]) + b"\00\00\00")
|
|
|
|
def __send_mouse_wheel_event(self, tty: serial.Serial, event: _MouseWheelEvent) -> None:
|
|
delta_y = min(max(-128, event.delta_y), 127)
|
|
tty.write(b"\04\00" + struct.pack(">b", delta_y) + b"\00\00")
|
|
|
|
def __send_clear_hid(self, tty: serial.Serial) -> None:
|
|
tty.write(b"\00\00\00\00\00")
|