removed busyloop from stream controller

This commit is contained in:
Devaev Maxim 2020-03-01 00:37:25 +03:00
parent 75d9b858d7
commit cae9ad9a21
3 changed files with 22 additions and 1 deletions

View File

@ -56,6 +56,7 @@ class AioProcessNotifier:
return False
# =====
class AioSharedFlags:
def __init__(
self,

View File

@ -22,6 +22,7 @@
import os
import asyncio
import asyncio.queues
import functools
import contextlib
import types
@ -158,3 +159,17 @@ class AioExclusiveRegion:
_tb: types.TracebackType,
) -> None:
self.exit()
# =====
class AioNotifier:
def __init__(self) -> None:
self.__queue: asyncio.queues.Queue = asyncio.Queue()
async def notify(self) -> None:
await self.__queue.put(None)
async def wait(self) -> None:
await self.__queue.get()
while not self.__queue.empty():
await self.__queue.get()

View File

@ -148,6 +148,7 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
self.__system_tasks: List[asyncio.Task] = []
self.__streamer_notifier = aiotools.AioNotifier()
self.__reset_streamer = False
self.__new_streamer_params: Dict = {}
@ -210,11 +211,13 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
value = request.query.get(name)
if value:
self.__new_streamer_params[name] = validator(value)
await self.__streamer_notifier.notify()
return make_json_response()
@exposed_http("POST", "/streamer/reset")
async def __streamer_reset_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
self.__reset_streamer = True
await self.__streamer_notifier.notify()
return make_json_response()
# ===== WEBSOCKET
@ -384,6 +387,7 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
self.__sockets.add(ws)
remote: Optional[str] = (ws._req.remote if ws._req is not None else None) # pylint: disable=protected-access
get_logger().info("Registered new client socket: remote=%s; id=%d; active=%d", remote, id(ws), len(self.__sockets))
await self.__streamer_notifier.notify()
async def __remove_socket(self, ws: aiohttp.web.WebSocketResponse) -> None:
async with self.__sockets_lock:
@ -397,6 +401,7 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
raise
except Exception:
pass
await self.__streamer_notifier.notify()
# ===== SYSTEM TASKS
@ -420,7 +425,7 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
self.__reset_streamer = False
prev = cur
await asyncio.sleep(0.1)
await self.__streamer_notifier.wait()
async def __poll_state(self, event_type: _Events, poller: AsyncGenerator[Dict, None]) -> None:
async for state in poller: