simplified serial hid

This commit is contained in:
Devaev Maxim 2019-10-01 20:21:58 +03:00
parent 56ae6a5d90
commit f71c06a9a9

View File

@ -32,7 +32,6 @@ import errno
import time import time
from typing import Dict from typing import Dict
from typing import Set
from typing import AsyncGenerator from typing import AsyncGenerator
import serial import serial
@ -64,67 +63,68 @@ class _BaseEvent:
raise NotImplementedError raise NotImplementedError
@dataclasses.dataclass(frozen=True) # pylint: disable=abstract-method class _ClearEvent(_BaseEvent):
class _BoolEvent(_BaseEvent): def make_command(self) -> bytes:
name: str return b"\x10\x00\x00\x00\x00"
state: bool
@dataclasses.dataclass(frozen=True) # pylint: disable=abstract-method
class _IntEvent(_BaseEvent):
x: int
y: int
@dataclasses.dataclass(frozen=True) @dataclasses.dataclass(frozen=True)
class _KeyEvent(_BoolEvent): class _KeyEvent(_BaseEvent):
name: str
state: bool
def __post_init__(self) -> None: def __post_init__(self) -> None:
assert self.name in keymap.KEYMAP assert self.name in keymap.KEYMAP
def make_command(self) -> bytes: def make_command(self) -> bytes:
code = keymap.KEYMAP[self.name].serial.code code = keymap.KEYMAP[self.name].serial.code
key_bytes = bytes([code]) return struct.pack(">BBBxx", 0x11, code, int(self.state))
assert len(key_bytes) == 1, (self, key_bytes, code)
state_bytes = (b"\x01" if self.state else b"\x00")
return b"\x11" + key_bytes + state_bytes + b"\x00\x00"
@dataclasses.dataclass(frozen=True) @dataclasses.dataclass(frozen=True)
class _MouseMoveEvent(_IntEvent): class _MouseMoveEvent(_BaseEvent):
to_x: int
to_y: int
def __post_init__(self) -> None: def __post_init__(self) -> None:
assert -32768 <= self.x <= 32767 assert -32768 <= self.to_x <= 32767
assert -32768 <= self.y <= 32767 assert -32768 <= self.to_y <= 32767
def make_command(self) -> bytes: def make_command(self) -> bytes:
return b"\x12" + struct.pack(">hh", self.x, self.y) return struct.pack(">Bhh", 0x12, self.to_x, self.to_y)
@dataclasses.dataclass(frozen=True) @dataclasses.dataclass(frozen=True)
class _MouseButtonEvent(_BoolEvent): class _MouseButtonEvent(_BaseEvent):
name: str
state: bool
def __post_init__(self) -> None: def __post_init__(self) -> None:
assert self.name in ["left", "right", "middle"] assert self.name in ["left", "right", "middle"]
def make_command(self) -> bytes: def make_command(self) -> bytes:
code = 0 (code, state_pressed) = {
if self.name == "left": "left": (0b10000000, 0b00001000),
code = (0b10000000 | (0b00001000 if self.state else 0)) "right": (0b01000000, 0b00000100),
elif self.name == "right": "middle": (0b00100000, 0b00000010),
code = (0b01000000 | (0b00000100 if self.state else 0)) }[self.name]
elif self.name == "middle": if self.state:
code = (0b00100000 | (0b00000010 if self.state else 0)) code |= state_pressed
assert code, self return struct.pack(">BBxxx", 0x13, code)
return b"\x13" + bytes([code]) + b"\x00\x00\x00"
@dataclasses.dataclass(frozen=True) @dataclasses.dataclass(frozen=True)
class _MouseWheelEvent(_IntEvent): class _MouseWheelEvent(_BaseEvent):
delta_x: int
delta_y: int
def __post_init__(self) -> None: def __post_init__(self) -> None:
assert -127 <= self.x <= 127 assert -127 <= self.delta_x <= 127
assert -127 <= self.y <= 127 assert -127 <= self.delta_y <= 127
def make_command(self) -> bytes: def make_command(self) -> bytes:
# Горизонтальная прокрутка пока не поддерживается # Горизонтальная прокрутка пока не поддерживается
return b"\x14\x00" + struct.pack(">b", self.y) + b"\x00\x00" return struct.pack(">Bxbxx", 0x14, self.delta_y)
# ===== # =====
@ -162,10 +162,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
self.__lock = asyncio.Lock() self.__lock = asyncio.Lock()
self.__pressed_keys: Set[str] = set()
self.__pressed_mouse_buttons: Set[str] = set()
self.__events_queue: multiprocessing.queues.Queue = multiprocessing.Queue() self.__events_queue: multiprocessing.queues.Queue = multiprocessing.Queue()
self.__online_shared = multiprocessing.Value("i", 1) self.__online_shared = multiprocessing.Value("i", 1)
self.__stop_event = multiprocessing.Event() self.__stop_event = multiprocessing.Event()
@ -232,71 +229,42 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
async with self.__lock: async with self.__lock:
try: try:
if self.is_alive(): if self.is_alive():
self.__unsafe_clear_events()
logger.info("Stopping HID daemon ...") logger.info("Stopping HID daemon ...")
self.__stop_event.set() self.__stop_event.set()
else:
logger.warning("Emergency cleaning up HID events ...")
self.__emergency_clear_events()
if self.exitcode is not None: if self.exitcode is not None:
self.join() self.join()
if os.path.exists(self.__device_path):
get_logger().info("Clearing HID events ...")
try:
with self.__get_serial() as tty:
self.__process_command(tty, b"\x10\x00\x00\x00\x00")
except Exception:
logger.exception("Can't clear HID events")
finally: finally:
gpio.write(self.__reset_pin, False) gpio.write(self.__reset_pin, False)
# ===== # =====
async def send_key_event(self, key: str, state: bool) -> None: async def send_key_event(self, key: str, state: bool) -> None:
await self.__send_bool_event(_KeyEvent(key, state), self.__pressed_keys) await self.__queue_event(_KeyEvent(key, state))
async def send_mouse_move_event(self, to_x: int, to_y: int) -> None: async def send_mouse_move_event(self, to_x: int, to_y: int) -> None:
await self.__send_int_event(_MouseMoveEvent(to_x, to_y)) await self.__queue_event(_MouseMoveEvent(to_x, to_y))
async def send_mouse_button_event(self, button: str, state: bool) -> None: async def send_mouse_button_event(self, button: str, state: bool) -> None:
await self.__send_bool_event(_MouseButtonEvent(button, state), self.__pressed_mouse_buttons) await self.__queue_event(_MouseButtonEvent(button, state))
async def send_mouse_wheel_event(self, delta_x: int, delta_y: int) -> None: async def send_mouse_wheel_event(self, delta_x: int, delta_y: int) -> None:
await self.__send_int_event(_MouseWheelEvent(delta_x, delta_y)) await self.__queue_event(_MouseWheelEvent(delta_x, delta_y))
async def clear_events(self) -> None: async def clear_events(self) -> None:
if not self.__stop_event.is_set(): await self.__queue_event(_ClearEvent())
async with self.__lock:
self.__unsafe_clear_events()
async def __send_bool_event(self, event: _BoolEvent, pressed: Set[str]) -> None: async def __queue_event(self, event: _BaseEvent) -> None:
if not self.__stop_event.is_set():
async with self.__lock:
if (
(event.state and (event.name not in pressed)) # Если еще не нажато
or (not event.state and (event.name in pressed)) # ... Или еще не отжато
):
if event.state:
pressed.add(event.name)
else:
pressed.remove(event.name)
self.__events_queue.put(event)
async def __send_int_event(self, event: _IntEvent) -> None:
if not self.__stop_event.is_set(): if not self.__stop_event.is_set():
async with self.__lock: async with self.__lock:
self.__events_queue.put(event) self.__events_queue.put(event)
def __unsafe_clear_events(self) -> None:
for (cls, pressed) in [
(_MouseButtonEvent, self.__pressed_mouse_buttons),
(_KeyEvent, self.__pressed_keys),
]:
for name in pressed:
self.__events_queue.put(cls(name, False))
pressed.clear()
def __emergency_clear_events(self) -> None:
if os.path.exists(self.__device_path):
try:
with self.__get_serial() as tty:
self.__process_command(tty, b"\x10\x00\x00\x00\x00")
except Exception:
get_logger().exception("Can't execute emergency clear HID events")
def run(self) -> None: # pylint: disable=too-many-branches def run(self) -> None: # pylint: disable=too-many-branches
logger = get_logger(0) logger = get_logger(0)
@ -388,6 +356,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
logger.error("Invalid response from HID: request=%r; code=0x%x", request, code) logger.error("Invalid response from HID: request=%r; code=0x%x", request, code)
common_retries -= 1 common_retries -= 1
error_occured = True error_occured = True
self.__online_shared.value = 0 self.__online_shared.value = 0