From bc880009c17dba6cb64cfa18ec393f3994204881 Mon Sep 17 00:00:00 2001 From: Maxim Devaev Date: Fri, 9 May 2025 10:08:41 +0300 Subject: [PATCH] common BaseMagicHandler class --- kvmd/apps/vnc/server.py | 94 ++++++++++++---------------------------- kvmd/keyboard/magic.py | 96 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 124 insertions(+), 66 deletions(-) create mode 100644 kvmd/keyboard/magic.py diff --git a/kvmd/apps/vnc/server.py b/kvmd/apps/vnc/server.py index b2bd4d29..4dc6a199 100644 --- a/kvmd/apps/vnc/server.py +++ b/kvmd/apps/vnc/server.py @@ -25,13 +25,10 @@ import asyncio import socket import dataclasses import contextlib -import time import aiohttp import async_lru -from evdev import ecodes - from ...logging import get_logger from ...keyboard.keysym import SymmapModifiers @@ -39,6 +36,7 @@ from ...keyboard.keysym import build_symmap from ...keyboard.mappings import EvdevModifiers from ...keyboard.mappings import X11Modifiers from ...keyboard.mappings import AT1_TO_EVDEV +from ...keyboard.magic import BaseMagicHandler from ...mouse import MOUSE_TO_EVDEV @@ -70,11 +68,7 @@ class _SharedParams: name: str = dataclasses.field(default="PiKVM") -class _Client(RfbClient): # pylint: disable=too-many-instance-attributes - __MAGIC_KEY = ecodes.KEY_LEFTALT - __MAGIC_TIMEOUT = 2 - __MAGIC_TRIGGER = 2 - +class _Client(RfbClient, BaseMagicHandler): # pylint: disable=too-many-instance-attributes def __init__( # pylint: disable=too-many-arguments,too-many-locals self, reader: asyncio.StreamReader, @@ -100,7 +94,8 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes shared_params: _SharedParams, ) -> None: - super().__init__( + RfbClient.__init__( + self, reader=reader, writer=writer, tls_ciphers=tls_ciphers, @@ -113,6 +108,7 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes none_auth_only=none_auth_only, **dataclasses.asdict(shared_params), ) + BaseMagicHandler.__init__(self) self.__desired_fps = desired_fps self.__mouse_output = mouse_output @@ -142,10 +138,6 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes self.__clipboard = "" - self.__magic_taps = 0 - self.__magic_ts = 0.0 - self.__magic_codes: list[int] = [] - self.__info_host = "" self.__info_switch_units = 0 self.__info_switch_active = "" @@ -384,17 +376,17 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes if key: if fake_shift: - await self.__handle_key(ecodes.KEY_LEFTSHIFT, True) - await self.__handle_key(key, state) + await self._magic_handle_key(EvdevModifiers.SHIFT_LEFT, True) + await self._magic_handle_key(key, state) if fake_shift: - await self.__handle_key(ecodes.KEY_LEFTSHIFT, False) + await self._magic_handle_key(EvdevModifiers.SHIFT_LEFT, False) async def _on_ext_key_event(self, code: int, state: bool) -> None: assert self.__stage1_authorized.is_passed() key = AT1_TO_EVDEV.get(code, 0) if key: self.__switch_modifiers_evdev(key, state) # Предполагаем, что модификаторы всегда известны - await self.__handle_key(key, state) + await self._magic_handle_key(key, state) def __switch_modifiers_x11(self, key: int, state: bool) -> bool: mod = 0 @@ -428,67 +420,33 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes self.__modifiers &= ~mod return True - async def __handle_key(self, key: int, state: bool) -> None: # pylint: disable=too-many-branches - if self.__magic_ts + self.__MAGIC_TIMEOUT < time.monotonic(): - self.__magic_taps = 0 - self.__magic_ts = 0 - self.__magic_codes = [] - - if key == self.__MAGIC_KEY: - if not state: - self.__magic_taps += 1 - self.__magic_ts = time.monotonic() - elif state: - taps = self.__magic_taps - codes = self.__magic_codes - self.__magic_taps = 0 - self.__magic_ts = 0 - self.__magic_codes = [] - if taps >= self.__MAGIC_TRIGGER: - if key == ecodes.KEY_P: - await self.__handle_magic_clipboard_print() - return - elif key in [ecodes.KEY_UP, ecodes.KEY_LEFT]: - await self.__handle_magic_switch_prev() - return - elif key in [ecodes.KEY_DOWN, ecodes.KEY_RIGHT]: - await self.__handle_magic_switch_next() - return - elif ecodes.KEY_1 <= key <= ecodes.KEY_8: - if 1 <= self.__info_switch_units <= 2: - await self.__handle_magic_switch_port(key - ecodes.KEY_1) - elif self.__info_switch_units > 2: - codes.append(key - ecodes.KEY_1 + 1) - if len(codes) == 1: - self.__magic_taps = taps - self.__magic_ts = time.monotonic() - self.__magic_codes = codes - elif len(codes) >= 2: - await self.__handle_magic_switch_port(codes[0] + codes[1] / 10) - return - - if self.__kvmd_ws: - await self.__kvmd_ws.send_key_event(key, state) - - async def __handle_magic_switch_prev(self) -> None: + async def _on_magic_switch_prev(self) -> None: assert self.__kvmd_session if self.__info_switch_units > 0: get_logger(0).info("%s [main]: Switching port to the previous one ...", self._remote) await self.__kvmd_session.switch.set_active_prev() - async def __handle_magic_switch_next(self) -> None: + async def _on_magic_switch_next(self) -> None: assert self.__kvmd_session if self.__info_switch_units > 0: get_logger(0).info("%s [main]: Switching port to the next one ...", self._remote) await self.__kvmd_session.switch.set_active_next() - async def __handle_magic_switch_port(self, port: float) -> None: + async def _on_magic_switch_port(self, first: int, second: int) -> bool: assert self.__kvmd_session - if self.__info_switch_units > 0: - get_logger(0).info("%s [main]: Switching port to %s ...", self._remote, port) - await self.__kvmd_session.switch.set_active(port) + if self.__info_switch_units <= 0: + return True + elif 1 <= self.__info_switch_units <= 2: + port = float(first) + else: # self.__info_switch_units > 2: + if second < 0: + return False # Wait for the second key + port = (first + 1) + (second + 1) / 10 + get_logger(0).info("%s [main]: Switching port to %s ...", self._remote, port) + await self.__kvmd_session.switch.set_active(port) + return True - async def __handle_magic_clipboard_print(self) -> None: + async def _on_magic_clipboard_print(self) -> None: assert self.__kvmd_session if self.__clipboard: logger = get_logger(0) @@ -501,6 +459,10 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes except Exception: logger.exception("%s [main]: Can't print characters", self._remote) + async def _on_magic_key_proxy(self, key: int, state: bool) -> None: + if self.__kvmd_ws: + await self.__kvmd_ws.send_key_event(key, state) + # ===== async def _on_pointer_event(self, buttons: dict[str, bool], wheel: tuple[int, int], move: tuple[int, int]) -> None: diff --git a/kvmd/keyboard/magic.py b/kvmd/keyboard/magic.py new file mode 100644 index 00000000..cf9ba9b1 --- /dev/null +++ b/kvmd/keyboard/magic.py @@ -0,0 +1,96 @@ +# ========================================================================== # +# # +# KVMD - The main PiKVM daemon. # +# # +# Copyright (C) 2020 Maxim Devaev # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see . # +# # +# ========================================================================== # + + +import time + +from evdev import ecodes + + +# ===== +class BaseMagicHandler: + __MAGIC_KEY = ecodes.KEY_LEFTALT + __MAGIC_TIMEOUT = 2 + __MAGIC_TRIGGER = 2 + + def __init__(self) -> None: + self.__taps = 0 + self.__ts = 0.0 + self.__codes: list[int] = [] + + async def _magic_handle_key(self, key: int, state: bool) -> None: # pylint: disable=too-many-branches + if self.__ts + self.__MAGIC_TIMEOUT < time.monotonic(): + self.__taps = 0 + self.__ts = 0 + self.__codes = [] + + if key == self.__MAGIC_KEY: + if not state: + self.__taps += 1 + self.__ts = time.monotonic() + elif state: + taps = self.__taps + codes = self.__codes + self.__taps = 0 + self.__ts = 0 + self.__codes = [] + if taps >= self.__MAGIC_TRIGGER: + if key == ecodes.KEY_P: + await self._on_magic_clipboard_print() + return + + elif key in [ecodes.KEY_UP, ecodes.KEY_LEFT]: + await self._on_magic_switch_prev() + return + + elif key in [ecodes.KEY_DOWN, ecodes.KEY_RIGHT]: + await self._on_magic_switch_next() + return + + elif ecodes.KEY_1 <= key <= ecodes.KEY_8: + codes.append(key - ecodes.KEY_1) + if len(codes) == 1: + if not (await self._on_magic_switch_port(codes[0], -1)): + self.__taps = taps + self.__ts = time.monotonic() + self.__codes = codes + elif len(codes) >= 2: + await self._on_magic_switch_port(codes[0], codes[1]) + return + + await self._on_magic_key_proxy(key, state) + + async def _on_magic_clipboard_print(self) -> None: + pass + + async def _on_magic_switch_prev(self) -> None: + pass + + async def _on_magic_switch_next(self) -> None: + pass + + async def _on_magic_switch_port(self, first: int, second: int) -> bool: + _ = first + _ = second + return True + + async def _on_magic_key_proxy(self, key: int, state: bool) -> None: + raise NotImplementedError()