mirror of
https://github.com/mofeng-git/One-KVM.git
synced 2025-12-12 09:10:30 +08:00
193 lines
8.2 KiB
Python
193 lines
8.2 KiB
Python
# ========================================================================== #
|
|
# #
|
|
# KVMD - The main PiKVM daemon. #
|
|
# #
|
|
# Copyright (C) 2020 Maxim Devaev <mdevaev@gmail.com> #
|
|
# #
|
|
# This program is free software: you can redistribute it and/or modify #
|
|
# it under the terms of the GNU General Public License as published by #
|
|
# the Free Software Foundation, either version 3 of the License, or #
|
|
# (at your option) any later version. #
|
|
# #
|
|
# This program is distributed in the hope that it will be useful, #
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
|
|
# GNU General Public License for more details. #
|
|
# #
|
|
# You should have received a copy of the GNU General Public License #
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
|
|
# #
|
|
# ========================================================================== #
|
|
|
|
|
|
import asyncio
|
|
import errno
|
|
|
|
from typing import Callable
|
|
from typing import Coroutine
|
|
|
|
import aiohttp
|
|
import async_lru
|
|
|
|
from evdev import ecodes
|
|
|
|
from ...logging import get_logger
|
|
|
|
from ... import tools
|
|
from ... import aiotools
|
|
|
|
from ...keyboard.magic import MagicHandler
|
|
|
|
from ...clients.kvmd import KvmdClient
|
|
from ...clients.kvmd import KvmdClientSession
|
|
from ...clients.kvmd import KvmdClientWs
|
|
|
|
from .hid import Hid
|
|
from .multi import MultiHid
|
|
|
|
|
|
# =====
|
|
class LocalHidServer: # pylint: disable=too-many-instance-attributes
|
|
def __init__(self, kvmd: KvmdClient) -> None:
|
|
self.__kvmd = kvmd
|
|
|
|
self.__kvmd_session: (KvmdClientSession | None) = None
|
|
self.__kvmd_ws: (KvmdClientWs | None) = None
|
|
|
|
self.__queue: asyncio.Queue[tuple[int, tuple]] = asyncio.Queue()
|
|
self.__hid = MultiHid(self.__queue)
|
|
|
|
self.__info_switch_units = 0
|
|
self.__info_switch_active = ""
|
|
self.__info_mouse_absolute = True
|
|
self.__info_mouse_outputs: list[str] = []
|
|
|
|
self.__magic = MagicHandler(
|
|
proxy_handler=self.__on_magic_key_proxy,
|
|
key_handlers={
|
|
ecodes.KEY_H: self.__on_magic_grab,
|
|
ecodes.KEY_K: self.__on_magic_ungrab,
|
|
ecodes.KEY_UP: self.__on_magic_switch_prev,
|
|
ecodes.KEY_LEFT: self.__on_magic_switch_prev,
|
|
ecodes.KEY_DOWN: self.__on_magic_switch_next,
|
|
ecodes.KEY_RIGHT: self.__on_magic_switch_next,
|
|
},
|
|
numeric_handler=self.__on_magic_switch_port,
|
|
)
|
|
|
|
def run(self) -> None:
|
|
try:
|
|
aiotools.run(self.__inner_run())
|
|
finally:
|
|
get_logger(0).info("Bye-bye")
|
|
|
|
async def __inner_run(self) -> None:
|
|
await aiotools.spawn_and_follow(
|
|
self.__create_loop(self.__hid.run),
|
|
self.__create_loop(self.__queue_worker),
|
|
self.__create_loop(self.__api_worker),
|
|
)
|
|
|
|
async def __create_loop(self, func: Callable[[], Coroutine]) -> None:
|
|
while True:
|
|
try:
|
|
await func()
|
|
except Exception as ex:
|
|
if isinstance(ex, OSError) and ex.errno == errno.ENODEV: # pylint: disable=no-member
|
|
pass # Device disconnected
|
|
elif isinstance(ex, aiohttp.ClientError):
|
|
get_logger(0).error("KVMD client error: %s", tools.efmt(ex))
|
|
else:
|
|
get_logger(0).exception("Unhandled exception in the loop: %s", func)
|
|
await asyncio.sleep(5)
|
|
|
|
async def __queue_worker(self) -> None:
|
|
while True:
|
|
(event, args) = await self.__queue.get()
|
|
if event == Hid.KEY:
|
|
await self.__magic.handle_key(*args)
|
|
continue
|
|
elif self.__hid.is_grabbed() and self.__kvmd_session and self.__kvmd_ws:
|
|
match event:
|
|
case Hid.MOUSE_BUTTON:
|
|
await self.__kvmd_ws.send_mouse_button_event(*args)
|
|
case Hid.MOUSE_REL:
|
|
await self.__ensure_mouse_relative()
|
|
await self.__kvmd_ws.send_mouse_relative_event(*args)
|
|
case Hid.MOUSE_WHEEL:
|
|
await self.__kvmd_ws.send_mouse_wheel_event(*args)
|
|
|
|
async def __api_worker(self) -> None:
|
|
logger = get_logger(0)
|
|
async with self.__kvmd.make_session() as session:
|
|
async with session.ws(stream=False) as ws:
|
|
logger.info("KVMD session opened")
|
|
self.__kvmd_session = session
|
|
self.__kvmd_ws = ws
|
|
try:
|
|
async for (event_type, event) in ws.communicate():
|
|
if event_type == "hid":
|
|
if "leds" in event.get("keyboard", {}):
|
|
await self.__hid.set_leds(**event["keyboard"]["leds"])
|
|
if "absolute" in event.get("mouse", {}):
|
|
self.__info_mouse_outputs = event["mouse"]["outputs"]["available"]
|
|
self.__info_mouse_absolute = event["mouse"]["absolute"]
|
|
elif event_type == "switch":
|
|
if "model" in event:
|
|
self.__info_switch_units = len(event["model"]["units"])
|
|
if "summary" in event:
|
|
self.__info_switch_active = event["summary"]["active_id"]
|
|
finally:
|
|
logger.info("KVMD session closed")
|
|
self.__kvmd_session = None
|
|
self.__kvmd_ws = None
|
|
|
|
# =====
|
|
|
|
async def __ensure_mouse_relative(self) -> None:
|
|
if self.__info_mouse_absolute:
|
|
# Avoid unnecessary LRU checks, just to speed up a bit
|
|
await self.__inner_ensure_mouse_relative()
|
|
|
|
@async_lru.alru_cache(maxsize=1, ttl=1)
|
|
async def __inner_ensure_mouse_relative(self) -> None:
|
|
if self.__kvmd_session and self.__info_mouse_absolute:
|
|
for output in ["usb_rel", "ps2"]:
|
|
if output in self.__info_mouse_outputs:
|
|
await self.__kvmd_session.hid.set_params(mouse_output=output)
|
|
|
|
async def __on_magic_key_proxy(self, key: int, state: bool) -> None:
|
|
if self.__hid.is_grabbed() and self.__kvmd_ws:
|
|
await self.__kvmd_ws.send_key_event(key, state)
|
|
|
|
async def __on_magic_grab(self) -> None:
|
|
await self.__hid.set_grabbed(True)
|
|
|
|
async def __on_magic_ungrab(self) -> None:
|
|
await self.__hid.set_grabbed(False)
|
|
|
|
async def __on_magic_switch_prev(self) -> None:
|
|
if self.__kvmd_session and self.__info_switch_units > 0:
|
|
get_logger(0).info("Switching port to the previous one ...")
|
|
await self.__kvmd_session.switch.set_active_prev()
|
|
|
|
async def __on_magic_switch_next(self) -> None:
|
|
if self.__kvmd_session and self.__info_switch_units > 0:
|
|
get_logger(0).info("Switching port to the next one ...")
|
|
await self.__kvmd_session.switch.set_active_next()
|
|
|
|
async def __on_magic_switch_port(self, codes: list[int]) -> bool:
|
|
assert len(codes) > 0
|
|
if self.__info_switch_units <= 0:
|
|
return True
|
|
elif 1 <= self.__info_switch_units <= 2:
|
|
port = float(codes[0])
|
|
else: # self.__info_switch_units > 2:
|
|
if len(codes) == 1:
|
|
return False # Wait for the second key
|
|
port = (codes[0] + 1) + (codes[1] + 1) / 10
|
|
if self.__kvmd_session:
|
|
get_logger(0).info("Switching port to %s ...", port)
|
|
await self.__kvmd_session.switch.set_active(port)
|
|
return True
|