gpio diff events mode

This commit is contained in:
Maxim Devaev
2024-10-19 08:59:52 +03:00
parent 3852d0a456
commit 90d8e745e3
6 changed files with 161 additions and 93 deletions

View File

@@ -21,6 +21,7 @@
import asyncio
import copy
from typing import AsyncGenerator
from typing import Callable
@@ -68,12 +69,12 @@ class GpioChannelIsBusyError(IsBusyError, GpioError):
class _GpioInput:
def __init__(
self,
channel: str,
ch: str,
config: Section,
driver: BaseUserGpioDriver,
) -> None:
self.__channel = channel
self.__ch = ch
self.__pin: str = str(config.pin)
self.__inverted: bool = config.inverted
@@ -100,7 +101,7 @@ class _GpioInput:
}
def __str__(self) -> str:
return f"Input({self.__channel}, driver={self.__driver}, pin={self.__pin})"
return f"Input({self.__ch}, driver={self.__driver}, pin={self.__pin})"
__repr__ = __str__
@@ -108,13 +109,13 @@ class _GpioInput:
class _GpioOutput: # pylint: disable=too-many-instance-attributes
def __init__(
self,
channel: str,
ch: str,
config: Section,
driver: BaseUserGpioDriver,
notifier: aiotools.AioNotifier,
) -> None:
self.__channel = channel
self.__ch = ch
self.__pin: str = str(config.pin)
self.__inverted: bool = config.inverted
@@ -224,7 +225,7 @@ class _GpioOutput: # pylint: disable=too-many-instance-attributes
await self.__driver.write(self.__pin, (state ^ self.__inverted))
def __str__(self) -> str:
return f"Output({self.__channel}, driver={self.__driver}, pin={self.__pin})"
return f"Output({self.__ch}, driver={self.__driver}, pin={self.__pin})"
__repr__ = __str__
@@ -232,9 +233,8 @@ class _GpioOutput: # pylint: disable=too-many-instance-attributes
# =====
class UserGpio:
def __init__(self, config: Section, otg_config: Section) -> None:
self.__view = config.view
self.__notifier = aiotools.AioNotifier()
self.__full_state_requested = True
self.__drivers = {
driver: get_ugpio_driver_class(drv_config.type)(
@@ -249,45 +249,64 @@ class UserGpio:
self.__inputs: dict[str, _GpioInput] = {}
self.__outputs: dict[str, _GpioOutput] = {}
for (channel, ch_config) in tools.sorted_kvs(config.scheme):
for (ch, ch_config) in tools.sorted_kvs(config.scheme):
driver = self.__drivers[ch_config.driver]
if ch_config.mode == UserGpioModes.INPUT:
self.__inputs[channel] = _GpioInput(channel, ch_config, driver)
self.__inputs[ch] = _GpioInput(ch, ch_config, driver)
else: # output:
self.__outputs[channel] = _GpioOutput(channel, ch_config, driver, self.__notifier)
self.__outputs[ch] = _GpioOutput(ch, ch_config, driver, self.__notifier)
async def get_model(self) -> dict:
return {
"scheme": {
"inputs": {channel: gin.get_scheme() for (channel, gin) in self.__inputs.items()},
"outputs": {
channel: gout.get_scheme()
for (channel, gout) in self.__outputs.items()
if not gout.is_const()
},
},
"view": self.__make_view(),
}
self.__scheme = self.__make_scheme()
self.__view = self.__make_view(config.view)
async def get_state(self) -> dict:
return {
"inputs": {channel: await gin.get_state() for (channel, gin) in self.__inputs.items()},
"model": {
"scheme": copy.deepcopy(self.__scheme),
"view": copy.deepcopy(self.__view),
},
"state": (await self.__get_io_state()),
}
async def trigger_state(self) -> None:
self.__full_state_requested = True
self.__notifier.notify()
async def poll_state(self) -> AsyncGenerator[dict, None]:
prev: dict = {"inputs": {}, "outputs": {}}
while True: # pylint: disable=too-many-nested-blocks
if self.__full_state_requested:
self.__full_state_requested = False
full = await self.get_state()
prev = copy.deepcopy(full["state"])
yield full
else:
new = await self.__get_io_state()
diff: dict = {}
for sub in ["inputs", "outputs"]:
for ch in new[sub]:
if new[sub][ch] != prev[sub][ch]:
if sub not in diff:
diff[sub] = {}
diff[sub][ch] = new[sub][ch]
if diff:
prev = copy.deepcopy(new)
yield {"state": diff}
await self.__notifier.wait()
async def __get_io_state(self) -> dict:
return {
"inputs": {
ch: (await gin.get_state())
for (ch, gin) in self.__inputs.items()
},
"outputs": {
channel: await gout.get_state()
for (channel, gout) in self.__outputs.items()
ch: (await gout.get_state())
for (ch, gout) in self.__outputs.items()
if not gout.is_const()
},
}
async def poll_state(self) -> AsyncGenerator[dict, None]:
prev_state: dict = {}
while True:
state = await self.get_state()
if state != prev_state:
yield state
prev_state = state
await self.__notifier.wait()
def sysprep(self) -> None:
get_logger(0).info("Preparing User-GPIO drivers ...")
for (_, driver) in tools.sorted_kvs(self.__drivers):
@@ -307,28 +326,43 @@ class UserGpio:
except Exception:
get_logger().exception("Can't cleanup driver %s", driver)
async def switch(self, channel: str, state: bool, wait: bool) -> None:
gout = self.__outputs.get(channel)
async def switch(self, ch: str, state: bool, wait: bool) -> None:
gout = self.__outputs.get(ch)
if gout is None:
raise GpioChannelNotFoundError()
await gout.switch(state, wait)
async def pulse(self, channel: str, delay: float, wait: bool) -> None:
gout = self.__outputs.get(channel)
async def pulse(self, ch: str, delay: float, wait: bool) -> None:
gout = self.__outputs.get(ch)
if gout is None:
raise GpioChannelNotFoundError()
await gout.pulse(delay, wait)
# =====
def __make_view(self) -> dict:
def __make_scheme(self) -> dict:
return {
"header": {"title": self.__make_view_title()},
"table": self.__make_view_table(),
"inputs": {
ch: gin.get_scheme()
for (ch, gin) in self.__inputs.items()
},
"outputs": {
ch: gout.get_scheme()
for (ch, gout) in self.__outputs.items()
if not gout.is_const()
},
}
def __make_view_title(self) -> list[dict]:
raw_title = self.__view["header"]["title"]
# =====
def __make_view(self, view: dict) -> dict:
return {
"header": {"title": self.__make_view_title(view)},
"table": self.__make_view_table(view),
}
def __make_view_title(self, view: dict) -> list[dict]:
raw_title = view["header"]["title"]
title: list[dict] = []
if isinstance(raw_title, list):
for item in raw_title:
@@ -342,9 +376,9 @@ class UserGpio:
title.append(self.__make_item_label(f"#{raw_title}"))
return title
def __make_view_table(self) -> list[list[dict] | None]:
def __make_view_table(self, view: dict) -> list[list[dict] | None]:
table: list[list[dict] | None] = []
for row in self.__view["table"]:
for row in view["table"]:
if len(row) == 0:
table.append(None)
continue