mirror of
https://github.com/mofeng-git/One-KVM.git
synced 2025-12-13 01:30:31 +08:00
refactoring
This commit is contained in:
parent
b7e0ee3300
commit
16ad64db88
@ -20,185 +20,42 @@
|
||||
# ========================================================================== #
|
||||
|
||||
|
||||
import os
|
||||
import stat
|
||||
import fcntl
|
||||
import struct
|
||||
import asyncio
|
||||
import contextlib
|
||||
import dataclasses
|
||||
|
||||
from typing import Dict
|
||||
from typing import IO
|
||||
from typing import AsyncGenerator
|
||||
from typing import Optional
|
||||
|
||||
import aiofiles
|
||||
import aiofiles.base
|
||||
import gpiod
|
||||
|
||||
from ...logging import get_logger
|
||||
from ....logging import get_logger
|
||||
|
||||
from ... import env
|
||||
from ... import aiotools
|
||||
from ... import aiofs
|
||||
from ... import aiogp
|
||||
from .... import aiotools
|
||||
from .... import aiofs
|
||||
|
||||
from ...yamlconf import Option
|
||||
from ....yamlconf import Option
|
||||
|
||||
from ...validators.basic import valid_int_f1
|
||||
from ...validators.basic import valid_float_f01
|
||||
from ...validators.os import valid_abs_path
|
||||
from ...validators.hw import valid_gpio_pin
|
||||
from ....validators.basic import valid_int_f1
|
||||
from ....validators.basic import valid_float_f01
|
||||
from ....validators.os import valid_abs_path
|
||||
from ....validators.hw import valid_gpio_pin
|
||||
|
||||
from . import MsdError
|
||||
from . import MsdIsBusyError
|
||||
from . import MsdOfflineError
|
||||
from . import MsdConnectedError
|
||||
from . import MsdDisconnectedError
|
||||
from . import MsdMultiNotSupported
|
||||
from . import MsdCdromNotSupported
|
||||
from . import BaseMsd
|
||||
from .. import MsdError
|
||||
from .. import MsdIsBusyError
|
||||
from .. import MsdOfflineError
|
||||
from .. import MsdConnectedError
|
||||
from .. import MsdDisconnectedError
|
||||
from .. import MsdMultiNotSupported
|
||||
from .. import MsdCdromNotSupported
|
||||
from .. import BaseMsd
|
||||
|
||||
from .gpio import Gpio
|
||||
|
||||
# =====
|
||||
@dataclasses.dataclass(frozen=True)
|
||||
class _ImageInfo:
|
||||
name: str
|
||||
size: int
|
||||
complete: bool
|
||||
|
||||
|
||||
@dataclasses.dataclass(frozen=True)
|
||||
class _DeviceInfo:
|
||||
path: str
|
||||
size: int
|
||||
free: int
|
||||
image: Optional[_ImageInfo]
|
||||
|
||||
|
||||
_IMAGE_INFO_SIZE = 4096
|
||||
_IMAGE_INFO_MAGIC_SIZE = 16
|
||||
_IMAGE_INFO_NAME_SIZE = 256
|
||||
_IMAGE_INFO_PADS_SIZE = _IMAGE_INFO_SIZE - _IMAGE_INFO_NAME_SIZE - 1 - 8 - _IMAGE_INFO_MAGIC_SIZE * 8
|
||||
_IMAGE_INFO_FORMAT = ">%dL%dc?Q%dx%dL" % (
|
||||
_IMAGE_INFO_MAGIC_SIZE,
|
||||
_IMAGE_INFO_NAME_SIZE,
|
||||
_IMAGE_INFO_PADS_SIZE,
|
||||
_IMAGE_INFO_MAGIC_SIZE,
|
||||
)
|
||||
_IMAGE_INFO_MAGIC = [0x1ACE1ACE] * _IMAGE_INFO_MAGIC_SIZE
|
||||
|
||||
|
||||
def _make_image_info_bytes(name: str, size: int, complete: bool) -> bytes:
|
||||
return struct.pack(
|
||||
_IMAGE_INFO_FORMAT,
|
||||
*_IMAGE_INFO_MAGIC,
|
||||
*memoryview(( # type: ignore
|
||||
name.encode("utf-8")
|
||||
+ b"\x00" * _IMAGE_INFO_NAME_SIZE
|
||||
)[:_IMAGE_INFO_NAME_SIZE]).cast("c"),
|
||||
complete,
|
||||
size,
|
||||
*_IMAGE_INFO_MAGIC,
|
||||
)
|
||||
|
||||
|
||||
def _parse_image_info_bytes(data: bytes) -> Optional[_ImageInfo]:
|
||||
try:
|
||||
parsed = list(struct.unpack(_IMAGE_INFO_FORMAT, data))
|
||||
except struct.error:
|
||||
pass
|
||||
else:
|
||||
magic_begin = parsed[:_IMAGE_INFO_MAGIC_SIZE]
|
||||
magic_end = parsed[-_IMAGE_INFO_MAGIC_SIZE:]
|
||||
if magic_begin == magic_end == _IMAGE_INFO_MAGIC:
|
||||
image_name_bytes = b"".join(parsed[
|
||||
_IMAGE_INFO_MAGIC_SIZE # noqa: E203
|
||||
:
|
||||
_IMAGE_INFO_MAGIC_SIZE + _IMAGE_INFO_NAME_SIZE
|
||||
])
|
||||
return _ImageInfo(
|
||||
name=image_name_bytes.decode("utf-8", errors="ignore").strip("\x00").strip(),
|
||||
size=parsed[_IMAGE_INFO_MAGIC_SIZE + _IMAGE_INFO_NAME_SIZE + 1],
|
||||
complete=parsed[_IMAGE_INFO_MAGIC_SIZE + _IMAGE_INFO_NAME_SIZE],
|
||||
)
|
||||
return None
|
||||
|
||||
|
||||
def _ioctl_uint32(device_file: IO, request: int) -> int:
|
||||
buf = b"\0" * 4
|
||||
buf = fcntl.ioctl(device_file.fileno(), request, buf) # type: ignore
|
||||
result = struct.unpack("I", buf)[0]
|
||||
assert result > 0, (device_file, request, buf)
|
||||
return result
|
||||
|
||||
|
||||
def _explore_device(device_path: str) -> _DeviceInfo:
|
||||
if not stat.S_ISBLK(os.stat(device_path).st_mode):
|
||||
raise RuntimeError(f"Not a block device: {device_path}")
|
||||
|
||||
with open(device_path, "rb") as device_file:
|
||||
# size = BLKGETSIZE * BLKSSZGET
|
||||
size = _ioctl_uint32(device_file, 0x1260) * _ioctl_uint32(device_file, 0x1268)
|
||||
device_file.seek(size - _IMAGE_INFO_SIZE)
|
||||
image_info = _parse_image_info_bytes(device_file.read())
|
||||
|
||||
return _DeviceInfo(
|
||||
path=device_path,
|
||||
size=size,
|
||||
free=(size - image_info.size if image_info else size),
|
||||
image=image_info,
|
||||
)
|
||||
|
||||
|
||||
class _Gpio:
|
||||
def __init__(
|
||||
self,
|
||||
target_pin: int,
|
||||
reset_pin: int,
|
||||
reset_delay: float,
|
||||
) -> None:
|
||||
|
||||
self.__target_pin = target_pin
|
||||
self.__reset_pin = reset_pin
|
||||
self.__reset_delay = reset_delay
|
||||
|
||||
self.__chip: Optional[gpiod.Chip] = None
|
||||
self.__target_line: Optional[gpiod.Line] = None
|
||||
self.__reset_line: Optional[gpiod.Line] = None
|
||||
|
||||
def open(self) -> None:
|
||||
assert self.__chip is None
|
||||
assert self.__target_line is None
|
||||
assert self.__reset_line is None
|
||||
|
||||
self.__chip = gpiod.Chip(env.GPIO_DEVICE_PATH)
|
||||
|
||||
self.__target_line = self.__chip.get_line(self.__target_pin)
|
||||
self.__target_line.request("kvmd::msd::target", gpiod.LINE_REQ_DIR_OUT, default_vals=[0])
|
||||
|
||||
self.__reset_line = self.__chip.get_line(self.__reset_pin)
|
||||
self.__reset_line.request("kvmd::msd::reset", gpiod.LINE_REQ_DIR_OUT, default_vals=[0])
|
||||
|
||||
def close(self) -> None:
|
||||
if self.__chip:
|
||||
try:
|
||||
self.__chip.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def switch_to_local(self) -> None:
|
||||
assert self.__target_line
|
||||
self.__target_line.set_value(0)
|
||||
|
||||
def switch_to_server(self) -> None:
|
||||
assert self.__target_line
|
||||
self.__target_line.set_value(1)
|
||||
|
||||
async def reset(self) -> None:
|
||||
assert self.__reset_line
|
||||
await aiogp.pulse(self.__reset_line, self.__reset_delay, 0)
|
||||
from .drive import ImageInfo
|
||||
from .drive import DeviceInfo
|
||||
|
||||
|
||||
# =====
|
||||
@ -218,9 +75,9 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
|
||||
self.__init_delay = init_delay
|
||||
self.__init_retries = init_retries
|
||||
|
||||
self.__gpio = _Gpio(target_pin, reset_pin, reset_delay)
|
||||
self.__gpio = Gpio(target_pin, reset_pin, reset_delay)
|
||||
|
||||
self.__device_info: Optional[_DeviceInfo] = None
|
||||
self.__device_info: Optional[DeviceInfo] = None
|
||||
self.__connected = False
|
||||
|
||||
self.__device_file: Optional[aiofiles.base.AiofilesContextManager] = None
|
||||
@ -390,11 +247,10 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
|
||||
async def __write_image_info(self, name: str, complete: bool) -> None:
|
||||
assert self.__device_file
|
||||
assert self.__device_info
|
||||
if self.__device_info.size - self.__written > _IMAGE_INFO_SIZE:
|
||||
await self.__device_file.seek(self.__device_info.size - _IMAGE_INFO_SIZE) # type: ignore
|
||||
await aiofs.afile_write_now(self.__device_file, _make_image_info_bytes(name, self.__written, complete))
|
||||
await self.__device_file.seek(0) # type: ignore
|
||||
else:
|
||||
if not self.__device_info.write_image_info(
|
||||
device_file=self.__device_file,
|
||||
image_info=ImageInfo(name, self.__written, complete),
|
||||
):
|
||||
get_logger().error("Can't write image info because device is full")
|
||||
|
||||
async def __close_device_file(self) -> None:
|
||||
@ -413,7 +269,7 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
|
||||
while True:
|
||||
await asyncio.sleep(self.__init_delay)
|
||||
try:
|
||||
self.__device_info = await aiotools.run_async(_explore_device, self.__device_path)
|
||||
self.__device_info = await DeviceInfo.read(self.__device_path)
|
||||
break
|
||||
except Exception:
|
||||
if retries == 0:
|
||||
143
kvmd/plugins/msd/relay/drive.py
Normal file
143
kvmd/plugins/msd/relay/drive.py
Normal file
@ -0,0 +1,143 @@
|
||||
# ========================================================================== #
|
||||
# #
|
||||
# KVMD - The main Pi-KVM daemon. #
|
||||
# #
|
||||
# Copyright (C) 2018 Maxim Devaev <mdevaev@gmail.com> #
|
||||
# #
|
||||
# This program is free software: you can redistribute it and/or modify #
|
||||
# it under the terms of the GNU General Public License as published by #
|
||||
# the Free Software Foundation, either version 3 of the License, or #
|
||||
# (at your option) any later version. #
|
||||
# #
|
||||
# This program is distributed in the hope that it will be useful, #
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
|
||||
# GNU General Public License for more details. #
|
||||
# #
|
||||
# You should have received a copy of the GNU General Public License #
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
|
||||
# #
|
||||
# ========================================================================== #
|
||||
|
||||
|
||||
import os
|
||||
import stat
|
||||
import fcntl
|
||||
import struct
|
||||
import dataclasses
|
||||
|
||||
from typing import IO
|
||||
from typing import Optional
|
||||
|
||||
import aiofiles.base
|
||||
|
||||
from .... import aiotools
|
||||
from .... import aiofs
|
||||
|
||||
|
||||
# =====
|
||||
_IMAGE_INFO_SIZE = 4096
|
||||
_IMAGE_INFO_MAGIC_SIZE = 16
|
||||
_IMAGE_INFO_NAME_SIZE = 256
|
||||
_IMAGE_INFO_PADS_SIZE = _IMAGE_INFO_SIZE - _IMAGE_INFO_NAME_SIZE - 1 - 8 - _IMAGE_INFO_MAGIC_SIZE * 8
|
||||
_IMAGE_INFO_FORMAT = ">%dL%dc?Q%dx%dL" % (
|
||||
_IMAGE_INFO_MAGIC_SIZE,
|
||||
_IMAGE_INFO_NAME_SIZE,
|
||||
_IMAGE_INFO_PADS_SIZE,
|
||||
_IMAGE_INFO_MAGIC_SIZE,
|
||||
)
|
||||
_IMAGE_INFO_MAGIC = [0x1ACE1ACE] * _IMAGE_INFO_MAGIC_SIZE
|
||||
|
||||
|
||||
# =====
|
||||
@dataclasses.dataclass(frozen=True)
|
||||
class ImageInfo:
|
||||
name: str
|
||||
size: int
|
||||
complete: bool
|
||||
|
||||
@classmethod
|
||||
def from_bytes(cls, data: bytes) -> Optional["ImageInfo"]:
|
||||
try:
|
||||
parsed = list(struct.unpack(_IMAGE_INFO_FORMAT, data))
|
||||
except struct.error:
|
||||
pass
|
||||
else:
|
||||
magic_begin = parsed[:_IMAGE_INFO_MAGIC_SIZE]
|
||||
magic_end = parsed[-_IMAGE_INFO_MAGIC_SIZE:]
|
||||
if magic_begin == magic_end == _IMAGE_INFO_MAGIC:
|
||||
image_name_bytes = b"".join(parsed[
|
||||
_IMAGE_INFO_MAGIC_SIZE # noqa: E203
|
||||
:
|
||||
_IMAGE_INFO_MAGIC_SIZE + _IMAGE_INFO_NAME_SIZE
|
||||
])
|
||||
return ImageInfo(
|
||||
name=image_name_bytes.decode("utf-8", errors="ignore").strip("\x00").strip(),
|
||||
size=parsed[_IMAGE_INFO_MAGIC_SIZE + _IMAGE_INFO_NAME_SIZE + 1],
|
||||
complete=parsed[_IMAGE_INFO_MAGIC_SIZE + _IMAGE_INFO_NAME_SIZE],
|
||||
)
|
||||
return None
|
||||
|
||||
def to_bytes(self) -> bytes:
|
||||
return struct.pack(
|
||||
_IMAGE_INFO_FORMAT,
|
||||
*_IMAGE_INFO_MAGIC,
|
||||
*memoryview(( # type: ignore
|
||||
self.name.encode("utf-8")
|
||||
+ b"\x00" * _IMAGE_INFO_NAME_SIZE
|
||||
)[:_IMAGE_INFO_NAME_SIZE]).cast("c"),
|
||||
self.complete,
|
||||
self.size,
|
||||
*_IMAGE_INFO_MAGIC,
|
||||
)
|
||||
|
||||
|
||||
@dataclasses.dataclass(frozen=True)
|
||||
class DeviceInfo:
|
||||
path: str
|
||||
size: int
|
||||
free: int
|
||||
image: Optional[ImageInfo]
|
||||
|
||||
@classmethod
|
||||
async def read(cls, device_path: str) -> "DeviceInfo":
|
||||
return (await aiotools.run_async(cls.__inner_read, device_path))
|
||||
|
||||
@classmethod
|
||||
def __inner_read(cls, device_path: str) -> "DeviceInfo":
|
||||
if not stat.S_ISBLK(os.stat(device_path).st_mode):
|
||||
raise RuntimeError(f"Not a block device: {device_path}")
|
||||
|
||||
with open(device_path, "rb") as device_file:
|
||||
# size = BLKGETSIZE * BLKSSZGET
|
||||
size = _ioctl_uint32(device_file, 0x1260) * _ioctl_uint32(device_file, 0x1268)
|
||||
device_file.seek(size - _IMAGE_INFO_SIZE)
|
||||
image_info = ImageInfo.from_bytes(device_file.read())
|
||||
|
||||
return DeviceInfo(
|
||||
path=device_path,
|
||||
size=size,
|
||||
free=(size - image_info.size if image_info else size),
|
||||
image=image_info,
|
||||
)
|
||||
|
||||
async def write_image_info(
|
||||
self,
|
||||
device_file: aiofiles.base.AiofilesContextManager,
|
||||
image_info: ImageInfo,
|
||||
) -> bool:
|
||||
|
||||
if self.size - image_info.size > _IMAGE_INFO_SIZE:
|
||||
await device_file.seek(self.size - _IMAGE_INFO_SIZE) # type: ignore
|
||||
await aiofs.afile_write_now(device_file, image_info.to_bytes())
|
||||
await device_file.seek(0) # type: ignore
|
||||
return True
|
||||
return False # Device is full
|
||||
|
||||
|
||||
def _ioctl_uint32(device_file: IO, request: int) -> int:
|
||||
buf = b"\0" * 4
|
||||
buf = fcntl.ioctl(device_file.fileno(), request, buf) # type: ignore
|
||||
result = struct.unpack("I", buf)[0]
|
||||
assert result > 0, (device_file, request, buf)
|
||||
return result
|
||||
78
kvmd/plugins/msd/relay/gpio.py
Normal file
78
kvmd/plugins/msd/relay/gpio.py
Normal file
@ -0,0 +1,78 @@
|
||||
# ========================================================================== #
|
||||
# #
|
||||
# KVMD - The main Pi-KVM daemon. #
|
||||
# #
|
||||
# Copyright (C) 2018 Maxim Devaev <mdevaev@gmail.com> #
|
||||
# #
|
||||
# This program is free software: you can redistribute it and/or modify #
|
||||
# it under the terms of the GNU General Public License as published by #
|
||||
# the Free Software Foundation, either version 3 of the License, or #
|
||||
# (at your option) any later version. #
|
||||
# #
|
||||
# This program is distributed in the hope that it will be useful, #
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
|
||||
# GNU General Public License for more details. #
|
||||
# #
|
||||
# You should have received a copy of the GNU General Public License #
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
|
||||
# #
|
||||
# ========================================================================== #
|
||||
|
||||
|
||||
from typing import Optional
|
||||
|
||||
import gpiod
|
||||
|
||||
from .... import env
|
||||
from .... import aiogp
|
||||
|
||||
|
||||
# =====
|
||||
class Gpio:
|
||||
def __init__(
|
||||
self,
|
||||
target_pin: int,
|
||||
reset_pin: int,
|
||||
reset_delay: float,
|
||||
) -> None:
|
||||
|
||||
self.__target_pin = target_pin
|
||||
self.__reset_pin = reset_pin
|
||||
self.__reset_delay = reset_delay
|
||||
|
||||
self.__chip: Optional[gpiod.Chip] = None
|
||||
self.__target_line: Optional[gpiod.Line] = None
|
||||
self.__reset_line: Optional[gpiod.Line] = None
|
||||
|
||||
def open(self) -> None:
|
||||
assert self.__chip is None
|
||||
assert self.__target_line is None
|
||||
assert self.__reset_line is None
|
||||
|
||||
self.__chip = gpiod.Chip(env.GPIO_DEVICE_PATH)
|
||||
|
||||
self.__target_line = self.__chip.get_line(self.__target_pin)
|
||||
self.__target_line.request("kvmd::msd::target", gpiod.LINE_REQ_DIR_OUT, default_vals=[0])
|
||||
|
||||
self.__reset_line = self.__chip.get_line(self.__reset_pin)
|
||||
self.__reset_line.request("kvmd::msd::reset", gpiod.LINE_REQ_DIR_OUT, default_vals=[0])
|
||||
|
||||
def close(self) -> None:
|
||||
if self.__chip:
|
||||
try:
|
||||
self.__chip.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def switch_to_local(self) -> None:
|
||||
assert self.__target_line
|
||||
self.__target_line.set_value(0)
|
||||
|
||||
def switch_to_server(self) -> None:
|
||||
assert self.__target_line
|
||||
self.__target_line.set_value(1)
|
||||
|
||||
async def reset(self) -> None:
|
||||
assert self.__reset_line
|
||||
await aiogp.pulse(self.__reset_line, self.__reset_delay, 0)
|
||||
Loading…
x
Reference in New Issue
Block a user