mirror of
https://github.com/mofeng-git/One-KVM.git
synced 2026-02-01 18:41:54 +08:00
Add CH9329 Serial to HID support (#121)
* Add ch9329 plugin * refactoring ch9329 * refactor ch9329 and cleanup * refactoring * fixing lint errors * clarifying list type
This commit is contained in:
271
kvmd/plugins/hid/ch9329/__init__.py
Normal file
271
kvmd/plugins/hid/ch9329/__init__.py
Normal file
@@ -0,0 +1,271 @@
|
|||||||
|
# ========================================================================== #
|
||||||
|
# #
|
||||||
|
# KVMD - The main PiKVM daemon. #
|
||||||
|
# #
|
||||||
|
# Copyright (C) 2018-2022 Maxim Devaev <mdevaev@gmail.com> #
|
||||||
|
# #
|
||||||
|
# This program is free software: you can redistribute it and/or modify #
|
||||||
|
# it under the terms of the GNU General Public License as published by #
|
||||||
|
# the Free Software Foundation, either version 3 of the License, or #
|
||||||
|
# (at your option) any later version. #
|
||||||
|
# #
|
||||||
|
# This program is distributed in the hope that it will be useful, #
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
|
||||||
|
# GNU General Public License for more details. #
|
||||||
|
# #
|
||||||
|
# You should have received a copy of the GNU General Public License #
|
||||||
|
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
|
||||||
|
# #
|
||||||
|
# ========================================================================== #
|
||||||
|
|
||||||
|
|
||||||
|
import multiprocessing
|
||||||
|
import queue
|
||||||
|
import time
|
||||||
|
|
||||||
|
from typing import Iterable
|
||||||
|
from typing import AsyncGenerator
|
||||||
|
|
||||||
|
from ....logging import get_logger
|
||||||
|
|
||||||
|
from .... import tools
|
||||||
|
from .... import aiotools
|
||||||
|
from .... import aiomulti
|
||||||
|
from .... import aioproc
|
||||||
|
|
||||||
|
from ....yamlconf import Option
|
||||||
|
|
||||||
|
from ....validators.basic import valid_bool
|
||||||
|
from ....validators.basic import valid_int_f0
|
||||||
|
from ....validators.basic import valid_int_f1
|
||||||
|
from ....validators.basic import valid_float_f01
|
||||||
|
from ....validators.os import valid_abs_path
|
||||||
|
from ....validators.hw import valid_tty_speed
|
||||||
|
|
||||||
|
from .. import BaseHid
|
||||||
|
|
||||||
|
from .tty import TTY
|
||||||
|
from .mouse import Mouse
|
||||||
|
from .keyboard import Keyboard
|
||||||
|
|
||||||
|
from .tty import get_info
|
||||||
|
|
||||||
|
|
||||||
|
class _ResError(Exception):
|
||||||
|
def __init__(self, msg: str) -> None:
|
||||||
|
super().__init__(msg)
|
||||||
|
self.msg = msg
|
||||||
|
|
||||||
|
|
||||||
|
# =====
|
||||||
|
class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-instance-attributes
|
||||||
|
def __init__( # pylint: disable=too-many-arguments,super-init-not-called
|
||||||
|
self,
|
||||||
|
device_path: str,
|
||||||
|
speed: int,
|
||||||
|
read_timeout: float,
|
||||||
|
read_retries: int,
|
||||||
|
common_retries: int,
|
||||||
|
retries_delay: float,
|
||||||
|
errors_threshold: int,
|
||||||
|
noop: bool,
|
||||||
|
) -> None:
|
||||||
|
|
||||||
|
multiprocessing.Process.__init__(self, daemon=True)
|
||||||
|
|
||||||
|
self.__device_path = device_path
|
||||||
|
self.__speed = speed
|
||||||
|
self.__read_timeout = read_timeout
|
||||||
|
self.__read_retries = read_retries
|
||||||
|
self.__common_retries = common_retries
|
||||||
|
self.__retries_delay = retries_delay
|
||||||
|
self.__errors_threshold = errors_threshold
|
||||||
|
self.__noop = noop
|
||||||
|
|
||||||
|
self.__reset_required_event = multiprocessing.Event()
|
||||||
|
self.__cmd_queue: "multiprocessing.Queue[list]" = multiprocessing.Queue()
|
||||||
|
|
||||||
|
self.__notifier = aiomulti.AioProcessNotifier()
|
||||||
|
self.__state_flags = aiomulti.AioSharedFlags({
|
||||||
|
"online": 0,
|
||||||
|
"busy": 0,
|
||||||
|
"status": 0,
|
||||||
|
}, self.__notifier, type=int)
|
||||||
|
|
||||||
|
self.__stop_event = multiprocessing.Event()
|
||||||
|
self.__tty = TTY(device_path, speed, read_timeout)
|
||||||
|
self.__keyboard = Keyboard()
|
||||||
|
self.__mouse = Mouse()
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def get_plugin_options(cls) -> dict:
|
||||||
|
return {
|
||||||
|
"device": Option("/dev/kvmd-hid", type=valid_abs_path, unpack_as="device_path"),
|
||||||
|
"speed": Option(9600, type=valid_tty_speed),
|
||||||
|
"read_timeout": Option(0.3, type=valid_float_f01),
|
||||||
|
"read_retries": Option(5, type=valid_int_f1),
|
||||||
|
"common_retries": Option(5, type=valid_int_f1),
|
||||||
|
"retries_delay": Option(0.5, type=valid_float_f01),
|
||||||
|
"errors_threshold": Option(5, type=valid_int_f0),
|
||||||
|
"noop": Option(False, type=valid_bool),
|
||||||
|
}
|
||||||
|
|
||||||
|
def sysprep(self) -> None:
|
||||||
|
get_logger(0).info("Starting HID daemon ...")
|
||||||
|
self.start()
|
||||||
|
|
||||||
|
async def get_state(self) -> dict:
|
||||||
|
state = await self.__state_flags.get()
|
||||||
|
online = bool(state["online"])
|
||||||
|
active_mouse = self.__mouse.active()
|
||||||
|
absolute = (active_mouse == "usb")
|
||||||
|
keyboard_leds = await self.__keyboard.leds()
|
||||||
|
|
||||||
|
return {
|
||||||
|
"online": online,
|
||||||
|
"busy": False,
|
||||||
|
"connected": None,
|
||||||
|
"keyboard": {
|
||||||
|
"online": True,
|
||||||
|
"leds": keyboard_leds,
|
||||||
|
"outputs": {"available": [], "active": ""},
|
||||||
|
},
|
||||||
|
"mouse": {
|
||||||
|
"online": True,
|
||||||
|
"absolute": absolute,
|
||||||
|
"outputs": {
|
||||||
|
"available": ["usb", "usb_rel"],
|
||||||
|
"active": active_mouse
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
async def poll_state(self) -> AsyncGenerator[dict, None]:
|
||||||
|
prev_state: dict = {}
|
||||||
|
while True:
|
||||||
|
state = await self.get_state()
|
||||||
|
if state != prev_state:
|
||||||
|
yield state
|
||||||
|
prev_state = state
|
||||||
|
await self.__notifier.wait()
|
||||||
|
|
||||||
|
async def reset(self) -> None:
|
||||||
|
self.__reset_required_event.set()
|
||||||
|
|
||||||
|
@aiotools.atomic_fg
|
||||||
|
async def cleanup(self) -> None:
|
||||||
|
if self.is_alive():
|
||||||
|
get_logger(0).info("Stopping HID daemon ...")
|
||||||
|
self.__stop_event.set()
|
||||||
|
if self.is_alive() or self.exitcode is not None:
|
||||||
|
self.join()
|
||||||
|
|
||||||
|
# =====
|
||||||
|
|
||||||
|
def send_key_events(self, keys: Iterable[tuple[str, bool]]) -> None:
|
||||||
|
for (key, state) in keys:
|
||||||
|
self.__queue_cmd(self.__keyboard.key(key, state))
|
||||||
|
|
||||||
|
def send_mouse_button_event(self, button: str, state: bool) -> None:
|
||||||
|
self.__queue_cmd(self.__mouse.button(button, state))
|
||||||
|
|
||||||
|
def send_mouse_move_event(self, to_x: int, to_y: int) -> None:
|
||||||
|
self.__queue_cmd(self.__mouse.move(to_x, to_y))
|
||||||
|
|
||||||
|
def send_mouse_wheel_event(self, delta_x: int, delta_y: int) -> None:
|
||||||
|
self.__queue_cmd(self.__mouse.wheel(delta_x, delta_y))
|
||||||
|
|
||||||
|
def send_mouse_relative_event(self, delta_x: int, delta_y: int) -> None:
|
||||||
|
self.__queue_cmd(self.__mouse.relative(delta_x, delta_y))
|
||||||
|
|
||||||
|
def set_params(self, keyboard_output: (str | None)=None, mouse_output: (str | None)=None) -> None:
|
||||||
|
if mouse_output is not None:
|
||||||
|
get_logger(0).info("HID : mouse output = %s", mouse_output)
|
||||||
|
self.__mouse.set_active(mouse_output)
|
||||||
|
self.__notifier.notify()
|
||||||
|
|
||||||
|
def set_connected(self, connected: bool) -> None:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def clear_events(self) -> None:
|
||||||
|
tools.clear_queue(self.__cmd_queue)
|
||||||
|
|
||||||
|
def __queue_cmd(self, cmd: list[int], clear: bool=False) -> None:
|
||||||
|
if not self.__stop_event.is_set():
|
||||||
|
if clear:
|
||||||
|
# FIXME: Если очистка производится со стороны процесса хида, то возможна гонка между
|
||||||
|
# очисткой и добавлением нового события. Неприятно, но не смертельно.
|
||||||
|
# Починить блокировкой после перехода на асинхронные очереди.
|
||||||
|
tools.clear_queue(self.__cmd_queue)
|
||||||
|
self.__cmd_queue.put_nowait(cmd)
|
||||||
|
|
||||||
|
def run(self) -> None: # pylint: disable=too-many-branches
|
||||||
|
logger = aioproc.settle("HID", "hid")
|
||||||
|
while not self.__stop_event.is_set():
|
||||||
|
try:
|
||||||
|
# self.__tty.connect()
|
||||||
|
self.__hid_loop()
|
||||||
|
except Exception:
|
||||||
|
logger.exception("Unexpected error in the run loop")
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
def __hid_loop(self) -> None:
|
||||||
|
while not self.__stop_event.is_set():
|
||||||
|
try:
|
||||||
|
|
||||||
|
while not (self.__stop_event.is_set() and self.__cmd_queue.qsize() == 0):
|
||||||
|
if self.__reset_required_event.is_set():
|
||||||
|
try:
|
||||||
|
self.__set_state_busy(True)
|
||||||
|
# self.__process_request(conn, RESET)
|
||||||
|
finally:
|
||||||
|
self.__reset_required_event.clear()
|
||||||
|
try:
|
||||||
|
cmd = self.__cmd_queue.get(timeout=0.1)
|
||||||
|
# get_logger(0).info(f"HID : cmd = {cmd}")
|
||||||
|
except queue.Empty:
|
||||||
|
self.__process_cmd(get_info())
|
||||||
|
else:
|
||||||
|
self.__process_cmd(cmd)
|
||||||
|
except Exception:
|
||||||
|
self.clear_events()
|
||||||
|
get_logger(0).exception("Unexpected error in the HID loop")
|
||||||
|
time.sleep(2)
|
||||||
|
|
||||||
|
def __process_cmd(self, cmd: list[int]) -> bool: # pylint: disable=too-many-branches
|
||||||
|
error_retval = False
|
||||||
|
try:
|
||||||
|
res = self.__tty.send(cmd)
|
||||||
|
# get_logger(0).info(f"HID response = {res}")
|
||||||
|
if len(res) < 4:
|
||||||
|
raise _ResError("No response from HID - might be disconnected")
|
||||||
|
|
||||||
|
if not self.__tty.check_res(res):
|
||||||
|
raise _ResError("Invalid response checksum ...")
|
||||||
|
|
||||||
|
# Response Error
|
||||||
|
if res[4] == 1 and res[5] != 0:
|
||||||
|
raise _ResError("Command error code = " + str(res[5]))
|
||||||
|
|
||||||
|
# get_info response
|
||||||
|
if res[3] == 0x81:
|
||||||
|
self.__keyboard.set_leds(res[7])
|
||||||
|
self.__notifier.notify()
|
||||||
|
|
||||||
|
self.__set_state_online(True)
|
||||||
|
return True
|
||||||
|
|
||||||
|
except Exception as err:
|
||||||
|
self.__set_state_online(False)
|
||||||
|
get_logger(0).info(err)
|
||||||
|
time.sleep(2)
|
||||||
|
error_retval = False
|
||||||
|
|
||||||
|
return error_retval
|
||||||
|
|
||||||
|
def __set_state_online(self, online: bool) -> None:
|
||||||
|
self.__state_flags.update(online=int(online))
|
||||||
|
|
||||||
|
def __set_state_busy(self, busy: bool) -> None:
|
||||||
|
self.__state_flags.update(busy=int(busy))
|
||||||
71
kvmd/plugins/hid/ch9329/keyboard.py
Normal file
71
kvmd/plugins/hid/ch9329/keyboard.py
Normal file
@@ -0,0 +1,71 @@
|
|||||||
|
# ========================================================================== #
|
||||||
|
# #
|
||||||
|
# KVMD - The main PiKVM daemon. #
|
||||||
|
# #
|
||||||
|
# Copyright (C) 2018-2022 Maxim Devaev <mdevaev@gmail.com> #
|
||||||
|
# #
|
||||||
|
# This program is free software: you can redistribute it and/or modify #
|
||||||
|
# it under the terms of the GNU General Public License as published by #
|
||||||
|
# the Free Software Foundation, either version 3 of the License, or #
|
||||||
|
# (at your option) any later version. #
|
||||||
|
# #
|
||||||
|
# This program is distributed in the hope that it will be useful, #
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
|
||||||
|
# GNU General Public License for more details. #
|
||||||
|
# #
|
||||||
|
# You should have received a copy of the GNU General Public License #
|
||||||
|
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
|
||||||
|
# #
|
||||||
|
# ========================================================================== #
|
||||||
|
|
||||||
|
from .... import aiomulti
|
||||||
|
|
||||||
|
from ....keyboard.mappings import KEYMAP
|
||||||
|
|
||||||
|
|
||||||
|
class Keyboard:
|
||||||
|
def __init__(self) -> None:
|
||||||
|
|
||||||
|
self.__notifier = aiomulti.AioProcessNotifier()
|
||||||
|
self.__leds = aiomulti.AioSharedFlags({
|
||||||
|
"num": False,
|
||||||
|
"caps": False,
|
||||||
|
"scroll": False,
|
||||||
|
}, self.__notifier, type=bool)
|
||||||
|
|
||||||
|
self.__active_keys: list[list] = []
|
||||||
|
|
||||||
|
def key(self, key: str, state: bool) -> list[int]:
|
||||||
|
if state:
|
||||||
|
self.__active_keys.append([key, self.__is_modifier(key)])
|
||||||
|
else:
|
||||||
|
self.__active_keys.remove([key, self.__is_modifier(key)])
|
||||||
|
return self.__key()
|
||||||
|
|
||||||
|
async def leds(self) -> dict:
|
||||||
|
leds = await self.__leds.get()
|
||||||
|
return leds
|
||||||
|
|
||||||
|
def set_leds(self, led_byte: int) -> None:
|
||||||
|
num = bool(led_byte & 1)
|
||||||
|
caps = bool((led_byte >> 1) & 1)
|
||||||
|
scroll = bool((led_byte >> 2) & 1)
|
||||||
|
self.__leds.update(num=num, caps=caps, scroll=scroll)
|
||||||
|
|
||||||
|
def __key(self) -> list[int]:
|
||||||
|
cmd = [0x00, 0x02, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]
|
||||||
|
counter = 0
|
||||||
|
for key in self.__active_keys:
|
||||||
|
if key[1]:
|
||||||
|
cmd[3 + counter] = self.__keycode(key[0])
|
||||||
|
else:
|
||||||
|
cmd[5 + counter] = self.__keycode(key[0])
|
||||||
|
counter += 1
|
||||||
|
return cmd
|
||||||
|
|
||||||
|
def __keycode(self, key: str) -> int:
|
||||||
|
return KEYMAP[key].usb.code
|
||||||
|
|
||||||
|
def __is_modifier(self, key: str) -> bool:
|
||||||
|
return KEYMAP[key].usb.is_modifier
|
||||||
115
kvmd/plugins/hid/ch9329/mouse.py
Normal file
115
kvmd/plugins/hid/ch9329/mouse.py
Normal file
@@ -0,0 +1,115 @@
|
|||||||
|
# ========================================================================== #
|
||||||
|
# #
|
||||||
|
# KVMD - The main PiKVM daemon. #
|
||||||
|
# #
|
||||||
|
# Copyright (C) 2018-2022 Maxim Devaev <mdevaev@gmail.com> #
|
||||||
|
# #
|
||||||
|
# This program is free software: you can redistribute it and/or modify #
|
||||||
|
# it under the terms of the GNU General Public License as published by #
|
||||||
|
# the Free Software Foundation, either version 3 of the License, or #
|
||||||
|
# (at your option) any later version. #
|
||||||
|
# #
|
||||||
|
# This program is distributed in the hope that it will be useful, #
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
|
||||||
|
# GNU General Public License for more details. #
|
||||||
|
# #
|
||||||
|
# You should have received a copy of the GNU General Public License #
|
||||||
|
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
|
||||||
|
# #
|
||||||
|
# ========================================================================== #
|
||||||
|
|
||||||
|
import math
|
||||||
|
|
||||||
|
from ....mouse import MouseRange
|
||||||
|
|
||||||
|
|
||||||
|
class Mouse: # pylint: disable=too-many-instance-attributes
|
||||||
|
def __init__(self) -> None:
|
||||||
|
self.__active = "usb"
|
||||||
|
self.__button = ""
|
||||||
|
self.__clicked = False
|
||||||
|
self.__to_x = [0, 0]
|
||||||
|
self.__to_y = [0, 0]
|
||||||
|
self.__wheel_y = 0
|
||||||
|
self.__delta_x = 0
|
||||||
|
self.__delta_y = 0
|
||||||
|
|
||||||
|
def button(self, button: str, clicked: bool) -> list[int]:
|
||||||
|
self.__button = button
|
||||||
|
self.__clicked = clicked
|
||||||
|
self.__wheel_y = 0
|
||||||
|
if self.__active != "usb":
|
||||||
|
self.__to_x = [0, 0]
|
||||||
|
self.__to_y = [0, 0]
|
||||||
|
return self.__absolute()
|
||||||
|
|
||||||
|
def move(self, to_x: int, to_y: int) -> list[int]:
|
||||||
|
assert MouseRange.MIN <= to_x <= MouseRange.MAX
|
||||||
|
assert MouseRange.MIN <= to_y <= MouseRange.MAX
|
||||||
|
self.__to_x = self.__to_fixed(to_x)
|
||||||
|
self.__to_y = self.__to_fixed(to_y)
|
||||||
|
self.__wheel_y = 0
|
||||||
|
return self.__absolute()
|
||||||
|
|
||||||
|
def wheel(self, delta_x: int, delta_y: int) -> list[int]:
|
||||||
|
assert -127 <= delta_y <= 127
|
||||||
|
_ = delta_x
|
||||||
|
self.__wheel_y = 1 if delta_y > 0 else 255
|
||||||
|
return self.__absolute()
|
||||||
|
|
||||||
|
def relative(self, delta_x: int, delta_y: int) -> list[int]:
|
||||||
|
assert -127 <= delta_x <= 127
|
||||||
|
assert -127 <= delta_y <= 127
|
||||||
|
delta_x = math.ceil(delta_x / 3)
|
||||||
|
delta_y = math.ceil(delta_y / 3)
|
||||||
|
self.__delta_x = delta_x if delta_x >= 0 else 255 + delta_x
|
||||||
|
self.__delta_y = delta_y if delta_y >= 0 else 255 + delta_y
|
||||||
|
return self.__relative()
|
||||||
|
|
||||||
|
def active(self) -> str:
|
||||||
|
return self.__active
|
||||||
|
|
||||||
|
def set_active(self, name: str) -> None:
|
||||||
|
self.__active = name
|
||||||
|
|
||||||
|
def __absolute(self) -> list[int]:
|
||||||
|
code = 0x00
|
||||||
|
if self.__clicked:
|
||||||
|
code = self.__button_code(self.__button)
|
||||||
|
cmd = [0x00, 0x04, 0x07, 0x02, code, 0x00, 0x00, 0x00, 0x00, 0x00]
|
||||||
|
if len(self.__to_x) == 2:
|
||||||
|
cmd[6] = self.__to_x[0]
|
||||||
|
cmd[5] = self.__to_x[1]
|
||||||
|
if len(self.__to_y) == 2:
|
||||||
|
cmd[8] = self.__to_y[0]
|
||||||
|
cmd[7] = self.__to_y[1]
|
||||||
|
if self.__wheel_y:
|
||||||
|
cmd[9] = self.__wheel_y
|
||||||
|
return cmd
|
||||||
|
|
||||||
|
def __relative(self) -> list[int]:
|
||||||
|
code = 0x00
|
||||||
|
if self.__clicked:
|
||||||
|
code = self.__button_code(self.__button)
|
||||||
|
cmd = [0x00, 0x05, 0x05, 0x01, code, self.__delta_x, self.__delta_y, 0x00]
|
||||||
|
return cmd
|
||||||
|
|
||||||
|
def __button_code(self, name: str) -> int:
|
||||||
|
code = 0x00
|
||||||
|
match name:
|
||||||
|
case "left":
|
||||||
|
code = 0x01
|
||||||
|
case "right":
|
||||||
|
code = 0x02
|
||||||
|
case "middle":
|
||||||
|
code = 0x04
|
||||||
|
case "up":
|
||||||
|
code = 0x08
|
||||||
|
case "down":
|
||||||
|
code = 0x10
|
||||||
|
return code
|
||||||
|
|
||||||
|
def __to_fixed(self, num: int) -> list[int]:
|
||||||
|
to_fixed = math.ceil(MouseRange.remap(num, 0, MouseRange.MAX) / 8)
|
||||||
|
return [to_fixed >> 8, to_fixed & 0xFF]
|
||||||
62
kvmd/plugins/hid/ch9329/tty.py
Normal file
62
kvmd/plugins/hid/ch9329/tty.py
Normal file
@@ -0,0 +1,62 @@
|
|||||||
|
# ========================================================================== #
|
||||||
|
# #
|
||||||
|
# KVMD - The main PiKVM daemon. #
|
||||||
|
# #
|
||||||
|
# Copyright (C) 2018-2022 Maxim Devaev <mdevaev@gmail.com> #
|
||||||
|
# #
|
||||||
|
# This program is free software: you can redistribute it and/or modify #
|
||||||
|
# it under the terms of the GNU General Public License as published by #
|
||||||
|
# the Free Software Foundation, either version 3 of the License, or #
|
||||||
|
# (at your option) any later version. #
|
||||||
|
# #
|
||||||
|
# This program is distributed in the hope that it will be useful, #
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
|
||||||
|
# GNU General Public License for more details. #
|
||||||
|
# #
|
||||||
|
# You should have received a copy of the GNU General Public License #
|
||||||
|
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
|
||||||
|
# #
|
||||||
|
# ========================================================================== #
|
||||||
|
|
||||||
|
|
||||||
|
import os
|
||||||
|
import serial
|
||||||
|
|
||||||
|
|
||||||
|
class TTY:
|
||||||
|
def __init__(self, device_path: str, speed: int, read_timeout: float) -> None:
|
||||||
|
self.__tty = serial.Serial(device_path, speed, timeout=read_timeout)
|
||||||
|
self.__device_path = device_path
|
||||||
|
|
||||||
|
def has_device(self) -> bool:
|
||||||
|
return os.path.exists(self.__device_path)
|
||||||
|
|
||||||
|
def send(self, cmd: list[int]) -> list[int]:
|
||||||
|
cmd = self.__wrap_cmd(cmd)
|
||||||
|
self.__tty.write(serial.to_bytes(cmd))
|
||||||
|
data = list(self.__tty.read(5))
|
||||||
|
if data and data[4]:
|
||||||
|
more_data = list(self.__tty.read(data[4] + 1))
|
||||||
|
data.extend(more_data)
|
||||||
|
return data
|
||||||
|
|
||||||
|
def check_res(self, res: list[int]) -> bool:
|
||||||
|
res_sum = res.pop()
|
||||||
|
return (self.__checksum(res) == res_sum)
|
||||||
|
|
||||||
|
def __wrap_cmd(self, cmd: list[int]) -> list[int]:
|
||||||
|
cmd.insert(0, 0xAB)
|
||||||
|
cmd.insert(0, 0x57)
|
||||||
|
cmd.append(self.__checksum(cmd))
|
||||||
|
return cmd
|
||||||
|
|
||||||
|
def __checksum(self, cmd: list[int]) -> int:
|
||||||
|
return sum(cmd) % 256
|
||||||
|
|
||||||
|
|
||||||
|
def get_info() -> list[int]:
|
||||||
|
return [0x00, 0x01, 0x00]
|
||||||
|
|
||||||
|
# RESET = [0x00,0x0F,0x00]
|
||||||
|
# GET_INFO = [0x00,0x01,0x00]
|
||||||
Reference in New Issue
Block a user