One-KVM/kvmd/plugins/msd/otg/storage.py
2023-03-16 01:32:05 +02:00

200 lines
7.0 KiB
Python

# ========================================================================== #
# #
# KVMD - The main PiKVM daemon. #
# #
# Copyright (C) 2018-2022 Maxim Devaev <mdevaev@gmail.com> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
# ========================================================================== #
import os
import dataclasses
from typing import Generator
from typing import Optional
import aiofiles
import aiofiles.os
from ....logging import get_logger
from .... import aiohelpers
from .. import MsdError
# =====
@dataclasses.dataclass(frozen=True)
class _Image:
    """Field layout shared by image objects.

    The generated ``__init__`` accepts only ``name`` and ``path``.  Every
    other field is declared with ``init=False`` and has no default, so this
    class never assigns it; a subclass has to provide the value itself
    (for example through a property of the same name).
    """

    # Constructor arguments.
    name: str
    path: str

    # Not assigned by __init__, but still part of the generated comparison.
    in_storage: bool = dataclasses.field(init=False)

    # Not assigned by __init__ and ignored by the generated comparison.
    complete: bool = dataclasses.field(
        compare=False,
        init=False,
    )
    removable: bool = dataclasses.field(
        compare=False,
        init=False,
    )
    size: int = dataclasses.field(
        compare=False,
        init=False,
    )
    mod_ts: float = dataclasses.field(
        compare=False,
        init=False,
    )
class Image(_Image):
    """An image file, optionally bound to the storage object that holds it.

    The fields the base dataclass declares but does not assign are provided
    here as properties, so their values are computed on every access.
    """

    def __init__(self, name: str, path: str, storage: Optional["Storage"]) -> None:
        """Remember the owning storage (if any) and derive the helper paths."""
        super().__init__(name, path)

        # Has to be stored before _is_adopted() is called below: that call
        # receives this object and reads its in_storage and path.
        self.__storage = storage

        self.__dir_path = os.path.dirname(path)
        marker_name = f".__{os.path.basename(path)}.complete"
        self.__complete_path = os.path.join(self.__dir_path, marker_name)

        if storage:
            self.__adopted = storage._is_adopted(self)
        else:
            self.__adopted = True

    @property
    def in_storage(self) -> bool:
        """Whether a storage object was given at construction."""
        return bool(self.__storage)

    @property
    def complete(self) -> bool:
        """Whether the image counts as complete.

        Without a storage this is always True; otherwise it reflects the
        presence of the ``.complete`` marker file next to the image.
        """
        if not self.__storage:
            return True
        return os.path.exists(self.__complete_path)

    @property
    def removable(self) -> bool:
        """Whether the image may be removed.

        Never for an image without a storage, always for a non-adopted one,
        and for an adopted one only if its directory is writable.
        """
        if not self.__storage:
            return False
        return (not self.__adopted) or os.access(self.__dir_path, os.W_OK)

    @property
    def size(self) -> int:
        """Size of the image file in bytes, or 0 if it can't be stat'ed."""
        try:
            info = os.stat(self.path)
        except Exception:
            return 0
        return info.st_size

    @property
    def mod_ts(self) -> float:
        """Modification time of the image file, or 0.0 if it can't be stat'ed."""
        try:
            info = os.stat(self.path)
        except Exception:
            return 0.0
        return info.st_mtime

    async def exists(self) -> bool:
        """Check asynchronously whether the image file exists."""
        found = await aiofiles.os.path.exists(self.path)
        return found

    async def remount_rw(self, rw: bool, fatal: bool=True) -> None:
        """Ask the storage to remount itself; does nothing for adopted images."""
        assert self.__storage
        if self.__adopted:
            return
        await self.__storage.remount_rw(rw, fatal)

    async def remove(self, fatal: bool) -> None:
        """Delete the image file, then drop its ``.complete`` marker.

        A missing file is never an error.  Any other failure is re-raised
        only when ``fatal`` is true; the marker is removed afterwards in
        every case where no exception propagates.
        """
        assert self.__storage
        try:
            await aiofiles.os.remove(self.path)
        except Exception as err:
            if fatal and not isinstance(err, FileNotFoundError):
                raise
        await self.set_complete(False)

    async def set_complete(self, flag: bool) -> None:
        """Create (``flag`` true) or delete (``flag`` false) the marker file."""
        assert self.__storage
        if not flag:
            try:
                await aiofiles.os.remove(self.__complete_path)
            except FileNotFoundError:
                # Already absent, nothing to do
                pass
            return
        # Opening for writing is enough to create an empty marker
        async with aiofiles.open(self.__complete_path, "w"):
            pass
@dataclasses.dataclass(frozen=True)
class StorageSpace:
    """Immutable pair of filesystem capacity figures."""

    # Total capacity
    size: int

    # Capacity still available
    free: int
class Storage:
    """A directory tree holding images, plus the command used to remount it."""

    def __init__(self, path: str, remount_cmd: list[str]) -> None:
        """Store the storage root path and the remount helper command."""
        self.__path = path
        self.__remount_cmd = remount_cmd

    def get_watchable_paths(self) -> list[str]:
        """Return the storage root and every non-filtered directory below it."""
        paths: list[str] = []
        for (root_path, dirs, _) in os.walk(self.__path):
            # Assigning in place makes os.walk() skip the filtered directories
            dirs[:] = list(self.__filter(dirs))
            paths.append(root_path)
        return paths

    def get_images(self) -> dict[str, "Image"]:
        """Return all non-filtered files below the root, keyed by relative path."""
        images: dict[str, "Image"] = {}
        for (root_path, dirs, files) in os.walk(self.__path):
            # Assigning in place makes os.walk() skip the filtered directories
            dirs[:] = list(self.__filter(dirs))
            for file in self.__filter(files):
                name = os.path.relpath(os.path.join(root_path, file), self.__path)
                images[name] = self.get_image_by_name(name)
        return images

    def __filter(self, items: list[str]) -> Generator[str, None, None]:
        """Yield the stripped names in sorted order, without dot-names and lost+found."""
        for item in sorted(map(str.strip, items)):
            if not item.startswith(".") and item != "lost+found":
                yield item

    def get_image_by_name(self, name: str) -> "Image":
        """Build an in-storage image from a name relative to the storage root."""
        assert name
        path = os.path.join(self.__path, name)
        return self.__get_image(name, path, True)

    def get_image_by_path(self, path: str) -> "Image":
        """Build an image from a path, detecting whether it lies inside the storage.

        Inside the storage the image name is the path relative to the root;
        outside of it the name is just the base name of the file.
        """
        assert path
        in_storage = (os.path.commonpath([self.__path, path]) == self.__path)
        if in_storage:
            name = os.path.relpath(path, self.__path)
        else:
            name = os.path.basename(path)
        return self.__get_image(name, path, in_storage)

    def __get_image(self, name: str, path: str, in_storage: bool) -> "Image":
        """Create the image, binding it to this storage only if it is inside it."""
        assert name
        assert path
        return Image(name, path, (self if in_storage else None))

    def get_space(self, fatal: bool) -> Optional["StorageSpace"]:
        """Return total and free size of the filesystem holding the storage root.

        If statvfs() fails, the error is re-raised when ``fatal`` is true;
        otherwise a warning is logged and None is returned.
        """
        try:
            st = os.statvfs(self.__path)
        except Exception as err:
            if fatal:
                raise
            get_logger().warning("Can't get free space of filesystem %s: %s", self.__path, err)
            return None
        return StorageSpace(
            size=(st.f_blocks * st.f_frsize),
            free=(st.f_bavail * st.f_frsize),
        )

    def _is_adopted(self, image: "Image") -> bool:
        """Tell whether the image lives on something other than the storage mount.

        True if the image is outside the storage, or is located on another
        mount point below it.
        """
        if not image.in_storage:
            return True
        path = image.path
        while not os.path.ismount(path):
            parent = os.path.dirname(path)
            if parent == path:
                # Fixed point reached without meeting a mount point (this
                # happens for relative paths, which end up at ""). Going on
                # would loop forever, and such an image is certainly not on
                # the storage mount, so treat it as adopted.
                return True
            path = parent
        return (self.__path != path)

    async def remount_rw(self, rw: bool, fatal: bool=True) -> None:
        """Run the remount helper.

        Raises MsdError if the helper reports failure and ``fatal`` is true;
        with ``fatal`` false a failure is ignored.
        """
        if not (await aiohelpers.remount("MSD", self.__remount_cmd, rw)):
            if fatal:
                raise MsdError("Can't execute remount helper")