common background systasks

This commit is contained in:
Devaev Maxim 2020-09-05 09:26:55 +03:00
parent e162d84d56
commit 482eeec3e7
3 changed files with 20 additions and 43 deletions

View File

@ -112,13 +112,14 @@ class _Component:
obj: object obj: object
get_state: Optional[Callable[[], Coroutine[Any, Any, Dict]]] = None get_state: Optional[Callable[[], Coroutine[Any, Any, Dict]]] = None
poll_state: Optional[Callable[[], AsyncGenerator[Dict, None]]] = None poll_state: Optional[Callable[[], AsyncGenerator[Dict, None]]] = None
systask: Optional[Callable[[], Coroutine[Any, Any, None]]] = None
cleanup: Optional[Callable[[], Coroutine[Any, Any, Dict]]] = None cleanup: Optional[Callable[[], Coroutine[Any, Any, Dict]]] = None
def __post_init__(self) -> None: def __post_init__(self) -> None:
if isinstance(self.obj, BasePlugin): if isinstance(self.obj, BasePlugin):
object.__setattr__(self, "name", f"{self.name} ({self.obj.get_plugin_name()})") object.__setattr__(self, "name", f"{self.name} ({self.obj.get_plugin_name()})")
for field in ["get_state", "poll_state", "cleanup"]: for field in ["get_state", "poll_state", "systask", "cleanup"]:
object.__setattr__(self, field, getattr(self.obj, field, None)) object.__setattr__(self, field, getattr(self.obj, field, None))
if self.get_state or self.poll_state: if self.get_state or self.poll_state:
assert self.event_type, self assert self.event_type, self
@ -288,6 +289,8 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
self.__run_system_task(self.__stream_controller) self.__run_system_task(self.__stream_controller)
for component in self.__components: for component in self.__components:
if component.systask:
self.__run_system_task(component.systask)
if component.poll_state: if component.poll_state:
self.__run_system_task(self.__poll_state, component.event_type, component.poll_state()) self.__run_system_task(self.__poll_state, component.event_type, component.poll_state())
self.__run_system_task(self.__stream_snapshoter) self.__run_system_task(self.__stream_snapshoter)

View File

@ -201,29 +201,16 @@ class UserGpio:
} }
async def poll_state(self) -> AsyncGenerator[Dict, None]: async def poll_state(self) -> AsyncGenerator[Dict, None]:
reader_task = asyncio.create_task(self.__reader.poll())
waiter_task: Optional[asyncio.Task] = None
prev_state: Dict = {} prev_state: Dict = {}
try:
while True: while True:
if reader_task.cancelled():
break
if reader_task.done():
RuntimeError("BatchReader task is dead")
state = await self.get_state() state = await self.get_state()
if state != prev_state: if state != prev_state:
yield state yield state
prev_state = state prev_state = state
await self.__state_notifier.wait()
if waiter_task is None: async def systask(self) -> None:
waiter_task = asyncio.create_task(self.__state_notifier.wait()) await self.__reader.poll()
if waiter_task in (await aiotools.wait_first(reader_task, waiter_task))[0]:
waiter_task = None
finally:
if not reader_task.done():
reader_task.cancel()
await reader_task
async def cleanup(self) -> None: async def cleanup(self) -> None:
for gout in self.__outputs.values(): for gout in self.__outputs.values():

View File

@ -206,29 +206,16 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
} }
async def poll_state(self) -> AsyncGenerator[Dict, None]: async def poll_state(self) -> AsyncGenerator[Dict, None]:
inotify_task = asyncio.create_task(self.__watch_inotify())
waiter_task: Optional[asyncio.Task] = None
prev_state: Dict = {} prev_state: Dict = {}
try:
while True: while True:
if inotify_task.cancelled():
break
if inotify_task.done():
RuntimeError("Inotify task is dead")
state = await self.get_state() state = await self.get_state()
if state != prev_state: if state != prev_state:
yield state yield state
prev_state = state prev_state = state
await self.__state_notifier.wait()
if waiter_task is None: async def systask(self) -> None:
waiter_task = asyncio.create_task(self.__state_notifier.wait()) await self.__watch_inotify()
if waiter_task in (await aiotools.wait_first(inotify_task, waiter_task))[0]:
waiter_task = None
finally:
if not inotify_task.done():
inotify_task.cancel()
await inotify_task
@aiotools.atomic @aiotools.atomic
async def reset(self) -> None: async def reset(self) -> None: