mirror of
https://github.com/mofeng-git/One-KVM.git
synced 2026-01-29 00:51:53 +08:00
improved otg msd
This commit is contained in:
@@ -21,44 +21,87 @@
|
||||
|
||||
|
||||
import sys
|
||||
import os
|
||||
import re
|
||||
import shutil
|
||||
import dataclasses
|
||||
import subprocess
|
||||
|
||||
|
||||
# ====
|
||||
_MOUNT_PATH = "/bin/mount"
|
||||
_FSTAB_PATH = "/etc/fstab"
|
||||
_OPTION = "X-kvmd.otg-msd"
|
||||
|
||||
|
||||
# =====
|
||||
def _find_mountpoint() -> str:
|
||||
@dataclasses.dataclass(frozen=True)
|
||||
class _Storage:
|
||||
mount_path: str
|
||||
root_path: str
|
||||
user: str
|
||||
|
||||
|
||||
# =====
|
||||
def _log(msg: str) -> None:
|
||||
print(msg, file=sys.stderr)
|
||||
|
||||
|
||||
def _find_storage() -> _Storage:
|
||||
with open(_FSTAB_PATH) as fstab_file:
|
||||
for line in fstab_file.read().split("\n"):
|
||||
line = line.strip()
|
||||
if line and not line.startswith("#"):
|
||||
parts = line.split()
|
||||
if len(parts) == 6:
|
||||
options = parts[3].split(",")
|
||||
if _OPTION in options:
|
||||
return parts[1]
|
||||
raise SystemExit(f"Can't find {_OPTION!r} mountpoint in {_FSTAB_PATH}")
|
||||
options = dict(re.findall(r"X-kvmd\.otgmsd-(root|user)=([^,]+)", parts[3]))
|
||||
if options:
|
||||
return _Storage(
|
||||
mount_path=parts[1],
|
||||
root_path=options.get("root", ""),
|
||||
user=options.get("user", ""),
|
||||
)
|
||||
raise RuntimeError(f"Can't find MSD mountpoint in {_FSTAB_PATH}")
|
||||
|
||||
|
||||
def _remount(path: str, ro: bool) -> None:
|
||||
def _remount(path: str, rw: bool) -> None:
|
||||
mode = ("rw" if rw else "ro")
|
||||
_log(f"Remouning {path} to {mode.upper()}-mode ...")
|
||||
try:
|
||||
subprocess.check_call([
|
||||
_MOUNT_PATH,
|
||||
"--options",
|
||||
f"remount,{'ro' if ro else 'rw'}",
|
||||
path,
|
||||
])
|
||||
subprocess.check_call([_MOUNT_PATH, "--options", f"remount,{mode}", path])
|
||||
except subprocess.CalledProcessError as err:
|
||||
raise SystemExit(str(err)) from None
|
||||
raise SystemExit(f"Can't remount: {err}")
|
||||
|
||||
|
||||
def _mkdir(path: str) -> None:
|
||||
if not os.path.exists(path):
|
||||
_log(f"MKDIR {path} ...")
|
||||
try:
|
||||
os.mkdir(path)
|
||||
except Exception as err:
|
||||
raise SystemExit(f"Can't create directory: {err}")
|
||||
|
||||
|
||||
def _chown(path: str, user: str) -> None:
|
||||
_log(f"CHOWN {user} {path} ...")
|
||||
try:
|
||||
shutil.chown(path, user)
|
||||
except Exception as err:
|
||||
raise SystemExit(f"Can't change ownership: {err}")
|
||||
|
||||
|
||||
# =====
|
||||
def main() -> None:
|
||||
if len(sys.argv) != 2 or sys.argv[1] not in ["ro", "rw"]:
|
||||
raise SystemExit(f"This program will remount a first volume marked by {_OPTION!r} option in {_FSTAB_PATH}\n\n"
|
||||
f"Usage: python -m kvmd.helpers.otgmsd.remount [-h|--help|ro|rw]")
|
||||
_remount(_find_mountpoint(), (sys.argv[1] == "ro"))
|
||||
raise SystemExit(f"Usage: {sys.argv[0]} [ro|rw]")
|
||||
|
||||
rw = (sys.argv[1] == "rw")
|
||||
|
||||
storage = _find_storage()
|
||||
_remount(storage.mount_path, rw)
|
||||
if rw:
|
||||
if storage.root_path:
|
||||
for name in ["images", "meta"]:
|
||||
path = os.path.join(storage.root_path, name)
|
||||
_mkdir(path)
|
||||
if storage.user:
|
||||
_chown(path, storage.user)
|
||||
|
||||
@@ -31,12 +31,17 @@ _PROCESS_NAME = "file-storage"
|
||||
|
||||
|
||||
# =====
|
||||
def _log(msg: str) -> None:
|
||||
print(msg, file=sys.stderr)
|
||||
|
||||
|
||||
def _unlock() -> None:
|
||||
# https://github.com/torvalds/linux/blob/3039fad/drivers/usb/gadget/function/f_mass_storage.c#L2924
|
||||
found = False
|
||||
for proc in psutil.process_iter():
|
||||
attrs = proc.as_dict(attrs=["name", "exe"])
|
||||
attrs = proc.as_dict(attrs=["name", "exe", "pid"])
|
||||
if attrs.get("name") == _PROCESS_NAME and not attrs.get("exe"):
|
||||
_log(f"Sending SIGUSR1 to MSD {_PROCESS_NAME!r} kernel thread with pid={attrs['pid']} ...")
|
||||
try:
|
||||
proc.send_signal(signal.SIGUSR1)
|
||||
found = True
|
||||
@@ -49,6 +54,5 @@ def _unlock() -> None:
|
||||
# =====
|
||||
def main() -> None:
|
||||
if len(sys.argv) != 2 or sys.argv[1] != "unlock":
|
||||
raise SystemExit(f"This program interrupts all IO operations performed by OTG MSD.\n\n"
|
||||
f"Usage: python -m kvmd.helpers.otgmsd.unlock [-h|--help|unlock]")
|
||||
raise SystemExit(f"Usage: {sys.argv[0]} [unlock]")
|
||||
_unlock()
|
||||
|
||||
Reference in New Issue
Block a user