major keymaps improvement

This commit is contained in:
Devaev Maxim 2020-05-23 15:57:02 +03:00
parent a795fe5ed6
commit e9d86c058d
20 changed files with 139 additions and 105 deletions

View File

@ -56,8 +56,6 @@ ipmi:
vnc: vnc:
keymap: /usr/share/kvmd/keymaps/en-us
kvmd: kvmd:
unix: /run/kvmd/kvmd.sock unix: /run/kvmd/kvmd.sock

View File

@ -55,8 +55,6 @@ ipmi:
unix: /run/kvmd/kvmd.sock unix: /run/kvmd/kvmd.sock
vnc: vnc:
keymap: /usr/share/kvmd/keymaps/en-us
kvmd: kvmd:
unix: /run/kvmd/kvmd.sock unix: /run/kvmd/kvmd.sock

View File

@ -56,8 +56,6 @@ ipmi:
unix: /run/kvmd/kvmd.sock unix: /run/kvmd/kvmd.sock
vnc: vnc:
keymap: /usr/share/kvmd/keymaps/en-us
kvmd: kvmd:
unix: /run/kvmd/kvmd.sock unix: /run/kvmd/kvmd.sock

View File

@ -56,8 +56,6 @@ ipmi:
unix: /run/kvmd/kvmd.sock unix: /run/kvmd/kvmd.sock
vnc: vnc:
keymap: /usr/share/kvmd/keymaps/en-us
kvmd: kvmd:
unix: /run/kvmd/kvmd.sock unix: /run/kvmd/kvmd.sock

View File

@ -60,8 +60,6 @@ ipmi:
unix: /run/kvmd/kvmd.sock unix: /run/kvmd/kvmd.sock
vnc: vnc:
keymap: /usr/share/kvmd/keymaps/en-us
kvmd: kvmd:
unix: /run/kvmd/kvmd.sock unix: /run/kvmd/kvmd.sock

View File

@ -60,8 +60,6 @@ ipmi:
unix: /run/kvmd/kvmd.sock unix: /run/kvmd/kvmd.sock
vnc: vnc:
keymap: /usr/share/kvmd/keymaps/en-us
kvmd: kvmd:
unix: /run/kvmd/kvmd.sock unix: /run/kvmd/kvmd.sock

View File

@ -64,8 +64,6 @@ ipmi:
unix: /run/kvmd/kvmd.sock unix: /run/kvmd/kvmd.sock
vnc: vnc:
keymap: /usr/share/kvmd/keymaps/en-us
kvmd: kvmd:
unix: /run/kvmd/kvmd.sock unix: /run/kvmd/kvmd.sock

View File

@ -64,8 +64,6 @@ ipmi:
unix: /run/kvmd/kvmd.sock unix: /run/kvmd/kvmd.sock
vnc: vnc:
keymap: /usr/share/kvmd/keymaps/en-us
kvmd: kvmd:
unix: /run/kvmd/kvmd.sock unix: /run/kvmd/kvmd.sock

View File

@ -57,8 +57,6 @@ ipmi:
unix: /run/kvmd/kvmd.sock unix: /run/kvmd/kvmd.sock
vnc: vnc:
keymap: /usr/share/kvmd/keymaps/en-us
kvmd: kvmd:
unix: /run/kvmd/kvmd.sock unix: /run/kvmd/kvmd.sock

View File

@ -57,8 +57,6 @@ ipmi:
unix: /run/kvmd/kvmd.sock unix: /run/kvmd/kvmd.sock
vnc: vnc:
keymap: /usr/share/kvmd/keymaps/en-us
kvmd: kvmd:
unix: /run/kvmd/kvmd.sock unix: /run/kvmd/kvmd.sock

View File

@ -226,6 +226,7 @@ def _get_config_scheme() -> Dict:
"hid": { "hid": {
"type": Option("", type=valid_stripped_string_not_empty), "type": Option("", type=valid_stripped_string_not_empty),
"keymap": Option("/usr/share/kvmd/keymaps/en-us", type=valid_abs_path),
# Dynamic content # Dynamic content
}, },
@ -324,7 +325,7 @@ def _get_config_scheme() -> Dict:
"vnc": { "vnc": {
"desired_fps": Option(30, type=valid_stream_fps), "desired_fps": Option(30, type=valid_stream_fps),
"keymap": Option("", type=valid_abs_path), "keymap": Option("/usr/share/kvmd/keymaps/en-us", type=valid_abs_path),
"server": { "server": {
"host": Option("::", type=valid_ip_or_host), "host": Option("::", type=valid_ip_or_host),

View File

@ -76,13 +76,15 @@ def main(argv: Optional[List[str]]=None) -> None:
log_reader=LogReader(), log_reader=LogReader(),
wol=WakeOnLan(**config.wol._unpack()), wol=WakeOnLan(**config.wol._unpack()),
hid=get_hid_class(config.hid.type)(**config.hid._unpack(ignore=["type"])), hid=get_hid_class(config.hid.type)(**config.hid._unpack(ignore=["type", "keymap"])),
atx=get_atx_class(config.atx.type)(**config.atx._unpack(ignore=["type"])), atx=get_atx_class(config.atx.type)(**config.atx._unpack(ignore=["type"])),
msd=get_msd_class(config.msd.type)(**msd_kwargs), msd=get_msd_class(config.msd.type)(**msd_kwargs),
streamer=Streamer(**config.streamer._unpack()), streamer=Streamer(**config.streamer._unpack()),
heartbeat=config.server.heartbeat, heartbeat=config.server.heartbeat,
sync_chunk_size=config.server.sync_chunk_size, sync_chunk_size=config.server.sync_chunk_size,
keymap_path=config.hid.keymap,
).run(**config.server._unpack(ignore=["heartbeat", "sync_chunk_size"])) ).run(**config.server._unpack(ignore=["heartbeat", "sync_chunk_size"]))
get_logger(0).info("Bye-bye") get_logger(0).info("Bye-bye")

View File

@ -20,9 +20,13 @@
# ========================================================================== # # ========================================================================== #
import os
import stat
import asyncio import asyncio
import functools
from typing import Dict from typing import Dict
from typing import Set
from aiohttp.web import Request from aiohttp.web import Request
from aiohttp.web import Response from aiohttp.web import Response
@ -30,14 +34,21 @@ from aiohttp.web import WebSocketResponse
from ....plugins.hid import BaseHid from ....plugins.hid import BaseHid
from ....validators import raise_error
from ....validators.basic import valid_bool from ....validators.basic import valid_bool
from ....validators.basic import valid_number from ....validators.basic import valid_number
from ....validators.os import valid_printable_filename
from ....validators.kvm import valid_hid_key from ....validators.kvm import valid_hid_key
from ....validators.kvm import valid_hid_mouse_move from ....validators.kvm import valid_hid_mouse_move
from ....validators.kvm import valid_hid_mouse_button from ....validators.kvm import valid_hid_mouse_button
from ....validators.kvm import valid_hid_mouse_wheel from ....validators.kvm import valid_hid_mouse_wheel
from ....keyboard.keysym import SymmapWebKey
from ....keyboard.keysym import build_symmap
from ....keyboard.printer import text_to_web_keys from ....keyboard.printer import text_to_web_keys
from ..http import exposed_http from ..http import exposed_http
@ -47,9 +58,14 @@ from ..http import make_json_response
# ===== # =====
class HidApi: class HidApi:
def __init__(self, hid: BaseHid) -> None: def __init__(self, hid: BaseHid, keymap_path: str) -> None:
self.__hid = hid self.__hid = hid
self.__keymaps_dir_path = os.path.dirname(keymap_path)
self.__default_keymap_name = os.path.basename(keymap_path)
self.__ensure_symmap(self.__default_keymap_name)
self.__key_lock = asyncio.Lock() self.__key_lock = asyncio.Lock()
# ===== # =====
@ -63,17 +79,50 @@ class HidApi:
await self.__hid.reset() await self.__hid.reset()
return make_json_response() return make_json_response()
# =====
@exposed_http("GET", "/hid/keymaps")
async def __keymaps_handler(self, _: Request) -> Response:
keymaps: Set[str] = set()
for keymap_name in os.listdir(self.__keymaps_dir_path):
path = os.path.join(self.__keymaps_dir_path, keymap_name)
if os.access(path, os.R_OK) and stat.S_ISREG(os.stat(path).st_mode):
keymaps.add(keymap_name)
return make_json_response({
"keymaps": {
"default": self.__default_keymap_name,
"available": sorted(keymaps),
},
})
@exposed_http("POST", "/hid/print") @exposed_http("POST", "/hid/print")
async def __print_handler(self, request: Request) -> Response: async def __print_handler(self, request: Request) -> Response:
text = await request.text() text = await request.text()
limit = int(valid_number(request.query.get("limit", "1024"), min=0, type=int)) limit = int(valid_number(request.query.get("limit", "1024"), min=0, type=int))
if limit > 0: if limit > 0:
text = text[:limit] text = text[:limit]
symmap = self.__ensure_symmap(request.query.get("keymap", self.__default_keymap_name))
async with self.__key_lock: async with self.__key_lock:
for (key, state) in text_to_web_keys(text): for (key, state) in text_to_web_keys(text, symmap):
self.__hid.send_key_event(key, state) self.__hid.send_key_event(key, state)
return make_json_response() return make_json_response()
def __ensure_symmap(self, keymap_name: str) -> Dict[int, SymmapWebKey]:
keymap_name = valid_printable_filename(keymap_name, "keymap")
path = os.path.join(self.__keymaps_dir_path, keymap_name)
try:
st = os.stat(path)
if not (os.access(path, os.R_OK) and stat.S_ISREG(st.st_mode)):
raise_error(keymap_name, "keymap")
except Exception:
raise_error(keymap_name, "keymap")
return self.__inner_ensure_symmap(path, st.st_mtime)
@functools.lru_cache(maxsize=10)
def __inner_ensure_symmap(self, path: str, mtime: int) -> Dict[int, SymmapWebKey]:
_ = mtime # For LRU
return build_symmap(path)
# ===== # =====
@exposed_ws("key") @exposed_ws("key")

View File

@ -119,6 +119,8 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
heartbeat: float, heartbeat: float,
sync_chunk_size: int, sync_chunk_size: int,
keymap_path: str,
) -> None: ) -> None:
self.__auth_manager = auth_manager self.__auth_manager = auth_manager
@ -136,7 +138,7 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
self, self,
LogApi(log_reader), LogApi(log_reader),
WolApi(wol), WolApi(wol),
HidApi(hid), HidApi(hid, keymap_path),
AtxApi(atx), AtxApi(atx),
MsdApi(msd, sync_chunk_size), MsdApi(msd, sync_chunk_size),
] ]

View File

@ -23,8 +23,6 @@
from typing import List from typing import List
from typing import Optional from typing import Optional
from ...keyboard.keysym import build_symmap
from ...clients.kvmd import KvmdClient from ...clients.kvmd import KvmdClient
from ...clients.streamer import StreamerClient from ...clients.streamer import StreamerClient
@ -56,7 +54,7 @@ def main(argv: Optional[List[str]]=None) -> None:
tls_timeout=config.server.tls.timeout, tls_timeout=config.server.tls.timeout,
desired_fps=config.desired_fps, desired_fps=config.desired_fps,
symmap=build_symmap(config.keymap), keymap_path=config.keymap,
kvmd=KvmdClient( kvmd=KvmdClient(
user_agent=user_agent, user_agent=user_agent,

View File

@ -20,6 +20,7 @@
# ========================================================================== # # ========================================================================== #
import os
import asyncio import asyncio
import asyncio.queues import asyncio.queues
import socket import socket
@ -34,6 +35,9 @@ import aiohttp
from ...logging import get_logger from ...logging import get_logger
from ...keyboard.keysym import SymmapWebKey
from ...keyboard.keysym import build_symmap
from ...clients.kvmd import KvmdClient from ...clients.kvmd import KvmdClient
from ...clients.streamer import StreamerError from ...clients.streamer import StreamerError
@ -69,7 +73,8 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
tls_timeout: float, tls_timeout: float,
desired_fps: int, desired_fps: int,
symmap: Dict[int, str], keymap_name: str,
symmap: Dict[int, SymmapWebKey],
kvmd: KvmdClient, kvmd: KvmdClient,
streamer: StreamerClient, streamer: StreamerClient,
@ -92,6 +97,7 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
) )
self.__desired_fps = desired_fps self.__desired_fps = desired_fps
self.__keymap_name = keymap_name
self.__symmap = symmap self.__symmap = symmap
self.__kvmd = kvmd self.__kvmd = kvmd
@ -249,10 +255,10 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
# ===== # =====
async def _on_key_event(self, code: int, state: bool) -> None: async def _on_key_event(self, code: int, state: bool) -> None:
if (web_name := self.__symmap.get(code)) is not None: if (web_key := self.__symmap.get(code)) is not None:
await self.__ws_writer_queue.put({ await self.__ws_writer_queue.put({
"event_type": "key", "event_type": "key",
"event": {"key": web_name, "state": state}, "event": {"key": web_key.name, "state": state},
}) })
async def _on_pointer_event(self, buttons: Dict[str, bool], wheel: Dict[str, int], move: Dict[str, int]) -> None: async def _on_pointer_event(self, buttons: Dict[str, bool], wheel: Dict[str, int], move: Dict[str, int]) -> None:
@ -283,7 +289,14 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
logger = get_logger(0) logger = get_logger(0)
logger.info("[main] Client %s: Printing %d characters ...", self._remote, len(text)) logger.info("[main] Client %s: Printing %d characters ...", self._remote, len(text))
try: try:
await self.__kvmd.hid.print(user, passwd, text, 0) (default, available) = await self.__kvmd.hid.get_keymaps(user, passwd)
await self.__kvmd.hid.print(
user=user,
passwd=passwd,
text=text,
limit=0,
keymap_name=(self.__keymap_name if self.__keymap_name in available else default),
)
except Exception: except Exception:
logger.exception("[main] Client %s: Can't print characters", self._remote) logger.exception("[main] Client %s: Can't print characters", self._remote)
@ -311,7 +324,7 @@ class VncServer: # pylint: disable=too-many-instance-attributes
tls_timeout: float, tls_timeout: float,
desired_fps: int, desired_fps: int,
symmap: Dict[int, str], keymap_path: str,
kvmd: KvmdClient, kvmd: KvmdClient,
streamer: StreamerClient, streamer: StreamerClient,
@ -322,6 +335,9 @@ class VncServer: # pylint: disable=too-many-instance-attributes
self.__port = port self.__port = port
self.__max_clients = max_clients self.__max_clients = max_clients
keymap_name = os.path.basename(keymap_path)
symmap = build_symmap(keymap_path)
self.__vnc_auth_manager = vnc_auth_manager self.__vnc_auth_manager = vnc_auth_manager
shared_params = _SharedParams() shared_params = _SharedParams()
@ -343,6 +359,7 @@ class VncServer: # pylint: disable=too-many-instance-attributes
tls_ciphers=tls_ciphers, tls_ciphers=tls_ciphers,
tls_timeout=tls_timeout, tls_timeout=tls_timeout,
desired_fps=desired_fps, desired_fps=desired_fps,
keymap_name=keymap_name,
symmap=symmap, symmap=symmap,
kvmd=kvmd, kvmd=kvmd,
streamer=streamer, streamer=streamer,

View File

@ -22,7 +22,9 @@
import contextlib import contextlib
from typing import Tuple
from typing import Dict from typing import Dict
from typing import Set
from typing import AsyncGenerator from typing import AsyncGenerator
import aiohttp import aiohttp
@ -90,11 +92,18 @@ class _StreamerClientPart(_BaseClientPart):
class _HidClientPart(_BaseClientPart): class _HidClientPart(_BaseClientPart):
async def print(self, user: str, passwd: str, text: str, limit: int) -> None: async def get_keymaps(self, user: str, passwd: str) -> Tuple[str, Set[str]]:
async with self._make_session(user, passwd) as session:
async with session.get(self._make_url("hid/keymaps")) as response:
aiotools.raise_not_200(response)
result = (await response.json())["result"]
return (result["keymaps"]["default"], set(result["keymaps"]["available"]))
async def print(self, user: str, passwd: str, text: str, limit: int, keymap_name: str) -> None:
async with self._make_session(user, passwd) as session: async with self._make_session(user, passwd) as session:
async with session.post( async with session.post(
url=self._make_url("hid/print"), url=self._make_url("hid/print"),
params={"limit": limit}, params={"limit": limit, "keymap": keymap_name},
data=text, data=text,
) as response: ) as response:
aiotools.raise_not_200(response) aiotools.raise_not_200(response)

View File

@ -20,6 +20,7 @@
# ========================================================================== # # ========================================================================== #
import dataclasses
import pkgutil import pkgutil
import functools import functools
@ -29,22 +30,34 @@ import Xlib.keysymdef
from ..logging import get_logger from ..logging import get_logger
from .mappings import At1Key
from .mappings import X11_TO_AT1 from .mappings import X11_TO_AT1
from .mappings import AT1_TO_WEB from .mappings import AT1_TO_WEB
# ===== # =====
def build_symmap(path: str) -> Dict[int, str]: @dataclasses.dataclass(frozen=True)
class SymmapWebKey:
name: str
shift: bool
def build_symmap(path: str) -> Dict[int, SymmapWebKey]:
# https://github.com/qemu/qemu/blob/95a9457fd44ad97c518858a4e1586a5498f9773c/ui/keymaps.c # https://github.com/qemu/qemu/blob/95a9457fd44ad97c518858a4e1586a5498f9773c/ui/keymaps.c
symmap: Dict[int, str] = {} symmap: Dict[int, SymmapWebKey] = {}
for (x11_code, at1_key) in X11_TO_AT1.items(): for (x11_code, at1_key) in X11_TO_AT1.items():
symmap[x11_code] = AT1_TO_WEB[at1_key.code] symmap[x11_code] = SymmapWebKey(
name=AT1_TO_WEB[at1_key.code],
shift=False,
)
for (x11_code, at1_code) in _read_keyboard_layout(path).items(): for (x11_code, at1_key) in _read_keyboard_layout(path).items():
if (web_name := AT1_TO_WEB.get(at1_code)) is not None: if (web_name := AT1_TO_WEB.get(at1_key.code)) is not None:
# mypy bug symmap[x11_code] = SymmapWebKey(
symmap[x11_code] = web_name # type: ignore name=web_name,
shift=at1_key.shift,
)
return symmap return symmap
@ -76,14 +89,14 @@ def _resolve_keysym(name: str) -> int:
return 0 return 0
def _read_keyboard_layout(path: str) -> Dict[int, int]: # Keysym to evdev (at1) def _read_keyboard_layout(path: str) -> Dict[int, At1Key]: # Keysym to evdev (at1)
logger = get_logger(0) logger = get_logger(0)
logger.info("Reading keyboard layout %s ...", path) logger.info("Reading keyboard layout %s ...", path)
with open(path) as layout_file: with open(path) as layout_file:
lines = list(map(str.strip, layout_file.read().split("\n"))) lines = list(map(str.strip, layout_file.read().split("\n")))
layout: Dict[int, int] = {} layout: Dict[int, At1Key] = {}
for (number, line) in enumerate(lines): for (number, line) in enumerate(lines):
if len(line) == 0 or line.startswith(("#", "map ", "include ")): if len(line) == 0 or line.startswith(("#", "map ", "include ")):
continue continue
@ -92,7 +105,10 @@ def _read_keyboard_layout(path: str) -> Dict[int, int]: # Keysym to evdev (at1)
if len(parts) >= 2: if len(parts) >= 2:
if (code := _resolve_keysym(parts[0])) != 0: if (code := _resolve_keysym(parts[0])) != 0:
try: try:
layout[code] = int(parts[1], 16) layout[code] = At1Key(
code=int(parts[1], 16),
shift=bool(len(parts) == 3 and parts[2] == "shift"),
)
except ValueError as err: except ValueError as err:
logger.error("Can't parse layout line #%d: %s", number, str(err)) logger.error("Can't parse layout line #%d: %s", number, str(err))
return layout return layout

View File

@ -20,84 +20,43 @@
# ========================================================================== # # ========================================================================== #
import string
from typing import Tuple from typing import Tuple
from typing import Dict
from typing import Generator from typing import Generator
from .mappings import KEYMAP from .keysym import SymmapWebKey
# ===== # =====
_LOWER_CHARS = { def text_to_web_keys(
"\n": "Enter", text: str,
"\t": "Tab", symmap: Dict[int, SymmapWebKey],
" ": "Space", shift_key: str="ShiftLeft",
"`": "Backquote", ) -> Generator[Tuple[str, bool], None, None]:
"\\": "Backslash",
"[": "BracketLeft",
"]": "BracketLeft",
",": "Comma",
".": "Period",
"-": "Minus",
"'": "Quote",
";": "Semicolon",
"/": "Slash",
"=": "Equal",
**{str(number): f"Digit{number}" for number in range(0, 10)},
**{ch: f"Key{ch.upper()}" for ch in string.ascii_lowercase},
}
assert not set(_LOWER_CHARS.values()).difference(KEYMAP)
_UPPER_CHARS = {
"~": "Backquote",
"|": "Backslash",
"{": "BracketLeft",
"}": "BracketRight",
"<": "Comma",
">": "Period",
"!": "Digit1",
"@": "Digit2",
"#": "Digit3",
"$": "Digit4",
"%": "Digit5",
"^": "Digit6",
"&": "Digit7",
"*": "Digit8",
"(": "Digit9",
")": "Digit0",
"_": "Minus",
"\"": "Quote",
":": "Semicolon",
"?": "Slash",
"+": "Equal",
**{ch: f"Key{ch}" for ch in string.ascii_uppercase},
}
assert not set(_UPPER_CHARS.values()).difference(KEYMAP)
# =====
def text_to_web_keys(text: str, shift_key: str="ShiftLeft") -> Generator[Tuple[str, bool], None, None]:
assert shift_key in ["ShiftLeft", "ShiftRight"] assert shift_key in ["ShiftLeft", "ShiftRight"]
shifted = False shifted = False
for ch in text: for ch in text:
upper = False try:
key = _LOWER_CHARS.get(ch) code = ord(ch)
if key is None: if not (0x20 <= code <= 0x7E):
if (key := _UPPER_CHARS.get(ch)) is None: # https://stackoverflow.com/questions/12343987/convert-ascii-character-to-x11-keycode
# https://www.ascii-code.com
continue
key = symmap[code]
except Exception:
continue continue
upper = True
if upper and not shifted: if key.shift and not shifted:
yield (shift_key, True) yield (shift_key, True)
shifted = True shifted = True
elif not upper and shifted: elif not key.shift and shifted:
yield (shift_key, False) yield (shift_key, False)
shifted = False shifted = False
yield (key, True) yield (key.name, True)
yield (key, False) yield (key.name, False)
if shifted: if shifted:
yield (shift_key, False) yield (shift_key, False)

View File

@ -114,6 +114,7 @@ class Plugin(BaseHid):
# ===== # =====
def send_key_event(self, key: str, state: bool) -> None: def send_key_event(self, key: str, state: bool) -> None:
print(key, int(state))
self.__keyboard_proc.send_key_event(key, state) self.__keyboard_proc.send_key_event(key, state)
def send_mouse_button_event(self, button: str, state: bool) -> None: def send_mouse_button_event(self, button: str, state: bool) -> None: