mirror of
https://github.com/mofeng-git/One-KVM.git
synced 2025-12-12 01:00:29 +08:00
144 lines
5.4 KiB
Python
144 lines
5.4 KiB
Python
# ========================================================================== #
|
|
# #
|
|
# KVMD - The main PiKVM daemon. #
|
|
# #
|
|
# Copyright (C) 2018-2022 Maxim Devaev <mdevaev@gmail.com> #
|
|
# #
|
|
# This program is free software: you can redistribute it and/or modify #
|
|
# it under the terms of the GNU General Public License as published by #
|
|
# the Free Software Foundation, either version 3 of the License, or #
|
|
# (at your option) any later version. #
|
|
# #
|
|
# This program is distributed in the hope that it will be useful, #
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
|
|
# GNU General Public License for more details. #
|
|
# #
|
|
# You should have received a copy of the GNU General Public License #
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
|
|
# #
|
|
# ========================================================================== #
|
|
|
|
|
|
import os
|
|
import stat
|
|
import fcntl
|
|
import struct
|
|
import dataclasses
|
|
|
|
from typing import IO
|
|
from typing import Optional
|
|
|
|
from .... import aiotools
|
|
from .... import aiofs
|
|
|
|
from .. import MsdFileWriter
|
|
|
|
|
|
# =====
|
|
_IMAGE_INFO_SIZE = 4096
|
|
_IMAGE_INFO_MAGIC_SIZE = 16
|
|
_IMAGE_INFO_NAME_SIZE = 256
|
|
_IMAGE_INFO_PADS_SIZE = _IMAGE_INFO_SIZE - _IMAGE_INFO_NAME_SIZE - 1 - 8 - _IMAGE_INFO_MAGIC_SIZE * 8
|
|
_IMAGE_INFO_FORMAT = ">%dL%dc?Q%dx%dL" % (
|
|
_IMAGE_INFO_MAGIC_SIZE,
|
|
_IMAGE_INFO_NAME_SIZE,
|
|
_IMAGE_INFO_PADS_SIZE,
|
|
_IMAGE_INFO_MAGIC_SIZE,
|
|
)
|
|
_IMAGE_INFO_MAGIC = [0x1ACE1ACE] * _IMAGE_INFO_MAGIC_SIZE
|
|
|
|
|
|
# =====
|
|
@dataclasses.dataclass(frozen=True)
|
|
class ImageInfo:
|
|
name: str
|
|
size: int
|
|
complete: bool
|
|
|
|
@classmethod
|
|
def from_bytes(cls, data: bytes) -> Optional["ImageInfo"]:
|
|
try:
|
|
parsed = list(struct.unpack(_IMAGE_INFO_FORMAT, data))
|
|
except struct.error:
|
|
pass
|
|
else:
|
|
magic_begin = parsed[:_IMAGE_INFO_MAGIC_SIZE]
|
|
magic_end = parsed[-_IMAGE_INFO_MAGIC_SIZE:]
|
|
if magic_begin == magic_end == _IMAGE_INFO_MAGIC:
|
|
image_name_bytes = b"".join(parsed[
|
|
_IMAGE_INFO_MAGIC_SIZE # noqa: E203
|
|
:
|
|
_IMAGE_INFO_MAGIC_SIZE + _IMAGE_INFO_NAME_SIZE
|
|
])
|
|
return ImageInfo(
|
|
name=image_name_bytes.decode("utf-8", errors="ignore").strip("\x00").strip(),
|
|
size=parsed[_IMAGE_INFO_MAGIC_SIZE + _IMAGE_INFO_NAME_SIZE + 1],
|
|
complete=parsed[_IMAGE_INFO_MAGIC_SIZE + _IMAGE_INFO_NAME_SIZE],
|
|
)
|
|
return None
|
|
|
|
def to_bytes(self) -> bytes:
|
|
return struct.pack(
|
|
_IMAGE_INFO_FORMAT,
|
|
*_IMAGE_INFO_MAGIC,
|
|
*memoryview(( # type: ignore
|
|
self.name.encode("utf-8")
|
|
+ b"\x00" * _IMAGE_INFO_NAME_SIZE
|
|
)[:_IMAGE_INFO_NAME_SIZE]).cast("c"),
|
|
self.complete,
|
|
self.size,
|
|
*_IMAGE_INFO_MAGIC,
|
|
)
|
|
|
|
|
|
@dataclasses.dataclass(frozen=True)
|
|
class DeviceInfo:
|
|
path: str
|
|
size: int
|
|
free: int
|
|
image: Optional[ImageInfo]
|
|
|
|
@classmethod
|
|
async def read(cls, device_path: str) -> "DeviceInfo":
|
|
return (await aiotools.run_async(cls.__inner_read, device_path))
|
|
|
|
@classmethod
|
|
def __inner_read(cls, device_path: str) -> "DeviceInfo":
|
|
if not stat.S_ISBLK(os.stat(device_path).st_mode):
|
|
raise RuntimeError(f"Not a block device: {device_path}")
|
|
|
|
with open(device_path, "rb") as device_file:
|
|
# size = BLKGETSIZE * BLKSSZGET
|
|
size = _ioctl_uint32(device_file, 0x1260) * _ioctl_uint32(device_file, 0x1268)
|
|
device_file.seek(size - _IMAGE_INFO_SIZE)
|
|
image_info = ImageInfo.from_bytes(device_file.read())
|
|
|
|
return DeviceInfo(
|
|
path=device_path,
|
|
size=size,
|
|
free=(size - image_info.size if image_info else size),
|
|
image=image_info,
|
|
)
|
|
|
|
async def write_image_info(self, device_writer: MsdFileWriter, complete: bool) -> bool:
|
|
device_file = device_writer.get_file()
|
|
state = device_writer.get_state()
|
|
image_info = ImageInfo(state["name"], state["written"], complete)
|
|
|
|
if self.size - image_info.size > _IMAGE_INFO_SIZE:
|
|
await device_file.seek(self.size - _IMAGE_INFO_SIZE) # type: ignore
|
|
await device_file.write(image_info.to_bytes()) # type: ignore
|
|
await aiofs.afile_sync(device_file)
|
|
await device_file.seek(0) # type: ignore
|
|
return True
|
|
return False # Device is full
|
|
|
|
|
|
def _ioctl_uint32(device_file: IO, request: int) -> int:
|
|
buf = b"\0" * 4
|
|
buf = fcntl.ioctl(device_file.fileno(), request, buf) # type: ignore
|
|
result = struct.unpack("I", buf)[0]
|
|
assert result > 0, (device_file, request, buf)
|
|
return result
|