mirror of
https://github.com/mofeng-git/One-KVM.git
synced 2026-01-29 00:51:53 +08:00
kvmd-media server
This commit is contained in:
@@ -509,6 +509,32 @@ def _get_config_scheme() -> dict:
|
||||
},
|
||||
},
|
||||
|
||||
"media": {
|
||||
"server": {
|
||||
"unix": Option("/run/kvmd/media.sock", type=valid_abs_path, unpack_as="unix_path"),
|
||||
"unix_rm": Option(True, type=valid_bool),
|
||||
"unix_mode": Option(0o660, type=valid_unix_mode),
|
||||
"heartbeat": Option(15.0, type=valid_float_f01),
|
||||
"access_log_format": Option("[%P / %{X-Real-IP}i] '%r' => %s; size=%b ---"
|
||||
" referer='%{Referer}i'; user_agent='%{User-Agent}i'"),
|
||||
},
|
||||
|
||||
"memsink": {
|
||||
"jpeg": {
|
||||
"sink": Option("", unpack_as="obj"),
|
||||
"lock_timeout": Option(1.0, type=valid_float_f01),
|
||||
"wait_timeout": Option(1.0, type=valid_float_f01),
|
||||
"drop_same_frames": Option(0.0, type=valid_float_f0),
|
||||
},
|
||||
"h264": {
|
||||
"sink": Option("", unpack_as="obj"),
|
||||
"lock_timeout": Option(1.0, type=valid_float_f01),
|
||||
"wait_timeout": Option(1.0, type=valid_float_f01),
|
||||
"drop_same_frames": Option(0.0, type=valid_float_f0),
|
||||
},
|
||||
},
|
||||
},
|
||||
|
||||
"pst": {
|
||||
"server": {
|
||||
"unix": Option("/run/kvmd/pst.sock", type=valid_abs_path, unpack_as="unix_path"),
|
||||
|
||||
@@ -298,10 +298,10 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
|
||||
logger.exception("Cleanup error on %s", sub.name)
|
||||
logger.info("On-Cleanup complete")
|
||||
|
||||
async def _on_ws_opened(self) -> None:
|
||||
async def _on_ws_opened(self, _: WsSession) -> None:
|
||||
self.__streamer_notifier.notify()
|
||||
|
||||
async def _on_ws_closed(self) -> None:
|
||||
async def _on_ws_closed(self, _: WsSession) -> None:
|
||||
self.__hid.clear_events()
|
||||
self.__streamer_notifier.notify()
|
||||
|
||||
|
||||
48
kvmd/apps/media/__init__.py
Normal file
48
kvmd/apps/media/__init__.py
Normal file
@@ -0,0 +1,48 @@
|
||||
# ========================================================================== #
|
||||
# #
|
||||
# KVMD - The main PiKVM daemon. #
|
||||
# #
|
||||
# Copyright (C) 2020 Maxim Devaev <mdevaev@gmail.com> #
|
||||
# #
|
||||
# This program is free software: you can redistribute it and/or modify #
|
||||
# it under the terms of the GNU General Public License as published by #
|
||||
# the Free Software Foundation, either version 3 of the License, or #
|
||||
# (at your option) any later version. #
|
||||
# #
|
||||
# This program is distributed in the hope that it will be useful, #
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
|
||||
# GNU General Public License for more details. #
|
||||
# #
|
||||
# You should have received a copy of the GNU General Public License #
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
|
||||
# #
|
||||
# ========================================================================== #
|
||||
|
||||
|
||||
from ...clients.streamer import StreamerFormats
|
||||
from ...clients.streamer import MemsinkStreamerClient
|
||||
|
||||
from .. import init
|
||||
|
||||
from .server import MediaServer
|
||||
|
||||
|
||||
# =====
|
||||
def main(argv: (list[str] | None)=None) -> None:
|
||||
config = init(
|
||||
prog="kvmd-media",
|
||||
description="The media proxy",
|
||||
check_run=True,
|
||||
argv=argv,
|
||||
)[2].media
|
||||
|
||||
def make_streamer(name: str, fmt: int) -> (MemsinkStreamerClient | None):
|
||||
if getattr(config.memsink, name).sink:
|
||||
return MemsinkStreamerClient(name.upper(), fmt, **getattr(config.memsink, name)._unpack())
|
||||
return None
|
||||
|
||||
MediaServer(
|
||||
h264_streamer=make_streamer("h264", StreamerFormats.H264),
|
||||
jpeg_streamer=make_streamer("jpeg", StreamerFormats.JPEG),
|
||||
).run(**config.server._unpack())
|
||||
24
kvmd/apps/media/__main__.py
Normal file
24
kvmd/apps/media/__main__.py
Normal file
@@ -0,0 +1,24 @@
|
||||
# ========================================================================== #
|
||||
# #
|
||||
# KVMD - The main PiKVM daemon. #
|
||||
# #
|
||||
# Copyright (C) 2020 Maxim Devaev <mdevaev@gmail.com> #
|
||||
# #
|
||||
# This program is free software: you can redistribute it and/or modify #
|
||||
# it under the terms of the GNU General Public License as published by #
|
||||
# the Free Software Foundation, either version 3 of the License, or #
|
||||
# (at your option) any later version. #
|
||||
# #
|
||||
# This program is distributed in the hope that it will be useful, #
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
|
||||
# GNU General Public License for more details. #
|
||||
# #
|
||||
# You should have received a copy of the GNU General Public License #
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
|
||||
# #
|
||||
# ========================================================================== #
|
||||
|
||||
|
||||
from . import main
|
||||
main()
|
||||
190
kvmd/apps/media/server.py
Normal file
190
kvmd/apps/media/server.py
Normal file
@@ -0,0 +1,190 @@
|
||||
# ========================================================================== #
|
||||
# #
|
||||
# KVMD - The main PiKVM daemon. #
|
||||
# #
|
||||
# Copyright (C) 2020 Maxim Devaev <mdevaev@gmail.com> #
|
||||
# #
|
||||
# This program is free software: you can redistribute it and/or modify #
|
||||
# it under the terms of the GNU General Public License as published by #
|
||||
# the Free Software Foundation, either version 3 of the License, or #
|
||||
# (at your option) any later version. #
|
||||
# #
|
||||
# This program is distributed in the hope that it will be useful, #
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
|
||||
# GNU General Public License for more details. #
|
||||
# #
|
||||
# You should have received a copy of the GNU General Public License #
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
|
||||
# #
|
||||
# ========================================================================== #
|
||||
|
||||
|
||||
import asyncio
|
||||
import dataclasses
|
||||
|
||||
from aiohttp.web import Request
|
||||
from aiohttp.web import WebSocketResponse
|
||||
|
||||
from ...logging import get_logger
|
||||
|
||||
from ... import tools
|
||||
from ... import aiotools
|
||||
|
||||
from ...htserver import exposed_http
|
||||
from ...htserver import exposed_ws
|
||||
from ...htserver import WsSession
|
||||
from ...htserver import HttpServer
|
||||
|
||||
from ...clients.streamer import StreamerError
|
||||
from ...clients.streamer import StreamerPermError
|
||||
from ...clients.streamer import StreamerFormats
|
||||
from ...clients.streamer import BaseStreamerClient
|
||||
|
||||
|
||||
# =====
|
||||
@dataclasses.dataclass
|
||||
class _Source:
|
||||
kind: str
|
||||
fmt: str
|
||||
streamer: BaseStreamerClient
|
||||
meta: dict = dataclasses.field(default_factory=dict)
|
||||
clients: dict[WsSession, "_Client"] = dataclasses.field(default_factory=dict)
|
||||
key_required: bool = dataclasses.field(default=False)
|
||||
|
||||
|
||||
@dataclasses.dataclass
|
||||
class _Client:
|
||||
ws: WsSession
|
||||
src: _Source
|
||||
queue: asyncio.Queue[dict]
|
||||
sender: (asyncio.Task | None) = dataclasses.field(default=None)
|
||||
|
||||
|
||||
class MediaServer(HttpServer):
|
||||
__K_VIDEO = "video"
|
||||
|
||||
__F_H264 = "h264"
|
||||
__F_JPEG = "jpeg"
|
||||
|
||||
__Q_SIZE = 32
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
h264_streamer: (BaseStreamerClient | None),
|
||||
jpeg_streamer: (BaseStreamerClient | None),
|
||||
) -> None:
|
||||
|
||||
super().__init__()
|
||||
|
||||
self.__srcs: list[_Source] = []
|
||||
if h264_streamer:
|
||||
self.__srcs.append(_Source(self.__K_VIDEO, self.__F_H264, h264_streamer, {"profile_level_id": "42E01F"}))
|
||||
if jpeg_streamer:
|
||||
self.__srcs.append(_Source(self.__K_VIDEO, self.__F_JPEG, jpeg_streamer))
|
||||
|
||||
# =====
|
||||
|
||||
@exposed_http("GET", "/ws")
|
||||
async def __ws_handler(self, req: Request) -> WebSocketResponse:
|
||||
async with self._ws_session(req) as ws:
|
||||
media: dict = {self.__K_VIDEO: {}}
|
||||
for src in self.__srcs:
|
||||
media[src.kind][src.fmt] = src.meta
|
||||
await ws.send_event("media", media)
|
||||
return (await self._ws_loop(ws))
|
||||
|
||||
@exposed_ws(0)
|
||||
async def __ws_bin_ping_handler(self, ws: WsSession, _: bytes) -> None:
|
||||
await ws.send_bin(255, b"") # Ping-pong
|
||||
|
||||
@exposed_ws("start")
|
||||
async def __ws_start_handler(self, ws: WsSession, event: dict) -> None:
|
||||
try:
|
||||
kind = str(event.get("kind"))
|
||||
fmt = str(event.get("format"))
|
||||
except Exception:
|
||||
return
|
||||
src: (_Source | None) = None
|
||||
for cand in self.__srcs:
|
||||
if ws in cand.clients:
|
||||
return # Don't allow any double streaming
|
||||
if (cand.kind, cand.fmt) == (kind, fmt):
|
||||
src = cand
|
||||
if src:
|
||||
client = _Client(ws, src, asyncio.Queue(self.__Q_SIZE))
|
||||
client.sender = aiotools.create_deadly_task(str(ws), self.__sender(client))
|
||||
src.clients[ws] = client
|
||||
get_logger(0).info("Streaming %s to %s ...", src.streamer, ws)
|
||||
|
||||
# =====
|
||||
|
||||
async def _init_app(self) -> None:
|
||||
logger = get_logger(0)
|
||||
for src in self.__srcs:
|
||||
logger.info("Starting streamer %s ...", src.streamer)
|
||||
aiotools.create_deadly_task(str(src.streamer), self.__streamer(src))
|
||||
self._add_exposed(self)
|
||||
|
||||
async def _on_shutdown(self) -> None:
|
||||
logger = get_logger(0)
|
||||
logger.info("Stopping system tasks ...")
|
||||
await aiotools.stop_all_deadly_tasks()
|
||||
logger.info("Disconnecting clients ...")
|
||||
await self._close_all_wss()
|
||||
logger.info("On-Shutdown complete")
|
||||
|
||||
async def _on_ws_closed(self, ws: WsSession) -> None:
|
||||
for src in self.__srcs:
|
||||
client = src.clients.pop(ws, None)
|
||||
if client and client.sender:
|
||||
get_logger(0).info("Closed stream for %s", ws)
|
||||
client.sender.cancel()
|
||||
return
|
||||
|
||||
# =====
|
||||
|
||||
async def __sender(self, client: _Client) -> None:
|
||||
need_key = StreamerFormats.is_diff(client.src.streamer.get_format())
|
||||
if need_key:
|
||||
client.src.key_required = True
|
||||
has_key = False
|
||||
while True:
|
||||
frame = await client.queue.get()
|
||||
has_key = (not need_key or has_key or frame["key"])
|
||||
if has_key:
|
||||
try:
|
||||
await client.ws.send_bin(1, frame["key"].to_bytes() + frame["data"])
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
async def __streamer(self, src: _Source) -> None:
|
||||
logger = get_logger(0)
|
||||
while True:
|
||||
if len(src.clients) == 0:
|
||||
await asyncio.sleep(1)
|
||||
continue
|
||||
try:
|
||||
async with src.streamer.reading() as read_frame:
|
||||
while len(src.clients) > 0:
|
||||
frame = await read_frame(src.key_required)
|
||||
if frame["key"]:
|
||||
src.key_required = False
|
||||
for client in src.clients.values():
|
||||
try:
|
||||
client.queue.put_nowait(frame)
|
||||
except asyncio.QueueFull:
|
||||
# Если какой-то из клиентов не справляется, очищаем ему очередь и запрашиваем кейфрейм.
|
||||
# Я вижу у такой логики кучу минусов, хз как себя покажет, но лучше пока ничего не придумал.
|
||||
tools.clear_queue(client.queue)
|
||||
src.key_required = True
|
||||
except Exception:
|
||||
pass
|
||||
except StreamerError as ex:
|
||||
if isinstance(ex, StreamerPermError):
|
||||
logger.exception("Streamer failed: %s", src.streamer)
|
||||
else:
|
||||
logger.error("Streamer error: %s: %s", src.streamer, tools.efmt(ex))
|
||||
except Exception:
|
||||
get_logger(0).exception("Unexpected streamer error: %s", src.streamer)
|
||||
await asyncio.sleep(1)
|
||||
@@ -104,10 +104,10 @@ class PstServer(HttpServer): # pylint: disable=too-many-arguments,too-many-inst
|
||||
await self.__remount_storage(rw=False)
|
||||
logger.info("On-Cleanup complete")
|
||||
|
||||
async def _on_ws_opened(self) -> None:
|
||||
async def _on_ws_opened(self, _: WsSession) -> None:
|
||||
self.__notifier.notify()
|
||||
|
||||
async def _on_ws_closed(self) -> None:
|
||||
async def _on_ws_closed(self, _: WsSession) -> None:
|
||||
self.__notifier.notify()
|
||||
|
||||
# ===== SYSTEM TASKS
|
||||
|
||||
Reference in New Issue
Block a user