mirror of
https://github.com/mofeng-git/One-KVM.git
synced 2026-01-29 09:01:54 +08:00
libgpiod 2.x api
This commit is contained in:
@@ -54,9 +54,7 @@ class Plugin(BaseUserGpioDriver):
|
||||
self.__output_pins: dict[int, (bool | None)] = {}
|
||||
|
||||
self.__reader: (aiogp.AioReader | None) = None
|
||||
|
||||
self.__chip: (gpiod.Chip | None) = None
|
||||
self.__output_lines: dict[int, gpiod.Line] = {}
|
||||
self.__outputs_request: (gpiod.LineRequest | None) = None
|
||||
|
||||
@classmethod
|
||||
def get_plugin_options(cls) -> dict:
|
||||
@@ -76,27 +74,34 @@ class Plugin(BaseUserGpioDriver):
|
||||
|
||||
def prepare(self) -> None:
|
||||
assert self.__reader is None
|
||||
assert self.__outputs_request is None
|
||||
self.__reader = aiogp.AioReader(
|
||||
path=self.__device_path,
|
||||
consumer="kvmd::gpio::inputs",
|
||||
pins=self.__input_pins,
|
||||
notifier=self._notifier,
|
||||
)
|
||||
|
||||
self.__chip = gpiod.Chip(self.__device_path)
|
||||
for (pin, initial) in self.__output_pins.items():
|
||||
line = self.__chip.get_line(pin)
|
||||
line.request("kvmd::gpio::outputs", gpiod.LINE_REQ_DIR_OUT, default_vals=[int(initial or False)])
|
||||
self.__output_lines[pin] = line
|
||||
if self.__output_pins:
|
||||
self.__outputs_request = gpiod.request_lines(
|
||||
self.__device_path,
|
||||
consumer="kvmd::gpiod::outputs",
|
||||
config={
|
||||
pin: gpiod.LineSettings(
|
||||
direction=gpiod.line.Direction.OUTPUT,
|
||||
output_value=gpiod.line.Value(initial or False),
|
||||
)
|
||||
for (pin, initial) in self.__output_pins.items()
|
||||
},
|
||||
)
|
||||
|
||||
async def run(self) -> None:
|
||||
assert self.__reader
|
||||
await self.__reader.poll()
|
||||
|
||||
async def cleanup(self) -> None:
|
||||
if self.__chip:
|
||||
if self.__outputs_request:
|
||||
try:
|
||||
self.__chip.close()
|
||||
self.__outputs_request.release()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
@@ -105,10 +110,15 @@ class Plugin(BaseUserGpioDriver):
|
||||
pin_int = int(pin)
|
||||
if pin_int in self.__input_pins:
|
||||
return self.__reader.get(pin_int)
|
||||
return bool(self.__output_lines[pin_int].get_value())
|
||||
assert self.__outputs_request
|
||||
assert pin_int in self.__output_pins
|
||||
return bool(self.__outputs_request.get_value(pin_int).value)
|
||||
|
||||
async def write(self, pin: str, state: bool) -> None:
|
||||
self.__output_lines[int(pin)].set_value(int(state))
|
||||
assert self.__outputs_request
|
||||
pin_int = int(pin)
|
||||
assert pin_int in self.__output_pins
|
||||
self.__outputs_request.set_value(pin_int, gpiod.line.Value(state))
|
||||
|
||||
def __str__(self) -> str:
|
||||
return f"GPIO({self._instance_name})"
|
||||
|
||||
@@ -53,9 +53,7 @@ class Plugin(BaseUserGpioDriver):
|
||||
self.__device_path = device_path
|
||||
|
||||
self.__tasks: dict[int, (asyncio.Task | None)] = {}
|
||||
|
||||
self.__chip: (gpiod.Chip | None) = None
|
||||
self.__lines: dict[int, gpiod.Line] = {}
|
||||
self.__line_request: (gpiod.LineRequest | None) = None
|
||||
|
||||
@classmethod
|
||||
def get_plugin_options(cls) -> dict:
|
||||
@@ -76,11 +74,16 @@ class Plugin(BaseUserGpioDriver):
|
||||
self.__tasks[int(pin)] = None
|
||||
|
||||
def prepare(self) -> None:
|
||||
self.__chip = gpiod.Chip(self.__device_path)
|
||||
for pin in self.__tasks:
|
||||
line = self.__chip.get_line(pin)
|
||||
line.request("kvmd::locator::outputs", gpiod.LINE_REQ_DIR_OUT, default_vals=[0])
|
||||
self.__lines[pin] = line
|
||||
self.__line_request = gpiod.request_lines(
|
||||
self.__device_path,
|
||||
consumer="kvmd::locator",
|
||||
config={
|
||||
tuple(self.__tasks): gpiod.LineSettings(
|
||||
direction=gpiod.line.Direction.OUTPUT,
|
||||
output_value=gpiod.line.Value(False),
|
||||
),
|
||||
},
|
||||
)
|
||||
|
||||
async def cleanup(self) -> None:
|
||||
tasks = [
|
||||
@@ -91,9 +94,9 @@ class Plugin(BaseUserGpioDriver):
|
||||
for task in tasks:
|
||||
task.cancel()
|
||||
await asyncio.gather(*tasks, return_exceptions=True)
|
||||
if self.__chip:
|
||||
if self.__line_request:
|
||||
try:
|
||||
self.__chip.close()
|
||||
self.__line_request.release()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
@@ -111,17 +114,18 @@ class Plugin(BaseUserGpioDriver):
|
||||
self.__tasks[pin_int] = None
|
||||
|
||||
async def __blink(self, pin: int) -> None:
|
||||
line = self.__lines[pin]
|
||||
assert pin in self.__tasks
|
||||
assert self.__line_request
|
||||
try:
|
||||
state = 1
|
||||
state = True
|
||||
while True:
|
||||
line.set_value(state)
|
||||
state = int(not state)
|
||||
self.__line_request.set_value(pin, gpiod.line.Value(state))
|
||||
state = (not state)
|
||||
await asyncio.sleep(0.1)
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
finally:
|
||||
line.set_value(0)
|
||||
self.__line_request.set_value(pin, gpiod.line.Value(False))
|
||||
|
||||
def __str__(self) -> str:
|
||||
return f"Locator({self._instance_name})"
|
||||
|
||||
Reference in New Issue
Block a user