mirror of
https://github.com/mofeng-git/One-KVM.git
synced 2026-02-03 03:21:54 +08:00
refactoring
This commit is contained in:
151
kvmd/plugins/hid/_mcu/proto.py
Normal file
151
kvmd/plugins/hid/_mcu/proto.py
Normal file
@@ -0,0 +1,151 @@
|
||||
# ========================================================================== #
|
||||
# #
|
||||
# KVMD - The main Pi-KVM daemon. #
|
||||
# #
|
||||
# Copyright (C) 2018 Maxim Devaev <mdevaev@gmail.com> #
|
||||
# #
|
||||
# This program is free software: you can redistribute it and/or modify #
|
||||
# it under the terms of the GNU General Public License as published by #
|
||||
# the Free Software Foundation, either version 3 of the License, or #
|
||||
# (at your option) any later version. #
|
||||
# #
|
||||
# This program is distributed in the hope that it will be useful, #
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
|
||||
# GNU General Public License for more details. #
|
||||
# #
|
||||
# You should have received a copy of the GNU General Public License #
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
|
||||
# #
|
||||
# ========================================================================== #
|
||||
|
||||
|
||||
import dataclasses
|
||||
import struct
|
||||
|
||||
from ....keyboard.mappings import KEYMAP
|
||||
|
||||
|
||||
# =====
|
||||
class BaseEvent:
|
||||
def make_request(self) -> bytes:
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class ClearEvent(BaseEvent):
|
||||
def make_request(self) -> bytes:
|
||||
return _make_request(b"\x10\x00\x00\x00\x00")
|
||||
|
||||
|
||||
@dataclasses.dataclass(frozen=True)
|
||||
class KeyEvent(BaseEvent):
|
||||
name: str
|
||||
state: bool
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
assert self.name in KEYMAP
|
||||
|
||||
def make_request(self) -> bytes:
|
||||
code = KEYMAP[self.name].mcu.code
|
||||
return _make_request(struct.pack(">BBBxx", 0x11, code, int(self.state)))
|
||||
|
||||
|
||||
@dataclasses.dataclass(frozen=True)
|
||||
class MouseButtonEvent(BaseEvent):
|
||||
name: str
|
||||
state: bool
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
assert self.name in ["left", "right", "middle", "up", "down"]
|
||||
|
||||
def make_request(self) -> bytes:
|
||||
(code, state_pressed, is_main) = {
|
||||
"left": (0b10000000, 0b00001000, True),
|
||||
"right": (0b01000000, 0b00000100, True),
|
||||
"middle": (0b00100000, 0b00000010, True),
|
||||
"up": (0b10000000, 0b00001000, False), # Back
|
||||
"down": (0b01000000, 0b00000100, False), # Forward
|
||||
}[self.name]
|
||||
if self.state:
|
||||
code |= state_pressed
|
||||
if is_main:
|
||||
main_code = code
|
||||
extra_code = 0
|
||||
else:
|
||||
main_code = 0
|
||||
extra_code = code
|
||||
return _make_request(struct.pack(">BBBxx", 0x13, main_code, extra_code))
|
||||
|
||||
|
||||
@dataclasses.dataclass(frozen=True)
|
||||
class MouseMoveEvent(BaseEvent):
|
||||
to_x: int
|
||||
to_y: int
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
assert -32768 <= self.to_x <= 32767
|
||||
assert -32768 <= self.to_y <= 32767
|
||||
|
||||
def make_request(self) -> bytes:
|
||||
return _make_request(struct.pack(">Bhh", 0x12, self.to_x, self.to_y))
|
||||
|
||||
|
||||
@dataclasses.dataclass(frozen=True)
|
||||
class MouseRelativeEvent(BaseEvent):
|
||||
delta_x: int
|
||||
delta_y: int
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
assert -127 <= self.delta_x <= 127
|
||||
assert -127 <= self.delta_y <= 127
|
||||
|
||||
def make_request(self) -> bytes:
|
||||
return _make_request(struct.pack(">Bbbxx", 0x15, self.delta_x, self.delta_y))
|
||||
|
||||
|
||||
@dataclasses.dataclass(frozen=True)
|
||||
class MouseWheelEvent(BaseEvent):
|
||||
delta_x: int
|
||||
delta_y: int
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
assert -127 <= self.delta_x <= 127
|
||||
assert -127 <= self.delta_y <= 127
|
||||
|
||||
def make_request(self) -> bytes:
|
||||
# Горизонтальная прокрутка пока не поддерживается
|
||||
return _make_request(struct.pack(">Bxbxx", 0x14, self.delta_y))
|
||||
|
||||
|
||||
# =====
|
||||
def check_response(response: bytes) -> bool:
|
||||
assert len(response) in (4, 8), response
|
||||
return (_make_crc16(response[:-2]) == struct.unpack(">H", response[-2:])[0])
|
||||
|
||||
|
||||
def _make_request(command: bytes) -> bytes:
|
||||
assert len(command) == 5, command
|
||||
request = b"\x33" + command
|
||||
request += struct.pack(">H", _make_crc16(request))
|
||||
assert len(request) == 8, request
|
||||
return request
|
||||
|
||||
|
||||
def _make_crc16(data: bytes) -> int:
|
||||
crc = 0xFFFF
|
||||
for byte in data:
|
||||
crc = crc ^ byte
|
||||
for _ in range(8):
|
||||
if crc & 0x0001 == 0:
|
||||
crc = crc >> 1
|
||||
else:
|
||||
crc = crc >> 1
|
||||
crc = crc ^ 0xA001
|
||||
return crc
|
||||
|
||||
|
||||
# =====
|
||||
REQUEST_PING = _make_request(b"\x01\x00\x00\x00\x00")
|
||||
REQUEST_REPEAT = _make_request(b"\x02\x00\x00\x00\x00")
|
||||
|
||||
RESPONSE_LEGACY_OK = b"\x33\x20" + struct.pack(">H", _make_crc16(b"\x33\x20"))
|
||||
Reference in New Issue
Block a user