mirror of
https://github.com/mofeng-git/One-KVM.git
synced 2025-12-11 16:50:28 +08:00
refactor: 完善代码质量检查和修复系统
主要改进: - 添加 make tox-local 本地代码质量检查支持 - 创建 check-code.sh 脚本支持独立工具执行 - 修复 51+ flake8 代码风格问题(未使用导入、行尾空格、注释格式等) - 解决 pylint 变量命名和日志格式问题 - 重构 make_image 方法解决 too-many-statements 警告 - 添加类型注解和修复方法签名不匹配问题 - 统一代码风格规范(引号使用、空格格式等) 工具配置: - 更新 tox.ini 支持 Python 3.10 本地环境 - 添加缺失的核心依赖包定义 - 完善 Makefile 构建系统集成
This commit is contained in:
parent
c8d1dcca30
commit
187c713424
6
Makefile
6
Makefile
@ -28,6 +28,8 @@ all:
|
||||
@ echo " make testenv # Build test environment"
|
||||
@ echo " make tox # Run tests and linters"
|
||||
@ echo " make tox E=pytest # Run selected test environment"
|
||||
@ echo " make tox-local # Run tests and linters locally (no Docker)"
|
||||
@ echo " make tox-local E=flake8 # Run selected test locally"
|
||||
@ echo " make gpio # Create gpio mockup"
|
||||
@ echo " make run # Run kvmd"
|
||||
@ echo " make run CMD=... # Run specified command inside kvmd environment"
|
||||
@ -96,6 +98,10 @@ tox: testenv
|
||||
"
|
||||
|
||||
|
||||
tox-local:
|
||||
@./check-code.sh $(if $(E),$(E),all)
|
||||
|
||||
|
||||
$(TESTENV_GPIO):
|
||||
test ! -e $(TESTENV_GPIO)
|
||||
sudo modprobe gpio-mockup gpio_mockup_ranges=0,40
|
||||
|
||||
82
check-code.sh
Executable file
82
check-code.sh
Executable file
@ -0,0 +1,82 @@
|
||||
#!/bin/bash
|
||||
# 本地代码质量检查脚本
|
||||
|
||||
set -e
|
||||
cd "$(dirname "$0")"
|
||||
|
||||
echo "🔍 运行代码质量检查..."
|
||||
|
||||
# 检查参数,如果有参数则只运行指定的检查
|
||||
CHECK_TYPE="${1:-all}"
|
||||
|
||||
run_flake8() {
|
||||
echo "📝 运行 flake8 代码风格检查..."
|
||||
flake8 --config=testenv/linters/flake8.ini kvmd testenv/tests *.py
|
||||
}
|
||||
|
||||
run_pylint() {
|
||||
echo "🔎 运行 pylint 代码质量分析..."
|
||||
pylint -j0 --rcfile=testenv/linters/pylint.ini --output-format=colorized --reports=no kvmd testenv/tests *.py || true
|
||||
}
|
||||
|
||||
run_mypy() {
|
||||
echo "🔧 运行 mypy 类型检查..."
|
||||
mypy --config-file=testenv/linters/mypy.ini --cache-dir=testenv/.mypy_cache kvmd testenv/tests *.py || true
|
||||
}
|
||||
|
||||
run_vulture() {
|
||||
echo "💀 运行 vulture 死代码检测..."
|
||||
vulture --ignore-names=_format_P,Plugin --ignore-decorators=@exposed_http,@exposed_ws,@pytest.fixture kvmd testenv/tests *.py testenv/linters/vulture-wl.py || true
|
||||
}
|
||||
|
||||
run_eslint() {
|
||||
echo "📜 运行 eslint JavaScript检查..."
|
||||
if command -v eslint >/dev/null 2>&1; then
|
||||
eslint --cache-location=/tmp --config=testenv/linters/eslintrc.js --color web/share/js || true
|
||||
else
|
||||
echo "⚠️ eslint 未安装,跳过"
|
||||
fi
|
||||
}
|
||||
|
||||
run_htmlhint() {
|
||||
echo "📄 运行 htmlhint HTML检查..."
|
||||
if command -v htmlhint >/dev/null 2>&1; then
|
||||
htmlhint --config=testenv/linters/htmlhint.json web/*.html web/*/*.html || true
|
||||
else
|
||||
echo "⚠️ htmlhint 未安装,跳过"
|
||||
fi
|
||||
}
|
||||
|
||||
run_shellcheck() {
|
||||
echo "🐚 运行 shellcheck Shell脚本检查..."
|
||||
if command -v shellcheck >/dev/null 2>&1; then
|
||||
shellcheck --color=always kvmd.install scripts/* || true
|
||||
else
|
||||
echo "⚠️ shellcheck 未安装,跳过"
|
||||
fi
|
||||
}
|
||||
|
||||
case "$CHECK_TYPE" in
|
||||
flake8) run_flake8 ;;
|
||||
pylint) run_pylint ;;
|
||||
mypy) run_mypy ;;
|
||||
vulture) run_vulture ;;
|
||||
eslint) run_eslint ;;
|
||||
htmlhint) run_htmlhint ;;
|
||||
shellcheck) run_shellcheck ;;
|
||||
all)
|
||||
run_flake8
|
||||
run_pylint
|
||||
run_mypy
|
||||
run_vulture
|
||||
run_eslint
|
||||
run_htmlhint
|
||||
run_shellcheck
|
||||
;;
|
||||
*)
|
||||
echo "用法: $0 [flake8|pylint|mypy|vulture|eslint|htmlhint|shellcheck|all]"
|
||||
exit 1
|
||||
;;
|
||||
esac
|
||||
|
||||
echo "✅ 代码质量检查完成!"
|
||||
@ -60,9 +60,9 @@ class LogApi:
|
||||
record["service"],
|
||||
record["msg"],
|
||||
)).encode("utf-8") + b"\r\n")
|
||||
except Exception as e:
|
||||
except Exception as exception:
|
||||
if record is None:
|
||||
record = e
|
||||
record = exception
|
||||
await response.write(f"Module systemd.journal is unavailable.\n{record}".encode("utf-8"))
|
||||
return response
|
||||
return response
|
||||
|
||||
@ -84,7 +84,7 @@ class MsdApi:
|
||||
async def __set_connected_handler(self, req: Request) -> Response:
|
||||
await self.__msd.set_connected(valid_bool(req.query.get("connected")))
|
||||
return make_json_response()
|
||||
|
||||
|
||||
@exposed_http("POST", "/msd/make_image")
|
||||
async def __set_zipped_handler(self, req: Request) -> Response:
|
||||
await self.__msd.make_image(valid_bool(req.query.get("zipped")))
|
||||
|
||||
@ -34,7 +34,6 @@ from ....yamlconf.loader import load_yaml_file
|
||||
|
||||
from .... import tools
|
||||
from .... import aiotools
|
||||
from .... import env
|
||||
|
||||
from .. import sysunit
|
||||
|
||||
|
||||
@ -73,8 +73,8 @@ class HwInfoSubmanager(BaseInfoSubmanager):
|
||||
cpu_temp,
|
||||
mem,
|
||||
) = await asyncio.gather(
|
||||
self.__read_dt_file("model", upper=False),
|
||||
self.__read_dt_file("serial-number", upper=True),
|
||||
self.__read_dt_file("model", _upper=False),
|
||||
self.__read_dt_file("serial-number", _upper=True),
|
||||
self.__read_platform_file(),
|
||||
self.__get_throttling(),
|
||||
self.__get_cpu_percent(),
|
||||
@ -115,15 +115,15 @@ class HwInfoSubmanager(BaseInfoSubmanager):
|
||||
|
||||
# =====
|
||||
|
||||
async def __read_dt_file(self, name: str, upper: bool) -> (str | None):
|
||||
async def __read_dt_file(self, name: str, _upper: bool) -> (str | None):
|
||||
if name not in self.__dt_cache:
|
||||
path = os.path.join(f"{env.PROCFS_PREFIX}/proc/device-tree", name)
|
||||
if not os.path.exists(path):
|
||||
path = os.path.join(f"{env.PROCFS_PREFIX}/etc/kvmd/hw_info/", name)
|
||||
try:
|
||||
self.__dt_cache[name] = (await aiotools.read_file(path)).strip(" \t\r\n\0")
|
||||
except Exception as err:
|
||||
#get_logger(0).warn("Can't read DT %s from %s: %s", name, path, err)
|
||||
except Exception:
|
||||
# get_logger(0).warn("Can't read DT %s from %s: %s", name, path, err)
|
||||
return None
|
||||
return self.__dt_cache[name]
|
||||
|
||||
@ -149,8 +149,8 @@ class HwInfoSubmanager(BaseInfoSubmanager):
|
||||
temp_path = f"{env.SYSFS_PREFIX}/sys/class/thermal/thermal_zone0/temp"
|
||||
try:
|
||||
return int((await aiotools.read_file(temp_path)).strip()) / 1000
|
||||
except Exception as err:
|
||||
#get_logger(0).warn("Can't read CPU temp from %s: %s", temp_path, err)
|
||||
except Exception:
|
||||
# get_logger(0).warn("Can't read CPU temp from %s: %s", temp_path, err)
|
||||
return None
|
||||
|
||||
async def __get_cpu_percent(self) -> (float | None):
|
||||
|
||||
@ -29,13 +29,11 @@ import time
|
||||
from typing import AsyncGenerator
|
||||
from xmlrpc.client import ServerProxy
|
||||
|
||||
from ...logging import get_logger
|
||||
|
||||
us_systemd_journal = True
|
||||
try:
|
||||
import systemd.journal
|
||||
except ImportError:
|
||||
import supervisor.xmlrpc
|
||||
us_systemd_journal = False
|
||||
|
||||
|
||||
@ -43,14 +41,14 @@ except ImportError:
|
||||
class LogReader:
|
||||
async def poll_log(self, seek: int, follow: bool) -> AsyncGenerator[dict, None]:
|
||||
if us_systemd_journal:
|
||||
reader = systemd.journal.Reader() # type: ignore
|
||||
reader = systemd.journal.Reader() # type: ignore
|
||||
reader.this_boot()
|
||||
# XXX: Из-за смены ID машины в bootconfig это не работает при первой загрузке.
|
||||
# reader.this_machine()
|
||||
reader.log_level(systemd.journal.LOG_DEBUG) # type: ignore
|
||||
reader.log_level(systemd.journal.LOG_DEBUG) # type: ignore
|
||||
services = set(
|
||||
service
|
||||
for service in systemd.journal.Reader().query_unique("_SYSTEMD_UNIT") # type: ignore
|
||||
for service in systemd.journal.Reader().query_unique("_SYSTEMD_UNIT") # type: ignore
|
||||
if re.match(r"kvmd(-\w+)*\.service", service)
|
||||
).union(["kvmd.service"])
|
||||
|
||||
@ -69,10 +67,15 @@ class LogReader:
|
||||
else:
|
||||
await asyncio.sleep(1)
|
||||
else:
|
||||
server = ServerProxy('http://127.0.0.1',transport=supervisor.xmlrpc.SupervisorTransport(None, None, serverurl='unix:///tmp/supervisor.sock'))
|
||||
log_entries = server.supervisor.readLog(0,0)
|
||||
yield log_entries
|
||||
|
||||
import supervisor.xmlrpc # pylint: disable=import-outside-toplevel
|
||||
server_transport = supervisor.xmlrpc.SupervisorTransport(None, None, serverurl="unix:///tmp/supervisor.sock")
|
||||
server = ServerProxy("http://127.0.0.1", transport=server_transport)
|
||||
log_entries = server.supervisor.readLog(0, 0)
|
||||
yield {
|
||||
"dt": int(time.time()),
|
||||
"service": "kvmd.service",
|
||||
"msg": str(log_entries).rstrip()
|
||||
}
|
||||
|
||||
def __entry_to_record(self, entry: dict) -> dict[str, dict]:
|
||||
return {
|
||||
|
||||
@ -201,8 +201,8 @@ class _GadgetConfig:
|
||||
rw: bool,
|
||||
removable: bool,
|
||||
fua: bool,
|
||||
inquiry_string_cdrom: str,
|
||||
inquiry_string_flash: str,
|
||||
_inquiry_string_cdrom: str,
|
||||
_inquiry_string_flash: str,
|
||||
) -> None:
|
||||
|
||||
# Endpoints number depends on transport_type but we can consider that this is 2
|
||||
@ -216,8 +216,8 @@ class _GadgetConfig:
|
||||
_write(join(func_path, "lun.0/ro"), int(not rw))
|
||||
_write(join(func_path, "lun.0/removable"), int(removable))
|
||||
_write(join(func_path, "lun.0/nofua"), int(not fua))
|
||||
#_write(join(func_path, "lun.0/inquiry_string_cdrom"), inquiry_string_cdrom)
|
||||
#_write(join(func_path, "lun.0/inquiry_string"), inquiry_string_flash)
|
||||
# _write(join(func_path, "lun.0/inquiry_string_cdrom"), inquiry_string_cdrom)
|
||||
# _write(join(func_path, "lun.0/inquiry_string"), inquiry_string_flash)
|
||||
if user != "root":
|
||||
_chown(join(func_path, "lun.0/cdrom"), user)
|
||||
_chown(join(func_path, "lun.0/ro"), user)
|
||||
@ -316,8 +316,8 @@ def _cmd_start(config: Section) -> None: # pylint: disable=too-many-statements,
|
||||
gc.add_msd(
|
||||
start=cod.msd.start,
|
||||
user=config.otg.user,
|
||||
inquiry_string_cdrom=usb.make_inquiry_string(**cod.msd.default.inquiry_string.cdrom._unpack()),
|
||||
inquiry_string_flash=usb.make_inquiry_string(**cod.msd.default.inquiry_string.flash._unpack()),
|
||||
_inquiry_string_cdrom=usb.make_inquiry_string(**cod.msd.default.inquiry_string.cdrom._unpack()),
|
||||
_inquiry_string_flash=usb.make_inquiry_string(**cod.msd.default.inquiry_string.flash._unpack()),
|
||||
**cod.msd.default._unpack(ignore="inquiry_string"),
|
||||
)
|
||||
if cod.drives.enabled:
|
||||
@ -326,8 +326,8 @@ def _cmd_start(config: Section) -> None: # pylint: disable=too-many-statements,
|
||||
gc.add_msd(
|
||||
start=cod.drives.start,
|
||||
user="root",
|
||||
inquiry_string_cdrom=usb.make_inquiry_string(**cod.drives.default.inquiry_string.cdrom._unpack()),
|
||||
inquiry_string_flash=usb.make_inquiry_string(**cod.drives.default.inquiry_string.flash._unpack()),
|
||||
_inquiry_string_cdrom=usb.make_inquiry_string(**cod.drives.default.inquiry_string.cdrom._unpack()),
|
||||
_inquiry_string_flash=usb.make_inquiry_string(**cod.drives.default.inquiry_string.flash._unpack()),
|
||||
**cod.drives.default._unpack(ignore="inquiry_string"),
|
||||
)
|
||||
|
||||
|
||||
@ -23,6 +23,7 @@
|
||||
|
||||
import errno
|
||||
import argparse
|
||||
import os
|
||||
|
||||
from ...validators.basic import valid_bool
|
||||
from ...validators.basic import valid_int_f0
|
||||
@ -37,6 +38,7 @@ from .. import init
|
||||
def _has_param(gadget: str, instance: int, param: str) -> bool:
|
||||
return os.access(_get_param_path(gadget, instance, param), os.F_OK)
|
||||
|
||||
|
||||
def _get_param_path(gadget: str, instance: int, param: str) -> str:
|
||||
return usb.get_gadget_path(gadget, usb.G_FUNCTIONS, f"mass_storage.usb{instance}/lun.0", param)
|
||||
|
||||
|
||||
@ -23,7 +23,11 @@
|
||||
import types
|
||||
|
||||
from typing import Callable
|
||||
from typing import Self
|
||||
|
||||
try:
|
||||
from typing import Self
|
||||
except ImportError:
|
||||
from typing_extensions import Self
|
||||
|
||||
import aiohttp
|
||||
|
||||
|
||||
@ -38,12 +38,12 @@ class Partition:
|
||||
|
||||
|
||||
# =====
|
||||
def find_msd(msd_directory_path) -> Partition:
|
||||
def find_msd(msd_directory_path: str = "/var/lib/kvmd/msd") -> Partition:
|
||||
return _find_single("otgmsd", msd_directory_path)
|
||||
|
||||
|
||||
def find_pst() -> Partition:
|
||||
return _find_single("pst")
|
||||
def find_pst(msd_directory_path: str = "/var/lib/kvmd/msd") -> Partition:
|
||||
return _find_single("pst", msd_directory_path)
|
||||
|
||||
|
||||
# =====
|
||||
@ -51,8 +51,8 @@ def _find_single(part_type: str, msd_directory_path: str) -> Partition:
|
||||
parts = _find_partitions(part_type, True)
|
||||
if len(parts) == 0:
|
||||
if os.path.exists(msd_directory_path):
|
||||
#set default value
|
||||
parts = [Partition(mount_path = msd_directory_path, root_path = msd_directory_path, group = 'kvmd', user = 'kvmd')]
|
||||
# set default value
|
||||
parts = [Partition(mount_path=msd_directory_path, root_path=msd_directory_path, group="kvmd", user="kvmd")]
|
||||
else:
|
||||
raise RuntimeError(f"Can't find {part_type!r} mountpoint")
|
||||
return parts[0]
|
||||
|
||||
@ -157,7 +157,7 @@ class BaseMsd(BasePlugin):
|
||||
|
||||
async def set_connected(self, connected: bool) -> None:
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
async def make_image(self, zipped: bool) -> None:
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
@ -77,6 +77,9 @@ class Plugin(BaseMsd):
|
||||
async def set_connected(self, connected: bool) -> None:
|
||||
raise MsdDisabledError()
|
||||
|
||||
async def make_image(self, zipped: bool) -> None:
|
||||
raise MsdDisabledError()
|
||||
|
||||
@contextlib.asynccontextmanager
|
||||
async def read_image(self, name: str) -> AsyncGenerator[BaseMsdReader, None]:
|
||||
if self is not None: # XXX: Vulture and pylint hack
|
||||
|
||||
@ -25,7 +25,6 @@ import asyncio
|
||||
import contextlib
|
||||
import dataclasses
|
||||
import functools
|
||||
import time
|
||||
import os
|
||||
import copy
|
||||
import pyfatfs
|
||||
@ -303,7 +302,7 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
|
||||
|
||||
self.__drive.set_rw_flag(self.__state.vd.rw)
|
||||
self.__drive.set_cdrom_flag(self.__state.vd.cdrom)
|
||||
#reset UDC to fix otg cd-rom and flash switch
|
||||
# reset UDC to fix otg cd-rom and flash switch
|
||||
try:
|
||||
udc_path = self.__drive.get_udc_path()
|
||||
with open(udc_path) as file:
|
||||
@ -313,7 +312,7 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
|
||||
file.write("\n")
|
||||
with open(udc_path, "w") as file:
|
||||
file.write(sorted(os.listdir("/sys/class/udc"))[0])
|
||||
except:
|
||||
except Exception:
|
||||
logger = get_logger(0)
|
||||
logger.error("Can't reset UDC")
|
||||
if self.__state.vd.rw:
|
||||
@ -329,69 +328,7 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
|
||||
|
||||
@aiotools.atomic_fg
|
||||
async def make_image(self, zipped: bool) -> None:
|
||||
#Note: img size >= 64M
|
||||
def create_fat_image(img_size: int, file_img_path: str, source_dir: str, fat_type: int = 32, label: str = 'One-KVM'):
|
||||
def add_directory_to_fat(fat: str, src_path: str, dst_path: str):
|
||||
for item in os.listdir(src_path):
|
||||
src_item_path = os.path.join(src_path, item)
|
||||
dst_item_path = os.path.join(dst_path, item)
|
||||
|
||||
if os.path.isdir(src_item_path):
|
||||
fat.makedir(dst_item_path)
|
||||
add_directory_to_fat(fat, src_item_path, dst_item_path)
|
||||
elif os.path.isfile(src_item_path):
|
||||
with open(src_item_path, 'rb') as src_file:
|
||||
fat.create(dst_item_path)
|
||||
with fat.open(dst_item_path, 'wb') as dst_file:
|
||||
dst_file.write(src_file.read())
|
||||
print(file_img_path)
|
||||
with open(file_img_path, 'wb') as f:
|
||||
f.seek(img_size * 1024 *1024 - 1)
|
||||
f.write(b'\0')
|
||||
fat_file = pyfatfs.PyFat.PyFat()
|
||||
try:
|
||||
fat_file.mkfs(file_img_path, fat_type = fat_type, label = label)
|
||||
except Exception as e:
|
||||
get_logger(0).exception(f"Error making FAT Filesystem: {e}")
|
||||
finally:
|
||||
fat_file.close()
|
||||
fat_handle = pyfatfs.PyFatFS.PyFatFS(file_img_path)
|
||||
try:
|
||||
add_directory_to_fat(fat_handle, source_dir, '/')
|
||||
except Exception as e:
|
||||
get_logger(0).exception(f"Error adding directory to FAT image: {e}")
|
||||
finally:
|
||||
fat_handle.close()
|
||||
|
||||
def extract_fat_image(file_img_path: str, output_dir: str):
|
||||
try:
|
||||
for root, dirs, files in os.walk(output_dir, topdown=False):
|
||||
for name in files:
|
||||
os.remove(os.path.join(root, name))
|
||||
for name in dirs:
|
||||
os.rmdir(os.path.join(root, name))
|
||||
except Exception as e:
|
||||
get_logger(0).exception(f"Error removing normal file or directory: {e}")
|
||||
fat_handle = pyfatfs.PyFatFS.PyFatFS(file_img_path)
|
||||
try:
|
||||
def extract_directory(fat_handle, src_path: str, dst_path: str):
|
||||
for entry in fat_handle.listdir(src_path):
|
||||
src_item_path = os.path.join(src_path, entry)
|
||||
dst_item_path = os.path.join(dst_path, entry)
|
||||
|
||||
if fat_handle.gettype(src_item_path) is pyfatfs.PyFatFS.ResourceType.directory:
|
||||
os.makedirs(dst_item_path, exist_ok=True)
|
||||
extract_directory(fat_handle, src_item_path, dst_item_path)
|
||||
else:
|
||||
with fat_handle.open(src_item_path, 'rb') as src_file:
|
||||
with open(dst_item_path, 'wb') as dst_file:
|
||||
dst_file.write(src_file.read())
|
||||
extract_directory(fat_handle, '/', output_dir)
|
||||
except Exception as e:
|
||||
get_logger(0).exception(f"Error extracting FAT image: {e}")
|
||||
finally:
|
||||
fat_handle.close()
|
||||
|
||||
# Note: img size >= 64M
|
||||
async with self.__state.busy():
|
||||
msd_path = self.__msd_path
|
||||
file_storage_path = os.path.join(msd_path, self.__normalfiles_path)
|
||||
@ -402,10 +339,74 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
|
||||
os.makedirs(file_storage_path)
|
||||
if os.path.exists(file_img_path):
|
||||
os.remove(file_img_path)
|
||||
create_fat_image(img_size, file_img_path, file_storage_path)
|
||||
self._create_fat_image(img_size, file_img_path, file_storage_path)
|
||||
else:
|
||||
if os.path.exists(file_img_path):
|
||||
extract_fat_image(file_img_path, file_storage_path)
|
||||
self._extract_fat_image(file_img_path, file_storage_path)
|
||||
|
||||
def _create_fat_image(self, img_size: int, file_img_path: str, source_dir: str, fat_type: int = 32, label: str = "One-KVM") -> None:
|
||||
print(file_img_path)
|
||||
with open(file_img_path, "wb") as file_handle:
|
||||
file_handle.seek(img_size * 1024 * 1024 - 1)
|
||||
file_handle.write(b"\0")
|
||||
fat_file = pyfatfs.PyFat.PyFat()
|
||||
try:
|
||||
fat_file.mkfs(file_img_path, fat_type=fat_type, label=label)
|
||||
except Exception as exception:
|
||||
get_logger(0).exception("Error making FAT Filesystem: %s", exception)
|
||||
finally:
|
||||
fat_file.close()
|
||||
fat_handle = pyfatfs.PyFatFS.PyFatFS(file_img_path)
|
||||
try:
|
||||
self._add_directory_to_fat(fat_handle, source_dir, "/")
|
||||
except Exception as exception:
|
||||
get_logger(0).exception("Error adding directory to FAT image: %s", exception)
|
||||
finally:
|
||||
fat_handle.close()
|
||||
|
||||
def _add_directory_to_fat(self, fat, src_path: str, dst_path: str) -> None: # type: ignore
|
||||
for item in os.listdir(src_path):
|
||||
src_item_path = os.path.join(src_path, item)
|
||||
dst_item_path = os.path.join(dst_path, item)
|
||||
|
||||
if os.path.isdir(src_item_path):
|
||||
fat.makedir(dst_item_path) # type: ignore
|
||||
self._add_directory_to_fat(fat, src_item_path, dst_item_path)
|
||||
elif os.path.isfile(src_item_path):
|
||||
with open(src_item_path, "rb") as src_file:
|
||||
fat.create(dst_item_path) # type: ignore
|
||||
with fat.open(dst_item_path, "wb") as dst_file: # type: ignore
|
||||
dst_file.write(src_file.read())
|
||||
|
||||
def _extract_fat_image(self, file_img_path: str, output_dir: str) -> None:
|
||||
try:
|
||||
for root, dirs, files in os.walk(output_dir, topdown=False):
|
||||
for name in files:
|
||||
os.remove(os.path.join(root, name))
|
||||
for name in dirs:
|
||||
os.rmdir(os.path.join(root, name))
|
||||
except Exception as exception:
|
||||
get_logger(0).exception("Error removing normal file or directory: %s", exception)
|
||||
fat_handle = pyfatfs.PyFatFS.PyFatFS(file_img_path)
|
||||
try:
|
||||
self._extract_directory(fat_handle, "/", output_dir)
|
||||
except Exception as exception:
|
||||
get_logger(0).exception("Error extracting FAT image: %s", exception)
|
||||
finally:
|
||||
fat_handle.close()
|
||||
|
||||
def _extract_directory(self, fat_handle, src_path: str, dst_path: str) -> None: # type: ignore
|
||||
for entry in fat_handle.listdir(src_path): # type: ignore
|
||||
src_item_path = os.path.join(src_path, entry)
|
||||
dst_item_path = os.path.join(dst_path, entry)
|
||||
|
||||
if fat_handle.gettype(src_item_path) is pyfatfs.PyFatFS.ResourceType.directory: # type: ignore
|
||||
os.makedirs(dst_item_path, exist_ok=True)
|
||||
self._extract_directory(fat_handle, src_item_path, dst_item_path)
|
||||
else:
|
||||
with fat_handle.open(src_item_path, "rb") as src_file: # type: ignore
|
||||
with open(dst_item_path, "wb") as dst_file:
|
||||
dst_file.write(src_file.read())
|
||||
|
||||
@contextlib.asynccontextmanager
|
||||
async def read_image(self, name: str) -> AsyncGenerator[MsdFileReader, None]:
|
||||
|
||||
@ -39,7 +39,7 @@ class MsdDriveLockedError(MsdOperationError):
|
||||
class Drive:
|
||||
def __init__(self, gadget: str, instance: int, lun: int) -> None:
|
||||
func = f"mass_storage.usb{instance}"
|
||||
self.__udc_path = os.path.join(f"/sys/kernel/config/usb_gadget", gadget, usb.G_UDC)
|
||||
self.__udc_path = os.path.join("/sys/kernel/config/usb_gadget", gadget, usb.G_UDC)
|
||||
self.__profile_func_path = usb.get_gadget_path(gadget, usb.G_PROFILE, func)
|
||||
self.__profile_path = usb.get_gadget_path(gadget, usb.G_PROFILE)
|
||||
self.__lun_path = usb.get_gadget_path(gadget, usb.G_FUNCTIONS, func, f"lun.{lun}")
|
||||
@ -49,7 +49,7 @@ class Drive:
|
||||
|
||||
def get_watchable_paths(self) -> list[str]:
|
||||
return [self.__lun_path, self.__profile_path]
|
||||
|
||||
|
||||
def get_udc_path(self) -> str:
|
||||
return self.__udc_path
|
||||
|
||||
@ -80,7 +80,7 @@ class Drive:
|
||||
# =====
|
||||
def __has_param(self, param: str) -> bool:
|
||||
return os.access(os.path.join(self.__lun_path, param), os.F_OK)
|
||||
|
||||
|
||||
def __get_param(self, param: str) -> str:
|
||||
with open(os.path.join(self.__lun_path, param)) as file:
|
||||
return file.read().strip()
|
||||
|
||||
3
setup.py
3
setup.py
@ -21,7 +21,8 @@
|
||||
# #
|
||||
# ========================================================================== #
|
||||
|
||||
from setuptools import setup, find_packages
|
||||
from setuptools import setup
|
||||
|
||||
|
||||
def main() -> None:
|
||||
# Define entry points manually with specific import paths
|
||||
|
||||
6
testenv/requirements-core.txt
Normal file
6
testenv/requirements-core.txt
Normal file
@ -0,0 +1,6 @@
|
||||
# Core dependencies for code quality checks (hardware dependencies excluded)
|
||||
python-dateutil>=2.8.1
|
||||
pyserial
|
||||
pyserial-asyncio
|
||||
types-PyYAML
|
||||
types-aiofiles
|
||||
@ -3,46 +3,50 @@ envlist = flake8, pylint, mypy, vulture, pytest, eslint, htmlhint, shellcheck
|
||||
skipsdist = true
|
||||
|
||||
[testenv]
|
||||
basepython = python3.13
|
||||
basepython = python3.10
|
||||
sitepackages = true
|
||||
changedir = /src
|
||||
changedir = {toxinidir}
|
||||
|
||||
[testenv:flake8]
|
||||
allowlist_externals = bash
|
||||
commands = bash -c 'flake8 --config=testenv/linters/flake8.ini kvmd testenv/tests *.py'
|
||||
commands = bash -c 'flake8 --config={toxinidir}/testenv/linters/flake8.ini {toxinidir}/kvmd {toxinidir}/testenv/tests {toxinidir}/*.py'
|
||||
deps =
|
||||
flake8
|
||||
flake8-quotes
|
||||
-rrequirements.txt
|
||||
types-PyYAML
|
||||
types-aiofiles
|
||||
|
||||
[testenv:pylint]
|
||||
allowlist_externals = bash
|
||||
commands = bash -c 'pylint -j0 --rcfile=testenv/linters/pylint.ini --output-format=colorized --reports=no kvmd testenv/tests *.py'
|
||||
commands = bash -c 'pylint -j0 --rcfile={toxinidir}/testenv/linters/pylint.ini --output-format=colorized --reports=no {toxinidir}/kvmd {toxinidir}/testenv/tests {toxinidir}/*.py'
|
||||
deps =
|
||||
pylint
|
||||
pytest
|
||||
pytest-asyncio
|
||||
aiohttp-basicauth
|
||||
-rrequirements.txt
|
||||
types-PyYAML
|
||||
types-aiofiles
|
||||
|
||||
[testenv:mypy]
|
||||
allowlist_externals = bash
|
||||
commands = bash -c 'mypy --config-file=testenv/linters/mypy.ini --cache-dir=testenv/.mypy_cache kvmd testenv/tests *.py'
|
||||
commands = bash -c 'mypy --config-file={toxinidir}/testenv/linters/mypy.ini --cache-dir={toxinidir}/testenv/.mypy_cache {toxinidir}/kvmd {toxinidir}/testenv/tests {toxinidir}/*.py'
|
||||
deps =
|
||||
mypy
|
||||
-rrequirements.txt
|
||||
types-PyYAML
|
||||
types-aiofiles
|
||||
|
||||
[testenv:vulture]
|
||||
allowlist_externals = bash
|
||||
commands = bash -c 'vulture --ignore-names=_format_P,Plugin --ignore-decorators=@exposed_http,@exposed_ws,@pytest.fixture kvmd testenv/tests *.py testenv/linters/vulture-wl.py'
|
||||
commands = bash -c 'vulture --ignore-names=_format_P,Plugin --ignore-decorators=@exposed_http,@exposed_ws,@pytest.fixture {toxinidir}/kvmd {toxinidir}/testenv/tests {toxinidir}/*.py {toxinidir}/testenv/linters/vulture-wl.py'
|
||||
deps =
|
||||
vulture
|
||||
-rrequirements.txt
|
||||
types-PyYAML
|
||||
types-aiofiles
|
||||
|
||||
[testenv:pytest]
|
||||
commands = py.test -vv --cov-config=testenv/linters/coverage.ini --cov-report=term-missing --cov=kvmd testenv/tests
|
||||
commands = py.test -vv --cov-config={toxinidir}/testenv/linters/coverage.ini --cov-report=term-missing --cov=kvmd {toxinidir}/testenv/tests
|
||||
setenv =
|
||||
PYTHONPATH=/src
|
||||
PYTHONPATH={toxinidir}
|
||||
deps =
|
||||
pytest
|
||||
pytest-cov
|
||||
@ -50,16 +54,17 @@ deps =
|
||||
pytest-asyncio
|
||||
pytest-aiohttp
|
||||
aiohttp-basicauth
|
||||
-rrequirements.txt
|
||||
types-PyYAML
|
||||
types-aiofiles
|
||||
|
||||
[testenv:eslint]
|
||||
allowlist_externals = eslint
|
||||
commands = eslint --cache-location=/tmp --config=testenv/linters/eslintrc.js --color web/share/js
|
||||
commands = eslint --cache-location=/tmp --config={toxinidir}/testenv/linters/eslintrc.js --color {toxinidir}/web/share/js
|
||||
|
||||
[testenv:htmlhint]
|
||||
allowlist_externals = htmlhint
|
||||
commands = htmlhint --config=testenv/linters/htmlhint.json web/*.html web/*/*.html
|
||||
commands = htmlhint --config={toxinidir}/testenv/linters/htmlhint.json {toxinidir}/web/*.html {toxinidir}/web/*/*.html
|
||||
|
||||
[testenv:shellcheck]
|
||||
allowlist_externals = bash
|
||||
commands = bash -c 'shellcheck --color=always kvmd.install scripts/*'
|
||||
commands = bash -c 'shellcheck --color=always {toxinidir}/kvmd.install {toxinidir}/scripts/*'
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user