mirror of
https://github.com/mofeng-git/One-KVM.git
synced 2025-12-12 09:10:30 +08:00
string pins
This commit is contained in:
parent
939c63fe7d
commit
98ad1145a8
@ -30,7 +30,7 @@ import logging.config
|
||||
from typing import Tuple
|
||||
from typing import List
|
||||
from typing import Dict
|
||||
from typing import Set
|
||||
from typing import Type
|
||||
from typing import Optional
|
||||
|
||||
import pygments
|
||||
@ -48,6 +48,7 @@ from ..plugins.atx import get_atx_class
|
||||
from ..plugins.msd import get_msd_class
|
||||
|
||||
from ..plugins.ugpio import UserGpioModes
|
||||
from ..plugins.ugpio import BaseUserGpioDriver
|
||||
from ..plugins.ugpio import get_ugpio_driver_class
|
||||
|
||||
from ..yamlconf import ConfigError
|
||||
@ -100,7 +101,6 @@ from ..validators.ugpio import valid_ugpio_mode
|
||||
from ..validators.ugpio import valid_ugpio_view_table
|
||||
|
||||
from ..validators.hw import valid_tty_speed
|
||||
from ..validators.hw import valid_gpio_pin
|
||||
from ..validators.hw import valid_otg_gadget
|
||||
from ..validators.hw import valid_otg_id
|
||||
from ..validators.hw import valid_otg_ethernet
|
||||
@ -267,7 +267,7 @@ def _patch_dynamic( # pylint: disable=too-many-locals
|
||||
|
||||
if load_gpio:
|
||||
driver: str
|
||||
drivers: Dict[str, Set[str]] = {} # Name to modes
|
||||
drivers: Dict[str, Type[BaseUserGpioDriver]] = {} # Name to drivers
|
||||
for (driver, params) in { # type: ignore
|
||||
"__gpio__": {},
|
||||
**tools.rget(raw_config, "kvmd", "gpio", "drivers"),
|
||||
@ -277,7 +277,7 @@ def _patch_dynamic( # pylint: disable=too-many-locals
|
||||
|
||||
driver_type = valid_stripped_string_not_empty(params.get("type", "gpio"))
|
||||
driver_class = get_ugpio_driver_class(driver_type)
|
||||
drivers[driver] = driver_class.get_modes()
|
||||
drivers[driver] = driver_class
|
||||
scheme["kvmd"]["gpio"]["drivers"][driver] = {
|
||||
"type": Option(driver_type, type=valid_stripped_string_not_empty),
|
||||
**driver_class.get_plugin_options()
|
||||
@ -294,12 +294,12 @@ def _patch_dynamic( # pylint: disable=too-many-locals
|
||||
|
||||
mode: str = params.get("mode", "")
|
||||
with manual_validated(mode, *path, channel, "mode"):
|
||||
mode = valid_ugpio_mode(mode, drivers[driver])
|
||||
mode = valid_ugpio_mode(mode, drivers[driver].get_modes())
|
||||
|
||||
scheme["kvmd"]["gpio"]["scheme"][channel] = {
|
||||
"driver": Option("__gpio__", type=functools.partial(valid_ugpio_driver, variants=set(drivers))),
|
||||
"pin": Option(-1, type=valid_gpio_pin),
|
||||
"mode": Option("", type=functools.partial(valid_ugpio_mode, variants=drivers[driver])),
|
||||
"pin": Option(None, type=drivers[driver].get_pin_validator()),
|
||||
"mode": Option("", type=functools.partial(valid_ugpio_mode, variants=drivers[driver].get_modes())),
|
||||
"inverted": Option(False, type=valid_bool),
|
||||
**({
|
||||
"busy_delay": Option(0.2, type=valid_float_f01),
|
||||
|
||||
@ -77,7 +77,7 @@ class _GpioInput:
|
||||
) -> None:
|
||||
|
||||
self.__channel = channel
|
||||
self.__pin: int = config.pin
|
||||
self.__pin: str = config.pin
|
||||
self.__inverted: bool = config.inverted
|
||||
|
||||
self.__driver = driver
|
||||
@ -118,7 +118,7 @@ class _GpioOutput: # pylint: disable=too-many-instance-attributes
|
||||
) -> None:
|
||||
|
||||
self.__channel = channel
|
||||
self.__pin: int = config.pin
|
||||
self.__pin: str = config.pin
|
||||
self.__inverted: bool = config.inverted
|
||||
|
||||
self.__switch: bool = config.switch
|
||||
|
||||
@ -22,6 +22,7 @@
|
||||
|
||||
from typing import Set
|
||||
from typing import Type
|
||||
from typing import Callable
|
||||
from typing import Optional
|
||||
from typing import Any
|
||||
|
||||
@ -51,7 +52,6 @@ class GpioDriverOfflineError(GpioOperationError):
|
||||
class UserGpioModes:
|
||||
INPUT = "input"
|
||||
OUTPUT = "output"
|
||||
|
||||
ALL = set([INPUT, OUTPUT])
|
||||
|
||||
|
||||
@ -74,11 +74,15 @@ class BaseUserGpioDriver(BasePlugin):
|
||||
def get_modes(cls) -> Set[str]:
|
||||
return set(UserGpioModes.ALL)
|
||||
|
||||
def register_input(self, pin: int, debounce: float) -> None:
|
||||
@classmethod
|
||||
def get_pin_validator(cls) -> Callable[[Any], str]:
|
||||
raise NotImplementedError
|
||||
|
||||
def register_input(self, pin: str, debounce: float) -> None:
|
||||
_ = pin
|
||||
_ = debounce
|
||||
|
||||
def register_output(self, pin: int, initial: Optional[bool]) -> None:
|
||||
def register_output(self, pin: str, initial: Optional[bool]) -> None:
|
||||
_ = pin
|
||||
_ = initial
|
||||
|
||||
@ -91,10 +95,10 @@ class BaseUserGpioDriver(BasePlugin):
|
||||
async def cleanup(self) -> None:
|
||||
pass
|
||||
|
||||
async def read(self, pin: int) -> bool:
|
||||
async def read(self, pin: str) -> bool:
|
||||
raise NotImplementedError
|
||||
|
||||
async def write(self, pin: int, state: bool) -> None:
|
||||
async def write(self, pin: str, state: bool) -> None:
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
|
||||
@ -28,7 +28,9 @@ import time
|
||||
|
||||
from typing import Tuple
|
||||
from typing import Dict
|
||||
from typing import Callable
|
||||
from typing import Optional
|
||||
from typing import Any
|
||||
|
||||
import serial
|
||||
|
||||
@ -85,6 +87,10 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute
|
||||
"protocol": Option(1, type=functools.partial(valid_number, min=1, max=2)),
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def get_pin_validator(cls) -> Callable[[Any], str]:
|
||||
return (lambda arg: str(valid_number(arg, min=0, max=3, name="Ezcoo channel")))
|
||||
|
||||
def prepare(self) -> None:
|
||||
assert self.__proc is None
|
||||
self.__proc = multiprocessing.Process(target=self.__serial_worker, daemon=True)
|
||||
@ -105,16 +111,16 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute
|
||||
if self.__proc.is_alive() or self.__proc.exitcode is not None:
|
||||
self.__proc.join()
|
||||
|
||||
async def read(self, pin: int) -> bool:
|
||||
async def read(self, pin: str) -> bool:
|
||||
if not self.__is_online():
|
||||
raise GpioDriverOfflineError(self)
|
||||
return (self.__channel == pin)
|
||||
return (self.__channel == int(pin))
|
||||
|
||||
async def write(self, pin: int, state: bool) -> None:
|
||||
async def write(self, pin: str, state: bool) -> None:
|
||||
if not self.__is_online():
|
||||
raise GpioDriverOfflineError(self)
|
||||
if state and (0 <= pin <= 3):
|
||||
self.__ctl_queue.put_nowait(pin)
|
||||
if state:
|
||||
self.__ctl_queue.put_nowait(int(pin))
|
||||
|
||||
# =====
|
||||
|
||||
@ -174,9 +180,12 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute
|
||||
return (channel, data)
|
||||
|
||||
def __send_channel(self, tty: serial.Serial, channel: int) -> None:
|
||||
# Twice because of ezcoo bugs
|
||||
cmd = (b"SET" if self.__protocol == 1 else b"EZS")
|
||||
tty.write((b"%s OUT1 VS IN%d\n" % (cmd, channel + 1)) * 2)
|
||||
assert 0 <= channel <= 3
|
||||
cmd = b"%s OUT1 VS IN%d\n" % (
|
||||
(b"SET" if self.__protocol == 1 else b"EZS"),
|
||||
channel + 1,
|
||||
)
|
||||
tty.write(cmd * 2) # Twice because of ezcoo bugs
|
||||
tty.flush()
|
||||
|
||||
def __str__(self) -> str:
|
||||
|
||||
@ -21,7 +21,9 @@
|
||||
|
||||
|
||||
from typing import Dict
|
||||
from typing import Callable
|
||||
from typing import Optional
|
||||
from typing import Any
|
||||
|
||||
import gpiod
|
||||
|
||||
@ -31,6 +33,7 @@ from ... import aiogp
|
||||
from ...yamlconf import Option
|
||||
|
||||
from ...validators.os import valid_abs_path
|
||||
from ...validators.hw import valid_gpio_pin
|
||||
|
||||
from . import BaseUserGpioDriver
|
||||
|
||||
@ -63,11 +66,15 @@ class Plugin(BaseUserGpioDriver):
|
||||
"device": Option("/dev/gpiochip0", type=valid_abs_path, unpack_as="device_path"),
|
||||
}
|
||||
|
||||
def register_input(self, pin: int, debounce: float) -> None:
|
||||
self.__input_pins[pin] = aiogp.AioReaderPinParams(False, debounce)
|
||||
@classmethod
|
||||
def get_pin_validator(cls) -> Callable[[Any], str]:
|
||||
return (lambda arg: str(valid_gpio_pin(arg)))
|
||||
|
||||
def register_output(self, pin: int, initial: Optional[bool]) -> None:
|
||||
self.__output_pins[pin] = initial
|
||||
def register_input(self, pin: str, debounce: float) -> None:
|
||||
self.__input_pins[int(pin)] = aiogp.AioReaderPinParams(False, debounce)
|
||||
|
||||
def register_output(self, pin: str, initial: Optional[bool]) -> None:
|
||||
self.__output_pins[int(pin)] = initial
|
||||
|
||||
def prepare(self) -> None:
|
||||
assert self.__reader is None
|
||||
@ -95,14 +102,15 @@ class Plugin(BaseUserGpioDriver):
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
async def read(self, pin: int) -> bool:
|
||||
async def read(self, pin: str) -> bool:
|
||||
assert self.__reader
|
||||
if pin in self.__input_pins:
|
||||
return self.__reader.get(pin)
|
||||
return bool(self.__output_lines[pin].get_value())
|
||||
pin_int = int(pin)
|
||||
if pin_int in self.__input_pins:
|
||||
return self.__reader.get(pin_int)
|
||||
return bool(self.__output_lines[pin_int].get_value())
|
||||
|
||||
async def write(self, pin: int, state: bool) -> None:
|
||||
self.__output_lines[pin].set_value(int(state))
|
||||
async def write(self, pin: str, state: bool) -> None:
|
||||
self.__output_lines[int(pin)].set_value(int(state))
|
||||
|
||||
def __str__(self) -> str:
|
||||
return f"GPIO({self._instance_name})"
|
||||
|
||||
@ -25,7 +25,9 @@ import contextlib
|
||||
|
||||
from typing import Dict
|
||||
from typing import Set
|
||||
from typing import Callable
|
||||
from typing import Optional
|
||||
from typing import Any
|
||||
|
||||
import hid
|
||||
|
||||
@ -36,6 +38,7 @@ from ... import aiotools
|
||||
|
||||
from ...yamlconf import Option
|
||||
|
||||
from ...validators.basic import valid_number
|
||||
from ...validators.basic import valid_float_f01
|
||||
from ...validators.os import valid_abs_path
|
||||
|
||||
@ -79,11 +82,12 @@ class Plugin(BaseUserGpioDriver):
|
||||
def get_modes(cls) -> Set[str]:
|
||||
return set([UserGpioModes.OUTPUT])
|
||||
|
||||
def register_input(self, pin: int, debounce: float) -> None:
|
||||
raise RuntimeError(f"Unsupported mode 'input' for pin={pin} on {self}")
|
||||
@classmethod
|
||||
def get_pin_validator(cls) -> Callable[[Any], str]:
|
||||
return (lambda arg: str(valid_number(arg, min=0, max=7, name="HID relay channel")))
|
||||
|
||||
def register_output(self, pin: int, initial: Optional[bool]) -> None:
|
||||
self.__initials[pin] = initial
|
||||
def register_output(self, pin: str, initial: Optional[bool]) -> None:
|
||||
self.__initials[int(pin)] = initial
|
||||
|
||||
def prepare(self) -> None:
|
||||
logger = get_logger(0)
|
||||
@ -113,15 +117,15 @@ class Plugin(BaseUserGpioDriver):
|
||||
self.__close_device()
|
||||
self.__stop = True
|
||||
|
||||
async def read(self, pin: int) -> bool:
|
||||
async def read(self, pin: str) -> bool:
|
||||
try:
|
||||
return self.__inner_read(pin)
|
||||
return self.__inner_read(int(pin))
|
||||
except Exception:
|
||||
raise GpioDriverOfflineError(self)
|
||||
|
||||
async def write(self, pin: int, state: bool) -> None:
|
||||
async def write(self, pin: str, state: bool) -> None:
|
||||
try:
|
||||
return self.__inner_write(pin, state)
|
||||
return self.__inner_write(int(pin), state)
|
||||
except Exception:
|
||||
raise GpioDriverOfflineError(self)
|
||||
|
||||
@ -140,27 +144,20 @@ class Plugin(BaseUserGpioDriver):
|
||||
pin, self, self.__device_path, tools.efmt(err))
|
||||
|
||||
def __inner_read(self, pin: int) -> bool:
|
||||
if self.__check_pin(pin):
|
||||
return bool(self.__inner_read_raw() & (1 << pin))
|
||||
return False
|
||||
assert 0 <= pin <= 7
|
||||
return bool(self.__inner_read_raw() & (1 << pin))
|
||||
|
||||
def __inner_read_raw(self) -> int:
|
||||
with self.__ensure_device("reading") as device:
|
||||
return device.get_feature_report(1, 8)[7]
|
||||
|
||||
def __inner_write(self, pin: int, state: bool) -> None:
|
||||
if self.__check_pin(pin):
|
||||
with self.__ensure_device("writing") as device:
|
||||
report = [(0xFF if state else 0xFD), pin + 1] # Pin numeration starts from 0
|
||||
result = device.send_feature_report(report)
|
||||
if result < 0:
|
||||
raise RuntimeError(f"Retval of send_feature_report() < 0: {result}")
|
||||
|
||||
def __check_pin(self, pin: int) -> bool:
|
||||
ok = (0 <= pin <= 7)
|
||||
if not ok:
|
||||
get_logger(0).warning("Unsupported pin=%d for %s on %s", pin, self, self.__device_path)
|
||||
return ok
|
||||
assert 0 <= pin <= 7
|
||||
with self.__ensure_device("writing") as device:
|
||||
report = [(0xFF if state else 0xFD), pin + 1] # Pin numeration starts from 0
|
||||
result = device.send_feature_report(report)
|
||||
if result < 0:
|
||||
raise RuntimeError(f"Retval of send_feature_report() < 0: {result}")
|
||||
|
||||
@contextlib.contextmanager
|
||||
def __ensure_device(self, context: str) -> hid.device:
|
||||
|
||||
@ -25,7 +25,9 @@ import functools
|
||||
|
||||
from typing import List
|
||||
from typing import Dict
|
||||
from typing import Callable
|
||||
from typing import Optional
|
||||
from typing import Any
|
||||
|
||||
from ...logging import get_logger
|
||||
|
||||
@ -35,6 +37,7 @@ from ... import aioproc
|
||||
|
||||
from ...yamlconf import Option
|
||||
|
||||
from ...validators import check_string_in_list
|
||||
from ...validators.basic import valid_float_f01
|
||||
from ...validators.net import valid_ip_or_host
|
||||
from ...validators.net import valid_port
|
||||
@ -46,12 +49,12 @@ from . import BaseUserGpioDriver
|
||||
|
||||
# =====
|
||||
_OUTPUTS = {
|
||||
1: "on",
|
||||
2: "off",
|
||||
3: "cycle",
|
||||
4: "reset",
|
||||
5: "diag",
|
||||
6: "soft",
|
||||
"1": "on",
|
||||
"2": "off",
|
||||
"3": "cycle",
|
||||
"4": "reset",
|
||||
"5": "diag",
|
||||
"6": "soft",
|
||||
}
|
||||
|
||||
|
||||
@ -108,14 +111,19 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute
|
||||
"state_poll": Option(1.0, type=valid_float_f01),
|
||||
}
|
||||
|
||||
def register_input(self, pin: int, debounce: float) -> None:
|
||||
@classmethod
|
||||
def get_pin_validator(cls) -> Callable[[Any], str]:
|
||||
actions = ["0", *_OUTPUTS, "status", *_OUTPUTS.values()]
|
||||
return (lambda arg: check_string_in_list(arg, "IPMI action", actions))
|
||||
|
||||
def register_input(self, pin: str, debounce: float) -> None:
|
||||
_ = debounce
|
||||
if pin != 0:
|
||||
if pin not in ["0", "status"]:
|
||||
raise RuntimeError(f"Unsupported mode 'input' for pin={pin} on {self}")
|
||||
|
||||
def register_output(self, pin: int, initial: Optional[bool]) -> None:
|
||||
def register_output(self, pin: str, initial: Optional[bool]) -> None:
|
||||
_ = initial
|
||||
if pin not in _OUTPUTS:
|
||||
if pin not in [*_OUTPUTS, *_OUTPUTS.values()]:
|
||||
raise RuntimeError(f"Unsupported mode 'output' for pin={pin} on {self}")
|
||||
|
||||
def prepare(self) -> None:
|
||||
@ -131,17 +139,17 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute
|
||||
prev = new
|
||||
await asyncio.sleep(self.__state_poll)
|
||||
|
||||
async def read(self, pin: int) -> bool:
|
||||
async def read(self, pin: str) -> bool:
|
||||
if not self.__online:
|
||||
raise GpioDriverOfflineError(self)
|
||||
if pin == 0:
|
||||
if pin == "0":
|
||||
return self.__power
|
||||
return False
|
||||
|
||||
async def write(self, pin: int, state: bool) -> None:
|
||||
async def write(self, pin: str, state: bool) -> None:
|
||||
if not self.__online:
|
||||
raise GpioDriverOfflineError(self)
|
||||
action = _OUTPUTS[pin]
|
||||
action = (_OUTPUTS[pin] if pin.isdigit() else pin)
|
||||
try:
|
||||
proc = await aioproc.log_process(**self.__make_ipmitool_kwargs(action), logger=get_logger(0))
|
||||
if proc.returncode != 0:
|
||||
|
||||
@ -23,6 +23,9 @@
|
||||
import os
|
||||
import asyncio
|
||||
|
||||
from typing import Callable
|
||||
from typing import Any
|
||||
|
||||
from ...logging import get_logger
|
||||
|
||||
from ...inotify import InotifyMask
|
||||
@ -50,6 +53,10 @@ class Plugin(BaseUserGpioDriver):
|
||||
self.__udc = udc
|
||||
self.__driver = ""
|
||||
|
||||
@classmethod
|
||||
def get_pin_validator(cls) -> Callable[[Any], str]:
|
||||
return str
|
||||
|
||||
def prepare(self) -> None:
|
||||
(self.__udc, self.__driver) = usb.find_udc(self.__udc)
|
||||
get_logger().info("Using UDC %s", self.__udc)
|
||||
@ -83,11 +90,11 @@ class Plugin(BaseUserGpioDriver):
|
||||
except Exception:
|
||||
logger.exception("Unexpected OTG-bind watcher error")
|
||||
|
||||
async def read(self, pin: int) -> bool:
|
||||
async def read(self, pin: str) -> bool:
|
||||
_ = pin
|
||||
return os.path.islink(self.__get_driver_path(self.__udc))
|
||||
|
||||
async def write(self, pin: int, state: bool) -> None:
|
||||
async def write(self, pin: str, state: bool) -> None:
|
||||
_ = pin
|
||||
with open(self.__get_driver_path("bind" if state else "unbind"), "w") as ctl_file:
|
||||
ctl_file.write(f"{self.__udc}\n")
|
||||
|
||||
@ -22,8 +22,10 @@
|
||||
|
||||
|
||||
from typing import Dict
|
||||
from typing import Optional
|
||||
from typing import Set
|
||||
from typing import Callable
|
||||
from typing import Optional
|
||||
from typing import Any
|
||||
|
||||
from periphery import PWM
|
||||
|
||||
@ -35,6 +37,7 @@ from ... import aiotools
|
||||
from ...yamlconf import Option
|
||||
|
||||
from ...validators.basic import valid_int_f0
|
||||
from ...validators.hw import valid_gpio_pin
|
||||
|
||||
from . import GpioDriverOfflineError
|
||||
from . import UserGpioModes
|
||||
@ -77,11 +80,12 @@ class Plugin(BaseUserGpioDriver):
|
||||
def get_modes(cls) -> Set[str]:
|
||||
return set([UserGpioModes.OUTPUT])
|
||||
|
||||
def register_input(self, pin: int, debounce: float) -> None:
|
||||
raise RuntimeError(f"Unsupported mode 'input' for pin={pin} on {self}")
|
||||
@classmethod
|
||||
def get_pin_validator(cls) -> Callable[[Any], str]:
|
||||
return (lambda arg: str(valid_gpio_pin(arg)))
|
||||
|
||||
def register_output(self, pin: int, initial: Optional[bool]) -> None:
|
||||
self.__channels[pin] = initial
|
||||
def register_output(self, pin: str, initial: Optional[bool]) -> None:
|
||||
self.__channels[int(pin)] = initial
|
||||
|
||||
def prepare(self) -> None:
|
||||
logger = get_logger(0)
|
||||
@ -106,15 +110,15 @@ class Plugin(BaseUserGpioDriver):
|
||||
get_logger(0).error("Can't cleanup PWM chip %d channel %d: %s",
|
||||
self.__chip, pin, tools.efmt(err))
|
||||
|
||||
async def read(self, pin: int) -> bool:
|
||||
async def read(self, pin: str) -> bool:
|
||||
try:
|
||||
return (self.__pwms[pin].duty_cycle_ns == self.__duty_cycle_push)
|
||||
return (self.__pwms[int(pin)].duty_cycle_ns == self.__duty_cycle_push)
|
||||
except Exception:
|
||||
raise GpioDriverOfflineError(self)
|
||||
|
||||
async def write(self, pin: int, state: bool) -> None:
|
||||
async def write(self, pin: str, state: bool) -> None:
|
||||
try:
|
||||
self.__pwms[pin].duty_cycle_ns = self.__get_duty_cycle(state)
|
||||
self.__pwms[int(pin)].duty_cycle_ns = self.__get_duty_cycle(state)
|
||||
except Exception:
|
||||
raise GpioDriverOfflineError(self)
|
||||
|
||||
|
||||
@ -24,7 +24,9 @@ import asyncio
|
||||
|
||||
from typing import Tuple
|
||||
from typing import Dict
|
||||
from typing import Callable
|
||||
from typing import Optional
|
||||
from typing import Any
|
||||
|
||||
from ...logging import get_logger
|
||||
|
||||
@ -33,6 +35,7 @@ from ... import aiotools
|
||||
|
||||
from ...yamlconf import Option
|
||||
|
||||
from ...validators.basic import valid_number
|
||||
from ...validators.basic import valid_float_f0
|
||||
from ...validators.basic import valid_float_f01
|
||||
from ...validators.net import valid_ip_or_host
|
||||
@ -79,15 +82,9 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute
|
||||
"state_poll": Option(10.0, type=valid_float_f01),
|
||||
}
|
||||
|
||||
def register_input(self, pin: int, debounce: float) -> None:
|
||||
if not (0 <= pin < 16):
|
||||
raise RuntimeError(f"Unsupported port number: {pin}")
|
||||
_ = debounce
|
||||
|
||||
def register_output(self, pin: int, initial: Optional[bool]) -> None:
|
||||
if not (0 <= pin < 16):
|
||||
raise RuntimeError(f"Unsupported port number: {pin}")
|
||||
_ = initial
|
||||
@classmethod
|
||||
def get_pin_validator(cls) -> Callable[[Any], str]:
|
||||
return (lambda arg: str(valid_number(arg, min=0, max=15, name="Tesmart channel")))
|
||||
|
||||
async def run(self) -> None:
|
||||
prev_active = -2
|
||||
@ -104,12 +101,13 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute
|
||||
async def cleanup(self) -> None:
|
||||
await self.__close_device()
|
||||
|
||||
async def read(self, pin: int) -> bool:
|
||||
return (self.__active == pin)
|
||||
async def read(self, pin: str) -> bool:
|
||||
return (self.__active == int(pin))
|
||||
|
||||
async def write(self, pin: int, state: bool) -> None:
|
||||
async def write(self, pin: str, state: bool) -> None:
|
||||
assert 0 <= pin <= 15
|
||||
if state:
|
||||
await self.__send_command("{:c}{:c}".format(1, pin + 1).encode())
|
||||
await self.__send_command("{:c}{:c}".format(1, int(pin) + 1).encode())
|
||||
await self.__update_notifier.notify()
|
||||
await asyncio.sleep(self.__switch_delay) # Slowdown
|
||||
|
||||
|
||||
@ -24,7 +24,9 @@ import socket
|
||||
import functools
|
||||
|
||||
from typing import Dict
|
||||
from typing import Callable
|
||||
from typing import Optional
|
||||
from typing import Any
|
||||
|
||||
from ...logging import get_logger
|
||||
|
||||
@ -66,11 +68,15 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute
|
||||
"mac": Option("", type=valid_mac, if_empty=""),
|
||||
}
|
||||
|
||||
async def read(self, pin: int) -> bool:
|
||||
@classmethod
|
||||
def get_pin_validator(cls) -> Callable[[Any], str]:
|
||||
return str
|
||||
|
||||
async def read(self, pin: str) -> bool:
|
||||
_ = pin
|
||||
return False
|
||||
|
||||
async def write(self, pin: int, state: bool) -> None:
|
||||
async def write(self, pin: str, state: bool) -> None:
|
||||
_ = pin
|
||||
if not state:
|
||||
return
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user