using evdev instead of string constants

This commit is contained in:
Maxim Devaev
2025-05-01 03:03:25 +03:00
parent 1624b0cbf8
commit ebbd55ee17
29 changed files with 692 additions and 539 deletions

View File

@@ -31,8 +31,11 @@ from typing import Callable
from aiohttp.web import Request
from aiohttp.web import Response
from ....keyboard.mappings import WEB_TO_EVDEV
from ....keyboard.keysym import build_symmap
from ....keyboard.printer import text_to_web_keys
from ....keyboard.printer import text_to_evdev_keys
from ....mouse import MOUSE_TO_EVDEV
from ....htserver import exposed_http
from ....htserver import exposed_ws
@@ -124,10 +127,10 @@ class HidApi:
text = text[:limit]
symmap = self.__ensure_symmap(req.query.get("keymap", self.__default_keymap_name))
slow = valid_bool(req.query.get("slow", False))
await self.__hid.send_key_events(text_to_web_keys(text, symmap), no_ignore_keys=True, slow=slow)
await self.__hid.send_key_events(text_to_evdev_keys(text, symmap), no_ignore_keys=True, slow=slow)
return make_json_response()
def __ensure_symmap(self, keymap_name: str) -> dict[int, dict[int, str]]:
def __ensure_symmap(self, keymap_name: str) -> dict[int, dict[int, int]]:
keymap_name = valid_printable_filename(keymap_name, "keymap")
path = os.path.join(self.__keymaps_dir_path, keymap_name)
try:
@@ -139,7 +142,7 @@ class HidApi:
return self.__inner_ensure_symmap(path, st.st_mtime)
@functools.lru_cache(maxsize=10)
def __inner_ensure_symmap(self, path: str, mod_ts: int) -> dict[int, dict[int, str]]:
def __inner_ensure_symmap(self, path: str, mod_ts: int) -> dict[int, dict[int, int]]:
_ = mod_ts # For LRU
return build_symmap(path)
@@ -148,9 +151,12 @@ class HidApi:
@exposed_ws(1)
async def __ws_bin_key_handler(self, _: WsSession, data: bytes) -> None:
try:
key = valid_hid_key(data[1:].decode("ascii"))
state = bool(data[0] & 0b01)
finish = bool(data[0] & 0b10)
if data[0] & 0b10000000:
key = struct.unpack(">H", data[1:])[0]
else:
key = WEB_TO_EVDEV[valid_hid_key(data[1:33].decode("ascii"))]
except Exception:
return
self.__hid.send_key_event(key, state, finish)
@@ -158,7 +164,11 @@ class HidApi:
@exposed_ws(2)
async def __ws_bin_mouse_button_handler(self, _: WsSession, data: bytes) -> None:
try:
button = valid_hid_mouse_button(data[1:].decode("ascii"))
state = bool(data[0] & 0b01)
if data[0] & 0b10000000:
button = struct.unpack(">H", data[1:])[0]
else:
button = MOUSE_TO_EVDEV[valid_hid_mouse_button(data[1:33].decode("ascii"))]
state = bool(data[0] & 0b01)
except Exception:
return
@@ -199,7 +209,7 @@ class HidApi:
@exposed_ws("key")
async def __ws_key_handler(self, _: WsSession, event: dict) -> None:
try:
key = valid_hid_key(event["key"])
key = WEB_TO_EVDEV[valid_hid_key(event["key"])]
state = valid_bool(event["state"])
finish = valid_bool(event.get("finish", False))
except Exception:
@@ -209,7 +219,7 @@ class HidApi:
@exposed_ws("mouse_button")
async def __ws_mouse_button_handler(self, _: WsSession, event: dict) -> None:
try:
button = valid_hid_mouse_button(event["button"])
button = MOUSE_TO_EVDEV[valid_hid_mouse_button(event["button"])]
state = valid_bool(event["state"])
except Exception:
return
@@ -248,7 +258,7 @@ class HidApi:
@exposed_http("POST", "/hid/events/send_key")
async def __events_send_key_handler(self, req: Request) -> Response:
key = valid_hid_key(req.query.get("key"))
key = WEB_TO_EVDEV[valid_hid_key(req.query.get("key"))]
if "state" in req.query:
state = valid_bool(req.query["state"])
finish = valid_bool(req.query.get("finish", False))
@@ -259,7 +269,7 @@ class HidApi:
@exposed_http("POST", "/hid/events/send_mouse_button")
async def __events_send_mouse_button_handler(self, req: Request) -> Response:
button = valid_hid_mouse_button(req.query.get("button"))
button = MOUSE_TO_EVDEV[valid_hid_mouse_button(req.query.get("button"))]
if "state" in req.query:
state = valid_bool(req.query["state"])
self.__hid.send_mouse_button_event(button, state)

View File

@@ -31,6 +31,8 @@ from ... import aiotools
from ...plugins.hid import BaseHid
from ...keyboard.mappings import WEB_TO_EVDEV
from .streamer import Streamer
@@ -63,7 +65,7 @@ class Snapshoter: # pylint: disable=too-many-instance-attributes
else:
self.__idle_interval = self.__live_interval = 0.0
self.__wakeup_key = wakeup_key
self.__wakeup_key = WEB_TO_EVDEV.get(wakeup_key, 0)
self.__wakeup_move = wakeup_move
self.__online_delay = online_delay
@@ -121,8 +123,8 @@ class Snapshoter: # pylint: disable=too-many-instance-attributes
async def __wakeup(self) -> None:
logger = get_logger(0)
if self.__wakeup_key:
logger.info("Waking up using key %r ...", self.__wakeup_key)
if self.__wakeup_key > 0:
logger.info("Waking up using keyboard ...")
await self.__hid.send_key_events(
keys=[(self.__wakeup_key, True), (self.__wakeup_key, False)],
no_ignore_keys=True,

View File

@@ -159,7 +159,7 @@ class RfbClient(RfbClientStream): # pylint: disable=too-many-instance-attribute
async def _on_ext_key_event(self, code: int, state: bool) -> None:
raise NotImplementedError
async def _on_pointer_event(self, buttons: dict[str, bool], wheel: dict[str, int], move: dict[str, int]) -> None:
async def _on_pointer_event(self, buttons: dict[str, bool], wheel: tuple[int, int], move: tuple[int, int]) -> None:
raise NotImplementedError
async def _on_cut_event(self, text: str) -> None:
@@ -498,20 +498,20 @@ class RfbClient(RfbClientStream): # pylint: disable=too-many-instance-attribute
sr = self.__scroll_rate
await self._on_pointer_event(
buttons={
"left": bool(buttons & 0x1),
"right": bool(buttons & 0x4),
"left": bool(buttons & 0x1),
"right": bool(buttons & 0x4),
"middle": bool(buttons & 0x2),
"up": bool(ext_buttons & 0x2),
"down": bool(ext_buttons & 0x1),
},
wheel={
"x": (-sr if buttons & 0x40 else (sr if buttons & 0x20 else 0)),
"y": (-sr if buttons & 0x10 else (sr if buttons & 0x8 else 0)),
},
move={
"x": tools.remap(to_x, 0, self._width, *MouseRange.RANGE),
"y": tools.remap(to_y, 0, self._height, *MouseRange.RANGE),
"up": bool(ext_buttons & 0x2),
"down": bool(ext_buttons & 0x1),
},
wheel=(
(-sr if buttons & 0x40 else (sr if buttons & 0x20 else 0)),
(-sr if buttons & 0x10 else (sr if buttons & 0x8 else 0)),
),
move=(
tools.remap(to_x, 0, self._width, *MouseRange.RANGE),
tools.remap(to_y, 0, self._height, *MouseRange.RANGE),
),
)
async def __handle_client_cut_text(self) -> None:

View File

@@ -32,9 +32,11 @@ from ...logging import get_logger
from ...keyboard.keysym import SymmapModifiers
from ...keyboard.keysym import build_symmap
from ...keyboard.mappings import WebModifiers
from ...keyboard.mappings import EvdevModifiers
from ...keyboard.mappings import X11Modifiers
from ...keyboard.mappings import AT1_TO_WEB
from ...keyboard.mappings import AT1_TO_EVDEV
from ...mouse import MOUSE_TO_EVDEV
from ...clients.kvmd import KvmdClientWs
from ...clients.kvmd import KvmdClientSession
@@ -80,7 +82,7 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
desired_fps: int,
mouse_output: str,
keymap_name: str,
symmap: dict[int, dict[int, str]],
symmap: dict[int, dict[int, int]],
scroll_rate: int,
allow_cut_after: float,
@@ -132,8 +134,8 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
# Эти состояния шарить не обязательно - бекенд исключает дублирующиеся события.
# Все это нужно только чтобы не посылать лишние жсоны в сокет KVMD
self.__mouse_buttons: dict[str, (bool | None)] = dict.fromkeys(["left", "right", "middle", "up", "down"], None)
self.__mouse_move = {"x": -1, "y": -1}
self.__mouse_buttons: dict[str, (bool | None)] = dict.fromkeys(MOUSE_TO_EVDEV, None)
self.__mouse_move = (-1, -1) # (X, Y)
self.__modifiers = 0
@@ -337,45 +339,45 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
# =====
async def _on_key_event(self, code: int, state: bool) -> None:
is_modifier = self.__switch_modifiers(code, state)
is_modifier = self.__switch_modifiers_x11(code, state)
variants = self.__symmap.get(code)
fake_shift = False
if variants:
if is_modifier:
web_key = variants.get(0)
key = variants.get(0)
else:
web_key = variants.get(self.__modifiers)
if web_key is None:
web_key = variants.get(0)
key = variants.get(self.__modifiers)
if key is None:
key = variants.get(0)
if web_key is None and self.__modifiers == 0 and SymmapModifiers.SHIFT in variants:
if key is None and self.__modifiers == 0 and SymmapModifiers.SHIFT in variants:
# JUMP doesn't send shift events:
# - https://github.com/pikvm/pikvm/issues/820
web_key = variants[SymmapModifiers.SHIFT]
key = variants[SymmapModifiers.SHIFT]
fake_shift = True
if web_key and self.__kvmd_ws:
if key and self.__kvmd_ws:
if fake_shift:
await self.__kvmd_ws.send_key_event(WebModifiers.SHIFT_LEFT, True)
await self.__kvmd_ws.send_key_event(web_key, state)
await self.__kvmd_ws.send_key_event(EvdevModifiers.SHIFT_LEFT, True)
await self.__kvmd_ws.send_key_event(key, state)
if fake_shift:
await self.__kvmd_ws.send_key_event(WebModifiers.SHIFT_LEFT, False)
await self.__kvmd_ws.send_key_event(EvdevModifiers.SHIFT_LEFT, False)
async def _on_ext_key_event(self, code: int, state: bool) -> None:
web_key = AT1_TO_WEB.get(code)
if web_key:
self.__switch_modifiers(web_key, state) # Предполагаем, что модификаторы всегда известны
key = AT1_TO_EVDEV.get(code, 0)
if key:
self.__switch_modifiers_evdev(key, state) # Предполагаем, что модификаторы всегда известны
if self.__kvmd_ws:
await self.__kvmd_ws.send_key_event(web_key, state)
await self.__kvmd_ws.send_key_event(key, state)
def __switch_modifiers(self, key: (int | str), state: bool) -> bool:
def __switch_modifiers_x11(self, key: int, state: bool) -> bool:
mod = 0
if key in X11Modifiers.SHIFTS or key in WebModifiers.SHIFTS:
if key in X11Modifiers.SHIFTS:
mod = SymmapModifiers.SHIFT
elif key == X11Modifiers.ALTGR or key == WebModifiers.ALT_RIGHT:
elif key == X11Modifiers.ALTGR:
mod = SymmapModifiers.ALTGR
elif key in X11Modifiers.CTRLS or key in WebModifiers.CTRLS:
elif key in X11Modifiers.CTRLS:
mod = SymmapModifiers.CTRL
if mod == 0:
return False
@@ -385,18 +387,34 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
self.__modifiers &= ~mod
return True
async def _on_pointer_event(self, buttons: dict[str, bool], wheel: dict[str, int], move: dict[str, int]) -> None:
def __switch_modifiers_evdev(self, key: int, state: bool) -> bool:
mod = 0
if key in EvdevModifiers.SHIFTS:
mod = SymmapModifiers.SHIFT
elif key == EvdevModifiers.ALT_RIGHT:
mod = SymmapModifiers.ALTGR
elif key in EvdevModifiers.CTRLS:
mod = SymmapModifiers.CTRL
if mod == 0:
return False
if state:
self.__modifiers |= mod
else:
self.__modifiers &= ~mod
return True
async def _on_pointer_event(self, buttons: dict[str, bool], wheel: tuple[int, int], move: tuple[int, int]) -> None:
if self.__kvmd_ws:
if wheel["x"] or wheel["y"]:
await self.__kvmd_ws.send_mouse_wheel_event(wheel["x"], wheel["y"])
if wheel[0] or wheel[1]:
await self.__kvmd_ws.send_mouse_wheel_event(*wheel)
if self.__mouse_move != move:
await self.__kvmd_ws.send_mouse_move_event(move["x"], move["y"])
await self.__kvmd_ws.send_mouse_move_event(*move)
self.__mouse_move = move
for (button, state) in buttons.items():
if self.__mouse_buttons[button] != state:
await self.__kvmd_ws.send_mouse_button_event(button, state)
await self.__kvmd_ws.send_mouse_button_event(MOUSE_TO_EVDEV[button], state)
self.__mouse_buttons[button] = state
async def _on_cut_event(self, text: str) -> None: