mirror of
https://github.com/mofeng-git/One-KVM.git
synced 2026-01-29 00:51:53 +08:00
gpio view and refactoring
This commit is contained in:
@@ -78,10 +78,11 @@ from ..validators.kvm import valid_stream_fps
|
||||
from ..validators.kvm import valid_stream_resolution
|
||||
from ..validators.kvm import valid_hid_key
|
||||
from ..validators.kvm import valid_hid_mouse_move
|
||||
from ..validators.kvm import valid_ugpio_mode
|
||||
from ..validators.kvm import valid_ugpio_view_table
|
||||
|
||||
from ..validators.hw import valid_gpio_pin
|
||||
from ..validators.hw import valid_gpio_pin_optional
|
||||
from ..validators.hw import valid_gpio_mode
|
||||
from ..validators.hw import valid_otg_gadget
|
||||
from ..validators.hw import valid_otg_id
|
||||
|
||||
@@ -175,13 +176,13 @@ def _patch_dynamic( # pylint: disable=too-many-locals
|
||||
if load_gpio:
|
||||
for (channel, params) in raw_config.get("kvmd", {}).get("gpio", {}).get("scheme", {}).items():
|
||||
try:
|
||||
mode = valid_gpio_mode(params.get("mode", ""))
|
||||
mode = valid_ugpio_mode(params.get("mode", ""))
|
||||
except Exception:
|
||||
pass
|
||||
finally:
|
||||
ch_scheme: Dict = {
|
||||
"pin": Option(-1, type=valid_gpio_pin),
|
||||
"mode": Option("", type=valid_gpio_mode),
|
||||
"mode": Option("", type=valid_ugpio_mode),
|
||||
"inverted": Option(False, type=valid_bool),
|
||||
}
|
||||
if mode == "output":
|
||||
@@ -325,6 +326,13 @@ def _get_config_scheme() -> Dict:
|
||||
"gpio": {
|
||||
"state_poll": Option(0.1, type=valid_float_f01),
|
||||
"scheme": {}, # Dymanic content
|
||||
"view": {
|
||||
"header": {
|
||||
"title": Option("Switches"),
|
||||
"leds": Option([], type=valid_string_list),
|
||||
},
|
||||
"table": Option([], type=valid_ugpio_view_table),
|
||||
},
|
||||
},
|
||||
},
|
||||
|
||||
|
||||
@@ -26,7 +26,7 @@ from aiohttp.web import Response
|
||||
from ....validators.basic import valid_bool
|
||||
from ....validators.basic import valid_float_f0
|
||||
|
||||
from ....validators.hw import valid_gpio_channel
|
||||
from ....validators.kvm import valid_ugpio_channel
|
||||
|
||||
from ..ugpio import UserGpio
|
||||
|
||||
@@ -45,19 +45,20 @@ class UserGpioApi:
|
||||
async def __state_handler(self, _: Request) -> Response:
|
||||
return make_json_response({
|
||||
"scheme": (await self.__user_gpio.get_scheme()),
|
||||
"view": (await self.__user_gpio.get_view()),
|
||||
"state": (await self.__user_gpio.get_state()),
|
||||
})
|
||||
|
||||
@exposed_http("POST", "/gpio/switch")
|
||||
async def __switch_handler(self, request: Request) -> Response:
|
||||
channel = valid_gpio_channel(request.query.get("channel"))
|
||||
channel = valid_ugpio_channel(request.query.get("channel"))
|
||||
state = valid_bool(request.query.get("state"))
|
||||
done = await self.__user_gpio.switch(channel, state)
|
||||
return make_json_response({"done": done})
|
||||
|
||||
@exposed_http("POST", "/gpio/pulse")
|
||||
async def __pulse_handler(self, request: Request) -> Response:
|
||||
channel = valid_gpio_channel(request.query.get("channel"))
|
||||
channel = valid_ugpio_channel(request.query.get("channel"))
|
||||
delay = valid_float_f0(request.query.get("delay", "0"))
|
||||
await self.__user_gpio.pulse(channel, delay)
|
||||
return make_json_response()
|
||||
|
||||
@@ -244,6 +244,7 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
|
||||
await self.__register_ws_client(client)
|
||||
try:
|
||||
await self.__broadcast_event("gpio_scheme_state", await self.__user_gpio.get_scheme())
|
||||
await self.__broadcast_event("gpio_view_state", await self.__user_gpio.get_view())
|
||||
await asyncio.gather(*[
|
||||
self.__broadcast_event(component.event_type, await component.get_state())
|
||||
for component in self.__components
|
||||
|
||||
@@ -167,6 +167,8 @@ class _GpioOutput: # pylint: disable=too-many-instance-attributes
|
||||
# =====
|
||||
class UserGpio:
|
||||
def __init__(self, config: Section) -> None:
|
||||
self.__view = config.view
|
||||
|
||||
self.__state_notifier = aiotools.AioNotifier()
|
||||
self.__reader = gpio.BatchReader(
|
||||
pins=[gpio.set_input(ch_config.pin) for ch_config in config.scheme.values()],
|
||||
@@ -189,6 +191,9 @@ class UserGpio:
|
||||
"outputs": {channel: gout.get_scheme() for (channel, gout) in self.__outputs.items()},
|
||||
}
|
||||
|
||||
async def get_view(self) -> Dict:
|
||||
return self.__view
|
||||
|
||||
async def get_state(self) -> Dict:
|
||||
return {
|
||||
"inputs": {channel: gin.get_state() for (channel, gin) in self.__inputs.items()},
|
||||
|
||||
@@ -23,7 +23,6 @@
|
||||
from typing import Any
|
||||
|
||||
from . import check_in_list
|
||||
from . import check_string_in_list
|
||||
from . import check_re_match
|
||||
|
||||
from .basic import valid_number
|
||||
@@ -44,14 +43,6 @@ def valid_gpio_pin_optional(arg: Any) -> int:
|
||||
return int(valid_number(arg, min=-1, name="optional GPIO pin"))
|
||||
|
||||
|
||||
def valid_gpio_mode(arg: Any) -> str:
|
||||
return check_string_in_list(arg, "GPIO mode", ["input", "output"])
|
||||
|
||||
|
||||
def valid_gpio_channel(arg: Any) -> str:
|
||||
return check_re_match(arg, "GPIO channel", r"^[a-zA-Z_][a-zA-Z0-9_-]*$")[:255]
|
||||
|
||||
|
||||
def valid_otg_gadget(arg: Any) -> str:
|
||||
return check_re_match(arg, "OTG gadget name", r"^[a-z_][a-z0-9_-]*$")[:255]
|
||||
|
||||
|
||||
@@ -20,12 +20,14 @@
|
||||
# ========================================================================== #
|
||||
|
||||
|
||||
from typing import List
|
||||
from typing import Any
|
||||
|
||||
from ..keyboard.mappings import KEYMAP
|
||||
|
||||
from . import raise_error
|
||||
from . import check_string_in_list
|
||||
from . import check_re_match
|
||||
|
||||
from .basic import valid_stripped_string_not_empty
|
||||
from .basic import valid_number
|
||||
@@ -86,3 +88,19 @@ def valid_hid_mouse_button(arg: Any) -> str:
|
||||
def valid_hid_mouse_wheel(arg: Any) -> int:
|
||||
arg = valid_number(arg, name="HID mouse wheel")
|
||||
return min(max(-127, arg), 127)
|
||||
|
||||
|
||||
# =====
|
||||
def valid_ugpio_mode(arg: Any) -> str:
|
||||
return check_string_in_list(arg, "GPIO mode", ["input", "output"])
|
||||
|
||||
|
||||
def valid_ugpio_channel(arg: Any) -> str:
|
||||
return check_re_match(arg, "GPIO channel", r"^[a-zA-Z_][a-zA-Z0-9_-]*$")[:255]
|
||||
|
||||
|
||||
def valid_ugpio_view_table(arg: Any) -> List[List[str]]:
|
||||
try:
|
||||
return list(map(str, list(arg))) # type: ignore
|
||||
except Exception:
|
||||
raise_error("<skipped>", "GPIO view table")
|
||||
|
||||
Reference in New Issue
Block a user