mirror of
https://github.com/mofeng-git/One-KVM.git
synced 2025-12-13 01:30:31 +08:00
better auth testing
This commit is contained in:
parent
060140d654
commit
a168ce9d8f
@ -46,7 +46,6 @@ class Plugin(BaseAuthService):
|
|||||||
self,
|
self,
|
||||||
url: str,
|
url: str,
|
||||||
verify: bool,
|
verify: bool,
|
||||||
post: bool,
|
|
||||||
user: str,
|
user: str,
|
||||||
passwd: str,
|
passwd: str,
|
||||||
timeout: float,
|
timeout: float,
|
||||||
@ -54,7 +53,6 @@ class Plugin(BaseAuthService):
|
|||||||
|
|
||||||
self.__url = url
|
self.__url = url
|
||||||
self.__verify = verify
|
self.__verify = verify
|
||||||
self.__post = post
|
|
||||||
self.__user = user
|
self.__user = user
|
||||||
self.__passwd = passwd
|
self.__passwd = passwd
|
||||||
self.__timeout = timeout
|
self.__timeout = timeout
|
||||||
@ -64,31 +62,29 @@ class Plugin(BaseAuthService):
|
|||||||
@classmethod
|
@classmethod
|
||||||
def get_options(cls) -> Dict[str, Option]:
|
def get_options(cls) -> Dict[str, Option]:
|
||||||
return {
|
return {
|
||||||
"url": Option("http://localhost/auth_post"),
|
"url": Option("http://localhost/auth"),
|
||||||
"verify": Option(True, type=valid_bool),
|
"verify": Option(True, type=valid_bool),
|
||||||
"post": Option(True, type=valid_bool),
|
|
||||||
"user": Option(""),
|
"user": Option(""),
|
||||||
"passwd": Option(""),
|
"passwd": Option(""),
|
||||||
"timeout": Option(5.0, type=valid_float_f01),
|
"timeout": Option(5.0, type=valid_float_f01),
|
||||||
}
|
}
|
||||||
|
|
||||||
async def login(self, user: str, passwd: str) -> bool:
|
async def login(self, user: str, passwd: str) -> bool:
|
||||||
kwargs: Dict = {
|
|
||||||
"method": "GET",
|
|
||||||
"url": self.__url,
|
|
||||||
"timeout": self.__timeout,
|
|
||||||
"headers": {
|
|
||||||
"User-Agent": "KVMD/%s" % (__version__),
|
|
||||||
"X-KVMD-User": user,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
if self.__post:
|
|
||||||
kwargs["method"] = "POST"
|
|
||||||
kwargs["json"] = {"user": user, "passwd": passwd}
|
|
||||||
|
|
||||||
session = self.__ensure_session()
|
session = self.__ensure_session()
|
||||||
try:
|
try:
|
||||||
async with session.request(**kwargs) as response:
|
async with session.request(
|
||||||
|
method="POST",
|
||||||
|
url=self.__url,
|
||||||
|
timeout=self.__timeout,
|
||||||
|
json={
|
||||||
|
"user": user,
|
||||||
|
"passwd": passwd
|
||||||
|
},
|
||||||
|
headers={
|
||||||
|
"User-Agent": "KVMD/%s" % (__version__),
|
||||||
|
"X-KVMD-User": user,
|
||||||
|
},
|
||||||
|
) as response:
|
||||||
response.raise_for_status()
|
response.raise_for_status()
|
||||||
assert response.status == 200
|
assert response.status == 200
|
||||||
return True
|
return True
|
||||||
|
|||||||
@ -19,6 +19,7 @@ deps =
|
|||||||
pylint
|
pylint
|
||||||
pytest
|
pytest
|
||||||
pytest-asyncio
|
pytest-asyncio
|
||||||
|
aiohttp-basicauth
|
||||||
-rrequirements.txt
|
-rrequirements.txt
|
||||||
|
|
||||||
[testenv:mypy]
|
[testenv:mypy]
|
||||||
@ -41,6 +42,7 @@ deps =
|
|||||||
pytest-mock
|
pytest-mock
|
||||||
pytest-asyncio
|
pytest-asyncio
|
||||||
pytest-aiohttp
|
pytest-aiohttp
|
||||||
|
aiohttp-basicauth
|
||||||
-rrequirements.txt
|
-rrequirements.txt
|
||||||
|
|
||||||
[testenv:eslint]
|
[testenv:eslint]
|
||||||
|
|||||||
@ -35,12 +35,14 @@ async def test_ok__htpasswd_service(tmpdir) -> None: # type: ignore
|
|||||||
path = os.path.abspath(str(tmpdir.join("htpasswd")))
|
path = os.path.abspath(str(tmpdir.join("htpasswd")))
|
||||||
|
|
||||||
htpasswd = passlib.apache.HtpasswdFile(path, new=True)
|
htpasswd = passlib.apache.HtpasswdFile(path, new=True)
|
||||||
htpasswd.set_password("admin", "foo")
|
htpasswd.set_password("admin", "pass")
|
||||||
htpasswd.save()
|
htpasswd.save()
|
||||||
|
|
||||||
async with get_configured_auth_service("htpasswd", file=path) as service:
|
async with get_configured_auth_service("htpasswd", file=path) as service:
|
||||||
assert (await service.login("admin", "foo"))
|
|
||||||
assert not (await service.login("user", "foo"))
|
assert not (await service.login("user", "foo"))
|
||||||
|
assert not (await service.login("admin", "foo"))
|
||||||
|
assert not (await service.login("user", "pass"))
|
||||||
|
assert (await service.login("admin", "pass"))
|
||||||
|
|
||||||
htpasswd.set_password("admin", "bar")
|
htpasswd.set_password("admin", "bar")
|
||||||
htpasswd.set_password("user", "bar")
|
htpasswd.set_password("user", "bar")
|
||||||
|
|||||||
@ -20,9 +20,11 @@
|
|||||||
# ========================================================================== #
|
# ========================================================================== #
|
||||||
|
|
||||||
|
|
||||||
|
from typing import Dict
|
||||||
from typing import AsyncGenerator
|
from typing import AsyncGenerator
|
||||||
|
|
||||||
import aiohttp.web
|
import aiohttp.web
|
||||||
|
import aiohttp_basicauth
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
@ -30,19 +32,27 @@ from . import get_configured_auth_service
|
|||||||
|
|
||||||
|
|
||||||
# =====
|
# =====
|
||||||
async def _handle_auth_post(request: aiohttp.web.BaseRequest) -> aiohttp.web.Response:
|
async def _handle_auth(request: aiohttp.web.BaseRequest) -> aiohttp.web.Response:
|
||||||
status = 400
|
status = 400
|
||||||
if request.method == "POST":
|
if request.method == "POST":
|
||||||
credentials = (await request.json())
|
credentials = (await request.json())
|
||||||
if credentials["user"] == "admin" and credentials["passwd"] == "foobar":
|
if credentials["user"] == "admin" and credentials["passwd"] == "pass":
|
||||||
status = 200
|
status = 200
|
||||||
return aiohttp.web.Response(text=str(status), status=status)
|
return aiohttp.web.Response(text=str(status), status=status)
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(name="auth_server_port")
|
@pytest.fixture(name="auth_server_port")
|
||||||
async def _auth_server_port_fixture(aiohttp_server) -> AsyncGenerator[int, None]: # type: ignore
|
async def _auth_server_port_fixture(aiohttp_server) -> AsyncGenerator[int, None]: # type: ignore
|
||||||
app = aiohttp.web.Application()
|
auth = aiohttp_basicauth.BasicAuthMiddleware(
|
||||||
app.router.add_post("/auth_post", _handle_auth_post)
|
username="server-admin",
|
||||||
|
password="server-pass",
|
||||||
|
force=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
app = aiohttp.web.Application(middlewares=[auth])
|
||||||
|
app.router.add_post("/auth", _handle_auth)
|
||||||
|
app.router.add_post("/auth_plus_basic", auth.required(_handle_auth))
|
||||||
|
|
||||||
server = await aiohttp_server(app)
|
server = await aiohttp_server(app)
|
||||||
try:
|
try:
|
||||||
yield server.port
|
yield server.port
|
||||||
@ -52,9 +62,18 @@ async def _auth_server_port_fixture(aiohttp_server) -> AsyncGenerator[int, None]
|
|||||||
|
|
||||||
# =====
|
# =====
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_ok__http_service(auth_server_port: int) -> None:
|
@pytest.mark.parametrize("kwargs", [
|
||||||
url = "http://localhost:%d/auth_post" % (auth_server_port)
|
{},
|
||||||
async with get_configured_auth_service("http", url=url) as service:
|
{"verify": False},
|
||||||
assert not (await service.login("admin", "foo"))
|
{"user": "server-admin", "passwd": "server-pass"},
|
||||||
assert not (await service.login("user", "foo"))
|
])
|
||||||
assert (await service.login("admin", "foobar"))
|
async def test_ok(auth_server_port: int, kwargs: Dict) -> None:
|
||||||
|
url = "http://localhost:%d/%s" % (
|
||||||
|
auth_server_port,
|
||||||
|
("auth_plus_basic" if kwargs.get("user") else "auth"),
|
||||||
|
)
|
||||||
|
async with get_configured_auth_service("http", url=url, **kwargs) as service:
|
||||||
|
assert not (await service.login("user", "foobar"))
|
||||||
|
assert not (await service.login("admin", "foobar"))
|
||||||
|
assert not (await service.login("user", "pass"))
|
||||||
|
assert (await service.login("admin", "pass"))
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user