vnc: common key event handler

This commit is contained in:
Maxim Devaev 2025-05-09 23:24:21 +03:00
parent 8fb4bc6be7
commit 1356187771
2 changed files with 76 additions and 80 deletions

View File

@ -34,6 +34,10 @@ from ....logging import get_logger
from .... import tools
from .... import aiotools
from ....keyboard.keysym import SymmapModifiers
from ....keyboard.mappings import EvdevModifiers
from ....keyboard.mappings import X11Modifiers
from ....keyboard.mappings import AT1_TO_EVDEV
from ....mouse import MouseRange
from .errors import RfbError
@ -71,6 +75,7 @@ class RfbClient(RfbClientStream): # pylint: disable=too-many-instance-attribute
width: int,
height: int,
name: str,
symmap: dict[int, dict[int, int]],
scroll_rate: int,
vncpasses: set[str],
@ -89,6 +94,7 @@ class RfbClient(RfbClientStream): # pylint: disable=too-many-instance-attribute
self._height = height
self.__name = name
self.__scroll_rate = scroll_rate
self.__symmap = symmap
self.__vncpasses = vncpasses
self.__vencrypt = vencrypt
@ -105,6 +111,9 @@ class RfbClient(RfbClientStream): # pylint: disable=too-many-instance-attribute
self.__lock = asyncio.Lock()
# Эти состояния шарить не обязательно - бекенд исключает дублирующиеся события.
# Все это нужно только чтобы не посылать лишние события в сокет KVMD
self.__modifiers = 0
self.__mouse_buttons: dict[int, bool] = {}
self.__mouse_move = (-1, -1, -1, -1) # (width, height, X, Y)
@ -164,10 +173,7 @@ class RfbClient(RfbClientStream): # pylint: disable=too-many-instance-attribute
# =====
async def _on_key_event(self, code: int, state: bool) -> None:
raise NotImplementedError
async def _on_ext_key_event(self, code: int, state: bool) -> None:
async def _on_key_event(self, key: int, state: bool) -> None:
raise NotImplementedError
async def _on_mouse_button_event(self, button: int, state: bool) -> None:
@ -509,7 +515,64 @@ class RfbClient(RfbClientStream): # pylint: disable=too-many-instance-attribute
async def __handle_key_event(self) -> None:
(state, code) = await self._read_struct("key event", "? xx L")
await self._on_key_event(code, state) # type: ignore
state = bool(state)
is_modifier = self.__switch_modifiers_x11(code, state)
variants = self.__symmap.get(code)
fake_shift = False
if variants:
if is_modifier:
key = variants.get(0)
else:
key = variants.get(self.__modifiers)
if key is None:
key = variants.get(0)
if key is None and self.__modifiers == 0 and SymmapModifiers.SHIFT in variants:
# JUMP doesn't send shift events:
# - https://github.com/pikvm/pikvm/issues/820
key = variants[SymmapModifiers.SHIFT]
fake_shift = True
if key:
if fake_shift:
await self._on_key_event(EvdevModifiers.SHIFT_LEFT, True)
await self._on_key_event(key, state)
if fake_shift:
await self._on_key_event(EvdevModifiers.SHIFT_LEFT, False)
def __switch_modifiers_x11(self, code: int, state: bool) -> bool:
mod = 0
if code in X11Modifiers.SHIFTS:
mod = SymmapModifiers.SHIFT
elif code == X11Modifiers.ALTGR:
mod = SymmapModifiers.ALTGR
elif code in X11Modifiers.CTRLS:
mod = SymmapModifiers.CTRL
if mod == 0:
return False
if state:
self.__modifiers |= mod
else:
self.__modifiers &= ~mod
return True
def __switch_modifiers_evdev(self, key: int, state: bool) -> bool:
mod = 0
if key in EvdevModifiers.SHIFTS:
mod = SymmapModifiers.SHIFT
elif key == EvdevModifiers.ALT_RIGHT:
mod = SymmapModifiers.ALTGR
elif key in EvdevModifiers.CTRLS:
mod = SymmapModifiers.CTRL
if mod == 0:
return False
if state:
self.__modifiers |= mod
else:
self.__modifiers &= ~mod
return True
async def __handle_pointer_event(self) -> None:
(buttons, to_x, to_y) = await self._read_struct("pointer event", "B HH")
@ -559,6 +622,7 @@ class RfbClient(RfbClientStream): # pylint: disable=too-many-instance-attribute
async def __handle_qemu_event(self) -> None:
(sub_type, state, code) = await self._read_struct("QEMU event (key?)", "B H xxxx L")
state = bool(state)
if sub_type != 0:
raise RfbError(f"Invalid QEMU sub-message type: {sub_type}")
if code == 0xB7:
@ -566,4 +630,7 @@ class RfbClient(RfbClientStream): # pylint: disable=too-many-instance-attribute
code = 0x54
if code & 0x80:
code = (0xE0 << 8) | (code & ~0x80)
await self._on_ext_key_event(code, bool(state))
key = AT1_TO_EVDEV.get(code, 0)
if key:
self.__switch_modifiers_evdev(key, state) # Предполагаем, что модификаторы всегда известны
await self._on_key_event(key, state)

View File

@ -31,11 +31,7 @@ import async_lru
from ...logging import get_logger
from ...keyboard.keysym import SymmapModifiers
from ...keyboard.keysym import build_symmap
from ...keyboard.mappings import EvdevModifiers
from ...keyboard.mappings import X11Modifiers
from ...keyboard.mappings import AT1_TO_EVDEV
from ...keyboard.magic import BaseMagicHandler
from ...clients.kvmd import KvmdClientWs
@ -100,6 +96,7 @@ class _Client(RfbClient, BaseMagicHandler): # pylint: disable=too-many-instance
tls_timeout=tls_timeout,
x509_cert_path=x509_cert_path,
x509_key_path=x509_key_path,
symmap=symmap,
scroll_rate=scroll_rate,
vncpasses=vncpasses,
vencrypt=vencrypt,
@ -111,7 +108,6 @@ class _Client(RfbClient, BaseMagicHandler): # pylint: disable=too-many-instance
self.__desired_fps = desired_fps
self.__mouse_output = mouse_output
self.__keymap_name = keymap_name
self.__symmap = symmap
self.__kvmd = kvmd
self.__streamers = streamers
@ -128,10 +124,6 @@ class _Client(RfbClient, BaseMagicHandler): # pylint: disable=too-many-instance
self.__fb_queue: "asyncio.Queue[dict]" = asyncio.Queue()
self.__fb_has_key = False
# Эти состояния шарить не обязательно - бекенд исключает дублирующиеся события.
# Все это нужно только чтобы не посылать лишние жсоны в сокет KVMD
self.__modifiers = 0
self.__clipboard = ""
self.__info_host = ""
@ -349,72 +341,9 @@ class _Client(RfbClient, BaseMagicHandler): # pylint: disable=too-many-instance
# =====
async def _on_key_event(self, code: int, state: bool) -> None:
async def _on_key_event(self, key: int, state: bool) -> None:
assert self.__stage1_authorized.is_passed()
is_modifier = self.__switch_modifiers_x11(code, state)
variants = self.__symmap.get(code)
fake_shift = False
if variants:
if is_modifier:
key = variants.get(0)
else:
key = variants.get(self.__modifiers)
if key is None:
key = variants.get(0)
if key is None and self.__modifiers == 0 and SymmapModifiers.SHIFT in variants:
# JUMP doesn't send shift events:
# - https://github.com/pikvm/pikvm/issues/820
key = variants[SymmapModifiers.SHIFT]
fake_shift = True
if key:
if fake_shift:
await self._magic_handle_key(EvdevModifiers.SHIFT_LEFT, True)
await self._magic_handle_key(key, state)
if fake_shift:
await self._magic_handle_key(EvdevModifiers.SHIFT_LEFT, False)
async def _on_ext_key_event(self, code: int, state: bool) -> None:
assert self.__stage1_authorized.is_passed()
key = AT1_TO_EVDEV.get(code, 0)
if key:
self.__switch_modifiers_evdev(key, state) # Предполагаем, что модификаторы всегда известны
await self._magic_handle_key(key, state)
def __switch_modifiers_x11(self, key: int, state: bool) -> bool:
mod = 0
if key in X11Modifiers.SHIFTS:
mod = SymmapModifiers.SHIFT
elif key == X11Modifiers.ALTGR:
mod = SymmapModifiers.ALTGR
elif key in X11Modifiers.CTRLS:
mod = SymmapModifiers.CTRL
if mod == 0:
return False
if state:
self.__modifiers |= mod
else:
self.__modifiers &= ~mod
return True
def __switch_modifiers_evdev(self, key: int, state: bool) -> bool:
mod = 0
if key in EvdevModifiers.SHIFTS:
mod = SymmapModifiers.SHIFT
elif key == EvdevModifiers.ALT_RIGHT:
mod = SymmapModifiers.ALTGR
elif key in EvdevModifiers.CTRLS:
mod = SymmapModifiers.CTRL
if mod == 0:
return False
if state:
self.__modifiers |= mod
else:
self.__modifiers &= ~mod
return True
await self._magic_handle_key(key, state)
async def _on_magic_switch_prev(self) -> None:
assert self.__kvmd_session