refactoring; reduce cpu consumption in streamer controller

This commit is contained in:
Devaev Maxim
2020-02-29 16:46:35 +03:00
parent 1470ebe6fa
commit 831b4fa16c
2 changed files with 114 additions and 62 deletions

View File

@@ -24,7 +24,6 @@ import os
import signal import signal
import asyncio import asyncio
import json import json
import time
from enum import Enum from enum import Enum
@@ -150,15 +149,16 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
self.__system_tasks: List[asyncio.Task] = [] self.__system_tasks: List[asyncio.Task] = []
self.__reset_streamer = False self.__reset_streamer = False
self.__streamer_params = streamer.get_params() self.__new_streamer_params: Dict = {}
async def __make_info(self) -> Dict: async def __make_info(self) -> Dict:
streamer_info = await self.__streamer.get_info()
return { return {
"version": { "version": {
"kvmd": __version__, "kvmd": __version__,
"streamer": await self.__streamer.get_version(), "streamer": streamer_info["version"],
}, },
"streamer": self.__streamer.get_app(), "streamer": streamer_info["app"],
"meta": await self.__info_manager.get_meta(), "meta": await self.__info_manager.get_meta(),
"extras": await self.__info_manager.get_extras(), "extras": await self.__info_manager.get_extras(),
} }
@@ -209,7 +209,7 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
]: ]:
value = request.query.get(name) value = request.query.get(name)
if value: if value:
self.__streamer_params[name] = validator(value) self.__new_streamer_params[name] = validator(value)
return make_json_response() return make_json_response()
@exposed_http("POST", "/streamer/reset") @exposed_http("POST", "/streamer/reset")
@@ -400,23 +400,21 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
async def __stream_controller(self) -> None: async def __stream_controller(self) -> None:
prev = 0 prev = 0
shutdown_at = 0.0
while True: while True:
cur = len(self.__sockets) cur = len(self.__sockets)
if prev == 0 and cur > 0: if prev == 0 and cur > 0:
if not self.__streamer.is_running(): await self.__streamer.ensure_start(init_restart=True)
await self.__streamer.start(self.__streamer_params)
elif prev > 0 and cur == 0: elif prev > 0 and cur == 0:
shutdown_at = time.time() + self.__streamer.shutdown_delay await self.__streamer.ensure_stop(immediately=False)
elif prev == 0 and cur == 0 and time.time() > shutdown_at:
if self.__streamer.is_running():
await self.__streamer.stop()
if (self.__reset_streamer or self.__streamer_params != self.__streamer.get_params()): if self.__reset_streamer or self.__new_streamer_params:
if self.__streamer.is_running(): start = self.__streamer.is_working()
await self.__streamer.stop() await self.__streamer.ensure_stop(immediately=True)
await self.__streamer.start(self.__streamer_params, no_init_restart=True) if self.__new_streamer_params:
self.__streamer.set_params(self.__new_streamer_params)
self.__new_streamer_params = {}
if start:
await self.__streamer.ensure_start(init_restart=False)
self.__reset_streamer = False self.__reset_streamer = False
prev = cur prev = cur

View File

@@ -73,7 +73,7 @@ class Streamer: # pylint: disable=too-many-instance-attributes
self.__sync_delay = sync_delay self.__sync_delay = sync_delay
self.__init_delay = init_delay self.__init_delay = init_delay
self.__init_restart_after = init_restart_after self.__init_restart_after = init_restart_after
self.shutdown_delay = shutdown_delay self.__shutdown_delay = shutdown_delay
self.__state_poll = state_poll self.__state_poll = state_poll
self.__params = { self.__params = {
@@ -92,15 +92,78 @@ class Streamer: # pylint: disable=too-many-instance-attributes
self.__cmd = cmd self.__cmd = cmd
self.__stop_task: Optional[asyncio.Task] = None
self.__stop_wip = False
self.__streamer_task: Optional[asyncio.Task] = None self.__streamer_task: Optional[asyncio.Task] = None
self.__streamer_proc: Optional[asyncio.subprocess.Process] = None # pylint: disable=no-member self.__streamer_proc: Optional[asyncio.subprocess.Process] = None # pylint: disable=no-member
self.__http_session: Optional[aiohttp.ClientSession] = None self.__http_session: Optional[aiohttp.ClientSession] = None
async def start(self, params: Dict, no_init_restart: bool=False) -> None: # =====
logger = get_logger()
logger.info("Starting streamer ...")
@aiotools.atomic
async def ensure_start(self, init_restart: bool) -> None:
if not self.__streamer_task or self.__stop_task:
logger = get_logger(0)
if self.__stop_task:
if not self.__stop_wip:
self.__stop_task.cancel()
await asyncio.gather(self.__stop_task, return_exceptions=True)
logger.info("Streamer stop cancelled")
return
else:
await asyncio.gather(self.__stop_task, return_exceptions=True)
logger.info("Starting streamer ...")
await self.__inner_start()
if self.__init_restart_after > 0.0 and init_restart:
await asyncio.sleep(self.__init_restart_after)
logger.info("Stopping streamer to restart ...")
await self.__inner_stop()
logger.info("Starting again ...")
await self.__inner_start()
@aiotools.atomic
async def ensure_stop(self, immediately: bool) -> None:
if self.__streamer_task:
logger = get_logger(0)
if immediately:
if self.__stop_task:
if not self.__stop_wip:
self.__stop_task.cancel()
await asyncio.gather(self.__stop_task, return_exceptions=True)
logger.info("Stopping streamer immediately ...")
await self.__inner_stop()
else:
await asyncio.gather(self.__stop_task, return_exceptions=True)
else:
logger.info("Stopping streamer immediately ...")
await self.__inner_stop()
elif not self.__stop_task:
async def delayed_stop() -> None:
try:
await asyncio.sleep(self.__shutdown_delay)
self.__stop_wip = True
logger.info("Stopping streamer after delay ...")
await self.__inner_stop()
finally:
self.__stop_task = None
self.__stop_wip = False
logger.info("Planning to stop streamer in %.2f seconds ...", self.__shutdown_delay)
self.__stop_task = asyncio.create_task(delayed_stop())
def is_working(self) -> bool:
# Запущено и не планирует останавливаться
return bool(self.__streamer_task and not self.__stop_task)
def set_params(self, params: Dict) -> None:
assert not self.__streamer_task
self.__params = { self.__params = {
key: min(max(params.get(key, self.__params[key]), a), b) key: min(max(params.get(key, self.__params[key]), a), b)
for (key, a, b) in [ for (key, a, b) in [
@@ -109,43 +172,27 @@ class Streamer: # pylint: disable=too-many-instance-attributes
] ]
} }
await self.__inner_start()
if self.__init_restart_after > 0.0 and not no_init_restart:
logger.info("Stopping streamer to restart ...")
await self.__inner_stop()
logger.info("Starting again ...")
await self.__inner_start()
async def stop(self) -> None:
get_logger().info("Stopping streamer ...")
await self.__inner_stop()
def is_running(self) -> bool:
return bool(self.__streamer_task)
def get_params(self) -> Dict:
return dict(self.__params)
async def get_state(self) -> Dict: async def get_state(self) -> Dict:
session = self.__ensure_session()
state = None state = None
try: if self.__streamer_task:
async with session.get( session = self.__ensure_session()
url=f"http://{self.__host}:{self.__port}/state", try:
headers={"User-Agent": f"KVMD/{__version__}"}, async with session.get(
timeout=self.__timeout, url=f"http://{self.__host}:{self.__port}/state",
) as response: headers={"User-Agent": f"KVMD/{__version__}"},
response.raise_for_status() timeout=self.__timeout,
state = (await response.json())["result"] ) as response:
except (aiohttp.ClientConnectionError, aiohttp.ServerConnectionError): response.raise_for_status()
pass state = (await response.json())["result"]
except asyncio.CancelledError: # pylint: disable=try-except-raise except (aiohttp.ClientConnectionError, aiohttp.ServerConnectionError):
raise pass
except Exception: except asyncio.CancelledError: # pylint: disable=try-except-raise
get_logger().exception("Invalid streamer response from /state") raise
except Exception:
get_logger().exception("Invalid streamer response from /state")
return { return {
"limits": {"max_fps": self.__max_fps}, "limits": {"max_fps": self.__max_fps},
"params": self.get_params(), "params": self.__params,
"state": state, "state": state,
} }
@@ -158,10 +205,7 @@ class Streamer: # pylint: disable=too-many-instance-attributes
prev_state = state prev_state = state
await asyncio.sleep(self.__state_poll) await asyncio.sleep(self.__state_poll)
def get_app(self) -> str: async def get_info(self) -> Dict:
return os.path.basename(self.__cmd[0])
async def get_version(self) -> str:
proc = await asyncio.create_subprocess_exec( proc = await asyncio.create_subprocess_exec(
*[self.__cmd[0], "--version"], *[self.__cmd[0], "--version"],
stdout=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE,
@@ -169,19 +213,23 @@ class Streamer: # pylint: disable=too-many-instance-attributes
preexec_fn=(lambda: signal.signal(signal.SIGINT, signal.SIG_IGN)), preexec_fn=(lambda: signal.signal(signal.SIGINT, signal.SIG_IGN)),
) )
(stdout, _) = await proc.communicate() (stdout, _) = await proc.communicate()
return stdout.decode(errors="ignore").strip() return {
"app": os.path.basename(self.__cmd[0]),
"version": stdout.decode(errors="ignore").strip(),
}
@aiotools.atomic @aiotools.atomic
async def cleanup(self) -> None: async def cleanup(self) -> None:
try: try:
if self.is_running(): await self.ensure_stop(immediately=True)
await self.stop()
if self.__http_session: if self.__http_session:
await self.__http_session.close() await self.__http_session.close()
self.__http_session = None self.__http_session = None
finally: finally:
await self.__set_hw_enabled(False) await self.__set_hw_enabled(False)
# =====
def __ensure_session(self) -> aiohttp.ClientSession: def __ensure_session(self) -> aiohttp.ClientSession:
if not self.__http_session: if not self.__http_session:
if self.__unix_path: if self.__unix_path:
@@ -190,11 +238,15 @@ class Streamer: # pylint: disable=too-many-instance-attributes
self.__http_session = aiohttp.ClientSession() self.__http_session = aiohttp.ClientSession()
return self.__http_session return self.__http_session
# =====
@aiotools.atomic
async def __inner_start(self) -> None: async def __inner_start(self) -> None:
assert not self.__streamer_task assert not self.__streamer_task
await self.__set_hw_enabled(True) await self.__set_hw_enabled(True)
self.__streamer_task = asyncio.create_task(self.__streamer_task_loop()) self.__streamer_task = asyncio.create_task(self.__streamer_task_loop())
@aiotools.atomic
async def __inner_stop(self) -> None: async def __inner_stop(self) -> None:
assert self.__streamer_task assert self.__streamer_task
self.__streamer_task.cancel() self.__streamer_task.cancel()
@@ -203,6 +255,7 @@ class Streamer: # pylint: disable=too-many-instance-attributes
await self.__set_hw_enabled(False) await self.__set_hw_enabled(False)
self.__streamer_task = None self.__streamer_task = None
@aiotools.atomic
async def __set_hw_enabled(self, enabled: bool) -> None: async def __set_hw_enabled(self, enabled: bool) -> None:
# XXX: This sequence is very important to enable converter and cap board # XXX: This sequence is very important to enable converter and cap board
if self.__cap_pin >= 0: if self.__cap_pin >= 0:
@@ -214,9 +267,10 @@ class Streamer: # pylint: disable=too-many-instance-attributes
if enabled: if enabled:
await asyncio.sleep(self.__init_delay) await asyncio.sleep(self.__init_delay)
# =====
async def __streamer_task_loop(self) -> None: # pylint: disable=too-many-branches async def __streamer_task_loop(self) -> None: # pylint: disable=too-many-branches
logger = get_logger(0) logger = get_logger(0)
while True: # pylint: disable=too-many-nested-blocks while True: # pylint: disable=too-many-nested-blocks
try: try:
await self.__start_streamer_proc() await self.__start_streamer_proc()