refactoring

This commit is contained in:
Maxim Devaev
2023-05-29 17:34:24 +03:00
parent c584302587
commit be4269fe61
2 changed files with 45 additions and 63 deletions

View File

@@ -45,18 +45,11 @@ from ....validators.hw import valid_tty_speed
from .. import BaseHid from .. import BaseHid
from .tty import TTY from .chip import ChipResponseError
from .chip import Chip
from .mouse import Mouse from .mouse import Mouse
from .keyboard import Keyboard from .keyboard import Keyboard
from .tty import get_info
class _ResError(Exception):
def __init__(self, msg: str) -> None:
super().__init__(msg)
self.msg = msg
# ===== # =====
class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-instance-attributes class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-instance-attributes
@@ -94,7 +87,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
}, self.__notifier, type=int) }, self.__notifier, type=int)
self.__stop_event = multiprocessing.Event() self.__stop_event = multiprocessing.Event()
self.__tty = TTY(device_path, speed, read_timeout) self.__chip = Chip(device_path, speed, read_timeout)
self.__keyboard = Keyboard() self.__keyboard = Keyboard()
self.__mouse = Mouse() self.__mouse = Mouse()
@@ -201,7 +194,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
logger = aioproc.settle("HID", "hid") logger = aioproc.settle("HID", "hid")
while not self.__stop_event.is_set(): while not self.__stop_event.is_set():
try: try:
# self.__tty.connect() # self.__chip.connect()
self.__hid_loop() self.__hid_loop()
except Exception: except Exception:
logger.exception("Unexpected error in the run loop") logger.exception("Unexpected error in the run loop")
@@ -222,7 +215,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
cmd = self.__cmd_queue.get(timeout=0.1) cmd = self.__cmd_queue.get(timeout=0.1)
# get_logger(0).info(f"HID : cmd = {cmd}") # get_logger(0).info(f"HID : cmd = {cmd}")
except queue.Empty: except queue.Empty:
self.__process_cmd(get_info()) self.__process_cmd([])
else: else:
self.__process_cmd(cmd) self.__process_cmd(cmd)
except Exception: except Exception:
@@ -231,35 +224,19 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
time.sleep(2) time.sleep(2)
def __process_cmd(self, cmd: list[int]) -> bool: # pylint: disable=too-many-branches def __process_cmd(self, cmd: list[int]) -> bool: # pylint: disable=too-many-branches
error_retval = False
try: try:
res = self.__tty.send(cmd) led_byte = self.__chip.xfer(cmd)
# get_logger(0).info(f"HID response = {res}") except ChipResponseError as err:
if len(res) < 4:
raise _ResError("No response from HID - might be disconnected")
if not self.__tty.check_res(res):
raise _ResError("Invalid response checksum ...")
# Response Error
if res[4] == 1 and res[5] != 0:
raise _ResError("Command error code = " + str(res[5]))
# get_info response
if res[3] == 0x81:
self.__keyboard.set_leds(res[7])
self.__notifier.notify()
self.__set_state_online(True)
return True
except _ResError as err:
self.__set_state_online(False) self.__set_state_online(False)
get_logger(0).info(err) get_logger(0).info(err)
error_retval = False
time.sleep(2) time.sleep(2)
else:
return error_retval if led_byte >= 0:
self.__keyboard.set_leds(led_byte)
self.__notifier.notify()
self.__set_state_online(True)
return True
return False
def __set_state_online(self, online: bool) -> None: def __set_state_online(self, online: bool) -> None:
self.__state_flags.update(online=int(online)) self.__state_flags.update(online=int(online))

View File

@@ -20,43 +20,48 @@
# ========================================================================== # # ========================================================================== #
import os
import serial import serial
class TTY: # =====
class ChipResponseError(Exception):
pass
# =====
class Chip:
def __init__(self, device_path: str, speed: int, read_timeout: float) -> None: def __init__(self, device_path: str, speed: int, read_timeout: float) -> None:
self.__tty = serial.Serial(device_path, speed, timeout=read_timeout) self.__tty = serial.Serial(device_path, speed, timeout=read_timeout)
self.__device_path = device_path self.__device_path = device_path
def has_device(self) -> bool: def xfer(self, cmd: list[int]) -> int:
return os.path.exists(self.__device_path) self.__send(cmd)
return self.__recv()
def send(self, cmd: list[int]) -> list[int]: def __send(self, cmd: list[int]) -> None:
cmd = self.__wrap_cmd(cmd) # RESET = [0x00,0x0F,0x00]
# GET_INFO = [0x00,0x01,0x00]
if len(cmd) == 0:
cmd = [0x00, 0x01, 0x00]
cmd = [0x57, 0xAB, *cmd, self.__make_checksum(cmd)]
self.__tty.write(serial.to_bytes(cmd)) self.__tty.write(serial.to_bytes(cmd))
data = list(self.__tty.read(5))
def __recv(self) -> int:
data = self.__tty.read(5)
if len(data) < 5:
raise ChipResponseError("Too short response, HID might be disconnected")
if data and data[4]: if data and data[4]:
more_data = list(self.__tty.read(data[4] + 1)) data += self.__tty.read(data[4] + 1)
data.extend(more_data)
return data
def check_res(self, res: list[int]) -> bool: if self.__make_checksum(data[:-1]) != data[-1]:
res_sum = res.pop() raise ChipResponseError("Invalid response checksum")
return (self.__checksum(res) == res_sum)
def __wrap_cmd(self, cmd: list[int]) -> list[int]: if data[4] == 1 and data[5] != 0:
cmd.insert(0, 0xAB) raise ChipResponseError(f"Response error code = {data[5]!r}")
cmd.insert(0, 0x57)
cmd.append(self.__checksum(cmd))
return cmd
def __checksum(self, cmd: list[int]) -> int: # led_byte (info) response
return sum(cmd) % 256 return (data[7] if data[3] == 0x81 else -1)
def __make_checksum(self, cmd: (list[int] | bytes)) -> int:
def get_info() -> list[int]: return (sum(cmd) % 256)
return [0x00, 0x01, 0x00]
# RESET = [0x00,0x0F,0x00]
# GET_INFO = [0x00,0x01,0x00]