mirror of
https://github.com/mofeng-git/One-KVM.git
synced 2026-01-29 09:01:54 +08:00
@@ -40,7 +40,7 @@ from ....validators.os import valid_printable_filename
|
||||
from ....validators.kvm import valid_hid_key
|
||||
from ....validators.kvm import valid_hid_mouse_move
|
||||
from ....validators.kvm import valid_hid_mouse_button
|
||||
from ....validators.kvm import valid_hid_mouse_wheel
|
||||
from ....validators.kvm import valid_hid_mouse_delta
|
||||
|
||||
from ....keyboard.keysym import build_symmap
|
||||
from ....keyboard.printer import text_to_web_keys
|
||||
@@ -142,11 +142,20 @@ class HidApi:
|
||||
return
|
||||
self.__hid.send_mouse_move_event(to_x, to_y)
|
||||
|
||||
@exposed_ws("mouse_relative")
|
||||
async def __ws_mouse_relative_handler(self, _: WebSocketResponse, event: Dict) -> None:
|
||||
try:
|
||||
delta_x = valid_hid_mouse_delta(event["delta"]["x"])
|
||||
delta_y = valid_hid_mouse_delta(event["delta"]["y"])
|
||||
except Exception:
|
||||
return
|
||||
self.__hid.send_mouse_relative_event(delta_x, delta_y)
|
||||
|
||||
@exposed_ws("mouse_wheel")
|
||||
async def __ws_mouse_wheel_handler(self, _: WebSocketResponse, event: Dict) -> None:
|
||||
try:
|
||||
delta_x = valid_hid_mouse_wheel(event["delta"]["x"])
|
||||
delta_y = valid_hid_mouse_wheel(event["delta"]["y"])
|
||||
delta_x = valid_hid_mouse_delta(event["delta"]["x"])
|
||||
delta_y = valid_hid_mouse_delta(event["delta"]["y"])
|
||||
except Exception:
|
||||
return
|
||||
self.__hid.send_mouse_wheel_event(delta_x, delta_y)
|
||||
@@ -181,9 +190,16 @@ class HidApi:
|
||||
self.__hid.send_mouse_move_event(to_x, to_y)
|
||||
return make_json_response()
|
||||
|
||||
@exposed_http("POST", "/hid/events/send_mouse_relative")
|
||||
async def __events_send_mouse_relative_handler(self, request: Request) -> Response:
|
||||
delta_x = valid_hid_mouse_delta(request.query.get("delta_x"))
|
||||
delta_y = valid_hid_mouse_delta(request.query.get("delta_y"))
|
||||
self.__hid.send_mouse_relative_event(delta_x, delta_y)
|
||||
return make_json_response()
|
||||
|
||||
@exposed_http("POST", "/hid/events/send_mouse_wheel")
|
||||
async def __events_send_mouse_wheel(self, request: Request) -> Response:
|
||||
delta_x = valid_hid_mouse_wheel(request.query.get("delta_x"))
|
||||
delta_y = valid_hid_mouse_wheel(request.query.get("delta_y"))
|
||||
async def __events_send_mouse_wheel_handler(self, request: Request) -> Response:
|
||||
delta_x = valid_hid_mouse_delta(request.query.get("delta_x"))
|
||||
delta_y = valid_hid_mouse_delta(request.query.get("delta_y"))
|
||||
self.__hid.send_mouse_wheel_event(delta_x, delta_y)
|
||||
return make_json_response()
|
||||
|
||||
@@ -43,7 +43,8 @@ from .. import init
|
||||
|
||||
from .hid import Hid
|
||||
from .hid.keyboard import KEYBOARD_HID
|
||||
from .hid.mouse import MOUSE_HID
|
||||
from .hid.mouse import MOUSE_ABSOLUTE_HID
|
||||
from .hid.mouse import MOUSE_RELATIVE_HID
|
||||
|
||||
|
||||
# =====
|
||||
@@ -203,7 +204,10 @@ def _cmd_start(config: Section) -> None:
|
||||
if config.kvmd.hid.type == "otg":
|
||||
logger.info("===== Required HID =====")
|
||||
_create_hid(gadget_path, config_path, 0, KEYBOARD_HID)
|
||||
_create_hid(gadget_path, config_path, 1, MOUSE_HID)
|
||||
if config.kvmd.hid.mouse.absolute:
|
||||
_create_hid(gadget_path, config_path, 1, MOUSE_ABSOLUTE_HID)
|
||||
else:
|
||||
_create_hid(gadget_path, config_path, 1, MOUSE_RELATIVE_HID)
|
||||
|
||||
if config.kvmd.msd.type == "otg":
|
||||
logger.info("===== Required MSD =====")
|
||||
|
||||
@@ -24,7 +24,7 @@ from . import Hid
|
||||
|
||||
|
||||
# =====
|
||||
MOUSE_HID = Hid(
|
||||
MOUSE_ABSOLUTE_HID = Hid(
|
||||
protocol=0, # None protocol
|
||||
subclass=0, # No subclass
|
||||
|
||||
@@ -84,3 +84,54 @@ MOUSE_HID = Hid(
|
||||
0xC0, # END_COLLECTION
|
||||
]),
|
||||
)
|
||||
|
||||
MOUSE_RELATIVE_HID = Hid(
|
||||
protocol=2, # Mouse protocol
|
||||
subclass=1, # Boot interface subclass
|
||||
|
||||
report_length=5,
|
||||
|
||||
report_descriptor=bytes([
|
||||
# https://github.com/NicoHood/HID/blob/0835e6a/src/SingleReport/BootMouse.cpp
|
||||
|
||||
# Relative mouse
|
||||
0x05, 0x01, # USAGE_PAGE (Generic Desktop)
|
||||
0x09, 0x02, # USAGE (Mouse)
|
||||
0xA1, 0x01, # COLLECTION (Application)
|
||||
|
||||
# 8 Buttons
|
||||
0x05, 0x09, # USAGE_PAGE (Button)
|
||||
0x19, 0x01, # USAGE_MINIMUM (Button 1)
|
||||
0x29, 0x08, # USAGE_MAXIMUM (Button 8)
|
||||
0x15, 0x00, # LOGICAL_MINIMUM (0)
|
||||
0x25, 0x01, # LOGICAL_MAXIMUM (1)
|
||||
0x95, 0x08, # REPORT_COUNT (8)
|
||||
0x75, 0x01, # REPORT_SIZE (1)
|
||||
0x81, 0x02, # INPUT (Data,Var,Abs)
|
||||
|
||||
# X, Y
|
||||
0x05, 0x01, # USAGE_PAGE (Generic Desktop)
|
||||
0x09, 0x30, # USAGE (X)
|
||||
0x09, 0x31, # USAGE (Y)
|
||||
|
||||
# Wheel
|
||||
0x09, 0x38, # USAGE (Wheel)
|
||||
0x15, 0x81, # LOGICAL_MINIMUM (-127)
|
||||
0x25, 0x7F, # LOGICAL_MAXIMUM (127)
|
||||
0x75, 0x08, # REPORT_SIZE (8)
|
||||
0x95, 0x03, # REPORT_COUNT (3)
|
||||
0x81, 0x06, # INPUT (Data,Var,Rel)
|
||||
|
||||
# Horizontal wheel
|
||||
0x05, 0x0C, # USAGE PAGE (Consumer Devices)
|
||||
0x0A, 0x38, 0x02, # USAGE (AC Pan)
|
||||
0x15, 0x81, # LOGICAL_MINIMUM (-127)
|
||||
0x25, 0x7F, # LOGICAL_MAXIMUM (127)
|
||||
0x75, 0x08, # REPORT_SIZE (8)
|
||||
0x95, 0x01, # REPORT_COUNT (1)
|
||||
0x81, 0x06, # INPUT (Data,Var,Rel)
|
||||
|
||||
# End
|
||||
0xC0, # END_COLLECTION
|
||||
]),
|
||||
)
|
||||
|
||||
@@ -59,6 +59,9 @@ class BaseHid(BasePlugin):
|
||||
def send_mouse_move_event(self, to_x: int, to_y: int) -> None:
|
||||
raise NotImplementedError
|
||||
|
||||
def send_mouse_relative_event(self, delta_x: int, delta_y: int) -> None:
|
||||
pass # FIXME: SPI
|
||||
|
||||
def send_mouse_wheel_event(self, delta_x: int, delta_y: int) -> None:
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
@@ -75,6 +75,7 @@ class Plugin(BaseHid):
|
||||
"write_retries": Option(5, type=valid_int_f1),
|
||||
"write_retries_delay": Option(0.1, type=valid_float_f01),
|
||||
"reopen_delay": Option(0.5, type=valid_float_f01),
|
||||
"absolute": Option(True, type=valid_bool),
|
||||
},
|
||||
"noop": Option(False, type=valid_bool),
|
||||
}
|
||||
@@ -130,6 +131,9 @@ class Plugin(BaseHid):
|
||||
def send_mouse_move_event(self, to_x: int, to_y: int) -> None:
|
||||
self.__mouse_proc.send_move_event(to_x, to_y)
|
||||
|
||||
def send_mouse_relative_event(self, delta_x: int, delta_y: int) -> None:
|
||||
self.__mouse_proc.send_relative_event(delta_x, delta_y)
|
||||
|
||||
def send_mouse_wheel_event(self, delta_x: int, delta_y: int) -> None:
|
||||
self.__mouse_proc.send_wheel_event(delta_x, delta_y)
|
||||
|
||||
|
||||
@@ -23,6 +23,7 @@
|
||||
import struct
|
||||
import dataclasses
|
||||
|
||||
from typing import Optional
|
||||
from typing import Any
|
||||
|
||||
from ....logging import get_logger
|
||||
@@ -52,6 +53,12 @@ class _MoveEvent(BaseEvent):
|
||||
to_y: int
|
||||
|
||||
|
||||
@dataclasses.dataclass(frozen=True)
|
||||
class _RelativeEvent(BaseEvent):
|
||||
delta_x: int
|
||||
delta_y: int
|
||||
|
||||
|
||||
@dataclasses.dataclass(frozen=True)
|
||||
class _WheelEvent(BaseEvent):
|
||||
delta_x: int
|
||||
@@ -61,21 +68,26 @@ class _WheelEvent(BaseEvent):
|
||||
# =====
|
||||
class MouseProcess(BaseDeviceProcess):
|
||||
def __init__(self, **kwargs: Any) -> None:
|
||||
self.__absolute: bool = kwargs.pop("absolute")
|
||||
|
||||
super().__init__(
|
||||
name="mouse",
|
||||
read_size=0,
|
||||
initial_state={},
|
||||
initial_state={"absolute": self.__absolute}, # Just for the state
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
self.__pressed_buttons: int = 0
|
||||
self.__x = 0
|
||||
self.__x = 0 # For absolute
|
||||
self.__y = 0
|
||||
|
||||
def cleanup(self) -> None:
|
||||
self._stop()
|
||||
get_logger().info("Clearing HID-mouse events ...")
|
||||
report = self.__make_report(0, self.__x, self.__y, 0, 0)
|
||||
if self.__absolute:
|
||||
report = self.__make_report(0, self.__x, self.__y, 0, 0)
|
||||
else:
|
||||
report = self.__make_report(0, 0, 0, 0, 0)
|
||||
self._ensure_write(report, close=True) # Release all buttons
|
||||
|
||||
def send_clear_event(self) -> None:
|
||||
@@ -97,11 +109,18 @@ class MouseProcess(BaseDeviceProcess):
|
||||
self._queue_event(_ButtonEvent(code, state))
|
||||
|
||||
def send_move_event(self, to_x: int, to_y: int) -> None:
|
||||
assert -32768 <= to_x <= 32767
|
||||
assert -32768 <= to_y <= 32767
|
||||
to_x = (to_x + 32768) // 2
|
||||
to_y = (to_y + 32768) // 2
|
||||
self._queue_event(_MoveEvent(to_x, to_y))
|
||||
if self.__absolute:
|
||||
assert -32768 <= to_x <= 32767
|
||||
assert -32768 <= to_y <= 32767
|
||||
to_x = (to_x + 32768) // 2
|
||||
to_y = (to_y + 32768) // 2
|
||||
self._queue_event(_MoveEvent(to_x, to_y))
|
||||
|
||||
def send_relative_event(self, delta_x: int, delta_y: int) -> None:
|
||||
if not self.__absolute:
|
||||
assert -127 <= delta_x <= 127
|
||||
assert -127 <= delta_y <= 127
|
||||
self._queue_event(_RelativeEvent(delta_x, delta_y))
|
||||
|
||||
def send_wheel_event(self, delta_x: int, delta_y: int) -> None:
|
||||
assert -127 <= delta_x <= 127
|
||||
@@ -119,6 +138,8 @@ class MouseProcess(BaseDeviceProcess):
|
||||
return self.__process_button_event(event)
|
||||
elif isinstance(event, _MoveEvent):
|
||||
return self.__process_move_event(event)
|
||||
elif isinstance(event, _RelativeEvent):
|
||||
return self.__process_relative_event(event)
|
||||
elif isinstance(event, _WheelEvent):
|
||||
return self.__process_wheel_event(event)
|
||||
raise RuntimeError(f"Not implemented event: {event}")
|
||||
@@ -144,19 +165,40 @@ class MouseProcess(BaseDeviceProcess):
|
||||
self.__y = event.to_y
|
||||
return self.__send_current_state()
|
||||
|
||||
def __process_relative_event(self, event: _RelativeEvent) -> bool:
|
||||
return self.__send_current_state(relative_event=event)
|
||||
|
||||
def __process_wheel_event(self, event: _WheelEvent) -> bool:
|
||||
return self.__send_current_state(event.delta_x, event.delta_y)
|
||||
return self.__send_current_state(wheel_event=event)
|
||||
|
||||
# =====
|
||||
|
||||
def __send_current_state(self, delta_x: int=0, delta_y: int=0, reopen: bool=False) -> bool:
|
||||
report = self.__make_report(
|
||||
buttons=self.__pressed_buttons,
|
||||
to_x=self.__x,
|
||||
to_y=self.__y,
|
||||
delta_x=delta_x,
|
||||
delta_y=delta_y,
|
||||
)
|
||||
def __send_current_state(
|
||||
self,
|
||||
relative_event: Optional[_RelativeEvent]=None,
|
||||
wheel_event: Optional[_WheelEvent]=None,
|
||||
reopen: bool=False,
|
||||
) -> bool:
|
||||
|
||||
if self.__absolute:
|
||||
assert relative_event is None
|
||||
move_x = self.__x
|
||||
move_y = self.__y
|
||||
else:
|
||||
assert self.__x == self.__y == 0
|
||||
if relative_event is not None:
|
||||
move_x = relative_event.delta_x
|
||||
move_y = relative_event.delta_y
|
||||
else:
|
||||
move_x = move_y = 0
|
||||
|
||||
if wheel_event is not None:
|
||||
wheel_x = wheel_event.delta_x
|
||||
wheel_y = wheel_event.delta_y
|
||||
else:
|
||||
wheel_x = wheel_y = 0
|
||||
|
||||
report = self.__make_report(self.__pressed_buttons, move_x, move_y, wheel_x, wheel_y)
|
||||
if not self._ensure_write(report, reopen=reopen):
|
||||
self.__clear_state()
|
||||
return False
|
||||
@@ -167,7 +209,7 @@ class MouseProcess(BaseDeviceProcess):
|
||||
self.__x = 0
|
||||
self.__y = 0
|
||||
|
||||
def __make_report(self, buttons: int, to_x: int, to_y: int, delta_x: int, delta_y: int) -> bytes:
|
||||
# XXX: Delta Y before X: it's ok.
|
||||
# See /kvmd/apps/otg/hid/keyboard.py for details
|
||||
return struct.pack("<BHHbb", buttons, to_x, to_y, delta_y, delta_x)
|
||||
def __make_report(self, buttons: int, move_x: int, move_y: int, wheel_x: int, wheel_y: int) -> bytes:
|
||||
# XXX: Wheel Y before X: it's ok.
|
||||
# See /kvmd/apps/otg/hid/mouse.py for details
|
||||
return struct.pack(("<BHHbb" if self.__absolute else "<Bbbbb"), buttons, move_x, move_y, wheel_y, wheel_x)
|
||||
|
||||
@@ -96,8 +96,8 @@ def valid_hid_mouse_button(arg: Any) -> str:
|
||||
return check_string_in_list(arg, "HID mouse button", ["left", "right", "middle", "up", "down"])
|
||||
|
||||
|
||||
def valid_hid_mouse_wheel(arg: Any) -> int:
|
||||
arg = valid_number(arg, name="HID mouse wheel")
|
||||
def valid_hid_mouse_delta(arg: Any) -> int:
|
||||
arg = valid_number(arg, name="HID mouse delta")
|
||||
return min(max(-127, arg), 127)
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user