mirror of
https://github.com/mofeng-git/One-KVM.git
synced 2026-01-31 18:11:54 +08:00
user gpio
This commit is contained in:
35
kvmd/gpio.py
35
kvmd/gpio.py
@@ -20,14 +20,20 @@
|
||||
# ========================================================================== #
|
||||
|
||||
|
||||
import asyncio
|
||||
import contextlib
|
||||
|
||||
from typing import Tuple
|
||||
from typing import List
|
||||
from typing import Generator
|
||||
from typing import Optional
|
||||
|
||||
from RPi import GPIO
|
||||
|
||||
from .logging import get_logger
|
||||
|
||||
from . import aiotools
|
||||
|
||||
|
||||
# =====
|
||||
@contextlib.contextmanager
|
||||
@@ -59,6 +65,31 @@ def read(pin: int) -> bool:
|
||||
return bool(GPIO.input(pin))
|
||||
|
||||
|
||||
def write(pin: int, flag: bool) -> None:
|
||||
def write(pin: int, state: bool) -> None:
|
||||
assert pin >= 0, pin
|
||||
GPIO.output(pin, flag)
|
||||
GPIO.output(pin, state)
|
||||
|
||||
|
||||
class BatchReader:
|
||||
def __init__(self, pins: List[int], interval: float, notifier: aiotools.AioNotifier) -> None:
|
||||
self.__pins = pins
|
||||
self.__flags: Tuple[Optional[bool], ...] = (None,) * len(pins)
|
||||
self.__state = dict.fromkeys(pins, False)
|
||||
|
||||
self.__interval = interval
|
||||
self.__notifier = notifier
|
||||
|
||||
def get(self, pin: int) -> bool:
|
||||
return self.__state[pin]
|
||||
|
||||
async def poll(self) -> None:
|
||||
if not self.__pins:
|
||||
await aiotools.wait_infinite()
|
||||
else:
|
||||
while True:
|
||||
flags = tuple(map(read, self.__pins))
|
||||
if flags != self.__flags:
|
||||
self.__flags = flags
|
||||
self.__state = dict(zip(self.__pins, flags))
|
||||
await self.__notifier.notify()
|
||||
await asyncio.sleep(self.__interval)
|
||||
|
||||
Reference in New Issue
Block a user