new events model

This commit is contained in:
Maxim Devaev 2024-10-21 17:46:59 +03:00
parent b67a232584
commit cda32a083f
30 changed files with 335 additions and 165 deletions

View File

@ -59,14 +59,25 @@ def queue_get_last_sync( # pylint: disable=invalid-name
# ===== # =====
class AioProcessNotifier: class AioProcessNotifier:
def __init__(self) -> None: def __init__(self) -> None:
self.__queue: "multiprocessing.Queue[None]" = multiprocessing.Queue() self.__queue: "multiprocessing.Queue[int]" = multiprocessing.Queue()
def notify(self) -> None: def notify(self, mask: int=0) -> None:
self.__queue.put_nowait(None) self.__queue.put_nowait(mask)
async def wait(self) -> None: async def wait(self) -> int:
while not (await queue_get_last(self.__queue, 0.1))[0]: while True:
pass mask = await aiotools.run_async(self.__get)
if mask >= 0:
return mask
def __get(self) -> int:
try:
mask = self.__queue.get(timeout=0.1)
while not self.__queue.empty():
mask |= self.__queue.get()
return mask
except queue.Empty:
return -1
# ===== # =====

View File

@ -232,25 +232,26 @@ async def close_writer(writer: asyncio.StreamWriter) -> bool:
# ===== # =====
class AioNotifier: class AioNotifier:
def __init__(self) -> None: def __init__(self) -> None:
self.__queue: "asyncio.Queue[None]" = asyncio.Queue() self.__queue: "asyncio.Queue[int]" = asyncio.Queue()
def notify(self) -> None: def notify(self, mask: int=0) -> None:
self.__queue.put_nowait(None) self.__queue.put_nowait(mask)
async def wait(self, timeout: (float | None)=None) -> None: async def wait(self, timeout: (float | None)=None) -> int:
mask = 0
if timeout is None: if timeout is None:
await self.__queue.get() mask = await self.__queue.get()
else: else:
try: try:
await asyncio.wait_for( mask = await asyncio.wait_for(
asyncio.ensure_future(self.__queue.get()), asyncio.ensure_future(self.__queue.get()),
timeout=timeout, timeout=timeout,
) )
except asyncio.TimeoutError: except asyncio.TimeoutError:
return # False return -1
while not self.__queue.empty(): while not self.__queue.empty():
await self.__queue.get() mask |= await self.__queue.get()
# return True return mask
# ===== # =====

View File

@ -55,10 +55,9 @@ class ExportApi:
@async_lru.alru_cache(maxsize=1, ttl=5) @async_lru.alru_cache(maxsize=1, ttl=5)
async def __get_prometheus_metrics(self) -> str: async def __get_prometheus_metrics(self) -> str:
(atx_state, hw_state, fan_state, gpio_state) = await asyncio.gather(*[ (atx_state, info_state, gpio_state) = await asyncio.gather(*[
self.__atx.get_state(), self.__atx.get_state(),
self.__info_manager.get_submanager("hw").get_state(), self.__info_manager.get_state(["hw", "fan"]),
self.__info_manager.get_submanager("fan").get_state(),
self.__user_gpio.get_state(), self.__user_gpio.get_state(),
]) ])
rows: list[str] = [] rows: list[str] = []
@ -72,8 +71,8 @@ class ExportApi:
for key in ["online", "state"]: for key in ["online", "state"]:
self.__append_prometheus_rows(rows, ch_state["state"], f"pikvm_gpio_{mode}_{key}_{channel}") self.__append_prometheus_rows(rows, ch_state["state"], f"pikvm_gpio_{mode}_{key}_{channel}")
self.__append_prometheus_rows(rows, hw_state["health"], "pikvm_hw") # type: ignore self.__append_prometheus_rows(rows, info_state["hw"]["health"], "pikvm_hw") # type: ignore
self.__append_prometheus_rows(rows, fan_state, "pikvm_fan") self.__append_prometheus_rows(rows, info_state["fan"], "pikvm_fan")
return "\n".join(rows) return "\n".join(rows)

View File

@ -20,8 +20,6 @@
# ========================================================================== # # ========================================================================== #
import asyncio
from aiohttp.web import Request from aiohttp.web import Request
from aiohttp.web import Response from aiohttp.web import Response
@ -43,15 +41,11 @@ class InfoApi:
@exposed_http("GET", "/info") @exposed_http("GET", "/info")
async def __common_state_handler(self, req: Request) -> Response: async def __common_state_handler(self, req: Request) -> Response:
fields = self.__valid_info_fields(req) fields = self.__valid_info_fields(req)
results = dict(zip(fields, await asyncio.gather(*[ return make_json_response(await self.__info_manager.get_state(fields))
self.__info_manager.get_submanager(field).get_state()
for field in fields
])))
return make_json_response(results)
def __valid_info_fields(self, req: Request) -> list[str]: def __valid_info_fields(self, req: Request) -> list[str]:
subs = self.__info_manager.get_subs() available = self.__info_manager.get_subs()
return sorted(valid_info_fields( return sorted(valid_info_fields(
arg=req.query.get("fields", ",".join(subs)), arg=req.query.get("fields", ",".join(available)),
variants=subs, variants=available,
) or subs) ) or available)

View File

@ -88,12 +88,12 @@ class RedfishApi:
@exposed_http("GET", "/redfish/v1/Systems/0") @exposed_http("GET", "/redfish/v1/Systems/0")
async def __server_handler(self, _: Request) -> Response: async def __server_handler(self, _: Request) -> Response:
(atx_state, meta_state) = await asyncio.gather(*[ (atx_state, info_state) = await asyncio.gather(*[
self.__atx.get_state(), self.__atx.get_state(),
self.__info_manager.get_submanager("meta").get_state(), self.__info_manager.get_state(["meta"]),
]) ])
try: try:
host = str(meta_state.get("server", {})["host"]) # type: ignore host = str(info_state["meta"].get("server", {})["host"]) # type: ignore
except Exception: except Exception:
host = "" host = ""
return make_json_response({ return make_json_response({

View File

@ -20,6 +20,10 @@
# ========================================================================== # # ========================================================================== #
import asyncio
from typing import AsyncGenerator
from ....yamlconf import Section from ....yamlconf import Section
from .base import BaseInfoSubmanager from .base import BaseInfoSubmanager
@ -34,7 +38,7 @@ from .fan import FanInfoSubmanager
# ===== # =====
class InfoManager: class InfoManager:
def __init__(self, config: Section) -> None: def __init__(self, config: Section) -> None:
self.__subs = { self.__subs: dict[str, BaseInfoSubmanager] = {
"system": SystemInfoSubmanager(config.kvmd.streamer.cmd), "system": SystemInfoSubmanager(config.kvmd.streamer.cmd),
"auth": AuthInfoSubmanager(config.kvmd.auth.enabled), "auth": AuthInfoSubmanager(config.kvmd.auth.enabled),
"meta": MetaInfoSubmanager(config.kvmd.info.meta), "meta": MetaInfoSubmanager(config.kvmd.info.meta),
@ -42,9 +46,42 @@ class InfoManager:
"hw": HwInfoSubmanager(**config.kvmd.info.hw._unpack()), "hw": HwInfoSubmanager(**config.kvmd.info.hw._unpack()),
"fan": FanInfoSubmanager(**config.kvmd.info.fan._unpack()), "fan": FanInfoSubmanager(**config.kvmd.info.fan._unpack()),
} }
self.__queue: "asyncio.Queue[tuple[str, (dict | None)]]" = asyncio.Queue()
def get_subs(self) -> set[str]: def get_subs(self) -> set[str]:
return set(self.__subs) return set(self.__subs)
def get_submanager(self, name: str) -> BaseInfoSubmanager: async def get_state(self, fields: (list[str] | None)=None) -> dict:
return self.__subs[name] fields = (fields or list(self.__subs))
return dict(zip(fields, await asyncio.gather(*[
self.__subs[field].get_state()
for field in fields
])))
async def trigger_state(self) -> None:
await asyncio.gather(*[
sub.trigger_state()
for sub in self.__subs.values()
])
async def poll_state(self) -> AsyncGenerator[dict, None]:
while True:
(field, value) = await self.__queue.get()
yield {field: value}
async def systask(self) -> None:
tasks = [
asyncio.create_task(self.__poller(field))
for field in self.__subs
]
try:
await asyncio.gather(*tasks)
except Exception:
for task in tasks:
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
raise
async def __poller(self, field: str) -> None:
async for state in self.__subs[field].poll_state():
self.__queue.put_nowait((field, state))

View File

@ -20,6 +20,10 @@
# ========================================================================== # # ========================================================================== #
from typing import AsyncGenerator
from .... import aiotools
from .base import BaseInfoSubmanager from .base import BaseInfoSubmanager
@ -27,6 +31,15 @@ from .base import BaseInfoSubmanager
class AuthInfoSubmanager(BaseInfoSubmanager): class AuthInfoSubmanager(BaseInfoSubmanager):
def __init__(self, enabled: bool) -> None: def __init__(self, enabled: bool) -> None:
self.__enabled = enabled self.__enabled = enabled
self.__notifier = aiotools.AioNotifier()
async def get_state(self) -> dict: async def get_state(self) -> dict:
return {"enabled": self.__enabled} return {"enabled": self.__enabled}
async def trigger_state(self) -> None:
self.__notifier.notify()
async def poll_state(self) -> AsyncGenerator[(dict | None), None]:
while True:
await self.__notifier.wait()
yield (await self.get_state())

View File

@ -20,7 +20,17 @@
# ========================================================================== # # ========================================================================== #
from typing import AsyncGenerator
# ===== # =====
class BaseInfoSubmanager: class BaseInfoSubmanager:
async def get_state(self) -> (dict | None): async def get_state(self) -> (dict | None):
raise NotImplementedError raise NotImplementedError
async def trigger_state(self) -> None:
raise NotImplementedError
async def poll_state(self) -> AsyncGenerator[(dict | None), None]:
yield None
raise NotImplementedError

View File

@ -24,6 +24,8 @@ import os
import re import re
import asyncio import asyncio
from typing import AsyncGenerator
from ....logging import get_logger from ....logging import get_logger
from ....yamlconf import Section from ....yamlconf import Section
@ -41,6 +43,7 @@ from .base import BaseInfoSubmanager
class ExtrasInfoSubmanager(BaseInfoSubmanager): class ExtrasInfoSubmanager(BaseInfoSubmanager):
def __init__(self, global_config: Section) -> None: def __init__(self, global_config: Section) -> None:
self.__global_config = global_config self.__global_config = global_config
self.__notifier = aiotools.AioNotifier()
async def get_state(self) -> (dict | None): async def get_state(self) -> (dict | None):
try: try:
@ -65,6 +68,14 @@ class ExtrasInfoSubmanager(BaseInfoSubmanager):
if sui is not None: if sui is not None:
await aiotools.shield_fg(sui.close()) await aiotools.shield_fg(sui.close())
async def trigger_state(self) -> None:
self.__notifier.notify()
async def poll_state(self) -> AsyncGenerator[(dict | None), None]:
while True:
await self.__notifier.wait()
yield (await self.get_state())
def __get_extras_path(self, *parts: str) -> str: def __get_extras_path(self, *parts: str) -> str:
return os.path.join(self.__global_config.kvmd.info.extras, *parts) return os.path.join(self.__global_config.kvmd.info.extras, *parts)

View File

@ -21,7 +21,6 @@
import copy import copy
import asyncio
from typing import AsyncGenerator from typing import AsyncGenerator
@ -53,6 +52,8 @@ class FanInfoSubmanager(BaseInfoSubmanager):
self.__timeout = timeout self.__timeout = timeout
self.__state_poll = state_poll self.__state_poll = state_poll
self.__notifier = aiotools.AioNotifier()
async def get_state(self) -> dict: async def get_state(self) -> dict:
monitored = await self.__get_monitored() monitored = await self.__get_monitored()
return { return {
@ -60,24 +61,28 @@ class FanInfoSubmanager(BaseInfoSubmanager):
"state": ((await self.__get_fan_state() if monitored else None)), "state": ((await self.__get_fan_state() if monitored else None)),
} }
async def poll_state(self) -> AsyncGenerator[dict, None]: async def trigger_state(self) -> None:
prev_state: dict = {} self.__notifier.notify(1)
async def poll_state(self) -> AsyncGenerator[(dict | None), None]:
prev: dict = {}
while True: while True:
if self.__unix_path: if self.__unix_path:
pure = state = await self.get_state() if (await self.__notifier.wait(timeout=self.__state_poll)) > 0:
prev = {}
new = await self.get_state()
pure = copy.deepcopy(new)
if pure["state"] is not None: if pure["state"] is not None:
try: try:
pure = copy.deepcopy(state)
pure["state"]["service"]["now_ts"] = 0 pure["state"]["service"]["now_ts"] = 0
except Exception: except Exception:
pass pass
if pure != prev_state: if pure != prev:
yield state prev = pure
prev_state = pure yield new
await asyncio.sleep(self.__state_poll)
else: else:
await self.__notifier.wait()
yield (await self.get_state()) yield (await self.get_state())
await aiotools.wait_infinite()
# ===== # =====

View File

@ -22,6 +22,7 @@
import os import os
import asyncio import asyncio
import copy
from typing import Callable from typing import Callable
from typing import AsyncGenerator from typing import AsyncGenerator
@ -60,6 +61,8 @@ class HwInfoSubmanager(BaseInfoSubmanager):
self.__dt_cache: dict[str, str] = {} self.__dt_cache: dict[str, str] = {}
self.__notifier = aiotools.AioNotifier()
async def get_state(self) -> dict: async def get_state(self) -> dict:
( (
base, base,
@ -97,14 +100,18 @@ class HwInfoSubmanager(BaseInfoSubmanager):
}, },
} }
async def trigger_state(self) -> None:
self.__notifier.notify(1)
async def poll_state(self) -> AsyncGenerator[dict, None]: async def poll_state(self) -> AsyncGenerator[dict, None]:
prev_state: dict = {} prev: dict = {}
while True: while True:
state = await self.get_state() if (await self.__notifier.wait(timeout=self.__state_poll)) > 0:
if state != prev_state: prev = {}
yield state new = await self.get_state()
prev_state = state if new != prev:
await asyncio.sleep(self.__state_poll) prev = copy.deepcopy(new)
yield new
# ===== # =====

View File

@ -20,6 +20,8 @@
# ========================================================================== # # ========================================================================== #
from typing import AsyncGenerator
from ....logging import get_logger from ....logging import get_logger
from ....yamlconf.loader import load_yaml_file from ....yamlconf.loader import load_yaml_file
@ -33,6 +35,7 @@ from .base import BaseInfoSubmanager
class MetaInfoSubmanager(BaseInfoSubmanager): class MetaInfoSubmanager(BaseInfoSubmanager):
def __init__(self, meta_path: str) -> None: def __init__(self, meta_path: str) -> None:
self.__meta_path = meta_path self.__meta_path = meta_path
self.__notifier = aiotools.AioNotifier()
async def get_state(self) -> (dict | None): async def get_state(self) -> (dict | None):
try: try:
@ -40,3 +43,11 @@ class MetaInfoSubmanager(BaseInfoSubmanager):
except Exception: except Exception:
get_logger(0).exception("Can't parse meta") get_logger(0).exception("Can't parse meta")
return None return None
async def trigger_state(self) -> None:
self.__notifier.notify()
async def poll_state(self) -> AsyncGenerator[(dict | None), None]:
while True:
await self.__notifier.wait()
yield (await self.get_state())

View File

@ -24,8 +24,11 @@ import os
import asyncio import asyncio
import platform import platform
from typing import AsyncGenerator
from ....logging import get_logger from ....logging import get_logger
from .... import aiotools
from .... import aioproc from .... import aioproc
from .... import __version__ from .... import __version__
@ -37,6 +40,7 @@ from .base import BaseInfoSubmanager
class SystemInfoSubmanager(BaseInfoSubmanager): class SystemInfoSubmanager(BaseInfoSubmanager):
def __init__(self, streamer_cmd: list[str]) -> None: def __init__(self, streamer_cmd: list[str]) -> None:
self.__streamer_cmd = streamer_cmd self.__streamer_cmd = streamer_cmd
self.__notifier = aiotools.AioNotifier()
async def get_state(self) -> dict: async def get_state(self) -> dict:
streamer_info = await self.__get_streamer_info() streamer_info = await self.__get_streamer_info()
@ -50,6 +54,14 @@ class SystemInfoSubmanager(BaseInfoSubmanager):
}, },
} }
async def trigger_state(self) -> None:
self.__notifier.notify()
async def poll_state(self) -> AsyncGenerator[(dict | None), None]:
while True:
await self.__notifier.wait()
yield (await self.get_state())
# ===== # =====
async def __get_streamer_info(self) -> dict: async def __get_streamer_info(self) -> dict:

View File

@ -150,6 +150,7 @@ class _Subsystem:
class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-instance-attributes class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-instance-attributes
__EV_GPIO_STATE = "gpio_state" __EV_GPIO_STATE = "gpio_state"
__EV_INFO_STATE = "info_state"
def __init__( # pylint: disable=too-many-arguments,too-many-locals def __init__( # pylint: disable=too-many-arguments,too-many-locals
self, self,
@ -205,10 +206,7 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
_Subsystem.make(atx, "ATX", "atx_state"), _Subsystem.make(atx, "ATX", "atx_state"),
_Subsystem.make(msd, "MSD", "msd_state"), _Subsystem.make(msd, "MSD", "msd_state"),
_Subsystem.make(streamer, "Streamer", "streamer_state").add_source("streamer_ocr_state", self.__streamer_api.get_ocr, None, None), _Subsystem.make(streamer, "Streamer", "streamer_state").add_source("streamer_ocr_state", self.__streamer_api.get_ocr, None, None),
*[ _Subsystem.make(info_manager, "Info manager", self.__EV_INFO_STATE),
_Subsystem.make(info_manager.get_submanager(sub), f"Info manager ({sub})", f"info_{sub}_state",)
for sub in sorted(info_manager.get_subs())
],
] ]
self.__streamer_notifier = aiotools.AioNotifier() self.__streamer_notifier = aiotools.AioNotifier()
@ -251,6 +249,7 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
stream = valid_bool(req.query.get("stream", True)) stream = valid_bool(req.query.get("stream", True))
legacy = valid_bool(req.query.get("legacy", True)) legacy = valid_bool(req.query.get("legacy", True))
async with self._ws_session(req, stream=stream, legacy=legacy) as ws: async with self._ws_session(req, stream=stream, legacy=legacy) as ws:
await ws.send_event("loop", {})
states = [ states = [
(event_type, src.get_state()) (event_type, src.get_state())
for sub in self.__subsystems for sub in self.__subsystems
@ -269,7 +268,6 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
for src in sub.sources.values(): for src in sub.sources.values():
if src.trigger_state: if src.trigger_state:
await src.trigger_state() await src.trigger_state()
await ws.send_event("loop", {})
return (await self._ws_loop(ws)) return (await self._ws_loop(ws))
@exposed_ws("ping") @exposed_ws("ping")
@ -366,6 +364,8 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
async def __poll_state(self, event_type: str, poller: AsyncGenerator[dict, None]) -> None: async def __poll_state(self, event_type: str, poller: AsyncGenerator[dict, None]) -> None:
if event_type == self.__EV_GPIO_STATE: if event_type == self.__EV_GPIO_STATE:
await self.__poll_gpio_state(poller) await self.__poll_gpio_state(poller)
elif event_type == self.__EV_INFO_STATE:
await self.__poll_info_state(poller)
else: else:
async for state in poller: async for state in poller:
await self._broadcast_ws_event(event_type, state) await self._broadcast_ws_event(event_type, state)
@ -381,3 +381,9 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
prev["state"]["inputs"].update(state["state"].get("inputs", {})) prev["state"]["inputs"].update(state["state"].get("inputs", {}))
prev["state"]["outputs"].update(state["state"].get("outputs", {})) prev["state"]["outputs"].update(state["state"].get("outputs", {}))
await self._broadcast_ws_event(self.__EV_GPIO_STATE, prev["state"], legacy=True) await self._broadcast_ws_event(self.__EV_GPIO_STATE, prev["state"], legacy=True)
async def __poll_info_state(self, poller: AsyncGenerator[dict, None]) -> None:
async for state in poller:
await self._broadcast_ws_event(self.__EV_INFO_STATE, state, legacy=False)
for (key, value) in state.items():
await self._broadcast_ws_event(f"info_{key}_state", value, legacy=True)

View File

@ -26,6 +26,7 @@ import asyncio
import asyncio.subprocess import asyncio.subprocess
import dataclasses import dataclasses
import functools import functools
import copy
from typing import AsyncGenerator from typing import AsyncGenerator
from typing import Any from typing import Any
@ -136,7 +137,7 @@ class _StreamerParams:
} }
def get_limits(self) -> dict: def get_limits(self) -> dict:
limits = dict(self.__limits) limits = copy.deepcopy(self.__limits)
if self.__has_resolution: if self.__has_resolution:
limits[self.__AVAILABLE_RESOLUTIONS] = list(limits[self.__AVAILABLE_RESOLUTIONS]) limits[self.__AVAILABLE_RESOLUTIONS] = list(limits[self.__AVAILABLE_RESOLUTIONS])
return limits return limits
@ -323,6 +324,9 @@ class Streamer: # pylint: disable=too-many-instance-attributes
"features": self.__params.get_features(), "features": self.__params.get_features(),
} }
async def trigger_state(self) -> None:
self.__notifier.notify(1)
async def poll_state(self) -> AsyncGenerator[dict, None]: async def poll_state(self) -> AsyncGenerator[dict, None]:
def signal_handler(*_: Any) -> None: def signal_handler(*_: Any) -> None:
get_logger(0).info("Got SIGUSR2, checking the stream state ...") get_logger(0).info("Got SIGUSR2, checking the stream state ...")
@ -331,21 +335,14 @@ class Streamer: # pylint: disable=too-many-instance-attributes
get_logger(0).info("Installing SIGUSR2 streamer handler ...") get_logger(0).info("Installing SIGUSR2 streamer handler ...")
asyncio.get_event_loop().add_signal_handler(signal.SIGUSR2, signal_handler) asyncio.get_event_loop().add_signal_handler(signal.SIGUSR2, signal_handler)
waiter_task: (asyncio.Task | None) = None prev: dict = {}
prev_state: dict = {}
while True: while True:
state = await self.get_state() if (await self.__notifier.wait(timeout=self.__state_poll)) > 0:
if state != prev_state: prev = {}
yield state new = await self.get_state()
prev_state = state if new != prev:
prev = copy.deepcopy(new)
if waiter_task is None: yield new
waiter_task = asyncio.create_task(self.__notifier.wait())
if waiter_task in (await aiotools.wait_first(
asyncio.ensure_future(asyncio.sleep(self.__state_poll)),
waiter_task,
))[0]:
waiter_task = None
# ===== # =====

View File

@ -234,7 +234,6 @@ class _GpioOutput: # pylint: disable=too-many-instance-attributes
class UserGpio: class UserGpio:
def __init__(self, config: Section, otg_config: Section) -> None: def __init__(self, config: Section, otg_config: Section) -> None:
self.__notifier = aiotools.AioNotifier() self.__notifier = aiotools.AioNotifier()
self.__full_state_requested = True
self.__drivers = { self.__drivers = {
driver: get_ugpio_driver_class(drv_config.type)( driver: get_ugpio_driver_class(drv_config.type)(
@ -269,14 +268,12 @@ class UserGpio:
} }
async def trigger_state(self) -> None: async def trigger_state(self) -> None:
self.__full_state_requested = True self.__notifier.notify(1)
self.__notifier.notify()
async def poll_state(self) -> AsyncGenerator[dict, None]: async def poll_state(self) -> AsyncGenerator[dict, None]:
prev: dict = {"inputs": {}, "outputs": {}} prev: dict = {"inputs": {}, "outputs": {}}
while True: # pylint: disable=too-many-nested-blocks while True: # pylint: disable=too-many-nested-blocks
if self.__full_state_requested: if (await self.__notifier.wait()) > 0:
self.__full_state_requested = False
full = await self.get_state() full = await self.get_state()
prev = copy.deepcopy(full["state"]) prev = copy.deepcopy(full["state"])
yield full yield full
@ -285,14 +282,13 @@ class UserGpio:
diff: dict = {} diff: dict = {}
for sub in ["inputs", "outputs"]: for sub in ["inputs", "outputs"]:
for ch in new[sub]: for ch in new[sub]:
if new[sub][ch] != prev[sub][ch]: if new[sub][ch] != prev[sub].get(ch):
if sub not in diff: if sub not in diff:
diff[sub] = {} diff[sub] = {}
diff[sub][ch] = new[sub][ch] diff[sub][ch] = new[sub][ch]
if diff: if diff:
prev = copy.deepcopy(new) prev = copy.deepcopy(new)
yield {"state": diff} yield {"state": diff}
await self.__notifier.wait()
async def __get_io_state(self) -> dict: async def __get_io_state(self) -> dict:
return { return {

View File

@ -175,9 +175,10 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
self.__kvmd_ws = None self.__kvmd_ws = None
async def __process_ws_event(self, event_type: str, event: dict) -> None: async def __process_ws_event(self, event_type: str, event: dict) -> None:
if event_type == "info_meta_state": if event_type == "info_state":
if "meta" in event:
try: try:
host = event["server"]["host"] host = event["meta"]["server"]["host"]
except Exception: except Exception:
host = None host = None
else: else:

View File

@ -222,7 +222,7 @@ class KvmdClientSession:
@contextlib.asynccontextmanager @contextlib.asynccontextmanager
async def ws(self) -> AsyncGenerator[KvmdClientWs, None]: async def ws(self) -> AsyncGenerator[KvmdClientWs, None]:
session = self.__ensure_http_session() session = self.__ensure_http_session()
async with session.ws_connect(self.__make_url("ws")) as ws: async with session.ws_connect(self.__make_url("ws"), params={"legacy": 0}) as ws:
yield KvmdClientWs(ws) yield KvmdClientWs(ws)
def __ensure_http_session(self) -> aiohttp.ClientSession: def __ensure_http_session(self) -> aiohttp.ClientSession:
@ -267,16 +267,15 @@ class KvmdClient:
) )
def __make_http_session(self, user: str, passwd: str) -> aiohttp.ClientSession: def __make_http_session(self, user: str, passwd: str) -> aiohttp.ClientSession:
kwargs: dict = { return aiohttp.ClientSession(
"headers": { headers={
"X-KVMD-User": user, "X-KVMD-User": user,
"X-KVMD-Passwd": passwd, "X-KVMD-Passwd": passwd,
"User-Agent": self.__user_agent, "User-Agent": self.__user_agent,
}, },
"connector": aiohttp.UnixConnector(path=self.__unix_path), connector=aiohttp.UnixConnector(path=self.__unix_path),
"timeout": aiohttp.ClientTimeout(total=self.__timeout), timeout=aiohttp.ClientTimeout(total=self.__timeout),
} )
return aiohttp.ClientSession(**kwargs)
def __make_url(self, handle: str) -> str: def __make_url(self, handle: str) -> str:
assert not handle.startswith("/"), handle assert not handle.startswith("/"), handle

View File

@ -48,6 +48,9 @@ class BaseAtx(BasePlugin):
async def get_state(self) -> dict: async def get_state(self) -> dict:
raise NotImplementedError raise NotImplementedError
async def trigger_state(self) -> None:
raise NotImplementedError
async def poll_state(self) -> AsyncGenerator[dict, None]: async def poll_state(self) -> AsyncGenerator[dict, None]:
yield {} yield {}
raise NotImplementedError raise NotImplementedError

View File

@ -36,6 +36,9 @@ class AtxDisabledError(AtxOperationError):
# ===== # =====
class Plugin(BaseAtx): class Plugin(BaseAtx):
def __init__(self) -> None:
self.__notifier = aiotools.AioNotifier()
async def get_state(self) -> dict: async def get_state(self) -> dict:
return { return {
"enabled": False, "enabled": False,
@ -46,10 +49,13 @@ class Plugin(BaseAtx):
}, },
} }
async def trigger_state(self) -> None:
self.__notifier.notify()
async def poll_state(self) -> AsyncGenerator[dict, None]: async def poll_state(self) -> AsyncGenerator[dict, None]:
while True: while True:
await self.__notifier.wait()
yield (await self.get_state()) yield (await self.get_state())
await aiotools.wait_infinite()
# ===== # =====

View File

@ -21,6 +21,7 @@
import asyncio import asyncio
import copy
from typing import AsyncGenerator from typing import AsyncGenerator
@ -130,14 +131,18 @@ class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes
}, },
} }
async def trigger_state(self) -> None:
self.__notifier.notify(1)
async def poll_state(self) -> AsyncGenerator[dict, None]: async def poll_state(self) -> AsyncGenerator[dict, None]:
prev_state: dict = {} prev: dict = {}
while True: while True:
state = await self.get_state() if (await self.__notifier.wait()) > 0:
if state != prev_state: prev = {}
yield state new = await self.get_state()
prev_state = state if new != prev:
await self.__notifier.wait() prev = copy.deepcopy(new)
yield new
async def systask(self) -> None: async def systask(self) -> None:
await self.__reader.poll() await self.__reader.poll()

View File

@ -63,6 +63,9 @@ class BaseHid(BasePlugin):
async def get_state(self) -> dict: async def get_state(self) -> dict:
raise NotImplementedError raise NotImplementedError
async def trigger_state(self) -> None:
raise NotImplementedError
async def poll_state(self) -> AsyncGenerator[dict, None]: async def poll_state(self) -> AsyncGenerator[dict, None]:
yield {} yield {}
raise NotImplementedError raise NotImplementedError

View File

@ -23,6 +23,7 @@
import multiprocessing import multiprocessing
import contextlib import contextlib
import queue import queue
import copy
import time import time
from typing import Iterable from typing import Iterable
@ -232,14 +233,18 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
**self._get_jiggler_state(), **self._get_jiggler_state(),
} }
async def trigger_state(self) -> None:
self.__notifier.notify(1)
async def poll_state(self) -> AsyncGenerator[dict, None]: async def poll_state(self) -> AsyncGenerator[dict, None]:
prev_state: dict = {} prev: dict = {}
while True: while True:
state = await self.get_state() if (await self.__notifier.wait()) > 0:
if state != prev_state: prev = {}
yield state new = await self.get_state()
prev_state = state if new != prev:
await self.__notifier.wait() prev = copy.deepcopy(new)
yield new
async def reset(self) -> None: async def reset(self) -> None:
self.__reset_required_event.set() self.__reset_required_event.set()

View File

@ -21,6 +21,7 @@
import multiprocessing import multiprocessing
import copy
import time import time
from typing import Iterable from typing import Iterable
@ -158,14 +159,18 @@ class Plugin(BaseHid): # pylint: disable=too-many-instance-attributes
**self._get_jiggler_state(), **self._get_jiggler_state(),
} }
async def trigger_state(self) -> None:
self.__notifier.notify(1)
async def poll_state(self) -> AsyncGenerator[dict, None]: async def poll_state(self) -> AsyncGenerator[dict, None]:
prev_state: dict = {} prev: dict = {}
while True: while True:
state = await self.get_state() if (await self.__notifier.wait()) > 0:
if state != prev_state: prev = {}
yield state new = await self.get_state()
prev_state = state if new != prev:
await self.__notifier.wait() prev = copy.deepcopy(new)
yield new
async def reset(self) -> None: async def reset(self) -> None:
self.clear_events() self.clear_events()

View File

@ -22,6 +22,7 @@
import multiprocessing import multiprocessing
import queue import queue
import copy
import time import time
from typing import Iterable from typing import Iterable
@ -119,14 +120,18 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
**self._get_jiggler_state(), **self._get_jiggler_state(),
} }
async def trigger_state(self) -> None:
self.__notifier.notify(1)
async def poll_state(self) -> AsyncGenerator[dict, None]: async def poll_state(self) -> AsyncGenerator[dict, None]:
prev_state: dict = {} prev: dict = {}
while True: while True:
state = await self.get_state() if (await self.__notifier.wait()) > 0:
if state != prev_state: prev = {}
yield state new = await self.get_state()
prev_state = state if new != prev:
await self.__notifier.wait() prev = copy.deepcopy(new)
yield new
async def reset(self) -> None: async def reset(self) -> None:
self.__reset_required_event.set() self.__reset_required_event.set()

View File

@ -20,6 +20,8 @@
# ========================================================================== # # ========================================================================== #
import copy
from typing import Iterable from typing import Iterable
from typing import AsyncGenerator from typing import AsyncGenerator
from typing import Any from typing import Any
@ -150,14 +152,18 @@ class Plugin(BaseHid): # pylint: disable=too-many-instance-attributes
**self._get_jiggler_state(), **self._get_jiggler_state(),
} }
async def trigger_state(self) -> None:
self.__notifier.notify(1)
async def poll_state(self) -> AsyncGenerator[dict, None]: async def poll_state(self) -> AsyncGenerator[dict, None]:
prev_state: dict = {} prev: dict = {}
while True: while True:
state = await self.get_state() if (await self.__notifier.wait()) > 0:
if state != prev_state: prev = {}
yield state new = await self.get_state()
prev_state = state if new != prev:
await self.__notifier.wait() prev = copy.deepcopy(new)
yield new
async def reset(self) -> None: async def reset(self) -> None:
self.__keyboard_proc.send_reset_event() self.__keyboard_proc.send_reset_event()

View File

@ -117,6 +117,9 @@ class BaseMsd(BasePlugin):
async def get_state(self) -> dict: async def get_state(self) -> dict:
raise NotImplementedError() raise NotImplementedError()
async def trigger_state(self) -> None:
raise NotImplementedError()
async def poll_state(self) -> AsyncGenerator[dict, None]: async def poll_state(self) -> AsyncGenerator[dict, None]:
if self is not None: # XXX: Vulture and pylint hack if self is not None: # XXX: Vulture and pylint hack
raise NotImplementedError() raise NotImplementedError()

View File

@ -40,6 +40,9 @@ class MsdDisabledError(MsdOperationError):
# ===== # =====
class Plugin(BaseMsd): class Plugin(BaseMsd):
def __init__(self) -> None:
self.__notifier = aiotools.AioNotifier()
async def get_state(self) -> dict: async def get_state(self) -> dict:
return { return {
"enabled": False, "enabled": False,
@ -49,10 +52,13 @@ class Plugin(BaseMsd):
"drive": None, "drive": None,
} }
async def trigger_state(self) -> None:
self.__notifier.notify()
async def poll_state(self) -> AsyncGenerator[dict, None]: async def poll_state(self) -> AsyncGenerator[dict, None]:
while True: while True:
await self.__notifier.wait()
yield (await self.get_state()) yield (await self.get_state())
await aiotools.wait_infinite()
async def reset(self) -> None: async def reset(self) -> None:
raise MsdDisabledError() raise MsdDisabledError()

View File

@ -24,6 +24,7 @@ import asyncio
import contextlib import contextlib
import dataclasses import dataclasses
import functools import functools
import copy
import time import time
from typing import AsyncGenerator from typing import AsyncGenerator
@ -195,14 +196,18 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
"drive": vd, "drive": vd,
} }
async def trigger_state(self) -> None:
self.__notifier.notify(1)
async def poll_state(self) -> AsyncGenerator[dict, None]: async def poll_state(self) -> AsyncGenerator[dict, None]:
prev_state: dict = {} prev: dict = {}
while True: while True:
state = await self.get_state() if (await self.__notifier.wait()) > 0:
if state != prev_state: prev = {}
yield state new = await self.get_state()
prev_state = state if new != prev:
await self.__notifier.wait() prev = copy.deepcopy(new)
yield new
async def systask(self) -> None: async def systask(self) -> None:
await self.__watch_inotify() await self.__watch_inotify()

View File

@ -62,9 +62,21 @@ export function Session() {
/************************************************************************/ /************************************************************************/
var __setAboutInfoMeta = function(state) { var __setInfoState = function(state) {
for (let key of Object.keys(state)) {
switch (key) {
case "meta": __setInfoStateMeta(state.meta); break;
case "hw": __setInfoStateHw(state.hw); break;
case "fan": __setInfoStateFan(state.fan); break;
case "system": __setInfoStateSystem(state.system); break;
case "extras": __setInfoStateExtras(state.extras); break;
}
}
};
var __setInfoStateMeta = function(state) {
if (state !== null) { if (state !== null) {
let text = JSON.stringify(state, undefined, 4).replace(/ /g, "&nbsp;").replace(/\n/g, "<br>"); let text = tools.escape(JSON.stringify(state, undefined, 4)).replace(/ /g, "&nbsp;").replace(/\n/g, "<br>");
$("about-meta").innerHTML = ` $("about-meta").innerHTML = `
<span class="code-comment">// The PiKVM metadata.<br> <span class="code-comment">// The PiKVM metadata.<br>
// You can get this JSON using handle <a target="_blank" href="/api/info?fields=meta">/api/info?fields=meta</a>.<br> // You can get this JSON using handle <a target="_blank" href="/api/info?fields=meta">/api/info?fields=meta</a>.<br>
@ -74,10 +86,10 @@ export function Session() {
${text} ${text}
`; `;
if (state.server && state.server.host) { if (state.server && state.server.host) {
$("kvmd-meta-server-host").innerHTML = `Server: ${state.server.host}`; $("kvmd-meta-server-host").innerText = `Server: ${state.server.host}`;
document.title = `PiKVM Session: ${state.server.host}`; document.title = `PiKVM Session: ${state.server.host}`;
} else { } else {
$("kvmd-meta-server-host").innerHTML = ""; $("kvmd-meta-server-host").innerText = "";
document.title = "PiKVM Session"; document.title = "PiKVM Session";
} }
@ -88,7 +100,7 @@ export function Session() {
} }
}; };
var __setAboutInfoHw = function(state) { var __setInfoStateHw = function(state) {
if (state.health.throttling !== null) { if (state.health.throttling !== null) {
let flags = state.health.throttling.parsed_flags; let flags = state.health.throttling.parsed_flags;
let ignore_past = state.health.throttling.ignore_past; let ignore_past = state.health.throttling.ignore_past;
@ -105,7 +117,7 @@ export function Session() {
__renderAboutInfoHardware(); __renderAboutInfoHardware();
}; };
var __setAboutInfoFan = function(state) { var __setInfoStateFan = function(state) {
let failed = false; let failed = false;
let failed_past = false; let failed_past = false;
if (state.monitored) { if (state.monitored) {
@ -207,11 +219,11 @@ export function Session() {
} }
}; };
var __colored = function(color, text) { var __colored = function(color, html) {
return `<font color="${color}">${text}</font>`; return `<font color="${color}">${html}</font>`;
}; };
var __setAboutInfoSystem = function(state) { var __setInfoStateSystem = function(state) {
$("about-version").innerHTML = ` $("about-version").innerHTML = `
KVMD: <span class="code-comment">${state.kvmd.version}</span><br> KVMD: <span class="code-comment">${state.kvmd.version}</span><br>
<hr> <hr>
@ -221,8 +233,8 @@ export function Session() {
${state.kernel.system} kernel: ${state.kernel.system} kernel:
${__formatUname(state.kernel)} ${__formatUname(state.kernel)}
`; `;
$("kvmd-version-kvmd").innerHTML = state.kvmd.version; $("kvmd-version-kvmd").innerText = state.kvmd.version;
$("kvmd-version-streamer").innerHTML = state.streamer.version; $("kvmd-version-streamer").innerText = state.streamer.version;
}; };
var __formatStreamerFeatures = function(features) { var __formatStreamerFeatures = function(features) {
@ -244,14 +256,14 @@ export function Session() {
}; };
var __formatUl = function(pairs) { var __formatUl = function(pairs) {
let text = "<ul>"; let html = "";
for (let pair of pairs) { for (let pair of pairs) {
text += `<li>${pair[0]}: <span class="code-comment">${pair[1]}</span></li>`; html += `<li>${pair[0]}: <span class="code-comment">${pair[1]}</span></li>`;
} }
return text + "</ul>"; return `<ul>${html}</ul>`;
}; };
var __setExtras = function(state) { var __setInfoStateExtras = function(state) {
let show_hook = null; let show_hook = null;
let close_hook = null; let close_hook = null;
let has_webterm = (state.webterm && (state.webterm.enabled || state.webterm.started)); let has_webterm = (state.webterm && (state.webterm.enabled || state.webterm.started));
@ -354,11 +366,7 @@ export function Session() {
let data = JSON.parse(event.data); let data = JSON.parse(event.data);
switch (data.event_type) { switch (data.event_type) {
case "pong": __missed_heartbeats = 0; break; case "pong": __missed_heartbeats = 0; break;
case "info_meta_state": __setAboutInfoMeta(data.event); break; case "info_state": __setInfoState(data.event); break;
case "info_hw_state": __setAboutInfoHw(data.event); break;
case "info_fan_state": __setAboutInfoFan(data.event); break;
case "info_system_state": __setAboutInfoSystem(data.event); break;
case "info_extras_state": __setExtras(data.event); break;
case "gpio_state": __gpio.setState(data.event); break; case "gpio_state": __gpio.setState(data.event); break;
case "hid_keymaps_state": __hid.setKeymaps(data.event); break; case "hid_keymaps_state": __hid.setKeymaps(data.event); break;
case "hid_state": __hid.setState(data.event); break; case "hid_state": __hid.setState(data.event); break;