pikvm/pikvm#321: server-side uploading counters

This commit is contained in:
Devaev Maxim
2021-06-08 03:12:24 +03:00
parent cf08c04e55
commit b5ab5699c4
8 changed files with 115 additions and 95 deletions

View File

@@ -20,6 +20,7 @@
# ========================================================================== #
import os
import contextlib
from typing import Dict
@@ -27,6 +28,11 @@ from typing import Type
from typing import AsyncGenerator
from typing import Optional
import aiofiles
import aiofiles.base
from ... import aiofs
from ...errors import OperationError
from ...errors import IsBusyError
@@ -113,7 +119,7 @@ class BaseMsd(BasePlugin):
raise NotImplementedError()
@contextlib.asynccontextmanager
async def write_image(self, name: str) -> AsyncGenerator[None, None]: # pylint: disable=unused-argument
async def write_image(self, name: str, size: int) -> AsyncGenerator[None, None]: # pylint: disable=unused-argument
if self is not None: # XXX: Vulture and pylint hack
raise NotImplementedError()
yield
@@ -128,6 +134,52 @@ class BaseMsd(BasePlugin):
raise NotImplementedError()
class MsdImageWriter:
def __init__(self, path: str, size: int, sync: int) -> None:
self.__name = os.path.basename(path)
self.__path = path
self.__size = size
self.__sync = sync
self.__file: Optional[aiofiles.base.AiofilesContextManager] = None
self.__written = 0
self.__unsynced = 0
def get_file(self) -> aiofiles.base.AiofilesContextManager:
assert self.__file is not None
return self.__file
def get_state(self) -> Dict:
return {
"name": self.__name,
"size": self.__size,
"written": self.__written,
}
async def open(self) -> "MsdImageWriter":
assert self.__file is None
self.__file = await aiofiles.open(self.__path, mode="w+b", buffering=0) # type: ignore
return self
async def write(self, chunk: bytes) -> int:
assert self.__file is not None
await self.__file.write(chunk) # type: ignore
self.__written += len(chunk)
self.__unsynced += len(chunk)
if self.__unsynced >= self.__sync:
await aiofs.afile_sync(self.__file)
self.__unsynced = 0
return self.__written
async def close(self) -> None:
assert self.__file is not None
await aiofs.afile_sync(self.__file)
await self.__file.close() # type: ignore
# =====
def get_msd_class(name: str) -> Type[BaseMsd]:
return get_plugin_class("msd", name) # type: ignore

View File

@@ -70,7 +70,7 @@ class Plugin(BaseMsd):
raise MsdDisabledError()
@contextlib.asynccontextmanager
async def write_image(self, name: str) -> AsyncGenerator[None, None]:
async def write_image(self, name: str, size: int) -> AsyncGenerator[None, None]:
if self is not None: # XXX: Vulture and pylint hack
raise MsdDisabledError()
yield

View File

@@ -32,9 +32,6 @@ from typing import Dict
from typing import AsyncGenerator
from typing import Optional
import aiofiles
import aiofiles.base
from ....logging import get_logger
from ....inotify import InotifyMask
@@ -49,7 +46,6 @@ from ....validators.os import valid_printable_filename
from ....validators.os import valid_command
from .... import aiotools
from .... import aiofs
from .. import MsdError
from .. import MsdIsBusyError
@@ -60,6 +56,7 @@ from .. import MsdImageNotSelected
from .. import MsdUnknownImageError
from .. import MsdImageExistsError
from .. import BaseMsd
from .. import MsdImageWriter
from . import fs
from . import helpers
@@ -165,10 +162,8 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
self.__drive = Drive(gadget, instance=0, lun=0)
self.__new_file: Optional[aiofiles.base.AiofilesContextManager] = None
self.__new_file_written = 0
self.__new_file_unsynced = 0
self.__new_file_tick = 0.0
self.__new_writer: Optional[MsdImageWriter] = None
self.__new_writer_tick = 0.0
self.__notifier = aiotools.AioNotifier()
self.__state = _State(self.__notifier)
@@ -204,11 +199,14 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
del storage["images"][name]["path"]
del storage["images"][name]["in_storage"]
storage["uploading"] = bool(self.__new_file)
if self.__new_file: # При загрузке файла показываем размер вручную
if self.__new_writer:
# При загрузке файла показываем актуальную статистику вручную
storage["uploading"] = self.__new_writer.get_state()
space = fs.get_fs_space(self.__storage_path, fatal=False)
if space:
storage.update(dataclasses.asdict(space))
else:
storage["uploading"] = None
vd: Optional[Dict] = None
if self.__state.vd:
@@ -253,7 +251,7 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
@aiotools.atomic
async def cleanup(self) -> None:
await self.__close_new_file()
await self.__close_new_writer()
# =====
@@ -308,7 +306,7 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
self.__state.vd.connected = connected
@contextlib.asynccontextmanager
async def write_image(self, name: str) -> AsyncGenerator[None, None]:
async def write_image(self, name: str, size: int) -> AsyncGenerator[None, None]:
try:
async with self.__state._region: # pylint: disable=protected-access
try:
@@ -326,16 +324,15 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
await self.__remount_storage(rw=True)
self.__set_image_complete(name, False)
self.__new_file_written = 0
self.__new_file_unsynced = 0
self.__new_file = await aiofiles.open(path, mode="w+b", buffering=0) # type: ignore
self.__new_writer = await MsdImageWriter(path, size, self.__sync_chunk_size).open()
await self.__notifier.notify()
yield
self.__set_image_complete(name, True)
finally:
await self.__close_new_file()
await self.__close_new_writer()
try:
await self.__remount_storage(rw=False)
except Exception:
@@ -350,22 +347,14 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
return self.__upload_chunk_size
async def write_image_chunk(self, chunk: bytes) -> int:
assert self.__new_file
await self.__new_file.write(chunk) # type: ignore
self.__new_file_written += len(chunk)
self.__new_file_unsynced += len(chunk)
if self.__new_file_unsynced >= self.__sync_chunk_size:
await aiofs.afile_sync(self.__new_file)
self.__new_file_unsynced = 0
assert self.__new_writer
written = await self.__new_writer.write(chunk)
now = time.monotonic()
if self.__new_file_tick + 1 < now:
if self.__new_writer_tick + 1 < now:
# Это нужно для ручного оповещения о свободном пространстве на диске, см. get_state()
self.__new_file_tick = now
self.__new_writer_tick = now
await self.__notifier.notify()
return self.__new_file_written
return written
@aiotools.atomic
async def remove(self, name: str) -> None:
@@ -392,18 +381,15 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
# =====
async def __close_new_file(self) -> None:
async def __close_new_writer(self) -> None:
try:
if self.__new_file:
if self.__new_writer:
get_logger().info("Closing new image file ...")
await aiofs.afile_sync(self.__new_file)
await self.__new_file.close() # type: ignore
await self.__new_writer.close()
except Exception:
get_logger().exception("Can't close image file")
finally:
self.__new_file = None
self.__new_file_written = 0
self.__new_file_unsynced = 0
self.__new_writer = None
# =====

View File

@@ -29,13 +29,9 @@ from typing import Dict
from typing import AsyncGenerator
from typing import Optional
import aiofiles
import aiofiles.base
from ....logging import get_logger
from .... import aiotools
from .... import aiofs
from ....yamlconf import Option
@@ -54,10 +50,10 @@ from .. import MsdDisconnectedError
from .. import MsdMultiNotSupported
from .. import MsdCdromNotSupported
from .. import BaseMsd
from .. import MsdImageWriter
from .gpio import Gpio
from .drive import ImageInfo
from .drive import DeviceInfo
@@ -91,9 +87,7 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
self.__device_info: Optional[DeviceInfo] = None
self.__connected = False
self.__device_file: Optional[aiofiles.base.AiofilesContextManager] = None
self.__written = 0
self.__unsynced = 0
self.__device_writer: Optional[MsdImageWriter] = None
self.__notifier = aiotools.AioNotifier()
self.__region = aiotools.AioExclusiveRegion(MsdIsBusyError, self.__notifier)
@@ -132,7 +126,7 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
storage = {
"size": self.__device_info.size,
"free": self.__device_info.free,
"uploading": bool(self.__device_file)
"uploading": (self.__device_writer.get_state() if self.__device_writer else None),
}
drive = {
"image": (self.__device_info.image and dataclasses.asdict(self.__device_info.image)),
@@ -177,7 +171,7 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
@aiotools.atomic
async def cleanup(self) -> None:
try:
await self.__close_device_file()
await self.__close_device_writer()
finally:
self.__gpio.close()
@@ -214,7 +208,7 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
self.__connected = connected
@contextlib.asynccontextmanager
async def write_image(self, name: str) -> AsyncGenerator[None, None]:
async def write_image(self, name: str, size: int) -> AsyncGenerator[None, None]:
async with self.__working():
async with self.__region:
try:
@@ -222,30 +216,22 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
if self.__connected:
raise MsdConnectedError()
self.__device_file = await aiofiles.open(self.__device_info.path, mode="w+b", buffering=0) # type: ignore
self.__written = 0
self.__unsynced = 0
self.__device_writer = await MsdImageWriter(self.__device_info.path, size, self.__sync_chunk_size).open()
await self.__write_image_info(name, complete=False)
await self.__write_image_info(False)
await self.__notifier.notify()
yield
await self.__write_image_info(name, complete=True)
await self.__write_image_info(True)
finally:
await self.__close_device_file()
await self.__close_device_writer()
await self.__load_device_info()
def get_upload_chunk_size(self) -> int:
return self.__upload_chunk_size
async def write_image_chunk(self, chunk: bytes) -> int:
assert self.__device_file
await self.__device_file.write(chunk) # type: ignore
self.__written += len(chunk)
self.__unsynced += len(chunk)
if self.__unsynced >= self.__sync_chunk_size:
await aiofs.afile_sync(self.__device_file)
self.__unsynced = 0
return self.__written
assert self.__device_writer
return (await self.__device_writer.write(chunk))
@aiotools.atomic
async def remove(self, name: str) -> None:
@@ -262,27 +248,21 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
# =====
async def __write_image_info(self, name: str, complete: bool) -> None:
assert self.__device_file
async def __write_image_info(self, complete: bool) -> None:
assert self.__device_writer
assert self.__device_info
if not (await self.__device_info.write_image_info(
device_file=self.__device_file,
image_info=ImageInfo(name, self.__written, complete),
)):
if not (await self.__device_info.write_image_info(self.__device_writer, complete)):
get_logger().error("Can't write image info because device is full")
async def __close_device_file(self) -> None:
async def __close_device_writer(self) -> None:
try:
if self.__device_file:
if self.__device_writer:
get_logger().info("Closing device file ...")
await aiofs.afile_sync(self.__device_file)
await self.__device_file.close() # type: ignore
await self.__device_writer.close() # type: ignore
except Exception:
get_logger().exception("Can't close device file")
finally:
self.__device_file = None
self.__written = 0
self.__unsynced = 0
self.__device_writer = None
async def __load_device_info(self) -> None:
retries = self.__init_retries

View File

@@ -29,11 +29,11 @@ import dataclasses
from typing import IO
from typing import Optional
import aiofiles.base
from .... import aiotools
from .... import aiofs
from .. import MsdImageWriter
# =====
_IMAGE_INFO_SIZE = 4096
@@ -121,11 +121,10 @@ class DeviceInfo:
image=image_info,
)
async def write_image_info(
self,
device_file: aiofiles.base.AiofilesContextManager,
image_info: ImageInfo,
) -> bool:
async def write_image_info(self, device_writer: MsdImageWriter, complete: bool) -> bool:
device_file = device_writer.get_file()
state = device_writer.get_state()
image_info = ImageInfo(state["name"], state["written"], complete)
if self.size - image_info.size > _IMAGE_INFO_SIZE:
await device_file.seek(self.size - _IMAGE_INFO_SIZE) # type: ignore