mirror of
https://github.com/mofeng-git/One-KVM.git
synced 2026-01-29 00:51:53 +08:00
refactoring
This commit is contained in:
@@ -75,19 +75,21 @@ class BaseUserGpioDriver(BasePlugin):
|
|||||||
return set(UserGpioModes.ALL)
|
return set(UserGpioModes.ALL)
|
||||||
|
|
||||||
def register_input(self, pin: int, debounce: float) -> None:
|
def register_input(self, pin: int, debounce: float) -> None:
|
||||||
raise NotImplementedError
|
_ = pin
|
||||||
|
_ = debounce
|
||||||
|
|
||||||
def register_output(self, pin: int, initial: Optional[bool]) -> None:
|
def register_output(self, pin: int, initial: Optional[bool]) -> None:
|
||||||
raise NotImplementedError
|
_ = pin
|
||||||
|
_ = initial
|
||||||
|
|
||||||
def prepare(self) -> None:
|
def prepare(self) -> None:
|
||||||
raise NotImplementedError
|
pass
|
||||||
|
|
||||||
async def run(self) -> None:
|
async def run(self) -> None:
|
||||||
raise NotImplementedError
|
await aiotools.wait_infinite()
|
||||||
|
|
||||||
async def cleanup(self) -> None:
|
async def cleanup(self) -> None:
|
||||||
raise NotImplementedError
|
pass
|
||||||
|
|
||||||
async def read(self, pin: int) -> bool:
|
async def read(self, pin: int) -> bool:
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
|
|||||||
@@ -85,14 +85,6 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute
|
|||||||
"protocol": Option(1, type=functools.partial(valid_number, min=1, max=2)),
|
"protocol": Option(1, type=functools.partial(valid_number, min=1, max=2)),
|
||||||
}
|
}
|
||||||
|
|
||||||
def register_input(self, pin: int, debounce: float) -> None:
|
|
||||||
_ = pin
|
|
||||||
_ = debounce
|
|
||||||
|
|
||||||
def register_output(self, pin: int, initial: Optional[bool]) -> None:
|
|
||||||
_ = pin
|
|
||||||
_ = initial
|
|
||||||
|
|
||||||
def prepare(self) -> None:
|
def prepare(self) -> None:
|
||||||
assert self.__proc is None
|
assert self.__proc is None
|
||||||
self.__proc = multiprocessing.Process(target=self.__serial_worker, daemon=True)
|
self.__proc = multiprocessing.Process(target=self.__serial_worker, daemon=True)
|
||||||
|
|||||||
@@ -131,9 +131,6 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute
|
|||||||
prev = new
|
prev = new
|
||||||
await asyncio.sleep(self.__state_poll)
|
await asyncio.sleep(self.__state_poll)
|
||||||
|
|
||||||
async def cleanup(self) -> None:
|
|
||||||
pass
|
|
||||||
|
|
||||||
async def read(self, pin: int) -> bool:
|
async def read(self, pin: int) -> bool:
|
||||||
if not self.__online:
|
if not self.__online:
|
||||||
raise GpioDriverOfflineError(self)
|
raise GpioDriverOfflineError(self)
|
||||||
|
|||||||
@@ -23,8 +23,6 @@
|
|||||||
import os
|
import os
|
||||||
import asyncio
|
import asyncio
|
||||||
|
|
||||||
from typing import Optional
|
|
||||||
|
|
||||||
from ...logging import get_logger
|
from ...logging import get_logger
|
||||||
|
|
||||||
from ...inotify import InotifyMask
|
from ...inotify import InotifyMask
|
||||||
@@ -52,14 +50,6 @@ class Plugin(BaseUserGpioDriver):
|
|||||||
self.__udc = udc
|
self.__udc = udc
|
||||||
self.__driver = ""
|
self.__driver = ""
|
||||||
|
|
||||||
def register_input(self, pin: int, debounce: float) -> None:
|
|
||||||
_ = pin
|
|
||||||
_ = debounce
|
|
||||||
|
|
||||||
def register_output(self, pin: int, initial: Optional[bool]) -> None:
|
|
||||||
_ = pin
|
|
||||||
_ = initial
|
|
||||||
|
|
||||||
def prepare(self) -> None:
|
def prepare(self) -> None:
|
||||||
(self.__udc, self.__driver) = usb.find_udc(self.__udc)
|
(self.__udc, self.__driver) = usb.find_udc(self.__udc)
|
||||||
get_logger().info("Using UDC %s", self.__udc)
|
get_logger().info("Using UDC %s", self.__udc)
|
||||||
@@ -93,9 +83,6 @@ class Plugin(BaseUserGpioDriver):
|
|||||||
except Exception:
|
except Exception:
|
||||||
logger.exception("Unexpected OTG-bind watcher error")
|
logger.exception("Unexpected OTG-bind watcher error")
|
||||||
|
|
||||||
async def cleanup(self) -> None:
|
|
||||||
pass
|
|
||||||
|
|
||||||
async def read(self, pin: int) -> bool:
|
async def read(self, pin: int) -> bool:
|
||||||
_ = pin
|
_ = pin
|
||||||
return os.path.islink(self.__get_driver_path(self.__udc))
|
return os.path.islink(self.__get_driver_path(self.__udc))
|
||||||
|
|||||||
@@ -97,9 +97,6 @@ class Plugin(BaseUserGpioDriver):
|
|||||||
logger.error("Can't get PWM chip %d channel %d: %s",
|
logger.error("Can't get PWM chip %d channel %d: %s",
|
||||||
self.__chip, pin, tools.efmt(err))
|
self.__chip, pin, tools.efmt(err))
|
||||||
|
|
||||||
async def run(self) -> None:
|
|
||||||
await aiotools.wait_infinite()
|
|
||||||
|
|
||||||
async def cleanup(self) -> None:
|
async def cleanup(self) -> None:
|
||||||
for (pin, pwm) in self.__pwms.items():
|
for (pin, pwm) in self.__pwms.items():
|
||||||
try:
|
try:
|
||||||
|
|||||||
@@ -89,9 +89,6 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute
|
|||||||
raise RuntimeError(f"Unsupported port number: {pin}")
|
raise RuntimeError(f"Unsupported port number: {pin}")
|
||||||
_ = initial
|
_ = initial
|
||||||
|
|
||||||
def prepare(self) -> None:
|
|
||||||
pass
|
|
||||||
|
|
||||||
async def run(self) -> None:
|
async def run(self) -> None:
|
||||||
prev_active = -2
|
prev_active = -2
|
||||||
while True:
|
while True:
|
||||||
|
|||||||
@@ -66,23 +66,6 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute
|
|||||||
"mac": Option("", type=valid_mac, if_empty=""),
|
"mac": Option("", type=valid_mac, if_empty=""),
|
||||||
}
|
}
|
||||||
|
|
||||||
def register_input(self, pin: int, debounce: float) -> None:
|
|
||||||
_ = pin
|
|
||||||
_ = debounce
|
|
||||||
|
|
||||||
def register_output(self, pin: int, initial: Optional[bool]) -> None:
|
|
||||||
_ = pin
|
|
||||||
_ = initial
|
|
||||||
|
|
||||||
def prepare(self) -> None:
|
|
||||||
get_logger(0).info("Probing driver %s on MAC %s and %s:%d ...", self, self.__mac, self.__ip, self.__port)
|
|
||||||
|
|
||||||
async def run(self) -> None:
|
|
||||||
await aiotools.wait_infinite()
|
|
||||||
|
|
||||||
async def cleanup(self) -> None:
|
|
||||||
pass
|
|
||||||
|
|
||||||
async def read(self, pin: int) -> bool:
|
async def read(self, pin: int) -> bool:
|
||||||
_ = pin
|
_ = pin
|
||||||
return False
|
return False
|
||||||
|
|||||||
Reference in New Issue
Block a user