feat: merge upstream master - version 4.94

Merge upstream PiKVM master branch updates:

- Bump version from 4.93 to 4.94
- HID: improved jiggler pattern for better compatibility
- Streamer: major refactoring for improved performance and maintainability
- Prometheus: tidying GPIO channel name formatting
- Web: added __gpio-label class for custom styling
- HID: customizable /api/hid/print delay configuration
- ATX: independent power/reset regions for better control
- OLED: added --fill option for display testing
- Web: improved keyboard handling in modal dialogs
- Web: enhanced login error messages
- Switch: added heartbeat functionality
- Web: mouse touch code simplification and refactoring
- Configs: use systemd-networkd-wait-online --any by default
- PKGBUILD: use cp -r to install systemd units properly
- Various bug fixes and performance improvements
This commit is contained in:
mofeng-git
2025-08-21 11:21:41 +08:00
205 changed files with 9359 additions and 4653 deletions

View File

@@ -7,23 +7,24 @@ RUN echo 'Server = https://mirrors.tuna.tsinghua.edu.cn/archlinux/$repo/os/$arch
&& pacman-key --init \
&& pacman-key --populate archlinux
RUN pacman --noconfirm --ask=4 -Syy \
&& pacman --needed --noconfirm --ask=4 -S \
RUN \
--mount=type=cache,id=kvmd-pacman-pkg,target=/var/cache/pacman/pkg \
--mount=type=cache,id=kvmd-pacman-db,target=/var/lib/pacman/sync \
PACMAN="pacman --noconfirm --ask=4 --needed" \
&& $PACMAN -Syy \
archlinux-keyring \
&& pacman --needed --noconfirm --ask=4 -S \
&& $PACMAN -S \
glibc \
pacman \
openssl \
openssl-1.1 \
&& pacman-db-upgrade \
&& pacman --noconfirm --ask=4 -Syu \
&& pacman --needed --noconfirm --ask=4 -S \
&& $PACMAN -Syu \
p11-kit \
ca-certificates \
ca-certificates-mozilla \
ca-certificates-utils \
&& pacman -Syu --noconfirm --ask=4 \
&& pacman -S --needed --noconfirm --ask=4 \
&& $PACMAN -Syu \
base-devel \
autoconf-archive \
help2man \
@@ -46,10 +47,13 @@ RUN pacman --noconfirm --ask=4 -Syy \
python-aiofiles \
python-async-lru \
python-passlib \
python-bcrypt \
python-pyotp \
python-qrcode \
python-pyserial \
python-pyusb \
python-pyudev \
python-evdev \
python-setproctitle \
python-psutil \
python-netifaces \
@@ -76,8 +80,7 @@ RUN pacman --noconfirm --ask=4 -Syy \
eslint \
npm \
shellcheck \
&& (pacman -Sc --noconfirm || true) \
&& rm -rf /var/cache/pacman/pkg/*
&& :
COPY testenv/requirements.txt requirements.txt
RUN pip config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple \
@@ -97,11 +100,12 @@ WORKDIR /
ARG USTREAMER_MIN_VERSION
ENV USTREAMER_MIN_VERSION $USTREAMER_MIN_VERSION
RUN echo $USTREAMER_MIN_VERSION
RUN git clone https://github.com/pikvm/ustreamer \
RUN \
--mount=type=tmpfs,target=/tmp \
cd /tmp \
&& git clone --depth=1 https://github.com/pikvm/ustreamer \
&& cd ustreamer \
&& make WITH_PYTHON=1 PREFIX=/usr DESTDIR=/ install \
&& cd - \
&& rm -rf ustreamer
&& make WITH_PYTHON=1 PREFIX=/usr DESTDIR=/ install
RUN mkdir -p \
/etc/kvmd/{nginx,vnc} \
@@ -114,4 +118,4 @@ COPY testenv/fakes/sys /fake_sysfs/sys
COPY testenv/fakes/proc /fake_procfs/proc
COPY testenv/fakes/etc /fake_etc/etc
CMD /bin/bash
CMD ["/bin/bash"]

View File

@@ -1,5 +1,5 @@
[mypy]
python_version = 3.11
python_version = 3.13
ignore_missing_imports = true
disallow_untyped_defs = true
strict_optional = true

View File

@@ -29,6 +29,7 @@ _AtxApiPart.switch_power
_UsbKey.arduino_modifier_code
_KeyMapping.web_name
_KeyMapping.evdev_name
_KeyMapping.mcu_code
_KeyMapping.usb_key
_KeyMapping.ps2_key
@@ -58,6 +59,7 @@ Dumper.ignore_aliases
_auth_server_port_fixture
_test_user
Switch.__x_set_dummies
Switch.__x_set_port_names
Switch.__x_set_atx_cp_delays
Switch.__x_set_atx_cpl_delays
@@ -67,18 +69,27 @@ Nak.BUSY
Nak.NO_DOWNLINK
Nak.DOWNLINK_OVERFLOW
UnitFlags.flashing_busy
UnitFlags.has_hpd
StateCache.get_dummies
StateCache.get_port_names
StateCache.get_atx_cp_delays
StateCache.get_atx_cpl_delays
StorageContext.write_edids
StorageContext.write_dummies
StorageContext.write_colors
StorageContext.write_port_names
StorageContext.write_atx_cp_delays
StorageContext.write_atx_cpl_delays
StorageContext.write_atx_cr_delays
StorageContext.read_edids
StorageContext.read_dummies
StorageContext.read_colors
StorageContext.read_port_names
StorageContext.read_atx_cp_delays
StorageContext.read_atx_cpl_delays
StorageContext.read_atx_cr_delays
RequestUnixCredentials.pid
RequestUnixCredentials.gid
KvmdClientWs.send_mouse_relative_event

View File

@@ -1,9 +1,10 @@
python-periphery
pyserial-asyncio
pyghmi
git+https://opendev.org/x/pyghmi.git#33cff21882b6782c20b054e6e8adcf94b5e09561
spidev
pyrad
types-PyYAML
types-aiofiles
luma.oled
pyfatfs
gpiod>=2.3

View File

@@ -29,12 +29,12 @@ import getpass
from typing import Generator
from typing import Any
import passlib.apache
import pytest
from kvmd.apps.htpasswd import main
from kvmd.crypto import KvmdHtpasswdFile
# =====
def _make_passwd(user: str) -> str:
@@ -42,28 +42,30 @@ def _make_passwd(user: str) -> str:
@pytest.fixture(name="htpasswd", params=[[], ["admin"], ["admin", "user"]])
def _htpasswd_fixture(request) -> Generator[passlib.apache.HtpasswdFile, None, None]: # type: ignore
def _htpasswd_fixture(request) -> Generator[KvmdHtpasswdFile, None, None]: # type: ignore
(fd, path) = tempfile.mkstemp()
os.close(fd)
htpasswd = passlib.apache.HtpasswdFile(path)
htpasswd = KvmdHtpasswdFile(path)
for user in request.param:
htpasswd.set_password(user, _make_passwd(user))
htpasswd.save()
yield htpasswd
os.remove(path)
try:
yield htpasswd
finally:
os.remove(path)
def _run_htpasswd(cmd: list[str], htpasswd_path: str, internal_type: str="htpasswd") -> None:
def _run_htpasswd(cmd: list[str], htpasswd_path: str, int_type: str="htpasswd") -> None:
cmd = ["kvmd-htpasswd", *cmd, "--set-options"]
if internal_type != "htpasswd": # By default
cmd.append("kvmd/auth/internal/type=" + internal_type)
if int_type != "htpasswd": # By default
cmd.append("kvmd/auth/internal/type=" + int_type)
if htpasswd_path:
cmd.append("kvmd/auth/internal/file=" + htpasswd_path)
main(cmd)
# =====
def test_ok__list(htpasswd: passlib.apache.HtpasswdFile, capsys) -> None: # type: ignore
def test_ok__list(htpasswd: KvmdHtpasswdFile, capsys) -> None: # type: ignore
_run_htpasswd(["list"], htpasswd.path)
(out, err) = capsys.readouterr()
assert len(err) == 0
@@ -71,24 +73,32 @@ def test_ok__list(htpasswd: passlib.apache.HtpasswdFile, capsys) -> None: # typ
# =====
def test_ok__set_change_stdin(htpasswd: passlib.apache.HtpasswdFile, mocker) -> None: # type: ignore
def test_ok__set_stdin(htpasswd: KvmdHtpasswdFile, mocker) -> None: # type: ignore
old_users = set(htpasswd.users())
if old_users:
assert htpasswd.check_password("admin", _make_passwd("admin"))
mocker.patch.object(builtins, "input", (lambda: " test "))
_run_htpasswd(["set", "admin", "--read-stdin"], htpasswd.path)
with pytest.raises(SystemExit, match="The user 'new' is not exist"):
_run_htpasswd(["set", "new", "--read-stdin"], htpasswd.path)
htpasswd.load(force=True)
assert htpasswd.check_password("admin", " test ")
assert old_users == set(htpasswd.users())
def test_ok__set_add_stdin(htpasswd: passlib.apache.HtpasswdFile, mocker) -> None: # type: ignore
def test_ok__add_stdin(htpasswd: KvmdHtpasswdFile, mocker) -> None: # type: ignore
old_users = set(htpasswd.users())
if old_users:
mocker.patch.object(builtins, "input", (lambda: " test "))
_run_htpasswd(["set", "new", "--read-stdin"], htpasswd.path)
_run_htpasswd(["add", "new", "--read-stdin"], htpasswd.path)
with pytest.raises(SystemExit, match="The user 'new' is already exists"):
_run_htpasswd(["add", "new", "--read-stdin"], htpasswd.path)
htpasswd.load(force=True)
assert htpasswd.check_password("new", " test ")
@@ -96,20 +106,24 @@ def test_ok__set_add_stdin(htpasswd: passlib.apache.HtpasswdFile, mocker) -> Non
# =====
def test_ok__set_change_getpass(htpasswd: passlib.apache.HtpasswdFile, mocker) -> None: # type: ignore
def test_ok__set_getpass(htpasswd: KvmdHtpasswdFile, mocker) -> None: # type: ignore
old_users = set(htpasswd.users())
if old_users:
assert htpasswd.check_password("admin", _make_passwd("admin"))
mocker.patch.object(getpass, "getpass", (lambda *_, **__: " test "))
_run_htpasswd(["set", "admin"], htpasswd.path)
with pytest.raises(SystemExit, match="The user 'new' is not exist"):
_run_htpasswd(["set", "new"], htpasswd.path)
htpasswd.load(force=True)
assert htpasswd.check_password("admin", " test ")
assert old_users == set(htpasswd.users())
def test_fail__set_change_getpass(htpasswd: passlib.apache.HtpasswdFile, mocker) -> None: # type: ignore
def test_fail__set_getpass(htpasswd: KvmdHtpasswdFile, mocker) -> None: # type: ignore
old_users = set(htpasswd.users())
if old_users:
assert htpasswd.check_password("admin", _make_passwd("admin"))
@@ -137,13 +151,15 @@ def test_fail__set_change_getpass(htpasswd: passlib.apache.HtpasswdFile, mocker)
# =====
def test_ok__del(htpasswd: passlib.apache.HtpasswdFile) -> None:
def test_ok__del(htpasswd: KvmdHtpasswdFile) -> None:
old_users = set(htpasswd.users())
if old_users:
assert htpasswd.check_password("admin", _make_passwd("admin"))
_run_htpasswd(["del", "admin"], htpasswd.path)
_run_htpasswd(["del", "admin"], htpasswd.path)
with pytest.raises(SystemExit, match="The user 'admin' is not exist"):
_run_htpasswd(["del", "admin"], htpasswd.path)
htpasswd.load(force=True)
assert not htpasswd.check_password("admin", _make_passwd("admin"))
@@ -152,13 +168,13 @@ def test_ok__del(htpasswd: passlib.apache.HtpasswdFile) -> None:
# =====
def test_fail__not_htpasswd() -> None:
with pytest.raises(SystemExit, match="Error: KVMD internal auth not using 'htpasswd'"):
_run_htpasswd(["list"], "", internal_type="http")
with pytest.raises(SystemExit, match="Error: KVMD internal auth does not use 'htpasswd'"):
_run_htpasswd(["list"], "", int_type="http")
def test_fail__unknown_plugin() -> None:
with pytest.raises(SystemExit, match="ConfigError: Unknown plugin 'auth/foobar'"):
_run_htpasswd(["list"], "", internal_type="foobar")
_run_htpasswd(["list"], "", int_type="foobar")
def test_fail__invalid_passwd(mocker, tmpdir) -> None: # type: ignore
@@ -166,4 +182,4 @@ def test_fail__invalid_passwd(mocker, tmpdir) -> None: # type: ignore
open(path, "w").close() # pylint: disable=consider-using-with
mocker.patch.object(builtins, "input", (lambda: "\n"))
with pytest.raises(SystemExit, match="The argument is not a valid passwd characters"):
_run_htpasswd(["set", "admin", "--read-stdin"], path)
_run_htpasswd(["add", "admin", "--read-stdin"], path)

View File

@@ -21,27 +21,37 @@
import os
import asyncio
import base64
import contextlib
from typing import AsyncGenerator
import passlib.apache
from aiohttp.test_utils import make_mocked_request
import pytest
from kvmd.validators import ValidatorError
from kvmd.yamlconf import make_config
from kvmd.apps.kvmd.auth import AuthManager
from kvmd.apps.kvmd.api.auth import check_request_auth
from kvmd.htserver import UnauthorizedError
from kvmd.htserver import ForbiddenError
from kvmd.plugins.auth import get_auth_service_class
from kvmd.htserver import HttpExposed
from kvmd.crypto import KvmdHtpasswdFile
# =====
_E_AUTH = HttpExposed("GET", "/foo_auth", True, (lambda: None))
_E_UNAUTH = HttpExposed("GET", "/bar_unauth", True, (lambda: None))
_E_FREE = HttpExposed("GET", "/baz_free", False, (lambda: None))
_E_AUTH = HttpExposed("GET", "/foo_auth", auth_required=True, allow_usc=True, handler=(lambda: None))
_E_UNAUTH = HttpExposed("GET", "/bar_unauth", auth_required=True, allow_usc=True, handler=(lambda: None))
_E_FREE = HttpExposed("GET", "/baz_free", auth_required=False, allow_usc=True, handler=(lambda: None))
def _make_service_kwargs(path: str) -> dict:
@@ -53,21 +63,24 @@ def _make_service_kwargs(path: str) -> dict:
@contextlib.asynccontextmanager
async def _get_configured_manager(
unauth_paths: list[str],
internal_path: str,
external_path: str="",
force_internal_users: (list[str] | None)=None,
int_path: str,
ext_path: str="",
force_int_users: (list[str] | None)=None,
) -> AsyncGenerator[AuthManager, None]:
manager = AuthManager(
enabled=True,
expire=0,
usc_users=[],
usc_groups=[],
unauth_paths=unauth_paths,
internal_type="htpasswd",
internal_kwargs=_make_service_kwargs(internal_path),
force_internal_users=(force_internal_users or []),
int_type="htpasswd",
int_kwargs=_make_service_kwargs(int_path),
force_int_users=(force_int_users or []),
external_type=("htpasswd" if external_path else ""),
external_kwargs=(_make_service_kwargs(external_path) if external_path else {}),
ext_type=("htpasswd" if ext_path else ""),
ext_kwargs=(_make_service_kwargs(ext_path) if ext_path else {}),
totp_secret_path="",
)
@@ -80,10 +93,61 @@ async def _get_configured_manager(
# =====
@pytest.mark.asyncio
async def test_ok__internal(tmpdir) -> None: # type: ignore
async def test_ok__request(tmpdir) -> None: # type: ignore
path = os.path.abspath(str(tmpdir.join("htpasswd")))
htpasswd = passlib.apache.HtpasswdFile(path, new=True)
htpasswd = KvmdHtpasswdFile(path, new=True)
htpasswd.set_password("admin", "pass")
htpasswd.save()
async with _get_configured_manager([], path) as manager:
async def check(exposed: HttpExposed, **kwargs) -> None: # type: ignore
await check_request_auth(manager, exposed, make_mocked_request(exposed.method, exposed.path, **kwargs))
await check(_E_FREE)
with pytest.raises(UnauthorizedError):
await check(_E_AUTH)
# ===
with pytest.raises(ForbiddenError):
await check(_E_AUTH, headers={"X-KVMD-User": "admin", "X-KVMD-Passwd": "foo"})
with pytest.raises(ForbiddenError):
await check(_E_AUTH, headers={"X-KVMD-User": "adminx", "X-KVMD-Passwd": "pass"})
await check(_E_AUTH, headers={"X-KVMD-User": "admin", "X-KVMD-Passwd": "pass"})
# ===
with pytest.raises(UnauthorizedError):
await check(_E_AUTH, headers={"Cookie": "auth_token="})
with pytest.raises(ValidatorError):
await check(_E_AUTH, headers={"Cookie": "auth_token=0"})
with pytest.raises(ForbiddenError):
await check(_E_AUTH, headers={"Cookie": f"auth_token={'0' * 64}"})
token = await manager.login("admin", "pass", 0)
assert token
await check(_E_AUTH, headers={"Cookie": f"auth_token={token}"})
manager.logout(token)
with pytest.raises(ForbiddenError):
await check(_E_AUTH, headers={"Cookie": f"auth_token={token}"})
# ===
with pytest.raises(ForbiddenError):
await check(_E_AUTH, headers={"Authorization": "basic " + base64.b64encode(b"admin:foo").decode()})
with pytest.raises(ForbiddenError):
await check(_E_AUTH, headers={"Authorization": "basic " + base64.b64encode(b"adminx:pass").decode()})
await check(_E_AUTH, headers={"Authorization": "basic " + base64.b64encode(b"admin:pass").decode()})
@pytest.mark.asyncio
async def test_ok__expire(tmpdir) -> None: # type: ignore
path = os.path.abspath(str(tmpdir.join("htpasswd")))
htpasswd = KvmdHtpasswdFile(path, new=True)
htpasswd.set_password("admin", "pass")
htpasswd.save()
@@ -96,15 +160,15 @@ async def test_ok__internal(tmpdir) -> None: # type: ignore
assert manager.check("xxx") is None
manager.logout("xxx")
assert (await manager.login("user", "foo")) is None
assert (await manager.login("admin", "foo")) is None
assert (await manager.login("user", "pass")) is None
assert (await manager.login("user", "foo", 3)) is None
assert (await manager.login("admin", "foo", 3)) is None
assert (await manager.login("user", "pass", 3)) is None
token1 = await manager.login("admin", "pass")
token1 = await manager.login("admin", "pass", 3)
assert isinstance(token1, str)
assert len(token1) == 64
token2 = await manager.login("admin", "pass")
token2 = await manager.login("admin", "pass", 3)
assert isinstance(token2, str)
assert len(token2) == 64
assert token1 != token2
@@ -119,7 +183,75 @@ async def test_ok__internal(tmpdir) -> None: # type: ignore
assert manager.check(token2) is None
assert manager.check("foobar") is None
token3 = await manager.login("admin", "pass")
token3 = await manager.login("admin", "pass", 3)
assert isinstance(token3, str)
assert len(token3) == 64
assert token1 != token3
assert token2 != token3
token4 = await manager.login("admin", "pass", 6)
assert isinstance(token4, str)
assert len(token4) == 64
assert token1 != token4
assert token2 != token4
assert token3 != token4
await asyncio.sleep(4)
assert manager.check(token1) is None
assert manager.check(token2) is None
assert manager.check(token3) is None
assert manager.check(token4) == "admin"
await asyncio.sleep(3)
assert manager.check(token1) is None
assert manager.check(token2) is None
assert manager.check(token3) is None
assert manager.check(token4) is None
@pytest.mark.asyncio
async def test_ok__internal(tmpdir) -> None: # type: ignore
path = os.path.abspath(str(tmpdir.join("htpasswd")))
htpasswd = KvmdHtpasswdFile(path, new=True)
htpasswd.set_password("admin", "pass")
htpasswd.save()
async with _get_configured_manager([], path) as manager:
assert manager.is_auth_enabled()
assert manager.is_auth_required(_E_AUTH)
assert manager.is_auth_required(_E_UNAUTH)
assert not manager.is_auth_required(_E_FREE)
assert manager.check("xxx") is None
manager.logout("xxx")
assert (await manager.login("user", "foo", 0)) is None
assert (await manager.login("admin", "foo", 0)) is None
assert (await manager.login("user", "pass", 0)) is None
token1 = await manager.login("admin", "pass", 0)
assert isinstance(token1, str)
assert len(token1) == 64
token2 = await manager.login("admin", "pass", 0)
assert isinstance(token2, str)
assert len(token2) == 64
assert token1 != token2
assert manager.check(token1) == "admin"
assert manager.check(token2) == "admin"
assert manager.check("foobar") is None
manager.logout(token1)
assert manager.check(token1) is None
assert manager.check(token2) is None
assert manager.check("foobar") is None
token3 = await manager.login("admin", "pass", 0)
assert isinstance(token3, str)
assert len(token3) == 64
assert token1 != token3
@@ -131,12 +263,12 @@ async def test_ok__external(tmpdir) -> None: # type: ignore
path1 = os.path.abspath(str(tmpdir.join("htpasswd1")))
path2 = os.path.abspath(str(tmpdir.join("htpasswd2")))
htpasswd1 = passlib.apache.HtpasswdFile(path1, new=True)
htpasswd1 = KvmdHtpasswdFile(path1, new=True)
htpasswd1.set_password("admin", "pass1")
htpasswd1.set_password("local", "foobar")
htpasswd1.save()
htpasswd2 = passlib.apache.HtpasswdFile(path2, new=True)
htpasswd2 = KvmdHtpasswdFile(path2, new=True)
htpasswd2.set_password("admin", "pass2")
htpasswd2.set_password("user", "foobar")
htpasswd2.save()
@@ -147,17 +279,17 @@ async def test_ok__external(tmpdir) -> None: # type: ignore
assert manager.is_auth_required(_E_UNAUTH)
assert not manager.is_auth_required(_E_FREE)
assert (await manager.login("local", "foobar")) is None
assert (await manager.login("admin", "pass2")) is None
assert (await manager.login("local", "foobar", 0)) is None
assert (await manager.login("admin", "pass2", 0)) is None
token = await manager.login("admin", "pass1")
token = await manager.login("admin", "pass1", 0)
assert token is not None
assert manager.check(token) == "admin"
manager.logout(token)
assert manager.check(token) is None
token = await manager.login("user", "foobar")
token = await manager.login("user", "foobar", 0)
assert token is not None
assert manager.check(token) == "user"
@@ -169,7 +301,7 @@ async def test_ok__external(tmpdir) -> None: # type: ignore
async def test_ok__unauth(tmpdir) -> None: # type: ignore
path = os.path.abspath(str(tmpdir.join("htpasswd")))
htpasswd = passlib.apache.HtpasswdFile(path, new=True)
htpasswd = KvmdHtpasswdFile(path, new=True)
htpasswd.set_password("admin", "pass")
htpasswd.save()
@@ -191,14 +323,17 @@ async def test_ok__disabled() -> None:
try:
manager = AuthManager(
enabled=False,
expire=0,
usc_users=[],
usc_groups=[],
unauth_paths=[],
internal_type="foobar",
internal_kwargs={},
force_internal_users=[],
int_type="foobar",
int_kwargs={},
force_int_users=[],
external_type="",
external_kwargs={},
ext_type="",
ext_kwargs={},
totp_secret_path="",
)
@@ -212,7 +347,7 @@ async def test_ok__disabled() -> None:
await manager.authorize("admin", "admin")
with pytest.raises(AssertionError):
await manager.login("admin", "admin")
await manager.login("admin", "admin", 0)
with pytest.raises(AssertionError):
manager.logout("xxx")

View File

@@ -22,14 +22,17 @@
import pytest
from kvmd.keyboard.mappings import KEYMAP
from . import get_configured_auth_service
# =====
def test_ok__keymap() -> None:
assert KEYMAP["KeyA"].mcu.code == 1
def test_fail__keymap() -> None:
with pytest.raises(KeyError):
print(KEYMAP["keya"])
@pytest.mark.asyncio
async def test_ok__forbidden_service() -> None: # type: ignore
async with get_configured_auth_service("forbidden") as service:
assert not (await service.authorize("user", "foo"))
assert not (await service.authorize("admin", "foo"))
assert not (await service.authorize("user", "pass"))
assert not (await service.authorize("admin", "pass"))
assert not (await service.authorize("admin", "admin"))
assert not (await service.authorize("admin", ""))
assert not (await service.authorize("", ""))

View File

@@ -22,10 +22,10 @@
import os
import passlib.apache
import pytest
from kvmd.crypto import KvmdHtpasswdFile
from . import get_configured_auth_service
@@ -34,7 +34,7 @@ from . import get_configured_auth_service
async def test_ok__htpasswd_service(tmpdir) -> None: # type: ignore
path = os.path.abspath(str(tmpdir.join("htpasswd")))
htpasswd = passlib.apache.HtpasswdFile(path, new=True)
htpasswd = KvmdHtpasswdFile(path, new=True)
htpasswd.set_password("admin", "pass")
htpasswd.save()

View File

@@ -28,6 +28,7 @@ from kvmd.validators import ValidatorError
from kvmd.validators.auth import valid_user
from kvmd.validators.auth import valid_users_list
from kvmd.validators.auth import valid_passwd
from kvmd.validators.auth import valid_expire
from kvmd.validators.auth import valid_auth_token
@@ -109,6 +110,20 @@ def test_fail__valid_passwd(arg: Any) -> None:
print(valid_passwd(arg))
# =====
@pytest.mark.parametrize("arg", ["0 ", 0, 1, 13])
def test_ok__valid_expire(arg: Any) -> None:
value = valid_expire(arg)
assert type(value) is int # pylint: disable=unidiomatic-typecheck
assert value == int(str(arg).strip())
@pytest.mark.parametrize("arg", ["test", "", None, -1, -13, 1.1])
def test_fail__valid_expire(arg: Any) -> None:
with pytest.raises(ValidatorError):
print(valid_expire(arg))
# =====
@pytest.mark.parametrize("arg", [
("0" * 64) + " ",

View File

@@ -34,6 +34,13 @@ from kvmd.validators.basic import valid_float_f01
from kvmd.validators.basic import valid_string_list
# =====
def _to_int(arg: Any) -> int:
if isinstance(arg, str) and arg.strip().startswith(("0x", "0X")):
arg = int(arg.strip()[2:], 16)
return int(str(arg).strip())
# =====
@pytest.mark.parametrize("arg, retval", [
("1", True),
@@ -60,34 +67,34 @@ def test_fail__valid_bool(arg: Any) -> None:
# =====
@pytest.mark.parametrize("arg", ["1 ", "-1", 1, -1, 0, 100500])
@pytest.mark.parametrize("arg", ["1 ", "-1", 1, -1, 0, 100500, " 0xff"])
def test_ok__valid_number(arg: Any) -> None:
assert valid_number(arg) == int(str(arg).strip())
assert valid_number(arg) == _to_int(arg)
@pytest.mark.parametrize("arg", ["test", "", None, "1x", 100500.0])
@pytest.mark.parametrize("arg", ["test", "", None, "1x", 100500.0, "ff"])
def test_fail__valid_number(arg: Any) -> None:
with pytest.raises(ValidatorError):
print(valid_number(arg))
@pytest.mark.parametrize("arg", [-5, 0, 5, "-5 ", "0 ", "5 "])
@pytest.mark.parametrize("arg", [-5, 0, 5, "-5 ", "0 ", "5 ", " 0x05"])
def test_ok__valid_number__min_max(arg: Any) -> None:
assert valid_number(arg, -5, 5) == int(str(arg).strip())
assert valid_number(arg, -5, 5) == _to_int(arg)
@pytest.mark.parametrize("arg", ["test", "", None, -6, 6, "-6 ", "6 "])
@pytest.mark.parametrize("arg", ["test", "", None, -6, 6, "-6 ", "6 ", "0x06"])
def test_fail__valid_number__min_max(arg: Any) -> None: # pylint: disable=invalid-name
with pytest.raises(ValidatorError):
print(valid_number(arg, -5, 5))
# =====
@pytest.mark.parametrize("arg", [0, 1, 5, "5 "])
@pytest.mark.parametrize("arg", [0, 1, 5, "5 ", " 0x05"])
def test_ok__valid_int_f0(arg: Any) -> None:
value = valid_int_f0(arg)
assert type(value) is int # pylint: disable=unidiomatic-typecheck
assert value == int(str(arg).strip())
assert value == _to_int(arg)
@pytest.mark.parametrize("arg", ["test", "", None, -6, "-6 ", "5.0"])
@@ -97,14 +104,14 @@ def test_fail__valid_int_f0(arg: Any) -> None:
# =====
@pytest.mark.parametrize("arg", [1, 5, "5 "])
@pytest.mark.parametrize("arg", [1, 5, "5 ", " 0x05"])
def test_ok__valid_int_f1(arg: Any) -> None:
value = valid_int_f1(arg)
assert type(value) is int # pylint: disable=unidiomatic-typecheck
assert value == int(str(arg).strip())
assert value == _to_int(arg)
@pytest.mark.parametrize("arg", ["test", "", None, -6, "-6 ", 0, "0 ", "5.0"])
@pytest.mark.parametrize("arg", ["test", "", None, -6, "-6 ", 0, "0 ", "5.0", "0x0"])
def test_fail__valid_int_f1(arg: Any) -> None:
with pytest.raises(ValidatorError):
print(valid_int_f1(arg))

View File

@@ -24,7 +24,7 @@ from typing import Any
import pytest
from kvmd.keyboard.mappings import KEYMAP
from kvmd.keyboard.mappings import WEB_TO_EVDEV
from kvmd.validators import ValidatorError
from kvmd.validators.hid import valid_hid_key
@@ -35,7 +35,7 @@ from kvmd.validators.hid import valid_hid_mouse_delta
# =====
def test_ok__valid_hid_key() -> None:
for key in KEYMAP:
for key in WEB_TO_EVDEV:
print(valid_hid_key(key))
print(valid_hid_key(key + " "))

View File

@@ -94,6 +94,9 @@ def test_fail__valid_switch_edid_id__allowed_default(arg: Any) -> None:
# =====
@pytest.mark.parametrize("arg", [
"f" * 256,
"0" * 256,
"1a" * 128,
"f" * 512,
"0" * 512,
"1a" * 256,

View File

@@ -1,4 +1,8 @@
kvmd:
auth:
usc:
users: [root]
server:
unix_mode: 0666
@@ -11,8 +15,8 @@ kvmd:
mouse:
device: /dev/null
# absolute_win98_fix: true
# mouse_alt:
# device: /dev/null
mouse_alt:
device: /dev/null
noop: true
msd:
@@ -45,9 +49,9 @@ kvmd:
__v4_locator__:
type: locator
device: /dev/kvmd-gpio
relay:
type: hidrelay
device: /dev/hidraw0
# relay:
# type: hidrelay
# device: /dev/hidraw0
cmd1:
type: cmd
cmd: [/bin/sleep, 5]
@@ -94,20 +98,20 @@ kvmd:
mode: output
switch: false
relay1:
pin: 0
mode: output
initial: null
driver: relay
relay2:
pin: 1
mode: output
initial: null
driver: relay
pulse:
delay: 2
max_delay: 5
# relay1:
# pin: 0
# mode: output
# initial: null
# driver: relay
#
# relay2:
# pin: 1
# mode: output
# initial: null
# driver: relay
# pulse:
# delay: 2
# max_delay: 5
cmd1:
pin: 0

View File

@@ -1,4 +1,8 @@
kvmd:
auth:
usc:
users: [root]
server:
unix_mode: 0666
@@ -10,6 +14,8 @@ kvmd:
device: /dev/null
mouse:
device: /dev/null
mouse_alt:
device: /dev/null
noop: true
mouse_alt: