Merge remote-tracking branch 'upstream/master'

This commit is contained in:
mofeng-git
2024-11-20 15:18:34 +00:00
166 changed files with 5421 additions and 2645 deletions

View File

@@ -26,12 +26,12 @@ import dataclasses
import functools
import time
import os
import copy
from typing import AsyncGenerator
from ....logging import get_logger
from ....inotify import InotifyMask
from ....inotify import Inotify
from ....yamlconf import Option
@@ -97,15 +97,17 @@ class _State:
@contextlib.asynccontextmanager
async def busy(self, check_online: bool=True) -> AsyncGenerator[None, None]:
async with self._region:
async with self._lock:
self.__notifier.notify()
if check_online:
if self.vd is None:
raise MsdOfflineError()
assert self.storage
yield
self.__notifier.notify()
try:
with self._region:
async with self._lock:
self.__notifier.notify()
if check_online:
if self.vd is None:
raise MsdOfflineError()
assert self.storage
yield
finally:
self.__notifier.notify()
def is_busy(self) -> bool:
return self._region.is_busy()
@@ -141,10 +143,11 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
self.__notifier = aiotools.AioNotifier()
self.__state = _State(self.__notifier)
self.__reset = False
logger = get_logger(0)
logger.info("Using OTG gadget %r as MSD", gadget)
aiotools.run_sync(self.__reload_state(notify=False))
aiotools.run_sync(self.__unsafe_reload_state())
@classmethod
def get_plugin_options(cls) -> dict:
@@ -164,14 +167,13 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
},
}
# =====
async def get_state(self) -> dict:
async with self.__state._lock: # pylint: disable=protected-access
storage: (dict | None) = None
if self.__state.storage:
if self.__writer:
# При загрузке файла показываем актуальную статистику вручную
await self.__storage.reload_parts_info()
assert self.__state.vd
storage = dataclasses.asdict(self.__state.storage)
for name in list(storage["images"]):
del storage["images"][name]["name"]
@@ -185,34 +187,50 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
vd: (dict | None) = None
if self.__state.vd:
assert self.__state.storage
vd = dataclasses.asdict(self.__state.vd)
if vd["image"]:
del vd["image"]["path"]
return {
"enabled": True,
"online": (bool(self.__state.vd) and self.__drive.is_enabled()),
"online": (bool(vd) and self.__drive.is_enabled()),
"busy": self.__state.is_busy(),
"storage": storage,
"drive": vd,
}
async def poll_state(self) -> AsyncGenerator[dict, None]:
prev_state: dict = {}
while True:
state = await self.get_state()
if state != prev_state:
yield state
prev_state = state
await self.__notifier.wait()
async def trigger_state(self) -> None:
self.__notifier.notify(1)
async def systask(self) -> None:
await self.__watch_inotify()
async def poll_state(self) -> AsyncGenerator[dict, None]:
prev: dict = {}
while True:
if (await self.__notifier.wait()) > 0:
prev = {}
new = await self.get_state()
if not prev or (prev.get("online") != new["online"]):
prev = copy.deepcopy(new)
yield new
else:
diff: dict = {}
for sub in ["busy", "drive"]:
if prev.get(sub) != new[sub]:
diff[sub] = new[sub]
for sub in ["images", "parts", "downloading", "uploading"]:
if (prev.get("storage") or {}).get(sub) != (new["storage"] or {}).get(sub):
if "storage" not in diff:
diff["storage"] = {}
diff["storage"][sub] = new["storage"][sub]
if diff:
prev = copy.deepcopy(new)
yield diff
@aiotools.atomic_fg
async def reset(self) -> None:
async with self.__state.busy(check_online=False):
try:
self.__reset = True
self.__drive.set_image_path("")
self.__drive.set_cdrom_flag(False)
self.__drive.set_rw_flag(False)
@@ -220,11 +238,6 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
except Exception:
get_logger(0).exception("Can't reset MSD properly")
@aiotools.atomic_fg
async def cleanup(self) -> None:
await self.__close_reader()
await self.__close_writer()
# =====
@aiotools.atomic_fg
@@ -296,11 +309,12 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
@contextlib.asynccontextmanager
async def read_image(self, name: str) -> AsyncGenerator[MsdFileReader, None]:
try:
async with self.__state._region: # pylint: disable=protected-access
with self.__state._region: # pylint: disable=protected-access
try:
async with self.__state._lock: # pylint: disable=protected-access
self.__notifier.notify()
self.__STATE_check_disconnected()
image = await self.__STATE_get_storage_image(name)
self.__reader = await MsdFileReader(
notifier=self.__notifier,
@@ -308,7 +322,10 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
path=image.path,
chunk_size=self.__read_chunk_size,
).open()
self.__notifier.notify()
yield self.__reader
finally:
await aiotools.shield_fg(self.__close_reader())
finally:
@@ -316,18 +333,40 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
@contextlib.asynccontextmanager
async def write_image(self, name: str, size: int, remove_incomplete: (bool | None)) -> AsyncGenerator[MsdFileWriter, None]:
image: (Image | None) = None
complete = False
async def finish_writing() -> None:
# Делаем под блокировкой, чтобы эвент айнотифи не был обработан
# до того, как мы не закончим все процедуры.
async with self.__state._lock: # pylint: disable=protected-access
try:
self.__notifier.notify()
finally:
try:
if image:
await image.set_complete(complete)
finally:
try:
if image and remove_incomplete and not complete:
await image.remove(fatal=False)
finally:
try:
await self.__close_writer()
finally:
if image:
await image.remount_rw(False, fatal=False)
try:
async with self.__state._region: # pylint: disable=protected-access
image: (Image | None) = None
with self.__state._region: # pylint: disable=protected-access
try:
async with self.__state._lock: # pylint: disable=protected-access
self.__notifier.notify()
self.__STATE_check_disconnected()
image = await self.__STORAGE_create_new_image(name)
image = await self.__STORAGE_create_new_image(name)
await image.remount_rw(True)
await image.set_complete(False)
self.__writer = await MsdFileWriter(
notifier=self.__notifier,
name=image.name,
@@ -339,22 +378,12 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
self.__notifier.notify()
yield self.__writer
await image.set_complete(self.__writer.is_complete())
complete = await self.__writer.finish()
finally:
try:
if image and remove_incomplete and self.__writer and not self.__writer.is_complete():
await image.remove(fatal=False)
finally:
try:
await aiotools.shield_fg(self.__close_writer())
finally:
if image:
await aiotools.shield_fg(image.remount_rw(False, fatal=False))
await aiotools.shield_fg(finish_writing())
finally:
# Между закрытием файла и эвентом айнотифи состояние может быть не обновлено,
# так что форсим обновление вручную, чтобы получить актуальное состояние.
await aiotools.shield_fg(self.__reload_state())
self.__notifier.notify()
@aiotools.atomic_fg
async def remove(self, name: str) -> None:
@@ -404,17 +433,26 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
async def __close_reader(self) -> None:
if self.__reader:
await self.__reader.close()
self.__reader = None
try:
await self.__reader.close()
finally:
self.__reader = None
async def __close_writer(self) -> None:
if self.__writer:
await self.__writer.close()
self.__writer = None
try:
await self.__writer.close()
finally:
self.__writer = None
# =====
async def __watch_inotify(self) -> None:
@aiotools.atomic_fg
async def cleanup(self) -> None:
await self.__close_reader()
await self.__close_writer()
async def systask(self) -> None:
logger = get_logger(0)
while True:
try:
@@ -426,19 +464,25 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
await asyncio.sleep(5)
with Inotify() as inotify:
await inotify.watch(InotifyMask.ALL_MODIFY_EVENTS, *self.__storage.get_watchable_paths())
await inotify.watch(InotifyMask.ALL_MODIFY_EVENTS, *self.__drive.get_watchable_paths())
# Из-за гонки между первым релоадом и установкой вотчеров,
# мы можем потерять какие-то каталоги стораджа, но это допустимо,
# так как всегда есть ручной перезапуск.
await inotify.watch_all_changes(*self.__storage.get_watchable_paths())
await inotify.watch_all_changes(*self.__drive.get_watchable_paths())
# После установки вотчеров еще раз проверяем стейт, чтобы ничего не потерять
# После установки вотчеров еще раз проверяем стейт,
# чтобы не потерять состояние привода.
await self.__reload_state()
while self.__state.vd: # Если живы после предыдущей проверки
need_restart = False
need_restart = self.__reset
self.__reset = False
need_reload_state = False
for event in (await inotify.get_series(timeout=1)):
need_reload_state = True
if event.mask & (InotifyMask.DELETE_SELF | InotifyMask.MOVE_SELF | InotifyMask.UNMOUNT | InotifyMask.ISDIR):
# Если выгрузили OTG, изменили каталоги, что-то отмонтировали или делают еще какую-то странную фигню
if event.restart:
# Если выгрузили OTG, изменили каталоги, что-то отмонтировали или делают еще какую-то странную фигню.
# Проверяется маска InotifyMask.ALL_RESTART_EVENTS
logger.info("Got a big inotify event: %s; reinitializing MSD ...", event)
need_restart = True
break
@@ -446,56 +490,71 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
break
if need_reload_state:
await self.__reload_state()
elif self.__writer:
# При загрузке файла обновляем статистику раз в секунду (по таймауту).
# Это не нужно при обычном релоаде, потому что там и так проверяются все разделы.
await self.__reload_parts_info()
except Exception:
logger.exception("Unexpected MSD watcher error")
time.sleep(1)
await asyncio.sleep(1)
async def __reload_state(self, notify: bool=True) -> None:
logger = get_logger(0)
async def __reload_state(self) -> None:
async with self.__state._lock: # pylint: disable=protected-access
try:
path = self.__drive.get_image_path()
drive_state = _DriveState(
image=((await self.__storage.make_image_by_path(path)) if path else None),
cdrom=self.__drive.get_cdrom_flag(),
rw=self.__drive.get_rw_flag(),
)
await self.__unsafe_reload_state()
self.__notifier.notify()
await self.__storage.reload()
async def __reload_parts_info(self) -> None:
assert self.__writer # Использовать только при записи образа
async with self.__state._lock: # pylint: disable=protected-access
await self.__storage.reload_parts_info()
self.__notifier.notify()
if self.__state.vd is None and drive_state.image is None:
# Если только что включились и образ не подключен - попробовать
# перемонтировать хранилище (и создать images и meta).
logger.info("Probing to remount storage ...")
await self.__storage.remount_rw(True)
await self.__storage.remount_rw(False)
await self.__setup_initial()
# ===== Don't call this directly ====
except Exception:
logger.exception("Error while reloading MSD state; switching to offline")
self.__state.storage = None
self.__state.vd = None
async def __unsafe_reload_state(self) -> None:
logger = get_logger(0)
try:
path = self.__drive.get_image_path()
drive_state = _DriveState(
image=((await self.__storage.make_image_by_path(path)) if path else None),
cdrom=self.__drive.get_cdrom_flag(),
rw=self.__drive.get_rw_flag(),
)
await self.__storage.reload()
if self.__state.vd is None and drive_state.image is None:
# Если только что включились и образ не подключен - попробовать
# перемонтировать хранилище (и создать images и meta).
logger.info("Probing to remount storage ...")
await self.__storage.remount_rw(True)
await self.__storage.remount_rw(False)
await self.__unsafe_setup_initial()
except Exception:
logger.exception("Error while reloading MSD state; switching to offline")
self.__state.storage = None
self.__state.vd = None
else:
self.__state.storage = self.__storage
if drive_state.image:
# При подключенном образе виртуальный стейт заменяется реальным
self.__state.vd = _VirtualDriveState.from_drive_state(drive_state)
else:
self.__state.storage = self.__storage
if drive_state.image:
# При подключенном образе виртуальный стейт заменяется реальным
if self.__state.vd is None:
# Если раньше MSD был отключен
self.__state.vd = _VirtualDriveState.from_drive_state(drive_state)
else:
if self.__state.vd is None:
# Если раньше MSD был отключен
self.__state.vd = _VirtualDriveState.from_drive_state(drive_state)
image = self.__state.vd.image
if image and (not image.in_storage or not (await image.exists())):
# Если только что отключили ручной образ вне хранилища или ранее выбранный образ был удален
self.__state.vd.image = None
image = self.__state.vd.image
if image and (not image.in_storage or not (await image.exists())):
# Если только что отключили ручной образ вне хранилища или ранее выбранный образ был удален
self.__state.vd.image = None
self.__state.vd.connected = False
if notify:
self.__notifier.notify()
self.__state.vd.connected = False
async def __setup_initial(self) -> None:
async def __unsafe_setup_initial(self) -> None:
if self.__initial_image:
logger = get_logger(0)
image = await self.__storage.make_image_by_name(self.__initial_image)