improved auth logging

This commit is contained in:
Maxim Devaev 2025-02-09 19:44:42 +02:00
parent c3dc5b9553
commit 75a4aa0736

View File

@ -22,6 +22,7 @@
import dataclasses
import time
import datetime
import secrets
import pyotp
@ -72,7 +73,8 @@ class AuthManager: # pylint: disable=too-many-instance-attributes
assert expire >= 0
self.__expire = expire
if expire > 0:
get_logger().warning("Maximum user session time is limited: %d seconds", expire)
get_logger().info("Maximum user session time is limited: %s",
self.__format_seconds(expire))
self.__unauth_paths = frozenset(unauth_paths) # To speed up
for path in self.__unauth_paths:
@ -81,14 +83,16 @@ class AuthManager: # pylint: disable=too-many-instance-attributes
self.__internal_service: (BaseAuthService | None) = None
if enabled:
self.__internal_service = get_auth_service_class(internal_type)(**internal_kwargs)
get_logger().info("Using internal auth service %r", self.__internal_service.get_plugin_name())
get_logger().info("Using internal auth service %r",
self.__internal_service.get_plugin_name())
self.__force_internal_users = force_internal_users
self.__external_service: (BaseAuthService | None) = None
if enabled and external_type:
self.__external_service = get_auth_service_class(external_type)(**external_kwargs)
get_logger().info("Using external auth service %r", self.__external_service.get_plugin_name())
get_logger().info("Using external auth service %r",
self.__external_service.get_plugin_name())
self.__totp_secret_path = totp_secret_path
@ -125,8 +129,8 @@ class AuthManager: # pylint: disable=too-many-instance-attributes
else:
service = self.__internal_service
ok = (await service.authorize(user, passwd))
pname = service.get_plugin_name()
ok = (await service.authorize(user, passwd))
if ok:
get_logger().info("Authorized user %r via auth service %r", user, pname)
else:
@ -146,7 +150,10 @@ class AuthManager: # pylint: disable=too-many-instance-attributes
expire_ts=self.__make_expire_ts(expire),
)
self.__sessions[token] = session
get_logger().info("Logged in user %r (expire_ts=%d)", session.user, session.expire_ts)
get_logger().info("Logged in user %r; expire=%s, sessions_now=%d",
session.user,
self.__format_expire_ts(session.expire_ts),
self.__get_sessions_number(session.user))
return token
return None
@ -182,6 +189,22 @@ class AuthManager: # pylint: disable=too-many-instance-attributes
def __get_now_ts(self) -> int:
return int(time.monotonic())
def __format_expire_ts(self, expire_ts: int) -> str:
if expire_ts > 0:
seconds = expire_ts - self.__get_now_ts()
return f"[{self.__format_seconds(seconds)}]"
return "INF"
def __format_seconds(self, seconds: int) -> str:
return str(datetime.timedelta(seconds=seconds))
def __get_sessions_number(self, user: str) -> int:
return sum(
1
for session in self.__sessions.values()
if session.user == user
)
def logout(self, token: str) -> None:
assert self.__enabled
if token in self.__sessions:
@ -191,7 +214,7 @@ class AuthManager: # pylint: disable=too-many-instance-attributes
if session.user == user:
count += 1
del self.__sessions[key_t]
get_logger().info("Logged out user %r (was=%d)", user, count)
get_logger().info("Logged out user %r; sessions_closed=%d", user, count)
def check(self, token: str) -> (str | None):
assert self.__enabled
@ -206,6 +229,9 @@ class AuthManager: # pylint: disable=too-many-instance-attributes
return session.user
else:
del self.__sessions[token]
get_logger().info("The session of user %r is expired; sessions_left=%d",
session.user,
self.__get_sessions_number(session.user))
return None
@aiotools.atomic_fg