mirror of
https://github.com/mofeng-git/One-KVM.git
synced 2026-01-29 09:01:54 +08:00
auth plugins
This commit is contained in:
@@ -59,7 +59,7 @@ def _run_htpasswd(htpasswd: passlib.apache.HtpasswdFile, cmd: List[str]) -> None
|
||||
"kvmd-htpasswd",
|
||||
*cmd,
|
||||
"--set-options",
|
||||
"kvmd/auth/htpasswd/file=" + htpasswd.path,
|
||||
"kvmd/auth/internal/file=" + htpasswd.path,
|
||||
])
|
||||
|
||||
|
||||
|
||||
55
tests/test_auth_service_htpasswd.py
Normal file
55
tests/test_auth_service_htpasswd.py
Normal file
@@ -0,0 +1,55 @@
|
||||
# ========================================================================== #
|
||||
# #
|
||||
# KVMD - The main Pi-KVM daemon. #
|
||||
# #
|
||||
# Copyright (C) 2018 Maxim Devaev <mdevaev@gmail.com> #
|
||||
# #
|
||||
# This program is free software: you can redistribute it and/or modify #
|
||||
# it under the terms of the GNU General Public License as published by #
|
||||
# the Free Software Foundation, either version 3 of the License, or #
|
||||
# (at your option) any later version. #
|
||||
# #
|
||||
# This program is distributed in the hope that it will be useful, #
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
|
||||
# GNU General Public License for more details. #
|
||||
# #
|
||||
# You should have received a copy of the GNU General Public License #
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
|
||||
# #
|
||||
# ========================================================================== #
|
||||
|
||||
|
||||
import os
|
||||
|
||||
import passlib.apache
|
||||
|
||||
import pytest
|
||||
|
||||
from kvmd.plugins.auth import get_auth_service_class
|
||||
|
||||
|
||||
# =====
|
||||
@pytest.mark.asyncio
|
||||
async def test_ok__htpasswd_service(tmpdir) -> None: # type: ignore
|
||||
path = os.path.abspath(str(tmpdir.join("htpasswd")))
|
||||
|
||||
htpasswd = passlib.apache.HtpasswdFile(path, new=True)
|
||||
htpasswd.set_password("admin", "foo")
|
||||
htpasswd.save()
|
||||
|
||||
service = get_auth_service_class("htpasswd")(path=path)
|
||||
|
||||
assert (await service.login("admin", "foo"))
|
||||
assert not (await service.login("user", "foo"))
|
||||
|
||||
htpasswd.set_password("admin", "bar")
|
||||
htpasswd.set_password("user", "bar")
|
||||
htpasswd.save()
|
||||
|
||||
assert (await service.login("admin", "bar"))
|
||||
assert (await service.login("user", "bar"))
|
||||
assert not (await service.login("admin", "foo"))
|
||||
assert not (await service.login("user", "foo"))
|
||||
|
||||
await service.cleanup()
|
||||
69
tests/test_auth_service_http.py
Normal file
69
tests/test_auth_service_http.py
Normal file
@@ -0,0 +1,69 @@
|
||||
# ========================================================================== #
|
||||
# #
|
||||
# KVMD - The main Pi-KVM daemon. #
|
||||
# #
|
||||
# Copyright (C) 2018 Maxim Devaev <mdevaev@gmail.com> #
|
||||
# #
|
||||
# This program is free software: you can redistribute it and/or modify #
|
||||
# it under the terms of the GNU General Public License as published by #
|
||||
# the Free Software Foundation, either version 3 of the License, or #
|
||||
# (at your option) any later version. #
|
||||
# #
|
||||
# This program is distributed in the hope that it will be useful, #
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
|
||||
# GNU General Public License for more details. #
|
||||
# #
|
||||
# You should have received a copy of the GNU General Public License #
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
|
||||
# #
|
||||
# ========================================================================== #
|
||||
|
||||
|
||||
from typing import AsyncGenerator
|
||||
|
||||
import aiohttp.web
|
||||
|
||||
import pytest
|
||||
|
||||
from kvmd.plugins.auth import get_auth_service_class
|
||||
|
||||
|
||||
# =====
|
||||
async def _handle_auth_post(request: aiohttp.web.BaseRequest) -> aiohttp.web.Response:
|
||||
status = 400
|
||||
if request.method == "POST":
|
||||
credentials = (await request.json())
|
||||
if credentials["user"] == "admin" and credentials["passwd"] == "foobar":
|
||||
status = 200
|
||||
return aiohttp.web.Response(text=str(status), status=status)
|
||||
|
||||
|
||||
@pytest.fixture(name="auth_server_port")
|
||||
async def _auth_server_port_fixture(aiohttp_server) -> AsyncGenerator[int, None]: # type: ignore
|
||||
app = aiohttp.web.Application()
|
||||
app.router.add_post("/auth_post", _handle_auth_post)
|
||||
server = await aiohttp_server(app)
|
||||
try:
|
||||
yield server.port
|
||||
finally:
|
||||
await server.close()
|
||||
|
||||
|
||||
# =====
|
||||
@pytest.mark.asyncio
|
||||
async def test_ok__http_service(auth_server_port: int) -> None:
|
||||
service = get_auth_service_class("http")(
|
||||
url="http://localhost:%d/auth_post" % (auth_server_port),
|
||||
verify=False,
|
||||
post=True,
|
||||
user="",
|
||||
passwd="",
|
||||
timeout=5.0,
|
||||
)
|
||||
|
||||
assert not (await service.login("admin", "foo"))
|
||||
assert not (await service.login("user", "foo"))
|
||||
assert (await service.login("admin", "foobar"))
|
||||
|
||||
await service.cleanup()
|
||||
@@ -28,7 +28,6 @@ from kvmd.validators import ValidatorError
|
||||
from kvmd.validators.auth import valid_user
|
||||
from kvmd.validators.auth import valid_passwd
|
||||
from kvmd.validators.auth import valid_auth_token
|
||||
from kvmd.validators.auth import valid_auth_type
|
||||
|
||||
|
||||
# =====
|
||||
@@ -106,14 +105,3 @@ def test_ok__valid_auth_token(arg: Any) -> None:
|
||||
def test_fail__valid_auth_token(arg: Any) -> None:
|
||||
with pytest.raises(ValidatorError):
|
||||
print(valid_auth_token(arg))
|
||||
|
||||
|
||||
@pytest.mark.parametrize("arg", ["HTPASSWD ", "htpasswd"])
|
||||
def test_ok__valid_auth_type(arg: Any) -> None:
|
||||
assert valid_auth_type(arg) == arg.strip().lower()
|
||||
|
||||
|
||||
@pytest.mark.parametrize("arg", ["test", "", None])
|
||||
def test_fail__valid_auth_type(arg: Any) -> None:
|
||||
with pytest.raises(ValidatorError):
|
||||
print(valid_auth_type(arg))
|
||||
|
||||
Reference in New Issue
Block a user