kvmd-otgconf: reset delay

This commit is contained in:
Maxim Devaev 2022-04-01 06:51:57 +03:00
parent 3b56100fe2
commit 7ba7f08be7

View File

@ -24,6 +24,7 @@ import os
import json import json
import contextlib import contextlib
import argparse import argparse
import time
from typing import List from typing import List
from typing import Generator from typing import Generator
@ -37,47 +38,51 @@ from .. import init
# ===== # =====
@contextlib.contextmanager class _GadgetControl:
def _udc_stopped(gadget: str, udc: str) -> Generator[None, None, None]: def __init__(self, meta_path: str, gadget: str, udc: str, init_delay: float) -> None:
udc = usb.find_udc(udc) self.__meta_path = meta_path
udc_path = usb.get_gadget_path(gadget, usb.G_UDC) self.__gadget = gadget
with open(udc_path) as udc_file: self.__udc = udc
enabled = bool(udc_file.read().strip()) self.__init_delay = init_delay
if enabled:
with open(udc_path, "w") as udc_file: @contextlib.contextmanager
udc_file.write("\n") def __udc_stopped(self) -> Generator[None, None, None]:
try: udc = usb.find_udc(self.__udc)
yield udc_path = usb.get_gadget_path(self.__gadget, usb.G_UDC)
finally: with open(udc_path) as udc_file:
enabled = bool(udc_file.read().strip())
if enabled: if enabled:
with open(udc_path, "w") as udc_file: with open(udc_path, "w") as udc_file:
udc_file.write(udc) udc_file.write("\n")
try:
yield
finally:
if enabled:
time.sleep(self.__init_delay)
with open(udc_path, "w") as udc_file:
udc_file.write(udc)
def enable_function(self, func: str) -> None:
with self.__udc_stopped():
os.symlink(
usb.get_gadget_path(self.__gadget, usb.G_FUNCTIONS, func),
usb.get_gadget_path(self.__gadget, usb.G_PROFILE, func),
)
def _enable_function(gadget: str, udc: str, func: str) -> None: def disable_function(self, func: str) -> None:
with _udc_stopped(gadget, udc): with self.__udc_stopped():
os.symlink( os.unlink(usb.get_gadget_path(self.__gadget, usb.G_PROFILE, func))
usb.get_gadget_path(gadget, usb.G_FUNCTIONS, func),
usb.get_gadget_path(gadget, usb.G_PROFILE, func),
)
def list_functions(self) -> None:
for meta_name in sorted(os.listdir(self.__meta_path)):
with open(os.path.join(self.__meta_path, meta_name)) as meta_file:
meta = json.loads(meta_file.read())
enabled = os.path.exists(usb.get_gadget_path(self.__gadget, usb.G_PROFILE, meta["func"]))
print(f"{'+' if enabled else '-'} {meta['func']} # {meta['name']}")
def _disable_function(gadget: str, udc: str, func: str) -> None: def reset(self) -> None:
with _udc_stopped(gadget, udc): with self.__udc_stopped():
os.unlink(usb.get_gadget_path(gadget, usb.G_PROFILE, func)) pass
def _list_functions(gadget: str, meta_path: str) -> None:
for meta_name in sorted(os.listdir(meta_path)):
with open(os.path.join(meta_path, meta_name)) as meta_file:
meta = json.loads(meta_file.read())
enabled = os.path.exists(usb.get_gadget_path(gadget, usb.G_PROFILE, meta["func"]))
print(f"{'+' if enabled else '-'} {meta['func']} # {meta['name']}")
def _reset_gadget(gadget: str, udc: str) -> None:
with _udc_stopped(gadget, udc):
pass
# ===== # =====
@ -99,11 +104,12 @@ def main(argv: Optional[List[str]]=None) -> None:
parser.add_argument("-r", "--reset-gadget", action="store_true", help="Reset gadget") parser.add_argument("-r", "--reset-gadget", action="store_true", help="Reset gadget")
options = parser.parse_args(argv[1:]) options = parser.parse_args(argv[1:])
gc = _GadgetControl(config.otg.meta, config.otg.gadget, config.otg.udc, config.otg.init_delay)
if options.reset_gadget: if options.reset_gadget:
_reset_gadget(config.otg.gadget, config.otg.udc) gc.reset()
return return
elif options.enable_function: elif options.enable_function:
_enable_function(config.otg.gadget, config.otg.udc, options.enable_function) gc.enable_function(options.enable_function)
elif options.disable_function: elif options.disable_function:
_disable_function(config.otg.gadget, config.otg.udc, options.disable_function) gc.disable_function(options.disable_function)
_list_functions(config.otg.gadget, config.otg.meta) gc.list_functions()