This commit is contained in:
Devaev Maxim 2021-01-24 12:02:04 +03:00
parent dc87f8d259
commit 3a2ffca6b7
6 changed files with 101 additions and 61 deletions

View File

@ -556,6 +556,12 @@ def _get_config_scheme() -> Dict:
"wait_timeout": Option(1.0, type=valid_float_f01), "wait_timeout": Option(1.0, type=valid_float_f01),
"drop_same_frames": Option(1.0, type=valid_float_f0), "drop_same_frames": Option(1.0, type=valid_float_f0),
}, },
"h264": {
"sink": Option("", unpack_as="obj"),
"lock_timeout": Option(1.0, type=valid_float_f01),
"wait_timeout": Option(1.0, type=valid_float_f01),
"drop_same_frames": Option(0.0, type=valid_float_f0),
},
}, },
"auth": { "auth": {

View File

@ -46,6 +46,11 @@ def main(argv: Optional[List[str]]=None) -> None:
user_agent = htclient.make_user_agent("KVMD-VNC") user_agent = htclient.make_user_agent("KVMD-VNC")
def make_memsink(name: str) -> Optional[StreamerMemsinkClient]:
if getattr(config.memsink, name).sink:
return StreamerMemsinkClient(name=name, **getattr(config.memsink, name)._unpack())
return None
VncServer( VncServer(
host=config.server.host, host=config.server.host,
port=config.server.port, port=config.server.port,
@ -59,18 +64,12 @@ def main(argv: Optional[List[str]]=None) -> None:
desired_fps=config.desired_fps, desired_fps=config.desired_fps,
keymap_path=config.keymap, keymap_path=config.keymap,
kvmd=KvmdClient( kvmd=KvmdClient(user_agent=user_agent, **config.kvmd._unpack()),
user_agent=user_agent, streamers=list(filter(None, [
**config.kvmd._unpack(), make_memsink("h264"),
), make_memsink("jpeg"),
streamer_http=StreamerHttpClient( StreamerHttpClient(name="jpeg", user_agent=user_agent, **config.streamer._unpack()),
user_agent=user_agent, ])),
**config.streamer._unpack(),
),
streamer_memsink_jpeg=(
StreamerMemsinkClient(**config.memsink.jpeg._unpack())
if config.memsink.jpeg.sink else None
),
vnc_auth_manager=VncAuthManager(**config.auth.vncauth._unpack()), vnc_auth_manager=VncAuthManager(**config.auth.vncauth._unpack()),
**config.server.keepalive._unpack(), **config.server.keepalive._unpack(),

View File

@ -22,6 +22,7 @@
import asyncio import asyncio
import ssl import ssl
import dataclasses
from typing import Tuple from typing import Tuple
from typing import List from typing import List
@ -164,6 +165,13 @@ class RfbClient(RfbClientStream): # pylint: disable=too-many-instance-attribute
else: else:
await self._write_struct("", bytes([0b10011111, length & 0x7F | 0x80, length >> 7 & 0x7F | 0x80, length >> 14 & 0xFF]), data) await self._write_struct("", bytes([0b10011111, length & 0x7F | 0x80, length >> 7 & 0x7F | 0x80, length >> 14 & 0xFF]), data)
async def _send_fb_h264(self, data: bytes) -> None:
assert self._encodings.has_h264
assert len(data) <= 0xFFFFFFFF, len(data)
await self._write_fb_update(self._width, self._height, RfbEncodings.H264, drain=False)
await self._write_struct("LL", len(data), 0, drain=False)
await self._write_struct("", data)
async def _send_resize(self, width: int, height: int) -> None: async def _send_resize(self, width: int, height: int) -> None:
assert self._encodings.has_resize assert self._encodings.has_resize
await self._write_fb_update(width, height, RfbEncodings.RESIZE) await self._write_fb_update(width, height, RfbEncodings.RESIZE)
@ -380,14 +388,18 @@ class RfbClient(RfbClientStream): # pylint: disable=too-many-instance-attribute
raise RfbError(f"Requested unsupported bits_per_pixel={bits_per_pixel} for Tight JPEG; required 16 or 32") raise RfbError(f"Requested unsupported bits_per_pixel={bits_per_pixel} for Tight JPEG; required 16 or 32")
async def __handle_set_encodings(self) -> None: async def __handle_set_encodings(self) -> None:
logger = get_logger(0)
encodings_count = (await self._read_struct("x H"))[0] encodings_count = (await self._read_struct("x H"))[0]
if encodings_count > 1024: if encodings_count > 1024:
raise RfbError(f"Too many encodings: {encodings_count}") raise RfbError(f"Too many encodings: {encodings_count}")
self._encodings = RfbClientEncodings(frozenset(await self._read_struct("l" * encodings_count))) self._encodings = RfbClientEncodings(frozenset(await self._read_struct("l" * encodings_count)))
get_logger(0).info("[main] %s: Features: resize=%d, rename=%d, leds=%d, extkeys=%d", logger.info("[main] %s: Client features (SetEncodings): ...", self._remote)
self._remote, self._encodings.has_resize, self._encodings.has_rename, for (key, value) in dataclasses.asdict(self._encodings).items():
self._encodings.has_leds_state, self._encodings.has_ext_keys) logger.info("[main] %s: ... %s=%s", self._remote, key, value)
self.__check_tight_jpeg() self.__check_tight_jpeg()
if self._encodings.has_ext_keys: # Preferred method if self._encodings.has_ext_keys: # Preferred method
await self._write_fb_update(0, 0, RfbEncodings.EXT_KEYS, drain=True) await self._write_fb_update(0, 0, RfbEncodings.EXT_KEYS, drain=True)
await self._on_set_encodings() await self._on_set_encodings()

View File

@ -39,9 +39,11 @@ class RfbEncodings:
[10, 20, 30, 40, 50, 60, 70, 80, 90, 100], [10, 20, 30, 40, 50, 60, 70, 80, 90, 100],
)) ))
H264 = 0xCAFE0101 # Pi-KVM H264 Encoding
@dataclasses.dataclass(frozen=True) @dataclasses.dataclass(frozen=True)
class RfbClientEncodings: class RfbClientEncodings: # pylint: disable=too-many-instance-attributes
encodings: FrozenSet[int] encodings: FrozenSet[int]
has_resize: bool = dataclasses.field(default=False) has_resize: bool = dataclasses.field(default=False)
@ -52,6 +54,8 @@ class RfbClientEncodings:
has_tight: bool = dataclasses.field(default=False) has_tight: bool = dataclasses.field(default=False)
tight_jpeg_quality: int = dataclasses.field(default=0) tight_jpeg_quality: int = dataclasses.field(default=0)
has_h264: bool = dataclasses.field(default=False)
def __post_init__(self) -> None: def __post_init__(self) -> None:
self.__set("has_resize", (RfbEncodings.RESIZE in self.encodings)) self.__set("has_resize", (RfbEncodings.RESIZE in self.encodings))
self.__set("has_rename", (RfbEncodings.RENAME in self.encodings)) self.__set("has_rename", (RfbEncodings.RENAME in self.encodings))
@ -61,6 +65,8 @@ class RfbClientEncodings:
self.__set("has_tight", (RfbEncodings.TIGHT in self.encodings)) self.__set("has_tight", (RfbEncodings.TIGHT in self.encodings))
self.__set("tight_jpeg_quality", self.__get_tight_jpeg_quality()) self.__set("tight_jpeg_quality", self.__get_tight_jpeg_quality())
self.__set("has_h264", (RfbEncodings.H264 in self.encodings))
def __set(self, key: str, value: Any) -> None: def __set(self, key: str, value: Any) -> None:
object.__setattr__(self, key, value) object.__setattr__(self, key, value)

View File

@ -26,6 +26,8 @@ import socket
import dataclasses import dataclasses
import contextlib import contextlib
from typing import Tuple
from typing import List
from typing import Dict from typing import Dict
from typing import Union from typing import Union
from typing import Optional from typing import Optional
@ -47,8 +49,6 @@ from ...clients.kvmd import KvmdClient
from ...clients.streamer import StreamerError from ...clients.streamer import StreamerError
from ...clients.streamer import StreamerPermError from ...clients.streamer import StreamerPermError
from ...clients.streamer import BaseStreamerClient from ...clients.streamer import BaseStreamerClient
from ...clients.streamer import StreamerHttpClient
from ...clients.streamer import StreamerMemsinkClient
from ... import tools from ... import tools
@ -84,8 +84,7 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
symmap: Dict[int, Dict[int, str]], symmap: Dict[int, Dict[int, str]],
kvmd: KvmdClient, kvmd: KvmdClient,
streamer_http: StreamerHttpClient, streamers: List[BaseStreamerClient],
streamer_memsink_jpeg: Optional[StreamerMemsinkClient],
vnc_credentials: Dict[str, VncAuthKvmdCredentials], vnc_credentials: Dict[str, VncAuthKvmdCredentials],
none_auth_only: bool, none_auth_only: bool,
@ -109,19 +108,19 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
self.__symmap = symmap self.__symmap = symmap
self.__kvmd = kvmd self.__kvmd = kvmd
self.__streamer_http = streamer_http self.__streamers = streamers
self.__streamer_memsink_jpeg = streamer_memsink_jpeg
self.__shared_params = shared_params self.__shared_params = shared_params
self.__authorized = asyncio.Future() # type: ignore self.__stage1_authorized = asyncio.Future() # type: ignore
self.__ws_connected = asyncio.Future() # type: ignore self.__stage2_encodings_accepted = asyncio.Future() # type: ignore
self.__stage3_ws_connected = asyncio.Future() # type: ignore
self.__kvmd_session: Optional[KvmdClientSession] = None self.__kvmd_session: Optional[KvmdClientSession] = None
self.__kvmd_ws: Optional[KvmdClientWs] = None self.__kvmd_ws: Optional[KvmdClientWs] = None
self.__fb_requested = False self.__fb_requested = False
self.__fb_stub_text = "" self.__fb_stub: Optional[Tuple[int, str]] = None
self.__fb_stub_quality = 0
# Эти состояния шарить не обязательно - бекенд исключает дублирующиеся события. # Эти состояния шарить не обязательно - бекенд исключает дублирующиеся события.
# Все это нужно только чтобы не посылать лишние жсоны в сокет KVMD # Все это нужно только чтобы не посылать лишние жсоны в сокет KVMD
@ -149,12 +148,19 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
async def __kvmd_task_loop(self) -> None: async def __kvmd_task_loop(self) -> None:
logger = get_logger(0) logger = get_logger(0)
await self.__authorized await self.__stage1_authorized
logger.info("[kvmd] %s: Waiting for the SetEncodings message ...", self._remote)
try:
await asyncio.wait_for(self.__stage2_encodings_accepted, timeout=5)
except asyncio.TimeoutError:
raise RfbError("No SetEncodings message recieved from the clienta in 5 secs")
assert self.__kvmd_session assert self.__kvmd_session
try: try:
async with self.__kvmd_session.ws() as self.__kvmd_ws: async with self.__kvmd_session.ws() as self.__kvmd_ws:
logger.info("[kvmd] %s: Connected to KVMD websocket", self._remote) logger.info("[kvmd] %s: Connected to KVMD websocket", self._remote)
self.__ws_connected.set_result(None) self.__stage3_ws_connected.set_result(None)
async for event in self.__kvmd_ws.communicate(): async for event in self.__kvmd_ws.communicate():
await self.__process_ws_event(event) await self.__process_ws_event(event)
raise RfbError("KVMD closes the websocket (the server may have been stopped)") raise RfbError("KVMD closes the websocket (the server may have been stopped)")
@ -184,34 +190,29 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
async def __streamer_task_loop(self) -> None: async def __streamer_task_loop(self) -> None:
logger = get_logger(0) logger = get_logger(0)
await self.__ws_connected await self.__stage3_ws_connected
streamer = self.__streamers[0]
name = "streamer_http"
streamer: BaseStreamerClient = self.__streamer_http
if self.__streamer_memsink_jpeg:
(name, streamer) = ("streamer_memsink_jpeg", self.__streamer_memsink_jpeg)
while True: while True:
try: try:
streaming = False streaming = False
async for (online, width, height, data) in streamer.read_stream(): async for (online, width, height, data, h264) in streamer.read_stream():
if not streaming: if not streaming:
logger.info("[%s] %s: Streaming ...", name, self._remote) logger.info("[streamer] %s: Streaming from %s ...", self._remote, streamer)
streaming = True streaming = True
if online: if online:
await self.__send_fb_real(width, height, data) await self.__send_fb_real(width, height, data, h264)
else: else:
await self.__send_fb_stub("No signal") await self.__send_fb_stub("No signal")
except StreamerError as err: except StreamerError as err:
if isinstance(err, StreamerPermError): if isinstance(err, StreamerPermError):
logger.info("[%s] %s: Permanent error: %s; switching to HTTP ...", name, self._remote, err) streamer = self.__streamers[-1]
(name, streamer) = ("streamer_http", self.__streamer_http) logger.info("[streamer] %s: Permanent error: %s; switching to %s ...", self._remote, err, streamer)
else: else:
logger.info("[%s] %s: Waiting for stream: %s", name, self._remote, err) logger.info("[streamer] %s: Waiting for stream: %s", self._remote, err)
await self.__send_fb_stub("Waiting for stream ...") await self.__send_fb_stub("Waiting for stream ...")
await asyncio.sleep(1) await asyncio.sleep(1)
async def __send_fb_real(self, width: int, height: int, data: bytes) -> None: async def __send_fb_real(self, width: int, height: int, data: bytes, h264: bool) -> None:
async with self.__lock: async with self.__lock:
if self.__fb_requested: if self.__fb_requested:
if self._width != width or self._height != height: if self._width != width or self._height != height:
@ -222,19 +223,18 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
await self.__send_fb_stub(msg, no_lock=True) await self.__send_fb_stub(msg, no_lock=True)
return return
await self._send_resize(width, height) await self._send_resize(width, height)
await self._send_fb_jpeg(data) await (self._send_fb_h264 if h264 else self._send_fb_jpeg)(data)
self.__fb_stub_text = "" self.__fb_stub = None
self.__fb_stub_quality = 0
self.__fb_requested = False self.__fb_requested = False
async def __send_fb_stub(self, text: str, no_lock: bool=False) -> None: async def __send_fb_stub(self, text: str, no_lock: bool=False) -> None:
if not no_lock: if not no_lock:
await self.__lock.acquire() await self.__lock.acquire()
try: try:
if self.__fb_requested and (self.__fb_stub_text != text or self.__fb_stub_quality != self._encodings.tight_jpeg_quality): quality = self._encodings.tight_jpeg_quality
await self._send_fb_jpeg(await make_text_jpeg(self._width, self._height, self._encodings.tight_jpeg_quality, text)) if self.__fb_requested and self.__fb_stub != (quality, text):
self.__fb_stub_text = text await self._send_fb_jpeg(await make_text_jpeg(self._width, self._height, quality, text))
self.__fb_stub_quality = self._encodings.tight_jpeg_quality self.__fb_stub = (quality, text)
self.__fb_requested = False self.__fb_requested = False
finally: finally:
if not no_lock: if not no_lock:
@ -245,7 +245,7 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
async def _authorize_userpass(self, user: str, passwd: str) -> bool: async def _authorize_userpass(self, user: str, passwd: str) -> bool:
self.__kvmd_session = self.__kvmd.make_session(user, passwd) self.__kvmd_session = self.__kvmd.make_session(user, passwd)
if (await self.__kvmd_session.auth.check()): if (await self.__kvmd_session.auth.check()):
self.__authorized.set_result(None) self.__stage1_authorized.set_result(None)
return True return True
return False return False
@ -312,7 +312,7 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
self.__mouse_move = move self.__mouse_move = move
async def _on_cut_event(self, text: str) -> None: async def _on_cut_event(self, text: str) -> None:
assert self.__authorized.done() assert self.__stage1_authorized.done()
assert self.__kvmd_session assert self.__kvmd_session
logger = get_logger(0) logger = get_logger(0)
logger.info("[main] %s: Printing %d characters ...", self._remote, len(text)) logger.info("[main] %s: Printing %d characters ...", self._remote, len(text))
@ -327,11 +327,13 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
logger.exception("[main] %s: Can't print characters", self._remote) logger.exception("[main] %s: Can't print characters", self._remote)
async def _on_set_encodings(self) -> None: async def _on_set_encodings(self) -> None:
assert self.__authorized.done() assert self.__stage1_authorized.done()
assert self.__kvmd_session assert self.__kvmd_session
if not self.__stage2_encodings_accepted.done():
self.__stage2_encodings_accepted.set_result(None)
has_quality = (await self.__kvmd_session.streamer.get_state())["features"]["quality"] has_quality = (await self.__kvmd_session.streamer.get_state())["features"]["quality"]
quality = (self._encodings.tight_jpeg_quality if has_quality else None) quality = (self._encodings.tight_jpeg_quality if has_quality else None)
get_logger(0).info("[main] %s: Applying streamer params: quality=%s; desired_fps=%d ...", get_logger(0).info("[main] %s: Applying streamer params: jpeg_quality=%s; desired_fps=%d ...",
self._remote, quality, self.__desired_fps) self._remote, quality, self.__desired_fps)
await self.__kvmd_session.streamer.set_params(quality, self.__desired_fps) await self.__kvmd_session.streamer.set_params(quality, self.__desired_fps)
@ -361,8 +363,7 @@ class VncServer: # pylint: disable=too-many-instance-attributes
keymap_path: str, keymap_path: str,
kvmd: KvmdClient, kvmd: KvmdClient,
streamer_http: StreamerHttpClient, streamers: List[BaseStreamerClient],
streamer_memsink_jpeg: Optional[StreamerMemsinkClient],
vnc_auth_manager: VncAuthManager, vnc_auth_manager: VncAuthManager,
) -> None: ) -> None:
@ -411,8 +412,7 @@ class VncServer: # pylint: disable=too-many-instance-attributes
keymap_name=keymap_name, keymap_name=keymap_name,
symmap=symmap, symmap=symmap,
kvmd=kvmd, kvmd=kvmd,
streamer_http=streamer_http, streamers=streamers,
streamer_memsink_jpeg=streamer_memsink_jpeg,
vnc_credentials=(await self.__vnc_auth_manager.read_credentials())[0], vnc_credentials=(await self.__vnc_auth_manager.read_credentials())[0],
none_auth_only=none_auth_only, none_auth_only=none_auth_only,
shared_params=shared_params, shared_params=shared_params,

View File

@ -53,7 +53,7 @@ class StreamerPermError(StreamerError):
# ===== # =====
class BaseStreamerClient: class BaseStreamerClient:
async def read_stream(self) -> AsyncGenerator[Tuple[bool, int, int, bytes], None]: async def read_stream(self) -> AsyncGenerator[Tuple[bool, int, int, bytes, bool], None]:
if self is not None: # XXX: Vulture and pylint hack if self is not None: # XXX: Vulture and pylint hack
raise NotImplementedError() raise NotImplementedError()
yield yield
@ -77,6 +77,7 @@ def _patch_stream_reader(reader: aiohttp.StreamReader) -> None:
class StreamerHttpClient(BaseStreamerClient): class StreamerHttpClient(BaseStreamerClient):
def __init__( def __init__(
self, self,
name: str,
host: str, host: str,
port: int, port: int,
unix_path: str, unix_path: str,
@ -85,13 +86,14 @@ class StreamerHttpClient(BaseStreamerClient):
) -> None: ) -> None:
assert port or unix_path assert port or unix_path
self.__name = name
self.__host = host self.__host = host
self.__port = port self.__port = port
self.__unix_path = unix_path self.__unix_path = unix_path
self.__timeout = timeout self.__timeout = timeout
self.__user_agent = user_agent self.__user_agent = user_agent
async def read_stream(self) -> AsyncGenerator[Tuple[bool, int, int, bytes], None]: async def read_stream(self) -> AsyncGenerator[Tuple[bool, int, int, bytes, bool], None]:
try: try:
async with self.__make_http_session() as session: async with self.__make_http_session() as session:
async with session.get( async with session.get(
@ -116,6 +118,7 @@ class StreamerHttpClient(BaseStreamerClient):
int(frame.headers["X-UStreamer-Width"]), int(frame.headers["X-UStreamer-Width"]),
int(frame.headers["X-UStreamer-Height"]), int(frame.headers["X-UStreamer-Height"]),
data, data,
False,
) )
except Exception as err: # Тут бывают и ассерты, и KeyError, и прочая херня except Exception as err: # Тут бывают и ассерты, и KeyError, и прочая херня
if isinstance(err, StreamerTempError): if isinstance(err, StreamerTempError):
@ -139,16 +142,21 @@ class StreamerHttpClient(BaseStreamerClient):
assert not handle.startswith("/"), handle assert not handle.startswith("/"), handle
return f"http://{self.__host}:{self.__port}/{handle}" return f"http://{self.__host}:{self.__port}/{handle}"
def __str__(self) -> str:
return f"StreamerHttpClient({self.__name})"
class StreamerMemsinkClient(BaseStreamerClient): class StreamerMemsinkClient(BaseStreamerClient):
def __init__( def __init__(
self, self,
name: str,
obj: str, obj: str,
lock_timeout: float, lock_timeout: float,
wait_timeout: float, wait_timeout: float,
drop_same_frames: float, drop_same_frames: float,
) -> None: ) -> None:
self.__name = name
self.__kwargs: Dict = { self.__kwargs: Dict = {
"obj": obj, "obj": obj,
"lock_timeout": lock_timeout, "lock_timeout": lock_timeout,
@ -156,7 +164,7 @@ class StreamerMemsinkClient(BaseStreamerClient):
"drop_same_frames": drop_same_frames, "drop_same_frames": drop_same_frames,
} }
async def read_stream(self) -> AsyncGenerator[Tuple[bool, int, int, bytes], None]: async def read_stream(self) -> AsyncGenerator[Tuple[bool, int, int, bytes, bool], None]:
if ustreamer is None: if ustreamer is None:
raise StreamerPermError("Missing ustreamer library") raise StreamerPermError("Missing ustreamer library")
try: try:
@ -164,8 +172,17 @@ class StreamerMemsinkClient(BaseStreamerClient):
while True: while True:
frame = await aiotools.run_async(sink.wait_frame) frame = await aiotools.run_async(sink.wait_frame)
if frame is not None: if frame is not None:
yield (frame["online"], frame["width"], frame["height"], frame["data"]) yield (
frame["online"],
frame["width"],
frame["height"],
frame["data"],
(frame["format"] == 875967048), # V4L2_PIX_FMT_H264
)
except FileNotFoundError as err: except FileNotFoundError as err:
raise StreamerTempError(tools.efmt(err)) raise StreamerTempError(tools.efmt(err))
except Exception as err: except Exception as err:
raise StreamerPermError(tools.efmt(err)) raise StreamerPermError(tools.efmt(err))
def __str__(self) -> str:
return f"StreamerMemsinkClient({self.__name})"