From cc6f7c417e1acd842d7b68fd027db8a63a705127 Mon Sep 17 00:00:00 2001 From: Devaev Maxim Date: Sat, 17 Oct 2020 19:16:08 +0300 Subject: [PATCH] pikvm/pikvm#92: attempt to fix "Multiple access in eof state" --- kvmd/clients/streamer.py | 26 +++++++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/kvmd/clients/streamer.py b/kvmd/clients/streamer.py index 1b3f9433..1a86b4f9 100644 --- a/kvmd/clients/streamer.py +++ b/kvmd/clients/streamer.py @@ -20,6 +20,8 @@ # ========================================================================== # +import types + from typing import Tuple from typing import Dict from typing import AsyncGenerator @@ -35,6 +37,20 @@ class StreamerError(Exception): # ===== +def _patch_stream_reader(reader: aiohttp.StreamReader) -> None: + # https://github.com/pikvm/pikvm/issues/92 + # Infinite looping in BodyPartReader.read() because _at_eof flag. + + orig_read = reader.read + + async def read(self: aiohttp.StreamReader, n: int=-1) -> bytes: # pylint: disable=invalid-name + if self.is_eof(): + raise StreamerError("StreamReader.read(): Reached EOF") + return (await orig_read(n)) + + reader.read = types.MethodType(read, reader) # type: ignore + + class StreamerClient: def __init__( self, @@ -61,15 +77,13 @@ class StreamerClient: ) as response: htclient.raise_not_200(response) reader = aiohttp.MultipartReader.from_response(response) + _patch_stream_reader(reader.resp.content) while True: frame = await reader.next() # pylint: disable=not-callable if not isinstance(frame, aiohttp.BodyPartReader): - raise RuntimeError("Expected body part") + raise StreamerError("Expected body part") - if hasattr(frame, "_content"): - if frame._content.is_eof(): # pylint: disable=protected-access - break data = bytes(await frame.read()) if not data: break @@ -80,7 +94,9 @@ class StreamerClient: int(frame.headers["X-UStreamer-Height"]), data, ) - except Exception as err: # Тут бывают и ассерты, и KeyError, и прочая херня из-за корявых исключений в MultipartReader + except Exception as err: # Тут бывают и ассерты, и KeyError, и прочая херня + if isinstance(err, StreamerError): + raise raise StreamerError(f"{type(err).__name__}: {err}") raise StreamerError("Reached EOF")