mirror of
https://github.com/mofeng-git/One-KVM.git
synced 2025-12-12 17:20:30 +08:00
96 lines
4.1 KiB
Python
96 lines
4.1 KiB
Python
# ========================================================================== #
|
|
# #
|
|
# KVMD - The main Pi-KVM daemon. #
|
|
# #
|
|
# Copyright (C) 2018 Maxim Devaev <mdevaev@gmail.com> #
|
|
# #
|
|
# This program is free software: you can redistribute it and/or modify #
|
|
# it under the terms of the GNU General Public License as published by #
|
|
# the Free Software Foundation, either version 3 of the License, or #
|
|
# (at your option) any later version. #
|
|
# #
|
|
# This program is distributed in the hope that it will be useful, #
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
|
|
# GNU General Public License for more details. #
|
|
# #
|
|
# You should have received a copy of the GNU General Public License #
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
|
|
# #
|
|
# ========================================================================== #
|
|
|
|
|
|
from aiohttp.web import Request
|
|
from aiohttp.web import Response
|
|
|
|
from ....validators.auth import valid_user
|
|
from ....validators.auth import valid_passwd
|
|
from ....validators.auth import valid_auth_token
|
|
|
|
from ..http import UnauthorizedError
|
|
from ..http import ForbiddenError
|
|
from ..http import HttpExposed
|
|
from ..http import exposed_http
|
|
from ..http import make_json_response
|
|
from ..http import set_request_auth_info
|
|
|
|
from ..auth import AuthManager
|
|
|
|
|
|
# =====
|
|
# Name of the cookie that carries the auth token: it is set by the login
# handler and read back by check_request_auth() and the logout handler.
_COOKIE_AUTH_TOKEN = "auth_token"
|
|
|
|
|
|
async def check_request_auth(auth_manager: AuthManager, exposed: HttpExposed, request: Request) -> None:
    """Check that *request* may reach the handler described by *exposed*.

    Nothing is checked when the handler does not require auth or when
    ``auth_manager.is_auth_enabled()`` is false.  Otherwise credentials are
    taken from the first source that is present:

    1. the ``X-KVMD-User`` / ``X-KVMD-Passwd`` headers, which are validated
       and passed to ``auth_manager.authorize()``;
    2. the auth token cookie, which is validated and passed to
       ``auth_manager.check()``.

    The resolved identity (or ``"-"`` for a rejected token) is recorded on
    the request through ``set_request_auth_info()``.

    Raises:
        ForbiddenError: credentials were supplied but were not accepted.
        UnauthorizedError: neither a user header nor a token cookie was sent.
    """
    if not (exposed.auth_required and auth_manager.is_auth_enabled()):
        return

    xhdr_user = request.headers.get("X-KVMD-User", "")
    xhdr_passwd = request.headers.get("X-KVMD-Passwd", "")
    cookie_token = request.cookies.get(_COOKIE_AUTH_TOKEN, "")

    if not (xhdr_user or cookie_token):
        raise UnauthorizedError()

    if xhdr_user:
        # Header credentials take precedence over the cookie.
        checked_user = valid_user(xhdr_user)
        # The identity is recorded before the password is verified, so a
        # failed attempt is still attributed to the claimed user.
        set_request_auth_info(request, f"{checked_user} (xhdr)")
        authorized = await auth_manager.authorize(checked_user, valid_passwd(xhdr_passwd))
        if not authorized:
            raise ForbiddenError()
        return

    token_user = auth_manager.check(valid_auth_token(cookie_token))
    if token_user:
        set_request_auth_info(request, f"{token_user} (token)")
        return

    set_request_auth_info(request, "- (token)")
    raise ForbiddenError()
|
|
|
|
|
|
class AuthApi:
    """HTTP handlers for the ``/auth/*`` routes, delegating to an ``AuthManager``."""

    def __init__(self, auth_manager: AuthManager) -> None:
        self.__auth_manager = auth_manager

    # =====

    @exposed_http("POST", "/auth/login", auth_required=False)
    async def __login_handler(self, request: Request) -> Response:
        """Log a user in from the POSTed ``user`` / ``passwd`` form fields.

        When auth is disabled an empty JSON response is returned without
        reading the request body.  On success the token returned by
        ``AuthManager.login()`` is sent back as the auth token cookie.

        Raises:
            ForbiddenError: the manager did not return a token.
        """
        if not self.__auth_manager.is_auth_enabled():
            return make_json_response()

        form = await request.post()
        # Validate both fields (user first, then password) before calling the manager.
        user = valid_user(form.get("user", ""))
        passwd = valid_passwd(form.get("passwd", ""))
        new_token = await self.__auth_manager.login(user=user, passwd=passwd)
        if not new_token:
            raise ForbiddenError()
        return make_json_response(set_cookies={_COOKIE_AUTH_TOKEN: new_token})

    @exposed_http("POST", "/auth/logout")
    async def __logout_handler(self, request: Request) -> Response:
        """Pass the token from the auth cookie to ``AuthManager.logout()``.

        The cookie is only read and validated when auth is enabled; an empty
        JSON response is returned in either case.
        """
        if self.__auth_manager.is_auth_enabled():
            cookie_value = request.cookies.get(_COOKIE_AUTH_TOKEN, "")
            self.__auth_manager.logout(valid_auth_token(cookie_value))
        return make_json_response()

    @exposed_http("GET", "/auth/check")
    async def __check_handler(self, _: Request) -> Response:
        """Return an empty JSON response.

        The handler itself performs no checks.  Unlike the login route it is
        not registered with ``auth_required=False``.
        NOTE(review): presumably the server runs the auth check before
        dispatching here, so reaching this handler means the check passed -
        confirm against the request pipeline.
        """
        return make_json_response()
|