diff --git a/kvmd/apps/vnc/server.py b/kvmd/apps/vnc/server.py index 1d5e12d6..a2b90736 100644 --- a/kvmd/apps/vnc/server.py +++ b/kvmd/apps/vnc/server.py @@ -29,10 +29,12 @@ import contextlib import aiohttp import async_lru +from evdev import ecodes + from ...logging import get_logger from ...keyboard.keysym import build_symmap -from ...keyboard.magic import BaseMagicHandler +from ...keyboard.magic import MagicHandler from ...clients.kvmd import KvmdClientWs from ...clients.kvmd import KvmdClientSession @@ -62,7 +64,7 @@ class _SharedParams: name: str = dataclasses.field(default="PiKVM") -class _Client(RfbClient, BaseMagicHandler): # pylint: disable=too-many-instance-attributes +class _Client(RfbClient): # pylint: disable=too-many-instance-attributes def __init__( # pylint: disable=too-many-arguments,too-many-locals self, reader: asyncio.StreamReader, @@ -103,7 +105,6 @@ class _Client(RfbClient, BaseMagicHandler): # pylint: disable=too-many-instance none_auth_only=none_auth_only, **dataclasses.asdict(shared_params), ) - BaseMagicHandler.__init__(self) self.__desired_fps = desired_fps self.__mouse_output = mouse_output @@ -130,6 +131,18 @@ class _Client(RfbClient, BaseMagicHandler): # pylint: disable=too-many-instance self.__info_switch_units = 0 self.__info_switch_active = "" + self.__magic = MagicHandler( + proxy_handler=self.__on_magic_key_proxy, + key_handlers={ + ecodes.KEY_P: self.__on_magic_clipboard_print, + ecodes.KEY_UP: self.__on_magic_switch_prev, + ecodes.KEY_LEFT: self.__on_magic_switch_prev, + ecodes.KEY_DOWN: self.__on_magic_switch_next, + ecodes.KEY_RIGHT: self.__on_magic_switch_next, + }, + numeric_handler=self.__on_magic_switch_port, + ) + # ===== async def run(self) -> None: @@ -343,35 +356,36 @@ class _Client(RfbClient, BaseMagicHandler): # pylint: disable=too-many-instance async def _on_key_event(self, key: int, state: bool) -> None: assert self.__stage1_authorized.is_passed() - await self._magic_handle_key(key, state) + await self.__magic.handle_key(key, state) - async def _on_magic_switch_prev(self) -> None: + async def __on_magic_switch_prev(self) -> None: assert self.__kvmd_session if self.__info_switch_units > 0: get_logger(0).info("%s [main]: Switching port to the previous one ...", self._remote) await self.__kvmd_session.switch.set_active_prev() - async def _on_magic_switch_next(self) -> None: + async def __on_magic_switch_next(self) -> None: assert self.__kvmd_session if self.__info_switch_units > 0: get_logger(0).info("%s [main]: Switching port to the next one ...", self._remote) await self.__kvmd_session.switch.set_active_next() - async def _on_magic_switch_port(self, first: int, second: int) -> bool: + async def __on_magic_switch_port(self, codes: list[int]) -> bool: assert self.__kvmd_session + assert len(codes) > 0 if self.__info_switch_units <= 0: return True elif 1 <= self.__info_switch_units <= 2: - port = float(first) + port = float(codes[0]) else: # self.__info_switch_units > 2: - if second < 0: + if len(codes) == 1: return False # Wait for the second key - port = (first + 1) + (second + 1) / 10 + port = (codes[0] + 1) + (codes[1] + 1) / 10 get_logger(0).info("%s [main]: Switching port to %s ...", self._remote, port) await self.__kvmd_session.switch.set_active(port) return True - async def _on_magic_clipboard_print(self) -> None: + async def __on_magic_clipboard_print(self) -> None: assert self.__kvmd_session if self.__clipboard: logger = get_logger(0) @@ -384,7 +398,7 @@ class _Client(RfbClient, BaseMagicHandler): # pylint: disable=too-many-instance except Exception: logger.exception("%s [main]: Can't print characters", self._remote) - async def _on_magic_key_proxy(self, key: int, state: bool) -> None: + async def __on_magic_key_proxy(self, key: int, state: bool) -> None: if self.__kvmd_ws: await self.__kvmd_ws.send_key_event(key, state) diff --git a/kvmd/keyboard/magic.py b/kvmd/keyboard/magic.py index cf9ba9b1..915ef8e0 100644 --- a/kvmd/keyboard/magic.py +++ b/kvmd/keyboard/magic.py @@ -22,21 +22,34 @@ import time +from typing import Callable +from typing import Awaitable + from evdev import ecodes # ===== -class BaseMagicHandler: +class MagicHandler: __MAGIC_KEY = ecodes.KEY_LEFTALT __MAGIC_TIMEOUT = 2 __MAGIC_TRIGGER = 2 - def __init__(self) -> None: + def __init__( + self, + proxy_handler: Callable[[int, bool], Awaitable[None]], + key_handlers: (dict[int, Callable[[], Awaitable[None]]] | None)=None, + numeric_handler: (Callable[[list[int]], Awaitable[bool]] | None)=None, + ) -> None: + + self.__proxy_handler = proxy_handler + self.__key_handlers = (key_handlers or {}) + self.__numeric_handler = numeric_handler + self.__taps = 0 self.__ts = 0.0 self.__codes: list[int] = [] - async def _magic_handle_key(self, key: int, state: bool) -> None: # pylint: disable=too-many-branches + async def handle_key(self, key: int, state: bool) -> None: # pylint: disable=too-many-branches if self.__ts + self.__MAGIC_TIMEOUT < time.monotonic(): self.__taps = 0 self.__ts = 0 @@ -53,44 +66,17 @@ class BaseMagicHandler: self.__ts = 0 self.__codes = [] if taps >= self.__MAGIC_TRIGGER: - if key == ecodes.KEY_P: - await self._on_magic_clipboard_print() + if key in self.__key_handlers: + await self.__key_handlers[key]() return - - elif key in [ecodes.KEY_UP, ecodes.KEY_LEFT]: - await self._on_magic_switch_prev() - return - - elif key in [ecodes.KEY_DOWN, ecodes.KEY_RIGHT]: - await self._on_magic_switch_next() - return - - elif ecodes.KEY_1 <= key <= ecodes.KEY_8: + elif self.__numeric_handler is not None and (ecodes.KEY_1 <= key <= ecodes.KEY_8): codes.append(key - ecodes.KEY_1) - if len(codes) == 1: - if not (await self._on_magic_switch_port(codes[0], -1)): - self.__taps = taps - self.__ts = time.monotonic() - self.__codes = codes - elif len(codes) >= 2: - await self._on_magic_switch_port(codes[0], codes[1]) + if not (await self.__numeric_handler(list(codes))): + # Если хандлер хочет код большей длины, он возвращает False, + # и мы ждем следующую цифру. + self.__taps = taps + self.__ts = time.monotonic() + self.__codes = codes return - await self._on_magic_key_proxy(key, state) - - async def _on_magic_clipboard_print(self) -> None: - pass - - async def _on_magic_switch_prev(self) -> None: - pass - - async def _on_magic_switch_next(self) -> None: - pass - - async def _on_magic_switch_port(self, first: int, second: int) -> bool: - _ = first - _ = second - return True - - async def _on_magic_key_proxy(self, key: int, state: bool) -> None: - raise NotImplementedError() + await self.__proxy_handler(key, state)