mirror of
https://github.com/mofeng-git/One-KVM.git
synced 2025-12-12 17:20:30 +08:00
refactoring
This commit is contained in:
parent
dda30309a4
commit
3a68e82f94
@ -30,7 +30,7 @@ from kvmd.aioregion import AioExclusiveRegion
|
|||||||
|
|
||||||
# =====
|
# =====
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_ok__aioregion__one(event_loop: asyncio.AbstractEventLoop) -> None:
|
async def test_ok__access_one(event_loop: asyncio.AbstractEventLoop) -> None:
|
||||||
_ = event_loop
|
_ = event_loop
|
||||||
region = AioExclusiveRegion(RegionIsBusyError)
|
region = AioExclusiveRegion(RegionIsBusyError)
|
||||||
|
|
||||||
@ -48,7 +48,7 @@ async def test_ok__aioregion__one(event_loop: asyncio.AbstractEventLoop) -> None
|
|||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_fail__aioregion__one(event_loop: asyncio.AbstractEventLoop) -> None:
|
async def test_fail__access_one(event_loop: asyncio.AbstractEventLoop) -> None:
|
||||||
_ = event_loop
|
_ = event_loop
|
||||||
region = AioExclusiveRegion(RegionIsBusyError)
|
region = AioExclusiveRegion(RegionIsBusyError)
|
||||||
|
|
||||||
@ -69,7 +69,7 @@ async def test_fail__aioregion__one(event_loop: asyncio.AbstractEventLoop) -> No
|
|||||||
|
|
||||||
# =====
|
# =====
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_ok__aioregion__two(event_loop: asyncio.AbstractEventLoop) -> None:
|
async def test_ok__access_two(event_loop: asyncio.AbstractEventLoop) -> None:
|
||||||
region = AioExclusiveRegion(RegionIsBusyError)
|
region = AioExclusiveRegion(RegionIsBusyError)
|
||||||
|
|
||||||
async def func1() -> None:
|
async def func1() -> None:
|
||||||
@ -92,7 +92,7 @@ async def test_ok__aioregion__two(event_loop: asyncio.AbstractEventLoop) -> None
|
|||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_fail__aioregion__two(event_loop: asyncio.AbstractEventLoop) -> None:
|
async def test_fail__access_two(event_loop: asyncio.AbstractEventLoop) -> None:
|
||||||
region = AioExclusiveRegion(RegionIsBusyError)
|
region = AioExclusiveRegion(RegionIsBusyError)
|
||||||
|
|
||||||
async def func1() -> None:
|
async def func1() -> None:
|
||||||
|
|||||||
@ -32,7 +32,7 @@ from kvmd.apps.cleanup import main
|
|||||||
|
|
||||||
|
|
||||||
# =====
|
# =====
|
||||||
def test_main(tmpdir) -> None: # type: ignore
|
def test_ok(tmpdir) -> None: # type: ignore
|
||||||
queue: multiprocessing.queues.Queue = multiprocessing.Queue()
|
queue: multiprocessing.queues.Queue = multiprocessing.Queue()
|
||||||
|
|
||||||
ustreamer_fake_name = "ustr-" + secrets.token_hex(3)
|
ustreamer_fake_name = "ustr-" + secrets.token_hex(3)
|
||||||
|
|||||||
@ -54,7 +54,7 @@ def _htpasswd_fixture(request) -> Generator[passlib.apache.HtpasswdFile, None, N
|
|||||||
os.remove(path)
|
os.remove(path)
|
||||||
|
|
||||||
|
|
||||||
def _run_main(htpasswd: passlib.apache.HtpasswdFile, cmd: List[str]) -> None:
|
def _run_htpasswd(htpasswd: passlib.apache.HtpasswdFile, cmd: List[str]) -> None:
|
||||||
main([
|
main([
|
||||||
"kvmd-htpasswd",
|
"kvmd-htpasswd",
|
||||||
*cmd,
|
*cmd,
|
||||||
@ -64,32 +64,32 @@ def _run_main(htpasswd: passlib.apache.HtpasswdFile, cmd: List[str]) -> None:
|
|||||||
|
|
||||||
|
|
||||||
# =====
|
# =====
|
||||||
def test_ok__main_list(htpasswd: passlib.apache.HtpasswdFile, capsys) -> None: # type: ignore
|
def test_ok__list(htpasswd: passlib.apache.HtpasswdFile, capsys) -> None: # type: ignore
|
||||||
_run_main(htpasswd, ["list"])
|
_run_htpasswd(htpasswd, ["list"])
|
||||||
(out, err) = capsys.readouterr()
|
(out, err) = capsys.readouterr()
|
||||||
assert len(err) == 0
|
assert len(err) == 0
|
||||||
assert sorted(filter(None, out.split("\n"))) == sorted(htpasswd.users()) == sorted(set(htpasswd.users()))
|
assert sorted(filter(None, out.split("\n"))) == sorted(htpasswd.users()) == sorted(set(htpasswd.users()))
|
||||||
|
|
||||||
|
|
||||||
# =====
|
# =====
|
||||||
def test_ok__main_set__change_stdin(htpasswd: passlib.apache.HtpasswdFile, mocker) -> None: # type: ignore
|
def test_ok__set_change_stdin(htpasswd: passlib.apache.HtpasswdFile, mocker) -> None: # type: ignore
|
||||||
old_users = set(htpasswd.users())
|
old_users = set(htpasswd.users())
|
||||||
if old_users:
|
if old_users:
|
||||||
assert htpasswd.check_password("admin", _make_passwd("admin"))
|
assert htpasswd.check_password("admin", _make_passwd("admin"))
|
||||||
|
|
||||||
mocker.patch.object(builtins, "input", (lambda: " test "))
|
mocker.patch.object(builtins, "input", (lambda: " test "))
|
||||||
_run_main(htpasswd, ["set", "admin", "--read-stdin"])
|
_run_htpasswd(htpasswd, ["set", "admin", "--read-stdin"])
|
||||||
|
|
||||||
htpasswd.load(force=True)
|
htpasswd.load(force=True)
|
||||||
assert htpasswd.check_password("admin", " test ")
|
assert htpasswd.check_password("admin", " test ")
|
||||||
assert old_users == set(htpasswd.users())
|
assert old_users == set(htpasswd.users())
|
||||||
|
|
||||||
|
|
||||||
def test_ok__main_set__add_stdin(htpasswd: passlib.apache.HtpasswdFile, mocker) -> None: # type: ignore
|
def test_ok__set_add_stdin(htpasswd: passlib.apache.HtpasswdFile, mocker) -> None: # type: ignore
|
||||||
old_users = set(htpasswd.users())
|
old_users = set(htpasswd.users())
|
||||||
if old_users:
|
if old_users:
|
||||||
mocker.patch.object(builtins, "input", (lambda: " test "))
|
mocker.patch.object(builtins, "input", (lambda: " test "))
|
||||||
_run_main(htpasswd, ["set", "new", "--read-stdin"])
|
_run_htpasswd(htpasswd, ["set", "new", "--read-stdin"])
|
||||||
|
|
||||||
htpasswd.load(force=True)
|
htpasswd.load(force=True)
|
||||||
assert htpasswd.check_password("new", " test ")
|
assert htpasswd.check_password("new", " test ")
|
||||||
@ -97,20 +97,20 @@ def test_ok__main_set__add_stdin(htpasswd: passlib.apache.HtpasswdFile, mocker)
|
|||||||
|
|
||||||
|
|
||||||
# =====
|
# =====
|
||||||
def test_ok__main_set__change_getpass(htpasswd: passlib.apache.HtpasswdFile, mocker) -> None: # type: ignore
|
def test_ok__set_change_getpass(htpasswd: passlib.apache.HtpasswdFile, mocker) -> None: # type: ignore
|
||||||
old_users = set(htpasswd.users())
|
old_users = set(htpasswd.users())
|
||||||
if old_users:
|
if old_users:
|
||||||
assert htpasswd.check_password("admin", _make_passwd("admin"))
|
assert htpasswd.check_password("admin", _make_passwd("admin"))
|
||||||
|
|
||||||
mocker.patch.object(getpass, "getpass", (lambda *_, **__: " test "))
|
mocker.patch.object(getpass, "getpass", (lambda *_, **__: " test "))
|
||||||
_run_main(htpasswd, ["set", "admin"])
|
_run_htpasswd(htpasswd, ["set", "admin"])
|
||||||
|
|
||||||
htpasswd.load(force=True)
|
htpasswd.load(force=True)
|
||||||
assert htpasswd.check_password("admin", " test ")
|
assert htpasswd.check_password("admin", " test ")
|
||||||
assert old_users == set(htpasswd.users())
|
assert old_users == set(htpasswd.users())
|
||||||
|
|
||||||
|
|
||||||
def test_fail__main_set__change_getpass(htpasswd: passlib.apache.HtpasswdFile, mocker) -> None: # type: ignore
|
def test_fail__set_change_getpass(htpasswd: passlib.apache.HtpasswdFile, mocker) -> None: # type: ignore
|
||||||
old_users = set(htpasswd.users())
|
old_users = set(htpasswd.users())
|
||||||
if old_users:
|
if old_users:
|
||||||
assert htpasswd.check_password("admin", _make_passwd("admin"))
|
assert htpasswd.check_password("admin", _make_passwd("admin"))
|
||||||
@ -129,7 +129,7 @@ def test_fail__main_set__change_getpass(htpasswd: passlib.apache.HtpasswdFile, m
|
|||||||
|
|
||||||
mocker.patch.object(getpass, "getpass", fake_getpass)
|
mocker.patch.object(getpass, "getpass", fake_getpass)
|
||||||
with pytest.raises(SystemExit, match="Sorry, passwords do not match"):
|
with pytest.raises(SystemExit, match="Sorry, passwords do not match"):
|
||||||
_run_main(htpasswd, ["set", "admin"])
|
_run_htpasswd(htpasswd, ["set", "admin"])
|
||||||
assert count == 2
|
assert count == 2
|
||||||
|
|
||||||
htpasswd.load(force=True)
|
htpasswd.load(force=True)
|
||||||
@ -138,13 +138,13 @@ def test_fail__main_set__change_getpass(htpasswd: passlib.apache.HtpasswdFile, m
|
|||||||
|
|
||||||
|
|
||||||
# =====
|
# =====
|
||||||
def test_ok__main_del(htpasswd: passlib.apache.HtpasswdFile) -> None:
|
def test_ok__del(htpasswd: passlib.apache.HtpasswdFile) -> None:
|
||||||
old_users = set(htpasswd.users())
|
old_users = set(htpasswd.users())
|
||||||
|
|
||||||
if old_users:
|
if old_users:
|
||||||
assert htpasswd.check_password("admin", _make_passwd("admin"))
|
assert htpasswd.check_password("admin", _make_passwd("admin"))
|
||||||
|
|
||||||
_run_main(htpasswd, ["del", "admin"])
|
_run_htpasswd(htpasswd, ["del", "admin"])
|
||||||
|
|
||||||
htpasswd.load(force=True)
|
htpasswd.load(force=True)
|
||||||
assert not htpasswd.check_password("admin", _make_passwd("admin"))
|
assert not htpasswd.check_password("admin", _make_passwd("admin"))
|
||||||
|
|||||||
@ -24,7 +24,7 @@ from kvmd import gpio
|
|||||||
|
|
||||||
|
|
||||||
# =====
|
# =====
|
||||||
def test_gpio__loopback_initial_false() -> None:
|
def test_ok__loopback_initial_false() -> None:
|
||||||
# pylint: disable=singleton-comparison
|
# pylint: disable=singleton-comparison
|
||||||
with gpio.bcm():
|
with gpio.bcm():
|
||||||
assert gpio.set_output(0) == 0
|
assert gpio.set_output(0) == 0
|
||||||
@ -33,7 +33,7 @@ def test_gpio__loopback_initial_false() -> None:
|
|||||||
assert gpio.read(0) is True
|
assert gpio.read(0) is True
|
||||||
|
|
||||||
|
|
||||||
def test_gpio__loopback_initial_true() -> None:
|
def test_ok__loopback_initial_true() -> None:
|
||||||
# pylint: disable=singleton-comparison
|
# pylint: disable=singleton-comparison
|
||||||
with gpio.bcm():
|
with gpio.bcm():
|
||||||
assert gpio.set_output(0, True) == 0
|
assert gpio.set_output(0, True) == 0
|
||||||
@ -42,7 +42,7 @@ def test_gpio__loopback_initial_true() -> None:
|
|||||||
assert gpio.read(0) is False
|
assert gpio.read(0) is False
|
||||||
|
|
||||||
|
|
||||||
def test_gpio__input() -> None:
|
def test_ok__input() -> None:
|
||||||
# pylint: disable=singleton-comparison
|
# pylint: disable=singleton-comparison
|
||||||
with gpio.bcm():
|
with gpio.bcm():
|
||||||
assert gpio.set_input(0) == 0
|
assert gpio.set_input(0) == 0
|
||||||
|
|||||||
@ -26,11 +26,11 @@ from kvmd.keymap import KEYMAP
|
|||||||
|
|
||||||
|
|
||||||
# =====
|
# =====
|
||||||
def test_keymap__ok() -> None:
|
def test_ok__keymap() -> None:
|
||||||
assert type(KEYMAP["KeyA"]) == int # pylint: disable=unidiomatic-typecheck
|
assert type(KEYMAP["KeyA"]) == int # pylint: disable=unidiomatic-typecheck
|
||||||
assert KEYMAP["KeyA"] == 1
|
assert KEYMAP["KeyA"] == 1
|
||||||
|
|
||||||
|
|
||||||
def test_keymap__fail() -> None:
|
def test_fail__keymap() -> None:
|
||||||
with pytest.raises(KeyError):
|
with pytest.raises(KeyError):
|
||||||
print(KEYMAP["keya"])
|
print(KEYMAP["keya"])
|
||||||
|
|||||||
@ -31,5 +31,5 @@ from kvmd.logging import get_logger
|
|||||||
(1, "_pytest.python"),
|
(1, "_pytest.python"),
|
||||||
(2, "pluggy.callers"),
|
(2, "pluggy.callers"),
|
||||||
])
|
])
|
||||||
def test_get_logger(depth: int, name: str) -> None:
|
def test_ok__get_logger(depth: int, name: str) -> None:
|
||||||
assert get_logger(depth).name == name
|
assert get_logger(depth).name == name
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user