auto-reset hid

This commit is contained in:
Devaev Maxim 2020-11-23 05:01:33 +03:00
parent 31ca16a4f4
commit d58f0847d5
2 changed files with 70 additions and 47 deletions

View File

@ -55,6 +55,7 @@ from .gpio import Gpio
from .proto import REQUEST_PING
from .proto import REQUEST_REPEAT
from .proto import RESPONSE_LEGACY_OK
from .proto import BaseEvent
from .proto import SetKeyboardOutputEvent
from .proto import SetMouseOutputEvent
@ -64,6 +65,7 @@ from .proto import MouseButtonEvent
from .proto import MouseMoveEvent
from .proto import MouseRelativeEvent
from .proto import MouseWheelEvent
from .proto import get_active_keyboard
from .proto import get_active_mouse
from .proto import check_response
@ -84,6 +86,12 @@ class _TempRequestError(_RequestError):
pass
# =====
class _HardResetEvent(BaseEvent):
def make_request(self) -> bytes:
raise RuntimeError("Don't call me")
# =====
class BasePhyConnection:
def send(self, request: bytes) -> bytes:
@ -151,7 +159,6 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
}
def sysprep(self) -> None:
self.__gpio.open()
get_logger(0).info("Starting HID daemon ...")
self.start()
@ -214,28 +221,16 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
prev_state = state
await self.__notifier.wait()
@aiotools.atomic
async def reset(self) -> None:
await self.__gpio.reset()
self.__queue_event(_HardResetEvent(), clear=True)
@aiotools.atomic
async def cleanup(self) -> None:
logger = get_logger(0)
try:
if self.is_alive():
logger.info("Stopping HID daemon ...")
self.__stop_event.set()
if self.exitcode is not None:
self.join()
if self.__phy.has_device():
get_logger().info("Clearing HID events ...")
try:
with self.__phy.connected() as conn:
self.__process_request(conn, ClearEvent().make_request())
except Exception:
logger.exception("Can't clear HID events")
finally:
self.__gpio.close()
if self.is_alive():
get_logger(0).info("Stopping HID daemon ...")
self.__stop_event.set()
if self.exitcode is not None:
self.join()
# =====
@ -282,22 +277,42 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
while not self.__stop_event.is_set():
try:
if self.__phy.has_device():
with self.__phy.connected() as conn:
while not (self.__stop_event.is_set() and self.__events_queue.qsize() == 0):
try:
event = self.__events_queue.get(timeout=0.1)
except queue.Empty:
self.__process_request(conn, REQUEST_PING)
else:
if not self.__process_request(conn, event.make_request()):
self.clear_events()
else:
with self.__gpio:
self.__hid_loop()
if self.__phy.has_device():
logger.info("Clearing HID events ...")
try:
with self.__phy.connected() as conn:
self.__process_request(conn, ClearEvent().make_request())
except Exception:
logger.exception("Can't clear HID events")
except Exception:
logger.exception("Unexpected error in the GPIO loop")
time.sleep(1)
def __hid_loop(self) -> None:
logger = get_logger(0)
while not self.__stop_event.is_set():
try:
if not self.__phy.has_device():
logger.error("Missing HID device")
time.sleep(1)
continue
with self.__phy.connected() as conn:
while not (self.__stop_event.is_set() and self.__events_queue.qsize() == 0):
try:
event = self.__events_queue.get(timeout=0.1)
except queue.Empty:
self.__process_request(conn, REQUEST_PING)
else:
if isinstance(event, _HardResetEvent):
self.__gpio.reset()
elif not self.__process_request(conn, event.make_request()):
self.clear_events()
except Exception:
self.clear_events()
logger.exception("Unexpected HID error")
logger.exception("Unexpected error in the HID loop")
time.sleep(1)
def __process_request(self, conn: BasePhyConnection, request: bytes) -> bool: # pylint: disable=too-many-branches
@ -373,3 +388,5 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
if len(response) > 4:
status |= (response[2] << 8) | response[3]
self.__state_flags.update(online=1, status=status)
if response[1] & 0b01000000: # Reset required
self.__gpio.reset()

View File

@ -20,6 +20,10 @@
# ========================================================================== #
import types
import time
from typing import Type
from typing import Optional
import gpiod
@ -27,8 +31,6 @@ import gpiod
from ....logging import get_logger
from .... import env
from .... import aiotools
from .... import aiogp
# =====
@ -46,9 +48,8 @@ class Gpio:
self.__chip: Optional[gpiod.Chip] = None
self.__reset_line: Optional[gpiod.Line] = None
self.__reset_wip = False
def open(self) -> None:
def __enter__(self) -> None:
if self.__reset_pin >= 0:
assert self.__chip is None
assert self.__reset_line is None
@ -56,23 +57,28 @@ class Gpio:
self.__reset_line = self.__chip.get_line(self.__reset_pin)
self.__reset_line.request("kvmd::hid::reset", gpiod.LINE_REQ_DIR_OUT, default_vals=[int(self.__reset_inverted)])
def close(self) -> None:
def __exit__(
self,
_exc_type: Type[BaseException],
_exc: BaseException,
_tb: types.TracebackType,
) -> None:
if self.__chip:
try:
self.__chip.close()
except Exception:
pass
self.__reset_line = None
self.__chip = None
@aiotools.atomic
async def reset(self) -> None:
def reset(self) -> None:
if self.__reset_pin >= 0:
assert self.__reset_line
if not self.__reset_wip:
self.__reset_wip = True
try:
await aiogp.pulse(self.__reset_line, self.__reset_delay, 1, self.__reset_inverted)
finally:
self.__reset_wip = False
get_logger(0).info("Reset HID performed")
else:
get_logger(0).info("Another reset HID in progress")
try:
self.__reset_line.set_value(int(not self.__reset_inverted))
time.sleep(self.__reset_delay)
finally:
self.__reset_line.set_value(int(self.__reset_inverted))
time.sleep(1)
get_logger(0).info("Reset HID performed")