speed up msd

This commit is contained in:
Devaev Maxim
2021-05-26 12:26:31 +03:00
parent 5da412ae7c
commit e480629724
10 changed files with 66 additions and 17 deletions

View File

@@ -34,7 +34,6 @@ async def read(path: str) -> str:
return (await afile.read()) return (await afile.read())
async def afile_write_now(afile: aiofiles.base.AiofilesContextManager, data: bytes) -> None: async def afile_sync(afile: aiofiles.base.AiofilesContextManager) -> None:
await afile.write(data) # type: ignore
await afile.flush() # type: ignore await afile.flush() # type: ignore
await aiotools.run_async(os.fsync, afile.fileno()) # type: ignore await aiotools.run_async(os.fsync, afile.fileno()) # type: ignore

View File

@@ -331,7 +331,6 @@ def _get_config_scheme() -> Dict:
"unix_rm": Option(True, type=valid_bool), "unix_rm": Option(True, type=valid_bool),
"unix_mode": Option(0o660, type=valid_unix_mode), "unix_mode": Option(0o660, type=valid_unix_mode),
"heartbeat": Option(15.0, type=valid_float_f01), "heartbeat": Option(15.0, type=valid_float_f01),
"sync_chunk_size": Option(65536, type=functools.partial(valid_number, min=1024)),
"access_log_format": Option("[%P / %{X-Real-IP}i] '%r' => %s; size=%b ---" "access_log_format": Option("[%P / %{X-Real-IP}i] '%r' => %s; size=%b ---"
" referer='%{Referer}i'; user_agent='%{User-Agent}i'"), " referer='%{Referer}i'; user_agent='%{User-Agent}i'"),
}, },

View File

@@ -101,7 +101,6 @@ def main(argv: Optional[List[str]]=None) -> None:
), ),
heartbeat=config.server.heartbeat, heartbeat=config.server.heartbeat,
sync_chunk_size=config.server.sync_chunk_size,
keymap_path=config.hid.keymap, keymap_path=config.hid.keymap,
ignore_keys=config.hid.ignore_keys, ignore_keys=config.hid.ignore_keys,
@@ -109,6 +108,6 @@ def main(argv: Optional[List[str]]=None) -> None:
mouse_y_range=(config.hid.mouse_y_range.min, config.hid.mouse_y_range.max), mouse_y_range=(config.hid.mouse_y_range.min, config.hid.mouse_y_range.max),
stream_forever=config.streamer.forever, stream_forever=config.streamer.forever,
).run(**config.server._unpack(ignore=["heartbeat", "sync_chunk_size"])) ).run(**config.server._unpack(ignore=["heartbeat"]))
get_logger(0).info("Bye-bye") get_logger(0).info("Bye-bye")

View File

@@ -37,9 +37,8 @@ from ..http import get_multipart_field
# ====== # ======
class MsdApi: class MsdApi:
def __init__(self, msd: BaseMsd, sync_chunk_size: int) -> None: def __init__(self, msd: BaseMsd) -> None:
self.__msd = msd self.__msd = msd
self.__sync_chunk_size = sync_chunk_size
# ===== # =====
@@ -80,7 +79,7 @@ class MsdApi:
async with self.__msd.write_image(name): async with self.__msd.write_image(name):
logger.info("Writing image %r to MSD ...", name) logger.info("Writing image %r to MSD ...", name)
while True: while True:
chunk = await data_field.read_chunk(self.__sync_chunk_size) chunk = await data_field.read_chunk(self.__msd.get_upload_chunk_size())
if not chunk: if not chunk:
break break
written = await self.__msd.write_image_chunk(chunk) written = await self.__msd.write_image_chunk(chunk)

View File

@@ -158,7 +158,6 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
snapshoter: Snapshoter, snapshoter: Snapshoter,
heartbeat: float, heartbeat: float,
sync_chunk_size: int,
keymap_path: str, keymap_path: str,
ignore_keys: List[str], ignore_keys: List[str],
@@ -206,7 +205,7 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
UserGpioApi(user_gpio), UserGpioApi(user_gpio),
self.__hid_api, self.__hid_api,
AtxApi(atx), AtxApi(atx),
MsdApi(msd, sync_chunk_size), MsdApi(msd),
StreamerApi(streamer), StreamerApi(streamer),
ExportApi(info_manager, atx, user_gpio), ExportApi(info_manager, atx, user_gpio),
RedfishApi(info_manager, atx), RedfishApi(info_manager, atx),

View File

@@ -118,6 +118,9 @@ class BaseMsd(BasePlugin):
raise NotImplementedError() raise NotImplementedError()
yield yield
def get_upload_chunk_size(self) -> int:
raise NotImplementedError()
async def write_image_chunk(self, chunk: bytes) -> int: async def write_image_chunk(self, chunk: bytes) -> int:
raise NotImplementedError() raise NotImplementedError()

View File

@@ -75,6 +75,9 @@ class Plugin(BaseMsd):
raise MsdDisabledError() raise MsdDisabledError()
yield yield
def get_upload_chunk_size(self) -> int:
raise MsdDisabledError()
async def write_image_chunk(self, chunk: bytes) -> int: async def write_image_chunk(self, chunk: bytes) -> int:
raise MsdDisabledError() raise MsdDisabledError()

View File

@@ -24,6 +24,7 @@ import os
import asyncio import asyncio
import contextlib import contextlib
import dataclasses import dataclasses
import functools
import time import time
from typing import List from typing import List
@@ -42,6 +43,7 @@ from ....inotify import Inotify
from ....yamlconf import Option from ....yamlconf import Option
from ....validators.basic import valid_bool from ....validators.basic import valid_bool
from ....validators.basic import valid_number
from ....validators.os import valid_abs_dir from ....validators.os import valid_abs_dir
from ....validators.os import valid_printable_filename from ....validators.os import valid_printable_filename
from ....validators.os import valid_command from ....validators.os import valid_command
@@ -135,6 +137,9 @@ class _State:
class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
def __init__( # pylint: disable=super-init-not-called def __init__( # pylint: disable=super-init-not-called
self, self,
upload_chunk_size: int,
sync_chunk_size: int,
storage_path: str, storage_path: str,
remount_cmd: List[str], remount_cmd: List[str],
@@ -145,6 +150,9 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
gadget: str, # XXX: Not from options, see /kvmd/apps/kvmd/__init__.py for details gadget: str, # XXX: Not from options, see /kvmd/apps/kvmd/__init__.py for details
) -> None: ) -> None:
self.__upload_chunk_size = upload_chunk_size
self.__sync_chunk_size = sync_chunk_size
self.__storage_path = os.path.normpath(storage_path) self.__storage_path = os.path.normpath(storage_path)
self.__images_path = os.path.join(self.__storage_path, "images") self.__images_path = os.path.join(self.__storage_path, "images")
self.__meta_path = os.path.join(self.__storage_path, "meta") self.__meta_path = os.path.join(self.__storage_path, "meta")
@@ -159,6 +167,7 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
self.__new_file: Optional[aiofiles.base.AiofilesContextManager] = None self.__new_file: Optional[aiofiles.base.AiofilesContextManager] = None
self.__new_file_written = 0 self.__new_file_written = 0
self.__new_file_unsynced = 0
self.__new_file_tick = 0.0 self.__new_file_tick = 0.0
self.__notifier = aiotools.AioNotifier() self.__notifier = aiotools.AioNotifier()
@@ -172,9 +181,14 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
def get_plugin_options(cls) -> Dict: def get_plugin_options(cls) -> Dict:
sudo = ["/usr/bin/sudo", "--non-interactive"] sudo = ["/usr/bin/sudo", "--non-interactive"]
return { return {
"storage": Option("/var/lib/kvmd/msd", type=valid_abs_dir, unpack_as="storage_path"), "upload_chunk_size": Option(65536, type=functools.partial(valid_number, min=1024)),
"remount_cmd": Option([*sudo, "/usr/bin/kvmd-helper-otgmsd-remount", "{mode}"], type=valid_command), "sync_chunk_size": Option(4194304, type=functools.partial(valid_number, min=1024)),
"unlock_cmd": Option([*sudo, "/usr/bin/kvmd-helper-otgmsd-unlock", "unlock"], type=valid_command),
"storage": Option("/var/lib/kvmd/msd", type=valid_abs_dir, unpack_as="storage_path"),
"remount_cmd": Option([*sudo, "/usr/bin/kvmd-helper-otgmsd-remount", "{mode}"], type=valid_command),
"unlock_cmd": Option([*sudo, "/usr/bin/kvmd-helper-otgmsd-unlock", "unlock"], type=valid_command),
"initial": { "initial": {
"image": Option("", type=(lambda arg: (valid_printable_filename(arg) if arg else ""))), "image": Option("", type=(lambda arg: (valid_printable_filename(arg) if arg else ""))),
"cdrom": Option(False, type=valid_bool), "cdrom": Option(False, type=valid_bool),
@@ -313,6 +327,7 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
await self.__remount_storage(rw=True) await self.__remount_storage(rw=True)
self.__set_image_complete(name, False) self.__set_image_complete(name, False)
self.__new_file_written = 0 self.__new_file_written = 0
self.__new_file_unsynced = 0
self.__new_file = await aiofiles.open(path, mode="w+b", buffering=0) # type: ignore self.__new_file = await aiofiles.open(path, mode="w+b", buffering=0) # type: ignore
await self.__notifier.notify() await self.__notifier.notify()
@@ -331,10 +346,20 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
await self.__reload_state() await self.__reload_state()
await self.__notifier.notify() await self.__notifier.notify()
def get_upload_chunk_size(self) -> int:
return self.__upload_chunk_size
async def write_image_chunk(self, chunk: bytes) -> int: async def write_image_chunk(self, chunk: bytes) -> int:
assert self.__new_file assert self.__new_file
await aiofs.afile_write_now(self.__new_file, chunk)
await self.__new_file.write(chunk) # type: ignore
self.__new_file_written += len(chunk) self.__new_file_written += len(chunk)
self.__new_file_unsynced += len(chunk)
if self.__new_file_unsynced >= self.__sync_chunk_size:
await aiofs.afile_sync(self.__new_file)
self.__new_file_unsynced = 0
now = time.monotonic() now = time.monotonic()
if self.__new_file_tick + 1 < now: if self.__new_file_tick + 1 < now:
# Это нужно для ручного оповещения о свободном пространстве на диске, см. get_state() # Это нужно для ручного оповещения о свободном пространстве на диске, см. get_state()
@@ -377,6 +402,7 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
finally: finally:
self.__new_file = None self.__new_file = None
self.__new_file_written = 0 self.__new_file_written = 0
self.__new_file_unsynced = 0
# ===== # =====

View File

@@ -23,6 +23,7 @@
import asyncio import asyncio
import contextlib import contextlib
import dataclasses import dataclasses
import functools
from typing import Dict from typing import Dict
from typing import AsyncGenerator from typing import AsyncGenerator
@@ -39,6 +40,7 @@ from .... import aiofs
from ....yamlconf import Option from ....yamlconf import Option
from ....validators.basic import valid_bool from ....validators.basic import valid_bool
from ....validators.basic import valid_number
from ....validators.basic import valid_int_f1 from ....validators.basic import valid_int_f1
from ....validators.basic import valid_float_f01 from ....validators.basic import valid_float_f01
from ....validators.os import valid_abs_path from ....validators.os import valid_abs_path
@@ -61,8 +63,11 @@ from .drive import DeviceInfo
# ===== # =====
class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
def __init__( # pylint: disable=super-init-not-called def __init__( # pylint: disable=super-init-not-called,too-many-arguments
self, self,
upload_chunk_size: int,
sync_chunk_size: int,
gpio_device_path: str, gpio_device_path: str,
target_pin: int, target_pin: int,
reset_inverted: bool, reset_inverted: bool,
@@ -74,6 +79,9 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
reset_delay: float, reset_delay: float,
) -> None: ) -> None:
self.__upload_chunk_size = upload_chunk_size
self.__sync_chunk_size = sync_chunk_size
self.__device_path = device_path self.__device_path = device_path
self.__init_delay = init_delay self.__init_delay = init_delay
self.__init_retries = init_retries self.__init_retries = init_retries
@@ -85,6 +93,7 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
self.__device_file: Optional[aiofiles.base.AiofilesContextManager] = None self.__device_file: Optional[aiofiles.base.AiofilesContextManager] = None
self.__written = 0 self.__written = 0
self.__unsynced = 0
self.__notifier = aiotools.AioNotifier() self.__notifier = aiotools.AioNotifier()
self.__region = aiotools.AioExclusiveRegion(MsdIsBusyError, self.__notifier) self.__region = aiotools.AioExclusiveRegion(MsdIsBusyError, self.__notifier)
@@ -92,6 +101,9 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
@classmethod @classmethod
def get_plugin_options(cls) -> Dict: def get_plugin_options(cls) -> Dict:
return { return {
"upload_chunk_size": Option(65536, type=functools.partial(valid_number, min=1024)),
"sync_chunk_size": Option(4194304, type=functools.partial(valid_number, min=1024)),
"gpio_device": Option("/dev/gpiochip0", type=valid_abs_path, unpack_as="gpio_device_path"), "gpio_device": Option("/dev/gpiochip0", type=valid_abs_path, unpack_as="gpio_device_path"),
"target_pin": Option(-1, type=valid_gpio_pin), "target_pin": Option(-1, type=valid_gpio_pin),
"reset_pin": Option(-1, type=valid_gpio_pin), "reset_pin": Option(-1, type=valid_gpio_pin),
@@ -212,6 +224,7 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
self.__device_file = await aiofiles.open(self.__device_info.path, mode="w+b", buffering=0) # type: ignore self.__device_file = await aiofiles.open(self.__device_info.path, mode="w+b", buffering=0) # type: ignore
self.__written = 0 self.__written = 0
self.__unsynced = 0
await self.__write_image_info(name, complete=False) await self.__write_image_info(name, complete=False)
await self.__notifier.notify() await self.__notifier.notify()
@@ -221,10 +234,17 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
await self.__close_device_file() await self.__close_device_file()
await self.__load_device_info() await self.__load_device_info()
def get_upload_chunk_size(self) -> int:
return self.__upload_chunk_size
async def write_image_chunk(self, chunk: bytes) -> int: async def write_image_chunk(self, chunk: bytes) -> int:
assert self.__device_file assert self.__device_file
await aiofs.afile_write_now(self.__device_file, chunk) await self.__device_file.write(chunk) # type: ignore
self.__written += len(chunk) self.__written += len(chunk)
self.__unsynced += len(chunk)
if self.__unsynced >= self.__sync_chunk_size:
await aiofs.afile_sync(self.__device_file)
self.__unsynced = 0
return self.__written return self.__written
@aiotools.atomic @aiotools.atomic
@@ -261,6 +281,7 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
finally: finally:
self.__device_file = None self.__device_file = None
self.__written = 0 self.__written = 0
self.__unsynced = 0
async def __load_device_info(self) -> None: async def __load_device_info(self) -> None:
retries = self.__init_retries retries = self.__init_retries

View File

@@ -129,7 +129,8 @@ class DeviceInfo:
if self.size - image_info.size > _IMAGE_INFO_SIZE: if self.size - image_info.size > _IMAGE_INFO_SIZE:
await device_file.seek(self.size - _IMAGE_INFO_SIZE) # type: ignore await device_file.seek(self.size - _IMAGE_INFO_SIZE) # type: ignore
await aiofs.afile_write_now(device_file, image_info.to_bytes()) await device_file.write(image_info.to_bytes()) # type: ignore
await aiofs.afile_sync(device_file)
await device_file.seek(0) # type: ignore await device_file.seek(0) # type: ignore
return True return True
return False # Device is full return False # Device is full