rewritten anelpwr plugin

This commit is contained in:
Maxim Devaev 2023-03-07 19:50:45 +02:00
parent d3eba45644
commit 002031baf1

View File

@ -19,102 +19,152 @@
# # # #
# ========================================================================== # # ========================================================================== #
import requests
import aiohttp import asyncio
import functools import functools
from typing import Callable from typing import Callable
from typing import Any from typing import Any
import aiohttp
from ...logging import get_logger from ...logging import get_logger
from ... import tools from ... import tools
from ... import aiotools from ... import aiotools
from ... import htclient
from ...yamlconf import Option from ...yamlconf import Option
from ...validators.hw import valid_number
from ...validators.basic import valid_stripped_string_not_empty from ...validators.basic import valid_stripped_string_not_empty
from ...validators.basic import valid_bool
from ...validators.basic import valid_number
from ...validators.basic import valid_float_f01
from . import UserGpioModes
from . import BaseUserGpioDriver from . import BaseUserGpioDriver
from . import GpioDriverOfflineError from . import GpioDriverOfflineError
# ===== # =====
class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attributes class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attributes
def __init__( # pylint: disable=super-init-not-called def __init__(
self, self,
instance_name: str, instance_name: str,
notifier: aiotools.AioNotifier, notifier: aiotools.AioNotifier,
host: str,
port: str, url: str,
verify: bool,
user: str, user: str,
password: str, passwd: str,
state_poll: float,
timeout: float,
) -> None: ) -> None:
super().__init__(instance_name, notifier) super().__init__(instance_name, notifier)
self.__host = host self.__url = url
self.__port = port self.__verify = verify
self.__user = user self.__user = user
self.__password = password self.__passwd = passwd
self.__state_poll = state_poll
self.__timeout = timeout
self.__initial: dict[str, (bool | None)] = {}
self.__state: dict[str, (bool | None)] = {}
self.__update_notifier = aiotools.AioNotifier()
self.__http_session: (aiohttp.ClientSession | None) = None
@classmethod @classmethod
def get_plugin_options(cls) -> dict[str, Option]: def get_plugin_options(cls) -> dict[str, Option]:
return { return {
"host": Option([], type=valid_stripped_string_not_empty), "url": Option("", type=valid_stripped_string_not_empty),
"port": Option([], type=valid_number), "verify": Option(True, type=valid_bool),
"user": Option([], type=valid_stripped_string_not_empty), "user": Option(""),
"password": Option([], type=valid_stripped_string_not_empty), "passwd": Option(""),
"state_poll": Option(5.0, type=valid_float_f01),
"timeout": Option(5.0, type=valid_float_f01),
} }
@classmethod
def get_modes(cls) -> set[str]:
return set(UserGpioModes.ALL)
@classmethod @classmethod
def get_pin_validator(cls) -> Callable[[Any], Any]: def get_pin_validator(cls) -> Callable[[Any], Any]:
return functools.partial(valid_number, min=1,max=8) return functools.partial(valid_number, min=0, max=7, name="ANELPWR channel")
def register_input(self, pin: str, debounce: float) -> None:
_ = debounce
self.__state[pin] = None
def register_output(self, pin: str, initial: (bool | None)) -> None:
self.__initial[pin] = initial
self.__state[pin] = None
def prepare(self) -> None:
async def inner_prepare() -> None:
await asyncio.gather(*[
self.write(pin, state)
for (pin, state) in self.__initial.items()
if state is not None
], return_exceptions=True)
aiotools.run_sync(inner_prepare())
async def run(self) -> None:
prev_state: (dict | None) = None
while True:
session = self.__ensure_http_session()
try:
async with session.get(f"{self.__url}/strg.cfg") as response:
htclient.raise_not_200(response)
parts = (await response.text()).split(";")
for pin in self.__state:
self.__state[pin] = (parts[1 + int(pin) * 5] == "1")
except Exception as err:
get_logger().error("Failed ANELPWR bulk GET request: %s", tools.efmt(err))
self.__state = dict.fromkeys(self.__state, None)
if self.__state != prev_state:
self._notifier.notify()
prev_state = self.__state
await self.__update_notifier.wait(self.__state_poll)
async def cleanup(self) -> None:
if self.__http_session:
await self.__http_session.close()
self.__http_session = None
async def read(self, pin: str) -> bool: async def read(self, pin: str) -> bool:
try: if self.__state[pin] is None:
#status = 0
#body = ""
async with aiohttp.ClientSession() as session:
url = f"http://{self.__host}:{self.__port}/strg.cfg"
async with session.get(url,auth=aiohttp.BasicAuth(self.__user,self.__password)) as resp:
body = await resp.text()
if ( resp.status != 200 ):
get_logger(0).error(f"http get returned {resp.status} form {self.__host}")
raise GpioDriverOfflineError(self)
return '1' == body.split(';')[1 + (int(pin) - 1) * 5]
except Exception as e:
get_logger(0).error(e)
raise GpioDriverOfflineError(self) raise GpioDriverOfflineError(self)
return self.__state[pin] # type: ignore
async def write(self, pin: str, state: bool) -> None: async def write(self, pin: str, state: bool) -> None:
_ = pin session = self.__ensure_http_session()
if state:
onoff = f"F{int(pin)-1}=1"
else:
onoff = f"F{int(pin)-1}=0"
try: try:
async with aiohttp.ClientSession() as session: async with session.post(
url = f'http://{self.__host}:{self.__port}/ctrl.htm' url=f"{self.__url}//ctrl.htm",
headers={'Content-Type':'text/plain'} data=f"F{pin}={int(state)}",
auth=aiohttp.BasicAuth(self.__user,self.__password) headers={"Content-Type": "text/plain"},
async with session.post(url,auth=auth,headers=headers,data=onoff) as resp: ) as response:
await resp.text() htclient.raise_not_200(response)
if 200 != resp.status: except Exception as err:
raise GpioDriverOfflineError(self) get_logger().error("Failed ANELPWR POST request to pin %s: %s", pin, tools.efmt(err))
except Exception as e:
get_logger(0).error(e)
raise GpioDriverOfflineError(self) raise GpioDriverOfflineError(self)
else:
self.__update_notifier.notify()
def __ensure_http_session(self) -> aiohttp.ClientSession:
if not self.__http_session:
kwargs: dict = {
"headers": {
"User-Agent": htclient.make_user_agent("KVMD"),
},
"timeout": aiohttp.ClientTimeout(total=self.__timeout),
}
if self.__user:
kwargs["auth"] = aiohttp.BasicAuth(self.__user, self.__passwd)
if not self.__verify:
kwargs["connector"] = aiohttp.TCPConnector(ssl=False)
self.__http_session = aiohttp.ClientSession(**kwargs)
return self.__http_session
def __str__(self) -> str: def __str__(self) -> str:
return f"ANELPWR({self._instance_name})" return f"ANELPWR({self._instance_name})"