mirror of
https://github.com/mofeng-git/One-KVM.git
synced 2025-12-12 01:00:29 +08:00
new msd fs structure
This commit is contained in:
parent
b4fa35f05f
commit
7c7ac38bfe
@ -25,7 +25,12 @@ import os
|
||||
import shutil
|
||||
import subprocess
|
||||
|
||||
from ... import fstab
|
||||
from os.path import join # pylint: disable=ungrouped-imports
|
||||
from os.path import exists # pylint: disable=ungrouped-imports
|
||||
|
||||
from ...fstab import Partition
|
||||
from ...fstab import find_msd
|
||||
from ...fstab import find_pst
|
||||
|
||||
|
||||
# =====
|
||||
@ -43,7 +48,7 @@ def _remount(path: str, rw: bool) -> None:
|
||||
|
||||
|
||||
def _mkdir(path: str) -> None:
|
||||
if not os.path.exists(path):
|
||||
if not exists(path):
|
||||
_log(f"MKDIR --- {path}")
|
||||
try:
|
||||
os.mkdir(path)
|
||||
@ -51,6 +56,23 @@ def _mkdir(path: str) -> None:
|
||||
raise SystemExit(f"Can't create directory: {err}")
|
||||
|
||||
|
||||
def _rmdir(path: str) -> None:
|
||||
if exists(path):
|
||||
_log(f"RMDIR --- {path}")
|
||||
try:
|
||||
os.rmdir(path)
|
||||
except Exception as err:
|
||||
raise SystemExit(f"Can't remove directory: {err}")
|
||||
|
||||
|
||||
def _move(src: str, dest: str) -> None:
|
||||
_log(f"MOVE --- {src} --> {dest}")
|
||||
try:
|
||||
os.rename(src, dest)
|
||||
except Exception as err:
|
||||
raise SystemExit(f"Can't move file: {err}")
|
||||
|
||||
|
||||
def _chown(path: str, user: str) -> None:
|
||||
_log(f"CHOWN --- {user} - {path}")
|
||||
try:
|
||||
@ -59,20 +81,42 @@ def _chown(path: str, user: str) -> None:
|
||||
raise SystemExit(f"Can't change ownership: {err}")
|
||||
|
||||
|
||||
# =====
|
||||
def _fix_msd(part: Partition) -> None:
|
||||
images_path = join(part.root_path, "images")
|
||||
meta_path = join(part.root_path, "meta")
|
||||
if exists(images_path) and exists(meta_path):
|
||||
for name in os.listdir(meta_path):
|
||||
_move(join(meta_path, name), join(part.root_path, f".__{name}"))
|
||||
_rmdir(meta_path)
|
||||
for name in os.listdir(images_path):
|
||||
_move(join(images_path, name), os.path.join(part.root_path, name))
|
||||
_rmdir(images_path)
|
||||
if part.user:
|
||||
_chown(part.root_path, part.user)
|
||||
|
||||
|
||||
def _fix_pst(part: Partition) -> None:
|
||||
path = os.path.join(part.root_path, "data")
|
||||
_mkdir(path)
|
||||
if part.user:
|
||||
_chown(path, part.user)
|
||||
|
||||
|
||||
# =====
|
||||
def main() -> None:
|
||||
if len(sys.argv) != 2 or sys.argv[1] not in ["ro", "rw"]:
|
||||
raise SystemExit(f"Usage: {sys.argv[0]} [ro|rw]")
|
||||
|
||||
finder = None
|
||||
dirs: list[str] = []
|
||||
fix = None
|
||||
app = os.path.basename(sys.argv[0])
|
||||
if app == "kvmd-helper-otgmsd-remount":
|
||||
finder = fstab.find_msd
|
||||
dirs = ["images", "meta"]
|
||||
finder = find_msd
|
||||
fix = _fix_msd
|
||||
elif app == "kvmd-helper-pst-remount":
|
||||
finder = fstab.find_pst
|
||||
dirs = ["data"]
|
||||
finder = find_pst
|
||||
fix = _fix_pst
|
||||
else:
|
||||
raise SystemExit("Unknown application target")
|
||||
|
||||
@ -82,9 +126,5 @@ def main() -> None:
|
||||
part = finder()
|
||||
_remount(part.mount_path, rw)
|
||||
if rw and part.root_path:
|
||||
for name in dirs:
|
||||
path = os.path.join(part.root_path, name)
|
||||
_mkdir(path)
|
||||
if part.user:
|
||||
_chown(path, part.user)
|
||||
fix(part)
|
||||
_log(f"Storage in the {'RW' if rw else 'RO'}-mode now")
|
||||
|
||||
@ -37,8 +37,8 @@ from ....yamlconf import Option
|
||||
|
||||
from ....validators.basic import valid_bool
|
||||
from ....validators.basic import valid_number
|
||||
from ....validators.os import valid_printable_filename
|
||||
from ....validators.os import valid_command
|
||||
from ....validators.kvm import valid_msd_image_name
|
||||
|
||||
from .... import aiotools
|
||||
from .... import aiohelpers
|
||||
@ -170,7 +170,7 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
|
||||
], type=valid_command),
|
||||
|
||||
"initial": {
|
||||
"image": Option("", type=valid_printable_filename, if_empty=""),
|
||||
"image": Option("", type=valid_msd_image_name, if_empty=""),
|
||||
"cdrom": Option(False, type=valid_bool),
|
||||
},
|
||||
}
|
||||
|
||||
@ -46,7 +46,7 @@ class Image(_Image):
|
||||
@property
|
||||
def complete(self) -> bool:
|
||||
if self.storage is not None:
|
||||
return os.path.exists(self.storage._get_complete_path(self)) # pylint: disable=protected-access
|
||||
return os.path.exists(self.__get_complete_path())
|
||||
return True
|
||||
|
||||
@property
|
||||
@ -83,7 +83,7 @@ class Image(_Image):
|
||||
|
||||
def set_complete(self, flag: bool) -> None:
|
||||
assert self.storage is not None
|
||||
path = self.storage._get_complete_path(self) # pylint: disable=protected-access
|
||||
path = self.__get_complete_path()
|
||||
if flag:
|
||||
open(path, "w").close() # pylint: disable=consider-using-with
|
||||
else:
|
||||
@ -92,6 +92,9 @@ class Image(_Image):
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
|
||||
def __get_complete_path(self) -> str:
|
||||
return os.path.join(os.path.dirname(self.path), f".__{self.name}.complete")
|
||||
|
||||
|
||||
@dataclasses.dataclass(frozen=True)
|
||||
class StorageSpace:
|
||||
@ -102,24 +105,20 @@ class StorageSpace:
|
||||
class Storage:
|
||||
def __init__(self, path: str) -> None:
|
||||
self.__path = path
|
||||
self.__images_path = os.path.join(self.__path, "images")
|
||||
self.__meta_path = os.path.join(self.__path, "meta")
|
||||
|
||||
def _get_complete_path(self, image: Image) -> str:
|
||||
return os.path.join(self.__meta_path, image.name + ".complete")
|
||||
|
||||
def get_watchable_paths(self) -> list[str]:
|
||||
return [self.__images_path, self.__meta_path]
|
||||
return [self.__path]
|
||||
|
||||
def get_images(self) -> dict[str, Image]:
|
||||
return {
|
||||
name: self.get_image_by_name(name)
|
||||
for name in os.listdir(self.__images_path)
|
||||
for name in os.listdir(self.__path)
|
||||
if not name.startswith(".__") and name != "lost+found"
|
||||
}
|
||||
|
||||
def get_image_by_name(self, name: str) -> Image:
|
||||
assert name
|
||||
path = os.path.join(self.__images_path, name)
|
||||
path = os.path.join(self.__path, name)
|
||||
return self.__get_image(name, path)
|
||||
|
||||
def get_image_by_path(self, path: str) -> Image:
|
||||
@ -129,8 +128,10 @@ class Storage:
|
||||
|
||||
def __get_image(self, name: str, path: str) -> Image:
|
||||
assert name
|
||||
assert not name.startswith(".__")
|
||||
assert name != "lost+found"
|
||||
assert path
|
||||
in_storage = (os.path.dirname(path) == self.__images_path)
|
||||
in_storage = (os.path.dirname(path) == self.__path)
|
||||
return Image(name, path, (self if in_storage else None))
|
||||
|
||||
def get_space(self, fatal: bool) -> (StorageSpace | None):
|
||||
|
||||
@ -41,7 +41,11 @@ def valid_atx_button(arg: Any) -> str:
|
||||
|
||||
|
||||
def valid_msd_image_name(arg: Any) -> str:
|
||||
return valid_printable_filename(arg, name="MSD image name") # pragma: nocover
|
||||
name = "MSD image name"
|
||||
arg = valid_printable_filename(arg, name=name)
|
||||
if arg.startswith(".__") or arg == "lost+found":
|
||||
raise_error(arg, name)
|
||||
return arg
|
||||
|
||||
|
||||
def valid_info_fields(arg: Any, variants: set[str]) -> set[str]:
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user