diff --git a/kvmd/apps/kvmd/server.py b/kvmd/apps/kvmd/server.py index 8eecaf7f..f53fc796 100644 --- a/kvmd/apps/kvmd/server.py +++ b/kvmd/apps/kvmd/server.py @@ -322,18 +322,17 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins while True: cur = (self.__has_stream_clients() or self.__snapshoter.snapshoting() or self.__stream_forever) if not prev and cur: - await self.__streamer.ensure_start(reset=False) + await self.__streamer.ensure_start() elif prev and not cur: - await self.__streamer.ensure_stop(immediately=False) + await self.__streamer.ensure_stop() - if self.__reset_streamer or self.__new_streamer_params: - start = self.__streamer.is_working() - await self.__streamer.ensure_stop(immediately=True) - if self.__new_streamer_params: - self.__streamer.set_params(self.__new_streamer_params) - self.__new_streamer_params = {} - if start: - await self.__streamer.ensure_start(reset=self.__reset_streamer) + if self.__new_streamer_params: + self.__streamer.set_params(self.__new_streamer_params) + self.__new_streamer_params = {} + self.__reset_streamer = True + + if self.__reset_streamer: + await self.__streamer.ensure_restart() self.__reset_streamer = False prev = cur diff --git a/kvmd/apps/kvmd/streamer/__init__.py b/kvmd/apps/kvmd/streamer/__init__.py index 08ca1975..5262546d 100644 --- a/kvmd/apps/kvmd/streamer/__init__.py +++ b/kvmd/apps/kvmd/streamer/__init__.py @@ -83,6 +83,7 @@ class Streamer: # pylint: disable=too-many-instance-attributes self.__unix_path = unix_path self.__snapshot_timeout = snapshot_timeout + self.__process_name_prefix = process_name_prefix self.__params = Params(**params_kwargs) @@ -92,11 +93,6 @@ class Streamer: # pylint: disable=too-many-instance-attributes pre_start_cmd=tools.build_cmd(pre_start_cmd, pre_start_cmd_remove, pre_start_cmd_append), cmd=tools.build_cmd(cmd, cmd_remove, cmd_append), post_stop_cmd=tools.build_cmd(post_stop_cmd, post_stop_cmd_remove, post_stop_cmd_append), - get_params=(lambda: { - "unix": unix_path, - "process_name_prefix": process_name_prefix, - **self.__params.get_params(), - }), ) self.__client = HttpStreamerClient( @@ -114,20 +110,27 @@ class Streamer: # pylint: disable=too-many-instance-attributes # ===== @aiotools.atomic_fg - async def ensure_start(self, reset: bool) -> None: - await self.__runner.ensure_start(reset) + async def ensure_start(self) -> None: + await self.__runner.ensure_start(self.__make_params()) @aiotools.atomic_fg - async def ensure_stop(self, immediately: bool) -> None: - await self.__runner.ensure_stop(immediately) + async def ensure_restart(self) -> None: + await self.__runner.ensure_restart(self.__make_params()) - def is_working(self) -> bool: - return self.__runner.is_working() + def __make_params(self) -> dict: + return { + "unix": self.__unix_path, + "process_name_prefix": self.__process_name_prefix, + **self.__params.get_params(), + } + + @aiotools.atomic_fg + async def ensure_stop(self) -> None: + await self.__runner.ensure_stop(immediately=False) # ===== def set_params(self, params: dict) -> None: - assert not self.__runner._is_alive() # pylint: disable=protected-access self.__notifier.notify(self.__ST_PARAMS) return self.__params.set_params(params) @@ -194,7 +197,7 @@ class Streamer: # pylint: disable=too-many-instance-attributes yield new async def __get_streamer_state(self) -> (dict | None): - if self.__runner._is_alive(): # pylint: disable=protected-access + if self.__runner.is_running(): session = self.__ensure_client_session() try: return (await session.get_state()) diff --git a/kvmd/apps/kvmd/streamer/runner.py b/kvmd/apps/kvmd/streamer/runner.py index 00f9c7e0..fef5a5e1 100644 --- a/kvmd/apps/kvmd/streamer/runner.py +++ b/kvmd/apps/kvmd/streamer/runner.py @@ -23,8 +23,6 @@ import asyncio import asyncio.subprocess -from typing import Callable - from ....logging import get_logger from .... import tools @@ -42,8 +40,6 @@ class Runner: # pylint: disable=too-many-instance-attributes pre_start_cmd: list[str], cmd: list[str], post_stop_cmd: list[str], - - get_params: Callable[[], dict], ) -> None: self.__reset_delay = reset_delay @@ -53,8 +49,7 @@ class Runner: # pylint: disable=too-many-instance-attributes self.__cmd: list[str] = cmd self.__post_stop_cmd: list[str] = post_stop_cmd - self.__get_params = get_params - + self.__proc_params: dict = {} self.__proc_task: (asyncio.Task | None) = None self.__proc: (asyncio.subprocess.Process | None) = None # pylint: disable=no-member @@ -62,7 +57,7 @@ class Runner: # pylint: disable=too-many-instance-attributes self.__stopper_wip = False @aiotools.atomic_fg - async def ensure_start(self, reset: bool) -> None: + async def ensure_start(self, params: dict) -> None: if not self.__proc_task or self.__stopper_task: logger = get_logger(0) @@ -75,11 +70,19 @@ class Runner: # pylint: disable=too-many-instance-attributes else: await asyncio.gather(self.__stopper_task, return_exceptions=True) - if reset and self.__reset_delay > 0: - logger.info("Waiting %.2f seconds for reset delay ...", self.__reset_delay) - await asyncio.sleep(self.__reset_delay) logger.info("Starting streamer ...") - await self.__inner_start() + await self.__inner_start(params) + + @aiotools.atomic_fg + async def ensure_restart(self, params: dict) -> None: + logger = get_logger(0) + start = bool(self.__proc_task and not self.__stopper_task) # Если запущено и не планирует останавливаться + await self.ensure_stop(immediately=True) + if self.__reset_delay > 0: + logger.info("Waiting %.2f seconds for reset delay ...", self.__reset_delay) + await asyncio.sleep(self.__reset_delay) + if start: + await self.ensure_start(params) @aiotools.atomic_fg async def ensure_stop(self, immediately: bool) -> None: @@ -114,18 +117,15 @@ class Runner: # pylint: disable=too-many-instance-attributes logger.info("Planning to stop streamer in %.2f seconds ...", self.__shutdown_delay) self.__stopper_task = asyncio.create_task(delayed_stop()) - def is_working(self) -> bool: - # Запущено и не планирует останавливаться - return bool(self.__proc_task and not self.__stopper_task) + def is_running(self) -> bool: + return bool(self.__proc_task) # ===== - def _is_alive(self) -> bool: - return bool(self.__proc_task) - @aiotools.atomic_fg - async def __inner_start(self) -> None: + async def __inner_start(self, params: dict) -> None: assert not self.__proc_task + self.__proc_params = params await self.__run_hook("PRE-START-CMD", self.__pre_start_cmd) self.__proc_task = asyncio.create_task(self.__process_task_loop()) @@ -159,8 +159,7 @@ class Runner: # pylint: disable=too-many-instance-attributes await asyncio.sleep(1) def __make_cmd(self, cmd: list[str]) -> list[str]: - params = self.__get_params() - return [part.format(**params) for part in cmd] + return [part.format(**self.__proc_params) for part in cmd] async def __run_hook(self, name: str, cmd: list[str]) -> None: logger = get_logger()