mirror of
https://github.com/mofeng-git/One-KVM.git
synced 2026-01-29 00:51:53 +08:00
refactoring
This commit is contained in:
@@ -22,21 +22,34 @@
|
||||
|
||||
import time
|
||||
|
||||
from typing import Callable
|
||||
from typing import Awaitable
|
||||
|
||||
from evdev import ecodes
|
||||
|
||||
|
||||
# =====
|
||||
class BaseMagicHandler:
|
||||
class MagicHandler:
|
||||
__MAGIC_KEY = ecodes.KEY_LEFTALT
|
||||
__MAGIC_TIMEOUT = 2
|
||||
__MAGIC_TRIGGER = 2
|
||||
|
||||
def __init__(self) -> None:
|
||||
def __init__(
|
||||
self,
|
||||
proxy_handler: Callable[[int, bool], Awaitable[None]],
|
||||
key_handlers: (dict[int, Callable[[], Awaitable[None]]] | None)=None,
|
||||
numeric_handler: (Callable[[list[int]], Awaitable[bool]] | None)=None,
|
||||
) -> None:
|
||||
|
||||
self.__proxy_handler = proxy_handler
|
||||
self.__key_handlers = (key_handlers or {})
|
||||
self.__numeric_handler = numeric_handler
|
||||
|
||||
self.__taps = 0
|
||||
self.__ts = 0.0
|
||||
self.__codes: list[int] = []
|
||||
|
||||
async def _magic_handle_key(self, key: int, state: bool) -> None: # pylint: disable=too-many-branches
|
||||
async def handle_key(self, key: int, state: bool) -> None: # pylint: disable=too-many-branches
|
||||
if self.__ts + self.__MAGIC_TIMEOUT < time.monotonic():
|
||||
self.__taps = 0
|
||||
self.__ts = 0
|
||||
@@ -53,44 +66,17 @@ class BaseMagicHandler:
|
||||
self.__ts = 0
|
||||
self.__codes = []
|
||||
if taps >= self.__MAGIC_TRIGGER:
|
||||
if key == ecodes.KEY_P:
|
||||
await self._on_magic_clipboard_print()
|
||||
if key in self.__key_handlers:
|
||||
await self.__key_handlers[key]()
|
||||
return
|
||||
|
||||
elif key in [ecodes.KEY_UP, ecodes.KEY_LEFT]:
|
||||
await self._on_magic_switch_prev()
|
||||
return
|
||||
|
||||
elif key in [ecodes.KEY_DOWN, ecodes.KEY_RIGHT]:
|
||||
await self._on_magic_switch_next()
|
||||
return
|
||||
|
||||
elif ecodes.KEY_1 <= key <= ecodes.KEY_8:
|
||||
elif self.__numeric_handler is not None and (ecodes.KEY_1 <= key <= ecodes.KEY_8):
|
||||
codes.append(key - ecodes.KEY_1)
|
||||
if len(codes) == 1:
|
||||
if not (await self._on_magic_switch_port(codes[0], -1)):
|
||||
self.__taps = taps
|
||||
self.__ts = time.monotonic()
|
||||
self.__codes = codes
|
||||
elif len(codes) >= 2:
|
||||
await self._on_magic_switch_port(codes[0], codes[1])
|
||||
if not (await self.__numeric_handler(list(codes))):
|
||||
# Если хандлер хочет код большей длины, он возвращает False,
|
||||
# и мы ждем следующую цифру.
|
||||
self.__taps = taps
|
||||
self.__ts = time.monotonic()
|
||||
self.__codes = codes
|
||||
return
|
||||
|
||||
await self._on_magic_key_proxy(key, state)
|
||||
|
||||
async def _on_magic_clipboard_print(self) -> None:
|
||||
pass
|
||||
|
||||
async def _on_magic_switch_prev(self) -> None:
|
||||
pass
|
||||
|
||||
async def _on_magic_switch_next(self) -> None:
|
||||
pass
|
||||
|
||||
async def _on_magic_switch_port(self, first: int, second: int) -> bool:
|
||||
_ = first
|
||||
_ = second
|
||||
return True
|
||||
|
||||
async def _on_magic_key_proxy(self, key: int, state: bool) -> None:
|
||||
raise NotImplementedError()
|
||||
await self.__proxy_handler(key, state)
|
||||
|
||||
Reference in New Issue
Block a user