refactoring

This commit is contained in:
Maxim Devaev 2025-02-09 23:20:28 +02:00
parent 302e7c2877
commit 97b405297b
4 changed files with 44 additions and 44 deletions

View File

@ -79,12 +79,12 @@ def main(argv: (list[str] | None)=None) -> None:
expire=config.auth.expire,
unauth_paths=([] if config.prometheus.auth.enabled else ["/export/prometheus/metrics"]),
internal_type=config.auth.internal.type,
internal_kwargs=config.auth.internal._unpack(ignore=["type", "force_users"]),
force_internal_users=config.auth.internal.force_users,
int_type=config.auth.internal.type,
int_kwargs=config.auth.internal._unpack(ignore=["type", "force_users"]),
force_int_users=config.auth.internal.force_users,
external_type=config.auth.external.type,
external_kwargs=(config.auth.external._unpack(ignore=["type"]) if config.auth.external.type else {}),
ext_type=config.auth.external.type,
ext_kwargs=(config.auth.external._unpack(ignore=["type"]) if config.auth.external.type else {}),
totp_secret_path=config.auth.totp.secret.file,
),

View File

@ -56,12 +56,12 @@ class AuthManager: # pylint: disable=too-many-instance-attributes
expire: int,
unauth_paths: list[str],
internal_type: str,
internal_kwargs: dict,
force_internal_users: list[str],
int_type: str,
int_kwargs: dict,
force_int_users: list[str],
external_type: str,
external_kwargs: dict,
ext_type: str,
ext_kwargs: dict,
totp_secret_path: str,
) -> None:
@ -80,19 +80,19 @@ class AuthManager: # pylint: disable=too-many-instance-attributes
for path in self.__unauth_paths:
get_logger().warning("Authorization is disabled for API %r", path)
self.__internal_service: (BaseAuthService | None) = None
self.__int_service: (BaseAuthService | None) = None
if enabled:
self.__internal_service = get_auth_service_class(internal_type)(**internal_kwargs)
self.__int_service = get_auth_service_class(int_type)(**int_kwargs)
get_logger().info("Using internal auth service %r",
self.__internal_service.get_plugin_name())
self.__int_service.get_plugin_name())
self.__force_internal_users = force_internal_users
self.__force_int_users = force_int_users
self.__external_service: (BaseAuthService | None) = None
if enabled and external_type:
self.__external_service = get_auth_service_class(external_type)(**external_kwargs)
self.__ext_service: (BaseAuthService | None) = None
if enabled and ext_type:
self.__ext_service = get_auth_service_class(ext_type)(**ext_kwargs)
get_logger().info("Using external auth service %r",
self.__external_service.get_plugin_name())
self.__ext_service.get_plugin_name())
self.__totp_secret_path = totp_secret_path
@ -112,7 +112,7 @@ class AuthManager: # pylint: disable=too-many-instance-attributes
assert user == user.strip()
assert user
assert self.__enabled
assert self.__internal_service
assert self.__int_service
if self.__totp_secret_path:
with open(self.__totp_secret_path) as file:
@ -124,10 +124,10 @@ class AuthManager: # pylint: disable=too-many-instance-attributes
return False
passwd = passwd[:-6]
if user not in self.__force_internal_users and self.__external_service:
service = self.__external_service
if user not in self.__force_int_users and self.__ext_service:
service = self.__ext_service
else:
service = self.__internal_service
service = self.__int_service
pname = service.get_plugin_name()
ok = (await service.authorize(user, passwd))
@ -237,7 +237,7 @@ class AuthManager: # pylint: disable=too-many-instance-attributes
@aiotools.atomic_fg
async def cleanup(self) -> None:
if self.__enabled:
assert self.__internal_service
await self.__internal_service.cleanup()
if self.__external_service:
await self.__external_service.cleanup()
assert self.__int_service
await self.__int_service.cleanup()
if self.__ext_service:
await self.__ext_service.cleanup()

View File

@ -53,10 +53,10 @@ def _htpasswd_fixture(request) -> Generator[passlib.apache.HtpasswdFile, None, N
os.remove(path)
def _run_htpasswd(cmd: list[str], htpasswd_path: str, internal_type: str="htpasswd") -> None:
def _run_htpasswd(cmd: list[str], htpasswd_path: str, int_type: str="htpasswd") -> None:
cmd = ["kvmd-htpasswd", *cmd, "--set-options"]
if internal_type != "htpasswd": # By default
cmd.append("kvmd/auth/internal/type=" + internal_type)
if int_type != "htpasswd": # By default
cmd.append("kvmd/auth/internal/type=" + int_type)
if htpasswd_path:
cmd.append("kvmd/auth/internal/file=" + htpasswd_path)
main(cmd)
@ -153,12 +153,12 @@ def test_ok__del(htpasswd: passlib.apache.HtpasswdFile) -> None:
# =====
def test_fail__not_htpasswd() -> None:
with pytest.raises(SystemExit, match="Error: KVMD internal auth not using 'htpasswd'"):
_run_htpasswd(["list"], "", internal_type="http")
_run_htpasswd(["list"], "", int_type="http")
def test_fail__unknown_plugin() -> None:
with pytest.raises(SystemExit, match="ConfigError: Unknown plugin 'auth/foobar'"):
_run_htpasswd(["list"], "", internal_type="foobar")
_run_htpasswd(["list"], "", int_type="foobar")
def test_fail__invalid_passwd(mocker, tmpdir) -> None: # type: ignore

View File

@ -54,9 +54,9 @@ def _make_service_kwargs(path: str) -> dict:
@contextlib.asynccontextmanager
async def _get_configured_manager(
unauth_paths: list[str],
internal_path: str,
external_path: str="",
force_internal_users: (list[str] | None)=None,
int_path: str,
ext_path: str="",
force_int_users: (list[str] | None)=None,
) -> AsyncGenerator[AuthManager, None]:
manager = AuthManager(
@ -64,12 +64,12 @@ async def _get_configured_manager(
expire=0,
unauth_paths=unauth_paths,
internal_type="htpasswd",
internal_kwargs=_make_service_kwargs(internal_path),
force_internal_users=(force_internal_users or []),
int_type="htpasswd",
int_kwargs=_make_service_kwargs(int_path),
force_int_users=(force_int_users or []),
external_type=("htpasswd" if external_path else ""),
external_kwargs=(_make_service_kwargs(external_path) if external_path else {}),
ext_type=("htpasswd" if ext_path else ""),
ext_kwargs=(_make_service_kwargs(ext_path) if ext_path else {}),
totp_secret_path="",
)
@ -264,12 +264,12 @@ async def test_ok__disabled() -> None:
expire=0,
unauth_paths=[],
internal_type="foobar",
internal_kwargs={},
force_internal_users=[],
int_type="foobar",
int_kwargs={},
force_int_users=[],
external_type="",
external_kwargs={},
ext_type="",
ext_kwargs={},
totp_secret_path="",
)