pikvm/pikvm#1204: Expire user session

This commit is contained in:
Maxim Devaev
2025-02-08 23:30:52 +02:00
parent abbd65a9a0
commit a7c3cdc1ea
8 changed files with 225 additions and 57 deletions

View File

@@ -21,6 +21,7 @@
import os
import asyncio
import contextlib
from typing import AsyncGenerator
@@ -79,6 +80,64 @@ async def _get_configured_manager(
# =====
@pytest.mark.asyncio
async def test_ok__expire(tmpdir) -> None: # type: ignore
path = os.path.abspath(str(tmpdir.join("htpasswd")))
htpasswd = passlib.apache.HtpasswdFile(path, new=True)
htpasswd.set_password("admin", "pass")
htpasswd.save()
async with _get_configured_manager([], path) as manager:
assert manager.is_auth_enabled()
assert manager.is_auth_required(_E_AUTH)
assert manager.is_auth_required(_E_UNAUTH)
assert not manager.is_auth_required(_E_FREE)
assert manager.check("xxx") is None
manager.logout("xxx")
assert (await manager.login("user", "foo", 3)) is None
assert (await manager.login("admin", "foo", 3)) is None
assert (await manager.login("user", "pass", 3)) is None
token1 = await manager.login("admin", "pass", 3)
assert isinstance(token1, str)
assert len(token1) == 64
token2 = await manager.login("admin", "pass", 3)
assert isinstance(token2, str)
assert len(token2) == 64
assert token1 != token2
assert manager.check(token1) == "admin"
assert manager.check(token2) == "admin"
assert manager.check("foobar") is None
manager.logout(token1)
assert manager.check(token1) is None
assert manager.check(token2) is None
assert manager.check("foobar") is None
token3 = await manager.login("admin", "pass", 3)
assert isinstance(token3, str)
assert len(token3) == 64
assert token1 != token3
assert token2 != token3
await asyncio.sleep(4)
assert manager.check(token1) is None
assert manager.check(token2) is None
assert manager.check(token3) is None
# Check for removed token
assert manager.check(token1) is None
assert manager.check(token2) is None
assert manager.check(token3) is None
@pytest.mark.asyncio
async def test_ok__internal(tmpdir) -> None: # type: ignore
path = os.path.abspath(str(tmpdir.join("htpasswd")))
@@ -96,15 +155,15 @@ async def test_ok__internal(tmpdir) -> None: # type: ignore
assert manager.check("xxx") is None
manager.logout("xxx")
assert (await manager.login("user", "foo")) is None
assert (await manager.login("admin", "foo")) is None
assert (await manager.login("user", "pass")) is None
assert (await manager.login("user", "foo", 0)) is None
assert (await manager.login("admin", "foo", 0)) is None
assert (await manager.login("user", "pass", 0)) is None
token1 = await manager.login("admin", "pass")
token1 = await manager.login("admin", "pass", 0)
assert isinstance(token1, str)
assert len(token1) == 64
token2 = await manager.login("admin", "pass")
token2 = await manager.login("admin", "pass", 0)
assert isinstance(token2, str)
assert len(token2) == 64
assert token1 != token2
@@ -119,7 +178,7 @@ async def test_ok__internal(tmpdir) -> None: # type: ignore
assert manager.check(token2) is None
assert manager.check("foobar") is None
token3 = await manager.login("admin", "pass")
token3 = await manager.login("admin", "pass", 0)
assert isinstance(token3, str)
assert len(token3) == 64
assert token1 != token3
@@ -147,17 +206,17 @@ async def test_ok__external(tmpdir) -> None: # type: ignore
assert manager.is_auth_required(_E_UNAUTH)
assert not manager.is_auth_required(_E_FREE)
assert (await manager.login("local", "foobar")) is None
assert (await manager.login("admin", "pass2")) is None
assert (await manager.login("local", "foobar", 0)) is None
assert (await manager.login("admin", "pass2", 0)) is None
token = await manager.login("admin", "pass1")
token = await manager.login("admin", "pass1", 0)
assert token is not None
assert manager.check(token) == "admin"
manager.logout(token)
assert manager.check(token) is None
token = await manager.login("user", "foobar")
token = await manager.login("user", "foobar", 0)
assert token is not None
assert manager.check(token) == "user"
@@ -212,7 +271,7 @@ async def test_ok__disabled() -> None:
await manager.authorize("admin", "admin")
with pytest.raises(AssertionError):
await manager.login("admin", "admin")
await manager.login("admin", "admin", 0)
with pytest.raises(AssertionError):
manager.logout("xxx")

View File

@@ -28,6 +28,7 @@ from kvmd.validators import ValidatorError
from kvmd.validators.auth import valid_user
from kvmd.validators.auth import valid_users_list
from kvmd.validators.auth import valid_passwd
from kvmd.validators.auth import valid_expire
from kvmd.validators.auth import valid_auth_token
@@ -109,6 +110,20 @@ def test_fail__valid_passwd(arg: Any) -> None:
print(valid_passwd(arg))
# =====
@pytest.mark.parametrize("arg", ["0 ", 0, 1, 13])
def test_ok__valid_expire(arg: Any) -> None:
value = valid_expire(arg)
assert type(value) is int # pylint: disable=unidiomatic-typecheck
assert value == int(str(arg).strip())
@pytest.mark.parametrize("arg", ["test", "", None, -1, -13, 1.1])
def test_fail__valid_expire(arg: Any) -> None:
with pytest.raises(ValidatorError):
print(valid_expire(arg))
# =====
@pytest.mark.parametrize("arg", [
("0" * 64) + " ",