vnc: qemu ext keys

This commit is contained in:
Devaev Maxim
2020-10-08 15:26:37 +03:00
parent f1910f7c8e
commit a0b920a9d6
7 changed files with 114 additions and 28 deletions

View File

@@ -134,6 +134,9 @@ class RfbClient(RfbClientStream): # pylint: disable=too-many-instance-attribute
async def _on_key_event(self, code: int, state: bool) -> None:
raise NotImplementedError
async def _on_ext_key_event(self, code: int, state: bool) -> None:
raise NotImplementedError
async def _on_pointer_event(self, buttons: Dict[str, bool], wheel: Dict[str, int], move: Dict[str, int]) -> None:
raise NotImplementedError
@@ -360,6 +363,7 @@ class RfbClient(RfbClientStream): # pylint: disable=too-many-instance-attribute
4: self.__handle_key_event,
5: self.__handle_pointer_event,
6: self.__handle_client_cut_text,
255: self.__handle_qemu_event,
}
while True:
msg_type = await self._read_number("B")
@@ -380,9 +384,12 @@ class RfbClient(RfbClientStream): # pylint: disable=too-many-instance-attribute
if encodings_count > 1024:
raise RfbError(f"Too many encodings: {encodings_count}")
self._encodings = RfbClientEncodings(frozenset(await self._read_struct("l" * encodings_count)))
get_logger(0).info("[main] %s: Features: resize=%d; rename=%d; leds=%d",
self._remote, self._encodings.has_resize, self._encodings.has_rename, self._encodings.has_leds_state)
get_logger(0).info("[main] %s: Features: resize=%d, rename=%d, leds=%d, extkeys=%d",
self._remote, self._encodings.has_resize, self._encodings.has_rename,
self._encodings.has_leds_state, self._encodings.has_ext_keys)
self.__check_tight_jpeg()
if self._encodings.has_ext_keys: # Preferred method
await self._write_fb_update(0, 0, RfbEncodings.EXT_KEYS, drain=True)
await self._on_set_encodings()
async def __handle_fb_update_request(self) -> None:
@@ -417,6 +424,17 @@ class RfbClient(RfbClientStream): # pylint: disable=too-many-instance-attribute
text = await self._read_text(length)
await self._on_cut_event(text)
async def __handle_qemu_event(self) -> None:
(sub_type, state, code) = await self._read_struct("B H xxxx L")
if sub_type != 0:
raise RfbError(f"Invalid QEMU sub-message type: {sub_type}")
if code == 0xB7:
# For backwards compatibility servers SHOULD accept 0xB7 as a synonym for 0x54 (PrintScreen)
code = 0x54
if code & 0x80:
code = (0xE0 << 8) | (code & ~0x80)
await self._on_ext_key_event(code, bool(state))
def __check_tight_jpeg(self) -> None:
# JpegCompression may only be used when the client has advertized
# a quality level using the JPEG Quality Level Pseudo-encoding

View File

@@ -30,7 +30,8 @@ from typing import Any
class RfbEncodings:
RESIZE = -223 # DesktopSize Pseudo-encoding
RENAME = -307 # DesktopName Pseudo-encoding
LEDS_STATE = -261 # LED State Pseudo-encoding
LEDS_STATE = -261 # QEMU LED State Pseudo-encoding
EXT_KEYS = -258 # QEMU Extended Key Events Pseudo-encoding
TIGHT = 7
TIGHT_JPEG_QUALITIES = dict(zip( # JPEG Quality Level Pseudo-encoding
@@ -46,6 +47,7 @@ class RfbClientEncodings:
has_resize: bool = dataclasses.field(default=False)
has_rename: bool = dataclasses.field(default=False)
has_leds_state: bool = dataclasses.field(default=False)
has_ext_keys: bool = dataclasses.field(default=False)
has_tight: bool = dataclasses.field(default=False)
tight_jpeg_quality: int = dataclasses.field(default=0)
@@ -54,6 +56,7 @@ class RfbClientEncodings:
self.__set("has_resize", (RfbEncodings.RESIZE in self.encodings))
self.__set("has_rename", (RfbEncodings.RENAME in self.encodings))
self.__set("has_leds_state", (RfbEncodings.LEDS_STATE in self.encodings))
self.__set("has_ext_keys", (RfbEncodings.EXT_KEYS in self.encodings))
self.__set("has_tight", (RfbEncodings.TIGHT in self.encodings))
self.__set("tight_jpeg_quality", self.__get_tight_jpeg_quality())

View File

@@ -27,14 +27,18 @@ import dataclasses
import contextlib
from typing import Dict
from typing import Union
from typing import Optional
import aiohttp
from ...logging import get_logger
from ...keyboard.keysym import switch_symmap_modifiers
from ...keyboard.keysym import SymmapModifiers
from ...keyboard.keysym import build_symmap
from ...keyboard.mappings import WebModifiers
from ...keyboard.mappings import X11Modifiers
from ...keyboard.mappings import AT1_TO_WEB
from ...clients.kvmd import KvmdClientWs
from ...clients.kvmd import KvmdClientSession
@@ -240,7 +244,7 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
# =====
async def _on_key_event(self, code: int, state: bool) -> None:
(is_modifier, self.__modifiers) = switch_symmap_modifiers(self.__modifiers, code, state)
is_modifier = self.__switch_modifiers(code, state)
if self.__kvmd_ws:
web_keys = self.__symmap.get(code)
if web_keys:
@@ -253,6 +257,29 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
if web_key is not None:
await self.__kvmd_ws.send_key_event(web_key, state)
async def _on_ext_key_event(self, code: int, state: bool) -> None:
web_key = AT1_TO_WEB.get(code)
if web_key is not None:
self.__switch_modifiers(web_key, state) # Предполагаем, что модификаторы всегда известны
if self.__kvmd_ws:
await self.__kvmd_ws.send_key_event(web_key, state)
def __switch_modifiers(self, key: Union[int, str], state: bool) -> bool:
mod = 0
if key in X11Modifiers.SHIFTS or key in WebModifiers.SHIFTS:
mod = SymmapModifiers.SHIFT
elif key == X11Modifiers.ALTGR or key == WebModifiers.ALT_RIGHT:
mod = SymmapModifiers.ALTGR
elif key in X11Modifiers.CTRLS or key in WebModifiers.CTRLS:
mod = SymmapModifiers.CTRL
if mod == 0:
return False
if state:
self.__modifiers |= mod
else:
self.__modifiers &= ~mod
return True
async def _on_pointer_event(self, buttons: Dict[str, bool], wheel: Dict[str, int], move: Dict[str, int]) -> None:
if self.__kvmd_ws:
for (button, state) in buttons.items():