mirror of
https://github.com/mofeng-git/One-KVM.git
synced 2025-12-12 09:10:30 +08:00
msd images tree
This commit is contained in:
parent
c63bb2adb7
commit
5495f70564
@ -442,9 +442,9 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
|
|||||||
need_reload_state = False
|
need_reload_state = False
|
||||||
for event in (await inotify.get_series(timeout=1)):
|
for event in (await inotify.get_series(timeout=1)):
|
||||||
need_reload_state = True
|
need_reload_state = True
|
||||||
if event.mask & (InotifyMask.DELETE_SELF | InotifyMask.MOVE_SELF | InotifyMask.UNMOUNT):
|
if event.mask & (InotifyMask.DELETE_SELF | InotifyMask.MOVE_SELF | InotifyMask.UNMOUNT | InotifyMask.ISDIR):
|
||||||
# Если выгрузили OTG, что-то отмонтировали или делают еще какую-то странную фигню
|
# Если выгрузили OTG, изменили каталоги, что-то отмонтировали или делают еще какую-то странную фигню
|
||||||
logger.warning("Got fatal inotify event: %s; reinitializing MSD ...", event)
|
logger.info("Got a big inotify event: %s; reinitializing MSD ...", event)
|
||||||
need_restart = True
|
need_restart = True
|
||||||
break
|
break
|
||||||
if need_restart:
|
if need_restart:
|
||||||
|
|||||||
@ -23,6 +23,7 @@
|
|||||||
import os
|
import os
|
||||||
import dataclasses
|
import dataclasses
|
||||||
|
|
||||||
|
from typing import Generator
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
|
||||||
from ....logging import get_logger
|
from ....logging import get_logger
|
||||||
@ -39,24 +40,24 @@ class _Image:
|
|||||||
path: str
|
path: str
|
||||||
storage: Optional["Storage"] = dataclasses.field(compare=False)
|
storage: Optional["Storage"] = dataclasses.field(compare=False)
|
||||||
|
|
||||||
complete: bool = dataclasses.field(init=False, compare=False)
|
in_storage: bool = dataclasses.field(init=False)
|
||||||
in_storage: bool = dataclasses.field(init=False, compare=False)
|
|
||||||
|
|
||||||
|
complete: bool = dataclasses.field(init=False, compare=False)
|
||||||
size: int = dataclasses.field(init=False, compare=False)
|
size: int = dataclasses.field(init=False, compare=False)
|
||||||
mod_ts: float = dataclasses.field(init=False, compare=False)
|
mod_ts: float = dataclasses.field(init=False, compare=False)
|
||||||
|
|
||||||
|
|
||||||
class Image(_Image):
|
class Image(_Image):
|
||||||
|
@property
|
||||||
|
def in_storage(self) -> bool:
|
||||||
|
return (self.storage is not None)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def complete(self) -> bool:
|
def complete(self) -> bool:
|
||||||
if self.storage is not None:
|
if self.storage is not None:
|
||||||
return os.path.exists(self.__get_complete_path())
|
return os.path.exists(self.__get_complete_path())
|
||||||
return True
|
return True
|
||||||
|
|
||||||
@property
|
|
||||||
def in_storage(self) -> bool:
|
|
||||||
return (self.storage is not None)
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def size(self) -> int:
|
def size(self) -> int:
|
||||||
try:
|
try:
|
||||||
@ -69,14 +70,15 @@ class Image(_Image):
|
|||||||
try:
|
try:
|
||||||
return os.stat(self.path).st_mtime
|
return os.stat(self.path).st_mtime
|
||||||
except Exception:
|
except Exception:
|
||||||
return 0
|
return 0.0
|
||||||
|
|
||||||
def exists(self) -> bool:
|
def exists(self) -> bool:
|
||||||
return os.path.exists(self.path)
|
return os.path.exists(self.path)
|
||||||
|
|
||||||
async def remount_rw(self, rw: bool, fatal: bool=True) -> None:
|
async def remount_rw(self, rw: bool, fatal: bool=True) -> None:
|
||||||
assert self.storage
|
assert self.storage
|
||||||
await self.storage.remount_rw(rw, fatal)
|
if self.storage._is_mounted(self): # pylint: disable=protected-access
|
||||||
|
await self.storage.remount_rw(rw, fatal)
|
||||||
|
|
||||||
def remove(self, fatal: bool) -> None:
|
def remove(self, fatal: bool) -> None:
|
||||||
assert self.storage is not None
|
assert self.storage is not None
|
||||||
@ -101,7 +103,10 @@ class Image(_Image):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
def __get_complete_path(self) -> str:
|
def __get_complete_path(self) -> str:
|
||||||
return os.path.join(os.path.dirname(self.path), f".__{self.name}.complete")
|
return os.path.join(
|
||||||
|
os.path.dirname(self.path),
|
||||||
|
".__" + os.path.basename(self.path) + ".complete",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@dataclasses.dataclass(frozen=True)
|
@dataclasses.dataclass(frozen=True)
|
||||||
@ -116,31 +121,43 @@ class Storage:
|
|||||||
self.__remount_cmd = remount_cmd
|
self.__remount_cmd = remount_cmd
|
||||||
|
|
||||||
def get_watchable_paths(self) -> list[str]:
|
def get_watchable_paths(self) -> list[str]:
|
||||||
return [self.__path]
|
paths: list[str] = []
|
||||||
|
for (root_path, dirs, _) in os.walk(self.__path):
|
||||||
|
dirs[:] = list(self.__filter(dirs))
|
||||||
|
paths.append(root_path)
|
||||||
|
return paths
|
||||||
|
|
||||||
def get_images(self) -> dict[str, Image]:
|
def get_images(self) -> dict[str, Image]:
|
||||||
return {
|
images: dict[str, Image] = {}
|
||||||
name: self.get_image_by_name(name)
|
for (root_path, dirs, files) in os.walk(self.__path):
|
||||||
for name in os.listdir(self.__path)
|
dirs[:] = list(self.__filter(dirs))
|
||||||
if not name.startswith(".__") and name != "lost+found"
|
for file in self.__filter(files):
|
||||||
}
|
name = os.path.relpath(os.path.join(root_path, file), self.__path)
|
||||||
|
images[name] = self.get_image_by_name(name)
|
||||||
|
return images
|
||||||
|
|
||||||
|
def __filter(self, items: list[str]) -> Generator[str, None, None]:
|
||||||
|
for item in sorted(map(str.strip, items)):
|
||||||
|
if not item.startswith(".__") and item != "lost+found":
|
||||||
|
yield item
|
||||||
|
|
||||||
def get_image_by_name(self, name: str) -> Image:
|
def get_image_by_name(self, name: str) -> Image:
|
||||||
assert name
|
assert name
|
||||||
path = os.path.join(self.__path, name)
|
path = os.path.join(self.__path, name)
|
||||||
return self.__get_image(name, path)
|
return self.__get_image(name, path, True)
|
||||||
|
|
||||||
def get_image_by_path(self, path: str) -> Image:
|
def get_image_by_path(self, path: str) -> Image:
|
||||||
assert path
|
assert path
|
||||||
name = os.path.basename(path)
|
in_storage = (os.path.commonpath([self.__path, path]) == self.__path)
|
||||||
return self.__get_image(name, path)
|
if in_storage:
|
||||||
|
name = os.path.relpath(path, self.__path)
|
||||||
|
else:
|
||||||
|
name = os.path.basename(path)
|
||||||
|
return self.__get_image(name, path, in_storage)
|
||||||
|
|
||||||
def __get_image(self, name: str, path: str) -> Image:
|
def __get_image(self, name: str, path: str, in_storage: bool) -> Image:
|
||||||
assert name
|
assert name
|
||||||
assert not name.startswith(".__")
|
|
||||||
assert name != "lost+found"
|
|
||||||
assert path
|
assert path
|
||||||
in_storage = (os.path.dirname(path) == self.__path)
|
|
||||||
return Image(name, path, (self if in_storage else None))
|
return Image(name, path, (self if in_storage else None))
|
||||||
|
|
||||||
def get_space(self, fatal: bool) -> (StorageSpace | None):
|
def get_space(self, fatal: bool) -> (StorageSpace | None):
|
||||||
@ -156,6 +173,12 @@ class Storage:
|
|||||||
free=(st.f_bavail * st.f_frsize),
|
free=(st.f_bavail * st.f_frsize),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def _is_mounted(self, image: Image) -> bool:
|
||||||
|
path = image.path
|
||||||
|
while not os.path.ismount(path):
|
||||||
|
path = os.path.dirname(path)
|
||||||
|
return (path == self.__path)
|
||||||
|
|
||||||
async def remount_rw(self, rw: bool, fatal: bool=True) -> None:
|
async def remount_rw(self, rw: bool, fatal: bool=True) -> None:
|
||||||
if not (await aiohelpers.remount("MSD", self.__remount_cmd, rw)):
|
if not (await aiohelpers.remount("MSD", self.__remount_cmd, rw)):
|
||||||
if fatal:
|
if fatal:
|
||||||
|
|||||||
@ -42,10 +42,15 @@ def valid_atx_button(arg: Any) -> str:
|
|||||||
|
|
||||||
def valid_msd_image_name(arg: Any) -> str:
|
def valid_msd_image_name(arg: Any) -> str:
|
||||||
name = "MSD image name"
|
name = "MSD image name"
|
||||||
arg = valid_printable_filename(arg, name=name)
|
arg = valid_stripped_string_not_empty(arg, name)
|
||||||
if arg.startswith(".__") or arg == "lost+found":
|
parts: list[str] = list(filter(None, arg.split("/")))
|
||||||
|
if len(parts) == 0:
|
||||||
raise_error(arg, name)
|
raise_error(arg, name)
|
||||||
return arg
|
for (index, part) in enumerate(list(parts)):
|
||||||
|
parts[index] = valid_printable_filename(part, name=name)
|
||||||
|
if part.startswith(".__") or part == "lost+found":
|
||||||
|
raise_error(part, name)
|
||||||
|
return "/".join(parts)
|
||||||
|
|
||||||
|
|
||||||
def valid_info_fields(arg: Any, variants: set[str]) -> set[str]:
|
def valid_info_fields(arg: Any, variants: set[str]) -> set[str]:
|
||||||
|
|||||||
@ -27,6 +27,7 @@ import pytest
|
|||||||
from kvmd.validators import ValidatorError
|
from kvmd.validators import ValidatorError
|
||||||
from kvmd.validators.kvm import valid_atx_power_action
|
from kvmd.validators.kvm import valid_atx_power_action
|
||||||
from kvmd.validators.kvm import valid_atx_button
|
from kvmd.validators.kvm import valid_atx_button
|
||||||
|
from kvmd.validators.kvm import valid_msd_image_name
|
||||||
from kvmd.validators.kvm import valid_info_fields
|
from kvmd.validators.kvm import valid_info_fields
|
||||||
from kvmd.validators.kvm import valid_log_seek
|
from kvmd.validators.kvm import valid_log_seek
|
||||||
from kvmd.validators.kvm import valid_stream_quality
|
from kvmd.validators.kvm import valid_stream_quality
|
||||||
@ -60,6 +61,64 @@ def test_fail__valid_atx_button(arg: Any) -> None:
|
|||||||
print(valid_atx_button(arg))
|
print(valid_atx_button(arg))
|
||||||
|
|
||||||
|
|
||||||
|
# =====
|
||||||
|
@pytest.mark.parametrize("arg, retval", [
|
||||||
|
("archlinux-2018.07.01-i686.iso", "archlinux-2018.07.01-i686.iso"),
|
||||||
|
("archlinux-2018.07.01-x86_64.iso", "archlinux-2018.07.01-x86_64.iso"),
|
||||||
|
("dsl-4.11.rc1.iso", "dsl-4.11.rc1.iso"),
|
||||||
|
("systemrescuecd-x86-5.3.1.iso", "systemrescuecd-x86-5.3.1.iso"),
|
||||||
|
("ubuntu-16.04.5-desktop-i386.iso", "ubuntu-16.04.5-desktop-i386.iso"),
|
||||||
|
(" тест(){}[ \t].iso\t", "тест(){}[ _].iso"),
|
||||||
|
("\n" + "x" * 1000, "x" * 255),
|
||||||
|
("test", "test"),
|
||||||
|
("test test [test] #test$", "test test [test] #test$"),
|
||||||
|
(".test", ".test"),
|
||||||
|
("..test", "..test"),
|
||||||
|
("..тест..", "..тест.."),
|
||||||
|
("..те\\ст..", "..те\\ст.."),
|
||||||
|
(".....", "....."),
|
||||||
|
(".....txt", ".....txt"),
|
||||||
|
(" .. .", ".. ."),
|
||||||
|
("..\n.", ".._."),
|
||||||
|
("test/", "test"),
|
||||||
|
("/test", "test"),
|
||||||
|
("foo/bar.iso", "foo/bar.iso"),
|
||||||
|
("//foo//bar.iso", "foo/bar.iso"),
|
||||||
|
("foo/lost-found/bar.iso", "foo/lost-found/bar.iso"),
|
||||||
|
("/bar.iso/", "bar.iso"),
|
||||||
|
|
||||||
|
])
|
||||||
|
def test_ok__valid_msd_image_name(arg: Any, retval: str) -> None:
|
||||||
|
assert valid_msd_image_name(arg) == retval
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("arg", [
|
||||||
|
".",
|
||||||
|
"..",
|
||||||
|
" ..",
|
||||||
|
"../test",
|
||||||
|
"./.",
|
||||||
|
"../.",
|
||||||
|
"./..",
|
||||||
|
"../..",
|
||||||
|
"/ ..",
|
||||||
|
".. /",
|
||||||
|
"/.. /",
|
||||||
|
"foo/../bar.iso",
|
||||||
|
"foo/./foo.iso",
|
||||||
|
"foo/lost+found/bar.iso",
|
||||||
|
"../bar.iso",
|
||||||
|
"/../bar.iso",
|
||||||
|
"foo/.__bar.iso",
|
||||||
|
"",
|
||||||
|
" ",
|
||||||
|
None,
|
||||||
|
])
|
||||||
|
def test_fail__valid_msd_image_name(arg: Any) -> None:
|
||||||
|
with pytest.raises(ValidatorError):
|
||||||
|
valid_msd_image_name(arg)
|
||||||
|
|
||||||
|
|
||||||
# =====
|
# =====
|
||||||
@pytest.mark.parametrize("arg", [" foo ", "bar", "foo, ,bar,", " ", " , ", ""])
|
@pytest.mark.parametrize("arg", [" foo ", "bar", "foo, ,bar,", " ", " , ", ""])
|
||||||
def test_ok__valid_info_fields(arg: Any) -> None:
|
def test_ok__valid_info_fields(arg: Any) -> None:
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user