delayed shutdown of streamer

This commit is contained in:
Devaev Maxim
2018-06-28 03:29:17 +03:00
parent 1a419cc52d
commit 30134ba3c6
3 changed files with 44 additions and 23 deletions

View File

@@ -30,6 +30,7 @@ kvmd:
cap: 21 cap: 21
vga: 25 vga: 25
sync_delay: 1.0 sync_delay: 1.0
shutdown_delay: 10.0
mjpg_streamer: mjpg_streamer:
prog: /usr/bin/mjpg_streamer prog: /usr/bin/mjpg_streamer

View File

@@ -2,6 +2,7 @@ import asyncio
import argparse import argparse
import logging import logging
import logging.config import logging.config
import time
from typing import List from typing import List
from typing import Dict from typing import Dict
@@ -73,6 +74,7 @@ class _Application:
app.on_cleanup.append(self.__on_cleanup) app.on_cleanup.append(self.__on_cleanup)
self.__system_tasks.extend([ self.__system_tasks.extend([
self.__loop.create_task(self.__stream_controller()),
self.__loop.create_task(self.__poll_dead_sockets()), self.__loop.create_task(self.__poll_dead_sockets()),
self.__loop.create_task(self.__poll_atx_leds()), self.__loop.create_task(self.__poll_atx_leds()),
]) ])
@@ -101,23 +103,45 @@ class _Application:
return ws return ws
async def __on_shutdown(self, _: aiohttp.web.Application) -> None: async def __on_shutdown(self, _: aiohttp.web.Application) -> None:
get_logger().info("Shutting down ...") logger = get_logger()
logger.info("Cancelling system tasks ...")
for task in self.__system_tasks:
task.cancel()
await asyncio.gather(*self.__system_tasks)
logger.info("Disconnecting clients ...")
for ws in list(self.__sockets): for ws in list(self.__sockets):
await self.__remove_socket(ws) await self.__remove_socket(ws)
async def __on_cleanup(self, _: aiohttp.web.Application) -> None: async def __on_cleanup(self, _: aiohttp.web.Application) -> None:
logger = get_logger() logger = get_logger()
logger.info("Cancelling tasks ...") if self.__streamer.is_running():
for task in self.__system_tasks: await self.__streamer.stop()
task.cancel()
await asyncio.gather(*self.__system_tasks)
logger.info("Cleaning up GPIO ...") logger.info("Cleaning up GPIO ...")
GPIO.cleanup() GPIO.cleanup()
logger.info("Bye-bye") logger.info("Bye-bye")
@_system_task
async def __stream_controller(self) -> None:
prev = 0
shutdown_at = 0.0
while True:
cur = len(self.__sockets)
if prev == 0 and cur > 0:
if not self.__streamer.is_running():
await self.__streamer.start()
elif prev > 0 and cur == 0:
shutdown_at = time.time() + self.__config["video"]["shutdown_delay"]
elif prev == 0 and cur == 0 and time.time() > shutdown_at:
if self.__streamer.is_running():
await self.__streamer.stop()
prev = cur
await asyncio.sleep(0.1)
@_system_task @_system_task
async def __poll_dead_sockets(self) -> None: async def __poll_dead_sockets(self) -> None:
while True: while True:
@@ -159,8 +183,6 @@ class _Application:
self.__sockets.add(ws) self.__sockets.add(ws)
get_logger().info("Registered new client socket: remote=%s; id=%d; active=%d", get_logger().info("Registered new client socket: remote=%s; id=%d; active=%d",
ws._req.remote, id(ws), len(self.__sockets)) # pylint: disable=protected-access ws._req.remote, id(ws), len(self.__sockets)) # pylint: disable=protected-access
if len(self.__sockets) == 1:
await self.__streamer.start()
async def __remove_socket(self, ws: aiohttp.web.WebSocketResponse) -> None: async def __remove_socket(self, ws: aiohttp.web.WebSocketResponse) -> None:
async with self.__sockets_lock: async with self.__sockets_lock:
@@ -171,8 +193,6 @@ class _Application:
await ws.close() await ws.close()
except Exception: except Exception:
pass pass
if not self.__sockets:
await self.__streamer.stop()
def main() -> None: def main() -> None:

View File

@@ -32,7 +32,6 @@ class Streamer: # pylint: disable=too-many-instance-attributes
self.__loop = loop self.__loop = loop
self.__lock = asyncio.Lock()
self.__proc_task: Optional[asyncio.Task] = None self.__proc_task: Optional[asyncio.Task] = None
def __set_output_pin(self, pin: int) -> int: def __set_output_pin(self, pin: int) -> int:
@@ -41,23 +40,24 @@ class Streamer: # pylint: disable=too-many-instance-attributes
return pin return pin
async def start(self) -> None: async def start(self) -> None:
async with self.__lock:
get_logger().info("Starting mjpg_streamer ...")
assert not self.__proc_task assert not self.__proc_task
get_logger().info("Starting mjpg_streamer ...")
await self.__set_hw_enabled(True) await self.__set_hw_enabled(True)
self.__proc_task = self.__loop.create_task(self.__process()) self.__proc_task = self.__loop.create_task(self.__process())
async def stop(self) -> None: async def stop(self) -> None:
async with self.__lock: assert self.__proc_task
get_logger().info("Stopping mjpg_streamer ...") get_logger().info("Stopping mjpg_streamer ...")
if self.__proc_task:
self.__proc_task.cancel() self.__proc_task.cancel()
await asyncio.gather(self.__proc_task, return_exceptions=True) await asyncio.gather(self.__proc_task, return_exceptions=True)
await self.__set_hw_enabled(False) await self.__set_hw_enabled(False)
self.__proc_task = None self.__proc_task = None
def is_running(self) -> bool:
return bool(self.__proc_task)
async def __set_hw_enabled(self, enabled: bool) -> None: async def __set_hw_enabled(self, enabled: bool) -> None:
# This sequence is important for enable # XXX: This sequence is very important for enable
GPIO.output(self.__cap_power, enabled) GPIO.output(self.__cap_power, enabled)
if enabled: if enabled:
await asyncio.sleep(self.__sync_delay) await asyncio.sleep(self.__sync_delay)