partial streamer events

This commit is contained in:
Maxim Devaev
2024-10-23 19:27:24 +03:00
parent 0e4a70e7b9
commit a26aee3543
7 changed files with 207 additions and 135 deletions

View File

@@ -151,6 +151,7 @@ class _Subsystem:
class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-instance-attributes
__EV_GPIO_STATE = "gpio_state"
__EV_INFO_STATE = "info_state"
__EV_STREAMER_STATE = "streamer_state"
def __init__( # pylint: disable=too-many-arguments,too-many-locals
self,
@@ -362,13 +363,16 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
)
async def __poll_state(self, event_type: str, poller: AsyncGenerator[dict, None]) -> None:
if event_type == self.__EV_GPIO_STATE:
await self.__poll_gpio_state(poller)
elif event_type == self.__EV_INFO_STATE:
await self.__poll_info_state(poller)
else:
async for state in poller:
await self._broadcast_ws_event(event_type, state)
match event_type:
case self.__EV_GPIO_STATE:
await self.__poll_gpio_state(poller)
case self.__EV_INFO_STATE:
await self.__poll_info_state(poller)
case self.__EV_STREAMER_STATE:
await self.__poll_streamer_state(poller)
case _:
async for state in poller:
await self._broadcast_ws_event(event_type, state)
async def __poll_gpio_state(self, poller: AsyncGenerator[dict, None]) -> None:
prev: dict = {"state": {"inputs": {}, "outputs": {}}}
@@ -387,3 +391,11 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
await self._broadcast_ws_event(self.__EV_INFO_STATE, state, legacy=False)
for (key, value) in state.items():
await self._broadcast_ws_event(f"info_{key}_state", value, legacy=True)
async def __poll_streamer_state(self, poller: AsyncGenerator[dict, None]) -> None:
prev: dict = {}
async for state in poller:
await self._broadcast_ws_event(self.__EV_STREAMER_STATE, state, legacy=False)
prev.update(state)
if "features" in prev: # Complete/Full
await self._broadcast_ws_event(self.__EV_STREAMER_STATE, prev, legacy=True)

View File

@@ -137,6 +137,11 @@ class _StreamerParams:
class Streamer: # pylint: disable=too-many-instance-attributes
__ST_FULL = 0xFF
__ST_PARAMS = 0x01
__ST_STREAMER = 0x02
__ST_SNAPSHOT = 0x04
def __init__( # pylint: disable=too-many-arguments,too-many-locals
self,
@@ -261,6 +266,7 @@ class Streamer: # pylint: disable=too-many-instance-attributes
def set_params(self, params: dict) -> None:
assert not self.__streamer_task
self.__notifier.notify(self.__ST_PARAMS)
return self.__params.set_params(params)
def get_params(self) -> dict:
@@ -269,49 +275,72 @@ class Streamer: # pylint: disable=too-many-instance-attributes
# =====
async def get_state(self) -> dict:
streamer_state = None
if self.__streamer_task:
session = self.__ensure_client_session()
try:
streamer_state = await session.get_state()
except (aiohttp.ClientConnectionError, aiohttp.ServerConnectionError):
pass
except Exception:
get_logger().exception("Invalid streamer response from /state")
snapshot: (dict | None) = None
if self.__snapshot:
snapshot = dataclasses.asdict(self.__snapshot)
del snapshot["headers"]
del snapshot["data"]
return {
"features": self.__params.get_features(),
"limits": self.__params.get_limits(),
"params": self.__params.get_params(),
"snapshot": {"saved": snapshot},
"streamer": streamer_state,
"features": self.__params.get_features(),
"streamer": (await self.__get_streamer_state()),
"snapshot": self.__get_snapshot_state(),
}
async def trigger_state(self) -> None:
self.__notifier.notify(1)
self.__notifier.notify(self.__ST_FULL)
async def poll_state(self) -> AsyncGenerator[dict, None]:
def signal_handler(*_: Any) -> None:
get_logger(0).info("Got SIGUSR2, checking the stream state ...")
self.__notifier.notify()
self.__notifier.notify(self.__ST_STREAMER)
get_logger(0).info("Installing SIGUSR2 streamer handler ...")
asyncio.get_event_loop().add_signal_handler(signal.SIGUSR2, signal_handler)
prev: dict = {}
while True:
if (await self.__notifier.wait(timeout=self.__state_poll)) > 0:
prev = {}
new = await self.get_state()
if new != prev:
new: dict = {}
mask = await self.__notifier.wait(timeout=self.__state_poll)
if mask == self.__ST_FULL:
new = await self.get_state()
prev = copy.deepcopy(new)
yield new
continue
if mask < 0:
mask = self.__ST_STREAMER
def check_update(key: str, value: (dict | None)) -> None:
if prev.get(key) != value:
new[key] = value
if mask & self.__ST_PARAMS:
check_update("params", self.__params.get_params())
if mask & self.__ST_STREAMER:
check_update("streamer", await self.__get_streamer_state())
if mask & self.__ST_SNAPSHOT:
check_update("snapshot", self.__get_snapshot_state())
if new and prev != new:
prev.update(copy.deepcopy(new))
yield new
async def __get_streamer_state(self) -> (dict | None):
if self.__streamer_task:
session = self.__ensure_client_session()
try:
return (await session.get_state())
except (aiohttp.ClientConnectionError, aiohttp.ServerConnectionError):
pass
except Exception:
get_logger().exception("Invalid streamer response from /state")
return None
def __get_snapshot_state(self) -> dict:
if self.__snapshot:
snapshot = dataclasses.asdict(self.__snapshot)
del snapshot["headers"]
del snapshot["data"]
return {"saved": snapshot}
return {"saved": None}
# =====
@@ -325,7 +354,7 @@ class Streamer: # pylint: disable=too-many-instance-attributes
if snapshot.online or allow_offline:
if save:
self.__snapshot = snapshot
self.__notifier.notify()
self.__notifier.notify(self.__ST_SNAPSHOT)
return snapshot
logger.error("Stream is offline, no signal or so")
except (aiohttp.ClientConnectionError, aiohttp.ServerConnectionError) as ex: