pikvm/pikvm#1069: added option to disable auth on prometheus api

This commit is contained in:
Maxim Devaev 2023-08-18 00:21:07 +03:00
parent 32560563dc
commit 61ce81ab64
5 changed files with 64 additions and 3 deletions

View File

@ -397,6 +397,12 @@ def _get_config_scheme() -> dict:
"enabled": Option(True, type=valid_bool), "enabled": Option(True, type=valid_bool),
}, },
"prometheus": {
"auth": {
"enabled": Option(True, type=valid_bool),
},
},
"hid": { "hid": {
"type": Option("", type=valid_stripped_string_not_empty), "type": Option("", type=valid_stripped_string_not_empty),

View File

@ -75,6 +75,7 @@ def main(argv: (list[str] | None)=None) -> None:
KvmdServer( KvmdServer(
auth_manager=AuthManager( auth_manager=AuthManager(
enabled=config.auth.enabled, enabled=config.auth.enabled,
unauth_paths=([] if config.prometheus.auth.enabled else ["/export/prometheus/metrics"]),
internal_type=config.auth.internal.type, internal_type=config.auth.internal.type,
internal_kwargs=config.auth.internal._unpack(ignore=["type", "force_users"]), internal_kwargs=config.auth.internal._unpack(ignore=["type", "force_users"]),

View File

@ -44,7 +44,7 @@ _COOKIE_AUTH_TOKEN = "auth_token"
async def check_request_auth(auth_manager: AuthManager, exposed: HttpExposed, request: Request) -> None: async def check_request_auth(auth_manager: AuthManager, exposed: HttpExposed, request: Request) -> None:
if exposed.auth_required and auth_manager.is_auth_enabled(): if auth_manager.is_auth_required(exposed):
user = request.headers.get("X-KVMD-User", "") user = request.headers.get("X-KVMD-User", "")
if user: if user:
user = valid_user(user) user = valid_user(user)

View File

@ -30,12 +30,15 @@ from ... import aiotools
from ...plugins.auth import BaseAuthService from ...plugins.auth import BaseAuthService
from ...plugins.auth import get_auth_service_class from ...plugins.auth import get_auth_service_class
from ...htserver import HttpExposed
# ===== # =====
class AuthManager: class AuthManager:
def __init__( def __init__(
self, self,
enabled: bool, enabled: bool,
unauth_paths: list[str],
internal_type: str, internal_type: str,
internal_kwargs: dict, internal_kwargs: dict,
@ -51,6 +54,10 @@ class AuthManager:
if not enabled: if not enabled:
get_logger().warning("AUTHORIZATION IS DISABLED") get_logger().warning("AUTHORIZATION IS DISABLED")
self.__unauth_paths = frozenset(unauth_paths) # To speed up
for path in self.__unauth_paths:
get_logger().warning("Authorization is disabled for API %r", path)
self.__internal_service: (BaseAuthService | None) = None self.__internal_service: (BaseAuthService | None) = None
if enabled: if enabled:
self.__internal_service = get_auth_service_class(internal_type)(**internal_kwargs) self.__internal_service = get_auth_service_class(internal_type)(**internal_kwargs)
@ -70,6 +77,13 @@ class AuthManager:
def is_auth_enabled(self) -> bool: def is_auth_enabled(self) -> bool:
return self.__enabled return self.__enabled
def is_auth_required(self, exposed: HttpExposed) -> bool:
return (
self.is_auth_enabled()
and exposed.auth_required
and exposed.path not in self.__unauth_paths
)
async def authorize(self, user: str, passwd: str) -> bool: async def authorize(self, user: str, passwd: str) -> bool:
assert user == user.strip() assert user == user.strip()
assert user assert user

View File

@ -35,8 +35,15 @@ from kvmd.apps.kvmd.auth import AuthManager
from kvmd.plugins.auth import get_auth_service_class from kvmd.plugins.auth import get_auth_service_class
from kvmd.htserver import HttpExposed
# ===== # =====
_E_AUTH = HttpExposed("GET", "/foo_auth", True, (lambda: None))
_E_UNAUTH = HttpExposed("GET", "/bar_unauth", True, (lambda: None))
_E_FREE = HttpExposed("GET", "/baz_free", False, (lambda: None))
def _make_service_kwargs(path: str) -> dict: def _make_service_kwargs(path: str) -> dict:
cls = get_auth_service_class("htpasswd") cls = get_auth_service_class("htpasswd")
scheme = cls.get_plugin_options() scheme = cls.get_plugin_options()
@ -45,6 +52,7 @@ def _make_service_kwargs(path: str) -> dict:
@contextlib.asynccontextmanager @contextlib.asynccontextmanager
async def _get_configured_manager( async def _get_configured_manager(
unauth_paths: list[str],
internal_path: str, internal_path: str,
external_path: str="", external_path: str="",
force_internal_users: (list[str] | None)=None, force_internal_users: (list[str] | None)=None,
@ -52,6 +60,7 @@ async def _get_configured_manager(
manager = AuthManager( manager = AuthManager(
enabled=True, enabled=True,
unauth_paths=unauth_paths,
internal_type="htpasswd", internal_type="htpasswd",
internal_kwargs=_make_service_kwargs(internal_path), internal_kwargs=_make_service_kwargs(internal_path),
@ -78,8 +87,11 @@ async def test_ok__internal(tmpdir) -> None: # type: ignore
htpasswd.set_password("admin", "pass") htpasswd.set_password("admin", "pass")
htpasswd.save() htpasswd.save()
async with _get_configured_manager(path) as manager: async with _get_configured_manager([], path) as manager:
assert manager.is_auth_enabled() assert manager.is_auth_enabled()
assert manager.is_auth_required(_E_AUTH)
assert manager.is_auth_required(_E_UNAUTH)
assert not manager.is_auth_required(_E_FREE)
assert manager.check("xxx") is None assert manager.check("xxx") is None
manager.logout("xxx") manager.logout("xxx")
@ -118,8 +130,11 @@ async def test_ok__external(tmpdir) -> None: # type: ignore
htpasswd2.set_password("user", "foobar") htpasswd2.set_password("user", "foobar")
htpasswd2.save() htpasswd2.save()
async with _get_configured_manager(path1, path2, ["admin"]) as manager: async with _get_configured_manager([], path1, path2, ["admin"]) as manager:
assert manager.is_auth_enabled() assert manager.is_auth_enabled()
assert manager.is_auth_required(_E_AUTH)
assert manager.is_auth_required(_E_UNAUTH)
assert not manager.is_auth_required(_E_FREE)
assert (await manager.login("local", "foobar")) is None assert (await manager.login("local", "foobar")) is None
assert (await manager.login("admin", "pass2")) is None assert (await manager.login("admin", "pass2")) is None
@ -139,11 +154,33 @@ async def test_ok__external(tmpdir) -> None: # type: ignore
assert manager.check(token) is None assert manager.check(token) is None
@pytest.mark.asyncio
async def test_ok__unauth(tmpdir) -> None: # type: ignore
path = os.path.abspath(str(tmpdir.join("htpasswd")))
htpasswd = passlib.apache.HtpasswdFile(path, new=True)
htpasswd.set_password("admin", "pass")
htpasswd.save()
async with _get_configured_manager([
"", " ",
"foo_auth", "/foo_auth ", " /foo_auth",
"/foo_authx", "/foo_auth/", "/foo_auth/x",
"/bar_unauth", # Only this one is matching
], path) as manager:
assert manager.is_auth_enabled()
assert manager.is_auth_required(_E_AUTH)
assert not manager.is_auth_required(_E_UNAUTH)
assert not manager.is_auth_required(_E_FREE)
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_ok__disabled() -> None: async def test_ok__disabled() -> None:
try: try:
manager = AuthManager( manager = AuthManager(
enabled=False, enabled=False,
unauth_paths=[],
internal_type="foobar", internal_type="foobar",
internal_kwargs={}, internal_kwargs={},
@ -156,6 +193,9 @@ async def test_ok__disabled() -> None:
) )
assert not manager.is_auth_enabled() assert not manager.is_auth_enabled()
assert not manager.is_auth_required(_E_AUTH)
assert not manager.is_auth_required(_E_UNAUTH)
assert not manager.is_auth_required(_E_FREE)
with pytest.raises(AssertionError): with pytest.raises(AssertionError):
await manager.authorize("admin", "admin") await manager.authorize("admin", "admin")