refactoring

This commit is contained in:
Devaev Maxim 2019-12-09 01:21:38 +03:00
parent 272ea08adf
commit dd52a85cf6
11 changed files with 717 additions and 413 deletions

View File

@ -38,7 +38,7 @@ from .info import InfoManager
from .logreader import LogReader from .logreader import LogReader
from .wol import WakeOnLan from .wol import WakeOnLan
from .streamer import Streamer from .streamer import Streamer
from .server import Server from .server import KvmdServer
# ===== # =====
@ -62,7 +62,7 @@ def main(argv: Optional[List[str]]=None) -> None:
config = config.kvmd config = config.kvmd
Server( KvmdServer(
auth_manager=AuthManager( auth_manager=AuthManager(
internal_type=config.auth.internal.type, internal_type=config.auth.internal.type,
internal_kwargs=config.auth.internal._unpack(ignore=["type", "force_users"]), internal_kwargs=config.auth.internal._unpack(ignore=["type", "force_users"]),
@ -78,6 +78,9 @@ def main(argv: Optional[List[str]]=None) -> None:
atx=get_atx_class(config.atx.type)(**config.atx._unpack(ignore=["type"])), atx=get_atx_class(config.atx.type)(**config.atx._unpack(ignore=["type"])),
msd=get_msd_class(config.msd.type)(**msd_kwargs), msd=get_msd_class(config.msd.type)(**msd_kwargs),
streamer=Streamer(**config.streamer._unpack()), streamer=Streamer(**config.streamer._unpack()),
).run(**config.server._unpack())
heartbeat=config.server.heartbeat,
sync_chunk_size=config.server.sync_chunk_size,
).run(**config.server._unpack(ignore=["heartbeat", "sync_chunk_size"]))
get_logger(0).info("Bye-bye") get_logger(0).info("Bye-bye")

View File

@ -0,0 +1,20 @@
# ========================================================================== #
# #
# KVMD - The main Pi-KVM daemon. #
# #
# Copyright (C) 2018 Maxim Devaev <mdevaev@gmail.com> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
# ========================================================================== #

64
kvmd/apps/kvmd/api/atx.py Normal file
View File

@ -0,0 +1,64 @@
# ========================================================================== #
# #
# KVMD - The main Pi-KVM daemon. #
# #
# Copyright (C) 2018 Maxim Devaev <mdevaev@gmail.com> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
# ========================================================================== #
import aiohttp.web
from ....plugins.atx import BaseAtx
from ....validators.kvm import valid_atx_power_action
from ....validators.kvm import valid_atx_button
from ..http import exposed_http
from ..http import make_json_response
# =====
class AtxApi:
def __init__(self, atx: BaseAtx) -> None:
self.__atx = atx
# =====
@exposed_http("GET", "/atx")
async def __state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
return make_json_response(self.__atx.get_state())
@exposed_http("POST", "/atx/power")
async def __power_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
action = valid_atx_power_action(request.query.get("action"))
processing = await ({
"on": self.__atx.power_on,
"off": self.__atx.power_off,
"off_hard": self.__atx.power_off_hard,
"reset_hard": self.__atx.power_reset_hard,
}[action])()
return make_json_response({"processing": processing})
@exposed_http("POST", "/atx/click")
async def __click_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
button = valid_atx_button(request.query.get("button"))
await ({
"power": self.__atx.click_power,
"power_long": self.__atx.click_power_long,
"reset": self.__atx.click_reset,
}[button])()
return make_json_response()

93
kvmd/apps/kvmd/api/hid.py Normal file
View File

@ -0,0 +1,93 @@
# ========================================================================== #
# #
# KVMD - The main Pi-KVM daemon. #
# #
# Copyright (C) 2018 Maxim Devaev <mdevaev@gmail.com> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
# ========================================================================== #
from typing import Dict
import aiohttp.web
from ....plugins.hid import BaseHid
from ....validators.basic import valid_bool
from ....validators.kvm import valid_hid_key
from ....validators.kvm import valid_hid_mouse_move
from ....validators.kvm import valid_hid_mouse_button
from ....validators.kvm import valid_hid_mouse_wheel
from ..http import exposed_http
from ..http import exposed_ws
from ..http import make_json_response
# =====
class HidApi:
def __init__(self, hid: BaseHid) -> None:
self.__hid = hid
# =====
@exposed_http("GET", "/hid/state")
async def __state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
return make_json_response(self.__hid.get_state())
@exposed_http("GET", "/hid/reset")
async def __reset_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
await self.__hid.reset()
return make_json_response()
# =====
@exposed_ws("key")
async def __ws_key_handler(self, _: aiohttp.web.WebSocketResponse, event: Dict) -> None:
try:
key = valid_hid_key(event["key"])
state = valid_bool(event["state"])
except Exception:
return
await self.__hid.send_key_event(key, state)
@exposed_ws("mouse_button")
async def __ws_mouse_button_handler(self, _: aiohttp.web.WebSocketResponse, event: Dict) -> None:
try:
button = valid_hid_mouse_button(event["button"])
state = valid_bool(event["state"])
except Exception:
return
await self.__hid.send_mouse_button_event(button, state)
@exposed_ws("mouse_move")
async def __ws_mouse_move_handler(self, _: aiohttp.web.WebSocketResponse, event: Dict) -> None:
try:
to_x = valid_hid_mouse_move(event["to"]["x"])
to_y = valid_hid_mouse_move(event["to"]["y"])
except Exception:
return
await self.__hid.send_mouse_move_event(to_x, to_y)
@exposed_ws("mouse_wheel")
async def __ws_mouse_wheel_handler(self, _: aiohttp.web.WebSocketResponse, event: Dict) -> None:
try:
delta_x = valid_hid_mouse_wheel(event["delta"]["x"])
delta_y = valid_hid_mouse_wheel(event["delta"]["y"])
except Exception:
return
await self.__hid.send_mouse_wheel_event(delta_x, delta_y)

57
kvmd/apps/kvmd/api/log.py Normal file
View File

@ -0,0 +1,57 @@
# ========================================================================== #
# #
# KVMD - The main Pi-KVM daemon. #
# #
# Copyright (C) 2018 Maxim Devaev <mdevaev@gmail.com> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
# ========================================================================== #
from typing import Optional
import aiohttp.web
from ....validators.basic import valid_bool
from ....validators.kvm import valid_log_seek
from ..logreader import LogReader
from ..http import exposed_http
# =====
class LogApi:
def __init__(self, log_reader: LogReader) -> None:
self.__log_reader = log_reader
# =====
@exposed_http("GET", "/log")
async def __log_handler(self, request: aiohttp.web.Request) -> aiohttp.web.StreamResponse:
seek = valid_log_seek(request.query.get("seek", "0"))
follow = valid_bool(request.query.get("follow", "false"))
response: Optional[aiohttp.web.StreamResponse] = None
async for record in self.__log_reader.poll_log(seek, follow):
if response is None:
response = aiohttp.web.StreamResponse(status=200, reason="OK", headers={"Content-Type": "text/plain"})
await response.prepare(request)
await response.write(("[%s %s] --- %s" % (
record["dt"].strftime("%Y-%m-%d %H:%M:%S"),
record["service"],
record["msg"],
)).encode("utf-8") + b"\r\n")
return response

106
kvmd/apps/kvmd/api/msd.py Normal file
View File

@ -0,0 +1,106 @@
# ========================================================================== #
# #
# KVMD - The main Pi-KVM daemon. #
# #
# Copyright (C) 2018 Maxim Devaev <mdevaev@gmail.com> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
# ========================================================================== #
import aiohttp
import aiohttp.web
from ....logging import get_logger
from ....plugins.msd import BaseMsd
from ....validators.basic import valid_bool
from ....validators.kvm import valid_msd_image_name
from ..http import exposed_http
from ..http import make_json_response
from ..http import get_multipart_field
# ======
class MsdApi:
def __init__(self, msd: BaseMsd, sync_chunk_size: int) -> None:
self.__msd = msd
self.__sync_chunk_size = sync_chunk_size
# =====
@exposed_http("GET", "/msd")
async def __state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
return make_json_response(await self.__msd.get_state())
@exposed_http("POST", "/msd/set_params")
async def __set_params_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
params = {
key: validator(request.query.get(param))
for (param, key, validator) in [
("image", "name", (lambda arg: str(arg).strip() and valid_msd_image_name(arg))),
("cdrom", "cdrom", valid_bool),
]
if request.query.get(param) is not None
}
await self.__msd.set_params(**params) # type: ignore
return make_json_response()
@exposed_http("POST", "/msd/connect")
async def __connect_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
await self.__msd.connect()
return make_json_response()
@exposed_http("POST", "/msd/disconnect")
async def __disconnect_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
await self.__msd.disconnect()
return make_json_response()
@exposed_http("POST", "/msd/write")
async def __write_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
logger = get_logger(0)
reader = await request.multipart()
name = ""
written = 0
try:
name_field = await get_multipart_field(reader, "image")
name = valid_msd_image_name((await name_field.read()).decode("utf-8"))
data_field = await get_multipart_field(reader, "data")
async with self.__msd.write_image(name):
logger.info("Writing image %r to MSD ...", name)
while True:
chunk = await data_field.read_chunk(self.__sync_chunk_size)
if not chunk:
break
written = await self.__msd.write_image_chunk(chunk)
finally:
if written != 0:
logger.info("Written image %r with size=%d bytes to MSD", name, written)
return make_json_response({"image": {"name": name, "size": written}})
@exposed_http("POST", "/msd/remove")
async def __remove_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
await self.__msd.remove(valid_msd_image_name(request.query.get("image")))
return make_json_response()
@exposed_http("POST", "/msd/reset")
async def __reset_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
await self.__msd.reset()
return make_json_response()

45
kvmd/apps/kvmd/api/wol.py Normal file
View File

@ -0,0 +1,45 @@
# ========================================================================== #
# #
# KVMD - The main Pi-KVM daemon. #
# #
# Copyright (C) 2018 Maxim Devaev <mdevaev@gmail.com> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
# ========================================================================== #
import aiohttp.web
from ..wol import WakeOnLan
from ..http import exposed_http
from ..http import make_json_response
# =====
class WolApi:
def __init__(self, wol: WakeOnLan) -> None:
self.__wol = wol
# =====
@exposed_http("GET", "/wol")
async def __wol_state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
return make_json_response(self.__wol.get_state())
@exposed_http("POST", "/wol/wakeup")
async def __wol_wakeup_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
await self.__wol.wakeup()
return make_json_response()

179
kvmd/apps/kvmd/http.py Normal file
View File

@ -0,0 +1,179 @@
import os
import socket
import dataclasses
import inspect
import json
from typing import List
from typing import Dict
from typing import Callable
from typing import Optional
import aiohttp
import aiohttp.web
from ...logging import get_logger
from ...validators import ValidatorError
# =====
class HttpError(Exception):
pass
class UnauthorizedError(HttpError):
pass
class ForbiddenError(HttpError):
pass
# =====
@dataclasses.dataclass(frozen=True)
class HttpExposed:
method: str
path: str
auth_required: bool
handler: Callable
_HTTP_EXPOSED = "_http_exposed"
_HTTP_METHOD = "_http_method"
_HTTP_PATH = "_http_path"
_HTTP_AUTH_REQUIRED = "_http_auth_required"
def exposed_http(http_method: str, path: str, auth_required: bool=True) -> Callable:
def set_attrs(handler: Callable) -> Callable:
setattr(handler, _HTTP_EXPOSED, True)
setattr(handler, _HTTP_METHOD, http_method)
setattr(handler, _HTTP_PATH, path)
setattr(handler, _HTTP_AUTH_REQUIRED, auth_required)
return handler
return set_attrs
def get_exposed_http(obj: object) -> List[HttpExposed]:
return [
HttpExposed(
method=getattr(handler, _HTTP_METHOD),
path=getattr(handler, _HTTP_PATH),
auth_required=getattr(handler, _HTTP_AUTH_REQUIRED),
handler=handler,
)
for name in dir(obj)
if inspect.ismethod(handler := getattr(obj, name)) and getattr(handler, _HTTP_EXPOSED, False)
]
# =====
@dataclasses.dataclass(frozen=True)
class WsExposed:
event_type: str
handler: Callable
_WS_EXPOSED = "_ws_exposed"
_WS_EVENT_TYPE = "_ws_event_type"
def exposed_ws(event_type: str) -> Callable:
def set_attrs(handler: Callable) -> Callable:
setattr(handler, _WS_EXPOSED, True)
setattr(handler, _WS_EVENT_TYPE, event_type)
return handler
return set_attrs
def get_exposed_ws(obj: object) -> List[WsExposed]:
return [
WsExposed(
event_type=getattr(handler, _WS_EVENT_TYPE),
handler=handler,
)
for name in dir(obj)
if inspect.ismethod(handler := getattr(obj, name)) and getattr(handler, _WS_EXPOSED, False)
]
# =====
def make_json_response(
result: Optional[Dict]=None,
status: int=200,
set_cookies: Optional[Dict[str, str]]=None,
) -> aiohttp.web.Response:
response = aiohttp.web.Response(
text=json.dumps({
"ok": (status == 200),
"result": (result or {}),
}, sort_keys=True, indent=4),
status=status,
content_type="application/json",
)
if set_cookies:
for (key, value) in set_cookies.items():
response.set_cookie(key, value)
return response
def make_json_exception(err: Exception, status: int) -> aiohttp.web.Response:
name = type(err).__name__
msg = str(err)
if not isinstance(err, (UnauthorizedError, ForbiddenError)):
get_logger().error("API error: %s: %s", name, msg)
return make_json_response({
"error": name,
"error_msg": msg,
}, status=status)
# =====
async def get_multipart_field(reader: aiohttp.MultipartReader, name: str) -> aiohttp.BodyPartReader:
field = await reader.next()
if not field or field.name != name:
raise ValidatorError(f"Missing {name!r} field")
return field
# =====
class HttpServer:
def run(
self,
host: str,
port: int,
unix_path: str,
unix_rm: bool,
unix_mode: int,
access_log_format: str,
) -> None:
assert port or unix_path
if unix_path:
socket_kwargs: Dict = {}
if unix_rm and os.path.exists(unix_path):
os.remove(unix_path)
server_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
server_socket.bind(unix_path)
if unix_mode:
os.chmod(unix_path, unix_mode)
socket_kwargs = {"sock": server_socket}
else:
socket_kwargs = {"host": host, "port": port}
aiohttp.web.run_app(
app=self._make_app(),
access_log_format=access_log_format,
print=self.__run_app_print,
**socket_kwargs,
)
async def _make_app(self) -> aiohttp.web.Application:
raise NotImplementedError
def __run_app_print(self, text: str) -> None:
logger = get_logger(0)
for line in text.strip().splitlines():
logger.info(line.strip())

View File

@ -22,9 +22,7 @@
import os import os
import signal import signal
import socket
import asyncio import asyncio
import inspect
import json import json
import time import time
@ -34,7 +32,9 @@ from typing import List
from typing import Dict from typing import Dict
from typing import Set from typing import Set
from typing import Callable from typing import Callable
from typing import AsyncGenerator
from typing import Optional from typing import Optional
from typing import Any
import aiohttp import aiohttp
import aiohttp.web import aiohttp.web
@ -56,22 +56,12 @@ from ...plugins.msd import BaseMsd
from ...validators import ValidatorError from ...validators import ValidatorError
from ...validators.basic import valid_bool
from ...validators.auth import valid_user from ...validators.auth import valid_user
from ...validators.auth import valid_passwd from ...validators.auth import valid_passwd
from ...validators.auth import valid_auth_token from ...validators.auth import valid_auth_token
from ...validators.kvm import valid_atx_power_action
from ...validators.kvm import valid_atx_button
from ...validators.kvm import valid_log_seek
from ...validators.kvm import valid_stream_quality from ...validators.kvm import valid_stream_quality
from ...validators.kvm import valid_stream_fps from ...validators.kvm import valid_stream_fps
from ...validators.kvm import valid_msd_image_name
from ...validators.kvm import valid_hid_key
from ...validators.kvm import valid_hid_mouse_move
from ...validators.kvm import valid_hid_mouse_button
from ...validators.kvm import valid_hid_mouse_wheel
from ... import aiotools from ... import aiotools
@ -85,6 +75,23 @@ from .streamer import Streamer
from .wol import WolDisabledError from .wol import WolDisabledError
from .wol import WakeOnLan from .wol import WakeOnLan
from .http import UnauthorizedError
from .http import ForbiddenError
from .http import HttpExposed
from .http import exposed_http
from .http import exposed_ws
from .http import get_exposed_http
from .http import get_exposed_ws
from .http import make_json_response
from .http import make_json_exception
from .http import HttpServer
from .api.log import LogApi
from .api.wol import WolApi
from .api.hid import HidApi
from .api.atx import AtxApi
from .api.msd import MsdApi
# ===== # =====
try: try:
@ -104,125 +111,12 @@ AccessLogger._format_P = staticmethod(_format_P) # type: ignore # pylint: disa
# ===== # =====
class HttpError(Exception):
pass
class UnauthorizedError(HttpError):
pass
class ForbiddenError(HttpError):
pass
def _json(
result: Optional[Dict]=None,
status: int=200,
set_cookies: Optional[Dict[str, str]]=None,
) -> aiohttp.web.Response:
response = aiohttp.web.Response(
text=json.dumps({
"ok": (status == 200),
"result": (result or {}),
}, sort_keys=True, indent=4),
status=status,
content_type="application/json",
)
if set_cookies:
for (key, value) in set_cookies.items():
response.set_cookie(key, value)
return response
def _json_exception(err: Exception, status: int) -> aiohttp.web.Response:
name = type(err).__name__
msg = str(err)
if not isinstance(err, (UnauthorizedError, ForbiddenError)):
get_logger().error("API error: %s: %s", name, msg)
return _json({
"error": name,
"error_msg": msg,
}, status=status)
async def _get_multipart_field(reader: aiohttp.MultipartReader, name: str) -> aiohttp.BodyPartReader:
field = await reader.next()
if not field or field.name != name:
raise ValidatorError(f"Missing {name!r} field")
return field
_ATTR_EXPOSED = "_server_exposed"
_ATTR_EXPOSED_METHOD = "_server_exposed_method"
_ATTR_EXPOSED_PATH = "_server_exposed_path"
_ATTR_SYSTEM_TASK = "system_task"
_HEADER_AUTH_USER = "X-KVMD-User" _HEADER_AUTH_USER = "X-KVMD-User"
_HEADER_AUTH_PASSWD = "X-KVMD-Passwd" _HEADER_AUTH_PASSWD = "X-KVMD-Passwd"
_COOKIE_AUTH_TOKEN = "auth_token" _COOKIE_AUTH_TOKEN = "auth_token"
def _exposed(http_method: str, path: str, auth_required: bool=True) -> Callable:
def make_wrapper(handler: Callable) -> Callable:
async def wrapper(self: "Server", request: aiohttp.web.Request) -> aiohttp.web.Response:
try:
if auth_required:
user = request.headers.get(_HEADER_AUTH_USER, "")
passwd = request.headers.get(_HEADER_AUTH_PASSWD, "")
token = request.cookies.get(_COOKIE_AUTH_TOKEN, "")
if user:
user = valid_user(user)
setattr(request, _ATTR_KVMD_AUTH_INFO, f"{user} (xhdr)")
if not (await self._auth_manager.authorize(user, valid_passwd(passwd))):
raise ForbiddenError("Forbidden")
elif token:
user = self._auth_manager.check(valid_auth_token(token))
if not user:
setattr(request, _ATTR_KVMD_AUTH_INFO, "- (token)")
raise ForbiddenError("Forbidden")
setattr(request, _ATTR_KVMD_AUTH_INFO, f"{user} (token)")
else:
raise UnauthorizedError("Unauthorized")
return (await handler(self, request))
except (AtxIsBusyError, MsdIsBusyError) as err:
return _json_exception(err, 409)
except (ValidatorError, AtxOperationError, MsdOperationError, WolDisabledError) as err:
return _json_exception(err, 400)
except UnauthorizedError as err:
return _json_exception(err, 401)
except ForbiddenError as err:
return _json_exception(err, 403)
setattr(wrapper, _ATTR_EXPOSED, True)
setattr(wrapper, _ATTR_EXPOSED_METHOD, http_method)
setattr(wrapper, _ATTR_EXPOSED_PATH, path)
return wrapper
return make_wrapper
def _system_task(method: Callable) -> Callable:
async def wrapper(self: "Server") -> None:
try:
await method(self)
raise RuntimeError(f"Dead system task: {method}")
except asyncio.CancelledError:
pass
except Exception:
get_logger().exception("Unhandled exception, killing myself ...")
os.kill(os.getpid(), signal.SIGTERM)
setattr(wrapper, _ATTR_SYSTEM_TASK, True)
return wrapper
class _Events(Enum): class _Events(Enum):
INFO_STATE = "info_state" INFO_STATE = "info_state"
WOL_STATE = "wol_state" WOL_STATE = "wol_state"
@ -232,8 +126,8 @@ class _Events(Enum):
STREAMER_STATE = "streamer_state" STREAMER_STATE = "streamer_state"
class Server: # pylint: disable=too-many-instance-attributes class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-instance-attributes
def __init__( def __init__( # pylint: disable=too-many-arguments
self, self,
auth_manager: AuthManager, auth_manager: AuthManager,
info_manager: InfoManager, info_manager: InfoManager,
@ -244,11 +138,13 @@ class Server: # pylint: disable=too-many-instance-attributes
atx: BaseAtx, atx: BaseAtx,
msd: BaseMsd, msd: BaseMsd,
streamer: Streamer, streamer: Streamer,
heartbeat: float,
sync_chunk_size: int,
) -> None: ) -> None:
self._auth_manager = auth_manager self.__auth_manager = auth_manager
self.__info_manager = info_manager self.__info_manager = info_manager
self.__log_reader = log_reader
self.__wol = wol self.__wol = wol
self.__hid = hid self.__hid = hid
@ -256,8 +152,19 @@ class Server: # pylint: disable=too-many-instance-attributes
self.__msd = msd self.__msd = msd
self.__streamer = streamer self.__streamer = streamer
self.__heartbeat: Optional[float] = None # Assigned in run() for consistance self.__heartbeat = heartbeat
self.__sync_chunk_size: Optional[int] = None # Ditto
self.__apis: List[object] = [
self,
LogApi(log_reader),
WolApi(wol),
HidApi(hid),
AtxApi(atx),
MsdApi(msd, sync_chunk_size),
]
self.__ws_handlers: Dict[str, Callable] = {}
self.__sockets: Set[aiohttp.web.WebSocketResponse] = set() self.__sockets: Set[aiohttp.web.WebSocketResponse] = set()
self.__sockets_lock = asyncio.Lock() self.__sockets_lock = asyncio.Lock()
@ -266,45 +173,6 @@ class Server: # pylint: disable=too-many-instance-attributes
self.__reset_streamer = False self.__reset_streamer = False
self.__streamer_params = streamer.get_params() self.__streamer_params = streamer.get_params()
def run(
self,
host: str,
port: int,
unix_path: str,
unix_rm: bool,
unix_mode: int,
heartbeat: float,
sync_chunk_size: int,
access_log_format: str,
) -> None:
self.__hid.start()
setproctitle.setproctitle(f"kvmd/main: {setproctitle.getproctitle()}")
self.__heartbeat = heartbeat
self.__sync_chunk_size = sync_chunk_size
assert port or unix_path
if unix_path:
socket_kwargs: Dict = {}
if unix_rm and os.path.exists(unix_path):
os.remove(unix_path)
server_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
server_socket.bind(unix_path)
if unix_mode:
os.chmod(unix_path, unix_mode)
socket_kwargs = {"sock": server_socket}
else:
socket_kwargs = {"host": host, "port": port}
aiohttp.web.run_app(
app=self.__make_app(),
access_log_format=access_log_format,
print=self.__run_app_print,
**socket_kwargs,
)
async def __make_info(self) -> Dict: async def __make_info(self) -> Dict:
return { return {
"version": { "version": {
@ -318,66 +186,60 @@ class Server: # pylint: disable=too-many-instance-attributes
# ===== AUTH # ===== AUTH
@_exposed("POST", "/auth/login", auth_required=False) @exposed_http("POST", "/auth/login", auth_required=False)
async def __auth_login_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response: async def __auth_login_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
credentials = await request.post() credentials = await request.post()
token = await self._auth_manager.login( token = await self.__auth_manager.login(
user=valid_user(credentials.get("user", "")), user=valid_user(credentials.get("user", "")),
passwd=valid_passwd(credentials.get("passwd", "")), passwd=valid_passwd(credentials.get("passwd", "")),
) )
if token: if token:
return _json({}, set_cookies={_COOKIE_AUTH_TOKEN: token}) return make_json_response({}, set_cookies={_COOKIE_AUTH_TOKEN: token})
raise ForbiddenError("Forbidden") raise ForbiddenError("Forbidden")
@_exposed("POST", "/auth/logout") @exposed_http("POST", "/auth/logout")
async def __auth_logout_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response: async def __auth_logout_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
token = valid_auth_token(request.cookies.get(_COOKIE_AUTH_TOKEN, "")) token = valid_auth_token(request.cookies.get(_COOKIE_AUTH_TOKEN, ""))
self._auth_manager.logout(token) self.__auth_manager.logout(token)
return _json({}) return make_json_response({})
@_exposed("GET", "/auth/check") @exposed_http("GET", "/auth/check")
async def __auth_check_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response: async def __auth_check_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
return _json({}) return make_json_response({})
# ===== SYSTEM # ===== SYSTEM
@_exposed("GET", "/info") @exposed_http("GET", "/info")
async def __info_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response: async def __info_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
return _json(await self.__make_info()) return make_json_response(await self.__make_info())
@_exposed("GET", "/log") # ===== STREAMER
async def __log_handler(self, request: aiohttp.web.Request) -> aiohttp.web.StreamResponse:
seek = valid_log_seek(request.query.get("seek", "0"))
follow = valid_bool(request.query.get("follow", "false"))
response: Optional[aiohttp.web.StreamResponse] = None
async for record in self.__log_reader.poll_log(seek, follow):
if response is None:
response = aiohttp.web.StreamResponse(status=200, reason="OK", headers={"Content-Type": "text/plain"})
await response.prepare(request)
await response.write(("[%s %s] --- %s" % (
record["dt"].strftime("%Y-%m-%d %H:%M:%S"),
record["service"],
record["msg"],
)).encode("utf-8") + b"\r\n")
return response
# ===== Wake-on-LAN @exposed_http("GET", "/streamer")
async def __streamer_state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
return make_json_response(await self.__streamer.get_state())
@_exposed("GET", "/wol") @exposed_http("POST", "/streamer/set_params")
async def __wol_state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response: async def __streamer_set_params_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
return _json(self.__wol.get_state()) for (name, validator) in [
("quality", valid_stream_quality),
("desired_fps", valid_stream_fps),
]:
value = request.query.get(name)
if value:
self.__streamer_params[name] = validator(value)
return make_json_response()
@_exposed("POST", "/wol/wakeup") @exposed_http("POST", "/streamer/reset")
async def __wol_wakeup_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response: async def __streamer_reset_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
await self.__wol.wakeup() self.__reset_streamer = True
return _json() return make_json_response()
# ===== WEBSOCKET # ===== WEBSOCKET
@_exposed("GET", "/ws") @exposed_http("GET", "/ws")
async def __ws_handler(self, request: aiohttp.web.Request) -> aiohttp.web.WebSocketResponse: async def __ws_handler(self, request: aiohttp.web.Request) -> aiohttp.web.WebSocketResponse:
logger = get_logger(0) logger = get_logger(0)
assert self.__heartbeat is not None
ws = aiohttp.web.WebSocketResponse(heartbeat=self.__heartbeat) ws = aiohttp.web.WebSocketResponse(heartbeat=self.__heartbeat)
await ws.prepare(request) await ws.prepare(request)
await self.__register_socket(ws) await self.__register_socket(ws)
@ -396,203 +258,95 @@ class Server: # pylint: disable=too-many-instance-attributes
except Exception as err: except Exception as err:
logger.error("Can't parse JSON event from websocket: %s", err) logger.error("Can't parse JSON event from websocket: %s", err)
else: else:
event_type = event.get("event_type") handler = self.__ws_handlers.get(event.get("event_type"))
if event_type == "ping": if handler:
await ws.send_str(json.dumps({"msg_type": "pong"})) await handler(ws, event)
elif event_type == "key":
await self.__handle_ws_key_event(event)
elif event_type == "mouse_button":
await self.__handle_ws_mouse_button_event(event)
elif event_type == "mouse_move":
await self.__handle_ws_mouse_move_event(event)
elif event_type == "mouse_wheel":
await self.__handle_ws_mouse_wheel_event(event)
else: else:
logger.error("Unknown websocket event: %r", event) logger.error("Unknown websocket event: %r", event)
else: else:
break break
return ws return ws
async def __handle_ws_key_event(self, event: Dict) -> None: @exposed_ws("ping")
try: async def __ws_ping_handler(self, ws: aiohttp.web.WebSocketResponse, _: Dict) -> None:
key = valid_hid_key(event["key"]) await ws.send_str(json.dumps({"msg_type": "pong"}))
state = valid_bool(event["state"])
except Exception:
return
await self.__hid.send_key_event(key, state)
async def __handle_ws_mouse_button_event(self, event: Dict) -> None:
try:
button = valid_hid_mouse_button(event["button"])
state = valid_bool(event["state"])
except Exception:
return
await self.__hid.send_mouse_button_event(button, state)
async def __handle_ws_mouse_move_event(self, event: Dict) -> None:
try:
to_x = valid_hid_mouse_move(event["to"]["x"])
to_y = valid_hid_mouse_move(event["to"]["y"])
except Exception:
return
await self.__hid.send_mouse_move_event(to_x, to_y)
async def __handle_ws_mouse_wheel_event(self, event: Dict) -> None:
try:
delta_x = valid_hid_mouse_wheel(event["delta"]["x"])
delta_y = valid_hid_mouse_wheel(event["delta"]["y"])
except Exception:
return
await self.__hid.send_mouse_wheel_event(delta_x, delta_y)
# ===== HID
@_exposed("GET", "/hid")
async def __hid_state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
return _json(self.__hid.get_state())
@_exposed("POST", "/hid/reset")
async def __hid_reset_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
await self.__hid.reset()
return _json()
# ===== ATX
@_exposed("GET", "/atx")
async def __atx_state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
return _json(self.__atx.get_state())
@_exposed("POST", "/atx/power")
async def __atx_power_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
action = valid_atx_power_action(request.query.get("action"))
processing = await ({
"on": self.__atx.power_on,
"off": self.__atx.power_off,
"off_hard": self.__atx.power_off_hard,
"reset_hard": self.__atx.power_reset_hard,
}[action])()
return _json({"processing": processing})
@_exposed("POST", "/atx/click")
async def __atx_click_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
button = valid_atx_button(request.query.get("button"))
await ({
"power": self.__atx.click_power,
"power_long": self.__atx.click_power_long,
"reset": self.__atx.click_reset,
}[button])()
return _json()
# ===== MSD
@_exposed("GET", "/msd")
async def __msd_state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
return _json(await self.__msd.get_state())
@_exposed("POST", "/msd/set_params")
async def __msd_set_params_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
params = {
key: validator(request.query.get(param))
for (param, key, validator) in [
("image", "name", (lambda arg: str(arg).strip() and valid_msd_image_name(arg))),
("cdrom", "cdrom", valid_bool),
]
if request.query.get(param) is not None
}
await self.__msd.set_params(**params) # type: ignore
return _json()
@_exposed("POST", "/msd/connect")
async def __msd_connect_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
await self.__msd.connect()
return _json()
@_exposed("POST", "/msd/disconnect")
async def __msd_disconnect_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
await self.__msd.disconnect()
return _json()
@_exposed("POST", "/msd/write")
async def __msd_write_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
assert self.__sync_chunk_size is not None
logger = get_logger(0)
reader = await request.multipart()
name = ""
written = 0
try:
name_field = await _get_multipart_field(reader, "image")
name = valid_msd_image_name((await name_field.read()).decode("utf-8"))
data_field = await _get_multipart_field(reader, "data")
async with self.__msd.write_image(name):
logger.info("Writing image %r to MSD ...", name)
while True:
chunk = await data_field.read_chunk(self.__sync_chunk_size)
if not chunk:
break
written = await self.__msd.write_image_chunk(chunk)
finally:
if written != 0:
logger.info("Written image %r with size=%d bytes to MSD", name, written)
return _json({"image": {"name": name, "size": written}})
@_exposed("POST", "/msd/remove")
async def __msd_remove_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
await self.__msd.remove(valid_msd_image_name(request.query.get("image")))
return _json()
@_exposed("POST", "/msd/reset")
async def __msd_reset_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
await self.__msd.reset()
return _json()
# ===== STREAMER
@_exposed("GET", "/streamer")
async def __streamer_state_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
return _json(await self.__streamer.get_state())
@_exposed("POST", "/streamer/set_params")
async def __streamer_set_params_handler(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
for (name, validator) in [
("quality", valid_stream_quality),
("desired_fps", valid_stream_fps),
]:
value = request.query.get(name)
if value:
self.__streamer_params[name] = validator(value)
return _json()
@_exposed("POST", "/streamer/reset")
async def __streamer_reset_handler(self, _: aiohttp.web.Request) -> aiohttp.web.Response:
self.__reset_streamer = True
return _json()
# ===== SYSTEM STUFF # ===== SYSTEM STUFF
async def __make_app(self) -> aiohttp.web.Application: def run(self, **kwargs: Any) -> None: # type: ignore # pylint: disable=arguments-differ
self.__hid.start()
setproctitle.setproctitle(f"kvmd/main: {setproctitle.getproctitle()}")
super().run(**kwargs)
async def _make_app(self) -> aiohttp.web.Application:
app = aiohttp.web.Application() app = aiohttp.web.Application()
app.on_shutdown.append(self.__on_shutdown) app.on_shutdown.append(self.__on_shutdown)
app.on_cleanup.append(self.__on_cleanup) app.on_cleanup.append(self.__on_cleanup)
for name in dir(self): self.__run_system_task(self.__stream_controller)
method = getattr(self, name) self.__run_system_task(self.__poll_dead_sockets)
if inspect.ismethod(method): self.__run_system_task(self.__poll_state, _Events.HID_STATE, self.__hid.poll_state())
if getattr(method, _ATTR_SYSTEM_TASK, False): self.__run_system_task(self.__poll_state, _Events.ATX_STATE, self.__atx.poll_state())
self.__system_tasks.append(asyncio.create_task(method())) self.__run_system_task(self.__poll_state, _Events.MSD_STATE, self.__msd.poll_state())
elif getattr(method, _ATTR_EXPOSED, False): self.__run_system_task(self.__poll_state, _Events.STREAMER_STATE, self.__streamer.poll_state())
app.router.add_route(
getattr(method, _ATTR_EXPOSED_METHOD), for api in self.__apis:
getattr(method, _ATTR_EXPOSED_PATH), for http_exposed in get_exposed_http(api):
method, self.__add_app_route(app, http_exposed)
) for ws_exposed in get_exposed_ws(api):
self.__ws_handlers[ws_exposed.event_type] = ws_exposed.handler
return app return app
def __run_app_print(self, text: str) -> None: def __run_system_task(self, method: Callable, *args: Any) -> None:
logger = get_logger() async def wrapper() -> None:
for line in text.strip().splitlines(): try:
logger.info(line.strip()) await method(*args)
raise RuntimeError(f"Dead system task: {method.__name__}"
f"({', '.join(getattr(arg, '__name__', str(arg)) for arg in args)})")
except asyncio.CancelledError:
pass
except Exception:
get_logger().exception("Unhandled exception, killing myself ...")
os.kill(os.getpid(), signal.SIGTERM)
self.__system_tasks.append(asyncio.create_task(wrapper()))
def __add_app_route(self, app: aiohttp.web.Application, exposed: HttpExposed) -> None:
async def wrapper(request: aiohttp.web.Request) -> aiohttp.web.Response:
try:
if exposed.auth_required:
user = request.headers.get("X-KVMD-User", "")
passwd = request.headers.get("X-KVMD-Passwd", "")
token = request.cookies.get(_COOKIE_AUTH_TOKEN, "")
if user:
user = valid_user(user)
setattr(request, _ATTR_KVMD_AUTH_INFO, f"{user} (xhdr)")
if not (await self.__auth_manager.authorize(user, valid_passwd(passwd))):
raise ForbiddenError("Forbidden")
elif token:
user = self.__auth_manager.check(valid_auth_token(token))
if not user:
setattr(request, _ATTR_KVMD_AUTH_INFO, "- (token)")
raise ForbiddenError("Forbidden")
setattr(request, _ATTR_KVMD_AUTH_INFO, f"{user} (token)")
else:
raise UnauthorizedError("Unauthorized")
return (await exposed.handler(request))
except (AtxIsBusyError, MsdIsBusyError) as err:
return make_json_exception(err, 409)
except (ValidatorError, AtxOperationError, MsdOperationError, WolDisabledError) as err:
return make_json_exception(err, 400)
except UnauthorizedError as err:
return make_json_exception(err, 401)
except ForbiddenError as err:
return make_json_exception(err, 403)
app.router.add_route(exposed.method, exposed.path, wrapper)
async def __on_shutdown(self, _: aiohttp.web.Application) -> None: async def __on_shutdown(self, _: aiohttp.web.Application) -> None:
logger = get_logger(0) logger = get_logger(0)
@ -614,7 +368,7 @@ class Server: # pylint: disable=too-many-instance-attributes
async def __on_cleanup(self, _: aiohttp.web.Application) -> None: async def __on_cleanup(self, _: aiohttp.web.Application) -> None:
logger = get_logger(0) logger = get_logger(0)
for (name, obj) in [ for (name, obj) in [
("Auth manager", self._auth_manager), ("Auth manager", self.__auth_manager),
("Streamer", self.__streamer), ("Streamer", self.__streamer),
("MSD", self.__msd), ("MSD", self.__msd),
("ATX", self.__atx), ("ATX", self.__atx),
@ -663,7 +417,6 @@ class Server: # pylint: disable=too-many-instance-attributes
# ===== SYSTEM TASKS # ===== SYSTEM TASKS
@_system_task
async def __stream_controller(self) -> None: async def __stream_controller(self) -> None:
prev = 0 prev = 0
shutdown_at = 0.0 shutdown_at = 0.0
@ -688,7 +441,6 @@ class Server: # pylint: disable=too-many-instance-attributes
prev = cur prev = cur
await asyncio.sleep(0.1) await asyncio.sleep(0.1)
@_system_task
async def __poll_dead_sockets(self) -> None: async def __poll_dead_sockets(self) -> None:
while True: while True:
for ws in list(self.__sockets): for ws in list(self.__sockets):
@ -696,22 +448,6 @@ class Server: # pylint: disable=too-many-instance-attributes
await self.__remove_socket(ws) await self.__remove_socket(ws)
await asyncio.sleep(0.1) await asyncio.sleep(0.1)
@_system_task async def __poll_state(self, event_type: _Events, poller: AsyncGenerator[Dict, None]) -> None:
async def __poll_hid_state(self) -> None: async for state in poller:
async for state in self.__hid.poll_state(): await self.__broadcast_event(event_type, state)
await self.__broadcast_event(_Events.HID_STATE, state)
@_system_task
async def __poll_atx_state(self) -> None:
async for state in self.__atx.poll_state():
await self.__broadcast_event(_Events.ATX_STATE, state)
@_system_task
async def __poll_msd_state(self) -> None:
async for state in self.__msd.poll_state():
await self.__broadcast_event(_Events.MSD_STATE, state)
@_system_task
async def __poll_streamer_state(self) -> None:
async for state in self.__streamer.poll_state():
await self.__broadcast_event(_Events.STREAMER_STATE, state)

View File

@ -88,6 +88,7 @@ def main() -> None:
"kvmd.plugins.msd.otg", "kvmd.plugins.msd.otg",
"kvmd.apps", "kvmd.apps",
"kvmd.apps.kvmd", "kvmd.apps.kvmd",
"kvmd.apps.kvmd.api",
"kvmd.apps.otg", "kvmd.apps.otg",
"kvmd.apps.otg.hid", "kvmd.apps.otg.hid",
"kvmd.apps.otgmsd", "kvmd.apps.otgmsd",

View File

@ -35,7 +35,7 @@ deps =
[testenv:vulture] [testenv:vulture]
whitelist_externals = bash whitelist_externals = bash
commands = bash -c 'vulture --ignore-names=_format_P,Plugin --ignore-decorators=@_exposed,@_system_task,@pytest.fixture kvmd testenv/tests *.py testenv/linters/vulture-wl.py' commands = bash -c 'vulture --ignore-names=_format_P,Plugin --ignore-decorators=@exposed_http,@exposed_ws,@pytest.fixture kvmd testenv/tests *.py testenv/linters/vulture-wl.py'
deps = deps =
vulture vulture
-rrequirements.txt -rrequirements.txt