refactoring

This commit is contained in:
Devaev Maxim 2020-11-22 05:48:55 +03:00
parent 6a8ee1a114
commit 48550d2e78
3 changed files with 34 additions and 17 deletions

View File

@ -55,8 +55,6 @@ from .gpio import Gpio
from .proto import REQUEST_PING
from .proto import REQUEST_REPEAT
from .proto import RESPONSE_LEGACY_OK
from .proto import KEYBOARD_CODES_TO_NAMES
from .proto import MOUSE_CODES_TO_NAMES
from .proto import BaseEvent
from .proto import SetKeyboardOutputEvent
from .proto import SetMouseOutputEvent
@ -66,6 +64,8 @@ from .proto import MouseButtonEvent
from .proto import MouseMoveEvent
from .proto import MouseRelativeEvent
from .proto import MouseWheelEvent
from .proto import get_active_keyboard
from .proto import get_active_mouse
from .proto import check_response
@ -163,7 +163,8 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
features = state["status"] & 0xFF
absolute = True
if online and (outputs & 0b00111000) in [0b00010000, 0b00011000]:
active_mouse = get_active_mouse(outputs)
if online and active_mouse in ["usb_rel", "ps2"]:
absolute = False
keyboard_outputs: Dict = {"available": {}, "active": ""}
@ -179,13 +180,12 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
keyboard_outputs["available"]["ps2"] = {"name": "PS/2"}
mouse_outputs["available"]["ps2"] = {"name": "PS/2"}
active = KEYBOARD_CODES_TO_NAMES.get(outputs & 0b00000111, "")
if active in keyboard_outputs["available"]:
keyboard_outputs["active"] = active
active_keyboard = get_active_keyboard(outputs)
if active_keyboard in keyboard_outputs["available"]:
keyboard_outputs["active"] = active_keyboard
active = MOUSE_CODES_TO_NAMES.get(outputs & 0b00111000, "")
if active in mouse_outputs["available"]:
mouse_outputs["active"] = active
if active_mouse in mouse_outputs["available"]:
mouse_outputs["active"] = active_mouse
return {
"online": online,

View File

@ -25,6 +25,8 @@ import struct
from ....keyboard.mappings import KEYMAP
from .... import tools
# =====
class BaseEvent:
@ -32,11 +34,16 @@ class BaseEvent:
raise NotImplementedError
KEYBOARD_NAMES_TO_CODES = {
# =====
_KEYBOARD_NAMES_TO_CODES = {
"usb": 0b00000001,
"ps2": 0b00000011,
}
KEYBOARD_CODES_TO_NAMES = {value: key for (key, value) in KEYBOARD_NAMES_TO_CODES.items()}
_KEYBOARD_CODES_TO_NAMES = tools.swapped_kvs(_KEYBOARD_NAMES_TO_CODES)
def get_active_keyboard(outputs: int) -> str:
return _KEYBOARD_CODES_TO_NAMES.get(outputs & 0b00000111, "")
@dataclasses.dataclass(frozen=True)
@ -44,19 +51,24 @@ class SetKeyboardOutputEvent(BaseEvent):
keyboard: str
def __post_init__(self) -> None:
assert not self.keyboard or self.keyboard in KEYBOARD_NAMES_TO_CODES
assert not self.keyboard or self.keyboard in _KEYBOARD_NAMES_TO_CODES
def make_request(self) -> bytes:
code = KEYBOARD_NAMES_TO_CODES.get(self.keyboard, 0)
code = _KEYBOARD_NAMES_TO_CODES.get(self.keyboard, 0)
return _make_request(struct.pack(">BBxxx", 0x03, code))
MOUSE_NAMES_TO_CODES = {
# =====
_MOUSE_NAMES_TO_CODES = {
"usb": 0b00001000,
"usb_rel": 0b00010000,
"ps2": 0b00011000,
}
MOUSE_CODES_TO_NAMES = {value: key for (key, value) in MOUSE_NAMES_TO_CODES.items()}
_MOUSE_CODES_TO_NAMES = tools.swapped_kvs(_MOUSE_NAMES_TO_CODES)
def get_active_mouse(outputs: int) -> str:
return _MOUSE_CODES_TO_NAMES.get(outputs & 0b00111000, "")
@dataclasses.dataclass(frozen=True)
@ -64,12 +76,13 @@ class SetMouseOutputEvent(BaseEvent):
mouse: str
def __post_init__(self) -> None:
assert not self.mouse or self.mouse in MOUSE_NAMES_TO_CODES
assert not self.mouse or self.mouse in _MOUSE_NAMES_TO_CODES
def make_request(self) -> bytes:
return _make_request(struct.pack(">BBxxx", 0x04, MOUSE_NAMES_TO_CODES.get(self.mouse, 0)))
return _make_request(struct.pack(">BBxxx", 0x04, _MOUSE_NAMES_TO_CODES.get(self.mouse, 0)))
# =====
class ClearEvent(BaseEvent):
def make_request(self) -> bytes:
return _make_request(b"\x10\x00\x00\x00\x00")

View File

@ -57,6 +57,10 @@ def sorted_kvs(dct: Dict[_DictKeyT, _DictValueT]) -> List[Tuple[_DictKeyT, _Dict
return sorted(dct.items(), key=operator.itemgetter(0))
def swapped_kvs(dct: Dict[_DictKeyT, _DictValueT]) -> Dict[_DictValueT, _DictKeyT]:
return {value: key for (key, value) in dct.items()}
# =====
def clear_queue(q: multiprocessing.queues.Queue) -> None: # pylint: disable=invalid-name
for _ in range(q.qsize()):