refactoring

This commit is contained in:
Devaev Maxim 2019-04-08 04:58:32 +03:00
parent 7eca51f17b
commit 9243d2a00c
8 changed files with 246 additions and 65 deletions

View File

@ -13,7 +13,7 @@ all:
tox: _testenv
docker run --rm \
time docker run --rm \
--volume `pwd`:/src:ro \
--volume `pwd`/testenv:/src/testenv:rw \
--volume `pwd`/extras:/usr/share/kvmd/extras:ro \

View File

@ -23,87 +23,91 @@
import os
import signal
import asyncio
import dataclasses
import multiprocessing
import multiprocessing.queues
import queue
import struct
import pkgutil
import errno
import time
from typing import Dict
from typing import Set
from typing import NamedTuple
from typing import AsyncGenerator
from typing import Any
import yaml
import serial
import setproctitle
from ...logging import get_logger
from ... import gpio
from ... import keymap
# =====
def _get_keymap() -> Dict[str, int]:
return yaml.safe_load(pkgutil.get_data("kvmd", "data/keymap.yaml").decode()) # type: ignore
class _BaseEvent:
def make_command(self) -> bytes:
raise NotImplementedError
_KEYMAP = _get_keymap()
class _KeyEvent(NamedTuple):
key: str
@dataclasses.dataclass # pylint: disable=abstract-method
class _BoolEvent(_BaseEvent):
name: str
state: bool
@staticmethod
def is_valid(key: str) -> bool:
return (key in _KEYMAP)
@dataclasses.dataclass # pylint: disable=abstract-method
class _IntEvent(_BaseEvent):
x: int
y: int
@dataclasses.dataclass
class _KeyEvent(_BoolEvent):
def __post_init__(self) -> None:
assert self.name in keymap.KEYMAP
def make_command(self) -> bytes:
code = _KEYMAP[self.key]
code = keymap.KEYMAP[self.name]
key_bytes = bytes([code])
assert len(key_bytes) == 1, (self, key_bytes, code)
state_bytes = (b"\x01" if self.state else b"\x00")
return b"\x11" + key_bytes + state_bytes + b"\x00\x00"
class _MouseMoveEvent(NamedTuple):
to_x: int
to_y: int
@dataclasses.dataclass
class _MouseMoveEvent(_IntEvent):
def __post_init__(self) -> None:
assert -32768 <= self.x <= 32767
assert -32768 <= self.y <= 32767
def make_command(self) -> bytes:
to_x = min(max(-32768, self.to_x), 32767)
to_y = min(max(-32768, self.to_y), 32767)
return b"\x12" + struct.pack(">hh", to_x, to_y)
return b"\x12" + struct.pack(">hh", self.x, self.y)
class _MouseButtonEvent(NamedTuple):
button: str
state: bool
@staticmethod
def is_valid(button: str) -> bool:
return (button in ["left", "right"])
@dataclasses.dataclass
class _MouseButtonEvent(_BoolEvent):
def __post_init__(self) -> None:
assert self.name in ["left", "right"]
def make_command(self) -> bytes:
code = 0
if self.button == "left":
if self.name == "left":
code = (0b10000000 | (0b00001000 if self.state else 0))
elif self.button == "right":
elif self.name == "right":
code = (0b01000000 | (0b00000100 if self.state else 0))
assert code, self
return b"\x13" + bytes([code]) + b"\x00\x00\x00"
class _MouseWheelEvent(NamedTuple):
delta_y: int
@dataclasses.dataclass
class _MouseWheelEvent(_IntEvent):
def __post_init__(self) -> None:
assert self.x == 0 # Горизонтальная прокрутка пока не поддерживается
assert -128 <= self.y <= 127
def make_command(self) -> bytes:
delta_y = min(max(-128, self.delta_y), 127)
return b"\x14\x00" + struct.pack(">b", delta_y) + b"\x00\x00"
return b"\x14\x00" + struct.pack(">b", self.y) + b"\x00\x00"
# =====
@ -139,13 +143,13 @@ class Hid(multiprocessing.Process): # pylint: disable=too-many-instance-attribu
self.__state_poll = state_poll
self.__lock = asyncio.Lock()
self.__pressed_keys: Set[str] = set()
self.__pressed_mouse_buttons: Set[str] = set()
self.__lock = asyncio.Lock()
self.__events_queue: multiprocessing.queues.Queue = multiprocessing.Queue()
self.__online_shared = multiprocessing.Value("i", 1)
self.__stop_event = multiprocessing.Event()
def start(self) -> None:
@ -167,16 +171,16 @@ class Hid(multiprocessing.Process): # pylint: disable=too-many-instance-attribu
gpio.write(self.__reset_pin, False)
async def send_key_event(self, key: str, state: bool) -> None:
await self.__send_bool_event(_KeyEvent, self.__pressed_keys, key, state)
await self.__send_bool_event(_KeyEvent(key, state), self.__pressed_keys)
async def send_mouse_move_event(self, to_x: int, to_y: int) -> None:
await self.__send_int_event(_MouseMoveEvent, to_x, to_y)
await self.__send_int_event(_MouseMoveEvent(to_x, to_y))
async def send_mouse_button_event(self, button: str, state: bool) -> None:
await self.__send_bool_event(_MouseButtonEvent, self.__pressed_mouse_buttons, button, state)
await self.__send_bool_event(_MouseButtonEvent(button, state), self.__pressed_mouse_buttons)
async def send_mouse_wheel_event(self, delta_y: int) -> None:
await self.__send_int_event(_MouseWheelEvent, delta_y)
await self.__send_int_event(_MouseWheelEvent(0, delta_y))
async def clear_events(self) -> None:
if not self.__stop_event.is_set():
@ -196,23 +200,23 @@ class Hid(multiprocessing.Process): # pylint: disable=too-many-instance-attribu
self.join()
gpio.write(self.__reset_pin, False)
async def __send_bool_event(self, cls: Any, pressed: Set[str], name: str, state: bool) -> None:
async def __send_bool_event(self, event: _BoolEvent, pressed: Set[str]) -> None:
if not self.__stop_event.is_set():
async with self.__lock:
if cls.is_valid(name) and (
(state and (name not in pressed)) # Если еще не нажато
or (not state and (name in pressed)) # ... Или еще не отжато
if (
(event.state and (event.name not in pressed)) # Если еще не нажато
or (not event.state and (event.name in pressed)) # ... Или еще не отжато
):
if state:
pressed.add(name)
if event.state:
pressed.add(event.name)
else:
pressed.remove(name)
self.__events_queue.put(cls(name, state))
pressed.remove(event.name)
self.__events_queue.put(event)
async def __send_int_event(self, cls: Any, *args: int) -> None:
async def __send_int_event(self, event: _IntEvent) -> None:
if not self.__stop_event.is_set():
async with self.__lock:
self.__events_queue.put(cls(*args))
self.__events_queue.put(event)
def __unsafe_clear_events(self) -> None:
for (cls, pressed) in [
@ -244,7 +248,7 @@ class Hid(multiprocessing.Process): # pylint: disable=too-many-instance-attribu
passed = 0
while not (self.__stop_event.is_set() and self.__events_queue.qsize() == 0):
try:
event = self.__events_queue.get(timeout=0.05)
event: _BaseEvent = self.__events_queue.get(timeout=0.05)
except queue.Empty:
if passed >= 20: # 20 * 0.05 = 1 sec
self.__process_command(tty, b"\x01\x00\x00\x00\x00") # Ping

View File

@ -47,14 +47,20 @@ from ...aioregion import RegionIsBusyError
from ...validators import ValidatorError
from ...validators.basic import valid_bool
from ...validators.auth import valid_user
from ...validators.auth import valid_passwd
from ...validators.auth import valid_auth_token
from ...validators.kvm import valid_atx_button
from ...validators.kvm import valid_kvm_target
from ...validators.kvm import valid_log_seek
from ...validators.kvm import valid_stream_quality
from ...validators.kvm import valid_stream_fps
from ...validators.kvm import valid_hid_key
from ...validators.kvm import valid_hid_mouse_move
from ...validators.kvm import valid_hid_mouse_button
from ...validators.kvm import valid_hid_mouse_wheel
from ... import __version__
@ -384,28 +390,32 @@ class Server: # pylint: disable=too-many-instance-attributes
return ws
async def __handle_ws_key_event(self, event: Dict) -> None:
key = str(event.get("key", ""))[:64].strip()
state = event.get("state")
if key and state in [True, False]:
await self.__hid.send_key_event(key, state)
try:
key = valid_hid_key(event["key"])
state = valid_bool(event["state"])
except Exception:
return
await self.__hid.send_key_event(key, state)
async def __handle_ws_mouse_move_event(self, event: Dict) -> None:
try:
to_x = int(event["to"]["x"])
to_y = int(event["to"]["y"])
to_x = valid_hid_mouse_move(event["to"]["x"])
to_y = valid_hid_mouse_move(event["to"]["y"])
except Exception:
return
await self.__hid.send_mouse_move_event(to_x, to_y)
async def __handle_ws_mouse_button_event(self, event: Dict) -> None:
button = str(event.get("button", ""))[:64].strip()
state = event.get("state")
if button and state in [True, False]:
await self.__hid.send_mouse_button_event(button, state)
try:
button = valid_hid_mouse_button(event["button"])
state = valid_bool(event["state"])
except Exception:
return
await self.__hid.send_mouse_button_event(button, state)
async def __handle_ws_mouse_wheel_event(self, event: Dict) -> None:
try:
delta_y = int(event["delta"]["y"])
delta_y = valid_hid_mouse_wheel(event["delta"]["y"])
except Exception:
return
await self.__hid.send_mouse_wheel_event(delta_y)

36
kvmd/keymap.py Normal file
View File

@ -0,0 +1,36 @@
# ========================================================================== #
# #
# KVMD - The main Pi-KVM daemon. #
# #
# Copyright (C) 2018 Maxim Devaev <mdevaev@gmail.com> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
# ========================================================================== #
import pkgutil
from typing import Dict
import yaml
# =====
def _get_keymap() -> Dict[str, int]:
return yaml.safe_load(pkgutil.get_data("kvmd", "data/keymap.yaml").decode()) # type: ignore
# =====
KEYMAP = _get_keymap()

View File

@ -23,8 +23,11 @@
import re
from typing import List
from typing import Mapping
from typing import Sequence
from typing import Callable
from typing import NoReturn
from typing import Union
from typing import Any
@ -54,13 +57,13 @@ def check_not_none_string(arg: Any, name: str, strip: bool=True) -> str:
return arg
def check_in_list(arg: Any, name: str, variants: List) -> Any:
def check_in_list(arg: Any, name: str, variants: Union[Sequence, Mapping]) -> Any:
if arg not in variants:
raise_error(arg, name)
return arg
def check_string_in_list(arg: Any, name: str, variants: List[str], lower: bool=True) -> Any:
def check_string_in_list(arg: Any, name: str, variants: Union[Sequence[str], Mapping[str, Any]], lower: bool=True) -> Any:
arg = check_not_none_string(arg, name)
if lower:
arg = arg.lower()

View File

@ -22,6 +22,8 @@
from typing import Any
from .. import keymap
from . import check_string_in_list
from .basic import valid_number
@ -46,3 +48,22 @@ def valid_stream_quality(arg: Any) -> int:
def valid_stream_fps(arg: Any) -> int:
return int(valid_number(arg, min=0, max=30, name="stream FPS"))
# =====
def valid_hid_key(arg: Any) -> str:
return check_string_in_list(arg, "HID key", keymap.KEYMAP, lower=False)
def valid_hid_mouse_move(arg: Any) -> int:
arg = valid_number(arg, name="HID mouse move")
return min(max(-32768, arg), 32767)
def valid_hid_mouse_button(arg: Any) -> str:
return check_string_in_list(arg, "HID mouse button", ["left", "right"])
def valid_hid_mouse_wheel(arg: Any) -> int:
arg = valid_number(arg, name="HID mouse wheel")
return min(max(-128, arg), 127)

36
tests/test_keymap.py Normal file
View File

@ -0,0 +1,36 @@
# ========================================================================== #
# #
# KVMD - The main Pi-KVM daemon. #
# #
# Copyright (C) 2018 Maxim Devaev <mdevaev@gmail.com> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
# ========================================================================== #
import pytest
from kvmd.keymap import KEYMAP
# =====
def test_keymap__ok() -> None:
assert type(KEYMAP["KeyA"]) == int # pylint: disable=unidiomatic-typecheck
assert KEYMAP["KeyA"] == 1
def test_keymap__fail() -> None:
with pytest.raises(KeyError):
print(KEYMAP["keya"])

View File

@ -24,12 +24,18 @@ from typing import Any
import pytest
from kvmd.keymap import KEYMAP
from kvmd.validators import ValidatorError
from kvmd.validators.kvm import valid_atx_button
from kvmd.validators.kvm import valid_kvm_target
from kvmd.validators.kvm import valid_log_seek
from kvmd.validators.kvm import valid_stream_quality
from kvmd.validators.kvm import valid_stream_fps
from kvmd.validators.kvm import valid_hid_key
from kvmd.validators.kvm import valid_hid_mouse_move
from kvmd.validators.kvm import valid_hid_mouse_button
from kvmd.validators.kvm import valid_hid_mouse_wheel
# =====
@ -96,3 +102,68 @@ def test_ok__valid_stream_fps(arg: Any) -> None:
def test_fail__valid_stream_fps(arg: Any) -> None:
with pytest.raises(ValidatorError):
print(valid_stream_fps(arg))
# =====
def test_ok__valid_hid_key() -> None:
for key in KEYMAP:
print(valid_hid_key(key))
print(valid_hid_key(key + " "))
@pytest.mark.parametrize("arg", ["test", "", None, "keya"])
def test_fail__valid_hid_key(arg: Any) -> None:
with pytest.raises(ValidatorError):
print(valid_hid_key(arg))
# =====
@pytest.mark.parametrize("arg", [-20000, "1 ", "-1", 1, -1, 0, "20000 "])
def test_ok__valid_hid_mouse_move(arg: Any) -> None:
assert valid_hid_mouse_move(arg) == int(str(arg).strip())
def test_ok__valid_hid_mouse_move__m50000() -> None:
assert valid_hid_mouse_move(-50000) == -32768
def test_ok__valid_hid_mouse_move__p50000() -> None:
assert valid_hid_mouse_move(50000) == 32767
@pytest.mark.parametrize("arg", ["test", "", None, 1.1])
def test_fail__valid_hid_mouse_move(arg: Any) -> None:
with pytest.raises(ValidatorError):
print(valid_hid_mouse_move(arg))
# =====
@pytest.mark.parametrize("arg", ["LEFT ", "RIGHT "])
def test_ok__valid_hid_mouse_button(arg: Any) -> None:
assert valid_hid_mouse_button(arg) == arg.strip().lower()
@pytest.mark.parametrize("arg", ["test", "", None])
def test_fail__valid_hid_mouse_button(arg: Any) -> None:
with pytest.raises(ValidatorError):
print(valid_hid_mouse_button(arg))
# =====
@pytest.mark.parametrize("arg", [-100, "1 ", "-1", 1, -1, 0, "100 "])
def test_ok__valid_hid_mouse_wheel(arg: Any) -> None:
assert valid_hid_mouse_wheel(arg) == int(str(arg).strip())
def test_ok__valid_hid_mouse_wheel__m200() -> None:
assert valid_hid_mouse_wheel(-200) == -128
def test_ok__valid_hid_mouse_wheel__p200() -> None:
assert valid_hid_mouse_wheel(200) == 127
@pytest.mark.parametrize("arg", ["test", "", None, 1.1])
def test_fail__valid_hid_mouse_wheel(arg: Any) -> None:
with pytest.raises(ValidatorError):
print(valid_hid_mouse_wheel(arg))