mirror of
https://github.com/mofeng-git/One-KVM.git
synced 2026-01-29 00:51:53 +08:00
using libgpiod for the ugpio driver
This commit is contained in:
@@ -24,12 +24,10 @@ from typing import Dict
|
||||
from typing import Set
|
||||
from typing import Optional
|
||||
|
||||
import gpiod
|
||||
|
||||
from ... import aiotools
|
||||
from ... import gpio
|
||||
|
||||
from ...yamlconf import Option
|
||||
|
||||
from ...validators.basic import valid_float_f01
|
||||
from ... import aiogp
|
||||
|
||||
from . import BaseUserGpioDriver
|
||||
|
||||
@@ -40,24 +38,17 @@ class Plugin(BaseUserGpioDriver):
|
||||
self,
|
||||
instance_name: str,
|
||||
notifier: aiotools.AioNotifier,
|
||||
|
||||
state_poll: float,
|
||||
) -> None:
|
||||
|
||||
super().__init__(instance_name, notifier)
|
||||
|
||||
self.__state_poll = state_poll
|
||||
|
||||
self.__input_pins: Set[int] = set()
|
||||
self.__output_pins: Dict[int, Optional[bool]] = {}
|
||||
|
||||
self.__reader: Optional[gpio.BatchReader] = None
|
||||
self.__reader: Optional[aiogp.AioPinsReader] = None
|
||||
|
||||
@classmethod
|
||||
def get_plugin_options(cls) -> Dict:
|
||||
return {
|
||||
"state_poll": Option(0.1, type=valid_float_f01),
|
||||
}
|
||||
self.__chip: Optional[gpiod.Chip] = None
|
||||
self.__output_lines: Dict[int, gpiod.Line] = {}
|
||||
|
||||
def register_input(self, pin: int) -> None:
|
||||
self.__input_pins.add(pin)
|
||||
@@ -67,32 +58,38 @@ class Plugin(BaseUserGpioDriver):
|
||||
|
||||
def prepare(self) -> None:
|
||||
assert self.__reader is None
|
||||
self.__reader = gpio.BatchReader(
|
||||
pins=set([
|
||||
*map(gpio.set_input, self.__input_pins),
|
||||
*[
|
||||
gpio.set_output(pin, initial)
|
||||
for (pin, initial) in self.__output_pins.items()
|
||||
],
|
||||
]),
|
||||
interval=self.__state_poll,
|
||||
self.__reader = aiogp.AioPinsReader(
|
||||
path="/dev/gpiochip0",
|
||||
consumer="kvmd/ugpio-gpio/inputs",
|
||||
pins=dict.fromkeys(self.__input_pins, False),
|
||||
notifier=self._notifier,
|
||||
)
|
||||
|
||||
self.__chip = gpiod.Chip("/dev/gpiochip0")
|
||||
for (pin, initial) in self.__output_pins.items():
|
||||
line = self.__chip.get_line(pin)
|
||||
line.request("kvmd/ugpio-gpio/outputs", gpiod.LINE_REQ_DIR_OUT, default_val=int(initial or False))
|
||||
self.__output_lines[pin] = line
|
||||
|
||||
async def run(self) -> None:
|
||||
assert self.__reader
|
||||
await self.__reader.poll()
|
||||
|
||||
def cleanup(self) -> None:
|
||||
for (pin, initial) in self.__output_pins.items():
|
||||
if initial is not None:
|
||||
gpio.write(pin, initial)
|
||||
if self.__chip:
|
||||
try:
|
||||
self.__chip.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def read(self, pin: int) -> bool:
|
||||
return gpio.read(pin)
|
||||
assert self.__reader
|
||||
if pin in self.__input_pins:
|
||||
return self.__reader.get(pin)
|
||||
return bool(self.__output_lines[pin].get_value())
|
||||
|
||||
def write(self, pin: int, state: bool) -> None:
|
||||
gpio.write(pin, state)
|
||||
self.__output_lines[pin].set_value(int(state))
|
||||
|
||||
def __str__(self) -> str:
|
||||
return f"GPIO({self._instance_name})"
|
||||
|
||||
Reference in New Issue
Block a user