many fixes for h264 chain

This commit is contained in:
Devaev Maxim 2021-01-26 06:28:30 +03:00
parent a0ae387a6c
commit 4c32ce01ad
3 changed files with 81 additions and 26 deletions

View File

@ -24,8 +24,10 @@ from typing import List
from typing import Optional from typing import Optional
from ...clients.kvmd import KvmdClient from ...clients.kvmd import KvmdClient
from ...clients.streamer import StreamerHttpClient from ...clients.streamer import StreamFormats
from ...clients.streamer import StreamerMemsinkClient from ...clients.streamer import BaseStreamerClient
from ...clients.streamer import HttpStreamerClient
from ...clients.streamer import MemsinkStreamerClient
from ... import htclient from ... import htclient
@ -46,11 +48,17 @@ def main(argv: Optional[List[str]]=None) -> None:
user_agent = htclient.make_user_agent("KVMD-VNC") user_agent = htclient.make_user_agent("KVMD-VNC")
def make_memsink(name: str) -> Optional[StreamerMemsinkClient]: def make_memsink_streamer(name: str, fmt: int) -> Optional[MemsinkStreamerClient]:
if getattr(config.memsink, name).sink: if getattr(config.memsink, name).sink:
return StreamerMemsinkClient(name=name, **getattr(config.memsink, name)._unpack()) return MemsinkStreamerClient(name.upper(), fmt, **getattr(config.memsink, name)._unpack())
return None return None
streamers: List[BaseStreamerClient] = list(filter(None, [
make_memsink_streamer("h264", StreamFormats.H264),
make_memsink_streamer("jpeg", StreamFormats.JPEG),
HttpStreamerClient(name="JPEG", user_agent=user_agent, **config.streamer._unpack()),
]))
VncServer( VncServer(
host=config.server.host, host=config.server.host,
port=config.server.port, port=config.server.port,
@ -65,11 +73,7 @@ def main(argv: Optional[List[str]]=None) -> None:
keymap_path=config.keymap, keymap_path=config.keymap,
kvmd=KvmdClient(user_agent=user_agent, **config.kvmd._unpack()), kvmd=KvmdClient(user_agent=user_agent, **config.kvmd._unpack()),
streamers=list(filter(None, [ streamers=streamers,
make_memsink("h264"),
make_memsink("jpeg"),
StreamerHttpClient(name="jpeg", user_agent=user_agent, **config.streamer._unpack()),
])),
vnc_auth_manager=VncAuthManager(**config.auth.vncauth._unpack()), vnc_auth_manager=VncAuthManager(**config.auth.vncauth._unpack()),
**config.server.keepalive._unpack(), **config.server.keepalive._unpack(),

View File

@ -48,6 +48,7 @@ from ...clients.kvmd import KvmdClient
from ...clients.streamer import StreamerError from ...clients.streamer import StreamerError
from ...clients.streamer import StreamerPermError from ...clients.streamer import StreamerPermError
from ...clients.streamer import StreamFormats
from ...clients.streamer import BaseStreamerClient from ...clients.streamer import BaseStreamerClient
from ... import tools from ... import tools
@ -154,7 +155,7 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
try: try:
await asyncio.wait_for(self.__stage2_encodings_accepted, timeout=5) await asyncio.wait_for(self.__stage2_encodings_accepted, timeout=5)
except asyncio.TimeoutError: except asyncio.TimeoutError:
raise RfbError("No SetEncodings message recieved from the clienta in 5 secs") raise RfbError("No SetEncodings message recieved from the client in 5 secs")
assert self.__kvmd_session assert self.__kvmd_session
try: try:
@ -191,28 +192,45 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
async def __streamer_task_loop(self) -> None: async def __streamer_task_loop(self) -> None:
logger = get_logger(0) logger = get_logger(0)
await self.__stage3_ws_connected await self.__stage3_ws_connected
streamer = self.__streamers[0] streamer = self.__get_preferred_streamer()
while True: while True:
try: try:
streaming = False streaming = False
async for (online, width, height, data, h264) in streamer.read_stream(): async for (online, width, height, data) in streamer.read_stream():
if not streaming: if not streaming:
logger.info("[streamer] %s: Streaming from %s ...", self._remote, streamer) logger.info("[streamer] %s: Streaming ...", self._remote)
streaming = True streaming = True
if online: if online:
await self.__send_fb_real(width, height, data, h264) await self.__send_fb_real(width, height, data, streamer.get_format())
else: else:
await self.__send_fb_stub("No signal") await self.__send_fb_stub("No signal")
except StreamerError as err: except StreamerError as err:
if isinstance(err, StreamerPermError): if isinstance(err, StreamerPermError):
streamer = self.__streamers[-1] streamer = self.__get_default_streamer()
logger.info("[streamer] %s: Permanent error: %s; switching to %s ...", self._remote, err, streamer) logger.info("[streamer] %s: Permanent error: %s; switching to %s ...", self._remote, err, streamer)
else: else:
logger.info("[streamer] %s: Waiting for stream: %s", self._remote, err) logger.info("[streamer] %s: Waiting for stream: %s", self._remote, err)
await self.__send_fb_stub("Waiting for stream ...") await self.__send_fb_stub("Waiting for stream ...")
await asyncio.sleep(1) await asyncio.sleep(1)
async def __send_fb_real(self, width: int, height: int, data: bytes, h264: bool) -> None: def __get_preferred_streamer(self) -> BaseStreamerClient:
formats = {
StreamFormats.JPEG: "has_tight",
StreamFormats.H264: "has_h264",
}
streamer: Optional[BaseStreamerClient] = None
for streamer in self.__streamers:
if getattr(self._encodings, formats[streamer.get_format()]):
get_logger(0).info("[streamer] %s: Using preferred %s", self._remote, streamer)
return streamer
raise RuntimeError("No streamers found")
def __get_default_streamer(self) -> BaseStreamerClient:
streamer = self.__streamers[-1]
get_logger(0).info("[streamer] %s: Using default %s", self._remote, streamer)
return streamer
async def __send_fb_real(self, width: int, height: int, data: bytes, fmt: int) -> None:
async with self.__lock: async with self.__lock:
if self.__fb_requested: if self.__fb_requested:
if self._width != width or self._height != height: if self._width != width or self._height != height:
@ -223,7 +241,16 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
await self.__send_fb_stub(msg, no_lock=True) await self.__send_fb_stub(msg, no_lock=True)
return return
await self._send_resize(width, height) await self._send_resize(width, height)
await (self._send_fb_h264 if h264 else self._send_fb_jpeg)(data)
if fmt == StreamFormats.JPEG:
await self._send_fb_jpeg(data)
elif fmt == StreamFormats.H264:
if not self._encodings.has_h264:
raise StreamerPermError("The client doesn't want to accept H264 anymore")
await self._send_fb_h264(data)
else:
raise RuntimeError(f"Unknown format: {fmt}")
self.__fb_stub = None self.__fb_stub = None
self.__fb_requested = False self.__fb_requested = False

View File

@ -52,14 +52,23 @@ class StreamerPermError(StreamerError):
# ===== # =====
class StreamFormats:
JPEG = 1195724874 # V4L2_PIX_FMT_JPEG
H264 = 875967048 # V4L2_PIX_FMT_H264
_MJPEG = 1196444237 # V4L2_PIX_FMT_MJPEG
class BaseStreamerClient: class BaseStreamerClient:
async def read_stream(self) -> AsyncGenerator[Tuple[bool, int, int, bytes, bool], None]: def get_format(self) -> int:
raise NotImplementedError()
async def read_stream(self) -> AsyncGenerator[Tuple[bool, int, int, bytes], None]:
if self is not None: # XXX: Vulture and pylint hack if self is not None: # XXX: Vulture and pylint hack
raise NotImplementedError() raise NotImplementedError()
yield yield
class StreamerHttpClient(BaseStreamerClient): class HttpStreamerClient(BaseStreamerClient):
def __init__( def __init__(
self, self,
name: str, name: str,
@ -78,7 +87,10 @@ class StreamerHttpClient(BaseStreamerClient):
self.__timeout = timeout self.__timeout = timeout
self.__user_agent = user_agent self.__user_agent = user_agent
async def read_stream(self) -> AsyncGenerator[Tuple[bool, int, int, bytes, bool], None]: def get_format(self) -> int:
return StreamFormats.JPEG
async def read_stream(self) -> AsyncGenerator[Tuple[bool, int, int, bytes], None]:
try: try:
async with self.__make_http_session() as session: async with self.__make_http_session() as session:
async with session.get( async with session.get(
@ -103,7 +115,6 @@ class StreamerHttpClient(BaseStreamerClient):
int(frame.headers["X-UStreamer-Width"]), int(frame.headers["X-UStreamer-Width"]),
int(frame.headers["X-UStreamer-Height"]), int(frame.headers["X-UStreamer-Height"]),
data, data,
False,
) )
except Exception as err: # Тут бывают и ассерты, и KeyError, и прочая херня except Exception as err: # Тут бывают и ассерты, и KeyError, и прочая херня
if isinstance(err, StreamerTempError): if isinstance(err, StreamerTempError):
@ -141,13 +152,14 @@ class StreamerHttpClient(BaseStreamerClient):
reader.read = types.MethodType(read, reader) # type: ignore reader.read = types.MethodType(read, reader) # type: ignore
def __str__(self) -> str: def __str__(self) -> str:
return f"StreamerHttpClient({self.__name})" return f"HttpStreamerClient({self.__name})"
class StreamerMemsinkClient(BaseStreamerClient): class MemsinkStreamerClient(BaseStreamerClient):
def __init__( def __init__(
self, self,
name: str, name: str,
fmt: int,
obj: str, obj: str,
lock_timeout: float, lock_timeout: float,
wait_timeout: float, wait_timeout: float,
@ -155,6 +167,7 @@ class StreamerMemsinkClient(BaseStreamerClient):
) -> None: ) -> None:
self.__name = name self.__name = name
self.__fmt = fmt
self.__kwargs: Dict = { self.__kwargs: Dict = {
"obj": obj, "obj": obj,
"lock_timeout": lock_timeout, "lock_timeout": lock_timeout,
@ -162,7 +175,10 @@ class StreamerMemsinkClient(BaseStreamerClient):
"drop_same_frames": drop_same_frames, "drop_same_frames": drop_same_frames,
} }
async def read_stream(self) -> AsyncGenerator[Tuple[bool, int, int, bytes, bool], None]: def get_format(self) -> int:
return self.__fmt
async def read_stream(self) -> AsyncGenerator[Tuple[bool, int, int, bytes], None]:
if ustreamer is None: if ustreamer is None:
raise StreamerPermError("Missing ustreamer library") raise StreamerPermError("Missing ustreamer library")
try: try:
@ -170,17 +186,25 @@ class StreamerMemsinkClient(BaseStreamerClient):
while True: while True:
frame = await aiotools.run_async(sink.wait_frame) frame = await aiotools.run_async(sink.wait_frame)
if frame is not None: if frame is not None:
self.__check_format(frame["format"])
yield ( yield (
frame["online"], frame["online"],
frame["width"], frame["width"],
frame["height"], frame["height"],
frame["data"], frame["data"],
(frame["format"] == 875967048), # V4L2_PIX_FMT_H264
) )
except StreamerPermError:
raise
except FileNotFoundError as err: except FileNotFoundError as err:
raise StreamerTempError(tools.efmt(err)) raise StreamerTempError(tools.efmt(err))
except Exception as err: except Exception as err:
raise StreamerPermError(tools.efmt(err)) raise StreamerPermError(tools.efmt(err))
def __check_format(self, fmt: int) -> None:
if fmt == StreamFormats._MJPEG: # pylint: disable=protected-access
fmt = StreamFormats.JPEG
if fmt != self.__fmt:
raise StreamerPermError("Invalid sink format")
def __str__(self) -> str: def __str__(self) -> str:
return f"StreamerMemsinkClient({self.__name})" return f"MemsinkStreamerClient({self.__name})"