refactoring and reuse gpio pulse code

This commit is contained in:
Devaev Maxim 2020-09-13 18:23:28 +03:00
parent 41223fa8b2
commit 1e6ab4672f
4 changed files with 32 additions and 41 deletions

View File

@ -23,7 +23,7 @@
import asyncio import asyncio
import threading import threading
from typing import List from typing import Dict
from typing import Optional from typing import Optional
import gpiod import gpiod
@ -32,23 +32,29 @@ from . import aiotools
# ===== # =====
async def pulse(line: gpiod.Line, delay: float, final: float) -> None:
try:
line.set_value(1)
await asyncio.sleep(delay)
finally:
line.set_value(0)
await asyncio.sleep(final)
class AioPinsReader(threading.Thread): class AioPinsReader(threading.Thread):
def __init__( def __init__(
self, self,
path: str, path: str,
consumer: str, consumer: str,
pins: List[int], pins: Dict[int, bool],
inverted: List[bool],
notifier: aiotools.AioNotifier, notifier: aiotools.AioNotifier,
) -> None: ) -> None:
assert len(pins) == len(inverted)
super().__init__(daemon=True) super().__init__(daemon=True)
self.__path = path self.__path = path
self.__consumer = consumer self.__consumer = consumer
self.__pins = pins self.__pins = pins
self.__inverted = dict(zip(pins, inverted))
self.__notifier = notifier self.__notifier = notifier
self.__state = dict.fromkeys(pins, False) self.__state = dict.fromkeys(pins, False)
@ -57,7 +63,7 @@ class AioPinsReader(threading.Thread):
self.__loop: Optional[asyncio.AbstractEventLoop] = None self.__loop: Optional[asyncio.AbstractEventLoop] = None
def get(self, pin: int) -> bool: def get(self, pin: int) -> bool:
return (self.__state[pin] ^ self.__inverted[pin]) return (self.__state[pin] ^ self.__pins[pin])
async def poll(self) -> None: async def poll(self) -> None:
if not self.__pins: if not self.__pins:
@ -75,13 +81,14 @@ class AioPinsReader(threading.Thread):
def run(self) -> None: def run(self) -> None:
assert self.__loop assert self.__loop
with gpiod.Chip(self.__path) as chip: with gpiod.Chip(self.__path) as chip:
lines = chip.get_lines(self.__pins) pins = sorted(self.__pins)
lines = chip.get_lines(pins)
lines.request(self.__consumer, gpiod.LINE_REQ_EV_BOTH_EDGES) lines.request(self.__consumer, gpiod.LINE_REQ_EV_BOTH_EDGES)
lines.event_wait(nsec=1) lines.event_wait(nsec=1)
self.__state = { self.__state = {
pin: bool(value) pin: bool(value)
for (pin, value) in zip(self.__pins, lines.get_values()) for (pin, value) in zip(pins, lines.get_values())
} }
self.__loop.call_soon_threadsafe(self.__notifier.notify_sync) self.__loop.call_soon_threadsafe(self.__notifier.notify_sync)

View File

@ -20,8 +20,6 @@
# ========================================================================== # # ========================================================================== #
import asyncio
from typing import Dict from typing import Dict
from typing import AsyncGenerator from typing import AsyncGenerator
from typing import Optional from typing import Optional
@ -77,8 +75,10 @@ class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes
self.__reader = aiogp.AioPinsReader( self.__reader = aiogp.AioPinsReader(
path="/dev/gpiochip0", path="/dev/gpiochip0",
consumer="kvmd/atx-gpio/leds", consumer="kvmd/atx-gpio/leds",
pins=[power_led_pin, hdd_led_pin], pins={
inverted=[power_led_inverted, hdd_led_inverted], power_led_pin: power_led_inverted,
hdd_led_pin: hdd_led_inverted,
},
notifier=self.__notifier, notifier=self.__notifier,
) )
@ -185,10 +185,5 @@ class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes
@aiotools.atomic @aiotools.atomic
async def __inner_click(self, name: str, line: gpiod.Line, delay: float) -> None: async def __inner_click(self, name: str, line: gpiod.Line, delay: float) -> None:
try: await aiogp.pulse(line, delay, 1)
line.set_value(1)
await asyncio.sleep(delay)
finally:
line.set_value(0)
await asyncio.sleep(1)
get_logger(0).info("Clicked ATX button %r", name) get_logger(0).info("Clicked ATX button %r", name)

View File

@ -21,7 +21,6 @@
import os import os
import asyncio
import multiprocessing import multiprocessing
import multiprocessing.queues import multiprocessing.queues
import dataclasses import dataclasses
@ -47,6 +46,7 @@ from ...keyboard.mappings import KEYMAP
from ... import aiotools from ... import aiotools
from ... import aiomulti from ... import aiomulti
from ... import aioproc from ... import aioproc
from ... import aiogp
from ...yamlconf import Option from ...yamlconf import Option
@ -186,14 +186,9 @@ class _Gpio:
if self.__reset_pin >= 0: if self.__reset_pin >= 0:
assert self.__reset_line assert self.__reset_line
if not self.__reset_wip: if not self.__reset_wip:
try:
self.__reset_wip = True self.__reset_wip = True
self.__reset_line.set_value(1)
await asyncio.sleep(self.__reset_delay)
finally:
try: try:
self.__reset_line.set_value(0) await aiogp.pulse(self.__reset_line, self.__reset_delay, 1)
await asyncio.sleep(1)
finally: finally:
self.__reset_wip = False self.__reset_wip = False
get_logger(0).info("Reset HID performed") get_logger(0).info("Reset HID performed")

View File

@ -41,6 +41,7 @@ from ...logging import get_logger
from ... import aiotools from ... import aiotools
from ... import aiofs from ... import aiofs
from ... import aiogp
from ...yamlconf import Option from ...yamlconf import Option
@ -196,16 +197,9 @@ class _Gpio:
assert self.__target_line assert self.__target_line
self.__target_line.set_value(1) self.__target_line.set_value(1)
@contextlib.asynccontextmanager async def reset(self) -> None:
async def reset(self) -> AsyncGenerator[None, None]:
assert self.__reset_line assert self.__reset_line
try: await aiogp.pulse(self.__reset_line, self.__reset_delay, 0)
self.__reset_line.set_value(1)
await asyncio.sleep(self.__reset_delay)
self.__reset_line.set_value(0)
yield
finally:
self.__reset_line.set_value(0)
# ===== # =====
@ -302,7 +296,7 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
@aiotools.atomic @aiotools.atomic
async def __inner_reset(self) -> None: async def __inner_reset(self) -> None:
async with self.__gpio.reset(): await self.__gpio.reset()
self.__gpio.switch_to_local() self.__gpio.switch_to_local()
self.__connected = False self.__connected = False
await self.__load_device_info() await self.__load_device_info()