refactoring

This commit is contained in:
Maxim Devaev
2024-11-02 14:26:39 +02:00
parent 8192b1fa95
commit d6b61cb407
10 changed files with 258 additions and 234 deletions

View File

@@ -21,9 +21,11 @@
import asyncio
import functools
import time
from typing import Iterable
from typing import Callable
from typing import AsyncGenerator
from typing import Any
@@ -31,14 +33,37 @@ from ...yamlconf import Option
from ...validators.basic import valid_bool
from ...validators.basic import valid_int_f1
from ...validators.basic import valid_string_list
from ...validators.hid import valid_hid_key
from ...validators.hid import valid_hid_mouse_move
from ...mouse import MouseRange
from .. import BasePlugin
from .. import get_plugin_class
# =====
class BaseHid(BasePlugin):
def __init__(self, jiggler_enabled: bool, jiggler_active: bool, jiggler_interval: int) -> None:
class BaseHid(BasePlugin): # pylint: disable=too-many-instance-attributes
def __init__(
self,
ignore_keys: list[str],
mouse_x_min: int,
mouse_x_max: int,
mouse_y_min: int,
mouse_y_max: int,
jiggler_enabled: bool,
jiggler_active: bool,
jiggler_interval: int,
) -> None:
self.__ignore_keys = ignore_keys
self.__mouse_x_range = (mouse_x_min, mouse_x_max)
self.__mouse_y_range = (mouse_y_min, mouse_y_max)
self.__jiggler_enabled = jiggler_enabled
self.__jiggler_active = jiggler_active
self.__jiggler_interval = jiggler_interval
@@ -46,8 +71,17 @@ class BaseHid(BasePlugin):
self.__activity_ts = 0
@classmethod
def _get_jiggler_options(cls) -> dict[str, Any]:
def _get_base_options(cls) -> dict[str, Any]:
return {
"ignore_keys": Option([], type=functools.partial(valid_string_list, subval=valid_hid_key)),
"mouse_x_range": {
"min": Option(MouseRange.MIN, type=valid_hid_mouse_move, unpack_as="mouse_x_min"),
"max": Option(MouseRange.MAX, type=valid_hid_mouse_move, unpack_as="mouse_x_max"),
},
"mouse_y_range": {
"min": Option(MouseRange.MIN, type=valid_hid_mouse_move, unpack_as="mouse_y_min"),
"max": Option(MouseRange.MAX, type=valid_hid_mouse_move, unpack_as="mouse_y_max"),
},
"jiggler": {
"enabled": Option(False, type=valid_bool, unpack_as="jiggler_enabled"),
"active": Option(False, type=valid_bool, unpack_as="jiggler_active"),
@@ -76,25 +110,6 @@ class BaseHid(BasePlugin):
async def cleanup(self) -> None:
pass
# =====
def send_key_events(self, keys: Iterable[tuple[str, bool]]) -> None:
raise NotImplementedError
def send_mouse_button_event(self, button: str, state: bool) -> None:
raise NotImplementedError
def send_mouse_move_event(self, to_x: int, to_y: int) -> None:
_ = to_x
_ = to_y
def send_mouse_relative_event(self, delta_x: int, delta_y: int) -> None:
_ = delta_x
_ = delta_y
def send_mouse_wheel_event(self, delta_x: int, delta_y: int) -> None:
raise NotImplementedError
def set_params(
self,
keyboard_output: (str | None)=None,
@@ -107,25 +122,100 @@ class BaseHid(BasePlugin):
def set_connected(self, connected: bool) -> None:
_ = connected
def clear_events(self) -> None:
# =====
def send_key_events(self, keys: Iterable[tuple[str, bool]], no_ignore_keys: bool=False) -> None:
for (key, state) in keys:
if no_ignore_keys or key not in self.__ignore_keys:
self.send_key_event(key, state)
def send_key_event(self, key: str, state: bool) -> None:
self._send_key_event(key, state)
self.__bump_activity()
def _send_key_event(self, key: str, state: bool) -> None:
raise NotImplementedError
# =====
async def systask(self) -> None:
factor = 1
while True:
if self.__jiggler_active and (self.__activity_ts + self.__jiggler_interval < int(time.monotonic())):
for _ in range(5):
if self.__jiggler_absolute:
self.send_mouse_move_event(100 * factor, 100 * factor)
else:
self.send_mouse_relative_event(10 * factor, 10 * factor)
factor *= -1
await asyncio.sleep(0.1)
await asyncio.sleep(1)
def send_mouse_button_event(self, button: str, state: bool) -> None:
self._send_mouse_button_event(button, state)
self.__bump_activity()
def _bump_activity(self) -> None:
def _send_mouse_button_event(self, button: str, state: bool) -> None:
raise NotImplementedError
# =====
def send_mouse_move_event(self, to_x: int, to_y: int) -> None:
if self.__mouse_x_range != MouseRange.RANGE:
to_x = MouseRange.remap(to_x, *self.__mouse_x_range)
if self.__mouse_y_range != MouseRange.RANGE:
to_y = MouseRange.remap(to_y, *self.__mouse_y_range)
self._send_mouse_move_event(to_x, to_y)
self.__bump_activity()
def _send_mouse_move_event(self, to_x: int, to_y: int) -> None:
_ = to_x # XXX: NotImplementedError
_ = to_y
# =====
def send_mouse_relative_events(self, deltas: Iterable[tuple[int, int]], squash: bool) -> None:
self.__process_mouse_delta_event(deltas, squash, self.send_mouse_relative_event)
def send_mouse_relative_event(self, delta_x: int, delta_y: int) -> None:
self._send_mouse_relative_event(delta_x, delta_y)
self.__bump_activity()
def _send_mouse_relative_event(self, delta_x: int, delta_y: int) -> None:
_ = delta_x # XXX: NotImplementedError
_ = delta_y
# =====
def send_mouse_wheel_events(self, deltas: Iterable[tuple[int, int]], squash: bool) -> None:
self.__process_mouse_delta_event(deltas, squash, self.send_mouse_wheel_event)
def send_mouse_wheel_event(self, delta_x: int, delta_y: int) -> None:
self._send_mouse_wheel_event(delta_x, delta_y)
self.__bump_activity()
def _send_mouse_wheel_event(self, delta_x: int, delta_y: int) -> None:
raise NotImplementedError
# =====
def clear_events(self) -> None:
self._clear_events() # Don't bump activity here
def _clear_events(self) -> None:
raise NotImplementedError
# =====
def __process_mouse_delta_event(
self,
deltas: Iterable[tuple[int, int]],
squash: bool,
handler: Callable[[int, int], None],
) -> None:
if squash:
prev = (0, 0)
for cur in deltas:
if abs(prev[0] + cur[0]) > 127 or abs(prev[1] + cur[1]) > 127:
handler(*prev)
prev = cur
else:
prev = (prev[0] + cur[0], prev[1] + cur[1])
if prev[0] or prev[1]:
handler(*prev)
else:
for xy in deltas:
handler(*xy)
def __bump_activity(self) -> None:
self.__activity_ts = int(time.monotonic())
def _set_jiggler_absolute(self, absolute: bool) -> None:
@@ -144,6 +234,21 @@ class BaseHid(BasePlugin):
},
}
# =====
async def systask(self) -> None:
factor = 1
while True:
if self.__jiggler_active and (self.__activity_ts + self.__jiggler_interval < int(time.monotonic())):
for _ in range(5):
if self.__jiggler_absolute:
self.send_mouse_move_event(100 * factor, 100 * factor)
else:
self.send_mouse_relative_event(10 * factor, 10 * factor)
factor *= -1
await asyncio.sleep(0.1)
await asyncio.sleep(1)
# =====
def get_hid_class(name: str) -> type[BaseHid]:

View File

@@ -26,7 +26,6 @@ import queue
import copy
import time
from typing import Iterable
from typing import Generator
from typing import AsyncGenerator
from typing import Any
@@ -109,17 +108,22 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
def __init__( # pylint: disable=too-many-arguments,super-init-not-called
self,
phy: BasePhy,
ignore_keys: list[str],
mouse_x_range: dict[str, Any],
mouse_y_range: dict[str, Any],
jiggler: dict[str, Any],
reset_self: bool,
read_retries: int,
common_retries: int,
retries_delay: float,
errors_threshold: int,
noop: bool,
jiggler: dict[str, Any],
**gpio_kwargs: Any,
) -> None:
BaseHid.__init__(self, **jiggler)
BaseHid.__init__(self, ignore_keys=ignore_keys, **mouse_x_range, **mouse_y_range, **jiggler)
multiprocessing.Process.__init__(self, daemon=True)
self.__read_retries = read_retries
@@ -164,7 +168,7 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
"errors_threshold": Option(5, type=valid_int_f0),
"noop": Option(False, type=valid_bool),
**cls._get_jiggler_options(),
**cls._get_base_options(),
}
def sysprep(self) -> None:
@@ -259,27 +263,6 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
# =====
def send_key_events(self, keys: Iterable[tuple[str, bool]]) -> None:
for (key, state) in keys:
self.__queue_event(KeyEvent(key, state))
self._bump_activity()
def send_mouse_button_event(self, button: str, state: bool) -> None:
self.__queue_event(MouseButtonEvent(button, state))
self._bump_activity()
def send_mouse_move_event(self, to_x: int, to_y: int) -> None:
self.__queue_event(MouseMoveEvent(to_x, to_y))
self._bump_activity()
def send_mouse_relative_event(self, delta_x: int, delta_y: int) -> None:
self.__queue_event(MouseRelativeEvent(delta_x, delta_y))
self._bump_activity()
def send_mouse_wheel_event(self, delta_x: int, delta_y: int) -> None:
self.__queue_event(MouseWheelEvent(delta_x, delta_y))
self._bump_activity()
def set_params(
self,
keyboard_output: (str | None)=None,
@@ -301,9 +284,23 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
def set_connected(self, connected: bool) -> None:
self.__queue_event(SetConnectedEvent(connected), clear=True)
def clear_events(self) -> None:
def _send_key_event(self, key: str, state: bool) -> None:
self.__queue_event(KeyEvent(key, state))
def _send_mouse_button_event(self, button: str, state: bool) -> None:
self.__queue_event(MouseButtonEvent(button, state))
def _send_mouse_move_event(self, to_x: int, to_y: int) -> None:
self.__queue_event(MouseMoveEvent(to_x, to_y))
def _send_mouse_relative_event(self, delta_x: int, delta_y: int) -> None:
self.__queue_event(MouseRelativeEvent(delta_x, delta_y))
def _send_mouse_wheel_event(self, delta_x: int, delta_y: int) -> None:
self.__queue_event(MouseWheelEvent(delta_x, delta_y))
def _clear_events(self) -> None:
self.__queue_event(ClearEvent(), clear=True)
self._bump_activity()
def __queue_event(self, event: BaseEvent, clear: bool=False) -> None:
if not self.__stop_event.is_set():

View File

@@ -24,7 +24,6 @@ import multiprocessing
import copy
import time
from typing import Iterable
from typing import AsyncGenerator
from typing import Any
@@ -64,6 +63,11 @@ class Plugin(BaseHid): # pylint: disable=too-many-instance-attributes
def __init__( # pylint: disable=too-many-arguments,too-many-locals
self,
ignore_keys: list[str],
mouse_x_range: dict[str, Any],
mouse_y_range: dict[str, Any],
jiggler: dict[str, Any],
manufacturer: str,
product: str,
description: str,
@@ -79,11 +83,9 @@ class Plugin(BaseHid): # pylint: disable=too-many-instance-attributes
max_clients: int,
socket_timeout: float,
select_timeout: float,
jiggler: dict[str, Any],
) -> None:
super().__init__(**jiggler)
super().__init__(ignore_keys=ignore_keys, **mouse_x_range, **mouse_y_range, **jiggler)
self._set_jiggler_absolute(False)
self.__proc: (multiprocessing.Process | None) = None
@@ -127,7 +129,7 @@ class Plugin(BaseHid): # pylint: disable=too-many-instance-attributes
"socket_timeout": Option(5.0, type=valid_float_f01),
"select_timeout": Option(1.0, type=valid_float_f01),
**cls._get_jiggler_options(),
**cls._get_base_options(),
}
def sysprep(self) -> None:
@@ -187,27 +189,6 @@ class Plugin(BaseHid): # pylint: disable=too-many-instance-attributes
# =====
def send_key_events(self, keys: Iterable[tuple[str, bool]]) -> None:
for (key, state) in keys:
self.__server.queue_event(make_keyboard_event(key, state))
self._bump_activity()
def send_mouse_button_event(self, button: str, state: bool) -> None:
self.__server.queue_event(MouseButtonEvent(button, state))
self._bump_activity()
def send_mouse_relative_event(self, delta_x: int, delta_y: int) -> None:
self.__server.queue_event(MouseRelativeEvent(delta_x, delta_y))
self._bump_activity()
def send_mouse_wheel_event(self, delta_x: int, delta_y: int) -> None:
self.__server.queue_event(MouseWheelEvent(delta_x, delta_y))
self._bump_activity()
def clear_events(self) -> None:
self.__server.clear_events()
self._bump_activity()
def set_params(
self,
keyboard_output: (str | None)=None,
@@ -221,6 +202,21 @@ class Plugin(BaseHid): # pylint: disable=too-many-instance-attributes
self._set_jiggler_active(jiggler)
self.__notifier.notify()
def _send_key_event(self, key: str, state: bool) -> None:
self.__server.queue_event(make_keyboard_event(key, state))
def _send_mouse_button_event(self, button: str, state: bool) -> None:
self.__server.queue_event(MouseButtonEvent(button, state))
def _send_mouse_relative_event(self, delta_x: int, delta_y: int) -> None:
self.__server.queue_event(MouseRelativeEvent(delta_x, delta_y))
def _send_mouse_wheel_event(self, delta_x: int, delta_y: int) -> None:
self.__server.queue_event(MouseWheelEvent(delta_x, delta_y))
def _clear_events(self) -> None:
self.__server.clear_events()
# =====
def __server_worker(self) -> None: # pylint: disable=too-many-branches

View File

@@ -25,7 +25,6 @@ import queue
import copy
import time
from typing import Iterable
from typing import AsyncGenerator
from typing import Any
@@ -55,13 +54,17 @@ from .keyboard import Keyboard
class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-instance-attributes
def __init__( # pylint: disable=too-many-arguments,super-init-not-called
self,
ignore_keys: list[str],
mouse_x_range: dict[str, Any],
mouse_y_range: dict[str, Any],
jiggler: dict[str, Any],
device_path: str,
speed: int,
read_timeout: float,
jiggler: dict[str, Any],
) -> None:
BaseHid.__init__(self, **jiggler)
BaseHid.__init__(self, ignore_keys=ignore_keys, **mouse_x_range, **mouse_y_range, **jiggler)
multiprocessing.Process.__init__(self, daemon=True)
self.__device_path = device_path
@@ -89,7 +92,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
"device": Option("/dev/kvmd-hid", type=valid_abs_path, unpack_as="device_path"),
"speed": Option(9600, type=valid_tty_speed),
"read_timeout": Option(0.3, type=valid_float_f01),
**cls._get_jiggler_options(),
**cls._get_base_options(),
}
def sysprep(self) -> None:
@@ -146,27 +149,6 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
# =====
def send_key_events(self, keys: Iterable[tuple[str, bool]]) -> None:
for (key, state) in keys:
self.__queue_cmd(self.__keyboard.process_key(key, state))
self._bump_activity()
def send_mouse_button_event(self, button: str, state: bool) -> None:
self.__queue_cmd(self.__mouse.process_button(button, state))
self._bump_activity()
def send_mouse_move_event(self, to_x: int, to_y: int) -> None:
self.__queue_cmd(self.__mouse.process_move(to_x, to_y))
self._bump_activity()
def send_mouse_wheel_event(self, delta_x: int, delta_y: int) -> None:
self.__queue_cmd(self.__mouse.process_wheel(delta_x, delta_y))
self._bump_activity()
def send_mouse_relative_event(self, delta_x: int, delta_y: int) -> None:
self.__queue_cmd(self.__mouse.process_relative(delta_x, delta_y))
self._bump_activity()
def set_params(
self,
keyboard_output: (str | None)=None,
@@ -185,10 +167,22 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
self._set_jiggler_active(jiggler)
self.__notifier.notify()
def set_connected(self, connected: bool) -> None:
pass
def _send_key_event(self, key: str, state: bool) -> None:
self.__queue_cmd(self.__keyboard.process_key(key, state))
def clear_events(self) -> None:
def _send_mouse_button_event(self, button: str, state: bool) -> None:
self.__queue_cmd(self.__mouse.process_button(button, state))
def _send_mouse_move_event(self, to_x: int, to_y: int) -> None:
self.__queue_cmd(self.__mouse.process_move(to_x, to_y))
def _send_mouse_wheel_event(self, delta_x: int, delta_y: int) -> None:
self.__queue_cmd(self.__mouse.process_wheel(delta_x, delta_y))
def _send_mouse_relative_event(self, delta_x: int, delta_y: int) -> None:
self.__queue_cmd(self.__mouse.process_relative(delta_x, delta_y))
def _clear_events(self) -> None:
tools.clear_queue(self.__cmd_queue)
def __queue_cmd(self, cmd: bytes, clear: bool=False) -> None:

View File

@@ -22,7 +22,6 @@
import copy
from typing import Iterable
from typing import AsyncGenerator
from typing import Any
@@ -48,15 +47,20 @@ from .mouse import MouseProcess
class Plugin(BaseHid): # pylint: disable=too-many-instance-attributes
def __init__(
self,
ignore_keys: list[str],
mouse_x_range: dict[str, Any],
mouse_y_range: dict[str, Any],
jiggler: dict[str, Any],
keyboard: dict[str, Any],
mouse: dict[str, Any],
mouse_alt: dict[str, Any],
jiggler: dict[str, Any],
noop: bool,
udc: str, # XXX: Not from options, see /kvmd/apps/kvmd/__init__.py for details
) -> None:
super().__init__(**jiggler)
super().__init__(ignore_keys=ignore_keys, **mouse_x_range, **mouse_y_range, **jiggler)
self.__udc = udc
@@ -115,7 +119,7 @@ class Plugin(BaseHid): # pylint: disable=too-many-instance-attributes
"horizontal_wheel": Option(True, type=valid_bool),
},
"noop": Option(False, type=valid_bool),
**cls._get_jiggler_options(),
**cls._get_base_options(),
}
def sysprep(self) -> None:
@@ -183,26 +187,6 @@ class Plugin(BaseHid): # pylint: disable=too-many-instance-attributes
# =====
def send_key_events(self, keys: Iterable[tuple[str, bool]]) -> None:
self.__keyboard_proc.send_key_events(keys)
self._bump_activity()
def send_mouse_button_event(self, button: str, state: bool) -> None:
self.__mouse_current.send_button_event(button, state)
self._bump_activity()
def send_mouse_move_event(self, to_x: int, to_y: int) -> None:
self.__mouse_current.send_move_event(to_x, to_y)
self._bump_activity()
def send_mouse_relative_event(self, delta_x: int, delta_y: int) -> None:
self.__mouse_current.send_relative_event(delta_x, delta_y)
self._bump_activity()
def send_mouse_wheel_event(self, delta_x: int, delta_y: int) -> None:
self.__mouse_current.send_wheel_event(delta_x, delta_y)
self._bump_activity()
def set_params(
self,
keyboard_output: (str | None)=None,
@@ -221,12 +205,26 @@ class Plugin(BaseHid): # pylint: disable=too-many-instance-attributes
self._set_jiggler_active(jiggler)
self.__notifier.notify()
def clear_events(self) -> None:
def _send_key_event(self, key: str, state: bool) -> None:
self.__keyboard_proc.send_key_event(key, state)
def _send_mouse_button_event(self, button: str, state: bool) -> None:
self.__mouse_current.send_button_event(button, state)
def _send_mouse_move_event(self, to_x: int, to_y: int) -> None:
self.__mouse_current.send_move_event(to_x, to_y)
def _send_mouse_relative_event(self, delta_x: int, delta_y: int) -> None:
self.__mouse_current.send_relative_event(delta_x, delta_y)
def _send_mouse_wheel_event(self, delta_x: int, delta_y: int) -> None:
self.__mouse_current.send_wheel_event(delta_x, delta_y)
def _clear_events(self) -> None:
self.__keyboard_proc.send_clear_event()
self.__mouse_proc.send_clear_event()
if self.__mouse_alt_proc:
self.__mouse_alt_proc.send_clear_event()
self._bump_activity()
# =====

View File

@@ -20,7 +20,6 @@
# ========================================================================== #
from typing import Iterable
from typing import Generator
from typing import Any
@@ -68,9 +67,8 @@ class KeyboardProcess(BaseDeviceProcess):
self._clear_queue()
self._queue_event(ResetEvent())
def send_key_events(self, keys: Iterable[tuple[str, bool]]) -> None:
for (key, state) in keys:
self._queue_event(make_keyboard_event(key, state))
def send_key_event(self, key: str, state: bool) -> None:
self._queue_event(make_keyboard_event(key, state))
# =====