accumulate h264 frames

This commit is contained in:
Devaev Maxim 2021-02-03 06:46:53 +03:00
parent 78fc3869f2
commit a0601faafb
3 changed files with 44 additions and 31 deletions

View File

@ -68,7 +68,7 @@ depends=(
iproute2 iproute2
dnsmasq dnsmasq
"raspberrypi-io-access>=0.5" "raspberrypi-io-access>=0.5"
"ustreamer>=3.12" "ustreamer>=3.15"
# Avoid dhcpcd stack trace # Avoid dhcpcd stack trace
dhclient dhclient

View File

@ -123,6 +123,7 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
self.__fb_requested = False self.__fb_requested = False
self.__fb_stub: Optional[Tuple[int, str]] = None self.__fb_stub: Optional[Tuple[int, str]] = None
self.__fb_h264_data = b""
# Эти состояния шарить не обязательно - бекенд исключает дублирующиеся события. # Эти состояния шарить не обязательно - бекенд исключает дублирующиеся события.
# Все это нужно только чтобы не посылать лишние жсоны в сокет KVMD # Все это нужно только чтобы не посылать лишние жсоны в сокет KVMD
@ -195,12 +196,12 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
while True: while True:
try: try:
streaming = False streaming = False
async for (online, width, height, data) in streamer.read_stream(): async for frame in streamer.read_stream():
if not streaming: if not streaming:
logger.info("[streamer] %s: Streaming ...", self._remote) logger.info("[streamer] %s: Streaming ...", self._remote)
streaming = True streaming = True
if online: if frame["online"]:
await self.__send_fb_real(width, height, data, streamer.get_format()) await self.__send_fb_real(frame, streamer.get_format())
else: else:
await self.__send_fb_stub("No signal") await self.__send_fb_stub("No signal")
except StreamerError as err: except StreamerError as err:
@ -229,29 +230,47 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
get_logger(0).info("[streamer] %s: Using default %s", self._remote, streamer) get_logger(0).info("[streamer] %s: Using default %s", self._remote, streamer)
return streamer return streamer
async def __send_fb_real(self, width: int, height: int, data: bytes, fmt: int) -> None: async def __send_fb_real(self, frame: Dict, fmt: int) -> None:
async with self.__lock: async with self.__lock:
if self.__fb_requested: if self.__fb_requested:
if not (await self.__resize_fb_unsafe(frame)):
return
if fmt == StreamFormats.JPEG:
await self._send_fb_jpeg(frame["data"])
elif fmt == StreamFormats.H264:
if not self._encodings.has_h264:
self.__fb_h264_data = b""
raise StreamerPermError("The client doesn't want to accept H264 anymore")
await self._send_fb_h264(self.__fb_h264_data)
else:
raise RuntimeError(f"Unknown format: {fmt}")
self.__fb_stub = None
self.__fb_requested = False
self.__fb_h264_data = b""
elif self._encodings.has_h264 and fmt == StreamFormats.H264:
if frame["key"]:
self.__fb_h264_data = frame["data"]
elif len(self.__fb_h264_data) + len(frame["data"]) > 4194304: # 4Mb
get_logger(0).info("Accumulated H264 buffer is too big; resetting ...")
self.__fb_h264_data = frame["data"]
else:
self.__fb_h264_data += frame["data"]
async def __resize_fb_unsafe(self, frame: Dict) -> bool:
width = frame["width"]
height = frame["height"]
if self._width != width or self._height != height: if self._width != width or self._height != height:
self.__shared_params.width = width self.__shared_params.width = width
self.__shared_params.height = height self.__shared_params.height = height
if not self._encodings.has_resize: if not self._encodings.has_resize:
msg = f"Resoultion changed: {self._width}x{self._height} -> {width}x{height}\nPlease reconnect" msg = f"Resoultion changed: {self._width}x{self._height} -> {width}x{height}\nPlease reconnect"
await self.__send_fb_stub(msg, no_lock=True) await self.__send_fb_stub(msg, no_lock=True)
return return False
await self._send_resize(width, height) await self._send_resize(width, height)
return True
if fmt == StreamFormats.JPEG:
await self._send_fb_jpeg(data)
elif fmt == StreamFormats.H264:
if not self._encodings.has_h264:
raise StreamerPermError("The client doesn't want to accept H264 anymore")
await self._send_fb_h264(data)
else:
raise RuntimeError(f"Unknown format: {fmt}")
self.__fb_stub = None
self.__fb_requested = False
async def __send_fb_stub(self, text: str, no_lock: bool=False) -> None: async def __send_fb_stub(self, text: str, no_lock: bool=False) -> None:
if not no_lock: if not no_lock:

View File

@ -22,7 +22,6 @@
import types import types
from typing import Tuple
from typing import Dict from typing import Dict
from typing import AsyncGenerator from typing import AsyncGenerator
@ -62,7 +61,7 @@ class BaseStreamerClient:
def get_format(self) -> int: def get_format(self) -> int:
raise NotImplementedError() raise NotImplementedError()
async def read_stream(self) -> AsyncGenerator[Tuple[bool, int, int, bytes], None]: async def read_stream(self) -> AsyncGenerator[Dict, None]:
if self is not None: # XXX: Vulture and pylint hack if self is not None: # XXX: Vulture and pylint hack
raise NotImplementedError() raise NotImplementedError()
yield yield
@ -90,7 +89,7 @@ class HttpStreamerClient(BaseStreamerClient):
def get_format(self) -> int: def get_format(self) -> int:
return StreamFormats.JPEG return StreamFormats.JPEG
async def read_stream(self) -> AsyncGenerator[Tuple[bool, int, int, bytes], None]: async def read_stream(self) -> AsyncGenerator[Dict, None]:
try: try:
async with self.__make_http_session() as session: async with self.__make_http_session() as session:
async with session.get( async with session.get(
@ -110,12 +109,12 @@ class HttpStreamerClient(BaseStreamerClient):
if not data: if not data:
break break
yield ( yield {
(frame.headers["X-UStreamer-Online"] == "true"), "online": (frame.headers["X-UStreamer-Online"] == "true"),
int(frame.headers["X-UStreamer-Width"]), "width": int(frame.headers["X-UStreamer-Width"]),
int(frame.headers["X-UStreamer-Height"]), "height": int(frame.headers["X-UStreamer-Height"]),
data, "data": data,
) }
except Exception as err: # Тут бывают и ассерты, и KeyError, и прочая херня except Exception as err: # Тут бывают и ассерты, и KeyError, и прочая херня
if isinstance(err, StreamerTempError): if isinstance(err, StreamerTempError):
raise raise
@ -178,7 +177,7 @@ class MemsinkStreamerClient(BaseStreamerClient):
def get_format(self) -> int: def get_format(self) -> int:
return self.__fmt return self.__fmt
async def read_stream(self) -> AsyncGenerator[Tuple[bool, int, int, bytes], None]: async def read_stream(self) -> AsyncGenerator[Dict, None]:
if ustreamer is None: if ustreamer is None:
raise StreamerPermError("Missing ustreamer library") raise StreamerPermError("Missing ustreamer library")
try: try:
@ -187,12 +186,7 @@ class MemsinkStreamerClient(BaseStreamerClient):
frame = await aiotools.run_async(sink.wait_frame) frame = await aiotools.run_async(sink.wait_frame)
if frame is not None: if frame is not None:
self.__check_format(frame["format"]) self.__check_format(frame["format"])
yield ( yield frame
frame["online"],
frame["width"],
frame["height"],
frame["data"],
)
except StreamerPermError: except StreamerPermError:
raise raise
except FileNotFoundError as err: except FileNotFoundError as err: