commond kvmd ws client

This commit is contained in:
Devaev Maxim 2020-05-24 11:41:38 +03:00
parent 6d7351502e
commit cf47e0c880
2 changed files with 99 additions and 66 deletions

View File

@ -22,11 +22,9 @@
import os
import asyncio
import asyncio.queues
import socket
import dataclasses
import contextlib
import json
from typing import Dict
from typing import Optional
@ -38,14 +36,13 @@ from ...logging import get_logger
from ...keyboard.keysym import SymmapWebKey
from ...keyboard.keysym import build_symmap
from ...clients.kvmd import KvmdClientWs
from ...clients.kvmd import KvmdClientSession
from ...clients.kvmd import KvmdClient
from ...clients.streamer import StreamerError
from ...clients.streamer import StreamerClient
from ... import aiotools
from .rfb import RfbClient
from .rfb.stream import rfb_format_remote
from .rfb.stream import rfb_close_writer
@ -106,10 +103,10 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
self.__shared_params = shared_params
self.__kvmd_session: Optional[KvmdClientSession] = None
self.__authorized = asyncio.Future() # type: ignore
self.__ws_connected = asyncio.Future() # type: ignore
self.__ws_writer_queue: asyncio.queues.Queue = asyncio.Queue()
self.__kvmd_session: Optional[KvmdClientSession] = None
self.__kvmd_ws: Optional[KvmdClientWs] = None
self.__fb_requested = False
self.__fb_stub_text = ""
@ -133,48 +130,23 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
finally:
if self.__kvmd_session:
await self.__kvmd_session.close()
self.__kvmd_session = None
# =====
async def __kvmd_task_loop(self) -> None:
logger = get_logger(0)
await self.__authorized
assert self.__kvmd_session
async with self.__kvmd_session.ws() as ws:
logger.info("[kvmd] Client %s: Connected to KVMD websocket", self._remote)
self.__ws_connected.set_result(None)
receive_task: Optional[asyncio.Task] = None
writer_task: Optional[asyncio.Task] = None
try:
while True:
if receive_task is None:
receive_task = asyncio.create_task(ws.receive())
if writer_task is None:
writer_task = asyncio.create_task(self.__ws_writer_queue.get())
done = (await aiotools.wait_first(receive_task, writer_task))[0]
if receive_task in done:
msg = receive_task.result()
if msg.type == aiohttp.WSMsgType.TEXT:
await self.__process_ws_event(json.loads(msg.data))
elif msg.type == aiohttp.WSMsgType.CLOSE:
raise RfbError("KVMD closed the wesocket (it may have been stopped)")
else:
raise RuntimeError(f"Unhandled WS message type: {msg!r}")
receive_task = None
if writer_task in done:
await ws.send_str(json.dumps(writer_task.result()))
writer_task = None
finally:
if receive_task:
receive_task.cancel()
if writer_task:
writer_task.cancel()
try:
async with self.__kvmd_session.ws() as self.__kvmd_ws:
logger.info("[kvmd] Client %s: Connected to KVMD websocket", self._remote)
self.__ws_connected.set_result(None)
async for event in self.__kvmd_ws.communicate():
await self.__process_ws_event(event)
raise RfbError("KVMD closes the websocket (the server may have been stopped)")
finally:
self.__kvmd_ws = None
async def __process_ws_event(self, event: Dict) -> None:
if event["event_type"] == "info_state":
@ -262,33 +234,23 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
# =====
async def _on_key_event(self, code: int, state: bool) -> None:
if (web_key := self.__symmap.get(code)) is not None:
await self.__ws_writer_queue.put({
"event_type": "key",
"event": {"key": web_key.name, "state": state},
})
if self.__kvmd_ws:
if (web_key := self.__symmap.get(code)) is not None:
await self.__kvmd_ws.send_key_event(web_key.name, state)
async def _on_pointer_event(self, buttons: Dict[str, bool], wheel: Dict[str, int], move: Dict[str, int]) -> None:
for (button, state) in buttons.items():
if self.__mouse_buttons[button] != state:
await self.__ws_writer_queue.put({
"event_type": "mouse_button",
"event": {"button": button, "state": state},
})
self.__mouse_buttons[button] = state
if self.__kvmd_ws:
for (button, state) in buttons.items():
if self.__mouse_buttons[button] != state:
await self.__kvmd_ws.send_mouse_button_event(button, state)
self.__mouse_buttons[button] = state
if wheel["x"] or wheel["y"]:
await self.__ws_writer_queue.put({
"event_type": "mouse_wheel",
"event": {"delta": wheel},
})
if wheel["x"] or wheel["y"]:
await self.__kvmd_ws.send_mouse_wheel_event(wheel["x"], wheel["y"])
if self.__mouse_move != move:
await self.__ws_writer_queue.put({
"event_type": "mouse_move",
"event": {"to": move},
})
self.__mouse_move = move
if self.__mouse_move != move:
await self.__kvmd_ws.send_mouse_move_event(move["x"], move["y"])
self.__mouse_move = move
async def _on_cut_event(self, text: str) -> None:
assert self.__authorized.done()

View File

@ -20,7 +20,10 @@
# ========================================================================== #
import asyncio
import asyncio.queues
import contextlib
import json
import types
from typing import Tuple
@ -111,6 +114,73 @@ class _AtxApiPart(_BaseApiPart):
raise
# =====
class KvmdClientWs:
def __init__(self, ws: aiohttp.ClientWebSocketResponse) -> None:
self.__ws = ws
self.__writer_queue: asyncio.queues.Queue = asyncio.Queue()
self.__communicated = False
async def communicate(self) -> AsyncGenerator[Dict, None]:
assert not self.__communicated
self.__communicated = True
receive_task: Optional[asyncio.Task] = None
writer_task: Optional[asyncio.Task] = None
try:
while True:
if receive_task is None:
receive_task = asyncio.create_task(self.__ws.receive())
if writer_task is None:
writer_task = asyncio.create_task(self.__writer_queue.get())
done = (await aiotools.wait_first(receive_task, writer_task))[0]
if receive_task in done:
msg = receive_task.result()
if msg.type == aiohttp.WSMsgType.TEXT:
yield json.loads(msg.data)
elif msg.type == aiohttp.WSMsgType.CLOSE:
break
else:
raise RuntimeError(f"Unhandled WS message type: {msg!r}")
receive_task = None
if writer_task in done:
await self.__ws.send_str(json.dumps(writer_task.result()))
writer_task = None
finally:
if receive_task:
receive_task.cancel()
if writer_task:
writer_task.cancel()
self.__communicated = False
async def send_key_event(self, key: str, state: bool) -> None:
await self.__writer_queue.put({
"event_type": "key",
"event": {"key": key, "state": state},
})
async def send_mouse_button_event(self, button: str, state: bool) -> None:
await self.__writer_queue.put({
"event_type": "mouse_button",
"event": {"button": button, "state": state},
})
async def send_mouse_move_event(self, to_x: int, to_y: int) -> None:
await self.__writer_queue.put({
"event_type": "mouse_move",
"event": {"to": {"x": to_x, "y": to_y}},
})
async def send_mouse_wheel_event(self, delta_x: int, delta_y: int) -> None:
await self.__writer_queue.put({
"event_type": "mouse_wheel",
"event": {"delta": {"x": delta_x, "y": delta_y}},
})
class KvmdClientSession:
def __init__(
self,
@ -124,16 +194,17 @@ class KvmdClientSession:
self.__http_session: Optional[aiohttp.ClientSession] = None
args = (self.__ensure_http_session, make_url)
self.auth = _AuthApiPart(*args)
self.streamer = _StreamerApiPart(*args)
self.hid = _HidApiPart(*args)
self.atx = _AtxApiPart(*args)
@contextlib.asynccontextmanager
async def ws(self) -> AsyncGenerator[aiohttp.ClientWebSocketResponse, None]:
async def ws(self) -> AsyncGenerator[KvmdClientWs, None]:
session = self.__ensure_http_session()
async with session.ws_connect(self.__make_url("ws")) as ws:
yield ws
yield KvmdClientWs(ws)
def __ensure_http_session(self) -> aiohttp.ClientSession:
if not self.__http_session: