sink source for vnc

This commit is contained in:
Devaev Maxim 2021-01-22 04:26:04 +03:00
parent 7c39b3facd
commit ebe40697a5
7 changed files with 105 additions and 21 deletions

View File

@ -67,7 +67,7 @@ depends=(
iproute2 iproute2
dnsmasq dnsmasq
"raspberrypi-io-access>=0.5" "raspberrypi-io-access>=0.5"
"ustreamer>=1.19" "ustreamer>=3.12"
# Avoid dhcpcd stack trace # Avoid dhcpcd stack trace
dhclient dhclient

View File

@ -549,6 +549,15 @@ def _get_config_scheme() -> Dict:
"timeout": Option(5.0, type=valid_float_f01), "timeout": Option(5.0, type=valid_float_f01),
}, },
"memsink": {
"jpeg": {
"sink": Option("", unpack_as="obj"),
"lock_timeout": Option(1.0, type=valid_float_f01),
"wait_timeout": Option(1.0, type=valid_float_f01),
"drop_same_frames": Option(1.0, type=valid_float_f0),
},
},
"auth": { "auth": {
"vncauth": { "vncauth": {
"enabled": Option(False, type=valid_bool), "enabled": Option(False, type=valid_bool),

View File

@ -24,7 +24,8 @@ from typing import List
from typing import Optional from typing import Optional
from ...clients.kvmd import KvmdClient from ...clients.kvmd import KvmdClient
from ...clients.streamer import StreamerClient from ...clients.streamer import StreamerHttpClient
from ...clients.streamer import StreamerMemsinkClient
from ... import htclient from ... import htclient
@ -62,10 +63,14 @@ def main(argv: Optional[List[str]]=None) -> None:
user_agent=user_agent, user_agent=user_agent,
**config.kvmd._unpack(), **config.kvmd._unpack(),
), ),
streamer=StreamerClient( streamer_http=StreamerHttpClient(
user_agent=user_agent, user_agent=user_agent,
**config.streamer._unpack(), **config.streamer._unpack(),
), ),
streamer_memsink_jpeg=(
StreamerMemsinkClient(**config.memsink.jpeg._unpack())
if config.memsink.jpeg.sink else None
),
vnc_auth_manager=VncAuthManager(**config.auth.vncauth._unpack()), vnc_auth_manager=VncAuthManager(**config.auth.vncauth._unpack()),
**config.server.keepalive._unpack(), **config.server.keepalive._unpack(),

View File

@ -151,7 +151,7 @@ class RfbClient(RfbClientStream): # pylint: disable=too-many-instance-attribute
# ===== # =====
async def _send_fb(self, jpeg: bytes) -> None: async def _send_fb_jpeg(self, jpeg: bytes) -> None:
assert self._encodings.has_tight assert self._encodings.has_tight
assert self._encodings.tight_jpeg_quality > 0 assert self._encodings.tight_jpeg_quality > 0
assert len(jpeg) <= 4194303, len(jpeg) assert len(jpeg) <= 4194303, len(jpeg)

View File

@ -45,7 +45,10 @@ from ...clients.kvmd import KvmdClientSession
from ...clients.kvmd import KvmdClient from ...clients.kvmd import KvmdClient
from ...clients.streamer import StreamerError from ...clients.streamer import StreamerError
from ...clients.streamer import StreamerClient from ...clients.streamer import StreamerPermError
from ...clients.streamer import BaseStreamerClient
from ...clients.streamer import StreamerHttpClient
from ...clients.streamer import StreamerMemsinkClient
from .rfb import RfbClient from .rfb import RfbClient
from .rfb.stream import rfb_format_remote from .rfb.stream import rfb_format_remote
@ -79,7 +82,8 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
symmap: Dict[int, Dict[int, str]], symmap: Dict[int, Dict[int, str]],
kvmd: KvmdClient, kvmd: KvmdClient,
streamer: StreamerClient, streamer_http: StreamerHttpClient,
streamer_memsink_jpeg: Optional[StreamerMemsinkClient],
vnc_credentials: Dict[str, VncAuthKvmdCredentials], vnc_credentials: Dict[str, VncAuthKvmdCredentials],
none_auth_only: bool, none_auth_only: bool,
@ -103,7 +107,8 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
self.__symmap = symmap self.__symmap = symmap
self.__kvmd = kvmd self.__kvmd = kvmd
self.__streamer = streamer self.__streamer_http = streamer_http
self.__streamer_memsink_jpeg = streamer_memsink_jpeg
self.__shared_params = shared_params self.__shared_params = shared_params
@ -178,19 +183,29 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
async def __streamer_task_loop(self) -> None: async def __streamer_task_loop(self) -> None:
logger = get_logger(0) logger = get_logger(0)
await self.__ws_connected await self.__ws_connected
name = "streamer_http"
streamer: BaseStreamerClient = self.__streamer_http
if self.__streamer_memsink_jpeg:
(name, streamer) = ("streamer_memsink_jpeg", self.__streamer_memsink_jpeg)
while True: while True:
try: try:
streaming = False streaming = False
async for (online, width, height, jpeg) in self.__streamer.read_stream(): async for (online, width, height, jpeg) in streamer.read_stream():
if not streaming: if not streaming:
logger.info("[streamer] %s: Streaming ...", self._remote) logger.info("[%s] %s: Streaming ...", name, self._remote)
streaming = True streaming = True
if online: if online:
await self.__send_fb_real(width, height, jpeg) await self.__send_fb_real(width, height, jpeg)
else: else:
await self.__send_fb_stub("No signal") await self.__send_fb_stub("No signal")
except StreamerError as err: except StreamerError as err:
logger.info("[streamer] %s: Waiting for stream: %s", self._remote, err) if isinstance(err, StreamerPermError):
logger.info("[%s] %s: Permanent error: %s; switching to HTTP ...", name, self._remote, err)
(name, streamer) = ("streamer_http", self.__streamer_http)
else:
logger.info("[%s] %s: Waiting for stream: %s", name, self._remote, err)
await self.__send_fb_stub("Waiting for stream ...") await self.__send_fb_stub("Waiting for stream ...")
await asyncio.sleep(1) await asyncio.sleep(1)
@ -205,7 +220,7 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
await self.__send_fb_stub(msg, no_lock=True) await self.__send_fb_stub(msg, no_lock=True)
return return
await self._send_resize(width, height) await self._send_resize(width, height)
await self._send_fb(jpeg) await self._send_fb_jpeg(jpeg)
self.__fb_stub_text = "" self.__fb_stub_text = ""
self.__fb_stub_quality = 0 self.__fb_stub_quality = 0
self.__fb_requested = False self.__fb_requested = False
@ -215,7 +230,7 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
await self.__lock.acquire() await self.__lock.acquire()
try: try:
if self.__fb_requested and (self.__fb_stub_text != text or self.__fb_stub_quality != self._encodings.tight_jpeg_quality): if self.__fb_requested and (self.__fb_stub_text != text or self.__fb_stub_quality != self._encodings.tight_jpeg_quality):
await self._send_fb(await make_text_jpeg(self._width, self._height, self._encodings.tight_jpeg_quality, text)) await self._send_fb_jpeg(await make_text_jpeg(self._width, self._height, self._encodings.tight_jpeg_quality, text))
self.__fb_stub_text = text self.__fb_stub_text = text
self.__fb_stub_quality = self._encodings.tight_jpeg_quality self.__fb_stub_quality = self._encodings.tight_jpeg_quality
self.__fb_requested = False self.__fb_requested = False
@ -344,7 +359,8 @@ class VncServer: # pylint: disable=too-many-instance-attributes
keymap_path: str, keymap_path: str,
kvmd: KvmdClient, kvmd: KvmdClient,
streamer: StreamerClient, streamer_http: StreamerHttpClient,
streamer_memsink_jpeg: Optional[StreamerMemsinkClient],
vnc_auth_manager: VncAuthManager, vnc_auth_manager: VncAuthManager,
) -> None: ) -> None:
@ -393,7 +409,8 @@ class VncServer: # pylint: disable=too-many-instance-attributes
keymap_name=keymap_name, keymap_name=keymap_name,
symmap=symmap, symmap=symmap,
kvmd=kvmd, kvmd=kvmd,
streamer=streamer, streamer_http=streamer_http,
streamer_memsink_jpeg=streamer_memsink_jpeg,
vnc_credentials=(await self.__vnc_auth_manager.read_credentials())[0], vnc_credentials=(await self.__vnc_auth_manager.read_credentials())[0],
none_auth_only=none_auth_only, none_auth_only=none_auth_only,
shared_params=shared_params, shared_params=shared_params,

View File

@ -28,6 +28,12 @@ from typing import AsyncGenerator
import aiohttp import aiohttp
try:
import ustreamer
except ImportError:
ustreamer = None
from .. import aiotools
from .. import htclient from .. import htclient
@ -36,6 +42,22 @@ class StreamerError(Exception):
pass pass
class StreamerTempError(StreamerError):
pass
class StreamerPermError(StreamerError):
pass
# =====
class BaseStreamerClient:
async def read_stream(self) -> AsyncGenerator[Tuple[bool, int, int, bytes], None]:
if self is not None: # XXX: Vulture and pylint hack
raise NotImplementedError()
yield
# ===== # =====
def _patch_stream_reader(reader: aiohttp.StreamReader) -> None: def _patch_stream_reader(reader: aiohttp.StreamReader) -> None:
# https://github.com/pikvm/pikvm/issues/92 # https://github.com/pikvm/pikvm/issues/92
@ -45,13 +67,13 @@ def _patch_stream_reader(reader: aiohttp.StreamReader) -> None:
async def read(self: aiohttp.StreamReader, n: int=-1) -> bytes: # pylint: disable=invalid-name async def read(self: aiohttp.StreamReader, n: int=-1) -> bytes: # pylint: disable=invalid-name
if self.is_eof(): if self.is_eof():
raise StreamerError("StreamReader.read(): Reached EOF") raise StreamerTempError("StreamReader.read(): Reached EOF")
return (await orig_read(n)) return (await orig_read(n))
reader.read = types.MethodType(read, reader) # type: ignore reader.read = types.MethodType(read, reader) # type: ignore
class StreamerClient: class StreamerHttpClient(BaseStreamerClient):
def __init__( def __init__(
self, self,
host: str, host: str,
@ -82,7 +104,7 @@ class StreamerClient:
while True: while True:
frame = await reader.next() # pylint: disable=not-callable frame = await reader.next() # pylint: disable=not-callable
if not isinstance(frame, aiohttp.BodyPartReader): if not isinstance(frame, aiohttp.BodyPartReader):
raise StreamerError("Expected body part") raise StreamerTempError("Expected body part")
data = bytes(await frame.read()) data = bytes(await frame.read())
if not data: if not data:
@ -95,10 +117,10 @@ class StreamerClient:
data, data,
) )
except Exception as err: # Тут бывают и ассерты, и KeyError, и прочая херня except Exception as err: # Тут бывают и ассерты, и KeyError, и прочая херня
if isinstance(err, StreamerError): if isinstance(err, StreamerTempError):
raise raise
raise StreamerError(f"{type(err).__name__}: {err}") raise StreamerTempError(f"{type(err).__name__}: {err}")
raise StreamerError("Reached EOF") raise StreamerTempError("Reached EOF")
def __make_http_session(self) -> aiohttp.ClientSession: def __make_http_session(self) -> aiohttp.ClientSession:
kwargs: Dict = { kwargs: Dict = {
@ -115,3 +137,34 @@ class StreamerClient:
def __make_url(self, handle: str) -> str: def __make_url(self, handle: str) -> str:
assert not handle.startswith("/"), handle assert not handle.startswith("/"), handle
return f"http://{self.__host}:{self.__port}/{handle}" return f"http://{self.__host}:{self.__port}/{handle}"
class StreamerMemsinkClient(BaseStreamerClient):
def __init__(
self,
obj: str,
lock_timeout: float,
wait_timeout: float,
drop_same_frames: float,
) -> None:
self.__kwargs: Dict = {
"obj": obj,
"lock_timeout": lock_timeout,
"wait_timeout": wait_timeout,
"drop_same_frames": drop_same_frames,
}
async def read_stream(self) -> AsyncGenerator[Tuple[bool, int, int, bytes], None]:
if ustreamer is None:
raise StreamerPermError("Missing ustreamer library")
try:
with ustreamer.Memsink(**self.__kwargs) as sink:
while True:
frame = await aiotools.run_async(sink.wait_frame)
if frame is not None:
yield (frame["online"], frame["width"], frame["height"], frame["data"])
except FileNotFoundError as err:
raise StreamerTempError(f"{type(err).__name__}: {err}")
except Exception as err:
raise StreamerPermError(f"{type(err).__name__}: {err}")

View File

@ -64,7 +64,7 @@ ENV USTREAMER_MIN_VERSION $USTREAMER_MIN_VERSION
RUN echo $USTREAMER_MIN_VERSION RUN echo $USTREAMER_MIN_VERSION
RUN git clone https://github.com/pikvm/ustreamer \ RUN git clone https://github.com/pikvm/ustreamer \
&& cd ustreamer \ && cd ustreamer \
&& make PREFIX=/usr install \ && make WITH_PYTHON=1 PREFIX=/usr DESTDIR=/ install \
&& cd - \ && cd - \
&& rm -rf ustreamer && rm -rf ustreamer