multihid firmware

This commit is contained in:
Devaev Maxim
2020-11-19 23:28:23 +03:00
parent 188de71515
commit a77db72355
17 changed files with 636 additions and 379 deletions

View File

@@ -136,6 +136,19 @@ class _MouseMoveEvent(_BaseEvent):
return struct.pack(">Bhh", 0x12, self.to_x, self.to_y)
@dataclasses.dataclass(frozen=True)
class _MouseRelativeEvent(_BaseEvent):
delta_x: int
delta_y: int
def __post_init__(self) -> None:
assert -127 <= self.delta_x <= 127
assert -127 <= self.delta_y <= 127
def make_command(self) -> bytes:
return struct.pack(">Bbbxx", 0x15, self.delta_x, self.delta_y)
@dataclasses.dataclass(frozen=True)
class _MouseWheelEvent(_BaseEvent):
delta_x: int
@@ -196,12 +209,9 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
self.__notifier = aiomulti.AioProcessNotifier()
self.__state_flags = aiomulti.AioSharedFlags({
"keyboard_online": True,
"mouse_online": True,
"caps": False,
"scroll": False,
"num": False,
}, self.__notifier)
"online": 0,
"status": 0,
}, self.__notifier, type=int)
self.__stop_event = multiprocessing.Event()
@@ -226,19 +236,51 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
async def get_state(self) -> Dict:
state = await self.__state_flags.get()
online = bool(state["online"])
pong = (state["status"] >> 16) & 0xFF
outputs = (state["status"] >> 8) & 0xFF
features = state["status"] & 0xFF
absolute = True
if online and (outputs & 0b00111000) in [0b00010000, 0b00011000]:
absolute = False
keyboard_outputs: Dict = {"available": {}, "active": ""}
mouse_outputs: Dict = {"available": {}, "active": ""}
if outputs & 0b10000000: # Dynamic
if features & 0b00000001: # USB
keyboard_outputs["available"]["usb"] = {"name": "USB"}
mouse_outputs["available"]["usb_abs"] = {"name": "USB", "absolute": True}
mouse_outputs["available"]["usb_rel"] = {"name": "USB Relative", "absolute": False}
if features & 0b00000010: # PS/2
keyboard_outputs["available"]["ps2"] = {"name": "PS/2"}
mouse_outputs["available"]["ps2"] = {"name": "PS/2"}
keyboard_outputs["active"] = {
0b00000001: "usb",
0b00000011: "ps2",
}.get(outputs & 0b00000111, "")
mouse_outputs["active"] = {
0b00001000: "usb_abs",
0b00010000: "usb_rel",
0b00011000: "ps2",
}.get(outputs & 0b00111000, "")
return {
"online": (state["keyboard_online"] and state["mouse_online"]),
"online": online,
"keyboard": {
"online": state["keyboard_online"],
"online": (online and not (pong & 0b00001000)),
"leds": {
"caps": state["caps"],
"scroll": state["scroll"],
"num": state["num"],
"caps": bool(pong & 0b00000001),
"scroll": bool(pong & 0b00000010),
"num": bool(pong & 0b00000100),
},
"outputs": keyboard_outputs,
},
"mouse": {
"online": state["mouse_online"],
"absolute": True,
"online": (online and not (pong & 0b00010000)),
"absolute": absolute,
"outputs": mouse_outputs,
},
}
@@ -287,8 +329,7 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
self.__queue_event(_MouseMoveEvent(to_x, to_y))
def send_mouse_relative_event(self, delta_x: int, delta_y: int) -> None:
_ = delta_x # No relative events yet
_ = delta_y
self.__queue_event(_MouseRelativeEvent(delta_x, delta_y))
def send_mouse_wheel_event(self, delta_x: int, delta_y: int) -> None:
self.__queue_event(_MouseWheelEvent(delta_x, delta_y))
@@ -350,8 +391,8 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
read_retries -= 1
raise _TempRequestError(f"No response from HID: request={request!r}")
assert len(response) == 4, response
if self.__make_crc16(response[-4:-2]) != struct.unpack(">H", response[-2:])[0]:
assert len(response) in (4, 8), response
if self.__make_crc16(response[:-2]) != struct.unpack(">H", response[-2:])[0]:
request = self.__make_request(b"\x02\x00\x00\x00\x00") # Repeat an answer
raise _TempRequestError("Invalid response CRC; requesting response again ...")
@@ -368,7 +409,7 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
self.__set_state_online(True)
return True
elif code & 0x80: # Pong/Done with state
self.__set_state_code(code)
self.__set_state_pong(response)
return True
raise _TempRequestError(f"Invalid response from HID: request={request!r}; code=0x{code:02X}")
@@ -401,19 +442,13 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
return error_retval
def __set_state_online(self, online: bool) -> None:
self.__state_flags.update(
keyboard_online=online,
mouse_online=online,
)
self.__state_flags.update(online=int(online))
def __set_state_code(self, code: int) -> None:
self.__state_flags.update(
keyboard_online=(not (code & 0b00001000)),
mouse_online=(not (code & 0b00010000)),
caps=bool(code & 0b00000001),
scroll=bool(code & 0b00000010),
num=bool(code & 0b00000100),
)
def __set_state_pong(self, data: bytes) -> None:
status = data[1] << 16
if len(data) > 4:
status |= (data[2] << 8) | data[3]
self.__state_flags.update(online=1, status=status)
def __send_request(self, conn: BasePhyConnection, request: bytes) -> bytes:
if not self.__noop: