diff --git a/kvmd/apps/kvmd/streamer.py b/kvmd/apps/kvmd/streamer.py deleted file mode 100644 index 08c48eb1..00000000 --- a/kvmd/apps/kvmd/streamer.py +++ /dev/null @@ -1,456 +0,0 @@ -# ========================================================================== # -# # -# KVMD - The main PiKVM daemon. # -# # -# Copyright (C) 2018-2024 Maxim Devaev # -# # -# This program is free software: you can redistribute it and/or modify # -# it under the terms of the GNU General Public License as published by # -# the Free Software Foundation, either version 3 of the License, or # -# (at your option) any later version. # -# # -# This program is distributed in the hope that it will be useful, # -# but WITHOUT ANY WARRANTY; without even the implied warranty of # -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # -# GNU General Public License for more details. # -# # -# You should have received a copy of the GNU General Public License # -# along with this program. If not, see . # -# # -# ========================================================================== # - - -import signal -import asyncio -import asyncio.subprocess -import dataclasses -import copy - -from typing import AsyncGenerator -from typing import Any - -import aiohttp - -from ...logging import get_logger - -from ...clients.streamer import StreamerSnapshot -from ...clients.streamer import HttpStreamerClient -from ...clients.streamer import HttpStreamerClientSession - -from ... import tools -from ... import aiotools -from ... import aioproc -from ... import htclient - - -# ===== -class _StreamerParams: - __DESIRED_FPS = "desired_fps" - - __QUALITY = "quality" - - __RESOLUTION = "resolution" - __AVAILABLE_RESOLUTIONS = "available_resolutions" - - __H264_BITRATE = "h264_bitrate" - __H264_GOP = "h264_gop" - - def __init__( # pylint: disable=too-many-arguments - self, - quality: int, - - resolution: str, - available_resolutions: list[str], - - desired_fps: int, - desired_fps_min: int, - desired_fps_max: int, - - h264_bitrate: int, - h264_bitrate_min: int, - h264_bitrate_max: int, - - h264_gop: int, - h264_gop_min: int, - h264_gop_max: int, - ) -> None: - - self.__has_quality = bool(quality) - self.__has_resolution = bool(resolution) - self.__has_h264 = bool(h264_bitrate) - - self.__params: dict = {self.__DESIRED_FPS: min(max(desired_fps, desired_fps_min), desired_fps_max)} - self.__limits: dict = {self.__DESIRED_FPS: {"min": desired_fps_min, "max": desired_fps_max}} - - if self.__has_quality: - self.__params[self.__QUALITY] = quality - - if self.__has_resolution: - self.__params[self.__RESOLUTION] = resolution - self.__limits[self.__AVAILABLE_RESOLUTIONS] = available_resolutions - - if self.__has_h264: - self.__params[self.__H264_BITRATE] = min(max(h264_bitrate, h264_bitrate_min), h264_bitrate_max) - self.__limits[self.__H264_BITRATE] = {"min": h264_bitrate_min, "max": h264_bitrate_max} - self.__params[self.__H264_GOP] = min(max(h264_gop, h264_gop_min), h264_gop_max) - self.__limits[self.__H264_GOP] = {"min": h264_gop_min, "max": h264_gop_max} - - def get_features(self) -> dict: - return { - self.__QUALITY: self.__has_quality, - self.__RESOLUTION: self.__has_resolution, - "h264": self.__has_h264, - } - - def get_limits(self) -> dict: - limits = copy.deepcopy(self.__limits) - if self.__has_resolution: - limits[self.__AVAILABLE_RESOLUTIONS] = list(limits[self.__AVAILABLE_RESOLUTIONS]) - return limits - - def get_params(self) -> dict: - return dict(self.__params) - - def set_params(self, params: dict) -> None: - new_params = dict(self.__params) - - if self.__QUALITY in params and self.__has_quality: - new_params[self.__QUALITY] = min(max(params[self.__QUALITY], 1), 100) - - if self.__RESOLUTION in params and self.__has_resolution: - if params[self.__RESOLUTION] in self.__limits[self.__AVAILABLE_RESOLUTIONS]: - new_params[self.__RESOLUTION] = params[self.__RESOLUTION] - - for (key, enabled) in [ - (self.__DESIRED_FPS, True), - (self.__H264_BITRATE, self.__has_h264), - (self.__H264_GOP, self.__has_h264), - ]: - if key in params and enabled: - if self.__check_limits_min_max(key, params[key]): - new_params[key] = params[key] - - self.__params = new_params - - def __check_limits_min_max(self, key: str, value: int) -> bool: - return (self.__limits[key]["min"] <= value <= self.__limits[key]["max"]) - - -class Streamer: # pylint: disable=too-many-instance-attributes - __ST_FULL = 0xFF - __ST_PARAMS = 0x01 - __ST_STREAMER = 0x02 - __ST_SNAPSHOT = 0x04 - - def __init__( # pylint: disable=too-many-arguments,too-many-locals - self, - - reset_delay: float, - shutdown_delay: float, - state_poll: float, - - unix_path: str, - timeout: float, - snapshot_timeout: float, - - process_name_prefix: str, - - pre_start_cmd: list[str], - pre_start_cmd_remove: list[str], - pre_start_cmd_append: list[str], - - cmd: list[str], - cmd_remove: list[str], - cmd_append: list[str], - - post_stop_cmd: list[str], - post_stop_cmd_remove: list[str], - post_stop_cmd_append: list[str], - - **params_kwargs: Any, - ) -> None: - - self.__reset_delay = reset_delay - self.__shutdown_delay = shutdown_delay - self.__state_poll = state_poll - - self.__unix_path = unix_path - self.__snapshot_timeout = snapshot_timeout - - self.__process_name_prefix = process_name_prefix - - self.__pre_start_cmd = tools.build_cmd(pre_start_cmd, pre_start_cmd_remove, pre_start_cmd_append) - self.__cmd = tools.build_cmd(cmd, cmd_remove, cmd_append) - self.__post_stop_cmd = tools.build_cmd(post_stop_cmd, post_stop_cmd_remove, post_stop_cmd_append) - - self.__params = _StreamerParams(**params_kwargs) - - self.__stop_task: (asyncio.Task | None) = None - self.__stop_wip = False - - self.__streamer_task: (asyncio.Task | None) = None - self.__streamer_proc: (asyncio.subprocess.Process | None) = None # pylint: disable=no-member - - self.__client = HttpStreamerClient( - name="jpeg", - unix_path=self.__unix_path, - timeout=timeout, - user_agent=htclient.make_user_agent("KVMD"), - ) - self.__client_session: (HttpStreamerClientSession | None) = None - - self.__snapshot: (StreamerSnapshot | None) = None - - self.__notifier = aiotools.AioNotifier() - - # ===== - - @aiotools.atomic_fg - async def ensure_start(self, reset: bool) -> None: - if not self.__streamer_task or self.__stop_task: - logger = get_logger(0) - - if self.__stop_task: - if not self.__stop_wip: - self.__stop_task.cancel() - await asyncio.gather(self.__stop_task, return_exceptions=True) - logger.info("Streamer stop cancelled") - return - else: - await asyncio.gather(self.__stop_task, return_exceptions=True) - - if reset and self.__reset_delay > 0: - logger.info("Waiting %.2f seconds for reset delay ...", self.__reset_delay) - await asyncio.sleep(self.__reset_delay) - logger.info("Starting streamer ...") - await self.__inner_start() - - @aiotools.atomic_fg - async def ensure_stop(self, immediately: bool) -> None: - if self.__streamer_task: - logger = get_logger(0) - - if immediately: - if self.__stop_task: - if not self.__stop_wip: - self.__stop_task.cancel() - await asyncio.gather(self.__stop_task, return_exceptions=True) - logger.info("Stopping streamer immediately ...") - await self.__inner_stop() - else: - await asyncio.gather(self.__stop_task, return_exceptions=True) - else: - logger.info("Stopping streamer immediately ...") - await self.__inner_stop() - - elif not self.__stop_task: - - async def delayed_stop() -> None: - try: - await asyncio.sleep(self.__shutdown_delay) - self.__stop_wip = True - logger.info("Stopping streamer after delay ...") - await self.__inner_stop() - finally: - self.__stop_task = None - self.__stop_wip = False - - logger.info("Planning to stop streamer in %.2f seconds ...", self.__shutdown_delay) - self.__stop_task = asyncio.create_task(delayed_stop()) - - def is_working(self) -> bool: - # Запущено и не планирует останавливаться - return bool(self.__streamer_task and not self.__stop_task) - - # ===== - - def set_params(self, params: dict) -> None: - assert not self.__streamer_task - self.__notifier.notify(self.__ST_PARAMS) - return self.__params.set_params(params) - - def get_params(self) -> dict: - return self.__params.get_params() - - # ===== - - async def get_state(self) -> dict: - return { - "features": self.__params.get_features(), - "limits": self.__params.get_limits(), - "params": self.__params.get_params(), - "streamer": (await self.__get_streamer_state()), - "snapshot": self.__get_snapshot_state(), - } - - async def trigger_state(self) -> None: - self.__notifier.notify(self.__ST_FULL) - - async def poll_state(self) -> AsyncGenerator[dict, None]: - # ==== Granularity table ==== - # - features -- Full - # - limits -- Partial, paired with params - # - params -- Partial, paired with limits - # - streamer -- Partial, nullable - # - snapshot -- Partial - # =========================== - - def signal_handler(*_: Any) -> None: - get_logger(0).info("Got SIGUSR2, checking the stream state ...") - self.__notifier.notify(self.__ST_STREAMER) - - get_logger(0).info("Installing SIGUSR2 streamer handler ...") - asyncio.get_event_loop().add_signal_handler(signal.SIGUSR2, signal_handler) - - prev: dict = {} - while True: - new: dict = {} - - mask = await self.__notifier.wait(timeout=self.__state_poll) - if mask == self.__ST_FULL: - new = await self.get_state() - prev = copy.deepcopy(new) - yield new - continue - - if mask < 0: - mask = self.__ST_STREAMER - - def check_update(key: str, value: (dict | None)) -> None: - if prev.get(key) != value: - new[key] = value - - if mask & self.__ST_PARAMS: - check_update("params", self.__params.get_params()) - if mask & self.__ST_STREAMER: - check_update("streamer", await self.__get_streamer_state()) - if mask & self.__ST_SNAPSHOT: - check_update("snapshot", self.__get_snapshot_state()) - - if new and prev != new: - prev.update(copy.deepcopy(new)) - yield new - - async def __get_streamer_state(self) -> (dict | None): - if self.__streamer_task: - session = self.__ensure_client_session() - try: - return (await session.get_state()) - except (aiohttp.ClientConnectionError, aiohttp.ServerConnectionError): - pass - except Exception: - get_logger().exception("Invalid streamer response from /state") - return None - - def __get_snapshot_state(self) -> dict: - if self.__snapshot: - snapshot = dataclasses.asdict(self.__snapshot) - del snapshot["headers"] - del snapshot["data"] - return {"saved": snapshot} - return {"saved": None} - - # ===== - - async def take_snapshot(self, save: bool, load: bool, allow_offline: bool) -> (StreamerSnapshot | None): - if load: - return self.__snapshot - logger = get_logger() - session = self.__ensure_client_session() - try: - snapshot = await session.take_snapshot(self.__snapshot_timeout) - if snapshot.online or allow_offline: - if save: - self.__snapshot = snapshot - self.__notifier.notify(self.__ST_SNAPSHOT) - return snapshot - logger.error("Stream is offline, no signal or so") - except (aiohttp.ClientConnectionError, aiohttp.ServerConnectionError) as ex: - logger.error("Can't connect to streamer: %s", tools.efmt(ex)) - except Exception: - logger.exception("Invalid streamer response from /snapshot") - return None - - def remove_snapshot(self) -> None: - self.__snapshot = None - - # ===== - - @aiotools.atomic_fg - async def cleanup(self) -> None: - await self.ensure_stop(immediately=True) - if self.__client_session: - await self.__client_session.close() - self.__client_session = None - - def __ensure_client_session(self) -> HttpStreamerClientSession: - if not self.__client_session: - self.__client_session = self.__client.make_session() - return self.__client_session - - # ===== - - @aiotools.atomic_fg - async def __inner_start(self) -> None: - assert not self.__streamer_task - await self.__run_hook("PRE-START-CMD", self.__pre_start_cmd) - self.__streamer_task = asyncio.create_task(self.__streamer_task_loop()) - - @aiotools.atomic_fg - async def __inner_stop(self) -> None: - assert self.__streamer_task - self.__streamer_task.cancel() - await asyncio.gather(self.__streamer_task, return_exceptions=True) - await self.__kill_streamer_proc() - await self.__run_hook("POST-STOP-CMD", self.__post_stop_cmd) - self.__streamer_task = None - - # ===== - - async def __streamer_task_loop(self) -> None: # pylint: disable=too-many-branches - logger = get_logger(0) - while True: # pylint: disable=too-many-nested-blocks - try: - await self.__start_streamer_proc() - assert self.__streamer_proc is not None - await aioproc.log_stdout_infinite(self.__streamer_proc, logger) - raise RuntimeError("Streamer unexpectedly died") - except asyncio.CancelledError: - break - except Exception: - if self.__streamer_proc: - logger.exception("Unexpected streamer error: pid=%d", self.__streamer_proc.pid) - else: - logger.exception("Can't start streamer") - await self.__kill_streamer_proc() - await asyncio.sleep(1) - - def __make_cmd(self, cmd: list[str]) -> list[str]: - return [ - part.format( - unix=self.__unix_path, - process_name_prefix=self.__process_name_prefix, - **self.__params.get_params(), - ) - for part in cmd - ] - - async def __run_hook(self, name: str, cmd: list[str]) -> None: - logger = get_logger() - cmd = self.__make_cmd(cmd) - logger.info("%s: %s", name, tools.cmdfmt(cmd)) - try: - await aioproc.log_process(cmd, logger, prefix=name) - except Exception as ex: - logger.exception("Can't execute command: %s", ex) - - async def __start_streamer_proc(self) -> None: - assert self.__streamer_proc is None - cmd = self.__make_cmd(self.__cmd) - self.__streamer_proc = await aioproc.run_process(cmd) - get_logger(0).info("Started streamer pid=%d: %s", self.__streamer_proc.pid, tools.cmdfmt(cmd)) - - async def __kill_streamer_proc(self) -> None: - if self.__streamer_proc: - await aioproc.kill_process(self.__streamer_proc, 1, get_logger(0)) - self.__streamer_proc = None diff --git a/kvmd/apps/kvmd/streamer/__init__.py b/kvmd/apps/kvmd/streamer/__init__.py new file mode 100644 index 00000000..08ca1975 --- /dev/null +++ b/kvmd/apps/kvmd/streamer/__init__.py @@ -0,0 +1,251 @@ +# ========================================================================== # +# # +# KVMD - The main PiKVM daemon. # +# # +# Copyright (C) 2018-2024 Maxim Devaev # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see . # +# # +# ========================================================================== # + + +import signal +import asyncio +import dataclasses +import copy + +from typing import AsyncGenerator +from typing import Any + +import aiohttp + +from ....logging import get_logger + +from ....clients.streamer import StreamerSnapshot +from ....clients.streamer import HttpStreamerClient +from ....clients.streamer import HttpStreamerClientSession + +from .... import tools +from .... import aiotools +from .... import htclient + +from .params import Params +from .runner import Runner + + +# ===== +class Streamer: # pylint: disable=too-many-instance-attributes + __ST_FULL = 0xFF + __ST_PARAMS = 0x01 + __ST_STREAMER = 0x02 + __ST_SNAPSHOT = 0x04 + + def __init__( # pylint: disable=too-many-arguments,too-many-locals + self, + + reset_delay: float, + shutdown_delay: float, + state_poll: float, + + unix_path: str, + timeout: float, + snapshot_timeout: float, + + process_name_prefix: str, + + pre_start_cmd: list[str], + pre_start_cmd_remove: list[str], + pre_start_cmd_append: list[str], + + cmd: list[str], + cmd_remove: list[str], + cmd_append: list[str], + + post_stop_cmd: list[str], + post_stop_cmd_remove: list[str], + post_stop_cmd_append: list[str], + + **params_kwargs: Any, + ) -> None: + + self.__state_poll = state_poll + + self.__unix_path = unix_path + self.__snapshot_timeout = snapshot_timeout + + self.__params = Params(**params_kwargs) + + self.__runner = Runner( + reset_delay=reset_delay, + shutdown_delay=shutdown_delay, + pre_start_cmd=tools.build_cmd(pre_start_cmd, pre_start_cmd_remove, pre_start_cmd_append), + cmd=tools.build_cmd(cmd, cmd_remove, cmd_append), + post_stop_cmd=tools.build_cmd(post_stop_cmd, post_stop_cmd_remove, post_stop_cmd_append), + get_params=(lambda: { + "unix": unix_path, + "process_name_prefix": process_name_prefix, + **self.__params.get_params(), + }), + ) + + self.__client = HttpStreamerClient( + name="jpeg", + unix_path=self.__unix_path, + timeout=timeout, + user_agent=htclient.make_user_agent("KVMD"), + ) + self.__client_session: (HttpStreamerClientSession | None) = None + + self.__snapshot: (StreamerSnapshot | None) = None + + self.__notifier = aiotools.AioNotifier() + + # ===== + + @aiotools.atomic_fg + async def ensure_start(self, reset: bool) -> None: + await self.__runner.ensure_start(reset) + + @aiotools.atomic_fg + async def ensure_stop(self, immediately: bool) -> None: + await self.__runner.ensure_stop(immediately) + + def is_working(self) -> bool: + return self.__runner.is_working() + + # ===== + + def set_params(self, params: dict) -> None: + assert not self.__runner._is_alive() # pylint: disable=protected-access + self.__notifier.notify(self.__ST_PARAMS) + return self.__params.set_params(params) + + def get_params(self) -> dict: + return self.__params.get_params() + + # ===== + + async def get_state(self) -> dict: + return { + "features": self.__params.get_features(), + "limits": self.__params.get_limits(), + "params": self.__params.get_params(), + "streamer": (await self.__get_streamer_state()), + "snapshot": self.__get_snapshot_state(), + } + + async def trigger_state(self) -> None: + self.__notifier.notify(self.__ST_FULL) + + async def poll_state(self) -> AsyncGenerator[dict, None]: + # ==== Granularity table ==== + # - features -- Full + # - limits -- Partial, paired with params + # - params -- Partial, paired with limits + # - streamer -- Partial, nullable + # - snapshot -- Partial + # =========================== + + def signal_handler(*_: Any) -> None: + get_logger(0).info("Got SIGUSR2, checking the stream state ...") + self.__notifier.notify(self.__ST_STREAMER) + + get_logger(0).info("Installing SIGUSR2 streamer handler ...") + asyncio.get_event_loop().add_signal_handler(signal.SIGUSR2, signal_handler) + + prev: dict = {} + while True: + new: dict = {} + + mask = await self.__notifier.wait(timeout=self.__state_poll) + if mask == self.__ST_FULL: + new = await self.get_state() + prev = copy.deepcopy(new) + yield new + continue + + if mask < 0: + mask = self.__ST_STREAMER + + def check_update(key: str, value: (dict | None)) -> None: + if prev.get(key) != value: + new[key] = value + + if mask & self.__ST_PARAMS: + check_update("params", self.__params.get_params()) + if mask & self.__ST_STREAMER: + check_update("streamer", await self.__get_streamer_state()) + if mask & self.__ST_SNAPSHOT: + check_update("snapshot", self.__get_snapshot_state()) + + if new and prev != new: + prev.update(copy.deepcopy(new)) + yield new + + async def __get_streamer_state(self) -> (dict | None): + if self.__runner._is_alive(): # pylint: disable=protected-access + session = self.__ensure_client_session() + try: + return (await session.get_state()) + except (aiohttp.ClientConnectionError, aiohttp.ServerConnectionError): + pass + except Exception: + get_logger().exception("Invalid streamer response from /state") + return None + + def __get_snapshot_state(self) -> dict: + if self.__snapshot: + snapshot = dataclasses.asdict(self.__snapshot) + del snapshot["headers"] + del snapshot["data"] + return {"saved": snapshot} + return {"saved": None} + + # ===== + + async def take_snapshot(self, save: bool, load: bool, allow_offline: bool) -> (StreamerSnapshot | None): + if load: + return self.__snapshot + logger = get_logger() + session = self.__ensure_client_session() + try: + snapshot = await session.take_snapshot(self.__snapshot_timeout) + if snapshot.online or allow_offline: + if save: + self.__snapshot = snapshot + self.__notifier.notify(self.__ST_SNAPSHOT) + return snapshot + logger.error("Stream is offline, no signal or so") + except (aiohttp.ClientConnectionError, aiohttp.ServerConnectionError) as ex: + logger.error("Can't connect to streamer: %s", tools.efmt(ex)) + except Exception: + logger.exception("Invalid streamer response from /snapshot") + return None + + def remove_snapshot(self) -> None: + self.__snapshot = None + + # ===== + + @aiotools.atomic_fg + async def cleanup(self) -> None: + await self.__runner.ensure_stop(immediately=True) + if self.__client_session: + await self.__client_session.close() + self.__client_session = None + + def __ensure_client_session(self) -> HttpStreamerClientSession: + if not self.__client_session: + self.__client_session = self.__client.make_session() + return self.__client_session diff --git a/kvmd/apps/kvmd/streamer/params.py b/kvmd/apps/kvmd/streamer/params.py new file mode 100644 index 00000000..472e233a --- /dev/null +++ b/kvmd/apps/kvmd/streamer/params.py @@ -0,0 +1,117 @@ +# ========================================================================== # +# # +# KVMD - The main PiKVM daemon. # +# # +# Copyright (C) 2018-2024 Maxim Devaev # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see . # +# # +# ========================================================================== # + + +import copy + + +# ===== +class Params: + __DESIRED_FPS = "desired_fps" + + __QUALITY = "quality" + + __RESOLUTION = "resolution" + __AVAILABLE_RESOLUTIONS = "available_resolutions" + + __H264 = "h264" + __H264_BITRATE = "h264_bitrate" + __H264_GOP = "h264_gop" + + def __init__( # pylint: disable=too-many-arguments + self, + quality: int, + + resolution: str, + available_resolutions: list[str], + + desired_fps: int, + desired_fps_min: int, + desired_fps_max: int, + + h264_bitrate: int, + h264_bitrate_min: int, + h264_bitrate_max: int, + + h264_gop: int, + h264_gop_min: int, + h264_gop_max: int, + ) -> None: + + self.__has_quality = bool(quality) + self.__has_resolution = bool(resolution) + self.__has_h264 = bool(h264_bitrate) + + self.__params: dict = {self.__DESIRED_FPS: min(max(desired_fps, desired_fps_min), desired_fps_max)} + self.__limits: dict = {self.__DESIRED_FPS: {"min": desired_fps_min, "max": desired_fps_max}} + + if self.__has_quality: + self.__params[self.__QUALITY] = quality + + if self.__has_resolution: + self.__params[self.__RESOLUTION] = resolution + self.__limits[self.__AVAILABLE_RESOLUTIONS] = available_resolutions + + if self.__has_h264: + self.__params[self.__H264_BITRATE] = min(max(h264_bitrate, h264_bitrate_min), h264_bitrate_max) + self.__limits[self.__H264_BITRATE] = {"min": h264_bitrate_min, "max": h264_bitrate_max} + self.__params[self.__H264_GOP] = min(max(h264_gop, h264_gop_min), h264_gop_max) + self.__limits[self.__H264_GOP] = {"min": h264_gop_min, "max": h264_gop_max} + + def get_features(self) -> dict: + return { + self.__QUALITY: self.__has_quality, + self.__RESOLUTION: self.__has_resolution, + self.__H264: self.__has_h264, + } + + def get_limits(self) -> dict: + limits = copy.deepcopy(self.__limits) + if self.__has_resolution: + limits[self.__AVAILABLE_RESOLUTIONS] = list(limits[self.__AVAILABLE_RESOLUTIONS]) + return limits + + def get_params(self) -> dict: + return dict(self.__params) + + def set_params(self, params: dict) -> None: + new = dict(self.__params) + + if self.__QUALITY in params and self.__has_quality: + new[self.__QUALITY] = min(max(params[self.__QUALITY], 1), 100) + + if self.__RESOLUTION in params and self.__has_resolution: + if params[self.__RESOLUTION] in self.__limits[self.__AVAILABLE_RESOLUTIONS]: + new[self.__RESOLUTION] = params[self.__RESOLUTION] + + for (key, enabled) in [ + (self.__DESIRED_FPS, True), + (self.__H264_BITRATE, self.__has_h264), + (self.__H264_GOP, self.__has_h264), + ]: + if key in params and enabled: + if self.__check_limits_min_max(key, params[key]): + new[key] = params[key] + + self.__params = new + + def __check_limits_min_max(self, key: str, value: int) -> bool: + return (self.__limits[key]["min"] <= value <= self.__limits[key]["max"]) diff --git a/kvmd/apps/kvmd/streamer/runner.py b/kvmd/apps/kvmd/streamer/runner.py new file mode 100644 index 00000000..00f9c7e0 --- /dev/null +++ b/kvmd/apps/kvmd/streamer/runner.py @@ -0,0 +1,183 @@ +# ========================================================================== # +# # +# KVMD - The main PiKVM daemon. # +# # +# Copyright (C) 2018-2024 Maxim Devaev # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see . # +# # +# ========================================================================== # + + +import asyncio +import asyncio.subprocess + +from typing import Callable + +from ....logging import get_logger + +from .... import tools +from .... import aiotools +from .... import aioproc + + +# ===== +class Runner: # pylint: disable=too-many-instance-attributes + def __init__( + self, + reset_delay: float, + shutdown_delay: float, + + pre_start_cmd: list[str], + cmd: list[str], + post_stop_cmd: list[str], + + get_params: Callable[[], dict], + ) -> None: + + self.__reset_delay = reset_delay + self.__shutdown_delay = shutdown_delay + + self.__pre_start_cmd: list[str] = pre_start_cmd + self.__cmd: list[str] = cmd + self.__post_stop_cmd: list[str] = post_stop_cmd + + self.__get_params = get_params + + self.__proc_task: (asyncio.Task | None) = None + self.__proc: (asyncio.subprocess.Process | None) = None # pylint: disable=no-member + + self.__stopper_task: (asyncio.Task | None) = None + self.__stopper_wip = False + + @aiotools.atomic_fg + async def ensure_start(self, reset: bool) -> None: + if not self.__proc_task or self.__stopper_task: + logger = get_logger(0) + + if self.__stopper_task: + if not self.__stopper_wip: + self.__stopper_task.cancel() + await asyncio.gather(self.__stopper_task, return_exceptions=True) + logger.info("Streamer stop cancelled") + return + else: + await asyncio.gather(self.__stopper_task, return_exceptions=True) + + if reset and self.__reset_delay > 0: + logger.info("Waiting %.2f seconds for reset delay ...", self.__reset_delay) + await asyncio.sleep(self.__reset_delay) + logger.info("Starting streamer ...") + await self.__inner_start() + + @aiotools.atomic_fg + async def ensure_stop(self, immediately: bool) -> None: + if self.__proc_task: + logger = get_logger(0) + + if immediately: + if self.__stopper_task: + if not self.__stopper_wip: + self.__stopper_task.cancel() + await asyncio.gather(self.__stopper_task, return_exceptions=True) + logger.info("Stopping streamer immediately ...") + await self.__inner_stop() + else: + await asyncio.gather(self.__stopper_task, return_exceptions=True) + else: + logger.info("Stopping streamer immediately ...") + await self.__inner_stop() + + elif not self.__stopper_task: + + async def delayed_stop() -> None: + try: + await asyncio.sleep(self.__shutdown_delay) + self.__stopper_wip = True + logger.info("Stopping streamer after delay ...") + await self.__inner_stop() + finally: + self.__stopper_task = None + self.__stopper_wip = False + + logger.info("Planning to stop streamer in %.2f seconds ...", self.__shutdown_delay) + self.__stopper_task = asyncio.create_task(delayed_stop()) + + def is_working(self) -> bool: + # Запущено и не планирует останавливаться + return bool(self.__proc_task and not self.__stopper_task) + + # ===== + + def _is_alive(self) -> bool: + return bool(self.__proc_task) + + @aiotools.atomic_fg + async def __inner_start(self) -> None: + assert not self.__proc_task + await self.__run_hook("PRE-START-CMD", self.__pre_start_cmd) + self.__proc_task = asyncio.create_task(self.__process_task_loop()) + + @aiotools.atomic_fg + async def __inner_stop(self) -> None: + assert self.__proc_task + self.__proc_task.cancel() + await asyncio.gather(self.__proc_task, return_exceptions=True) + await self.__kill_process() + await self.__run_hook("POST-STOP-CMD", self.__post_stop_cmd) + self.__proc_task = None + + # ===== + + async def __process_task_loop(self) -> None: # pylint: disable=too-many-branches + logger = get_logger(0) + while True: # pylint: disable=too-many-nested-blocks + try: + await self.__start_process() + assert self.__proc is not None + await aioproc.log_stdout_infinite(self.__proc, logger) + raise RuntimeError("Streamer unexpectedly died") + except asyncio.CancelledError: + break + except Exception: + if self.__proc: + logger.exception("Unexpected streamer error: pid=%d", self.__proc.pid) + else: + logger.exception("Can't start streamer") + await self.__kill_process() + await asyncio.sleep(1) + + def __make_cmd(self, cmd: list[str]) -> list[str]: + params = self.__get_params() + return [part.format(**params) for part in cmd] + + async def __run_hook(self, name: str, cmd: list[str]) -> None: + logger = get_logger() + cmd = self.__make_cmd(cmd) + logger.info("%s: %s", name, tools.cmdfmt(cmd)) + try: + await aioproc.log_process(cmd, logger, prefix=name) + except Exception: + logger.exception("Can't execute %s hook: %s", name, tools.cmdfmt(cmd)) + + async def __start_process(self) -> None: + assert self.__proc is None + cmd = self.__make_cmd(self.__cmd) + self.__proc = await aioproc.run_process(cmd) + get_logger(0).info("Started streamer pid=%d: %s", self.__proc.pid, tools.cmdfmt(cmd)) + + async def __kill_process(self) -> None: + if self.__proc: + await aioproc.kill_process(self.__proc, 1, get_logger(0)) + self.__proc = None