tesmart rewrite

This commit is contained in:
Devaev Maxim 2021-05-16 05:57:08 +03:00
parent 1fc8434f0a
commit 8db0ab20e0
11 changed files with 109 additions and 83 deletions

View File

@ -88,6 +88,19 @@ async def wait_first(*aws: Awaitable) -> Tuple[Set[asyncio.Future], Set[asyncio.
return (await asyncio.wait(list(aws), return_when=asyncio.FIRST_COMPLETED))
# =====
async def close_writer(writer: asyncio.StreamWriter) -> bool:
closing = writer.is_closing()
if not closing:
writer.transport.abort() # type: ignore
writer.close()
try:
await writer.wait_closed()
except Exception:
pass
return (not closing)
# =====
class AioNotifier:
def __init__(self) -> None:

View File

@ -295,7 +295,7 @@ class UserGpio:
async def cleanup(self) -> None:
for driver in self.__drivers.values():
try:
driver.cleanup()
await driver.cleanup()
except Exception:
get_logger().exception("Can't cleanup driver %s", driver)

View File

@ -27,6 +27,8 @@ import struct
from typing import Tuple
from typing import Any
from .... import aiotools
from .errors import RfbConnectionError
@ -35,18 +37,6 @@ def rfb_format_remote(writer: asyncio.StreamWriter) -> str:
return "[%s]:%d" % (writer.transport.get_extra_info("peername")[:2])
async def rfb_close_writer(writer: asyncio.StreamWriter) -> bool:
closing = writer.is_closing()
if not closing:
writer.transport.abort() # type: ignore
writer.close()
try:
await writer.wait_closed()
except Exception:
pass
return (not closing)
class RfbClientStream:
def __init__(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
self.__reader = reader
@ -145,4 +135,4 @@ class RfbClientStream:
self.__writer = ssl_writer
async def _close(self) -> None:
await rfb_close_writer(self.__writer)
await aiotools.close_writer(self.__writer)

View File

@ -55,7 +55,6 @@ from ... import aiotools
from .rfb import RfbClient
from .rfb.stream import rfb_format_remote
from .rfb.stream import rfb_close_writer
from .rfb.errors import RfbError
from .vncauth import VncAuthKvmdCredentials
@ -487,7 +486,7 @@ class VncServer: # pylint: disable=too-many-instance-attributes
except Exception:
logger.exception("[entry] %s: Unhandled exception in client task", remote)
finally:
if (await rfb_close_writer(writer)):
if (await aiotools.close_writer(writer)):
logger.info("[entry] %s: Connection is closed in an emergency", remote)
self.__handle_client = handle_client

View File

@ -86,7 +86,7 @@ class BaseUserGpioDriver(BasePlugin):
async def run(self) -> None:
raise NotImplementedError
def cleanup(self) -> None:
async def cleanup(self) -> None:
raise NotImplementedError
async def read(self, pin: int) -> bool:

View File

@ -105,7 +105,7 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute
self.__channel = channel
await self._notifier.notify()
def cleanup(self) -> None:
async def cleanup(self) -> None:
if self.__proc is not None:
if self.__proc.is_alive():
get_logger(0).info("Stopping %s daemon ...", self)

View File

@ -88,7 +88,7 @@ class Plugin(BaseUserGpioDriver):
assert self.__reader
await self.__reader.poll()
def cleanup(self) -> None:
async def cleanup(self) -> None:
if self.__chip:
try:
self.__chip.close()

View File

@ -108,7 +108,7 @@ class Plugin(BaseUserGpioDriver):
prev_raw = raw
await asyncio.sleep(self.__state_poll)
def cleanup(self) -> None:
async def cleanup(self) -> None:
self.__reset_pins()
self.__close_device()
self.__stop = True

View File

@ -131,7 +131,7 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute
prev = new
await asyncio.sleep(self.__state_poll)
def cleanup(self) -> None:
async def cleanup(self) -> None:
pass
async def read(self, pin: int) -> bool:

View File

@ -97,7 +97,7 @@ class Plugin(BaseUserGpioDriver):
except Exception:
logger.exception("Unexpected OTG-bind watcher error")
def cleanup(self) -> None:
async def cleanup(self) -> None:
pass
async def read(self, pin: int) -> bool:

View File

@ -20,35 +20,26 @@
# ========================================================================== #
import re
import functools
import errno
import time
import asyncio
from typing import Tuple
from typing import Dict
from typing import Optional
import serial
import socket
import binascii
from ...logging import get_logger
from ... import tools
from ... import aiotools
from ... import aiomulti
from ... import aioproc
from ...yamlconf import Option
from ...validators.basic import valid_number
from ...validators.basic import valid_float_f0
from ...validators.basic import valid_float_f01
from ...validators.os import valid_abs_path
from ...validators.hw import valid_tty_speed
from ...validators.net import valid_ip_or_host
from ...validators.net import valid_port
from . import BaseUserGpioDriver
from . import GpioDriverOfflineError
# =====
@ -58,79 +49,112 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute
instance_name: str,
notifier: aiotools.AioNotifier,
tesmart_host: str,
tesmart_port: int,
max_ports: int,
host: str,
port: int,
timeout: float,
send_delay: float,
state_poll: float,
) -> None:
super().__init__(instance_name, notifier)
self.__tesmart_host = tesmart_host
self.__tesmart_port = tesmart_port
self.__max_ports = max_ports
self.__switch_state: Dict[int, bool] = {}
self.__tes_socket: socket
self.__host = host
self.__port = port
self.__timeout = timeout
self.__send_delay = send_delay
self.__state_poll = state_poll
self.__reader: Optional[asyncio.StreamReader] = None
self.__writer: Optional[asyncio.StreamWriter] = None
self.__active: int = -1
@classmethod
def get_plugin_options(cls) -> Dict:
return {
"tesmart_host": Option("192.168.1.10", type=valid_ip_or_host),
"tesmart_port": Option(5000, type=valid_port),
"max_ports": Option(8, type=functools.partial(valid_number, min=4, max=16)),
"host": Option("", type=valid_ip_or_host),
"port": Option(5000, type=valid_port),
"timeout": Option(5.0, type=valid_float_f01),
"send_delay": Option(1.0, type=valid_float_f0),
"state_poll": Option(5.0, type=valid_float_f01),
}
def register_input(self, pin: int, debounce: float) -> None:
_ = pin
if not (0 < pin < 16):
raise RuntimeError(f"Unsupported port number: {pin}")
_ = debounce
def register_output(self, port: int, initial: Optional[bool]) -> None:
if port <= self.__max_ports:
self.__switch_state[port] = initial
def register_output(self, pin: int, initial: Optional[bool]) -> None:
if not (0 < pin < 16):
raise RuntimeError(f"Unsupported port number: {pin}")
_ = initial
def prepare(self) -> None:
self.__tes_socket = socket.create_connection((self.__tesmart_host,self.__tesmart_port))
self.__update_state()
def __update_state(self) -> None:
for port in self.__switch_state:
self.__switch_state[port] = False
selport = self.__get_selected_port()
if selport in self.__switch_state:
self.__switch_state[selport] = True
def __get_selected_port(self) -> int:
retint = self.__send_tesmart_command("1000")
return retint+1
def __send_tesmart_command(self,tes_cmd: str) -> int:
full_cmd="AABB03"+tes_cmd+"EE"
binstr = binascii.unhexlify(full_cmd)
self.__tes_socket.sendall(binstr)
retstr=self.__tes_socket.recv(6)
return int(bytearray(retstr)[4])
pass
async def run(self) -> None:
pass
prev_active = -2
while True:
try:
self.__active = await self.__send_command(b"\x10\x00")
except Exception:
pass
if self.__active != prev_active:
await self._notifier.notify()
prev_active = self.__active
await asyncio.sleep(self.__state_poll)
def cleanup(self) -> None:
pass
async def cleanup(self) -> None:
await self.__close_device()
async def read(self, pin: int) -> bool:
if pin in self.__switch_state:
return self.__switch_state[pin]
return False
return (self.__active == pin)
async def write(self, pin: int, state: bool) -> None:
if state == False:
return
part_cmd="01"+format(pin,"#04x")[2:4]
writeret = self.__send_tesmart_command(part_cmd)
self.__update_state()
if state:
await self.__send_command(b"\x01%.2x" % (pin - 1))
await asyncio.sleep(self.__send_delay) # Slowdown
# =====
async def __send_command(self, cmd: bytes) -> int:
assert len(cmd) == 2
(reader, writer) = await self.__ensure_device()
try:
writer.write(b"\xAA\xBB\x03%s\xEE" % (cmd))
await writer.drain()
return (await reader.readexactly(6))[4]
except Exception as err:
get_logger(0).error("Can't send command to Tesmart KVM [%s]:%d: %s",
self.__host, self.__port, tools.efmt(err))
await self.__close_device()
raise GpioDriverOfflineError(self)
async def __ensure_device(self) -> Tuple[asyncio.StreamReader, asyncio.StreamWriter]:
if self.__reader is None or self.__writer is None:
try:
(reader, writer) = await asyncio.open_connection(self.__host, self.__port)
sock = writer.get_extra_info("socket")
sock.settimeout(self.__timeout)
except Exception as err:
get_logger(0).error("Can't connect to Tesmart KVM [%s]:%d: %s",
self.__host, self.__port, tools.efmt(err))
raise GpioDriverOfflineError(self)
else:
self.__reader = reader
self.__writer = writer
return (self.__reader, self.__writer)
async def __close_device(self) -> None:
if self.__writer:
await aiotools.close_writer(self.__writer)
self.__reader = None
self.__writer = None
self.__active = -1
# =====
def __str__(self) -> str:
return f"tesmart({self._instance_name})"
return f"Tesmart({self._instance_name})"
__repr__ = __str__