This commit is contained in:
Devaev Maxim
2020-03-20 03:07:27 +03:00
parent ab6264bd5e
commit d5ae32b132
70 changed files with 28455 additions and 102 deletions

307
kvmd/apps/vnc/server.py Normal file
View File

@@ -0,0 +1,307 @@
# ========================================================================== #
# #
# KVMD - The main Pi-KVM daemon. #
# #
# Copyright (C) 2020 Maxim Devaev <mdevaev@gmail.com> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
# ========================================================================== #
import asyncio
import asyncio.queues
import socket
import dataclasses
import json
from typing import Dict
from typing import Optional
import aiohttp
from ...logging import get_logger
from ... import aiotools
from .rfb import RfbClient
from .kvmd import KvmdClient
from .streamer import StreamerError
from .streamer import StreamerClient
from .render import make_text_jpeg
# =====
@dataclasses.dataclass()
class _SharedParams:
width: int = dataclasses.field(default=800)
height: int = dataclasses.field(default=600)
name: str = dataclasses.field(default="Pi-KVM")
class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
def __init__(
self,
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
kvmd: KvmdClient,
streamer: StreamerClient,
symmap: Dict[int, str],
shared_params: _SharedParams,
) -> None:
super().__init__(reader, writer, **dataclasses.asdict(shared_params))
self.__kvmd = kvmd
self.__streamer = streamer
self.__symmap = symmap
self.__shared_params = shared_params
self.__authorized = asyncio.Future() # type: ignore
self.__ws_connected = asyncio.Future() # type: ignore
self.__ws_writer_queue: asyncio.queues.Queue = asyncio.Queue()
self.__fb_requested = False
self.__fb_stub_text = ""
self.__fb_stub_quality = 0
# Эти состояния шарить не обязательно - бекенд исключает дублирующиеся события.
# Все это нужно только чтобы не посылать лишние жсоны в сокет KVMD
self.__mouse_buttons: Dict[str, Optional[bool]] = {"left": None, "right": None, "middle": None}
self.__mouse_move = {"x": -1, "y": -1}
# =====
async def run(self) -> None:
await self._run(
kvmd=self.__kvmd_task_loop(),
streamer=self.__streamer_task_loop(),
)
# =====
async def __kvmd_task_loop(self) -> None:
logger = get_logger(0)
await self.__authorized
(user, passwd) = self.__authorized.result()
async with self.__kvmd.ws(user, passwd) as ws:
logger.info("[kvmd] Client %s: Connected to KVMD websocket", self._remote)
self.__ws_connected.set_result(None)
receive_task: Optional[asyncio.Task] = None
writer_task: Optional[asyncio.Task] = None
try:
while True:
if receive_task is None:
receive_task = asyncio.create_task(ws.receive())
if writer_task is None:
writer_task = asyncio.create_task(self.__ws_writer_queue.get())
done = (await aiotools.wait_first(receive_task, writer_task))[0]
if receive_task in done:
msg = receive_task.result()
if msg.type == aiohttp.WSMsgType.TEXT:
await self.__process_ws_event(json.loads(msg.data))
else:
raise RuntimeError(f"Unknown WS message type: {msg!r}")
receive_task = None
if writer_task in done:
await ws.send_str(json.dumps(writer_task.result()))
writer_task = None
finally:
if receive_task:
receive_task.cancel()
if writer_task:
writer_task.cancel()
async def __process_ws_event(self, event: Dict) -> None:
if event["event_type"] == "info_state":
host = event["event"]["meta"].get("server", {}).get("host")
if isinstance(host, str):
name = f"Pi-KVM: {host}"
async with self._lock:
if self._encodings.has_rename:
await self._send_rename(name)
self.__shared_params.name = name
elif event["event_type"] == "hid_state":
async with self._lock:
if self._encodings.has_leds_state:
await self._send_leds_state(**event["event"]["keyboard"]["leds"])
# =====
async def __streamer_task_loop(self) -> None:
logger = get_logger(0)
await self.__ws_connected
while True:
try:
streaming = False
async for (online, width, height, jpeg) in self.__streamer.read():
if not streaming:
logger.info("[streamer] Client %s: Streaming ...", self._remote)
streaming = True
if online:
await self.__send_fb_real(width, height, jpeg)
else:
await self.__send_fb_stub("No signal")
except StreamerError as err:
logger.info("[streamer] Client %s: Waiting for stream: %s", self._remote, str(err))
await self.__send_fb_stub("Waiting for stream ...")
await asyncio.sleep(1)
async def __send_fb_real(self, width: int, height: int, jpeg: bytes) -> None:
async with self._lock:
if self.__fb_requested:
if (self._width, self._height) != (width, height):
self.__shared_params.width = width
self.__shared_params.height = height
if not self._encodings.has_resize:
msg = f"Resoultion changed: {self._width}x{self._height} -> {width}x{height}\nPlease reconnect"
await self.__send_fb_stub(msg, no_lock=True)
return
await self._send_resize(width, height)
await self._send_fb(jpeg)
self.__fb_stub_text = ""
self.__fb_stub_quality = 0
self.__fb_requested = False
async def __send_fb_stub(self, text: str, no_lock: bool=False) -> None:
if not no_lock:
await self._lock.acquire()
try:
if self.__fb_requested and (self.__fb_stub_text != text or self.__fb_stub_quality != self._encodings.tight_jpeg_quality):
await self._send_fb(await make_text_jpeg(self._width, self._height, self._encodings.tight_jpeg_quality, text))
self.__fb_stub_text = text
self.__fb_stub_quality = self._encodings.tight_jpeg_quality
self.__fb_requested = False
finally:
if not no_lock:
self._lock.release()
# =====
async def _authorize(self, user: str, passwd: str) -> bool:
if (await self.__kvmd.authorize(user, passwd)):
self.__authorized.set_result((user, passwd))
return True
return False
async def _on_key_event(self, code: int, state: bool) -> None:
print("KeyEvent", code, state, self.__symmap.get(code)) # TODO
async def _on_pointer_event(self, buttons: Dict[str, bool], wheel: Dict[str, int], move: Dict[str, int]) -> None:
for (button, state) in buttons.items():
if self.__mouse_buttons[button] != state:
await self.__ws_writer_queue.put({
"event_type": "mouse_button",
"event": {"button": button, "state": state},
})
self.__mouse_buttons[button] = state
if wheel["x"] or wheel["y"]:
await self.__ws_writer_queue.put({
"event_type": "mouse_wheel",
"event": {"delta": wheel},
})
if self.__mouse_move != move:
await self.__ws_writer_queue.put({
"event_type": "mouse_move",
"event": {"to": move},
})
self.__mouse_move = move
async def _on_cut_event(self, text: str) -> None:
print("CutEvent", text) # TODO
async def _on_set_encodings(self) -> None:
assert self.__authorized.done()
(user, passwd) = self.__authorized.result()
(quality, desired_fps) = (self._encodings.tight_jpeg_quality, 30)
get_logger(0).info("[main] Client %s: Applying streamer params: quality=%d%%; desired_fps=%d ...",
self._remote, quality, desired_fps)
await self.__kvmd.set_streamer_params(user, passwd, quality=quality, desired_fps=desired_fps)
async def _on_fb_update_request(self) -> None:
self.__fb_requested = True
# =====
class VncServer:
def __init__(
self,
host: str,
port: int,
max_clients: int,
kvmd: KvmdClient,
streamer: StreamerClient,
symmap: Dict[int, str],
) -> None:
self.__host = host
self.__port = port
self.__max_clients = max_clients
self.__kvmd = kvmd
self.__streamer = streamer
self.__symmap = symmap
self.__shared_params = _SharedParams()
def run(self) -> None:
logger = get_logger(0)
logger.info("Listening VNC on TCP [%s]:%d ...", self.__host, self.__port)
sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, False)
sock.bind((self.__host, self.__port))
loop = asyncio.get_event_loop()
server = loop.run_until_complete(asyncio.start_server(
client_connected_cb=self.__handle_client,
sock=sock,
backlog=self.__max_clients,
loop=loop,
))
try:
loop.run_forever()
except (SystemExit, KeyboardInterrupt):
pass
finally:
server.close()
loop.run_until_complete(server.wait_closed())
tasks = asyncio.Task.all_tasks()
for task in tasks:
task.cancel()
loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=True))
loop.close()
logger.info("Bye-bye")
async def __handle_client(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
await _Client(reader, writer, self.__kvmd, self.__streamer, self.__symmap, self.__shared_params).run()