ch9329: using bytes instead of list[int]

This commit is contained in:
Maxim Devaev
2023-06-01 17:07:40 +03:00
parent ce81c872ea
commit 2730b11840
4 changed files with 30 additions and 28 deletions

View File

@@ -64,7 +64,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
self.__read_timeout = read_timeout self.__read_timeout = read_timeout
self.__reset_required_event = multiprocessing.Event() self.__reset_required_event = multiprocessing.Event()
self.__cmd_queue: "multiprocessing.Queue[list]" = multiprocessing.Queue() self.__cmd_queue: "multiprocessing.Queue[bytes]" = multiprocessing.Queue()
self.__notifier = aiomulti.AioProcessNotifier() self.__notifier = aiomulti.AioProcessNotifier()
self.__state_flags = aiomulti.AioSharedFlags({ self.__state_flags = aiomulti.AioSharedFlags({
@@ -163,7 +163,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
def clear_events(self) -> None: def clear_events(self) -> None:
tools.clear_queue(self.__cmd_queue) tools.clear_queue(self.__cmd_queue)
def __queue_cmd(self, cmd: list[int], clear: bool=False) -> None: def __queue_cmd(self, cmd: bytes, clear: bool=False) -> None:
if not self.__stop_event.is_set(): if not self.__stop_event.is_set():
if clear: if clear:
# FIXME: Если очистка производится со стороны процесса хида, то возможна гонка между # FIXME: Если очистка производится со стороны процесса хида, то возможна гонка между
@@ -196,7 +196,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
cmd = self.__cmd_queue.get(timeout=0.1) cmd = self.__cmd_queue.get(timeout=0.1)
# get_logger(0).info(f"HID : cmd = {cmd}") # get_logger(0).info(f"HID : cmd = {cmd}")
except queue.Empty: except queue.Empty:
self.__process_cmd([]) self.__process_cmd(b"")
else: else:
self.__process_cmd(cmd) self.__process_cmd(cmd)
except Exception: except Exception:
@@ -204,7 +204,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
get_logger(0).exception("Unexpected error in the HID loop") get_logger(0).exception("Unexpected error in the HID loop")
time.sleep(2) time.sleep(2)
def __process_cmd(self, cmd: list[int]) -> bool: # pylint: disable=too-many-branches def __process_cmd(self, cmd: bytes) -> bool: # pylint: disable=too-many-branches
try: try:
led_byte = self.__chip.xfer(cmd) led_byte = self.__chip.xfer(cmd)
except ChipResponseError as err: except ChipResponseError as err:

View File

@@ -34,17 +34,17 @@ class Chip:
self.__tty = serial.Serial(device_path, speed, timeout=read_timeout) self.__tty = serial.Serial(device_path, speed, timeout=read_timeout)
self.__device_path = device_path self.__device_path = device_path
def xfer(self, cmd: list[int]) -> int: def xfer(self, cmd: bytes) -> int:
self.__send(cmd) self.__send(cmd)
return self.__recv() return self.__recv()
def __send(self, cmd: list[int]) -> None: def __send(self, cmd: bytes) -> None:
# RESET = [0x00,0x0F,0x00] # RESET = [0x00,0x0F,0x00]
# GET_INFO = [0x00,0x01,0x00] # GET_INFO = [0x00,0x01,0x00]
if len(cmd) == 0: if len(cmd) == 0:
cmd = [0x00, 0x01, 0x00] cmd = b"\x00\x01\x00"
cmd = [0x57, 0xAB] + cmd cmd = b"\x57\xAB" + cmd
cmd.append(self.__make_checksum(cmd)) cmd += self.__make_checksum(cmd).to_bytes()
self.__tty.write(serial.to_bytes(cmd)) self.__tty.write(serial.to_bytes(cmd))
def __recv(self) -> int: def __recv(self) -> int:
@@ -64,5 +64,5 @@ class Chip:
# led_byte (info) response # led_byte (info) response
return (data[7] if data[3] == 0x81 else -1) return (data[7] if data[3] == 0x81 else -1)
def __make_checksum(self, cmd: (list[int] | bytes)) -> int: def __make_checksum(self, cmd: bytes) -> int:
return (sum(cmd) % 256) return (sum(cmd) % 256)

View File

@@ -46,7 +46,7 @@ class Keyboard:
async def get_leds(self) -> dict[str, bool]: async def get_leds(self) -> dict[str, bool]:
return (await self.__leds.get()) return (await self.__leds.get())
def process_key(self, key: str, state: bool) -> list[int]: def process_key(self, key: str, state: bool) -> bytes:
code = KEYMAP[key].usb.code code = KEYMAP[key].usb.code
is_modifier = KEYMAP[key].usb.is_modifier is_modifier = KEYMAP[key].usb.is_modifier
if state: if state:
@@ -65,4 +65,4 @@ class Keyboard:
] ]
for (index, code) in enumerate(self.__active_keys): for (index, code) in enumerate(self.__active_keys):
cmd[index + 5] = code cmd[index + 5] = code
return cmd return bytes(cmd)

View File

@@ -32,6 +32,8 @@ class Mouse: # pylint: disable=too-many-instance-attributes
self.__buttons = 0 self.__buttons = 0
self.__to_x = (0, 0) self.__to_x = (0, 0)
self.__to_y = (0, 0) self.__to_y = (0, 0)
self.__delta_x = 0
self.__delta_y = 0
self.__wheel_y = 0 self.__wheel_y = 0
def set_absolute(self, flag: bool) -> None: def set_absolute(self, flag: bool) -> None:
@@ -40,7 +42,7 @@ class Mouse: # pylint: disable=too-many-instance-attributes
def is_absolute(self) -> bool: def is_absolute(self) -> bool:
return self.__absolute return self.__absolute
def process_button(self, button: str, state: bool) -> list[int]: def process_button(self, button: str, state: bool) -> bytes:
code = 0x00 code = 0x00
match button: match button:
case "left": case "left":
@@ -60,11 +62,11 @@ class Mouse: # pylint: disable=too-many-instance-attributes
self.__buttons &= ~code self.__buttons &= ~code
self.__wheel_y = 0 self.__wheel_y = 0
if not self.__absolute: if not self.__absolute:
return self.__make_relative_cmd() return self.__make_relative_cmd()
else: else:
return self.__make_absolute_cmd() return self.__make_absolute_cmd()
def process_move(self, to_x: int, to_y: int) -> list[int]: def process_move(self, to_x: int, to_y: int) -> bytes:
self.__to_x = self.__fix_absolute(to_x) self.__to_x = self.__fix_absolute(to_x)
self.__to_y = self.__fix_absolute(to_y) self.__to_y = self.__fix_absolute(to_y)
self.__wheel_y = 0 self.__wheel_y = 0
@@ -75,37 +77,37 @@ class Mouse: # pylint: disable=too-many-instance-attributes
to_fixed = math.ceil(MouseRange.remap(value, 0, MouseRange.MAX) / 8) to_fixed = math.ceil(MouseRange.remap(value, 0, MouseRange.MAX) / 8)
return (to_fixed >> 8, to_fixed & 0xFF) return (to_fixed >> 8, to_fixed & 0xFF)
def process_wheel(self, delta_x: int, delta_y: int) -> list[int]: def process_wheel(self, delta_x: int, delta_y: int) -> bytes:
_ = delta_x _ = delta_x
assert -127 <= delta_y <= 127 assert -127 <= delta_y <= 127
self.__wheel_y = (1 if delta_y > 0 else 255) self.__wheel_y = (1 if delta_y > 0 else 255)
if not self.__absolute: if not self.__absolute:
return self.__make_relative_cmd() return self.__make_relative_cmd()
else: else:
return self.__make_absolute_cmd() return self.__make_absolute_cmd()
def process_relative(self, delta_x: int, delta_y: int) -> list[int]: def process_relative(self, delta_x: int, delta_y: int) -> bytes:
self.__delta_x = self.__fix_relative(delta_x) self.__delta_x = self.__fix_relative(delta_x)
self.__delta_y = self.__fix_relative(delta_y) self.__delta_y = self.__fix_relative(delta_y)
self.__wheel_y = 0 self.__wheel_y = 0
return self.__make_relative_cmd() return self.__make_relative_cmd()
def __make_absolute_cmd(self) -> list[int]: def __make_absolute_cmd(self) -> bytes:
return [ return bytes([
0, 0x04, 0x07, 0x02, 0, 0x04, 0x07, 0x02,
self.__buttons, self.__buttons,
self.__to_x[1], self.__to_x[0], self.__to_x[1], self.__to_x[0],
self.__to_y[1], self.__to_y[0], self.__to_y[1], self.__to_y[0],
self.__wheel_y, self.__wheel_y,
] ])
def __make_relative_cmd(self) -> list[int]: def __make_relative_cmd(self) -> bytes:
return [ return bytes([
0, 0x05, 0x05, 0x01, 0, 0x05, 0x05, 0x01,
self.__buttons, self.__buttons,
self.__delta_x, self.__delta_y, self.__delta_x, self.__delta_y,
self.__wheel_y, self.__wheel_y,
] ])
def __fix_relative(self, value: int) -> int: def __fix_relative(self, value: int) -> int:
assert -127 <= value <= 127 assert -127 <= value <= 127