queue-based vnc fb task

This commit is contained in:
Devaev Maxim 2021-02-04 02:23:59 +03:00
parent 32bd2453eb
commit ffeb626ef8

View File

@ -26,7 +26,6 @@ import socket
import dataclasses import dataclasses
import contextlib import contextlib
from typing import Tuple
from typing import List from typing import List
from typing import Dict from typing import Dict
from typing import Union from typing import Union
@ -121,9 +120,8 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
self.__kvmd_session: Optional[KvmdClientSession] = None self.__kvmd_session: Optional[KvmdClientSession] = None
self.__kvmd_ws: Optional[KvmdClientWs] = None self.__kvmd_ws: Optional[KvmdClientWs] = None
self.__fb_requested = False self.__fb_notifier = aiotools.AioNotifier()
self.__fb_stub: Optional[Tuple[int, str]] = None self.__fb_queue: "asyncio.Queue[Dict]" = asyncio.Queue()
self.__fb_h264_data = b""
# Эти состояния шарить не обязательно - бекенд исключает дублирующиеся события. # Эти состояния шарить не обязательно - бекенд исключает дублирующиеся события.
# Все это нужно только чтобы не посылать лишние жсоны в сокет KVMD # Все это нужно только чтобы не посылать лишние жсоны в сокет KVMD
@ -141,6 +139,7 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
await self._run( await self._run(
kvmd=self.__kvmd_task_loop(), kvmd=self.__kvmd_task_loop(),
streamer=self.__streamer_task_loop(), streamer=self.__streamer_task_loop(),
fb_sendeer=self.__fb_sender_task_loop(),
) )
finally: finally:
if self.__kvmd_session: if self.__kvmd_session:
@ -201,16 +200,16 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
logger.info("[streamer] %s: Streaming ...", self._remote) logger.info("[streamer] %s: Streaming ...", self._remote)
streaming = True streaming = True
if frame["online"]: if frame["online"]:
await self.__send_fb_real(frame) await self.__queue_fb_real(frame)
else: else:
await self.__send_fb_stub("No signal") await self.__queue_fb_stub("No signal")
except StreamerError as err: except StreamerError as err:
if isinstance(err, StreamerPermError): if isinstance(err, StreamerPermError):
streamer = self.__get_default_streamer() streamer = self.__get_default_streamer()
logger.info("[streamer] %s: Permanent error: %s; switching to %s ...", self._remote, err, streamer) logger.info("[streamer] %s: Permanent error: %s; switching to %s ...", self._remote, err, streamer)
else: else:
logger.info("[streamer] %s: Waiting for stream: %s", self._remote, err) logger.info("[streamer] %s: Waiting for stream: %s", self._remote, err)
await self.__send_fb_stub("Waiting for stream ...") await self.__queue_fb_stub("Waiting for stream ...")
await asyncio.sleep(1) await asyncio.sleep(1)
def __get_preferred_streamer(self) -> BaseStreamerClient: def __get_preferred_streamer(self) -> BaseStreamerClient:
@ -230,65 +229,68 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
get_logger(0).info("[streamer] %s: Using default %s", self._remote, streamer) get_logger(0).info("[streamer] %s: Using default %s", self._remote, streamer)
return streamer return streamer
async def __send_fb_real(self, frame: Dict) -> None: async def __queue_fb_real(self, frame: Dict) -> None:
async with self.__lock: await self.__fb_queue.put(frame)
if self.__fb_requested:
if not (await self.__resize_fb_unsafe(frame)):
return
if frame["format"] == StreamFormats.JPEG: async def __queue_fb_stub(self, text: str) -> None:
await self._send_fb_jpeg(frame["data"]) await self.__fb_queue.put(await self.__make_stub_frame(text))
elif frame["format"] == StreamFormats.H264:
async def __make_stub_frame(self, text: str) -> Dict:
return {
"data": (await make_text_jpeg(self._width, self._height, self._encodings.tight_jpeg_quality, text)),
"width": self._width,
"height": self._height,
"format": StreamFormats.JPEG,
}
async def __fb_sender_task_loop(self) -> None:
while True:
await self.__fb_notifier.wait()
last: Optional[Dict] = None
while True:
frame = await self.__fb_queue.get()
if (
last is None # pylint: disable=too-many-boolean-expressions
or frame["format"] == StreamFormats.JPEG
or last["format"] != frame["format"]
or (frame["format"] == StreamFormats.H264 and (
frame["key"]
or last["width"] != frame["width"]
or last["height"] != frame["height"]
or len(last["data"]) + len(frame["data"]) > 4194304
))
):
last = frame
if self.__fb_queue.qsize() == 0:
break
continue
assert frame["format"] == StreamFormats.H264
last["data"] += frame["data"]
if self.__fb_queue.qsize() == 0:
break
async with self.__lock:
if self._width != last["width"] or self._height != last["height"]:
self.__shared_params.width = last["width"]
self.__shared_params.height = last["height"]
if not self._encodings.has_resize:
msg = (
f"Resoultion changed: {self._width}x{self._height}"
f" -> {last['width']}x{last['height']}\nPlease reconnect"
)
await self._send_fb_jpeg(await self.__make_stub_frame(msg))
return
await self._send_resize(last["width"], last["height"])
if last["format"] == StreamFormats.JPEG:
await self._send_fb_jpeg(last["data"])
elif last["format"] == StreamFormats.H264:
if not self._encodings.has_h264: if not self._encodings.has_h264:
self.__fb_h264_data = b"" raise RfbError("The client doesn't want to accept H264 anymore")
raise StreamerPermError("The client doesn't want to accept H264 anymore") await self._send_fb_h264(last["data"])
self.__append_h264_data(frame)
await self._send_fb_h264(self.__fb_h264_data)
else: else:
raise RuntimeError(f"Unknown format: {frame['format']}") raise RuntimeError(f"Unknown format: {last['format']}")
self.__fb_stub = None
self.__fb_requested = False
self.__fb_h264_data = b""
elif self._encodings.has_h264 and frame["format"] == StreamFormats.H264:
self.__append_h264_data(frame)
async def __resize_fb_unsafe(self, frame: Dict) -> bool:
width = frame["width"]
height = frame["height"]
if self._width != width or self._height != height:
self.__shared_params.width = width
self.__shared_params.height = height
if not self._encodings.has_resize:
msg = f"Resoultion changed: {self._width}x{self._height} -> {width}x{height}\nPlease reconnect"
await self.__send_fb_stub(msg, no_lock=True)
return False
await self._send_resize(width, height)
return True
def __append_h264_data(self, frame: Dict) -> None:
if frame["key"]:
self.__fb_h264_data = frame["data"]
elif len(self.__fb_h264_data) + len(frame["data"]) > 4194304: # 4Mb
get_logger(0).info("Accumulated H264 buffer is too big; resetting ...")
self.__fb_h264_data = frame["data"]
else:
self.__fb_h264_data += frame["data"]
async def __send_fb_stub(self, text: str, no_lock: bool=False) -> None:
if not no_lock:
await self.__lock.acquire()
try:
quality = self._encodings.tight_jpeg_quality
if self.__fb_requested and self.__fb_stub != (quality, text):
await self._send_fb_jpeg(await make_text_jpeg(self._width, self._height, quality, text))
self.__fb_stub = (quality, text)
self.__fb_requested = False
self.__fb_h264_data = b""
finally:
if not no_lock:
self.__lock.release()
# ===== # =====
@ -386,8 +388,7 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
await self.__kvmd_session.streamer.set_params(quality, self.__desired_fps) await self.__kvmd_session.streamer.set_params(quality, self.__desired_fps)
async def _on_fb_update_request(self) -> None: async def _on_fb_update_request(self) -> None:
async with self.__lock: await self.__fb_notifier.notify()
self.__fb_requested = True
# ===== # =====