refactoring

This commit is contained in:
Devaev Maxim 2019-04-07 06:10:55 +03:00
parent dcd774971a
commit f426e13907

View File

@ -34,6 +34,7 @@ from typing import Dict
from typing import Set
from typing import NamedTuple
from typing import AsyncGenerator
from typing import Any
import yaml
import serial
@ -104,6 +105,7 @@ class _MouseWheelEvent(NamedTuple):
return b"\x14\x00" + struct.pack(">b", delta_y) + b"\x00\x00"
# =====
class Hid(multiprocessing.Process): # pylint: disable=too-many-instance-attributes
def __init__( # pylint: disable=too-many-arguments
self,
@ -164,36 +166,16 @@ class Hid(multiprocessing.Process): # pylint: disable=too-many-instance-attribu
gpio.write(self.__reset_pin, False)
async def send_key_event(self, key: str, state: bool) -> None:
if not self.__stop_event.is_set():
async with self.__lock:
if _KeyEvent.is_valid(key):
if state and key not in self.__pressed_keys:
self.__pressed_keys.add(key)
self.__events_queue.put(_KeyEvent(key, state))
elif not state and key in self.__pressed_keys:
self.__pressed_keys.remove(key)
self.__events_queue.put(_KeyEvent(key, state))
await self.__send_bool_event(_KeyEvent, self.__pressed_keys, key, state)
async def send_mouse_move_event(self, to_x: int, to_y: int) -> None:
if not self.__stop_event.is_set():
async with self.__lock:
self.__events_queue.put(_MouseMoveEvent(to_x, to_y))
await self.__send_int_event(_MouseMoveEvent, to_x, to_y)
async def send_mouse_button_event(self, button: str, state: bool) -> None:
if not self.__stop_event.is_set():
async with self.__lock:
if _MouseButtonEvent.is_valid(button):
if state and button not in self.__pressed_mouse_buttons:
self.__pressed_mouse_buttons.add(button)
self.__events_queue.put(_MouseButtonEvent(button, state))
elif not state and button in self.__pressed_mouse_buttons:
self.__pressed_mouse_buttons.remove(button)
self.__events_queue.put(_MouseButtonEvent(button, state))
await self.__send_bool_event(_MouseButtonEvent, self.__pressed_mouse_buttons, button, state)
async def send_mouse_wheel_event(self, delta_y: int) -> None:
if not self.__stop_event.is_set():
async with self.__lock:
self.__events_queue.put(_MouseWheelEvent(delta_y))
await self.__send_int_event(_MouseWheelEvent, delta_y)
async def clear_events(self) -> None:
if not self.__stop_event.is_set():
@ -212,13 +194,32 @@ class Hid(multiprocessing.Process): # pylint: disable=too-many-instance-attribu
self.__emergency_clear_events()
gpio.write(self.__reset_pin, False)
async def __send_bool_event(self, cls: Any, pressed: Set[str], name: str, state: bool) -> None:
if not self.__stop_event.is_set():
async with self.__lock:
if cls.is_valid(name) and (
(state and (name not in pressed)) # Если еще не нажато
or (not state and (name in pressed)) # ... Или еще не отжато
):
if state:
pressed.add(name)
else:
pressed.remove(name)
self.__events_queue.put(cls(name, state))
async def __send_int_event(self, cls: Any, *args: int) -> None:
if not self.__stop_event.is_set():
async with self.__lock:
self.__events_queue.put(cls(*args))
def __unsafe_clear_events(self) -> None:
for button in self.__pressed_mouse_buttons:
self.__events_queue.put(_MouseButtonEvent(button, False))
self.__pressed_mouse_buttons.clear()
for key in self.__pressed_keys:
self.__events_queue.put(_KeyEvent(key, False))
self.__pressed_keys.clear()
for (cls, pressed) in [
(_MouseButtonEvent, self.__pressed_mouse_buttons),
(_KeyEvent, self.__pressed_keys),
]:
for name in pressed:
self.__events_queue.put(cls(name, False))
pressed.clear()
def __emergency_clear_events(self) -> None:
if os.path.exists(self.__device_path):