refactoring and graceful self-kill

This commit is contained in:
Devaev Maxim 2018-06-28 07:06:47 +03:00
parent 89164b184a
commit ba3c49a816
2 changed files with 177 additions and 168 deletions

View File

@ -1,181 +1,16 @@
import asyncio
import logging
import time
from typing import List
from typing import Set
from typing import Callable
from typing import Optional
import aiohttp
from .application import init
from .atx import Atx
from .streamer import Streamer
from .server import Server
from . import gpio
# =====
_logger = logging.getLogger(__name__)
def _system_task(method: Callable) -> Callable:
async def wrap(self: "_Server") -> None:
try:
await method(self)
except asyncio.CancelledError:
pass
except Exception:
_logger.exception("Unhandled exception")
raise SystemExit(1)
return wrap
class _Server: # pylint: disable=too-many-instance-attributes
def __init__(
self,
atx: Atx,
streamer: Streamer,
heartbeat: float,
atx_leds_poll: float,
video_shutdown_delay: float,
loop: asyncio.AbstractEventLoop,
) -> None:
self.__atx = atx
self.__streamer = streamer
self.__heartbeat = heartbeat
self.__video_shutdown_delay = video_shutdown_delay
self.__atx_leds_poll = atx_leds_poll
self.__loop = loop
self.__sockets: Set[aiohttp.web.WebSocketResponse] = set()
self.__sockets_lock = asyncio.Lock()
self.__system_tasks: List[asyncio.Task] = []
def run(self, host: str, port: int) -> None:
app = aiohttp.web.Application(loop=self.__loop)
app.router.add_get("/", self.__root_handler)
app.router.add_get("/ws", self.__ws_handler)
app.on_shutdown.append(self.__on_shutdown)
app.on_cleanup.append(self.__on_cleanup)
self.__system_tasks.extend([
self.__loop.create_task(self.__stream_controller()),
self.__loop.create_task(self.__poll_dead_sockets()),
self.__loop.create_task(self.__poll_atx_leds()),
])
aiohttp.web.run_app(
app=app,
host=host,
port=port,
print=(lambda text: [_logger.info(line.strip()) for line in text.strip().splitlines()]), # type: ignore
)
async def __root_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
return aiohttp.web.Response(text="OK")
async def __ws_handler(self, request: aiohttp.web.Request) -> aiohttp.web.WebSocketResponse:
ws = aiohttp.web.WebSocketResponse(heartbeat=self.__heartbeat)
await ws.prepare(request)
await self.__register_socket(ws)
async for msg in ws:
if msg.type == aiohttp.web.WSMsgType.TEXT:
retval = await self.__execute_command(msg.data)
if retval:
await ws.send_str(retval)
else:
break
return ws
async def __on_shutdown(self, _: aiohttp.web.Application) -> None:
_logger.info("Cancelling system tasks ...")
for task in self.__system_tasks:
task.cancel()
await asyncio.gather(*self.__system_tasks)
_logger.info("Disconnecting clients ...")
for ws in list(self.__sockets):
await self.__remove_socket(ws)
async def __on_cleanup(self, _: aiohttp.web.Application) -> None:
if self.__streamer.is_running():
await self.__streamer.stop()
@_system_task
async def __stream_controller(self) -> None:
prev = 0
shutdown_at = 0.0
while True:
cur = len(self.__sockets)
if prev == 0 and cur > 0:
if not self.__streamer.is_running():
await self.__streamer.start()
elif prev > 0 and cur == 0:
shutdown_at = time.time() + self.__video_shutdown_delay
elif prev == 0 and cur == 0 and time.time() > shutdown_at:
if self.__streamer.is_running():
await self.__streamer.stop()
prev = cur
await asyncio.sleep(0.1)
@_system_task
async def __poll_dead_sockets(self) -> None:
while True:
for ws in list(self.__sockets):
if ws.closed or not ws._req.transport: # pylint: disable=protected-access
await self.__remove_socket(ws)
await asyncio.sleep(0.1)
@_system_task
async def __poll_atx_leds(self) -> None:
while True:
if self.__sockets:
await self.__broadcast("EVENT atx_leds %d %d" % (self.__atx.get_leds()))
await asyncio.sleep(self.__atx_leds_poll)
async def __broadcast(self, msg: str) -> None:
await asyncio.gather(*[
ws.send_str(msg)
for ws in list(self.__sockets)
if not ws.closed and ws._req.transport # pylint: disable=protected-access
], return_exceptions=True)
async def __execute_command(self, command: str) -> Optional[str]:
(command, args) = (command.strip().split(" ", maxsplit=1) + [""])[:2]
if command == "CLICK":
method = {
"power": self.__atx.click_power,
"power_long": self.__atx.click_power_long,
"reset": self.__atx.click_reset,
}.get(args)
if method:
await method()
return None
_logger.warning("Received an incorrect command: %r", command)
return "ERROR incorrect command"
async def __register_socket(self, ws: aiohttp.web.WebSocketResponse) -> None:
async with self.__sockets_lock:
self.__sockets.add(ws)
_logger.info("Registered new client socket: remote=%s; id=%d; active=%d",
ws._req.remote, id(ws), len(self.__sockets)) # pylint: disable=protected-access
async def __remove_socket(self, ws: aiohttp.web.WebSocketResponse) -> None:
async with self.__sockets_lock:
try:
self.__sockets.remove(ws)
_logger.info("Removed client socket: remote=%s; id=%d; active=%d",
ws._req.remote, id(ws), len(self.__sockets)) # pylint: disable=protected-access
await ws.close()
except Exception:
pass
def main() -> None:
config = init()
with gpio.bcm():
@ -198,7 +33,7 @@ def main() -> None:
loop=loop,
)
_Server(
Server(
atx=atx,
streamer=streamer,
heartbeat=config["server"]["heartbeat"],
@ -209,4 +44,4 @@ def main() -> None:
host=config["server"]["host"],
port=config["server"]["port"],
)
_logger.info("Bye-bye")
logging.getLogger(__name__).info("Bye-bye")

174
kvmd/kvmd/server.py Normal file
View File

@ -0,0 +1,174 @@
import os
import signal
import asyncio
import logging
import time
from typing import List
from typing import Set
from typing import Callable
from typing import Optional
import aiohttp
from .atx import Atx
from .streamer import Streamer
# =====
_logger = logging.getLogger(__name__)
def _system_task(method: Callable) -> Callable:
async def wrap(self: "Server") -> None:
try:
await method(self)
except asyncio.CancelledError:
pass
except Exception:
_logger.exception("Unhandled exception, killing myself ...")
os.kill(os.getpid(), signal.SIGTERM)
return wrap
class Server: # pylint: disable=too-many-instance-attributes
def __init__(
self,
atx: Atx,
streamer: Streamer,
heartbeat: float,
atx_leds_poll: float,
video_shutdown_delay: float,
loop: asyncio.AbstractEventLoop,
) -> None:
self.__atx = atx
self.__streamer = streamer
self.__heartbeat = heartbeat
self.__video_shutdown_delay = video_shutdown_delay
self.__atx_leds_poll = atx_leds_poll
self.__loop = loop
self.__sockets: Set[aiohttp.web.WebSocketResponse] = set()
self.__sockets_lock = asyncio.Lock()
self.__system_tasks: List[asyncio.Task] = []
def run(self, host: str, port: int) -> None:
app = aiohttp.web.Application(loop=self.__loop)
app.router.add_get("/", self.__root_handler)
app.router.add_get("/ws", self.__ws_handler)
app.on_shutdown.append(self.__on_shutdown)
app.on_cleanup.append(self.__on_cleanup)
self.__system_tasks.extend([
self.__loop.create_task(self.__stream_controller()),
self.__loop.create_task(self.__poll_dead_sockets()),
self.__loop.create_task(self.__poll_atx_leds()),
])
aiohttp.web.run_app(
app=app,
host=host,
port=port,
print=(lambda text: [_logger.info(line.strip()) for line in text.strip().splitlines()]), # type: ignore
)
async def __root_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
return aiohttp.web.Response(text="OK")
async def __ws_handler(self, request: aiohttp.web.Request) -> aiohttp.web.WebSocketResponse:
ws = aiohttp.web.WebSocketResponse(heartbeat=self.__heartbeat)
await ws.prepare(request)
await self.__register_socket(ws)
async for msg in ws:
if msg.type == aiohttp.web.WSMsgType.TEXT:
retval = await self.__execute_command(msg.data)
if retval:
await ws.send_str(retval)
else:
break
return ws
async def __on_shutdown(self, _: aiohttp.web.Application) -> None:
_logger.info("Cancelling system tasks ...")
for task in self.__system_tasks:
task.cancel()
await asyncio.gather(*self.__system_tasks)
_logger.info("Disconnecting clients ...")
for ws in list(self.__sockets):
await self.__remove_socket(ws)
async def __on_cleanup(self, _: aiohttp.web.Application) -> None:
if self.__streamer.is_running():
await self.__streamer.stop()
@_system_task
async def __stream_controller(self) -> None:
prev = 0
shutdown_at = 0.0
while True:
cur = len(self.__sockets)
if prev == 0 and cur > 0:
if not self.__streamer.is_running():
await self.__streamer.start()
elif prev > 0 and cur == 0:
shutdown_at = time.time() + self.__video_shutdown_delay
elif prev == 0 and cur == 0 and time.time() > shutdown_at:
if self.__streamer.is_running():
await self.__streamer.stop()
prev = cur
await asyncio.sleep(0.1)
@_system_task
async def __poll_dead_sockets(self) -> None:
while True:
for ws in list(self.__sockets):
if ws.closed or not ws._req.transport: # pylint: disable=protected-access
await self.__remove_socket(ws)
await asyncio.sleep(0.1)
@_system_task
async def __poll_atx_leds(self) -> None:
while True:
if self.__sockets:
await self.__broadcast("EVENT atx_leds %d %d" % (self.__atx.get_leds()))
await asyncio.sleep(self.__atx_leds_poll)
async def __broadcast(self, msg: str) -> None:
await asyncio.gather(*[
ws.send_str(msg)
for ws in list(self.__sockets)
if not ws.closed and ws._req.transport # pylint: disable=protected-access
], return_exceptions=True)
async def __execute_command(self, command: str) -> Optional[str]:
(command, args) = (command.strip().split(" ", maxsplit=1) + [""])[:2]
if command == "CLICK":
method = {
"power": self.__atx.click_power,
"power_long": self.__atx.click_power_long,
"reset": self.__atx.click_reset,
}.get(args)
if method:
await method()
return None
_logger.warning("Received an incorrect command: %r", command)
return "ERROR incorrect command"
async def __register_socket(self, ws: aiohttp.web.WebSocketResponse) -> None:
async with self.__sockets_lock:
self.__sockets.add(ws)
_logger.info("Registered new client socket: remote=%s; id=%d; active=%d",
ws._req.remote, id(ws), len(self.__sockets)) # pylint: disable=protected-access
async def __remove_socket(self, ws: aiohttp.web.WebSocketResponse) -> None:
async with self.__sockets_lock:
try:
self.__sockets.remove(ws)
_logger.info("Removed client socket: remote=%s; id=%d; active=%d",
ws._req.remote, id(ws), len(self.__sockets)) # pylint: disable=protected-access
await ws.close()
except Exception:
pass