mirror of
https://github.com/mofeng-git/One-KVM.git
synced 2026-01-29 00:51:53 +08:00
only available gpio modes
This commit is contained in:
@@ -22,6 +22,7 @@
|
|||||||
|
|
||||||
import sys
|
import sys
|
||||||
import os
|
import os
|
||||||
|
import functools
|
||||||
import argparse
|
import argparse
|
||||||
import logging
|
import logging
|
||||||
import logging.config
|
import logging.config
|
||||||
@@ -43,6 +44,8 @@ from ..plugins.auth import get_auth_service_class
|
|||||||
from ..plugins.hid import get_hid_class
|
from ..plugins.hid import get_hid_class
|
||||||
from ..plugins.atx import get_atx_class
|
from ..plugins.atx import get_atx_class
|
||||||
from ..plugins.msd import get_msd_class
|
from ..plugins.msd import get_msd_class
|
||||||
|
|
||||||
|
from ..plugins.ugpio import UserGpioModes
|
||||||
from ..plugins.ugpio import get_ugpio_driver_class
|
from ..plugins.ugpio import get_ugpio_driver_class
|
||||||
|
|
||||||
from ..yamlconf import ConfigError
|
from ..yamlconf import ConfigError
|
||||||
@@ -181,29 +184,40 @@ def _patch_dynamic( # pylint: disable=too-many-locals
|
|||||||
rebuild = True
|
rebuild = True
|
||||||
|
|
||||||
if load_gpio:
|
if load_gpio:
|
||||||
drivers: Set[str] = set()
|
driver: str
|
||||||
|
drivers: Dict[str, Set[str]] = {} # Name to modes
|
||||||
for (driver, params) in { # type: ignore
|
for (driver, params) in { # type: ignore
|
||||||
"__gpio__": {},
|
"__gpio__": {},
|
||||||
**tools.rget(raw_config, "kvmd", "gpio", "drivers"),
|
**tools.rget(raw_config, "kvmd", "gpio", "drivers"),
|
||||||
}.items():
|
}.items():
|
||||||
with manual_validated(driver, "kvmd", "gpio", "drivers", "<key>"):
|
with manual_validated(driver, "kvmd", "gpio", "drivers", "<key>"):
|
||||||
driver = valid_ugpio_driver(driver)
|
driver = valid_ugpio_driver(driver)
|
||||||
|
|
||||||
driver_type = valid_stripped_string_not_empty(params.get("type", "gpio"))
|
driver_type = valid_stripped_string_not_empty(params.get("type", "gpio"))
|
||||||
|
driver_class = get_ugpio_driver_class(driver_type)
|
||||||
|
drivers[driver] = driver_class.get_modes()
|
||||||
scheme["kvmd"]["gpio"]["drivers"][driver] = {
|
scheme["kvmd"]["gpio"]["drivers"][driver] = {
|
||||||
"type": Option(driver_type, type=valid_stripped_string_not_empty),
|
"type": Option(driver_type, type=valid_stripped_string_not_empty),
|
||||||
**get_ugpio_driver_class(driver_type).get_plugin_options()
|
**driver_class.get_plugin_options()
|
||||||
}
|
}
|
||||||
drivers.add(driver)
|
|
||||||
|
|
||||||
for (channel, params) in tools.rget(raw_config, "kvmd", "gpio", "scheme").items():
|
path = ("kvmd", "gpio", "scheme")
|
||||||
with manual_validated(channel, "kvmd", "gpio", "scheme", "<key>"):
|
for (channel, params) in tools.rget(raw_config, *path).items():
|
||||||
|
with manual_validated(channel, *path, "<key>"):
|
||||||
channel = valid_ugpio_channel(channel)
|
channel = valid_ugpio_channel(channel)
|
||||||
with manual_validated(params.get("mode", ""), "kvmd", "gpio", "scheme", channel, "mode"):
|
|
||||||
mode = valid_ugpio_mode(params.get("mode", ""))
|
driver = params.get("driver", "__gpio__")
|
||||||
|
with manual_validated(driver, *path, channel, "driver"):
|
||||||
|
driver = valid_ugpio_driver(driver, set(drivers))
|
||||||
|
|
||||||
|
mode: str = params.get("mode", "")
|
||||||
|
with manual_validated(mode, *path, channel, "mode"):
|
||||||
|
mode = valid_ugpio_mode(mode, drivers[driver])
|
||||||
|
|
||||||
scheme["kvmd"]["gpio"]["scheme"][channel] = {
|
scheme["kvmd"]["gpio"]["scheme"][channel] = {
|
||||||
"driver": Option("__gpio__", type=(lambda arg: valid_ugpio_driver(arg, drivers))),
|
"driver": Option("__gpio__", type=functools.partial(valid_ugpio_driver, variants=set(drivers))),
|
||||||
"pin": Option(-1, type=valid_gpio_pin),
|
"pin": Option(-1, type=valid_gpio_pin),
|
||||||
"mode": Option("", type=valid_ugpio_mode),
|
"mode": Option("", type=functools.partial(valid_ugpio_mode, variants=drivers[driver])),
|
||||||
"inverted": Option(False, type=valid_bool),
|
"inverted": Option(False, type=valid_bool),
|
||||||
**({
|
**({
|
||||||
"busy_delay": Option(0.2, type=valid_float_f01),
|
"busy_delay": Option(0.2, type=valid_float_f01),
|
||||||
@@ -214,7 +228,7 @@ def _patch_dynamic( # pylint: disable=too-many-locals
|
|||||||
"min_delay": Option(0.1, type=valid_float_f01),
|
"min_delay": Option(0.1, type=valid_float_f01),
|
||||||
"max_delay": Option(0.1, type=valid_float_f01),
|
"max_delay": Option(0.1, type=valid_float_f01),
|
||||||
},
|
},
|
||||||
} if mode == "output" else {})
|
} if mode == UserGpioModes.OUTPUT else {})
|
||||||
}
|
}
|
||||||
|
|
||||||
rebuild = True
|
rebuild = True
|
||||||
|
|||||||
@@ -60,12 +60,6 @@ def _clear_gpio(config: Section) -> None:
|
|||||||
|
|
||||||
("streamer/cap", config.streamer.cap_pin),
|
("streamer/cap", config.streamer.cap_pin),
|
||||||
("streamer/conv", config.streamer.conv_pin),
|
("streamer/conv", config.streamer.conv_pin),
|
||||||
|
|
||||||
# *([
|
|
||||||
# (f"gpio/{channel}", params.pin)
|
|
||||||
# for (channel, params) in config.gpio.scheme.items()
|
|
||||||
# if params.mode == "output"
|
|
||||||
# ]),
|
|
||||||
]:
|
]:
|
||||||
if pin >= 0:
|
if pin >= 0:
|
||||||
logger.info("Writing 0 to GPIO pin=%d (%s)", pin, name)
|
logger.info("Writing 0 to GPIO pin=%d (%s)", pin, name)
|
||||||
|
|||||||
@@ -32,6 +32,8 @@ from .... import tools
|
|||||||
|
|
||||||
from ....plugins.atx import BaseAtx
|
from ....plugins.atx import BaseAtx
|
||||||
|
|
||||||
|
from ....plugins.ugpio import UserGpioModes
|
||||||
|
|
||||||
from ..info import InfoManager
|
from ..info import InfoManager
|
||||||
from ..ugpio import UserGpio
|
from ..ugpio import UserGpio
|
||||||
|
|
||||||
@@ -59,7 +61,7 @@ class ExportApi:
|
|||||||
self.__append_prometheus_rows(rows, atx_state["enabled"], "pikvm_atx_enabled")
|
self.__append_prometheus_rows(rows, atx_state["enabled"], "pikvm_atx_enabled")
|
||||||
self.__append_prometheus_rows(rows, atx_state["leds"]["power"], "pikvm_atx_power")
|
self.__append_prometheus_rows(rows, atx_state["leds"]["power"], "pikvm_atx_power")
|
||||||
|
|
||||||
for mode in ["input", "output"]:
|
for mode in sorted(UserGpioModes.ALL):
|
||||||
for (channel, ch_state) in gpio_state[f"{mode}s"].items():
|
for (channel, ch_state) in gpio_state[f"{mode}s"].items():
|
||||||
for key in ["online", "state"]:
|
for key in ["online", "state"]:
|
||||||
self.__append_prometheus_rows(rows, ch_state["state"], f"pikvm_gpio_{mode}_{key}_{channel}")
|
self.__append_prometheus_rows(rows, ch_state["state"], f"pikvm_gpio_{mode}_{key}_{channel}")
|
||||||
|
|||||||
@@ -34,6 +34,7 @@ from ...logging import get_logger
|
|||||||
from ...plugins.ugpio import GpioError
|
from ...plugins.ugpio import GpioError
|
||||||
from ...plugins.ugpio import GpioOperationError
|
from ...plugins.ugpio import GpioOperationError
|
||||||
from ...plugins.ugpio import GpioDriverOfflineError
|
from ...plugins.ugpio import GpioDriverOfflineError
|
||||||
|
from ...plugins.ugpio import UserGpioModes
|
||||||
from ...plugins.ugpio import BaseUserGpioDriver
|
from ...plugins.ugpio import BaseUserGpioDriver
|
||||||
from ...plugins.ugpio import get_ugpio_driver_class
|
from ...plugins.ugpio import get_ugpio_driver_class
|
||||||
|
|
||||||
@@ -250,7 +251,7 @@ class UserGpio:
|
|||||||
|
|
||||||
for (channel, ch_config) in tools.sorted_kvs(config.scheme):
|
for (channel, ch_config) in tools.sorted_kvs(config.scheme):
|
||||||
driver = self.__drivers[ch_config.driver]
|
driver = self.__drivers[ch_config.driver]
|
||||||
if ch_config.mode == "input":
|
if ch_config.mode == UserGpioModes.INPUT:
|
||||||
self.__inputs[channel] = _GpioInput(channel, ch_config, driver)
|
self.__inputs[channel] = _GpioInput(channel, ch_config, driver)
|
||||||
else: # output:
|
else: # output:
|
||||||
self.__outputs[channel] = _GpioOutput(channel, ch_config, driver, self.__notifier)
|
self.__outputs[channel] = _GpioOutput(channel, ch_config, driver, self.__notifier)
|
||||||
@@ -331,12 +332,12 @@ class UserGpio:
|
|||||||
if parts:
|
if parts:
|
||||||
if parts[0] in self.__inputs:
|
if parts[0] in self.__inputs:
|
||||||
items.append({
|
items.append({
|
||||||
"type": "input",
|
"type": UserGpioModes.INPUT,
|
||||||
"channel": parts[0],
|
"channel": parts[0],
|
||||||
})
|
})
|
||||||
elif parts[0] in self.__outputs:
|
elif parts[0] in self.__outputs:
|
||||||
items.append({
|
items.append({
|
||||||
"type": "output",
|
"type": UserGpioModes.OUTPUT,
|
||||||
"channel": parts[0],
|
"channel": parts[0],
|
||||||
"text": (parts[1] if len(parts) > 1 else "Click"),
|
"text": (parts[1] if len(parts) > 1 else "Click"),
|
||||||
})
|
})
|
||||||
|
|||||||
@@ -20,6 +20,7 @@
|
|||||||
# ========================================================================== #
|
# ========================================================================== #
|
||||||
|
|
||||||
|
|
||||||
|
from typing import Set
|
||||||
from typing import Type
|
from typing import Type
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
from typing import Any
|
from typing import Any
|
||||||
@@ -46,6 +47,14 @@ class GpioDriverOfflineError(GpioOperationError):
|
|||||||
super().__init__(f"GPIO driver {driver} is offline")
|
super().__init__(f"GPIO driver {driver} is offline")
|
||||||
|
|
||||||
|
|
||||||
|
# =====
|
||||||
|
class UserGpioModes:
|
||||||
|
INPUT = "input"
|
||||||
|
OUTPUT = "output"
|
||||||
|
|
||||||
|
ALL = set([INPUT, OUTPUT])
|
||||||
|
|
||||||
|
|
||||||
# =====
|
# =====
|
||||||
class BaseUserGpioDriver(BasePlugin):
|
class BaseUserGpioDriver(BasePlugin):
|
||||||
def __init__( # pylint: disable=super-init-not-called
|
def __init__( # pylint: disable=super-init-not-called
|
||||||
@@ -61,6 +70,10 @@ class BaseUserGpioDriver(BasePlugin):
|
|||||||
def get_instance_id(self) -> str:
|
def get_instance_id(self) -> str:
|
||||||
return self._instance_name
|
return self._instance_name
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def get_modes(cls) -> Set[str]:
|
||||||
|
return set(UserGpioModes.ALL)
|
||||||
|
|
||||||
def register_input(self, pin: int) -> None:
|
def register_input(self, pin: int) -> None:
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
|
|
||||||
|
|||||||
@@ -24,6 +24,7 @@ import asyncio
|
|||||||
import contextlib
|
import contextlib
|
||||||
|
|
||||||
from typing import Dict
|
from typing import Dict
|
||||||
|
from typing import Set
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
|
||||||
import hid
|
import hid
|
||||||
@@ -39,6 +40,7 @@ from ...validators.basic import valid_float_f01
|
|||||||
from ...validators.os import valid_abs_path
|
from ...validators.os import valid_abs_path
|
||||||
|
|
||||||
from . import GpioDriverOfflineError
|
from . import GpioDriverOfflineError
|
||||||
|
from . import UserGpioModes
|
||||||
from . import BaseUserGpioDriver
|
from . import BaseUserGpioDriver
|
||||||
|
|
||||||
|
|
||||||
@@ -73,8 +75,12 @@ class Plugin(BaseUserGpioDriver):
|
|||||||
"state_poll": Option(5.0, type=valid_float_f01),
|
"state_poll": Option(5.0, type=valid_float_f01),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def get_modes(cls) -> Set[str]:
|
||||||
|
return set([UserGpioModes.OUTPUT])
|
||||||
|
|
||||||
def register_input(self, pin: int) -> None:
|
def register_input(self, pin: int) -> None:
|
||||||
_ = pin
|
raise RuntimeError(f"Unsupported mode 'input' for pin={pin} on {self}")
|
||||||
|
|
||||||
def register_output(self, pin: int, initial: Optional[bool]) -> None:
|
def register_output(self, pin: int, initial: Optional[bool]) -> None:
|
||||||
self.__initials[pin] = initial
|
self.__initials[pin] = initial
|
||||||
@@ -153,7 +159,7 @@ class Plugin(BaseUserGpioDriver):
|
|||||||
def __check_pin(self, pin: int) -> bool:
|
def __check_pin(self, pin: int) -> bool:
|
||||||
ok = (0 <= pin <= 7)
|
ok = (0 <= pin <= 7)
|
||||||
if not ok:
|
if not ok:
|
||||||
get_logger(0).warning("Unsupported pin for %s on %s: %d", self, self.__device_path, pin)
|
get_logger(0).warning("Unsupported pin=%d for %s on %s", pin, self, self.__device_path)
|
||||||
return ok
|
return ok
|
||||||
|
|
||||||
@contextlib.contextmanager
|
@contextlib.contextmanager
|
||||||
|
|||||||
@@ -116,8 +116,8 @@ def valid_ugpio_channel(arg: Any) -> str:
|
|||||||
return check_len(check_re_match(arg, name, r"^[a-zA-Z_][a-zA-Z0-9_-]*$"), name, 255)
|
return check_len(check_re_match(arg, name, r"^[a-zA-Z_][a-zA-Z0-9_-]*$"), name, 255)
|
||||||
|
|
||||||
|
|
||||||
def valid_ugpio_mode(arg: Any) -> str:
|
def valid_ugpio_mode(arg: Any, variants: Set[str]) -> str:
|
||||||
return check_string_in_list(arg, "GPIO mode", ["input", "output"])
|
return check_string_in_list(arg, "GPIO driver's pin mode", variants)
|
||||||
|
|
||||||
|
|
||||||
def valid_ugpio_view_table(arg: Any) -> List[List[str]]:
|
def valid_ugpio_view_table(arg: Any) -> List[List[str]]:
|
||||||
|
|||||||
@@ -44,6 +44,8 @@ from kvmd.validators.kvm import valid_ugpio_channel
|
|||||||
from kvmd.validators.kvm import valid_ugpio_mode
|
from kvmd.validators.kvm import valid_ugpio_mode
|
||||||
from kvmd.validators.kvm import valid_ugpio_view_table
|
from kvmd.validators.kvm import valid_ugpio_view_table
|
||||||
|
|
||||||
|
from kvmd.plugins.ugpio import UserGpioModes
|
||||||
|
|
||||||
|
|
||||||
# =====
|
# =====
|
||||||
@pytest.mark.parametrize("arg", ["ON ", "OFF ", "OFF_HARD ", "RESET_HARD "])
|
@pytest.mark.parametrize("arg", ["ON ", "OFF ", "OFF_HARD ", "RESET_HARD "])
|
||||||
@@ -254,13 +256,13 @@ def test_fail__valid_ugpio_driver_variants(arg: Any) -> None:
|
|||||||
# =====
|
# =====
|
||||||
@pytest.mark.parametrize("arg", ["Input ", " OUTPUT "])
|
@pytest.mark.parametrize("arg", ["Input ", " OUTPUT "])
|
||||||
def test_ok__valid_ugpio_mode(arg: Any) -> None:
|
def test_ok__valid_ugpio_mode(arg: Any) -> None:
|
||||||
assert valid_ugpio_mode(arg) == arg.strip().lower()
|
assert valid_ugpio_mode(arg, UserGpioModes.ALL) == arg.strip().lower()
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("arg", ["test", "", None])
|
@pytest.mark.parametrize("arg", ["test", "", None])
|
||||||
def test_fail__valid_ugpio_mode(arg: Any) -> None:
|
def test_fail__valid_ugpio_mode(arg: Any) -> None:
|
||||||
with pytest.raises(ValidatorError):
|
with pytest.raises(ValidatorError):
|
||||||
print(valid_ugpio_mode(arg))
|
print(valid_ugpio_mode(arg, UserGpioModes.ALL))
|
||||||
|
|
||||||
|
|
||||||
# =====
|
# =====
|
||||||
|
|||||||
Reference in New Issue
Block a user