refactoring

This commit is contained in:
Maxim Devaev 2022-07-21 01:44:44 +03:00
parent 42c85021f7
commit 508a6e9b58
3 changed files with 30 additions and 31 deletions

View File

@ -44,6 +44,7 @@ depends=(
python-passlib
python-periphery
python-pyserial
python-pyserial-asyncio
python-spidev
python-setproctitle
python-psutil

View File

@ -21,20 +21,15 @@
import asyncio
# At present this requires building a package from AUR:
# https://aur.archlinux.org/packages/python-pyserial-asyncio
# https://wiki.archlinux.org/title/Arch_User_Repository#Installing_and_upgrading_packages
import serial_asyncio
import functools
from typing import Tuple
from typing import Dict
from typing import Callable
from typing import Optional
from typing import Any
import serial_asyncio
from ...logging import get_logger
from ... import tools
@ -61,11 +56,12 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute
instance_name: str,
notifier: aiotools.AioNotifier,
mode: int,
host: str,
port: int,
device_path: str,
speed: int,
timeout: float,
switch_delay: float,
state_poll: float,
@ -73,11 +69,12 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute
super().__init__(instance_name, notifier)
self.__mode = mode
self.__host = host
self.__port = port
self.__device_path = device_path
self.__speed = speed
self.__timeout = timeout
self.__switch_delay = switch_delay
self.__state_poll = state_poll
@ -90,11 +87,12 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute
@classmethod
def get_plugin_options(cls) -> Dict:
return {
"mode": Option(1, type=functools.partial(valid_number, min=1, max=2)),
"host": Option("", type=valid_ip_or_host),
"host": Option("", type=valid_ip_or_host, if_empty=""),
"port": Option(5000, type=valid_port),
"device": Option("", type=valid_abs_path, unpack_as="device_path"),
"device": Option("", type=valid_abs_path, only_if="!host", unpack_as="device_path"),
"speed": Option(9600, type=valid_tty_speed),
"timeout": Option(5.0, type=valid_float_f01),
"switch_delay": Option(1.0, type=valid_float_f0),
"state_poll": Option(10.0, type=valid_float_f01),
@ -136,49 +134,48 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute
async def __send_command(self, cmd: bytes) -> int:
assert len(cmd) == 2
(reader, writer) = await self.__ensure_device()
await self.__ensure_device()
assert self.__reader is not None
assert self.__writer is not None
try:
writer.write(b"\xAA\xBB\x03%s\xEE" % (cmd))
await asyncio.wait_for(writer.drain(), timeout=self.__timeout)
return (await asyncio.wait_for(reader.readexactly(6), timeout=self.__timeout))[4]
self.__writer.write(b"\xAA\xBB\x03%s\xEE" % (cmd))
await asyncio.wait_for(self.__writer.drain(), timeout=self.__timeout)
return (await asyncio.wait_for(self.__reader.readexactly(6), timeout=self.__timeout))[4]
except Exception as err:
get_logger(0).error("Can't send command to TESmart KVM [%s]:%d: %s",
self.__host, self.__port, tools.efmt(err))
await self.__close_device()
raise GpioDriverOfflineError(self)
async def __ensure_device_tcpip(self) -> Tuple[asyncio.StreamReader, asyncio.StreamWriter]:
async def __ensure_device(self) -> None:
if self.__reader is None or self.__writer is None:
if self.__host:
await self.__ensure_device_net()
else:
await self.__ensure_device_serial()
async def __ensure_device_net(self) -> None:
try:
(reader, writer) = await asyncio.wait_for(
(self.__reader, self.__writer) = await asyncio.wait_for(
asyncio.open_connection(self.__host, self.__port),
timeout=self.__timeout,
)
return (reader, writer)
except Exception as err:
get_logger(0).error("Can't connect to TESmart KVM [%s]:%d: %s",
self.__host, self.__port, tools.efmt(err))
raise GpioDriverOfflineError(self)
async def __ensure_device_serial(self) -> Tuple[asyncio.StreamReader, asyncio.StreamWriter]:
async def __ensure_device_serial(self) -> None:
try:
(reader, writer) = await asyncio.wait_for(
(self.__reader, self.__writer) = await asyncio.wait_for(
serial_asyncio.open_serial_connection(url=self.__device_path, baudrate=self.__speed),
timeout=self.__timeout,
)
return (reader, writer)
except Exception as err:
get_logger(0).error("Can't connect to TESmart KVM [%s]:%d: %s",
self.__device_path, self.__speed, tools.efmt(err))
raise GpioDriverOfflineError(self)
async def __ensure_device(self) -> Tuple[asyncio.StreamReader, asyncio.StreamWriter]:
if self.__reader is None or self.__writer is None:
if self.__mode == 1:
(self.__reader, self.__writer) = await self.__ensure_devicee_tcpip()
elif self.__mode == 2:
(self.__reader, self.__writer) = await self.__ensure_device_serial()
return (self.__reader, self.__writer)
async def __close_device(self) -> None:
if self.__writer:
await aiotools.close_writer(self.__writer)

View File

@ -1,3 +1,4 @@
pyserial-asyncio
pyghmi
spidev
pyrad