otg keyboard leds

This commit is contained in:
Devaev Maxim
2020-02-20 11:11:39 +03:00
parent d732b4f518
commit 6cd4a0a988
12 changed files with 302 additions and 66 deletions

View File

@@ -21,6 +21,11 @@
import asyncio
import concurrent.futures
import multiprocessing
import multiprocessing.queues
import queue
import functools
from typing import Dict
from typing import AsyncGenerator
@@ -47,15 +52,12 @@ class Plugin(BaseHid):
keyboard: Dict[str, Any],
mouse: Dict[str, Any],
noop: bool,
state_poll: float,
) -> None:
self.__keyboard_proc = KeyboardProcess(noop=noop, **keyboard)
self.__mouse_proc = MouseProcess(noop=noop, **mouse)
self.__changes_queue: multiprocessing.queues.Queue = multiprocessing.Queue()
self.__state_poll = state_poll
self.__lock = asyncio.Lock()
self.__keyboard_proc = KeyboardProcess(noop=noop, changes_queue=self.__changes_queue, **keyboard)
self.__mouse_proc = MouseProcess(noop=noop, changes_queue=self.__changes_queue, **mouse)
@classmethod
def get_plugin_options(cls) -> Dict:
@@ -66,16 +68,13 @@ class Plugin(BaseHid):
"write_retries": Option(5, type=valid_int_f1),
"write_retries_delay": Option(0.1, type=valid_float_f01),
},
"mouse": {
"device": Option("", type=valid_abs_path, unpack_as="device_path"),
"select_timeout": Option(1.0, type=valid_float_f01),
"write_retries": Option(5, type=valid_int_f1),
"write_retries_delay": Option(0.1, type=valid_float_f01),
},
"noop": Option(False, type=valid_bool),
"state_poll": Option(0.1, type=valid_float_f01),
"noop": Option(False, type=valid_bool),
}
def start(self) -> None:
@@ -83,22 +82,30 @@ class Plugin(BaseHid):
self.__mouse_proc.start()
def get_state(self) -> Dict:
keyboard_online = self.__keyboard_proc.is_online()
mouse_online = self.__mouse_proc.is_online()
keyboard_state = self.__keyboard_proc.get_state()
mouse_state = self.__mouse_proc.get_state()
return {
"online": (keyboard_online and mouse_online),
"keyboard": {"online": keyboard_online},
"mouse": {"online": mouse_online},
"online": (keyboard_state["online"] and mouse_state["online"]),
"keyboard": {"features": {"leds": True}, **keyboard_state},
"mouse": mouse_state,
}
async def poll_state(self) -> AsyncGenerator[Dict, None]:
prev_state: Dict = {}
while self.__keyboard_proc.is_alive() and self.__mouse_proc.is_alive():
state = self.get_state()
if state != prev_state:
yield self.get_state()
prev_state = state
await asyncio.sleep(self.__state_poll)
loop = asyncio.get_running_loop()
wait_for_changes = functools.partial(self.__changes_queue.get, timeout=1)
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
prev_state: Dict = {}
while True:
state = self.get_state()
if state != prev_state:
yield state
prev_state = state
while True:
try:
await loop.run_in_executor(executor, wait_for_changes)
break
except queue.Empty:
pass
async def reset(self) -> None:
self.__keyboard_proc.send_reset_event()