One-KVM/kvmd/apps/kvmd/ugpio.py
2020-09-22 04:42:39 +03:00

373 lines
13 KiB
Python

# ========================================================================== #
# #
# KVMD - The main Pi-KVM daemon. #
# #
# Copyright (C) 2018 Maxim Devaev <mdevaev@gmail.com> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
# ========================================================================== #
import asyncio
from typing import List
from typing import Dict
from typing import AsyncGenerator
from typing import Callable
from typing import Optional
from typing import Any
from ...logging import get_logger
from ...plugins.ugpio import GpioError
from ...plugins.ugpio import GpioOperationError
from ...plugins.ugpio import GpioDriverOfflineError
from ...plugins.ugpio import UserGpioModes
from ...plugins.ugpio import BaseUserGpioDriver
from ...plugins.ugpio import get_ugpio_driver_class
from ... import tools
from ... import aiotools
from ...yamlconf import Section
from ...errors import IsBusyError
# =====
class GpioChannelNotFoundError(GpioOperationError):
    """Operation error for a channel name that is not a configured output.

    Raised by ``UserGpio.switch()`` / ``UserGpio.pulse()`` when the requested
    channel is absent from the outputs mapping.
    """

    def __init__(self) -> None:
        message = "GPIO channel is not found"
        super().__init__(message)
class GpioSwitchNotSupported(GpioOperationError):
    """Operation error for a switch request on an output with switching disabled."""

    def __init__(self) -> None:
        message = "This GPIO channel does not support switching"
        super().__init__(message)
class GpioPulseNotSupported(GpioOperationError):
    """Operation error for a pulse request on an output with no pulse delay configured."""

    def __init__(self) -> None:
        message = "This GPIO channel does not support pulsing"
        super().__init__(message)
class GpioChannelIsBusyError(IsBusyError, GpioError):
    """Busy error type handed to each output's exclusive region.

    NOTE(review): presumably the region raises it when an action is started
    while another one is still running on the same channel - confirm against
    ``aiotools.AioExclusiveRegion``.
    """

    def __init__(self) -> None:
        message = "Performing another GPIO operation on this channel, please try again later"
        super().__init__(message)
# =====
class _GpioInput:
    """A read-only GPIO channel bound to a single pin of a driver."""

    def __init__(
        self,
        channel: str,
        config: Section,
        driver: BaseUserGpioDriver,
    ) -> None:
        """Remember the channel settings and register the pin as an input.

        Args:
            channel: Channel name, used only in the string representation.
            config: Channel config; ``pin``, ``inverted`` and ``debounce`` are read.
            driver: Driver that owns the pin.
        """
        self.__channel = channel
        self.__pin: int = config.pin
        self.__inverted: bool = config.inverted

        self.__driver = driver
        driver.register_input(self.__pin, config.debounce)

    def get_scheme(self) -> Dict:
        """Return the static description of the channel (driver id and pin)."""
        hardware = {
            "driver": self.__driver.get_instance_id(),
            "pin": self.__pin,
        }
        return {"hw": hardware}

    def get_state(self) -> Dict:
        """Return ``{"online", "state"}`` for the pin.

        The value read from the driver is XOR-ed with the ``inverted`` flag.
        If the driver reports itself offline, the channel is returned as
        offline with ``state`` set to ``False``.
        """
        try:
            value = self.__driver.read(self.__pin) ^ self.__inverted
        except GpioDriverOfflineError:
            return {"online": False, "state": False}
        return {"online": True, "state": value}

    def __str__(self) -> str:
        return f"Input({self.__channel}, driver={self.__driver}, pin={self.__pin})"

    __repr__ = __str__
class _GpioOutput:  # pylint: disable=too-many-instance-attributes
    """A writable GPIO channel supporting switch and/or pulse actions.

    All actions go through one exclusive region per channel, constructed with
    ``GpioChannelIsBusyError`` and the shared notifier.
    """

    def __init__(
        self,
        channel: str,
        config: Section,
        driver: BaseUserGpioDriver,
        notifier: aiotools.AioNotifier,
    ) -> None:
        """Read the channel settings and register the pin as an output.

        Args:
            channel: Channel name, used only in the string representation.
            config: Channel config; ``pin``, ``inverted``, ``switch``,
                ``pulse.*``, ``busy_delay`` and ``initial`` are read.
            driver: Driver that owns the pin.
            notifier: Notifier handed to the exclusive region.
        """
        self.__channel = channel
        self.__pin: int = config.pin
        self.__inverted: bool = config.inverted

        self.__switch: bool = config.switch

        # Pulsing stays disabled (all zeros) unless a non-zero delay is configured.
        (self.__pulse_delay, self.__min_pulse_delay, self.__max_pulse_delay) = (0.0, 0.0, 0.0)
        pulse_config = config.pulse
        if pulse_config.delay:
            assert pulse_config.max_delay > 0
            # Clamp the default delay into [min_delay, max_delay].
            not_below_min = max(pulse_config.delay, pulse_config.min_delay)
            self.__pulse_delay = min(not_below_min, pulse_config.max_delay)
            self.__min_pulse_delay = pulse_config.min_delay
            self.__max_pulse_delay = pulse_config.max_delay

        self.__busy_delay: float = config.busy_delay

        self.__driver = driver
        # The initial level, when given, is passed with inversion already applied.
        initial = config.initial
        if initial is not None:
            initial = (initial ^ config.inverted)
        driver.register_output(self.__pin, initial)

        self.__region = aiotools.AioExclusiveRegion(GpioChannelIsBusyError, notifier)

    def get_scheme(self) -> Dict:
        """Return the static description: switch flag, pulse limits, driver id and pin."""
        pulse = {
            "delay": self.__pulse_delay,
            "min_delay": self.__min_pulse_delay,
            "max_delay": self.__max_pulse_delay,
        }
        hardware = {
            "driver": self.__driver.get_instance_id(),
            "pin": self.__pin,
        }
        return {
            "switch": self.__switch,
            "pulse": pulse,
            "hw": hardware,
        }

    def get_state(self) -> Dict:
        """Return ``{"online", "state", "busy"}`` for the channel.

        While the region is busy the pin is not read and ``state`` is
        reported as ``False``. If the driver is offline, ``online`` is
        ``False`` and ``state`` stays ``False``.
        """
        busy = self.__region.is_busy()
        result = {
            "online": True,
            "state": False,
            "busy": busy,
        }
        if not busy:
            try:
                result["state"] = self.__read()
            except GpioDriverOfflineError:
                result["online"] = False
        return result

    async def switch(self, state: bool, wait: bool) -> None:
        """Set the output to ``state``.

        Args:
            state: Logical level to write (inversion is applied on write).
            wait: If true, run the action inline; otherwise hand it to
                ``aiotools.run_region_task``.

        Raises:
            GpioSwitchNotSupported: If switching is disabled for this channel.
        """
        if not self.__switch:
            raise GpioSwitchNotSupported()
        await self.__run_action(wait, "switch", self.__inner_switch, state)

    @aiotools.atomic
    async def pulse(self, delay: float, wait: bool) -> None:
        """Drive the output high for ``delay`` seconds, then low.

        Args:
            delay: Requested duration; a falsy value selects the configured
                default. The result is clamped to ``[min_delay, max_delay]``.
            wait: If true, run the action inline; otherwise hand it to
                ``aiotools.run_region_task``.

        Raises:
            GpioPulseNotSupported: If no pulse delay is configured.
        """
        if not self.__pulse_delay:
            raise GpioPulseNotSupported()
        requested = (delay or self.__pulse_delay)
        delay = min(max(requested, self.__min_pulse_delay), self.__max_pulse_delay)
        await self.__run_action(wait, "pulse", self.__inner_pulse, delay)

    # =====

    @aiotools.atomic
    async def __run_action(self, wait: bool, name: str, method: Callable, *args: Any) -> None:
        """Run ``method(*args)`` under the region, inline or as a region task."""
        if not wait:
            await aiotools.run_region_task(
                f"Can't perform {name} of {self} or operation was not completed",
                self.__region, self.__action_task_wrapper, name, method, *args,
            )
            return
        async with self.__region:
            await method(*args)

    @aiotools.atomic
    async def __action_task_wrapper(self, name: str, method: Callable, *args: Any) -> None:
        """Await ``method(*args)``; a driver-offline error is logged, not raised."""
        try:
            return (await method(*args))
        except GpioDriverOfflineError:
            get_logger(0).error("Can't perform %s of %s or operation was not completed: driver offline", name, self)

    @aiotools.atomic
    async def __inner_switch(self, state: bool) -> None:
        """Write the state, log it, then sleep for the busy delay."""
        self.__write(state)
        get_logger(0).info("Ensured switch %s to state=%d", self, state)
        await asyncio.sleep(self.__busy_delay)

    @aiotools.atomic
    async def __inner_pulse(self, delay: float) -> None:
        """Write high, sleep ``delay``, then always write low and sleep the busy delay."""
        try:
            self.__write(True)
            await asyncio.sleep(delay)
        finally:
            # The pin is released even if the high phase fails.
            self.__write(False)
            await asyncio.sleep(self.__busy_delay)
        get_logger(0).info("Pulsed %s with delay=%.2f", self, delay)

    # =====

    def __read(self) -> bool:
        """Read the pin and apply the inversion flag."""
        raw = self.__driver.read(self.__pin)
        return (raw ^ self.__inverted)

    def __write(self, state: bool) -> None:
        """Apply the inversion flag and write the pin."""
        raw = (state ^ self.__inverted)
        self.__driver.write(self.__pin, raw)

    def __str__(self) -> str:
        return f"Output({self.__channel}, driver={self.__driver}, pin={self.__pin})"

    __repr__ = __str__
# =====
class UserGpio:
    """Facade over the configured user GPIO drivers and channels.

    Builds driver instances and input/output channels from the config,
    exposes their scheme and state as plain dicts, and routes switch/pulse
    requests to the named output channel.
    """

    def __init__(self, config: Section) -> None:
        """Instantiate drivers and channels described by ``config``.

        Args:
            config: Section providing ``view``, ``drivers`` and ``scheme``.
        """
        self.__view = config.view

        # One notifier is shared by every driver and every output region.
        self.__notifier = aiotools.AioNotifier()

        self.__drivers = {
            driver: get_ugpio_driver_class(drv_config.type)(
                instance_name=driver,
                notifier=self.__notifier,
                **drv_config._unpack(ignore=["instance_name", "notifier", "type"]),
            )
            for (driver, drv_config) in tools.sorted_kvs(config.drivers)
        }

        self.__inputs: Dict[str, _GpioInput] = {}
        self.__outputs: Dict[str, _GpioOutput] = {}

        for (channel, ch_config) in tools.sorted_kvs(config.scheme):
            driver = self.__drivers[ch_config.driver]
            if ch_config.mode == UserGpioModes.INPUT:
                self.__inputs[channel] = _GpioInput(channel, ch_config, driver)
            else:  # output:
                self.__outputs[channel] = _GpioOutput(channel, ch_config, driver, self.__notifier)

    async def get_model(self) -> Dict:
        """Return the static model: per-channel schemes plus the rendered view."""
        return {
            "scheme": {
                "inputs": {channel: gin.get_scheme() for (channel, gin) in self.__inputs.items()},
                "outputs": {channel: gout.get_scheme() for (channel, gout) in self.__outputs.items()},
            },
            "view": self.__make_view(),
        }

    async def get_state(self) -> Dict:
        """Return the current state of every input and output channel."""
        return {
            "inputs": {channel: gin.get_state() for (channel, gin) in self.__inputs.items()},
            "outputs": {channel: gout.get_state() for (channel, gout) in self.__outputs.items()},
        }

    async def poll_state(self) -> AsyncGenerator[Dict, None]:
        """Yield the state whenever it differs from the last yielded one.

        The state is re-read after each notifier wakeup; the first iteration
        always yields because the previous state starts empty.
        """
        prev_state: Dict = {}
        while True:
            state = await self.get_state()
            if state != prev_state:
                yield state
                prev_state = state
            await self.__notifier.wait()

    def sysprep(self) -> None:
        """Call ``prepare()`` on every driver, in sorted key order."""
        get_logger().info("Preparing User-GPIO drivers ...")
        for (_, driver) in tools.sorted_kvs(self.__drivers):
            driver.prepare()

    async def systask(self) -> None:
        """Run every driver's ``run()`` coroutine concurrently until all finish."""
        get_logger(0).info("Running User-GPIO drivers ...")
        await asyncio.gather(*[
            driver.run()
            for (_, driver) in tools.sorted_kvs(self.__drivers)
        ])

    async def cleanup(self) -> None:
        """Call ``cleanup()`` on every driver.

        A failure in one driver is logged and does not stop the cleanup of
        the remaining ones.
        """
        for driver in self.__drivers.values():
            try:
                driver.cleanup()
            except Exception:
                get_logger().exception("Can't cleanup driver %s", driver)

    async def switch(self, channel: str, state: bool, wait: bool) -> None:
        """Switch the named output channel.

        Raises:
            GpioChannelNotFoundError: If ``channel`` is not a configured output.
        """
        gout = self.__outputs.get(channel)
        if gout is None:
            raise GpioChannelNotFoundError()
        await gout.switch(state, wait)

    async def pulse(self, channel: str, delay: float, wait: bool) -> None:
        """Pulse the named output channel.

        Raises:
            GpioChannelNotFoundError: If ``channel`` is not a configured output.
        """
        gout = self.__outputs.get(channel)
        if gout is None:
            raise GpioChannelNotFoundError()
        await gout.pulse(delay, wait)

    # =====

    def __make_view(self) -> Dict:
        """Render the configured view table into a list of rows of item dicts.

        An empty row becomes ``None``. Within a row, a cell that is empty or
        starts with ``#`` becomes a label; any other cell is split on ``|``
        and rendered as an input or output item if its first part names a
        known channel. Cells naming an unknown channel are skipped.
        """
        table: List[Optional[List[Dict]]] = []
        for row in self.__view["table"]:
            if len(row) == 0:
                table.append(None)
                continue

            items: List[Dict] = []
            for item in map(str.strip, row):
                if item.startswith("#") or len(item) == 0:
                    items.append(self.__make_view_label(item))
                    continue
                # str.split() always yields at least one part, so parts[0] is safe.
                parts = list(map(str.strip, item.split("|", 2)))
                if parts[0] in self.__inputs:
                    items.append(self.__make_view_input(parts))
                elif parts[0] in self.__outputs:
                    items.append(self.__make_view_output(parts))
            table.append(items)
        return {
            "header": self.__view["header"],
            "table": table,
        }

    def __make_view_label(self, item: str) -> Dict:
        """Build a label item from a ``#text`` cell or from an empty cell.

        An empty cell produces a label with empty text; previously it tripped
        the assertion even though ``__make_view`` routes empty cells here.
        """
        assert item.startswith("#") or len(item) == 0
        return {
            "type": "label",
            # Slicing an empty string is safe and yields an empty text.
            "text": item[1:].strip(),
        }

    def __make_view_input(self, parts: List[str]) -> Dict:
        """Build an input item; the optional second part selects the color.

        Any color other than green, yellow or red falls back to green.
        """
        assert len(parts) >= 1
        color = (parts[1] if len(parts) > 1 else None)
        if color not in ["green", "yellow", "red"]:
            color = "green"
        return {
            "type": UserGpioModes.INPUT,
            "channel": parts[0],
            "color": color,
        }

    def __make_view_output(self, parts: List[str]) -> Dict:
        """Build an output item.

        With two parts the second is the button text; with three parts the
        second enables confirmation when it equals ``confirm`` and the third
        is the text. The text defaults to ``Click``.
        """
        assert len(parts) >= 1
        confirm = False
        text = "Click"
        if len(parts) == 2:
            text = parts[1]
        elif len(parts) == 3:
            confirm = (parts[1] == "confirm")
            text = parts[2]
        return {
            "type": UserGpioModes.OUTPUT,
            "channel": parts[0],
            "confirm": confirm,
            "text": text,
        }