usefull stuff for kvmd

This commit is contained in:
Devaev Maxim 2018-06-27 23:24:50 +03:00
parent 4804aa53b5
commit 3fefb0d519
5 changed files with 77 additions and 22 deletions

View File

@ -60,9 +60,10 @@ class _Application:
vga_power=self.__config["video"]["pinout"]["vga"],
sync_delay=self.__config["video"]["sync_delay"],
mjpg_streamer=self.__config["video"]["mjpg_streamer"],
loop=self.__loop,
)
self.__system_futures: List[asyncio.Future] = []
self.__system_tasks: List[asyncio.Task] = []
def run(self) -> None:
app = aiohttp.web.Application(loop=self.__loop)
@ -71,10 +72,10 @@ class _Application:
app.on_shutdown.append(self.__on_shutdown)
app.on_cleanup.append(self.__on_cleanup)
self.__system_futures.extend([
asyncio.ensure_future(self.__poll_dead_sockets(), loop=self.__loop),
asyncio.ensure_future(self.__poll_atx_leds(), loop=self.__loop),
asyncio.ensure_future(self.__poll_streamer_events(), loop=self.__loop),
self.__system_tasks.extend([
self.__loop.create_task(self.__poll_dead_sockets()),
self.__loop.create_task(self.__poll_atx_leds()),
self.__loop.create_task(self.__poll_streamer_events()),
])
aiohttp.web.run_app(
@ -109,9 +110,9 @@ class _Application:
logger = get_logger()
logger.info("Cancelling tasks ...")
for future in self.__system_futures:
future.cancel()
await asyncio.gather(*self.__system_futures)
for task in self.__system_tasks:
task.cancel()
await asyncio.gather(*self.__system_tasks)
logger.info("Cleaning up GPIO ...")
GPIO.cleanup()

View File

@ -11,13 +11,14 @@ from RPi import GPIO
# =====
class Streamer:
class Streamer: # pylint: disable=too-many-instance-attributes
def __init__(
self,
cap_power: int,
vga_power: int,
sync_delay: float,
mjpg_streamer: Dict,
loop: asyncio.AbstractEventLoop,
) -> None:
self.__cap_power = self.__set_output_pin(cap_power)
@ -30,9 +31,11 @@ class Streamer:
" -o 'output_http.so -p -l %(host)s %(port)s'"
) % (mjpg_streamer)
self.__loop = loop
self.__lock = asyncio.Lock()
self.__events_queue: asyncio.Queue = asyncio.Queue()
self.__proc_future: Optional[asyncio.Future] = None
self.__proc_task: Optional[asyncio.Task] = None
def __set_output_pin(self, pin: int) -> int:
GPIO.setup(pin, GPIO.OUT)
@ -46,18 +49,18 @@ class Streamer:
async def start(self) -> None:
async with self.__lock:
get_logger().info("Starting mjpg_streamer ...")
assert not self.__proc_future
assert not self.__proc_task
await self.__set_hw_enabled(True)
self.__proc_future = asyncio.ensure_future(self.__process(), loop=asyncio.get_event_loop())
self.__proc_task = self.__loop.create_task(self.__process())
async def stop(self) -> None:
async with self.__lock:
get_logger().info("Stopping mjpg_streamer ...")
if self.__proc_future:
self.__proc_future.cancel()
await asyncio.gather(self.__proc_future, return_exceptions=True)
if self.__proc_task:
self.__proc_task.cancel()
await asyncio.gather(self.__proc_task, return_exceptions=True)
await self.__set_hw_enabled(False)
self.__proc_future = None
self.__proc_task = None
await self.__events_queue.put("mjpg_streamer stopped")
async def __set_hw_enabled(self, enabled: bool) -> None:

View File

@ -2,5 +2,3 @@ RPi.GPIO
aiohttp
contextlog
pyyaml
bumpversion
tox

View File

@ -6,26 +6,26 @@ skipsdist = True
basepython = python3.6
[testenv:flake8]
commands = flake8 kvmd
commands = flake8 kvmd wscli.py
deps =
flake8
flake8-double-quotes
-rdev_requirements.txt
[testenv:pylint]
commands = pylint --output-format=colorized --reports=no kvmd
commands = pylint --output-format=colorized --reports=no kvmd wscli.py
deps =
pylint
-rdev_requirements.txt
[testenv:mypy]
commands = mypy kvmd
commands = mypy kvmd wscli.py
deps =
mypy
-rdev_requirements.txt
[testenv:vulture]
commands = vulture kvmd
commands = vulture kvmd wscli.py
deps =
vulture
-rdev_requirements.txt

53
kvmd/wscli.py Executable file
View File

@ -0,0 +1,53 @@
#!/usr/bin/env python3
import sys
import signal
import asyncio
import argparse
import aiohttp
# =====
async def _run_client(loop: asyncio.AbstractEventLoop, url: str) -> None:
def stdin_callback() -> None:
line = sys.stdin.buffer.readline().decode()
if line:
asyncio.ensure_future(ws.send_str(line), loop=loop)
else:
loop.stop()
loop.add_reader(sys.stdin.fileno(), stdin_callback)
async def dispatch() -> None:
while True:
msg = await ws.receive()
if msg.type == aiohttp.WSMsgType.TEXT:
print("Received:", msg.data.strip())
else:
if msg.type == aiohttp.WSMsgType.CLOSE:
await ws.close()
elif msg.type == aiohttp.WSMsgType.ERROR:
print("Error during receive:", ws.exception())
elif msg.type == aiohttp.WSMsgType.CLOSED:
pass
break
async with aiohttp.ClientSession().ws_connect(url) as ws:
await dispatch()
def main() -> None:
parser = argparse.ArgumentParser()
parser.add_argument("-u", "--url", default="http://localhost:8080")
options = parser.parse_args()
loop = asyncio.get_event_loop()
loop.add_signal_handler(signal.SIGINT, loop.stop)
loop.create_task(_run_client(loop, options.url))
loop.run_forever()
if __name__ == "__main__":
main()