diff --git a/kvmd/apps/kvmd/api/auth.py b/kvmd/apps/kvmd/api/auth.py index bdad5305..d5df01ac 100644 --- a/kvmd/apps/kvmd/api/auth.py +++ b/kvmd/apps/kvmd/api/auth.py @@ -54,18 +54,18 @@ async def check_request_auth(auth_manager: AuthManager, exposed: HttpExposed, re user = valid_user(user) passwd = req.headers.get("X-KVMD-Passwd", "") set_request_auth_info(req, f"{user} (xhdr)") - if not (await auth_manager.authorize(user, valid_passwd(passwd))): - raise ForbiddenError() - return + if (await auth_manager.authorize(user, valid_passwd(passwd))): + return + raise ForbiddenError() token = req.cookies.get(_COOKIE_AUTH_TOKEN, "") if token: user = auth_manager.check(valid_auth_token(token)) # type: ignore - if not user: - set_request_auth_info(req, "- (token)") - raise ForbiddenError() - set_request_auth_info(req, f"{user} (token)") - return + if user: + set_request_auth_info(req, f"{user} (token)") + return + set_request_auth_info(req, "- (token)") + raise ForbiddenError() basic_auth = req.headers.get("Authorization", "") if basic_auth and basic_auth[:6].lower() == "basic ": @@ -75,9 +75,9 @@ async def check_request_auth(auth_manager: AuthManager, exposed: HttpExposed, re raise UnauthorizedError() user = valid_user(user) set_request_auth_info(req, f"{user} (basic)") - if not (await auth_manager.authorize(user, valid_passwd(passwd))): - raise ForbiddenError() - return + if (await auth_manager.authorize(user, valid_passwd(passwd))): + return + raise ForbiddenError() if exposed.allow_usc: creds = get_request_unix_credentials(req) @@ -86,6 +86,7 @@ async def check_request_auth(auth_manager: AuthManager, exposed: HttpExposed, re if user: set_request_auth_info(req, f"{user}[{creds.uid}] (unix)") return + raise UnauthorizedError() raise UnauthorizedError() diff --git a/testenv/tests/apps/kvmd/test_auth.py b/testenv/tests/apps/kvmd/test_auth.py index b2d4f2df..b5c8dfbb 100644 --- a/testenv/tests/apps/kvmd/test_auth.py +++ b/testenv/tests/apps/kvmd/test_auth.py @@ -22,15 +22,24 @@ import os import asyncio +import base64 import contextlib from typing import AsyncGenerator +from aiohttp.test_utils import make_mocked_request + import pytest +from kvmd.validators import ValidatorError + from kvmd.yamlconf import make_config from kvmd.apps.kvmd.auth import AuthManager +from kvmd.apps.kvmd.api.auth import check_request_auth + +from kvmd.htserver import UnauthorizedError +from kvmd.htserver import ForbiddenError from kvmd.plugins.auth import get_auth_service_class @@ -83,6 +92,57 @@ async def _get_configured_manager( # ===== +@pytest.mark.asyncio +async def test_ok__request(tmpdir) -> None: # type: ignore + path = os.path.abspath(str(tmpdir.join("htpasswd"))) + + htpasswd = KvmdHtpasswdFile(path, new=True) + htpasswd.set_password("admin", "pass") + htpasswd.save() + + async with _get_configured_manager([], path) as manager: + async def check(exposed: HttpExposed, **kwargs) -> None: # type: ignore + await check_request_auth(manager, exposed, make_mocked_request(exposed.method, exposed.path, **kwargs)) + + await check(_E_FREE) + with pytest.raises(UnauthorizedError): + await check(_E_AUTH) + + # === + + with pytest.raises(ForbiddenError): + await check(_E_AUTH, headers={"X-KVMD-User": "admin", "X-KVMD-Passwd": "foo"}) + with pytest.raises(ForbiddenError): + await check(_E_AUTH, headers={"X-KVMD-User": "adminx", "X-KVMD-Passwd": "pass"}) + + await check(_E_AUTH, headers={"X-KVMD-User": "admin", "X-KVMD-Passwd": "pass"}) + + # === + + with pytest.raises(UnauthorizedError): + await check(_E_AUTH, headers={"Cookie": "auth_token="}) + with pytest.raises(ValidatorError): + await check(_E_AUTH, headers={"Cookie": "auth_token=0"}) + with pytest.raises(ForbiddenError): + await check(_E_AUTH, headers={"Cookie": f"auth_token={'0' * 64}"}) + + token = await manager.login("admin", "pass", 0) + assert token + await check(_E_AUTH, headers={"Cookie": f"auth_token={token}"}) + manager.logout(token) + with pytest.raises(ForbiddenError): + await check(_E_AUTH, headers={"Cookie": f"auth_token={token}"}) + + # === + + with pytest.raises(ForbiddenError): + await check(_E_AUTH, headers={"Authorization": "basic " + base64.b64encode(b"admin:foo").decode()}) + with pytest.raises(ForbiddenError): + await check(_E_AUTH, headers={"Authorization": "basic " + base64.b64encode(b"adminx:pass").decode()}) + + await check(_E_AUTH, headers={"Authorization": "basic " + base64.b64encode(b"admin:pass").decode()}) + + @pytest.mark.asyncio async def test_ok__expire(tmpdir) -> None: # type: ignore path = os.path.abspath(str(tmpdir.join("htpasswd")))