using libgpiod for the serial hid

This commit is contained in:
Devaev Maxim 2020-09-12 20:03:51 +03:00
parent 6f75496550
commit fa5e6735ed
2 changed files with 49 additions and 26 deletions

View File

@ -44,10 +44,6 @@ def _clear_gpio(config: Section) -> None:
with gpio.bcm(): with gpio.bcm():
for (name, pin) in [ for (name, pin) in [
*([
("hid_serial/reset", config.hid.reset_pin),
] if config.hid.type == "serial" else []),
*([ *([
("atx_gpio/power_switch", config.atx.power_switch_pin), ("atx_gpio/power_switch", config.atx.power_switch_pin),
("atx_gpio/reset_switch", config.atx.reset_switch_pin), ("atx_gpio/reset_switch", config.atx.reset_switch_pin),

View File

@ -35,7 +35,9 @@ from typing import List
from typing import Dict from typing import Dict
from typing import Iterable from typing import Iterable
from typing import AsyncGenerator from typing import AsyncGenerator
from typing import Optional
import gpiod
import serial import serial
from ...logging import get_logger from ...logging import get_logger
@ -45,7 +47,6 @@ from ...keyboard.mappings import KEYMAP
from ... import aiotools from ... import aiotools
from ... import aiomulti from ... import aiomulti
from ... import aioproc from ... import aioproc
from ... import gpio
from ...yamlconf import Option from ...yamlconf import Option
@ -57,7 +58,7 @@ from ...validators.basic import valid_float_f01
from ...validators.os import valid_abs_path from ...validators.os import valid_abs_path
from ...validators.hw import valid_tty_speed from ...validators.hw import valid_tty_speed
from ...validators.hw import valid_gpio_pin from ...validators.hw import valid_gpio_pin_optional
from . import BaseHid from . import BaseHid
@ -156,6 +157,47 @@ class _MouseWheelEvent(_BaseEvent):
return struct.pack(">Bxbxx", 0x14, self.delta_y) return struct.pack(">Bxbxx", 0x14, self.delta_y)
class _Gpio:
def __init__(self, reset_pin: int, reset_delay: float) -> None:
self.__reset_pin = reset_pin
self.__reset_delay = reset_delay
self.__chip: Optional[gpiod.Chip] = None
self.__reset_line: Optional[gpiod.Line] = None
self.__reset_wip = False
def open(self) -> None:
if self.__reset_pin >= 0:
assert self.__chip is None
assert self.__reset_line is None
self.__chip = gpiod.Chip("/dev/gpiochip0")
self.__reset_line = self.__chip.get_line(self.__reset_pin)
self.__reset_line.request("kvmd/hid-serial/reset", gpiod.LINE_REQ_DIR_OUT, default_val=0)
def close(self) -> None:
if self.__chip:
self.__chip.close()
@aiotools.atomic
async def reset(self) -> None:
if self.__reset_pin >= 0:
assert self.__reset_line
if not self.__reset_wip:
try:
self.__reset_wip = True
self.__reset_line.set_value(1)
await asyncio.sleep(self.__reset_delay)
finally:
try:
self.__reset_line.set_value(0)
await asyncio.sleep(1)
finally:
self.__reset_wip = False
get_logger(0).info("Reset HID performed")
else:
get_logger(0).info("Another reset HID in progress")
# ===== # =====
class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-instance-attributes class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-instance-attributes
def __init__( # pylint: disable=too-many-arguments,super-init-not-called def __init__( # pylint: disable=too-many-arguments,super-init-not-called
@ -175,9 +217,6 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
multiprocessing.Process.__init__(self, daemon=True) multiprocessing.Process.__init__(self, daemon=True)
self.__reset_pin = gpio.set_output(reset_pin, False)
self.__reset_delay = reset_delay
self.__device_path = device_path self.__device_path = device_path
self.__speed = speed self.__speed = speed
self.__read_timeout = read_timeout self.__read_timeout = read_timeout
@ -187,7 +226,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
self.__errors_threshold = errors_threshold self.__errors_threshold = errors_threshold
self.__noop = noop self.__noop = noop
self.__reset_wip = False self.__gpio = _Gpio(reset_pin, reset_delay)
self.__events_queue: multiprocessing.queues.Queue = multiprocessing.Queue() self.__events_queue: multiprocessing.queues.Queue = multiprocessing.Queue()
@ -204,7 +243,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
@classmethod @classmethod
def get_plugin_options(cls) -> Dict: def get_plugin_options(cls) -> Dict:
return { return {
"reset_pin": Option(-1, type=valid_gpio_pin), "reset_pin": Option(-1, type=valid_gpio_pin_optional),
"reset_delay": Option(0.1, type=valid_float_f01), "reset_delay": Option(0.1, type=valid_float_f01),
"device": Option("", type=valid_abs_path, unpack_as="device_path"), "device": Option("", type=valid_abs_path, unpack_as="device_path"),
@ -218,6 +257,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
} }
def sysprep(self) -> None: def sysprep(self) -> None:
self.__gpio.open()
get_logger(0).info("Starting HID daemon ...") get_logger(0).info("Starting HID daemon ...")
self.start() self.start()
@ -247,20 +287,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
@aiotools.atomic @aiotools.atomic
async def reset(self) -> None: async def reset(self) -> None:
if not self.__reset_wip: await self.__gpio.reset()
try:
self.__reset_wip = True
gpio.write(self.__reset_pin, True)
await asyncio.sleep(self.__reset_delay)
finally:
try:
gpio.write(self.__reset_pin, False)
await asyncio.sleep(1)
finally:
self.__reset_wip = False
get_logger().info("Reset HID performed")
else:
get_logger().info("Another reset HID in progress")
@aiotools.atomic @aiotools.atomic
async def cleanup(self) -> None: async def cleanup(self) -> None:
@ -279,7 +306,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
except Exception: except Exception:
logger.exception("Can't clear HID events") logger.exception("Can't clear HID events")
finally: finally:
gpio.write(self.__reset_pin, False) self.__gpio.close()
# ===== # =====