mirror of
https://github.com/mofeng-git/One-KVM.git
synced 2025-12-12 17:20:30 +08:00
better cleanup testing, remove all unix sockets
This commit is contained in:
parent
1d75b738a0
commit
b8e3ceef6d
@ -63,9 +63,15 @@ def main(argv: Optional[List[str]]=None) -> None:
|
||||
except subprocess.CalledProcessError:
|
||||
pass
|
||||
|
||||
unix_path = config.server.unix
|
||||
if unix_path and os.path.exists(unix_path):
|
||||
logger.info("Removing socket %r ...", unix_path)
|
||||
os.remove(unix_path)
|
||||
for (owner, unix_path) in [
|
||||
("KVMD", config.server.unix),
|
||||
("streamer", config.streamer.unix),
|
||||
]:
|
||||
if unix_path and os.path.exists(unix_path):
|
||||
logger.info("Removing %s socket %r ...", owner, unix_path)
|
||||
try:
|
||||
os.remove(unix_path)
|
||||
except Exception:
|
||||
logger.exception("Can't remove %s socket %r", owner, unix_path)
|
||||
|
||||
logger.info("Bye-bye")
|
||||
|
||||
@ -28,7 +28,7 @@ deps =
|
||||
-rrequirements.txt
|
||||
|
||||
[testenv:vulture]
|
||||
commands = vulture --ignore-names=_format_P --ignore-decorators=@_exposed,@_system_task kvmd genmap.py tests testenv/linters/vulture-wl.py
|
||||
commands = vulture --ignore-names=_format_P --ignore-decorators=@_exposed,@_system_task,@pytest.fixture kvmd genmap.py tests testenv/linters/vulture-wl.py
|
||||
deps =
|
||||
vulture
|
||||
-rrequirements.txt
|
||||
|
||||
@ -20,15 +20,54 @@
|
||||
# ========================================================================== #
|
||||
|
||||
|
||||
import os
|
||||
import string
|
||||
import random
|
||||
import multiprocessing
|
||||
import multiprocessing.queues
|
||||
import time
|
||||
|
||||
import setproctitle
|
||||
|
||||
from kvmd.apps.cleanup import main
|
||||
|
||||
|
||||
# =====
|
||||
def test_main() -> None:
|
||||
open("/tmp/foobar.sock", "w").close()
|
||||
def test_main(tmpdir) -> None: # type: ignore
|
||||
queue: multiprocessing.queues.Queue = multiprocessing.Queue()
|
||||
|
||||
ustreamer_fake_name = "ustr-" + "".join(
|
||||
random.choice(string.ascii_lowercase + string.digits)
|
||||
for _ in range(5)
|
||||
)
|
||||
|
||||
ustreamer_sock_path = os.path.abspath(str(tmpdir.join("ustreamer-fake.sock")))
|
||||
kvmd_sock_path = os.path.abspath(str(tmpdir.join("kvmd-fake.sock")))
|
||||
|
||||
def ustreamer_fake() -> None:
|
||||
setproctitle.setproctitle(ustreamer_fake_name)
|
||||
queue.put(True)
|
||||
while True:
|
||||
time.sleep(1)
|
||||
|
||||
proc = multiprocessing.Process(target=ustreamer_fake, daemon=True)
|
||||
proc.start()
|
||||
assert queue.get(timeout=5)
|
||||
|
||||
assert proc.is_alive()
|
||||
main([
|
||||
"kvmd-cleanup",
|
||||
"--set-options",
|
||||
"kvmd/server/port=0",
|
||||
"kvmd/server/unix=" + kvmd_sock_path,
|
||||
"kvmd/hid/device=/dev/null",
|
||||
"kvmd/streamer/unix=/tmp/foobar.sock",
|
||||
"kvmd/streamer/port=0",
|
||||
"kvmd/streamer/unix=" + ustreamer_sock_path,
|
||||
"kvmd/streamer/cmd=[\"%s\"]" % (ustreamer_fake_name),
|
||||
])
|
||||
assert not proc.is_alive()
|
||||
|
||||
assert not os.path.exists(ustreamer_sock_path)
|
||||
assert not os.path.exists(kvmd_sock_path)
|
||||
|
||||
proc.join()
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user