debounce for gpiod AioReader

This commit is contained in:
Devaev Maxim
2020-09-16 00:03:44 +03:00
parent 7cdf5976a8
commit 00069931c1
7 changed files with 98 additions and 42 deletions

View File

@@ -22,7 +22,9 @@
import os import os
import asyncio import asyncio
import asyncio.queues
import threading import threading
import dataclasses
from typing import Tuple from typing import Tuple
from typing import Dict from typing import Dict
@@ -49,12 +51,19 @@ async def pulse(line: gpiod.Line, delay: float, final: float) -> None:
await asyncio.sleep(final) await asyncio.sleep(final)
class AioPinsReader: # pylint: disable=too-many-instance-attributes # =====
@dataclasses.dataclass(frozen=True)
class AioReaderPinParams:
inverted: bool
debounce: float
class AioReader: # pylint: disable=too-many-instance-attributes
def __init__( def __init__(
self, self,
path: str, path: str,
consumer: str, consumer: str,
pins: Dict[int, bool], # (pin, inverted) pins: Dict[int, AioReaderPinParams],
notifier: aiotools.AioNotifier, notifier: aiotools.AioNotifier,
) -> None: ) -> None:
@@ -63,15 +72,16 @@ class AioPinsReader: # pylint: disable=too-many-instance-attributes
self.__pins = pins self.__pins = pins
self.__notifier = notifier self.__notifier = notifier
self.__state = dict.fromkeys(pins, 0) self.__values: Optional[Dict[int, _DebouncedValue]] = None
self.__loop: Optional[asyncio.AbstractEventLoop] = None
self.__thread = threading.Thread(target=self.__run, daemon=True) self.__thread = threading.Thread(target=self.__run, daemon=True)
self.__stop_event = threading.Event() self.__stop_event = threading.Event()
self.__loop: Optional[asyncio.AbstractEventLoop] = None
def get(self, pin: int) -> bool: def get(self, pin: int) -> bool:
return (bool(self.__state[pin]) ^ self.__pins[pin]) value = (self.__values[pin].get() if self.__values is not None else False)
return (value ^ self.__pins[pin].inverted)
async def poll(self) -> None: async def poll(self) -> None:
if not self.__pins: if not self.__pins:
@@ -87,37 +97,39 @@ class AioPinsReader: # pylint: disable=too-many-instance-attributes
await aiotools.run_async(self.__thread.join) await aiotools.run_async(self.__thread.join)
def __run(self) -> None: def __run(self) -> None:
assert self.__values is None
assert self.__loop
with gpiod.Chip(self.__path) as chip: with gpiod.Chip(self.__path) as chip:
pins = sorted(self.__pins) pins = sorted(self.__pins)
lines = chip.get_lines(pins) lines = chip.get_lines(pins)
lines.request(self.__consumer, gpiod.LINE_REQ_EV_BOTH_EDGES) lines.request(self.__consumer, gpiod.LINE_REQ_EV_BOTH_EDGES)
def read_state() -> Dict[int, int]:
return dict(zip(pins, lines.get_values()))
lines.event_wait(nsec=1) lines.event_wait(nsec=1)
self.__state = read_state() self.__values = {
self.__notify() pin: _DebouncedValue(
initial=bool(value),
debounce=self.__pins[pin].debounce,
notifier=self.__notifier,
loop=self.__loop,
)
for (pin, value) in zip(pins, lines.get_values())
}
self.__loop.call_soon_threadsafe(self.__notifier.notify_sync)
while not self.__stop_event.is_set(): while not self.__stop_event.is_set():
changed = False
ev_lines = lines.event_wait(1) ev_lines = lines.event_wait(1)
if ev_lines: if ev_lines:
for ev_line in ev_lines: for ev_line in ev_lines:
events = ev_line.event_read_multiple() events = ev_line.event_read_multiple()
if events: if events:
(pin, value) = self.__parse_event(events[-1]) (pin, value) = self.__parse_event(events[-1])
if self.__state[pin] != value: self.__values[pin].set(bool(value))
self.__state[pin] = value
changed = True
else: # Timeout else: # Timeout
# Ensure state to avoid driver bugs # Размер буфера ядра - 16 эвентов на линии. При превышении этого числа,
state = read_state() # новые эвенты потеряются. Это не баг, это фича, как мне объяснили в LKML.
if self.__state != state: # Штош. Будем с этим жить и синхронизировать состояния при таймауте.
self.__state = state for (pin, value) in zip(pins, lines.get_values()):
changed = True self.__values[pin].set(bool(value))
if changed:
self.__notify()
def __parse_event(self, event: gpiod.LineEvent) -> Tuple[int, int]: def __parse_event(self, event: gpiod.LineEvent) -> Tuple[int, int]:
pin = event.source.offset() pin = event.source.offset()
@@ -127,6 +139,42 @@ class AioPinsReader: # pylint: disable=too-many-instance-attributes
return (pin, 0) return (pin, 0)
raise RuntimeError(f"Invalid event {event} type: {event.type}") raise RuntimeError(f"Invalid event {event} type: {event.type}")
def __notify(self) -> None:
assert self.__loop class _DebouncedValue:
self.__loop.call_soon_threadsafe(self.__notifier.notify_sync) def __init__(
self,
initial: bool,
debounce: float,
notifier: aiotools.AioNotifier,
loop: asyncio.AbstractEventLoop,
) -> None:
self.__value = initial
self.__debounce = debounce
self.__notifier = notifier
self.__loop = loop
self.__queue: asyncio.queues.Queue = asyncio.Queue(loop=loop)
self.__task = loop.create_task(self.__consumer())
def set(self, value: bool) -> None:
if self.__loop.is_running():
self.__check_alive()
self.__loop.call_soon_threadsafe(self.__queue.put_nowait, value)
def get(self) -> bool:
return self.__value
def __check_alive(self) -> None:
if self.__task.done() and not self.__task.cancelled():
raise RuntimeError("Dead debounce consumer")
async def __consumer(self) -> None:
while True:
value = await self.__queue.get()
while not self.__queue.empty():
value = await self.__queue.get()
if self.__value != value:
self.__value = value
await self.__notifier.notify()
await asyncio.sleep(self.__debounce)

View File

@@ -227,7 +227,9 @@ def _patch_dynamic( # pylint: disable=too-many-locals
"min_delay": Option(0.1, type=valid_float_f01), "min_delay": Option(0.1, type=valid_float_f01),
"max_delay": Option(0.1, type=valid_float_f01), "max_delay": Option(0.1, type=valid_float_f01),
}, },
} if mode == UserGpioModes.OUTPUT else {}) } if mode == UserGpioModes.OUTPUT else { # input
"debounce": Option(0.1, type=valid_float_f0),
})
} }
rebuild = True rebuild = True

View File

@@ -81,7 +81,7 @@ class _GpioInput:
self.__inverted: bool = config.inverted self.__inverted: bool = config.inverted
self.__driver = driver self.__driver = driver
self.__driver.register_input(self.__pin) self.__driver.register_input(self.__pin, config.debounce)
def get_scheme(self) -> Dict: def get_scheme(self) -> Dict:
return { return {

View File

@@ -34,6 +34,7 @@ from ... import aiogp
from ...yamlconf import Option from ...yamlconf import Option
from ...validators.basic import valid_bool from ...validators.basic import valid_bool
from ...validators.basic import valid_float_f0
from ...validators.basic import valid_float_f01 from ...validators.basic import valid_float_f01
from ...validators.hw import valid_gpio_pin from ...validators.hw import valid_gpio_pin
@@ -47,9 +48,12 @@ class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes
def __init__( # pylint: disable=too-many-arguments,super-init-not-called def __init__( # pylint: disable=too-many-arguments,super-init-not-called
self, self,
power_led_pin: int, power_led_pin: int,
hdd_led_pin: int,
power_led_inverted: bool, power_led_inverted: bool,
power_led_debounce: float,
hdd_led_pin: int,
hdd_led_inverted: bool, hdd_led_inverted: bool,
hdd_led_debounce: float,
power_switch_pin: int, power_switch_pin: int,
reset_switch_pin: int, reset_switch_pin: int,
@@ -72,12 +76,12 @@ class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes
self.__power_switch_line: Optional[gpiod.Line] = None self.__power_switch_line: Optional[gpiod.Line] = None
self.__reset_switch_line: Optional[gpiod.Line] = None self.__reset_switch_line: Optional[gpiod.Line] = None
self.__reader = aiogp.AioPinsReader( self.__reader = aiogp.AioReader(
path=aiogp.DEVICE_PATH, path=aiogp.DEVICE_PATH,
consumer="kvmd/atx-gpio/leds", consumer="kvmd/atx-gpio/leds",
pins={ pins={
power_led_pin: power_led_inverted, power_led_pin: aiogp.AioReaderPinParams(power_led_inverted, power_led_debounce),
hdd_led_pin: hdd_led_inverted, hdd_led_pin: aiogp.AioReaderPinParams(hdd_led_inverted, hdd_led_debounce),
}, },
notifier=self.__notifier, notifier=self.__notifier,
) )
@@ -86,9 +90,12 @@ class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes
def get_plugin_options(cls) -> Dict: def get_plugin_options(cls) -> Dict:
return { return {
"power_led_pin": Option(-1, type=valid_gpio_pin), "power_led_pin": Option(-1, type=valid_gpio_pin),
"hdd_led_pin": Option(-1, type=valid_gpio_pin),
"power_led_inverted": Option(False, type=valid_bool), "power_led_inverted": Option(False, type=valid_bool),
"hdd_led_inverted": Option(False, type=valid_bool), "power_led_debounce": Option(0.1, type=valid_float_f0),
"hdd_led_pin": Option(-1, type=valid_gpio_pin),
"hdd_led_inverted": Option(False, type=valid_bool),
"hdd_led_debounce": Option(0.1, type=valid_float_f0),
"power_switch_pin": Option(-1, type=valid_gpio_pin), "power_switch_pin": Option(-1, type=valid_gpio_pin),
"reset_switch_pin": Option(-1, type=valid_gpio_pin), "reset_switch_pin": Option(-1, type=valid_gpio_pin),

View File

@@ -74,7 +74,7 @@ class BaseUserGpioDriver(BasePlugin):
def get_modes(cls) -> Set[str]: def get_modes(cls) -> Set[str]:
return set(UserGpioModes.ALL) return set(UserGpioModes.ALL)
def register_input(self, pin: int) -> None: def register_input(self, pin: int, debounce: float) -> None:
raise NotImplementedError raise NotImplementedError
def register_output(self, pin: int, initial: Optional[bool]) -> None: def register_output(self, pin: int, initial: Optional[bool]) -> None:

View File

@@ -21,7 +21,6 @@
from typing import Dict from typing import Dict
from typing import Set
from typing import Optional from typing import Optional
import gpiod import gpiod
@@ -42,26 +41,26 @@ class Plugin(BaseUserGpioDriver):
super().__init__(instance_name, notifier) super().__init__(instance_name, notifier)
self.__input_pins: Set[int] = set() self.__input_pins: Dict[int, aiogp.AioReaderPinParams] = {}
self.__output_pins: Dict[int, Optional[bool]] = {} self.__output_pins: Dict[int, Optional[bool]] = {}
self.__reader: Optional[aiogp.AioPinsReader] = None self.__reader: Optional[aiogp.AioReader] = None
self.__chip: Optional[gpiod.Chip] = None self.__chip: Optional[gpiod.Chip] = None
self.__output_lines: Dict[int, gpiod.Line] = {} self.__output_lines: Dict[int, gpiod.Line] = {}
def register_input(self, pin: int) -> None: def register_input(self, pin: int, debounce: float) -> None:
self.__input_pins.add(pin) self.__input_pins[pin] = aiogp.AioReaderPinParams(False, debounce)
def register_output(self, pin: int, initial: Optional[bool]) -> None: def register_output(self, pin: int, initial: Optional[bool]) -> None:
self.__output_pins[pin] = initial self.__output_pins[pin] = initial
def prepare(self) -> None: def prepare(self) -> None:
assert self.__reader is None assert self.__reader is None
self.__reader = aiogp.AioPinsReader( self.__reader = aiogp.AioReader(
path=aiogp.DEVICE_PATH, path=aiogp.DEVICE_PATH,
consumer="kvmd/ugpio-gpio/inputs", consumer="kvmd/ugpio-gpio/inputs",
pins=dict.fromkeys(self.__input_pins, False), pins=self.__input_pins,
notifier=self._notifier, notifier=self._notifier,
) )

View File

@@ -79,7 +79,7 @@ class Plugin(BaseUserGpioDriver):
def get_modes(cls) -> Set[str]: def get_modes(cls) -> Set[str]:
return set([UserGpioModes.OUTPUT]) return set([UserGpioModes.OUTPUT])
def register_input(self, pin: int) -> None: def register_input(self, pin: int, debounce: float) -> None:
raise RuntimeError(f"Unsupported mode 'input' for pin={pin} on {self}") raise RuntimeError(f"Unsupported mode 'input' for pin={pin} on {self}")
def register_output(self, pin: int, initial: Optional[bool]) -> None: def register_output(self, pin: int, initial: Optional[bool]) -> None: