new snapshot api

This commit is contained in:
Devaev Maxim 2020-05-29 19:49:47 +03:00
parent a5fcafe2a5
commit 81fec121d0
6 changed files with 202 additions and 27 deletions

View File

@ -0,0 +1,110 @@
# ========================================================================== #
# #
# KVMD - The main Pi-KVM daemon. #
# #
# Copyright (C) 2018 Maxim Devaev <mdevaev@gmail.com> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
# ========================================================================== #
import io
import functools
from aiohttp.web import Request
from aiohttp.web import Response
from PIL import Image
from ....validators.basic import valid_bool
from ....validators.basic import valid_int_f0
from ....validators.kvm import valid_stream_quality
from .... import aiotools
from ..http import UnavailableError
from ..http import exposed_http
from ..http import make_json_response
from ..streamer import StreamerSnapshot
from ..streamer import Streamer
# =====
class StreamerApi:
def __init__(self, streamer: Streamer) -> None:
self.__streamer = streamer
# =====
@exposed_http("GET", "/streamer")
async def __state_handler(self, _: Request) -> Response:
return make_json_response(await self.__streamer.get_state())
@exposed_http("GET", "/streamer/snapshot")
async def __make_snapshot_handler(self, request: Request) -> Response:
if (snapshot := await self.__streamer.make_snapshot(
save=valid_bool(request.query.get("save", "false")),
load=valid_bool(request.query.get("load", "false")),
allow_offline=valid_bool(request.query.get("allow_offline", "false")),
)):
if valid_bool(request.query.get("preview", "false")):
data = await self.__make_preview(
snapshot=snapshot,
max_width=valid_int_f0(request.query.get("preview_max_width", "0")),
max_height=valid_int_f0(request.query.get("preview_max_height", "0")),
quality=valid_stream_quality(request.query.get("preview_quality", "80")),
)
else:
data = snapshot.data
return Response(
body=data,
headers=dict(snapshot.headers),
content_type="image/jpeg",
)
raise UnavailableError()
@exposed_http("DELETE", "/streamer/snapshot")
async def __remove_snapshot_handler(self, _: Request) -> Response:
self.__streamer.remove_snapshot()
return make_json_response()
# =====
async def __make_preview(self, snapshot: StreamerSnapshot, max_width: int, max_height: int, quality: int) -> bytes:
if max_width == 0 and max_height == 0:
max_width = snapshot.width // 5
max_height = snapshot.height // 5
else:
max_width = min((max_width or snapshot.width), snapshot.width)
max_height = min((max_height or snapshot.height), snapshot.height)
if max_width == snapshot.width and max_height == snapshot.height:
return snapshot.data
else:
return (await aiotools.run_async(self.__inner_make_preview, snapshot, max_width, max_height, quality))
@functools.lru_cache(maxsize=1)
def __inner_make_preview(self, snapshot: StreamerSnapshot, max_width: int, max_height: int, quality: int) -> bytes:
assert 0 < max_width <= snapshot.width
assert 0 < max_height <= snapshot.height
assert not (max_width == snapshot.width and max_height == snapshot.height)
with io.BytesIO(snapshot.data) as snapshot_bio:
with io.BytesIO() as preview_bio:
with Image.open(snapshot_bio) as image:
image.thumbnail((max_width, max_height), Image.ANTIALIAS)
image.save(preview_bio, format="jpeg", quality=quality)
return preview_bio.getvalue()

View File

@ -39,6 +39,11 @@ class ForbiddenError(HttpError):
super().__init__("Forbidden", 403) super().__init__("Forbidden", 403)
class UnavailableError(HttpError):
def __init__(self) -> None:
super().__init__("Service Unavailable", 503)
# ===== # =====
@dataclasses.dataclass(frozen=True) @dataclasses.dataclass(frozen=True)
class HttpExposed: class HttpExposed:

View File

@ -83,6 +83,7 @@ from .api.wol import WolApi
from .api.hid import HidApi from .api.hid import HidApi
from .api.atx import AtxApi from .api.atx import AtxApi
from .api.msd import MsdApi from .api.msd import MsdApi
from .api.streamer import StreamerApi
# ===== # =====
@ -133,6 +134,7 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
HidApi(hid, keymap_path), HidApi(hid, keymap_path),
AtxApi(atx), AtxApi(atx),
MsdApi(msd, sync_chunk_size), MsdApi(msd, sync_chunk_size),
StreamerApi(streamer),
] ]
self.__ws_handlers: Dict[str, Callable] = {} self.__ws_handlers: Dict[str, Callable] = {}
@ -164,11 +166,7 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
async def __info_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response: async def __info_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
return make_json_response(await self.__make_info()) return make_json_response(await self.__make_info())
# ===== STREAMER # ===== STREAMER CONTROLLER
@exposed_http("GET", "/streamer")
async def __streamer_state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
return make_json_response(await self.__streamer.get_state())
@exposed_http("POST", "/streamer/set_params") @exposed_http("POST", "/streamer/set_params")
async def __streamer_set_params_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response: async def __streamer_set_params_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:

View File

@ -24,7 +24,10 @@ import os
import signal import signal
import asyncio import asyncio
import asyncio.subprocess import asyncio.subprocess
import dataclasses
import operator
from typing import Tuple
from typing import List from typing import List
from typing import Dict from typing import Dict
from typing import AsyncGenerator from typing import AsyncGenerator
@ -42,6 +45,16 @@ from ... import gpio
# ===== # =====
@dataclasses.dataclass(frozen=True)
class StreamerSnapshot:
online: bool
width: int
height: int
mtime: float
headers: Tuple[Tuple[str, str], ...]
data: bytes
class Streamer: # pylint: disable=too-many-instance-attributes class Streamer: # pylint: disable=too-many-instance-attributes
def __init__( # pylint: disable=too-many-arguments,too-many-locals def __init__( # pylint: disable=too-many-arguments,too-many-locals
self, self,
@ -101,6 +114,10 @@ class Streamer: # pylint: disable=too-many-instance-attributes
self.__http_session: Optional[aiohttp.ClientSession] = None self.__http_session: Optional[aiohttp.ClientSession] = None
self.__snapshot: Optional[StreamerSnapshot] = None
self.__state_notifier = aiotools.AioNotifier()
# ===== # =====
@aiotools.atomic @aiotools.atomic
@ -163,6 +180,8 @@ class Streamer: # pylint: disable=too-many-instance-attributes
# Запущено и не планирует останавливаться # Запущено и не планирует останавливаться
return bool(self.__streamer_task and not self.__stop_task) return bool(self.__streamer_task and not self.__stop_task)
# =====
def set_params(self, params: Dict) -> None: def set_params(self, params: Dict) -> None:
assert not self.__streamer_task assert not self.__streamer_task
self.__params = { self.__params = {
@ -176,6 +195,8 @@ class Streamer: # pylint: disable=too-many-instance-attributes
def get_params(self) -> Dict: def get_params(self) -> Dict:
return dict(self.__params) return dict(self.__params)
# =====
async def get_state(self) -> Dict: async def get_state(self) -> Dict:
state = None state = None
if self.__streamer_task: if self.__streamer_task:
@ -188,18 +209,24 @@ class Streamer: # pylint: disable=too-many-instance-attributes
pass pass
except Exception: except Exception:
get_logger().exception("Invalid streamer response from /state") get_logger().exception("Invalid streamer response from /state")
snapshot: Optional[Dict] = None
if self.__snapshot:
snapshot = dataclasses.asdict(self.__snapshot)
del snapshot["headers"]
del snapshot["data"]
return { return {
"limits": {"max_fps": self.__max_fps}, "limits": {"max_fps": self.__max_fps},
"params": self.__params, "params": self.__params,
"snapshot": {"saved": snapshot},
"state": state, "state": state,
} }
async def poll_state(self) -> AsyncGenerator[Dict, None]: async def poll_state(self) -> AsyncGenerator[Dict, None]:
notifier = aiotools.AioNotifier()
def signal_handler(*_: Any) -> None: def signal_handler(*_: Any) -> None:
get_logger(0).info("Got SIGUSR2, checking the stream state ...") get_logger(0).info("Got SIGUSR2, checking the stream state ...")
asyncio.ensure_future(notifier.notify()) asyncio.ensure_future(self.__state_notifier.notify())
get_logger(0).info("Installing SIGUSR2 streamer handler ...") get_logger(0).info("Installing SIGUSR2 streamer handler ...")
asyncio.get_event_loop().add_signal_handler(signal.SIGUSR2, signal_handler) asyncio.get_event_loop().add_signal_handler(signal.SIGUSR2, signal_handler)
@ -213,10 +240,12 @@ class Streamer: # pylint: disable=too-many-instance-attributes
prev_state = state prev_state = state
if waiter_task is None: if waiter_task is None:
waiter_task = asyncio.create_task(notifier.wait()) waiter_task = asyncio.create_task(self.__state_notifier.wait())
if waiter_task in (await aiotools.wait_first(asyncio.sleep(self.__state_poll), waiter_task))[0]: if waiter_task in (await aiotools.wait_first(asyncio.sleep(self.__state_poll), waiter_task))[0]:
waiter_task = None waiter_task = None
# =====
async def get_info(self) -> Dict: async def get_info(self) -> Dict:
version = (await aioproc.read_process([self.__cmd[0], "--version"], err_to_null=True))[1] version = (await aioproc.read_process([self.__cmd[0], "--version"], err_to_null=True))[1]
return { return {
@ -224,6 +253,49 @@ class Streamer: # pylint: disable=too-many-instance-attributes
"version": version, "version": version,
} }
async def make_snapshot(self, save: bool, load: bool, allow_offline: bool) -> Optional[StreamerSnapshot]:
if load:
return self.__snapshot
else:
session = self.__ensure_http_session()
try:
async with session.get(self.__make_url("snapshot")) as response:
htclient.raise_not_200(response)
online = (response.headers["X-UStreamer-Online"] == "true")
if online or allow_offline:
snapshot = StreamerSnapshot(
online=online,
width=int(response.headers["X-UStreamer-Width"]),
height=int(response.headers["X-UStreamer-Height"]),
mtime=float(response.headers["X-Timestamp"]),
headers=tuple(
(key, value)
for (key, value) in sorted(response.headers.items(), key=operator.itemgetter(0))
if key.lower().startswith("x-ustreamer-") or key.lower() in [
"x-timestamp",
"access-control-allow-origin",
"cache-control",
"pragma",
"expires",
]
),
data=bytes(await response.read()),
)
if save:
self.__snapshot = snapshot
await self.__state_notifier.notify()
return snapshot
except (aiohttp.ClientConnectionError, aiohttp.ServerConnectionError):
pass
except Exception:
get_logger().exception("Invalid streamer response from /snapshot")
return None
def remove_snapshot(self) -> None:
self.__snapshot = None
# =====
@aiotools.atomic @aiotools.atomic
async def cleanup(self) -> None: async def cleanup(self) -> None:
try: try:

View File

@ -54,7 +54,7 @@ class StreamerClient:
async def read_stream(self) -> AsyncGenerator[Tuple[bool, int, int, bytes], None]: async def read_stream(self) -> AsyncGenerator[Tuple[bool, int, int, bytes], None]:
try: try:
async with self.__make_http_session(infinite=True) as session: async with self.__make_http_session() as session:
async with session.get( async with session.get(
url=self.__make_url("stream"), url=self.__make_url("stream"),
params={"extra_headers": "1"}, params={"extra_headers": "1"},
@ -84,24 +84,14 @@ class StreamerClient:
raise StreamerError(f"{type(err).__name__}: {err}") raise StreamerError(f"{type(err).__name__}: {err}")
raise StreamerError("Reached EOF") raise StreamerError("Reached EOF")
# async def get_snapshot(self) -> Tuple[bool, bytes]: def __make_http_session(self) -> aiohttp.ClientSession:
# async with self.__make_http_session(infinite=False) as session: kwargs: Dict = {
# async with session.get(self.__make_url("snapshot")) as response: "headers": {"User-Agent": self.__user_agent},
# htclient.raise_not_200(response) "timeout": aiohttp.ClientTimeout(
# return (
# (response.headers["X-UStreamer-Online"] == "true"),
# bytes(await response.read()),
# )
def __make_http_session(self, infinite: bool) -> aiohttp.ClientSession:
kwargs: Dict = {"headers": {"User-Agent": self.__user_agent}}
if infinite:
kwargs["timeout"] = aiohttp.ClientTimeout(
connect=self.__timeout, connect=self.__timeout,
sock_read=self.__timeout, sock_read=self.__timeout,
) ),
else: }
kwargs["timeout"] = aiohttp.ClientTimeout(total=self.__timeout)
if self.__unix_path: if self.__unix_path:
kwargs["connector"] = aiohttp.UnixConnector(path=self.__unix_path) kwargs["connector"] = aiohttp.UnixConnector(path=self.__unix_path)
return aiohttp.ClientSession(**kwargs) return aiohttp.ClientSession(**kwargs)

View File

@ -193,7 +193,7 @@ export function Streamer() {
var __clickScreenshotButton = function() { var __clickScreenshotButton = function() {
let el_a = document.createElement("a"); let el_a = document.createElement("a");
el_a.href = "/streamer/snapshot"; el_a.href = "/api/streamer/snapshot?allow_offline=1";
el_a.target = "_blank"; el_a.target = "_blank";
document.body.appendChild(el_a); document.body.appendChild(el_a);
el_a.click(); el_a.click();