otg keyboard hid

This commit is contained in:
Devaev Maxim 2019-09-28 05:21:09 +03:00
parent 31c17bb583
commit c16e4c953c
4 changed files with 456 additions and 0 deletions

View File

@ -0,0 +1,107 @@
# ========================================================================== #
# #
# KVMD - The main Pi-KVM daemon. #
# #
# Copyright (C) 2018 Maxim Devaev <mdevaev@gmail.com> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
# ========================================================================== #
import asyncio
from typing import Dict
from typing import AsyncGenerator
from typing import Any
from ....yamlconf import Option
from ....validators.basic import valid_bool
from ....validators.basic import valid_int_f1
from ....validators.basic import valid_float_f01
from ....validators.os import valid_abs_path
from .. import BaseHid
from .keyboard import KeyboardProcess
# =====
class Plugin(BaseHid):
def __init__( # pylint: disable=super-init-not-called
self,
keyboard: Dict[str, Any],
noop: bool,
state_poll: float,
) -> None:
self.__keyboard_proc = KeyboardProcess(noop=noop, **keyboard)
self.__state_poll = state_poll
self.__lock = asyncio.Lock()
@classmethod
def get_plugin_options(cls) -> Dict[str, Option]:
return {
"keyboard": {
"device": Option("", type=valid_abs_path, unpack_as="device_path"),
"timeout": Option(1.0, type=valid_float_f01),
"retries": Option(5, type=valid_int_f1),
"retries_delay": Option(1.0, type=valid_float_f01),
},
"noop": Option(False, type=valid_bool),
"state_poll": Option(0.1, type=valid_float_f01),
}
def start(self) -> None:
self.__keyboard_proc.start()
def get_state(self) -> Dict:
return {"online": self.__keyboard_proc.is_online()}
async def poll_state(self) -> AsyncGenerator[Dict, None]:
prev_state: Dict = {}
while self.__keyboard_proc.is_alive():
state = self.get_state()
if state != prev_state:
yield self.get_state()
prev_state = state
await asyncio.sleep(self.__state_poll)
async def reset(self) -> None:
self.__keyboard_proc.send_reset_event()
async def cleanup(self) -> None:
self.__keyboard_proc.cleanup()
# =====
async def send_key_event(self, key: str, state: bool) -> None:
self.__keyboard_proc.send_key_event(key, state)
async def send_mouse_move_event(self, to_x: int, to_y: int) -> None:
pass
async def send_mouse_button_event(self, button: str, state: bool) -> None:
pass
async def send_mouse_wheel_event(self, delta_y: int) -> None:
pass
async def clear_events(self) -> None:
self.__keyboard_proc.send_clear_event()

197
kvmd/plugins/hid/otg/hid.py Normal file
View File

@ -0,0 +1,197 @@
# ========================================================================== #
# #
# KVMD - The main Pi-KVM daemon. #
# #
# Copyright (C) 2018 Maxim Devaev <mdevaev@gmail.com> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
# ========================================================================== #
import os
import select
import signal
import multiprocessing
import multiprocessing.queues
import queue
import errno
import time
import setproctitle
from ....logging import get_logger
# =====
class BaseEvent:
pass
class DeviceProcess(multiprocessing.Process): # pylint: disable=too-many-instance-attributes
def __init__(
self,
name: str,
device_path: str,
timeout: float,
retries: int,
retries_delay: float,
noop: bool,
) -> None:
super().__init__(daemon=True)
self.__name = name
self.__device_path = device_path
self.__timeout = timeout
self.__retries = retries
self.__retries_delay = retries_delay
self.__noop = noop
self.__fd = -1
self.__events_queue: multiprocessing.queues.Queue = multiprocessing.Queue()
self.__online_shared = multiprocessing.Value("i", 1)
self.__stop_event = multiprocessing.Event()
def run(self) -> None:
logger = get_logger(0)
logger.info("Started HID-%s pid=%d", self.__name, os.getpid())
signal.signal(signal.SIGINT, signal.SIG_IGN)
setproctitle.setproctitle(f"[hid-{self.__name}] {setproctitle.getproctitle()}")
while not self.__stop_event.is_set():
try:
while not self.__stop_event.is_set():
passed = 0
try:
event: BaseEvent = self.__events_queue.get(timeout=0.05)
except queue.Empty:
if passed >= 20: # 20 * 0.05 = 1 sec
self._ensure_device() # Check device
passed = 0
else:
passed += 1
else:
self._process_event(event)
passed = 0
except Exception:
logger.error("Unexpected HID-%s error", self.__name)
self._close_device()
finally:
time.sleep(1)
self._close_device()
def is_online(self) -> bool:
return bool(self.__online_shared.value)
def _stop(self) -> None:
if self.is_alive():
get_logger().info("Stopping HID-%s daemon ...", self.__name)
self.__stop_event.set()
if self.exitcode is not None:
self.join()
def _process_event(self, event: BaseEvent) -> None:
raise NotImplementedError
def _queue_event(self, event: BaseEvent) -> None:
self.__events_queue.put(event)
def _write_report(self, report: bytes) -> bool:
if self.__noop:
return True
assert self.__fd >= 0
logger = get_logger()
retries = self.__retries
while retries:
try:
written = os.write(self.__fd, report)
if written == len(report):
self.__online_shared.value = 1
return True
else:
logger.error("HID-%s write error: written (%s) != report length (%d)",
self.__name, written, len(report))
self._close_device()
except Exception as err:
if isinstance(err, OSError) and errno == errno.EAGAIN:
msg = "Can't write report to HID-%s {}: %s: %s"
msg.format(" (maybe unplugged)" if retries == 1 else "")
logger.error(msg, self.__name, type(err).__name__, err) # TODO: debug
else:
logger.exception("Can't write report to HID-%s", self.__name)
self._close_device()
retries -= 1
self.__online_shared.value = 0
if retries:
logger.error("Retries left (HID-%s, write_report): %d", self.__name, retries)
time.sleep(self.__retries_delay)
return False
def _ensure_device(self) -> bool:
if self.__noop:
return True
logger = get_logger()
if self.__fd < 0:
try:
self.__fd = os.open(self.__device_path, os.O_WRONLY|os.O_NONBLOCK)
except FileNotFoundError:
logger.error("Missing HID-%s device: %s", self.__name, self.__device_path)
except Exception:
logger.exception("Can't open HID-%s device: %s", self.__name, self.__device_path)
if self.__fd >= 0:
retries = self.__retries
while retries:
try:
if select.select([], [self.__fd], [], self.__timeout)[1]:
self.__online_shared.value = 1
return True
else:
msg = "HID-%s is unavailable for writing"
if retries == 1:
msg += " (maybe unplugged)"
logger.error(msg, self.__name) # TODO: debug
except Exception as err:
logger.error("Can't select() HID-%s: %s: %s", self.__name, type(err).__name__, err)
retries -= 1
self.__online_shared.value = 0
if retries:
logger.error("Retries left (HID-%s, ensure_device): %d", self.__name, retries)
time.sleep(self.__retries_delay)
self._close_device()
return False
def _close_device(self) -> None:
if self.__fd >= 0:
try:
os.close(self.__fd)
except Exception:
pass
finally:
self.__fd = -1

View File

@ -0,0 +1,151 @@
# ========================================================================== #
# #
# KVMD - The main Pi-KVM daemon. #
# #
# Copyright (C) 2018 Maxim Devaev <mdevaev@gmail.com> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
# ========================================================================== #
import dataclasses
from typing import List
from typing import Set
from typing import Optional
from typing import Any
from ....logging import get_logger
from .... import keymap
from .hid import BaseEvent
from .hid import DeviceProcess
# =====
class _ClearEvent(BaseEvent):
pass
class _ResetEvent(BaseEvent):
pass
@dataclasses.dataclass(frozen=True)
class _KeyEvent(BaseEvent):
key: keymap.OtgKey
state: bool
# =====
class KeyboardProcess(DeviceProcess):
def __init__(self, **kwargs: Any) -> None:
super().__init__(name="keyboard", **kwargs)
self.__pressed_modifiers: Set[keymap.OtgKey] = set()
self.__pressed_keys: List[Optional[keymap.OtgKey]] = [None] * 6
def cleanup(self) -> None:
self._stop()
get_logger().info("Clearing HID-keyboard events ...")
if self._ensure_device():
try:
self._write_report(b"\x00" * 8) # Release all keys and modifiers
finally:
self._close_device()
def send_clear_event(self) -> None:
self._queue_event(_ClearEvent())
def send_reset_event(self) -> None:
self._queue_event(_ResetEvent())
def send_key_event(self, key: str, state: bool) -> None:
assert key in keymap.KEYMAP
self._queue_event(_KeyEvent(key=keymap.KEYMAP[key].otg, state=state))
# =====
def _process_event(self, event: BaseEvent) -> None:
if isinstance(event, _ClearEvent):
self.__process_clear_event()
elif isinstance(event, _ResetEvent):
self.__process_clear_event(reopen=True)
elif isinstance(event, _KeyEvent):
self.__process_key_event(event)
def __process_clear_event(self, reopen: bool=False) -> None:
self.__clear_modifiers()
self.__clear_keys()
if reopen:
self._close_device()
self.__send_current_state()
def __process_key_event(self, event: _KeyEvent) -> None:
if event.key.is_modifier:
if event.key in self.__pressed_modifiers:
# Ранее нажатый модификатор отжимаем
self.__pressed_modifiers.remove(event.key)
if not self.__send_current_state():
return
if event.state:
# Нажимаем если нужно
self.__pressed_modifiers.add(event.key)
self.__send_current_state()
else: # regular key
if event.key in self.__pressed_keys:
# Ранее нажатую клавишу отжимаем
self.__pressed_keys[self.__pressed_keys.index(event.key)] = None
if not self.__send_current_state():
return
elif event.state and None not in self.__pressed_keys:
# Если нужно нажать что-то новое, но свободных слотов нет - отжимаем всё
self.__clear_keys()
if not self.__send_current_state():
return
if event.state:
# Нажимаем если нужно
self.__pressed_keys[self.__pressed_keys.index(None)] = event.key
self.__send_current_state()
def __send_current_state(self) -> bool:
ok = False
if self._ensure_device():
modifiers = 0
for key in self.__pressed_modifiers:
assert key.is_modifier
modifiers |= key.code
assert len(self.__pressed_keys) == 6
keys = [
(0 if key is None else key.code)
for key in self.__pressed_keys
]
print(self.__pressed_modifiers, self.__pressed_keys)
ok = self._write_report(bytes([modifiers, 0] + keys))
if not ok:
self.__clear_modifiers()
self.__clear_keys()
return ok
def __clear_modifiers(self) -> None:
self.__pressed_modifiers.clear()
def __clear_keys(self) -> None:
self.__pressed_keys = [None] * 6

View File

@ -82,6 +82,7 @@ def main() -> None:
"kvmd.plugins", "kvmd.plugins",
"kvmd.plugins.auth", "kvmd.plugins.auth",
"kvmd.plugins.hid", "kvmd.plugins.hid",
"kvmd.plugins.hid.otg",
"kvmd.plugins.atx", "kvmd.plugins.atx",
"kvmd.plugins.msd", "kvmd.plugins.msd",
"kvmd.apps", "kvmd.apps",