Implemented VNC ContinuousUpdates

This commit is contained in:
Maxim Devaev 2022-11-02 03:23:37 +03:00
parent c57928a0f1
commit 08241e9255
4 changed files with 107 additions and 54 deletions

View File

@ -159,6 +159,9 @@ class RfbClient(RfbClientStream): # pylint: disable=too-many-instance-attribute
async def _on_fb_update_request(self) -> None: async def _on_fb_update_request(self) -> None:
raise NotImplementedError raise NotImplementedError
async def _on_enable_cont_updates(self, enabled: bool) -> None:
raise NotImplementedError
# ===== # =====
async def _send_fb_jpeg(self, data: bytes) -> None: async def _send_fb_jpeg(self, data: bytes) -> None:
@ -398,6 +401,7 @@ class RfbClient(RfbClientStream): # pylint: disable=too-many-instance-attribute
4: self.__handle_key_event, 4: self.__handle_key_event,
5: self.__handle_pointer_event, 5: self.__handle_pointer_event,
6: self.__handle_client_cut_text, 6: self.__handle_client_cut_text,
150: self.__handle_enable_cont_updates,
255: self.__handle_qemu_event, 255: self.__handle_qemu_event,
} }
while True: while True:
@ -429,6 +433,9 @@ class RfbClient(RfbClientStream): # pylint: disable=too-many-instance-attribute
logger.info("%s [main]: ... %s", self._remote, item) logger.info("%s [main]: ... %s", self._remote, item)
self.__check_encodings() self.__check_encodings()
if self._encodings.has_cont_updates:
await self._write_struct("allow ContUpdates", "B", 150)
if self._encodings.has_ext_keys: # Preferred method if self._encodings.has_ext_keys: # Preferred method
await self._write_fb_update("ExtKeys FBUR", 0, 0, RfbEncodings.EXT_KEYS, drain=True) await self._write_fb_update("ExtKeys FBUR", 0, 0, RfbEncodings.EXT_KEYS, drain=True)
await self._on_set_encodings() await self._on_set_encodings()
@ -473,6 +480,12 @@ class RfbClient(RfbClientStream): # pylint: disable=too-many-instance-attribute
text = await self._read_text("cut text data", length) text = await self._read_text("cut text data", length)
await self._on_cut_event(text) await self._on_cut_event(text)
async def __handle_enable_cont_updates(self) -> None:
enabled = bool((await self._read_struct("enabled ContUpdates", "B HH HH"))[0])
await self._on_enable_cont_updates(enabled)
if not enabled:
await self._write_struct("disabled ContUpdates", "B", 150)
async def __handle_qemu_event(self) -> None: async def __handle_qemu_event(self) -> None:
(sub_type, state, code) = await self._read_struct("QEMU event (key?)", "B H xxxx L") (sub_type, state, code) = await self._read_struct("QEMU event (key?)", "B H xxxx L")
if sub_type != 0: if sub_type != 0:

View File

@ -31,6 +31,7 @@ class RfbEncodings:
RENAME = -307 # DesktopName Pseudo-encoding RENAME = -307 # DesktopName Pseudo-encoding
LEDS_STATE = -261 # QEMU LED State Pseudo-encoding LEDS_STATE = -261 # QEMU LED State Pseudo-encoding
EXT_KEYS = -258 # QEMU Extended Key Events Pseudo-encoding EXT_KEYS = -258 # QEMU Extended Key Events Pseudo-encoding
CONT_UPDATES = -313 # ContinuousUpdates Pseudo-encoding
TIGHT = 7 TIGHT = 7
TIGHT_JPEG_QUALITIES = dict(zip( # JPEG Quality Level Pseudo-encoding TIGHT_JPEG_QUALITIES = dict(zip( # JPEG Quality Level Pseudo-encoding
@ -53,6 +54,7 @@ class RfbClientEncodings: # pylint: disable=too-many-instance-attributes
has_rename: bool = dataclasses.field(default=False, metadata=_make_meta(RfbEncodings.RENAME)) # noqa: E224 has_rename: bool = dataclasses.field(default=False, metadata=_make_meta(RfbEncodings.RENAME)) # noqa: E224
has_leds_state: bool = dataclasses.field(default=False, metadata=_make_meta(RfbEncodings.LEDS_STATE)) # noqa: E224 has_leds_state: bool = dataclasses.field(default=False, metadata=_make_meta(RfbEncodings.LEDS_STATE)) # noqa: E224
has_ext_keys: bool = dataclasses.field(default=False, metadata=_make_meta(RfbEncodings.EXT_KEYS)) # noqa: E224 has_ext_keys: bool = dataclasses.field(default=False, metadata=_make_meta(RfbEncodings.EXT_KEYS)) # noqa: E224
has_cont_updates: bool = dataclasses.field(default=False, metadata=_make_meta(RfbEncodings.CONT_UPDATES)) # noqa: E224
has_tight: bool = dataclasses.field(default=False, metadata=_make_meta(RfbEncodings.TIGHT)) # noqa: E224 has_tight: bool = dataclasses.field(default=False, metadata=_make_meta(RfbEncodings.TIGHT)) # noqa: E224
tight_jpeg_quality: int = dataclasses.field(default=0, metadata=_make_meta(frozenset(RfbEncodings.TIGHT_JPEG_QUALITIES))) # noqa: E224 tight_jpeg_quality: int = dataclasses.field(default=0, metadata=_make_meta(frozenset(RfbEncodings.TIGHT_JPEG_QUALITIES))) # noqa: E224

View File

@ -122,6 +122,8 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
self.__fb_notifier = aiotools.AioNotifier() self.__fb_notifier = aiotools.AioNotifier()
self.__fb_queue: "asyncio.Queue[dict]" = asyncio.Queue() self.__fb_queue: "asyncio.Queue[dict]" = asyncio.Queue()
self.__fb_cont_updates = False
self.__fb_key_required = False
# Эти состояния шарить не обязательно - бекенд исключает дублирующиеся события. # Эти состояния шарить не обязательно - бекенд исключает дублирующиеся события.
# Все это нужно только чтобы не посылать лишние жсоны в сокет KVMD # Все это нужно только чтобы не посылать лишние жсоны в сокет KVMD
@ -198,7 +200,9 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
while True: while True:
try: try:
streaming = False streaming = False
async for frame in streamer.read_stream(): async with streamer.reading() as read_frame:
while True:
frame = await read_frame(self.__fb_key_required)
if not streaming: if not streaming:
logger.info("%s [streamer]: Streaming ...", self._remote) logger.info("%s [streamer]: Streaming ...", self._remote)
streaming = True streaming = True
@ -284,6 +288,8 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
f" -> {last['width']}x{last['height']}\nPlease reconnect" f" -> {last['width']}x{last['height']}\nPlease reconnect"
) )
await self._send_fb_jpeg((await self.__make_text_frame(msg))["data"]) await self._send_fb_jpeg((await self.__make_text_frame(msg))["data"])
if self.__fb_cont_updates:
self.__fb_notifier.notify()
continue continue
await self._send_resize(last["width"], last["height"]) await self._send_resize(last["width"], last["height"])
@ -294,12 +300,18 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
if last["format"] == StreamFormats.JPEG: if last["format"] == StreamFormats.JPEG:
await self._send_fb_jpeg(last["data"]) await self._send_fb_jpeg(last["data"])
if self.__fb_cont_updates:
self.__fb_notifier.notify()
elif last["format"] == StreamFormats.H264: elif last["format"] == StreamFormats.H264:
if not self._encodings.has_h264: if not self._encodings.has_h264:
raise RfbError("The client doesn't want to accept H264 anymore") raise RfbError("The client doesn't want to accept H264 anymore")
if has_h264_key: if has_h264_key:
self.__fb_key_required = False
await self._send_fb_h264(last["data"]) await self._send_fb_h264(last["data"])
if self.__fb_cont_updates:
self.__fb_notifier.notify()
else: else:
self.__fb_key_required = True
self.__fb_notifier.notify() self.__fb_notifier.notify()
else: else:
raise RuntimeError(f"Unknown format: {last['format']}") raise RuntimeError(f"Unknown format: {last['format']}")
@ -413,6 +425,13 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
await self.__kvmd_session.streamer.set_params(quality, self.__desired_fps) await self.__kvmd_session.streamer.set_params(quality, self.__desired_fps)
async def _on_fb_update_request(self) -> None: async def _on_fb_update_request(self) -> None:
if not self.__fb_cont_updates:
self.__fb_notifier.notify()
async def _on_enable_cont_updates(self, enabled: bool) -> None:
get_logger(0).info("%s [main]: Applying ContUpdates=%s ...", self._remote, enabled)
self.__fb_cont_updates = enabled
if enabled:
self.__fb_notifier.notify() self.__fb_notifier.notify()

View File

@ -20,16 +20,16 @@
# ========================================================================== # # ========================================================================== #
import contextlib
import types import types
from typing import Callable
from typing import Awaitable
from typing import Generator
from typing import AsyncGenerator from typing import AsyncGenerator
import aiohttp import aiohttp
try:
import ustreamer import ustreamer
except ImportError:
ustreamer = None
from .. import tools from .. import tools
from .. import aiotools from .. import aiotools
@ -60,12 +60,24 @@ class BaseStreamerClient:
def get_format(self) -> int: def get_format(self) -> int:
raise NotImplementedError() raise NotImplementedError()
async def read_stream(self) -> AsyncGenerator[dict, None]: @contextlib.asynccontextmanager
async def reading(self) -> AsyncGenerator[Callable[[bool], Awaitable[dict]], None]:
if self is not None: # XXX: Vulture and pylint hack if self is not None: # XXX: Vulture and pylint hack
raise NotImplementedError() raise NotImplementedError()
yield yield
# =====
@contextlib.contextmanager
def _http_handle_errors() -> Generator[None, None, None]:
try:
yield
except Exception as err: # Тут бывают и ассерты, и KeyError, и прочая херня
if isinstance(err, StreamerTempError):
raise
raise StreamerTempError(tools.efmt(err))
class HttpStreamerClient(BaseStreamerClient): class HttpStreamerClient(BaseStreamerClient):
def __init__( def __init__(
self, self,
@ -83,8 +95,9 @@ class HttpStreamerClient(BaseStreamerClient):
def get_format(self) -> int: def get_format(self) -> int:
return StreamFormats.JPEG return StreamFormats.JPEG
async def read_stream(self) -> AsyncGenerator[dict, None]: @contextlib.asynccontextmanager
try: async def reading(self) -> AsyncGenerator[Callable[[bool], Awaitable[dict]], None]:
with _http_handle_errors():
async with self.__make_http_session() as session: async with self.__make_http_session() as session:
async with session.get( async with session.get(
url=self.__make_url("stream"), url=self.__make_url("stream"),
@ -94,27 +107,26 @@ class HttpStreamerClient(BaseStreamerClient):
reader = aiohttp.MultipartReader.from_response(response) reader = aiohttp.MultipartReader.from_response(response)
self.__patch_stream_reader(reader.resp.content) self.__patch_stream_reader(reader.resp.content)
while True: async def read_frame(key_required: bool) -> dict:
_ = key_required
with _http_handle_errors():
frame = await reader.next() # pylint: disable=not-callable frame = await reader.next() # pylint: disable=not-callable
if not isinstance(frame, aiohttp.BodyPartReader): if not isinstance(frame, aiohttp.BodyPartReader):
raise StreamerTempError("Expected body part") raise StreamerTempError("Expected body part")
data = bytes(await frame.read()) data = bytes(await frame.read())
if not data: if not data:
break raise StreamerTempError("Reached EOF")
yield { return {
"online": (frame.headers["X-UStreamer-Online"] == "true"), "online": (frame.headers["X-UStreamer-Online"] == "true"),
"width": int(frame.headers["X-UStreamer-Width"]), "width": int(frame.headers["X-UStreamer-Width"]),
"height": int(frame.headers["X-UStreamer-Height"]), "height": int(frame.headers["X-UStreamer-Height"]),
"data": data, "data": data,
"format": StreamFormats.JPEG, "format": StreamFormats.JPEG,
} }
except Exception as err: # Тут бывают и ассерты, и KeyError, и прочая херня
if isinstance(err, StreamerTempError): yield read_frame
raise
raise StreamerTempError(tools.efmt(err))
raise StreamerTempError("Reached EOF")
def __make_http_session(self) -> aiohttp.ClientSession: def __make_http_session(self) -> aiohttp.ClientSession:
kwargs: dict = { kwargs: dict = {
@ -148,6 +160,19 @@ class HttpStreamerClient(BaseStreamerClient):
return f"HttpStreamerClient({self.__name})" return f"HttpStreamerClient({self.__name})"
# =====
@contextlib.contextmanager
def _memsink_handle_errors() -> Generator[None, None, None]:
try:
yield
except StreamerPermError:
raise
except FileNotFoundError as err:
raise StreamerTempError(tools.efmt(err))
except Exception as err:
raise StreamerPermError(tools.efmt(err))
class MemsinkStreamerClient(BaseStreamerClient): class MemsinkStreamerClient(BaseStreamerClient):
def __init__( def __init__(
self, self,
@ -171,25 +196,19 @@ class MemsinkStreamerClient(BaseStreamerClient):
def get_format(self) -> int: def get_format(self) -> int:
return self.__fmt return self.__fmt
async def read_stream(self) -> AsyncGenerator[dict, None]: @contextlib.asynccontextmanager
if ustreamer is None: async def reading(self) -> AsyncGenerator[Callable[[bool], Awaitable[dict]], None]:
raise StreamerPermError("Missing ustreamer library") with _memsink_handle_errors():
try:
with ustreamer.Memsink(**self.__kwargs) as sink: with ustreamer.Memsink(**self.__kwargs) as sink:
key_required = (self.__fmt == StreamFormats.H264) async def read_frame(key_required: bool) -> dict:
key_required = (key_required and self.__fmt == StreamFormats.H264)
with _memsink_handle_errors():
while True: while True:
frame = await aiotools.run_async(sink.wait_frame, key_required) frame = await aiotools.run_async(sink.wait_frame, key_required)
if frame is not None: if frame is not None:
self.__check_format(frame["format"]) self.__check_format(frame["format"])
if frame["key"]: return frame
key_required = False yield read_frame
yield frame
except StreamerPermError:
raise
except FileNotFoundError as err:
raise StreamerTempError(tools.efmt(err))
except Exception as err:
raise StreamerPermError(tools.efmt(err))
def __check_format(self, fmt: int) -> None: def __check_format(self, fmt: int) -> None:
if fmt == StreamFormats._MJPEG: # pylint: disable=protected-access if fmt == StreamFormats._MJPEG: # pylint: disable=protected-access