mirror of
https://github.com/mofeng-git/One-KVM.git
synced 2026-01-31 18:11:54 +08:00
@@ -46,8 +46,6 @@ from ....validators.basic import valid_float_f01
|
||||
from ....validators.os import valid_abs_path
|
||||
from ....validators.hw import valid_gpio_pin_optional
|
||||
|
||||
from ....lanuages import Lanuages
|
||||
|
||||
from .. import BaseHid
|
||||
|
||||
from .gpio import Gpio
|
||||
@@ -146,8 +144,6 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
|
||||
|
||||
self.__stop_event = multiprocessing.Event()
|
||||
|
||||
self.gettext=Lanuages().gettext
|
||||
|
||||
@classmethod
|
||||
def get_plugin_options(cls) -> dict:
|
||||
return {
|
||||
@@ -251,7 +247,7 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
|
||||
@aiotools.atomic_fg
|
||||
async def cleanup(self) -> None:
|
||||
if self.is_alive():
|
||||
get_logger(0).info(self.gettext("Stopping HID daemon ..."))
|
||||
get_logger(0).info("Stopping HID daemon ...")
|
||||
self.__stop_event.set()
|
||||
if self.is_alive() or self.exitcode is not None:
|
||||
self.join()
|
||||
@@ -320,14 +316,14 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
|
||||
with self.__gpio:
|
||||
self.__hid_loop()
|
||||
if self.__phy.has_device():
|
||||
logger.info(self.gettext("Clearing HID events ..."))
|
||||
logger.info("Clearing HID events ...")
|
||||
try:
|
||||
with self.__phy.connected() as conn:
|
||||
self.__process_request(conn, ClearEvent().make_request())
|
||||
except Exception:
|
||||
logger.exception(self.gettext("Can't clear HID events"))
|
||||
logger.exception("Can't clear HID events")
|
||||
except Exception:
|
||||
logger.exception(self.gettext("Unexpected error in the GPIO loop"))
|
||||
logger.exception("Unexpected error in the GPIO loop")
|
||||
time.sleep(1)
|
||||
|
||||
def __hid_loop(self) -> None:
|
||||
@@ -357,24 +353,24 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
|
||||
reset = False
|
||||
except Exception:
|
||||
self.clear_events()
|
||||
get_logger(0).exception(self.gettext("Unexpected error in the HID loop"))
|
||||
get_logger(0).exception("Unexpected error in the HID loop")
|
||||
time.sleep(1)
|
||||
|
||||
def __hid_loop_wait_device(self, reset: bool) -> bool:
|
||||
logger = get_logger(0)
|
||||
if reset:
|
||||
logger.info(self.gettext("Initial HID reset and wait for %s ..."), self.__phy)
|
||||
logger.info("Initial HID reset and wait for %s ...", self.__phy)
|
||||
self.__gpio.reset()
|
||||
# На самом деле SPI и Serial-девайсы не пропадают,
|
||||
# а вот USB CDC (Pico HID Bridge) вполне себе пропадает
|
||||
for _ in range(10):
|
||||
if self.__phy.has_device():
|
||||
logger.info(self.gettext("Physical HID interface found: %s"), self.__phy)
|
||||
logger.info("Physical HID interface found: %s", self.__phy)
|
||||
return True
|
||||
if self.__stop_event.is_set():
|
||||
break
|
||||
time.sleep(1)
|
||||
logger.error(self.gettext("Missing physical HID interface: %s"), self.__phy)
|
||||
logger.error("Missing physical HID interface: %s", self.__phy)
|
||||
self.__set_state_online(False)
|
||||
return False
|
||||
|
||||
@@ -392,28 +388,28 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
|
||||
try:
|
||||
if len(response) < 4:
|
||||
read_retries -= 1
|
||||
raise _TempRequestError(self.gettext(f"No response from HID: request={request!r}"))
|
||||
raise _TempRequestError(f"No response from HID: request={request!r}")
|
||||
|
||||
if not check_response(response):
|
||||
request = REQUEST_REPEAT
|
||||
raise _TempRequestError(self.gettext("Invalid response CRC; requesting response again ..."))
|
||||
raise _TempRequestError("Invalid response CRC; requesting response again ...")
|
||||
|
||||
code = response[1]
|
||||
if code == 0x48: # Request timeout # pylint: disable=no-else-raise
|
||||
raise _TempRequestError(self.gettext(f"Got request timeout from HID: request={request!r}"))
|
||||
raise _TempRequestError(f"Got request timeout from HID: request={request!r}")
|
||||
elif code == 0x40: # CRC Error
|
||||
raise _TempRequestError(self.gettext(f"Got CRC error of request from HID: request={request!r}"))
|
||||
raise _TempRequestError(f"Got CRC error of request from HID: request={request!r}")
|
||||
elif code == 0x45: # Unknown command
|
||||
raise _PermRequestError(self.gettext(f"HID did not recognize the request={request!r}"))
|
||||
raise _PermRequestError(f"HID did not recognize the request={request!r}")
|
||||
elif code == 0x24: # Rebooted?
|
||||
raise _PermRequestError(self.gettext("No previous command state inside HID, seems it was rebooted"))
|
||||
raise _PermRequestError("No previous command state inside HID, seems it was rebooted")
|
||||
elif code == 0x20: # Legacy done
|
||||
self.__set_state_online(True)
|
||||
return True
|
||||
elif code & 0x80: # Pong/Done with state
|
||||
self.__set_state_pong(response)
|
||||
return True
|
||||
raise _TempRequestError(self.gettext(f"Invalid response from HID: request={request!r}, response=0x{response!r}"))
|
||||
raise _TempRequestError(f"Invalid response from HID: request={request!r}, response=0x{response!r}")
|
||||
|
||||
except _RequestError as err:
|
||||
common_retries -= 1
|
||||
@@ -444,7 +440,7 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
|
||||
for msg in error_messages:
|
||||
logger.error(msg)
|
||||
if not (common_retries and read_retries):
|
||||
logger.error(self.gettext("Can't process HID request due many errors: %r", request))
|
||||
logger.error("Can't process HID request due many errors: %r", request)
|
||||
return error_retval
|
||||
|
||||
def __set_state_online(self, online: bool) -> None:
|
||||
|
||||
@@ -27,7 +27,6 @@ import gpiod
|
||||
|
||||
from ....logging import get_logger
|
||||
|
||||
from ....lanuages import Lanuages
|
||||
|
||||
# =====
|
||||
class Gpio: # pylint: disable=too-many-instance-attributes
|
||||
@@ -51,8 +50,6 @@ class Gpio: # pylint: disable=too-many-instance-attributes
|
||||
self.__line_request: (gpiod.LineRequest | None) = None
|
||||
self.__last_power: (bool | None) = None
|
||||
|
||||
self.gettext=Lanuages().gettext
|
||||
|
||||
def __enter__(self) -> None:
|
||||
if self.__power_detect_pin >= 0 or self.__reset_pin >= 0:
|
||||
assert self.__line_request is None
|
||||
@@ -94,7 +91,7 @@ class Gpio: # pylint: disable=too-many-instance-attributes
|
||||
assert self.__line_request
|
||||
power = bool(self.__line_request.get_value(self.__power_detect_pin).value)
|
||||
if power != self.__last_power:
|
||||
get_logger(0).info(self.gettext("HID power state changed: %s -> %s"), self.__last_power, power)
|
||||
get_logger(0).info("HID power state changed: %s -> %s", self.__last_power, power)
|
||||
self.__last_power = power
|
||||
return power
|
||||
return True
|
||||
@@ -108,4 +105,4 @@ class Gpio: # pylint: disable=too-many-instance-attributes
|
||||
finally:
|
||||
self.__line_request.set_value(self.__reset_pin, gpiod.line.Value(self.__reset_inverted))
|
||||
time.sleep(1)
|
||||
get_logger(0).info(self.gettext("Reset HID performed"))
|
||||
get_logger(0).info("Reset HID performed")
|
||||
|
||||
Reference in New Issue
Block a user