streamer state over websocket

This commit is contained in:
Devaev Maxim
2018-11-08 20:42:42 +03:00
parent 363bbdac57
commit 1640725cdc
10 changed files with 229 additions and 171 deletions

View File

@@ -113,6 +113,7 @@ class Server: # pylint: disable=too-many-instance-attributes
heartbeat: float,
atx_state_poll: float,
streamer_state_poll: float,
streamer_shutdown_delay: float,
msd_chunk_size: int,
@@ -126,8 +127,9 @@ class Server: # pylint: disable=too-many-instance-attributes
self.__streamer = streamer
self.__heartbeat = heartbeat
self.__streamer_shutdown_delay = streamer_shutdown_delay
self.__atx_state_poll = atx_state_poll
self.__streamer_state_poll = streamer_state_poll
self.__streamer_shutdown_delay = streamer_shutdown_delay
self.__msd_chunk_size = msd_chunk_size
self.__loop = loop
@@ -138,8 +140,7 @@ class Server: # pylint: disable=too-many-instance-attributes
self.__system_tasks: List[asyncio.Task] = []
self.__reset_streamer = False
self.__streamer_quality = streamer.get_current_quality()
self.__streamer_desired_fps = streamer.get_current_desired_fps()
self.__streamer_params = streamer.get_params()
def run(self, host: str, port: int) -> None:
self.__hid.start()
@@ -175,6 +176,7 @@ class Server: # pylint: disable=too-many-instance-attributes
self.__loop.create_task(self.__stream_controller()),
self.__loop.create_task(self.__poll_dead_sockets()),
self.__loop.create_task(self.__poll_atx_state()),
self.__loop.create_task(self.__poll_streamer_state()),
])
aiohttp.web.run_app(app, host=host, port=port, print=self.__run_app_print)
@@ -347,16 +349,17 @@ class Server: # pylint: disable=too-many-instance-attributes
# ===== STREAMER
async def __streamer_state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
return _json(self.__streamer.get_state())
return _json(await self.__streamer.get_state())
@_wrap_exceptions_for_web("Can't set stream params")
async def __streamer_set_params_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
quality = request.query.get("quality")
if quality:
self.__streamer_quality = _valid_int("quality", quality, 1, 100)
desired_fps = request.query.get("desired_fps")
if desired_fps:
self.__streamer_desired_fps = _valid_int("desired_fps", desired_fps, 0, 30)
for (name, validator) in [
("quality", lambda arg: _valid_int("quality", arg, 1, 100)),
("desired_fps", lambda arg: _valid_int("desired_fps", arg, 0, 30)),
]:
value = request.query.get(name)
if value:
self.__streamer_params[name] = validator(value)
return _json()
async def __streamer_reset_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
@@ -402,24 +405,17 @@ class Server: # pylint: disable=too-many-instance-attributes
cur = len(self.__sockets)
if prev == 0 and cur > 0:
if not self.__streamer.is_running():
await self.__streamer.start(self.__streamer_quality, self.__streamer_desired_fps)
await self.__broadcast_event("streamer_state", **self.__streamer.get_state())
await self.__streamer.start(self.__streamer_params)
elif prev > 0 and cur == 0:
shutdown_at = time.time() + self.__streamer_shutdown_delay
elif prev == 0 and cur == 0 and time.time() > shutdown_at:
if self.__streamer.is_running():
await self.__streamer.stop()
await self.__broadcast_event("streamer_state", **self.__streamer.get_state())
if (
self.__reset_streamer
or self.__streamer_quality != self.__streamer.get_current_quality()
or self.__streamer_desired_fps != self.__streamer.get_current_desired_fps()
):
if (self.__reset_streamer or self.__streamer_params != self.__streamer.get_params()):
if self.__streamer.is_running():
await self.__streamer.stop()
await self.__streamer.start(self.__streamer_quality, self.__streamer_desired_fps, no_init_restart=True)
await self.__broadcast_event("streamer_state", **self.__streamer.get_state())
await self.__streamer.start(self.__streamer_params, no_init_restart=True)
self.__reset_streamer = False
prev = cur
@@ -440,6 +436,13 @@ class Server: # pylint: disable=too-many-instance-attributes
await self.__broadcast_event("atx_state", **self.__atx.get_state())
await asyncio.sleep(self.__atx_state_poll)
@_system_task
async def __poll_streamer_state(self) -> None:
while True:
if self.__sockets:
await self.__broadcast_event("streamer_state", **(await self.__streamer.get_state()))
await asyncio.sleep(self.__streamer_state_poll)
async def __broadcast_event(self, event: str, **kwargs: Dict) -> None:
await asyncio.gather(*[
ws.send_str(json.dumps({