reworked server components

This commit is contained in:
Maxim Devaev 2024-02-11 00:39:57 +02:00
parent 0b382c3d59
commit 2a48b7e287

View File

@ -24,13 +24,9 @@ import asyncio
import operator
import dataclasses
from typing import Tuple
from typing import List
from typing import Dict
from typing import Callable
from typing import Coroutine
from typing import AsyncGenerator
from typing import Optional
from typing import Any
from aiohttp.web import Request
@ -103,24 +99,50 @@ class StreamerH264NotSupported(OperationError):
# =====
@dataclasses.dataclass(frozen=True)
class _Component: # pylint: disable=too-many-instance-attributes
name: str
event_type: str
obj: object
sysprep: Optional[Callable[[], None]] = None
systask: Optional[Callable[[], Coroutine[Any, Any, None]]] = None
get_state: Optional[Callable[[], Coroutine[Any, Any, Dict]]] = None
poll_state: Optional[Callable[[], AsyncGenerator[Dict, None]]] = None
cleanup: Optional[Callable[[], Coroutine[Any, Any, Dict]]] = None
class _SubsystemEventSource:
get_state: (Callable[[], Coroutine[Any, Any, dict]] | None) = None
poll_state: (Callable[[], AsyncGenerator[dict, None]] | None) = None
def __post_init__(self) -> None:
if isinstance(self.obj, BasePlugin):
object.__setattr__(self, "name", f"{self.name} ({self.obj.get_plugin_name()})")
for field in ["sysprep", "systask", "get_state", "poll_state", "cleanup"]:
object.__setattr__(self, field, getattr(self.obj, field, None))
if self.get_state or self.poll_state:
assert self.event_type, self
@dataclasses.dataclass
class _Subsystem:
name: str
sysprep: (Callable[[], None] | None)
systask: (Callable[[], Coroutine[Any, Any, None]] | None)
cleanup: (Callable[[], Coroutine[Any, Any, dict]] | None)
sources: dict[str, _SubsystemEventSource]
@classmethod
def make(cls, obj: object, name: str, event_type: str="") -> "_Subsystem":
if isinstance(obj, BasePlugin):
name = f"{name} ({obj.get_plugin_name()})"
sub = _Subsystem(
name=name,
sysprep=getattr(obj, "sysprep", None),
systask=getattr(obj, "systask", None),
cleanup=getattr(obj, "cleanup", None),
sources={},
)
if event_type:
sub.add_source(
event_type=event_type,
get_state=getattr(obj, "get_state", None),
poll_state=getattr(obj, "poll_state", None),
)
return sub
def add_source(
self,
event_type: str,
get_state: (Callable[[], Coroutine[Any, Any, dict]] | None),
poll_state: (Callable[[], AsyncGenerator[dict, None]] | None),
) -> "_Subsystem":
assert event_type
assert event_type not in self.sources, (self, event_type)
assert get_state or poll_state, (self, event_type)
self.sources[event_type] = _SubsystemEventSource(get_state, poll_state)
return self
class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-instance-attributes
@ -139,9 +161,9 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
snapshoter: Snapshoter,
keymap_path: str,
ignore_keys: List[str],
mouse_x_range: Tuple[int, int],
mouse_y_range: Tuple[int, int],
ignore_keys: list[str],
mouse_x_range: tuple[int, int],
mouse_y_range: tuple[int, int],
stream_forever: bool,
) -> None:
@ -152,30 +174,12 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
self.__hid = hid
self.__streamer = streamer
self.__snapshoter = snapshoter # Not a component: No state or cleanup
self.__user_gpio = user_gpio # Has extra state "gpio_scheme_state"
self.__stream_forever = stream_forever
self.__components = [
*[
_Component("Auth manager", "", auth_manager),
],
*[
_Component(f"Info manager ({sub})", f"info_{sub}_state", info_manager.get_submanager(sub))
for sub in sorted(info_manager.get_subs())
],
*[
_Component("User-GPIO", "gpio_state", user_gpio),
_Component("HID", "hid_state", hid),
_Component("ATX", "atx_state", atx),
_Component("MSD", "msd_state", msd),
_Component("Streamer", "streamer_state", streamer),
],
]
self.__hid_api = HidApi(hid, keymap_path, ignore_keys, mouse_x_range, mouse_y_range) # Ugly hack to get keymaps state
self.__streamer_api = StreamerApi(streamer, ocr) # Same hack to get ocr langs state
self.__apis: List[object] = [
self.__apis: list[object] = [
self,
AuthApi(auth_manager),
InfoApi(info_manager),
@ -189,9 +193,22 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
RedfishApi(info_manager, atx),
]
self.__subsystems = [
_Subsystem.make(auth_manager, "Auth manager"),
_Subsystem.make(user_gpio, "User-GPIO", "gpio_state").add_source("gpio_model_state", user_gpio.get_model, None),
_Subsystem.make(hid, "HID", "hid_state").add_source("hid_keymaps_state", self.__hid_api.get_keymaps, None),
_Subsystem.make(atx, "ATX", "atx_state"),
_Subsystem.make(msd, "MSD", "msd_state"),
_Subsystem.make(streamer, "Streamer", "streamer_state").add_source("streamer_ocr_state", self.__streamer_api.get_ocr, None),
*[
_Subsystem.make(info_manager.get_submanager(sub), f"Info manager ({sub})", f"info_{sub}_state",)
for sub in sorted(info_manager.get_subs())
],
]
self.__streamer_notifier = aiotools.AioNotifier()
self.__reset_streamer = False
self.__new_streamer_params: Dict = {}
self.__new_streamer_params: dict = {}
# ===== STREAMER CONTROLLER
@ -228,39 +245,33 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
async def __ws_handler(self, request: Request) -> WebSocketResponse:
stream = valid_bool(request.query.get("stream", True))
async with self._ws_session(request, stream=stream) as ws:
stage1 = [
("gpio_model_state", self.__user_gpio.get_model()),
("hid_keymaps_state", self.__hid_api.get_keymaps()),
("streamer_ocr_state", self.__streamer_api.get_ocr()),
states = [
(event_type, src.get_state())
for sub in self.__subsystems
for (event_type, src) in sub.sources.items()
if src.get_state
]
stage2 = [
(comp.event_type, comp.get_state())
for comp in self.__components
if comp.get_state
]
stages = stage1 + stage2
events = dict(zip(
map(operator.itemgetter(0), stages),
await asyncio.gather(*map(operator.itemgetter(1), stages)),
map(operator.itemgetter(0), states),
await asyncio.gather(*map(operator.itemgetter(1), states)),
))
for stage in [stage1, stage2]:
await asyncio.gather(*[
ws.send_event(event_type, events.pop(event_type))
for (event_type, _) in stage
])
await asyncio.gather(*[
ws.send_event(event_type, events.pop(event_type))
for (event_type, _) in states
])
await ws.send_event("loop", {})
return (await self._ws_loop(ws))
@exposed_ws("ping")
async def __ws_ping_handler(self, ws: WsSession, _: Dict) -> None:
async def __ws_ping_handler(self, ws: WsSession, _: dict) -> None:
await ws.send_event("pong", {})
# ===== SYSTEM STUFF
def run(self, **kwargs: Any) -> None: # type: ignore # pylint: disable=arguments-differ
for comp in self.__components:
if comp.sysprep:
comp.sysprep()
for sub in self.__subsystems:
if sub.sysprep:
sub.sysprep()
aioproc.rename_process("main")
super().run(**kwargs)
@ -269,11 +280,12 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
async def _init_app(self) -> None:
aiotools.create_deadly_task("Stream controller", self.__stream_controller())
for comp in self.__components:
if comp.systask:
aiotools.create_deadly_task(comp.name, comp.systask())
if comp.poll_state:
aiotools.create_deadly_task(f"{comp.name} [poller]", self.__poll_state(comp.event_type, comp.poll_state()))
for sub in self.__subsystems:
if sub.systask:
aiotools.create_deadly_task(sub.name, sub.systask())
for (event_type, src) in sub.sources.items():
if src.poll_state:
aiotools.create_deadly_task(f"{sub.name} [poller]", self.__poll_state(event_type, src.poll_state()))
aiotools.create_deadly_task("Stream snapshoter", self.__stream_snapshoter())
self._add_exposed(*self.__apis)
@ -289,13 +301,13 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
async def _on_cleanup(self) -> None:
logger = get_logger(0)
for comp in self.__components:
if comp.cleanup:
logger.info("Cleaning up %s ...", comp.name)
for sub in self.__subsystems:
if sub.cleanup:
logger.info("Cleaning up %s ...", sub.name)
try:
await comp.cleanup() # type: ignore
await sub.cleanup() # type: ignore
except Exception:
logger.exception("Cleanup error on %s", comp.name)
logger.exception("Cleanup error on %s", sub.name)
logger.info("On-Cleanup complete")
async def _on_ws_opened(self) -> None:
@ -335,7 +347,7 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
prev = cur
await self.__streamer_notifier.wait()
async def __poll_state(self, event_type: str, poller: AsyncGenerator[Dict, None]) -> None:
async def __poll_state(self, event_type: str, poller: AsyncGenerator[dict, None]) -> None:
async for state in poller:
await self._broadcast_ws_event(event_type, state)