mcu hid: optional power detecting on the hid device

This commit is contained in:
Maxim Devaev
2023-08-06 03:36:54 +03:00
parent 472605734e
commit 92c3620a86
4 changed files with 61 additions and 21 deletions

View File

@@ -28,6 +28,7 @@ import time
from typing import Iterable from typing import Iterable
from typing import Generator from typing import Generator
from typing import AsyncGenerator from typing import AsyncGenerator
from typing import Any
from ....logging import get_logger from ....logging import get_logger
@@ -103,18 +104,13 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
def __init__( # pylint: disable=too-many-arguments,super-init-not-called def __init__( # pylint: disable=too-many-arguments,super-init-not-called
self, self,
phy: BasePhy, phy: BasePhy,
gpio_device_path: str,
reset_pin: int,
reset_inverted: bool,
reset_delay: float,
reset_self: bool, reset_self: bool,
read_retries: int, read_retries: int,
common_retries: int, common_retries: int,
retries_delay: float, retries_delay: float,
errors_threshold: int, errors_threshold: int,
noop: bool, noop: bool,
**gpio_kwargs: Any,
) -> None: ) -> None:
multiprocessing.Process.__init__(self, daemon=True) multiprocessing.Process.__init__(self, daemon=True)
@@ -126,7 +122,8 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
self.__noop = noop self.__noop = noop
self.__phy = phy self.__phy = phy
self.__gpio = Gpio(gpio_device_path, reset_pin, reset_inverted, reset_delay) gpio_device_path = gpio_kwargs.pop("gpio_device_path")
self.__gpio = Gpio(device_path=gpio_device_path, **gpio_kwargs)
self.__reset_self = reset_self self.__reset_self = reset_self
self.__reset_required_event = multiprocessing.Event() self.__reset_required_event = multiprocessing.Event()
@@ -144,11 +141,15 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
@classmethod @classmethod
def get_plugin_options(cls) -> dict: def get_plugin_options(cls) -> dict:
return { return {
"gpio_device": Option("/dev/gpiochip0", type=valid_abs_path, unpack_as="gpio_device_path"), # <gpio_kwargs>
"reset_pin": Option(4, type=valid_gpio_pin_optional), "gpio_device": Option("/dev/gpiochip0", type=valid_abs_path, unpack_as="gpio_device_path"),
"reset_inverted": Option(False, type=valid_bool), "power_detect_pin": Option(-1, type=valid_gpio_pin_optional),
"reset_delay": Option(0.1, type=valid_float_f01), "power_detect_pull_down": Option(False, type=valid_bool),
"reset_self": Option(False, type=valid_bool), "reset_pin": Option(4, type=valid_gpio_pin_optional),
"reset_inverted": Option(False, type=valid_bool),
"reset_delay": Option(0.1, type=valid_float_f01),
# </gpio_kwargs>
"reset_self": Option(False, type=valid_bool),
"read_retries": Option(5, type=valid_int_f1), "read_retries": Option(5, type=valid_int_f1),
"common_retries": Option(5, type=valid_int_f1), "common_retries": Option(5, type=valid_int_f1),
@@ -329,18 +330,18 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
def __hid_loop_wait_device(self) -> bool: def __hid_loop_wait_device(self) -> bool:
logger = get_logger(0) logger = get_logger(0)
logger.info("Initial HID reset and wait ...") logger.info("Initial HID reset and wait for %s ...", self.__phy)
self.__gpio.reset() self.__gpio.reset()
# На самом деле SPI и Serial-девайсы не пропадают, просто резет и ожидание # На самом деле SPI и Serial-девайсы не пропадают, просто резет и ожидание
# логичнее всего делать именно здесь. Ну и на будущее, да # логичнее всего делать именно здесь. Ну и на будущее, да
for _ in range(10): for _ in range(10):
if self.__phy.has_device(): if self.__phy.has_device():
logger.info("HID found") logger.info("Physical HID interface found: %s", self.__phy)
return True return True
if self.__stop_event.is_set(): if self.__stop_event.is_set():
break break
time.sleep(1) time.sleep(1)
logger.error("Missing HID") logger.error("Missing physical HID interface: %s", self.__phy)
return False return False
def __process_request(self, conn: BasePhyConnection, request: bytes) -> bool: # pylint: disable=too-many-branches def __process_request(self, conn: BasePhyConnection, request: bytes) -> bool: # pylint: disable=too-many-branches
@@ -352,7 +353,7 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
read_retries = self.__read_retries read_retries = self.__read_retries
error_retval = False error_retval = False
while common_retries and read_retries: while self.__gpio.is_powered() and common_retries and read_retries:
response = (RESPONSE_LEGACY_OK if self.__noop else conn.send(request)) response = (RESPONSE_LEGACY_OK if self.__noop else conn.send(request))
try: try:
if len(response) < 4: if len(response) < 4:
@@ -402,6 +403,10 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
if common_retries and read_retries: if common_retries and read_retries:
time.sleep(self.__retries_delay) time.sleep(self.__retries_delay)
if not self.__gpio.is_powered():
self.__set_state_online(False)
return True
for msg in error_messages: for msg in error_messages:
logger.error(msg) logger.error(msg)
if not (common_retries and read_retries): if not (common_retries and read_retries):

View File

@@ -29,30 +29,48 @@ from ....logging import get_logger
# ===== # =====
class Gpio: class Gpio: # pylint: disable=too-many-instance-attributes
def __init__( def __init__(
self, self,
device_path: str, device_path: str,
power_detect_pin: int,
power_detect_pull_down: bool,
reset_pin: int, reset_pin: int,
reset_inverted: bool, reset_inverted: bool,
reset_delay: float, reset_delay: float,
) -> None: ) -> None:
self.__device_path = device_path self.__device_path = device_path
self.__power_detect_pin = power_detect_pin
self.__power_detect_pull_down = power_detect_pull_down
self.__reset_pin = reset_pin self.__reset_pin = reset_pin
self.__reset_inverted = reset_inverted self.__reset_inverted = reset_inverted
self.__reset_delay = reset_delay self.__reset_delay = reset_delay
self.__chip: (gpiod.Chip | None) = None self.__chip: (gpiod.Chip | None) = None
self.__power_detect_line: (gpiod.Line | None) = None
self.__reset_line: (gpiod.Line | None) = None self.__reset_line: (gpiod.Line | None) = None
self.__last_power: (bool | None) = None
def __enter__(self) -> None: def __enter__(self) -> None:
if self.__reset_pin >= 0: if self.__power_detect_pin >= 0 or self.__reset_pin >= 0:
assert self.__chip is None assert self.__chip is None
assert self.__reset_line is None
self.__chip = gpiod.Chip(self.__device_path) self.__chip = gpiod.Chip(self.__device_path)
self.__reset_line = self.__chip.get_line(self.__reset_pin) if self.__power_detect_pin >= 0:
self.__reset_line.request("kvmd::hid::reset", gpiod.LINE_REQ_DIR_OUT, default_vals=[int(self.__reset_inverted)]) assert self.__power_detect_line is None
self.__power_detect_line = self.__chip.get_line(self.__power_detect_pin)
self.__power_detect_line.request(
"kvmd::hid::power_detect", gpiod.LINE_REQ_DIR_IN,
flags=(gpiod.LINE_REQ_FLAG_BIAS_PULL_DOWN if self.__power_detect_pull_down else 0),
)
if self.__reset_pin >= 0:
assert self.__reset_line is None
self.__reset_line = self.__chip.get_line(self.__reset_pin)
self.__reset_line.request(
"kvmd::hid::reset", gpiod.LINE_REQ_DIR_OUT,
default_vals=[int(self.__reset_inverted)],
)
def __exit__( def __exit__(
self, self,
@@ -66,9 +84,20 @@ class Gpio:
self.__chip.close() self.__chip.close()
except Exception: except Exception:
pass pass
self.__last_power = None
self.__power_detect_line = None
self.__reset_line = None self.__reset_line = None
self.__chip = None self.__chip = None
def is_powered(self) -> bool:
if self.__power_detect_line is not None:
power = bool(self.__power_detect_line.get_value())
if power != self.__last_power:
get_logger(0).info("HID power state changed: %s -> %s", self.__last_power, power)
self.__last_power = power
return power
return True
def reset(self) -> None: def reset(self) -> None:
if self.__reset_pin >= 0: if self.__reset_pin >= 0:
assert self.__reset_line assert self.__reset_line

View File

@@ -80,6 +80,9 @@ class _SerialPhy(BasePhy):
with serial.Serial(self.__device_path, self.__speed, timeout=self.__read_timeout) as tty: with serial.Serial(self.__device_path, self.__speed, timeout=self.__read_timeout) as tty:
yield _SerialPhyConnection(tty) yield _SerialPhyConnection(tty)
def __str__(self) -> str:
return f"Serial(path={self.__device_path})"
# ===== # =====
class Plugin(BaseMcuHid): class Plugin(BaseMcuHid):

View File

@@ -162,6 +162,9 @@ class _SpiPhy(BasePhy): # pylint: disable=too-many-instance-attributes
else: else:
yield None yield None
def __str__(self) -> str:
return f"SPI(bus={self.__bus}, chip={self.__chip})"
# ===== # =====
class Plugin(BaseMcuHid): class Plugin(BaseMcuHid):