mirror of
https://github.com/mofeng-git/One-KVM.git
synced 2025-12-12 17:20:30 +08:00
cleanup on driver side
This commit is contained in:
parent
04284584fe
commit
170ed92bd4
@ -117,7 +117,6 @@ class _GpioOutput: # pylint: disable=too-many-instance-attributes
|
|||||||
self.__channel = channel
|
self.__channel = channel
|
||||||
self.__pin: int = config.pin
|
self.__pin: int = config.pin
|
||||||
self.__inverted: bool = config.inverted
|
self.__inverted: bool = config.inverted
|
||||||
self.__initial: Optional[bool] = config.initial
|
|
||||||
|
|
||||||
self.__switch: bool = config.switch
|
self.__switch: bool = config.switch
|
||||||
|
|
||||||
@ -160,13 +159,6 @@ class _GpioOutput: # pylint: disable=too-many-instance-attributes
|
|||||||
"busy": busy,
|
"busy": busy,
|
||||||
}
|
}
|
||||||
|
|
||||||
def cleanup(self) -> None:
|
|
||||||
if self.__initial is not None:
|
|
||||||
try:
|
|
||||||
self.__driver.write(self.__pin, (self.__initial ^ self.__inverted))
|
|
||||||
except Exception:
|
|
||||||
get_logger().exception("Can't cleanup %s", self)
|
|
||||||
|
|
||||||
async def switch(self, state: bool) -> bool:
|
async def switch(self, state: bool) -> bool:
|
||||||
if not self.__switch:
|
if not self.__switch:
|
||||||
raise GpioSwitchNotSupported()
|
raise GpioSwitchNotSupported()
|
||||||
@ -278,8 +270,6 @@ class UserGpio:
|
|||||||
])
|
])
|
||||||
|
|
||||||
async def cleanup(self) -> None:
|
async def cleanup(self) -> None:
|
||||||
for gout in self.__outputs.values():
|
|
||||||
gout.cleanup()
|
|
||||||
for driver in self.__drivers.values():
|
for driver in self.__drivers.values():
|
||||||
try:
|
try:
|
||||||
driver.cleanup()
|
driver.cleanup()
|
||||||
|
|||||||
@ -74,7 +74,7 @@ class BaseUserGpioDriver(BasePlugin):
|
|||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
|
|
||||||
def cleanup(self) -> None:
|
def cleanup(self) -> None:
|
||||||
pass
|
raise NotImplementedError
|
||||||
|
|
||||||
def read(self, pin: int) -> bool:
|
def read(self, pin: int) -> bool:
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
|
|||||||
@ -83,6 +83,11 @@ class Plugin(BaseUserGpioDriver):
|
|||||||
assert self.__reader
|
assert self.__reader
|
||||||
await self.__reader.poll()
|
await self.__reader.poll()
|
||||||
|
|
||||||
|
def cleanup(self) -> None:
|
||||||
|
for (pin, initial) in self.__output_pins.items():
|
||||||
|
if initial is not None:
|
||||||
|
gpio.write(pin, initial)
|
||||||
|
|
||||||
def read(self, pin: int) -> bool:
|
def read(self, pin: int) -> bool:
|
||||||
return gpio.read(pin)
|
return gpio.read(pin)
|
||||||
|
|
||||||
|
|||||||
@ -81,19 +81,19 @@ class Plugin(BaseUserGpioDriver):
|
|||||||
|
|
||||||
def prepare(self) -> None:
|
def prepare(self) -> None:
|
||||||
logger = get_logger(0)
|
logger = get_logger(0)
|
||||||
logger.info("Initializing %s ...", self)
|
logger.info("Probing driver %s ...", self)
|
||||||
try:
|
try:
|
||||||
for (pid, state) in self.__initials.items():
|
with self.__ensure_device("probing"):
|
||||||
if state is not None:
|
pass
|
||||||
self.write(pid, state)
|
except Exception as err:
|
||||||
except Exception:
|
logger.error("Can't probe %s: %s: %s", self, type(err).__name__, err)
|
||||||
logger.exception("Can't perform first initialization of %s", self)
|
self.__reset_pins()
|
||||||
|
|
||||||
async def run(self) -> None:
|
async def run(self) -> None:
|
||||||
prev_raw = -1
|
prev_raw = -1
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
raw = self.__read_raw()
|
raw = self.__inner_read_raw()
|
||||||
except Exception:
|
except Exception:
|
||||||
raw = -1
|
raw = -1
|
||||||
if raw != prev_raw:
|
if raw != prev_raw:
|
||||||
@ -102,29 +102,50 @@ class Plugin(BaseUserGpioDriver):
|
|||||||
await asyncio.sleep(self.__state_poll)
|
await asyncio.sleep(self.__state_poll)
|
||||||
|
|
||||||
def cleanup(self) -> None:
|
def cleanup(self) -> None:
|
||||||
|
self.__reset_pins()
|
||||||
self.__close_device()
|
self.__close_device()
|
||||||
self.__stop = True
|
self.__stop = True
|
||||||
|
|
||||||
def read(self, pin: int) -> bool:
|
def read(self, pin: int) -> bool:
|
||||||
if self.__check_pin(pin):
|
|
||||||
try:
|
try:
|
||||||
return bool(self.__read_raw() & (1 << pin))
|
return self.__inner_read(pin)
|
||||||
except Exception:
|
except Exception:
|
||||||
raise GpioDriverOfflineError(self) from None
|
raise GpioDriverOfflineError(self)
|
||||||
return False
|
|
||||||
|
|
||||||
def write(self, pin: int, state: bool) -> None:
|
def write(self, pin: int, state: bool) -> None:
|
||||||
if self.__check_pin(pin):
|
|
||||||
try:
|
try:
|
||||||
|
return self.__inner_write(pin, state)
|
||||||
|
except Exception:
|
||||||
|
raise GpioDriverOfflineError(self)
|
||||||
|
|
||||||
|
# =====
|
||||||
|
|
||||||
|
def __reset_pins(self) -> None:
|
||||||
|
logger = get_logger(0)
|
||||||
|
for (pin, state) in self.__initials.items():
|
||||||
|
if state is not None:
|
||||||
|
logger.info("Resetting pin=%d to state=%d of %s: ...", pin, state, self)
|
||||||
|
try:
|
||||||
|
self.__inner_write(pin, state)
|
||||||
|
except Exception as err:
|
||||||
|
logger.error("Can't reset pin=%d of %s: %s: %s", pin, self, type(err).__name__, err)
|
||||||
|
|
||||||
|
def __inner_read(self, pin: int) -> bool:
|
||||||
|
if self.__check_pin(pin):
|
||||||
|
return bool(self.__inner_read_raw() & (1 << pin))
|
||||||
|
return False
|
||||||
|
|
||||||
|
def __inner_read_raw(self) -> int:
|
||||||
|
with self.__ensure_device("reading") as device:
|
||||||
|
return device.get_feature_report(1, 8)[7]
|
||||||
|
|
||||||
|
def __inner_write(self, pin: int, state: bool) -> None:
|
||||||
|
if self.__check_pin(pin):
|
||||||
with self.__ensure_device("writing") as device:
|
with self.__ensure_device("writing") as device:
|
||||||
report = [(0xFF if state else 0xFD), pin + 1] # Pin numeration starts from 0
|
report = [(0xFF if state else 0xFD), pin + 1] # Pin numeration starts from 0
|
||||||
result = device.send_feature_report(report)
|
result = device.send_feature_report(report)
|
||||||
if result < 0:
|
if result < 0:
|
||||||
raise RuntimeError(f"Retval of send_feature_report() < 0: {result}")
|
raise RuntimeError(f"Retval of send_feature_report() < 0: {result}")
|
||||||
except Exception:
|
|
||||||
raise GpioDriverOfflineError(self) from None
|
|
||||||
|
|
||||||
# =====
|
|
||||||
|
|
||||||
def __check_pin(self, pin: int) -> bool:
|
def __check_pin(self, pin: int) -> bool:
|
||||||
ok = (0 <= pin <= 7)
|
ok = (0 <= pin <= 7)
|
||||||
@ -132,10 +153,6 @@ class Plugin(BaseUserGpioDriver):
|
|||||||
get_logger(0).warning("Unsupported pin for %s: %d", self, pin)
|
get_logger(0).warning("Unsupported pin for %s: %d", self, pin)
|
||||||
return ok
|
return ok
|
||||||
|
|
||||||
def __read_raw(self) -> int:
|
|
||||||
with self.__ensure_device("reading") as device:
|
|
||||||
return device.get_feature_report(1, 8)[7]
|
|
||||||
|
|
||||||
@contextlib.contextmanager
|
@contextlib.contextmanager
|
||||||
def __ensure_device(self, context: str) -> hid.device:
|
def __ensure_device(self, context: str) -> hid.device:
|
||||||
assert not self.__stop
|
assert not self.__stop
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user