This commit is contained in:
Devaev Maxim
2020-09-03 06:51:02 +03:00
parent 5307765399
commit 68ab7ce33c
11 changed files with 305 additions and 15 deletions

View File

@@ -187,6 +187,7 @@ def _patch_dynamic( # pylint: disable=too-many-locals
}
if mode == "output":
ch_scheme.update({
"busy_delay": Option(0.2, type=valid_float_f01),
"initial": Option(False, type=valid_bool),
"switch": Option(True, type=valid_bool),
"pulse": {
@@ -328,8 +329,7 @@ def _get_config_scheme() -> Dict:
"scheme": {}, # Dymanic content
"view": {
"header": {
"title": Option("Switches"),
"leds": Option([], type=valid_string_list),
"title": Option("GPIO"),
},
"table": Option([], type=valid_ugpio_view_table),
},

View File

@@ -44,8 +44,7 @@ class UserGpioApi:
@exposed_http("GET", "/gpio")
async def __state_handler(self, _: Request) -> Response:
return make_json_response({
"scheme": (await self.__user_gpio.get_scheme()),
"view": (await self.__user_gpio.get_view()),
"model": (await self.__user_gpio.get_model()),
"state": (await self.__user_gpio.get_state()),
})

View File

@@ -243,8 +243,7 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
await client.ws.prepare(request)
await self.__register_ws_client(client)
try:
await self.__broadcast_event("gpio_scheme_state", await self.__user_gpio.get_scheme())
await self.__broadcast_event("gpio_view_state", await self.__user_gpio.get_view())
await self.__broadcast_event("gpio_model_state", await self.__user_gpio.get_model())
await asyncio.gather(*[
self.__broadcast_event(component.event_type, await component.get_state())
for component in self.__components

View File

@@ -23,6 +23,7 @@
import asyncio
import operator
from typing import List
from typing import Dict
from typing import AsyncGenerator
from typing import Optional
@@ -88,6 +89,7 @@ class _GpioOutput: # pylint: disable=too-many-instance-attributes
self.__pulse_delay: float = config.pulse.delay
self.__min_pulse_delay: float = config.pulse.min_delay
self.__max_pulse_delay: float = config.pulse.max_delay
self.__busy_delay: float = config.busy_delay
self.__state = config.initial
self.__region = aiotools.AioExclusiveRegion(GpioChannelIsBusyError, notifier)
@@ -97,8 +99,8 @@ class _GpioOutput: # pylint: disable=too-many-instance-attributes
"switch": self.__switch,
"pulse": {
"delay": self.__pulse_delay,
"min_delay": self.__min_pulse_delay,
"max_delay": self.__max_pulse_delay,
"min_delay": (self.__min_pulse_delay if self.__pulse_delay else 0),
"max_delay": (self.__max_pulse_delay if self.__pulse_delay else 0),
},
}
@@ -125,8 +127,10 @@ class _GpioOutput: # pylint: disable=too-many-instance-attributes
self.__write(state)
self.__state = state
get_logger(0).info("Switched GPIO %s to %d", self, state)
await asyncio.sleep(self.__busy_delay)
return True
self.__state = real_state
await asyncio.sleep(self.__busy_delay)
return False
@aiotools.atomic
@@ -146,7 +150,7 @@ class _GpioOutput: # pylint: disable=too-many-instance-attributes
await asyncio.sleep(delay)
finally:
self.__write(False)
await asyncio.sleep(1)
await asyncio.sleep(self.__busy_delay)
get_logger(0).info("Pulsed GPIO %s", self)
def __read(self) -> bool:
@@ -185,15 +189,15 @@ class UserGpio:
else: # output:
self.__outputs[channel] = _GpioOutput(channel, ch_config, self.__state_notifier)
async def get_scheme(self) -> Dict:
async def get_model(self) -> Dict:
return {
"inputs": {channel: gin.get_scheme() for (channel, gin) in self.__inputs.items()},
"outputs": {channel: gout.get_scheme() for (channel, gout) in self.__outputs.items()},
"scheme": {
"inputs": {channel: gin.get_scheme() for (channel, gin) in self.__inputs.items()},
"outputs": {channel: gout.get_scheme() for (channel, gout) in self.__outputs.items()},
},
"view": self.__make_view(),
}
async def get_view(self) -> Dict:
return self.__view
async def get_state(self) -> Dict:
return {
"inputs": {channel: gin.get_state() for (channel, gin) in self.__inputs.items()},
@@ -240,3 +244,37 @@ class UserGpio:
if gout is None:
raise GpioChannelNotFoundError()
await gout.pulse(delay)
# =====
def __make_view(self) -> Dict:
table: List[Optional[List[Dict]]] = []
for row in self.__view["table"]:
if len(row) == 0:
table.append(None)
continue
items: List[Dict] = []
for item in map(str.strip, row):
if item.startswith("#") or len(item) == 0:
items.append({
"type": "label",
"text": item[1:].strip(),
})
elif (parts := list(map(str.strip, item.split(",", 1)))):
if parts[0] in self.__inputs:
items.append({
"type": "input",
"channel": parts[0],
})
elif parts[0] in self.__outputs:
items.append({
"type": "output",
"channel": parts[0],
"text": (parts[1] if len(parts) > 1 else "Click"),
})
table.append(items)
return {
"header": self.__view["header"],
"table": table,
}