mirror of
https://github.com/mofeng-git/One-KVM.git
synced 2026-01-29 00:51:53 +08:00
pikvm/pikvm#57: Mouse jiggler
This commit is contained in:
@@ -91,6 +91,7 @@ class HidApi:
|
||||
for (key, validator) in [
|
||||
("keyboard_output", valid_hid_keyboard_output),
|
||||
("mouse_output", valid_hid_mouse_output),
|
||||
("jiggler", valid_bool),
|
||||
]
|
||||
if request.query.get(key) is not None
|
||||
}
|
||||
|
||||
@@ -20,6 +20,9 @@
|
||||
# ========================================================================== #
|
||||
|
||||
|
||||
import asyncio
|
||||
import time
|
||||
|
||||
from typing import Iterable
|
||||
from typing import AsyncGenerator
|
||||
|
||||
@@ -29,6 +32,11 @@ from .. import get_plugin_class
|
||||
|
||||
# =====
|
||||
class BaseHid(BasePlugin):
|
||||
def __init__(self) -> None:
|
||||
self.__jiggler_enabled = False
|
||||
self.__jiggler_absolute = True
|
||||
self.__activity_ts = 0
|
||||
|
||||
def sysprep(self) -> None:
|
||||
raise NotImplementedError
|
||||
|
||||
@@ -64,9 +72,14 @@ class BaseHid(BasePlugin):
|
||||
def send_mouse_wheel_event(self, delta_x: int, delta_y: int) -> None:
|
||||
raise NotImplementedError
|
||||
|
||||
def set_params(self, keyboard_output: (str | None)=None, mouse_output: (str | None)=None) -> None:
|
||||
_ = keyboard_output
|
||||
_ = mouse_output
|
||||
def set_params(
|
||||
self,
|
||||
keyboard_output: (str | None)=None,
|
||||
mouse_output: (str | None)=None,
|
||||
jiggler: (bool | None)=None,
|
||||
) -> None:
|
||||
|
||||
raise NotImplementedError
|
||||
|
||||
def set_connected(self, connected: bool) -> None:
|
||||
_ = connected
|
||||
@@ -74,6 +87,31 @@ class BaseHid(BasePlugin):
|
||||
def clear_events(self) -> None:
|
||||
raise NotImplementedError
|
||||
|
||||
# =====
|
||||
|
||||
async def systask(self) -> None:
|
||||
factor = 1
|
||||
while True:
|
||||
if self.__jiggler_enabled and (self.__activity_ts + 60 < int(time.monotonic())):
|
||||
if self.__jiggler_absolute:
|
||||
self.send_mouse_move_event(100 * factor, 100 * factor)
|
||||
else:
|
||||
self.send_mouse_relative_event(10 * factor, 10 * factor)
|
||||
factor *= -1
|
||||
await asyncio.sleep(1)
|
||||
|
||||
def _bump_activity(self) -> None:
|
||||
self.__activity_ts = int(time.monotonic())
|
||||
|
||||
def _set_jiggler_absolute(self, absolute: bool) -> None:
|
||||
self.__jiggler_absolute = absolute
|
||||
|
||||
def _set_jiggler_enabled(self, enabled: bool) -> None:
|
||||
self.__jiggler_enabled = enabled
|
||||
|
||||
def _get_jiggler_state(self) -> dict:
|
||||
return {"enabled": self.__jiggler_enabled}
|
||||
|
||||
|
||||
# =====
|
||||
def get_hid_class(name: str) -> type[BaseHid]:
|
||||
|
||||
@@ -117,6 +117,7 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
|
||||
**gpio_kwargs: Any,
|
||||
) -> None:
|
||||
|
||||
BaseHid.__init__(self)
|
||||
multiprocessing.Process.__init__(self, daemon=True)
|
||||
|
||||
self.__read_retries = read_retries
|
||||
@@ -177,6 +178,7 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
|
||||
active_mouse = get_active_mouse(outputs1)
|
||||
if online and active_mouse in ["usb_rel", "ps2"]:
|
||||
absolute = False
|
||||
self._set_jiggler_absolute(absolute)
|
||||
|
||||
keyboard_outputs: dict = {"available": [], "active": ""}
|
||||
mouse_outputs: dict = {"available": [], "active": ""}
|
||||
@@ -224,6 +226,7 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
|
||||
"absolute": absolute,
|
||||
"outputs": mouse_outputs,
|
||||
},
|
||||
"jiggler": self._get_jiggler_state(),
|
||||
}
|
||||
|
||||
async def poll_state(self) -> AsyncGenerator[dict, None]:
|
||||
@@ -251,20 +254,31 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
|
||||
def send_key_events(self, keys: Iterable[tuple[str, bool]]) -> None:
|
||||
for (key, state) in keys:
|
||||
self.__queue_event(KeyEvent(key, state))
|
||||
self._bump_activity()
|
||||
|
||||
def send_mouse_button_event(self, button: str, state: bool) -> None:
|
||||
self.__queue_event(MouseButtonEvent(button, state))
|
||||
self._bump_activity()
|
||||
|
||||
def send_mouse_move_event(self, to_x: int, to_y: int) -> None:
|
||||
self.__queue_event(MouseMoveEvent(to_x, to_y))
|
||||
self._bump_activity()
|
||||
|
||||
def send_mouse_relative_event(self, delta_x: int, delta_y: int) -> None:
|
||||
self.__queue_event(MouseRelativeEvent(delta_x, delta_y))
|
||||
self._bump_activity()
|
||||
|
||||
def send_mouse_wheel_event(self, delta_x: int, delta_y: int) -> None:
|
||||
self.__queue_event(MouseWheelEvent(delta_x, delta_y))
|
||||
self._bump_activity()
|
||||
|
||||
def set_params(
|
||||
self,
|
||||
keyboard_output: (str | None)=None,
|
||||
mouse_output: (str | None)=None,
|
||||
jiggler: (bool | None)=None,
|
||||
) -> None:
|
||||
|
||||
def set_params(self, keyboard_output: (str | None)=None, mouse_output: (str | None)=None) -> None:
|
||||
events: list[BaseEvent] = []
|
||||
if keyboard_output is not None:
|
||||
events.append(SetKeyboardOutputEvent(keyboard_output))
|
||||
@@ -272,12 +286,16 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
|
||||
events.append(SetMouseOutputEvent(mouse_output))
|
||||
for (index, event) in enumerate(events, 1):
|
||||
self.__queue_event(event, clear=(index == len(events)))
|
||||
if jiggler is not None:
|
||||
self._set_jiggler_enabled(jiggler)
|
||||
self.__notifier.notify()
|
||||
|
||||
def set_connected(self, connected: bool) -> None:
|
||||
self.__queue_event(SetConnectedEvent(connected), clear=True)
|
||||
|
||||
def clear_events(self) -> None:
|
||||
self.__queue_event(ClearEvent(), clear=True)
|
||||
self._bump_activity()
|
||||
|
||||
def __queue_event(self, event: BaseEvent, clear: bool=False) -> None:
|
||||
if not self.__stop_event.is_set():
|
||||
|
||||
@@ -59,7 +59,7 @@ class Plugin(BaseHid): # pylint: disable=too-many-instance-attributes
|
||||
# https://gist.github.com/whitelynx/9f9bd4cb266b3924c64dfdff14bce2e8
|
||||
# https://archlinuxarm.org/forum/viewtopic.php?f=67&t=14244
|
||||
|
||||
def __init__( # pylint: disable=too-many-arguments,too-many-locals,super-init-not-called
|
||||
def __init__( # pylint: disable=too-many-arguments,too-many-locals
|
||||
self,
|
||||
manufacturer: str,
|
||||
product: str,
|
||||
@@ -78,6 +78,9 @@ class Plugin(BaseHid): # pylint: disable=too-many-instance-attributes
|
||||
select_timeout: float,
|
||||
) -> None:
|
||||
|
||||
super().__init__()
|
||||
self._set_jiggler_absolute(False)
|
||||
|
||||
self.__proc: (multiprocessing.Process | None) = None
|
||||
self.__stop_event = multiprocessing.Event()
|
||||
|
||||
@@ -146,6 +149,7 @@ class Plugin(BaseHid): # pylint: disable=too-many-instance-attributes
|
||||
"absolute": False,
|
||||
"outputs": outputs,
|
||||
},
|
||||
"jiggler": self._get_jiggler_state(),
|
||||
}
|
||||
|
||||
async def poll_state(self) -> AsyncGenerator[dict, None]:
|
||||
@@ -175,18 +179,36 @@ class Plugin(BaseHid): # pylint: disable=too-many-instance-attributes
|
||||
def send_key_events(self, keys: Iterable[tuple[str, bool]]) -> None:
|
||||
for (key, state) in keys:
|
||||
self.__server.queue_event(make_keyboard_event(key, state))
|
||||
self._bump_activity()
|
||||
|
||||
def send_mouse_button_event(self, button: str, state: bool) -> None:
|
||||
self.__server.queue_event(MouseButtonEvent(button, state))
|
||||
self._bump_activity()
|
||||
|
||||
def send_mouse_relative_event(self, delta_x: int, delta_y: int) -> None:
|
||||
self.__server.queue_event(MouseRelativeEvent(delta_x, delta_y))
|
||||
self._bump_activity()
|
||||
|
||||
def send_mouse_wheel_event(self, delta_x: int, delta_y: int) -> None:
|
||||
self.__server.queue_event(MouseWheelEvent(delta_x, delta_y))
|
||||
self._bump_activity()
|
||||
|
||||
def clear_events(self) -> None:
|
||||
self.__server.clear_events()
|
||||
self._bump_activity()
|
||||
|
||||
def set_params(
|
||||
self,
|
||||
keyboard_output: (str | None)=None,
|
||||
mouse_output: (str | None)=None,
|
||||
jiggler: (bool | None)=None,
|
||||
) -> None:
|
||||
|
||||
_ = keyboard_output
|
||||
_ = mouse_output
|
||||
if jiggler is not None:
|
||||
self._set_jiggler_enabled(jiggler)
|
||||
self.__notifier.notify()
|
||||
|
||||
# =====
|
||||
|
||||
|
||||
@@ -58,6 +58,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
|
||||
read_timeout: float,
|
||||
) -> None:
|
||||
|
||||
BaseHid.__init__(self)
|
||||
multiprocessing.Process.__init__(self, daemon=True)
|
||||
|
||||
self.__device_path = device_path
|
||||
@@ -112,6 +113,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
|
||||
"active": ("usb" if absolute else "usb_rel"),
|
||||
},
|
||||
},
|
||||
"jiggler": self._get_jiggler_state(),
|
||||
}
|
||||
|
||||
async def poll_state(self) -> AsyncGenerator[dict, None]:
|
||||
@@ -139,23 +141,40 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
|
||||
def send_key_events(self, keys: Iterable[tuple[str, bool]]) -> None:
|
||||
for (key, state) in keys:
|
||||
self.__queue_cmd(self.__keyboard.process_key(key, state))
|
||||
self._bump_activity()
|
||||
|
||||
def send_mouse_button_event(self, button: str, state: bool) -> None:
|
||||
self.__queue_cmd(self.__mouse.process_button(button, state))
|
||||
self._bump_activity()
|
||||
|
||||
def send_mouse_move_event(self, to_x: int, to_y: int) -> None:
|
||||
self.__queue_cmd(self.__mouse.process_move(to_x, to_y))
|
||||
self._bump_activity()
|
||||
|
||||
def send_mouse_wheel_event(self, delta_x: int, delta_y: int) -> None:
|
||||
self.__queue_cmd(self.__mouse.process_wheel(delta_x, delta_y))
|
||||
self._bump_activity()
|
||||
|
||||
def send_mouse_relative_event(self, delta_x: int, delta_y: int) -> None:
|
||||
self.__queue_cmd(self.__mouse.process_relative(delta_x, delta_y))
|
||||
self._bump_activity()
|
||||
|
||||
def set_params(self, keyboard_output: (str | None)=None, mouse_output: (str | None)=None) -> None:
|
||||
def set_params(
|
||||
self,
|
||||
keyboard_output: (str | None)=None,
|
||||
mouse_output: (str | None)=None,
|
||||
jiggler: (bool | None)=None,
|
||||
) -> None:
|
||||
|
||||
_ = keyboard_output
|
||||
if mouse_output is not None:
|
||||
get_logger(0).info("HID : mouse output = %s", mouse_output)
|
||||
self.__mouse.set_absolute(mouse_output == "usb")
|
||||
absolute = (mouse_output == "usb")
|
||||
self.__mouse.set_absolute(absolute)
|
||||
self._set_jiggler_absolute(absolute)
|
||||
self.__notifier.notify()
|
||||
if jiggler is not None:
|
||||
self._set_jiggler_enabled(jiggler)
|
||||
self.__notifier.notify()
|
||||
|
||||
def set_connected(self, connected: bool) -> None:
|
||||
|
||||
@@ -44,7 +44,7 @@ from .mouse import MouseProcess
|
||||
|
||||
# =====
|
||||
class Plugin(BaseHid): # pylint: disable=too-many-instance-attributes
|
||||
def __init__( # pylint: disable=super-init-not-called
|
||||
def __init__(
|
||||
self,
|
||||
keyboard: dict[str, Any],
|
||||
mouse: dict[str, Any],
|
||||
@@ -53,6 +53,8 @@ class Plugin(BaseHid): # pylint: disable=too-many-instance-attributes
|
||||
udc: str, # XXX: Not from options, see /kvmd/apps/kvmd/__init__.py for details
|
||||
) -> None:
|
||||
|
||||
super().__init__()
|
||||
|
||||
self.__udc = udc
|
||||
|
||||
self.__notifier = aiomulti.AioProcessNotifier()
|
||||
@@ -80,6 +82,8 @@ class Plugin(BaseHid): # pylint: disable=too-many-instance-attributes
|
||||
# но так было проще реализовать переключение режимов
|
||||
self.__mouses["usb_win98"] = self.__mouses["usb"]
|
||||
|
||||
self._set_jiggler_absolute(self.__mouse_current.is_absolute())
|
||||
|
||||
@classmethod
|
||||
def get_plugin_options(cls) -> dict:
|
||||
return {
|
||||
@@ -141,6 +145,7 @@ class Plugin(BaseHid): # pylint: disable=too-many-instance-attributes
|
||||
},
|
||||
**mouse_state,
|
||||
},
|
||||
"jiggler": self._get_jiggler_state(),
|
||||
}
|
||||
|
||||
async def poll_state(self) -> AsyncGenerator[dict, None]:
|
||||
@@ -172,25 +177,40 @@ class Plugin(BaseHid): # pylint: disable=too-many-instance-attributes
|
||||
|
||||
def send_key_events(self, keys: Iterable[tuple[str, bool]]) -> None:
|
||||
self.__keyboard_proc.send_key_events(keys)
|
||||
self._bump_activity()
|
||||
|
||||
def send_mouse_button_event(self, button: str, state: bool) -> None:
|
||||
self.__mouse_current.send_button_event(button, state)
|
||||
self._bump_activity()
|
||||
|
||||
def send_mouse_move_event(self, to_x: int, to_y: int) -> None:
|
||||
self.__mouse_current.send_move_event(to_x, to_y)
|
||||
self._bump_activity()
|
||||
|
||||
def send_mouse_relative_event(self, delta_x: int, delta_y: int) -> None:
|
||||
self.__mouse_current.send_relative_event(delta_x, delta_y)
|
||||
self._bump_activity()
|
||||
|
||||
def send_mouse_wheel_event(self, delta_x: int, delta_y: int) -> None:
|
||||
self.__mouse_current.send_wheel_event(delta_x, delta_y)
|
||||
self._bump_activity()
|
||||
|
||||
def set_params(
|
||||
self,
|
||||
keyboard_output: (str | None)=None,
|
||||
mouse_output: (str | None)=None,
|
||||
jiggler: (bool | None)=None,
|
||||
) -> None:
|
||||
|
||||
def set_params(self, keyboard_output: (str | None)=None, mouse_output: (str | None)=None) -> None:
|
||||
_ = keyboard_output
|
||||
if mouse_output in self.__mouses and mouse_output != self.__get_current_mouse_mode():
|
||||
self.__mouse_current.send_clear_event()
|
||||
self.__mouse_current = self.__mouses[mouse_output]
|
||||
self.__mouse_current.set_win98_fix(mouse_output == "usb_win98")
|
||||
self._set_jiggler_absolute(self.__mouse_current.is_absolute())
|
||||
self.__notifier.notify()
|
||||
if jiggler is not None:
|
||||
self._set_jiggler_enabled(jiggler)
|
||||
self.__notifier.notify()
|
||||
|
||||
def clear_events(self) -> None:
|
||||
@@ -198,6 +218,7 @@ class Plugin(BaseHid): # pylint: disable=too-many-instance-attributes
|
||||
self.__mouse_proc.send_clear_event()
|
||||
if self.__mouse_alt_proc:
|
||||
self.__mouse_alt_proc.send_clear_event()
|
||||
self._bump_activity()
|
||||
|
||||
# =====
|
||||
|
||||
|
||||
Reference in New Issue
Block a user