better api, refactoring

This commit is contained in:
Devaev Maxim 2018-07-02 21:17:19 +03:00
parent 87f8cb350b
commit 0582398521
6 changed files with 136 additions and 84 deletions

View File

@ -25,12 +25,12 @@ def main() -> None:
) )
atx = Atx( atx = Atx(
power_led=int(config["atx"]["leds"]["pinout"]["power"]), power_led=int(config["atx"]["pinout"]["power_led"]),
hdd_led=int(config["atx"]["leds"]["pinout"]["hdd"]), hdd_led=int(config["atx"]["pinout"]["hdd_led"]),
power_switch=int(config["atx"]["switches"]["pinout"]["power"]), power_switch=int(config["atx"]["pinout"]["power_switch"]),
reset_switch=int(config["atx"]["switches"]["pinout"]["reset"]), reset_switch=int(config["atx"]["pinout"]["reset_switch"]),
click_delay=float(config["atx"]["switches"]["click_delay"]), click_delay=float(config["atx"]["click_delay"]),
long_click_delay=float(config["atx"]["switches"]["long_click_delay"]), long_click_delay=float(config["atx"]["long_click_delay"]),
) )
msd = MassStorageDevice( msd = MassStorageDevice(
@ -40,10 +40,12 @@ def main() -> None:
) )
streamer = Streamer( streamer = Streamer(
cap_power=int(config["video"]["pinout"]["cap"]), cap_power=int(config["streamer"]["pinout"]["cap"]),
conv_power=int(config["video"]["pinout"]["conv"]), conv_power=int(config["streamer"]["pinout"]["conv"]),
sync_delay=float(config["video"]["sync_delay"]), sync_delay=float(config["streamer"]["sync_delay"]),
cmd=list(map(str, config["video"]["cmd"])), width=int(config["streamer"]["size"]["width"]),
height=int(config["streamer"]["size"]["height"]),
cmd=list(map(str, config["streamer"]["cmd"])),
loop=loop, loop=loop,
) )
@ -53,8 +55,8 @@ def main() -> None:
msd=msd, msd=msd,
streamer=streamer, streamer=streamer,
heartbeat=float(config["server"]["heartbeat"]), heartbeat=float(config["server"]["heartbeat"]),
atx_leds_poll=float(config["atx"]["leds"]["poll"]), atx_state_poll=float(config["atx"]["state_poll"]),
video_shutdown_delay=float(config["video"]["shutdown_delay"]), streamer_shutdown_delay=float(config["streamer"]["shutdown_delay"]),
msd_chunk_size=int(config["msd"]["chunk_size"]), msd_chunk_size=int(config["msd"]["chunk_size"]),
loop=loop, loop=loop,
).run( ).run(

View File

@ -1,6 +1,6 @@
import asyncio import asyncio
from typing import Tuple from typing import Dict
from .logging import get_logger from .logging import get_logger
@ -29,11 +29,13 @@ class Atx:
self.__lock = asyncio.Lock() self.__lock = asyncio.Lock()
def get_leds(self) -> Tuple[bool, bool]: def get_state(self) -> Dict:
return ( return {
not gpio.read(self.__power_led), "leds": {
not gpio.read(self.__hdd_led), "power": (not gpio.read(self.__power_led)),
) "hdd": (not gpio.read(self.__hdd_led)),
},
}
async def click_power(self) -> None: async def click_power(self) -> None:
if (await self.__click(self.__power_switch, self.__click_delay)): if (await self.__click(self.__power_switch, self.__click_delay)):

View File

@ -118,8 +118,12 @@ class MassStorageDevice:
get_logger().info("Using bind %r as mass-storage device", self._bind) get_logger().info("Using bind %r as mass-storage device", self._bind)
try: try:
loop.run_until_complete(self.connect_to_kvm(no_delay=True)) loop.run_until_complete(self.connect_to_kvm(no_delay=True))
except Exception: except Exception as err:
get_logger().exception("Mass-storage device is not operational") if isinstance(err, MassStorageError):
log = get_logger().warning
else:
log = get_logger().exception
log("Mass-storage device is not operational: %s", err)
self._bind = "" self._bind = ""
else: else:
get_logger().warning("Missing bind; mass-storage device is not operational") get_logger().warning("Missing bind; mass-storage device is not operational")
@ -133,7 +137,7 @@ class MassStorageDevice:
await asyncio.sleep(self.__init_delay) await asyncio.sleep(self.__init_delay)
path = locate_by_bind(self._bind) path = locate_by_bind(self._bind)
if not path: if not path:
raise RuntimeError("Can't locate device by bind %r" % (self._bind)) raise MassStorageError("Can't locate device by bind %r" % (self._bind))
self.__device_info = explore_device(path) self.__device_info = explore_device(path)
get_logger().info("Mass-storage device switched to KVM: %s", self.__device_info) get_logger().info("Mass-storage device switched to KVM: %s", self.__device_info)

View File

@ -1,9 +1,11 @@
import os import os
import signal import signal
import asyncio import asyncio
import json
import time import time
from typing import List from typing import List
from typing import Dict
from typing import Set from typing import Set
from typing import Callable from typing import Callable
from typing import Optional from typing import Optional
@ -38,19 +40,29 @@ def _system_task(method: Callable) -> Callable:
def _exceptions_as_400(msg: str, exceptions: List[Type[Exception]]) -> Callable: def _exceptions_as_400(msg: str, exceptions: List[Type[Exception]]) -> Callable:
def make_wrapper(method: Callable) -> Callable: def make_wrapper(method: Callable) -> Callable:
async def wrap(self: "Server", request: aiohttp.web.Request) -> aiohttp.web.WebSocketResponse: async def wrap(self: "Server", request: aiohttp.web.Request) -> aiohttp.web.Response:
try: try:
return (await method(self, request)) return (await method(self, request))
except tuple(exceptions) as err: # pylint: disable=catching-non-exception except tuple(exceptions) as err: # pylint: disable=catching-non-exception
get_logger().exception(msg) get_logger().exception(msg)
return aiohttp.web.json_response({ return aiohttp.web.json_response({
"error": type(err).__name__, "ok": False,
"error_msg": str(err), "result": {
"error": type(err).__name__,
"error_msg": str(err),
},
}, status=400) }, status=400)
return wrap return wrap
return make_wrapper return make_wrapper
def _json_200(result: Optional[Dict]=None) -> aiohttp.web.Response:
return aiohttp.web.json_response({
"ok": True,
"result": (result or {}),
})
class Server: # pylint: disable=too-many-instance-attributes class Server: # pylint: disable=too-many-instance-attributes
def __init__( def __init__(
self, self,
@ -59,8 +71,8 @@ class Server: # pylint: disable=too-many-instance-attributes
msd: MassStorageDevice, msd: MassStorageDevice,
streamer: Streamer, streamer: Streamer,
heartbeat: float, heartbeat: float,
atx_leds_poll: float, atx_state_poll: float,
video_shutdown_delay: float, streamer_shutdown_delay: float,
msd_chunk_size: int, msd_chunk_size: int,
loop: asyncio.AbstractEventLoop, loop: asyncio.AbstractEventLoop,
) -> None: ) -> None:
@ -70,8 +82,8 @@ class Server: # pylint: disable=too-many-instance-attributes
self.__msd = msd self.__msd = msd
self.__streamer = streamer self.__streamer = streamer
self.__heartbeat = heartbeat self.__heartbeat = heartbeat
self.__video_shutdown_delay = video_shutdown_delay self.__streamer_shutdown_delay = streamer_shutdown_delay
self.__atx_leds_poll = atx_leds_poll self.__atx_state_poll = atx_state_poll
self.__msd_chunk_size = msd_chunk_size self.__msd_chunk_size = msd_chunk_size
self.__loop = loop self.__loop = loop
@ -80,17 +92,25 @@ class Server: # pylint: disable=too-many-instance-attributes
self.__system_tasks: List[asyncio.Task] = [] self.__system_tasks: List[asyncio.Task] = []
self.__restart_video = False self.__reset_streamer = False
def run(self, host: str, port: int) -> None: def run(self, host: str, port: int) -> None:
self.__keyboard.start() self.__keyboard.start()
app = aiohttp.web.Application(loop=self.__loop) app = aiohttp.web.Application(loop=self.__loop)
app.router.add_get("/", self.__root_handler)
app.router.add_get("/ws", self.__ws_handler) app.router.add_get("/ws", self.__ws_handler)
app.router.add_get("/atx", self.__atx_state_handler)
app.router.add_post("/atx/click", self.__atx_click_handler)
app.router.add_get("/msd", self.__msd_state_handler) app.router.add_get("/msd", self.__msd_state_handler)
app.router.add_post("/msd/connect", self.__msd_connect_handler) app.router.add_post("/msd/connect", self.__msd_connect_handler)
app.router.add_post("/msd/write", self.__msd_write_handler) app.router.add_post("/msd/write", self.__msd_write_handler)
app.router.add_get("/streamer", self.__streamer_state_handler)
app.router.add_post("/streamer/reset", self.__streamer_reset_handler)
app.on_shutdown.append(self.__on_shutdown) app.on_shutdown.append(self.__on_shutdown)
app.on_cleanup.append(self.__on_cleanup) app.on_cleanup.append(self.__on_cleanup)
@ -98,44 +118,55 @@ class Server: # pylint: disable=too-many-instance-attributes
self.__loop.create_task(self.__keyboard_watchdog()), self.__loop.create_task(self.__keyboard_watchdog()),
self.__loop.create_task(self.__stream_controller()), self.__loop.create_task(self.__stream_controller()),
self.__loop.create_task(self.__poll_dead_sockets()), self.__loop.create_task(self.__poll_dead_sockets()),
self.__loop.create_task(self.__poll_atx_leds()), self.__loop.create_task(self.__poll_atx_state()),
]) ])
aiohttp.web.run_app(app, host=host, port=port, print=self.__run_app_print) aiohttp.web.run_app(app, host=host, port=port, print=self.__run_app_print)
async def __root_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
return aiohttp.web.Response(text="OK")
async def __ws_handler(self, request: aiohttp.web.Request) -> aiohttp.web.WebSocketResponse: async def __ws_handler(self, request: aiohttp.web.Request) -> aiohttp.web.WebSocketResponse:
ws = aiohttp.web.WebSocketResponse(heartbeat=self.__heartbeat) ws = aiohttp.web.WebSocketResponse(heartbeat=self.__heartbeat)
await ws.prepare(request) await ws.prepare(request)
await self.__register_socket(ws) await self.__register_socket(ws)
async for msg in ws: async for msg in ws:
if msg.type == aiohttp.web.WSMsgType.TEXT: if msg.type == aiohttp.web.WSMsgType.TEXT:
retval = await self.__execute_command(msg.data) await ws.send_str(json.dumps({"msg_type": "echo", "msg": msg.data}))
if retval:
await ws.send_str(retval)
else: else:
break break
return ws return ws
async def __atx_state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
return _json_200(self.__atx.get_state())
@_exceptions_as_400("Click error", [RuntimeError])
async def __atx_click_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
button = request.query.get("button")
if button == "power":
await self.__atx.click_power()
elif button == "power_long":
await self.__atx.click_power_long()
elif button == "reset":
await self.__atx.click_reset()
else:
raise RuntimeError("Missing or invalid 'button=%s'" % (button))
return _json_200({"clicked": button})
async def __msd_state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response: async def __msd_state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
return aiohttp.web.json_response(self.__msd.get_state()) return _json_200(self.__msd.get_state())
@_exceptions_as_400("Mass-storage error", [MassStorageError, RuntimeError]) @_exceptions_as_400("Mass-storage error", [MassStorageError, RuntimeError])
async def __msd_connect_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response: async def __msd_connect_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
to = request.query.get("to") to = request.query.get("to")
if to == "kvm": if to == "kvm":
await self.__msd.connect_to_kvm() await self.__msd.connect_to_kvm()
await self.__broadcast("EVENT msd connected_to_kvm") await self.__broadcast_event("msd_state", state="connected_to_kvm") # type: ignore
elif to == "pc": elif to == "pc":
await self.__msd.connect_to_pc() await self.__msd.connect_to_pc()
await self.__broadcast("EVENT msd connected_to_pc") await self.__broadcast_event("msd_state", state="connected_to_pc") # type: ignore
else: else:
raise RuntimeError("Missing or invalid 'to=%s'" % (to)) raise RuntimeError("Missing or invalid 'to=%s'" % (to))
return aiohttp.web.json_response(self.__msd.get_state()) return _json_200(self.__msd.get_state())
@_exceptions_as_400("Can't write image to mass-storage device", [MassStorageError, RuntimeError, OSError]) @_exceptions_as_400("Can't write data to mass-storage device", [MassStorageError, RuntimeError, OSError])
async def __msd_write_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response: async def __msd_write_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
logger = get_logger(0) logger = get_logger(0)
reader = await request.multipart() reader = await request.multipart()
@ -146,18 +177,25 @@ class Server: # pylint: disable=too-many-instance-attributes
raise RuntimeError("Missing 'data' field") raise RuntimeError("Missing 'data' field")
async with self.__msd: async with self.__msd:
await self.__broadcast("EVENT msd busy") await self.__broadcast_event("msd_state", state="busy") # type: ignore
logger.info("Writing image to mass-storage device ...") logger.info("Writing image to mass-storage device ...")
while True: while True:
chunk = await field.read_chunk(self.__msd_chunk_size) chunk = await field.read_chunk(self.__msd_chunk_size)
if not chunk: if not chunk:
break break
writed = await self.__msd.write(chunk) writed = await self.__msd.write(chunk)
await self.__broadcast("EVENT msd free") await self.__broadcast_event("msd_state", state="free") # type: ignore
finally: finally:
if writed != 0: if writed != 0:
logger.info("Writed %d bytes to mass-storage device", writed) logger.info("Writed %d bytes to mass-storage device", writed)
return aiohttp.web.json_response({"writed": writed}) return _json_200({"writed": writed})
async def __streamer_state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
return _json_200(self.__streamer.get_state())
async def __streamer_reset_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
self.__reset_streamer = True
return _json_200()
def __run_app_print(self, text: str) -> None: def __run_app_print(self, text: str) -> None:
logger = get_logger() logger = get_logger()
@ -198,16 +236,16 @@ class Server: # pylint: disable=too-many-instance-attributes
if not self.__streamer.is_running(): if not self.__streamer.is_running():
await self.__streamer.start() await self.__streamer.start()
elif prev > 0 and cur == 0: elif prev > 0 and cur == 0:
shutdown_at = time.time() + self.__video_shutdown_delay shutdown_at = time.time() + self.__streamer_shutdown_delay
elif prev == 0 and cur == 0 and time.time() > shutdown_at: elif prev == 0 and cur == 0 and time.time() > shutdown_at:
if self.__streamer.is_running(): if self.__streamer.is_running():
await self.__streamer.stop() await self.__streamer.stop()
if self.__restart_video: if self.__reset_streamer:
if self.__streamer.is_running(): if self.__streamer.is_running():
await self.__streamer.stop() await self.__streamer.stop()
await self.__streamer.start() await self.__streamer.start()
self.__restart_video = False self.__reset_streamer = False
prev = cur prev = cur
await asyncio.sleep(0.1) await asyncio.sleep(0.1)
@ -221,36 +259,25 @@ class Server: # pylint: disable=too-many-instance-attributes
await asyncio.sleep(0.1) await asyncio.sleep(0.1)
@_system_task @_system_task
async def __poll_atx_leds(self) -> None: async def __poll_atx_state(self) -> None:
while True: while True:
if self.__sockets: if self.__sockets:
await self.__broadcast("EVENT atx_leds %d %d" % (self.__atx.get_leds())) await self.__broadcast_event("atx_state", **self.__atx.get_state())
await asyncio.sleep(self.__atx_leds_poll) await asyncio.sleep(self.__atx_state_poll)
async def __broadcast(self, msg: str) -> None: async def __broadcast_event(self, event: str, **kwargs: Dict) -> None:
await asyncio.gather(*[ await asyncio.gather(*[
ws.send_str(msg) ws.send_str(json.dumps({
"msg_type": "event",
"msg": {
"event": event,
"event_attrs": kwargs,
},
}))
for ws in list(self.__sockets) for ws in list(self.__sockets)
if not ws.closed and ws._req.transport # pylint: disable=protected-access if not ws.closed and ws._req.transport # pylint: disable=protected-access
], return_exceptions=True) ], return_exceptions=True)
async def __execute_command(self, command: str) -> Optional[str]:
(command, args) = (command.strip().split(" ", maxsplit=1) + [""])[:2]
if command == "CLICK":
method = {
"power": self.__atx.click_power,
"power_long": self.__atx.click_power_long,
"reset": self.__atx.click_reset,
}.get(args)
if method:
await method()
return None
elif command == "RESTART_VIDEO":
self.__restart_video = True
return None
get_logger().warning("Received an incorrect command: %r", command)
return "ERROR incorrect command"
async def __register_socket(self, ws: aiohttp.web.WebSocketResponse) -> None: async def __register_socket(self, ws: aiohttp.web.WebSocketResponse) -> None:
async with self.__sockets_lock: async with self.__sockets_lock:
self.__sockets.add(ws) self.__sockets.add(ws)

View File

@ -2,6 +2,7 @@ import asyncio
import asyncio.subprocess import asyncio.subprocess
from typing import List from typing import List
from typing import Dict
from typing import Optional from typing import Optional
from .logging import get_logger from .logging import get_logger
@ -16,6 +17,8 @@ class Streamer: # pylint: disable=too-many-instance-attributes
cap_power: int, cap_power: int,
conv_power: int, conv_power: int,
sync_delay: float, sync_delay: float,
width: int,
height: int,
cmd: List[str], cmd: List[str],
loop: asyncio.AbstractEventLoop, loop: asyncio.AbstractEventLoop,
) -> None: ) -> None:
@ -25,7 +28,9 @@ class Streamer: # pylint: disable=too-many-instance-attributes
self.__cap_power = gpio.set_output(cap_power) self.__cap_power = gpio.set_output(cap_power)
self.__conv_power = (gpio.set_output(conv_power) if conv_power > 0 else conv_power) self.__conv_power = (gpio.set_output(conv_power) if conv_power > 0 else conv_power)
self.__sync_delay = sync_delay self.__sync_delay = sync_delay
self.__cmd = cmd self.__width = width
self.__height = height
self.__cmd = [part.format(width=width, height=height) for part in cmd]
self.__loop = loop self.__loop = loop
@ -48,6 +53,15 @@ class Streamer: # pylint: disable=too-many-instance-attributes
def is_running(self) -> bool: def is_running(self) -> bool:
return bool(self.__proc_task) return bool(self.__proc_task)
def get_state(self) -> Dict:
return {
"is_running": self.is_running(),
"size": {
"width": self.__width,
"height": self.__height,
},
}
async def cleanup(self) -> None: async def cleanup(self) -> None:
if self.is_running(): if self.is_running():
await self.stop() await self.stop()

View File

@ -8,21 +8,20 @@ kvmd:
pinout: pinout:
clock: 17 clock: 17
data: 4 data: 4
pulse: 0.0002 pulse: 0.0002
atx: atx:
leds: pinout:
pinout: power_led: 16
power: 16 hdd_led: 12
hdd: 12 power_switch: 26
poll: 0.1 reset_switch: 20
switches: click_delay: 0.1
pinout: long_click_delay: 5.5
power: 26
reset: 20 state_poll: 0.1
click_delay: 0.1
long_click_delay: 5.5
msd: msd:
# FIXME: It's for laptop lol # FIXME: It's for laptop lol
@ -30,17 +29,21 @@ kvmd:
init_delay: 2.0 init_delay: 2.0
chunk_size: 8192 chunk_size: 8192
video: streamer:
pinout: pinout:
cap: 21 cap: 21
conv: 25 conv: 25
sync_delay: 1.0 sync_delay: 1.0
shutdown_delay: 10.0 shutdown_delay: 10.0
size:
width: 720
height: 576
cmd: cmd:
- "/usr/bin/mjpg_streamer" - "/usr/bin/mjpg_streamer"
- "-i" - "-i"
- "input_uvc.so -d /dev/video0 -e 2 -y -n -r 720x576" - "input_uvc.so -d /dev/video0 -e 2 -y -n -r {width}x{height}"
- "-o" - "-o"
- "output_http.so -l localhost -p 8082" - "output_http.so -l localhost -p 8082"