refactoring

This commit is contained in:
Maxim Devaev 2022-03-31 16:17:52 +03:00
parent ab09f88d80
commit b775239d72
7 changed files with 44 additions and 59 deletions

View File

@ -38,7 +38,6 @@ from ...yamlconf import Section
from ...validators import ValidatorError from ...validators import ValidatorError
from ... import env
from ... import usb from ... import usb
from .. import init from .. import init
@ -147,7 +146,7 @@ class _GadgetConfig:
# a Microsoft-specific extension of USB. # a Microsoft-specific extension of USB.
_write(join(func_path, "os_desc/interface.rndis/compatible_id"), "RNDIS") _write(join(func_path, "os_desc/interface.rndis/compatible_id"), "RNDIS")
_write(join(func_path, "os_desc/interface.rndis/sub_compatible_id"), "5162001") _write(join(func_path, "os_desc/interface.rndis/sub_compatible_id"), "5162001")
_symlink(self.__profile_path, join(self.__gadget_path, "os_desc/c.1")) _symlink(self.__profile_path, join(self.__gadget_path, "os_desc", usb.G_PROFILE_NAME))
_symlink(func_path, join(self.__profile_path, func)) _symlink(func_path, join(self.__profile_path, func))
self.__create_meta(func, "Ethernet") self.__create_meta(func, "Ethernet")
@ -207,7 +206,7 @@ def _cmd_start(config: Section) -> None: # pylint: disable=too-many-statements
logger.info("Using UDC %s", udc) logger.info("Using UDC %s", udc)
logger.info("Creating gadget %r ...", config.otg.gadget) logger.info("Creating gadget %r ...", config.otg.gadget)
gadget_path = join(f"{env.SYSFS_PREFIX}/sys/kernel/config/usb_gadget", config.otg.gadget) gadget_path = usb.get_gadget_path(config.otg.gadget)
_mkdir(gadget_path) _mkdir(gadget_path)
_write(join(gadget_path, "idVendor"), f"0x{config.otg.vendor_id:04X}") _write(join(gadget_path, "idVendor"), f"0x{config.otg.vendor_id:04X}")
@ -229,7 +228,7 @@ def _cmd_start(config: Section) -> None: # pylint: disable=too-many-statements
_write(join(lang_path, "product"), config.otg.product) _write(join(lang_path, "product"), config.otg.product)
_write(join(lang_path, "serialnumber"), config.otg.serial) _write(join(lang_path, "serialnumber"), config.otg.serial)
profile_path = join(gadget_path, "configs/c.1") profile_path = join(gadget_path, usb.G_PROFILE)
_mkdir(profile_path) _mkdir(profile_path)
_mkdir(join(profile_path, "strings/0x409")) _mkdir(join(profile_path, "strings/0x409"))
_write(join(profile_path, "strings/0x409/configuration"), f"Config 1: {config.otg.config}") _write(join(profile_path, "strings/0x409/configuration"), f"Config 1: {config.otg.config}")
@ -285,14 +284,14 @@ def _cmd_stop(config: Section) -> None:
_check_config(config) _check_config(config)
gadget_path = join(f"{env.SYSFS_PREFIX}/sys/kernel/config/usb_gadget", config.otg.gadget) gadget_path = usb.get_gadget_path(config.otg.gadget)
logger.info("Disabling gadget %r ...", config.otg.gadget) logger.info("Disabling gadget %r ...", config.otg.gadget)
_write(join(gadget_path, "UDC"), "\n") _write(join(gadget_path, "UDC"), "\n")
_unlink(join(gadget_path, "os_desc/c.1"), True) _unlink(join(gadget_path, "os_desc", usb.G_PROFILE_NAME), True)
profile_path = join(gadget_path, "configs/c.1") profile_path = join(gadget_path, usb.G_PROFILE)
for func in os.listdir(profile_path): for func in os.listdir(profile_path):
if re.search(r"\.usb\d+$", func): if re.search(r"\.usb\d+$", func):
_unlink(join(profile_path, func)) _unlink(join(profile_path, func))

View File

@ -31,25 +31,16 @@ from typing import Optional
from ...validators.basic import valid_stripped_string_not_empty from ...validators.basic import valid_stripped_string_not_empty
from ... import env
from ... import usb from ... import usb
from .. import init from .. import init
# ===== # =====
def _make_config_path(gadget: str, func: str) -> str:
return os.path.join(f"{env.SYSFS_PREFIX}/sys/kernel/config/usb_gadget", gadget, "configs/c.1", func)
def _make_func_path(gadget: str, func: str) -> str:
return os.path.join(f"{env.SYSFS_PREFIX}/sys/kernel/config/usb_gadget", gadget, "functions", func)
@contextlib.contextmanager @contextlib.contextmanager
def _udc_stopped(gadget: str, udc: str) -> Generator[None, None, None]: def _udc_stopped(gadget: str, udc: str) -> Generator[None, None, None]:
udc = usb.find_udc(udc) udc = usb.find_udc(udc)
udc_path = os.path.join(f"{env.SYSFS_PREFIX}/sys/kernel/config/usb_gadget", gadget, "UDC") udc_path = usb.get_gadget_path(gadget, usb.G_UDC)
with open(udc_path) as udc_file: with open(udc_path) as udc_file:
enabled = bool(udc_file.read().strip()) enabled = bool(udc_file.read().strip())
if enabled: if enabled:
@ -65,19 +56,22 @@ def _udc_stopped(gadget: str, udc: str) -> Generator[None, None, None]:
def _enable_function(gadget: str, udc: str, func: str) -> None: def _enable_function(gadget: str, udc: str, func: str) -> None:
with _udc_stopped(gadget, udc): with _udc_stopped(gadget, udc):
os.symlink(_make_func_path(gadget, func), _make_config_path(gadget, func)) os.symlink(
usb.get_gadget_path(gadget, usb.G_FUNCTIONS, func),
usb.get_gadget_path(gadget, usb.G_PROFILE, func),
)
def _disable_function(gadget: str, udc: str, func: str) -> None: def _disable_function(gadget: str, udc: str, func: str) -> None:
with _udc_stopped(gadget, udc): with _udc_stopped(gadget, udc):
os.unlink(_make_config_path(gadget, func)) os.unlink(usb.get_gadget_path(gadget, usb.G_PROFILE, func))
def _list_functions(gadget: str, meta_path: str) -> None: def _list_functions(gadget: str, meta_path: str) -> None:
for meta_name in sorted(os.listdir(meta_path)): for meta_name in sorted(os.listdir(meta_path)):
with open(os.path.join(meta_path, meta_name)) as meta_file: with open(os.path.join(meta_path, meta_name)) as meta_file:
meta = json.loads(meta_file.read()) meta = json.loads(meta_file.read())
enabled = os.path.exists(_make_config_path(gadget, meta["func"])) enabled = os.path.exists(usb.get_gadget_path(gadget, usb.G_PROFILE, meta["func"]))
print(f"{'+' if enabled else '-'} {meta['func']} # {meta['name']}") print(f"{'+' if enabled else '-'} {meta['func']} # {meta['name']}")

View File

@ -34,29 +34,24 @@ from ...validators.basic import valid_bool
from ...validators.basic import valid_int_f0 from ...validators.basic import valid_int_f0
from ...validators.os import valid_abs_file from ...validators.os import valid_abs_file
from ... import env from ... import usb
from .. import init from .. import init
# ===== # =====
def _make_param_path(gadget: str, instance: int, param: str) -> str: def _get_param_path(gadget: str, instance: int, param: str) -> str:
return os.path.join( return usb.get_gadget_path(gadget, usb.G_FUNCTIONS, f"mass_storage.usb{instance}/lun.0", param)
f"{env.SYSFS_PREFIX}/sys/kernel/config/usb_gadget",
gadget,
f"functions/mass_storage.usb{instance}/lun.0",
param,
)
def _get_param(gadget: str, instance: int, param: str) -> str: def _get_param(gadget: str, instance: int, param: str) -> str:
with open(_make_param_path(gadget, instance, param)) as param_file: with open(_get_param_path(gadget, instance, param)) as param_file:
return param_file.read().strip() return param_file.read().strip()
def _set_param(gadget: str, instance: int, param: str, value: str) -> None: def _set_param(gadget: str, instance: int, param: str, value: str) -> None:
try: try:
with open(_make_param_path(gadget, instance, param), "w") as param_file: with open(_get_param_path(gadget, instance, param), "w") as param_file:
param_file.write(value + "\n") param_file.write(value + "\n")
except OSError as err: except OSError as err:
if err.errno == errno.EBUSY: if err.errno == errno.EBUSY:

View File

@ -20,7 +20,6 @@
# ========================================================================== # # ========================================================================== #
import os
import asyncio import asyncio
import ipaddress import ipaddress
import dataclasses import dataclasses
@ -34,9 +33,9 @@ from ...logging import get_logger
from ...yamlconf import Section from ...yamlconf import Section
from ... import env
from ... import tools from ... import tools
from ... import aioproc from ... import aioproc
from ... import usb
from .. import init from .. import init
@ -176,11 +175,7 @@ class _Service: # pylint: disable=too-many-instance-attributes
real_driver = self.__driver real_driver = self.__driver
if self.__driver == "rndis5": if self.__driver == "rndis5":
real_driver = "rndis" real_driver = "rndis"
path = env.SYSFS_PREFIX + os.path.join( path = usb.get_gadget_path(self.__gadget, usb.G_FUNCTIONS, f"{real_driver}.usb0/ifname")
"/sys/kernel/config/usb_gadget",
self.__gadget,
f"functions/{real_driver}.usb0/ifname",
)
logger.info("Using OTG gadget %r ...", self.__gadget) logger.info("Using OTG gadget %r ...", self.__gadget)
with open(path) as iface_file: with open(path) as iface_file:
iface = iface_file.read().strip() iface = iface_file.read().strip()

View File

@ -25,7 +25,7 @@ import errno
from typing import List from typing import List
from .... import env from .... import usb
from .. import MsdOperationError from .. import MsdOperationError
@ -39,23 +39,16 @@ class MsdDriveLockedError(MsdOperationError):
# ===== # =====
class Drive: class Drive:
def __init__(self, gadget: str, instance: int, lun: int) -> None: def __init__(self, gadget: str, instance: int, lun: int) -> None:
self.__lun_path = os.path.join( func = f"mass_storage.usb{instance}"
f"{env.SYSFS_PREFIX}/sys/kernel/config/usb_gadget", self.__profile_func_path = usb.get_gadget_path(gadget, usb.G_PROFILE, func)
gadget, self.__profile_path = usb.get_gadget_path(gadget, usb.G_PROFILE)
f"functions/mass_storage.usb{instance}/lun.{lun}", self.__lun_path = usb.get_gadget_path(gadget, usb.G_FUNCTIONS, func, f"lun.{lun}")
)
self.__configs_root_path = os.path.join(
f"{env.SYSFS_PREFIX}/sys/kernel/config/usb_gadget",
gadget,
"configs/c.1",
)
self.__func_path = os.path.join(self.__configs_root_path, f"mass_storage.usb{instance}")
def is_enabled(self) -> bool: def is_enabled(self) -> bool:
return os.path.exists(self.__func_path) return os.path.exists(self.__profile_func_path)
def get_watchable_paths(self) -> List[str]: def get_watchable_paths(self) -> List[str]:
return [self.__lun_path, self.__configs_root_path] return [self.__lun_path, self.__profile_path]
# ===== # =====

View File

@ -31,7 +31,6 @@ from ...logging import get_logger
from ...inotify import InotifyMask from ...inotify import InotifyMask
from ...inotify import Inotify from ...inotify import Inotify
from ... import env
from ... import aiotools from ... import aiotools
from ... import usb from ... import usb
@ -53,7 +52,7 @@ class Plugin(BaseUserGpioDriver):
self.__udc = udc self.__udc = udc
self.__ctl_path = f"{env.SYSFS_PREFIX}/sys/kernel/config/usb_gadget/{gadget}/UDC" self.__udc_path = usb.get_gadget_path(gadget, usb.G_UDC)
@classmethod @classmethod
def get_pin_validator(cls) -> Callable[[Any], Any]: def get_pin_validator(cls) -> Callable[[Any], Any]:
@ -69,12 +68,12 @@ class Plugin(BaseUserGpioDriver):
try: try:
while True: while True:
await self._notifier.notify() await self._notifier.notify()
if os.path.isfile(self.__ctl_path): if os.path.isfile(self.__udc_path):
break break
await asyncio.sleep(5) await asyncio.sleep(5)
with Inotify() as inotify: with Inotify() as inotify:
inotify.watch(os.path.dirname(self.__ctl_path), InotifyMask.ALL_MODIFY_EVENTS) inotify.watch(os.path.dirname(self.__udc_path), InotifyMask.ALL_MODIFY_EVENTS)
await self._notifier.notify() await self._notifier.notify()
while True: while True:
need_restart = False need_restart = False
@ -94,13 +93,13 @@ class Plugin(BaseUserGpioDriver):
async def read(self, pin: str) -> bool: async def read(self, pin: str) -> bool:
_ = pin _ = pin
with open(self.__ctl_path) as ctl_file: with open(self.__udc_path) as udc_file:
return bool(ctl_file.read().strip()) return bool(udc_file.read().strip())
async def write(self, pin: str, state: bool) -> None: async def write(self, pin: str, state: bool) -> None:
_ = pin _ = pin
with open(self.__ctl_path, "w") as ctl_file: with open(self.__udc_path, "w") as udc_file:
ctl_file.write(self.__udc if state else "\n") udc_file.write(self.__udc if state else "\n")
def __str__(self) -> str: def __str__(self) -> str:
return f"GPIO({self._instance_name})" return f"GPIO({self._instance_name})"

View File

@ -36,3 +36,13 @@ def find_udc(udc: str) -> str:
elif udc not in candidates: elif udc not in candidates:
raise RuntimeError(f"Can't find selected UDC: {udc}") raise RuntimeError(f"Can't find selected UDC: {udc}")
return udc # fe980000.usb return udc # fe980000.usb
G_UDC = "UDC"
G_FUNCTIONS = "functions"
G_PROFILE_NAME = "c.1"
G_PROFILE = f"configs/{G_PROFILE_NAME}"
def get_gadget_path(gadget: str, *parts: str) -> str:
return os.path.join(f"{env.SYSFS_PREFIX}/sys/kernel/config/usb_gadget", gadget, *parts)