otgconf gpio plugin

This commit is contained in:
Maxim Devaev 2022-04-02 10:19:29 +03:00
parent f1e9f33c13
commit bd8984dd06
6 changed files with 97 additions and 25 deletions

View File

@ -86,7 +86,7 @@ def main(argv: Optional[List[str]]=None) -> None:
), ),
info_manager=InfoManager(global_config), info_manager=InfoManager(global_config),
log_reader=LogReader(), log_reader=LogReader(),
user_gpio=UserGpio(config.gpio, global_config.otg.udc, global_config.otg.gadget), user_gpio=UserGpio(config.gpio, global_config.otg),
ocr=TesseractOcr(**config.ocr._unpack()), ocr=TesseractOcr(**config.ocr._unpack()),
hid=hid, hid=hid,

View File

@ -231,7 +231,7 @@ class _GpioOutput: # pylint: disable=too-many-instance-attributes
# ===== # =====
class UserGpio: class UserGpio:
def __init__(self, config: Section, udc: str, gadget: str) -> None: def __init__(self, config: Section, otg_config: Section) -> None:
self.__view = config.view self.__view = config.view
self.__notifier = aiotools.AioNotifier() self.__notifier = aiotools.AioNotifier()
@ -241,7 +241,7 @@ class UserGpio:
instance_name=driver, instance_name=driver,
notifier=self.__notifier, notifier=self.__notifier,
**drv_config._unpack(ignore=["instance_name", "notifier", "type"]), **drv_config._unpack(ignore=["instance_name", "notifier", "type"]),
**({"udc": udc, "gadget": gadget} if drv_config.type == "otgbind" else {}), # Hack **({"otg_config": otg_config} if drv_config.type == "otgconf" else {}), # Hack
) )
for (driver, drv_config) in tools.sorted_kvs(config.drivers) for (driver, drv_config) in tools.sorted_kvs(config.drivers)
} }

View File

@ -270,8 +270,9 @@ def _cmd_start(config: Section) -> None: # pylint: disable=too-many-statements
_write(join(gadget_path, "UDC"), udc) _write(join(gadget_path, "UDC"), udc)
time.sleep(config.otg.init_delay) time.sleep(config.otg.init_delay)
logger.info("Setting UDC permissions ...") logger.info("Setting up permissions ...")
_chown(join(gadget_path, "UDC"), config.otg.user) _chown(join(gadget_path, "UDC"), config.otg.user)
_chown(profile_path, config.otg.user)
logger.info("Ready to work") logger.info("Ready to work")

View File

@ -27,9 +27,12 @@ import argparse
import time import time
from typing import List from typing import List
from typing import Dict
from typing import Generator from typing import Generator
from typing import Optional from typing import Optional
import yaml
from ...validators.basic import valid_stripped_string_not_empty from ...validators.basic import valid_stripped_string_not_empty
from ... import usb from ... import usb
@ -57,10 +60,14 @@ class _GadgetControl:
try: try:
yield yield
finally: finally:
if enabled: time.sleep(self.__init_delay)
time.sleep(self.__init_delay) with open(udc_path, "w") as udc_file:
with open(udc_path, "w") as udc_file: udc_file.write(udc)
udc_file.write(udc)
def __read_metas(self) -> Generator[Dict, None, None]:
for meta_name in sorted(os.listdir(self.__meta_path)):
with open(os.path.join(self.__meta_path, meta_name)) as meta_file:
yield json.loads(meta_file.read())
def enable_function(self, func: str) -> None: def enable_function(self, func: str) -> None:
with self.__udc_stopped(): with self.__udc_stopped():
@ -74,12 +81,30 @@ class _GadgetControl:
os.unlink(usb.get_gadget_path(self.__gadget, usb.G_PROFILE, func)) os.unlink(usb.get_gadget_path(self.__gadget, usb.G_PROFILE, func))
def list_functions(self) -> None: def list_functions(self) -> None:
for meta_name in sorted(os.listdir(self.__meta_path)): for meta in self.__read_metas():
with open(os.path.join(self.__meta_path, meta_name)) as meta_file:
meta = json.loads(meta_file.read())
enabled = os.path.exists(usb.get_gadget_path(self.__gadget, usb.G_PROFILE, meta["func"])) enabled = os.path.exists(usb.get_gadget_path(self.__gadget, usb.G_PROFILE, meta["func"]))
print(f"{'+' if enabled else '-'} {meta['func']} # {meta['name']}") print(f"{'+' if enabled else '-'} {meta['func']} # {meta['name']}")
def make_gpio_config(self) -> None:
config = {
"drivers": {"otgconf": {"type": "otgconf"}},
"scheme": {},
"view": {"table": []},
}
for meta in self.__read_metas():
config["scheme"][meta["func"]] = { # type: ignore
"driver": "otgconf",
"pin": meta["func"],
"mode": "output",
"pulse": {"delay": 0},
}
config["view"]["table"].append([ # type: ignore
"#" + meta["name"],
"#" + meta["func"],
meta["func"],
])
print(yaml.dump({"kvmd": {"gpio": config}}, indent=4))
def reset(self) -> None: def reset(self) -> None:
with self.__udc_stopped(): with self.__udc_stopped():
pass pass
@ -102,14 +127,27 @@ def main(argv: Optional[List[str]]=None) -> None:
parser.add_argument("-d", "--disable-function", type=valid_stripped_string_not_empty, parser.add_argument("-d", "--disable-function", type=valid_stripped_string_not_empty,
metavar="<name>", help="Disable function") metavar="<name>", help="Disable function")
parser.add_argument("-r", "--reset-gadget", action="store_true", help="Reset gadget") parser.add_argument("-r", "--reset-gadget", action="store_true", help="Reset gadget")
parser.add_argument("--make-gpio-config", action="store_true")
options = parser.parse_args(argv[1:]) options = parser.parse_args(argv[1:])
gc = _GadgetControl(config.otg.meta, config.otg.gadget, config.otg.udc, config.otg.init_delay) gc = _GadgetControl(config.otg.meta, config.otg.gadget, config.otg.udc, config.otg.init_delay)
if options.reset_gadget:
gc.reset() if options.list_functions:
return gc.list_functions()
elif options.enable_function: elif options.enable_function:
gc.enable_function(options.enable_function) gc.enable_function(options.enable_function)
gc.list_functions()
elif options.disable_function: elif options.disable_function:
gc.disable_function(options.disable_function) gc.disable_function(options.disable_function)
gc.list_functions() gc.list_functions()
elif options.reset_gadget:
gc.reset()
elif options.make_gpio_config:
gc.make_gpio_config()
else:
gc.list_functions()

View File

@ -34,6 +34,10 @@ from ...inotify import Inotify
from ... import aiotools from ... import aiotools
from ... import usb from ... import usb
from ...yamlconf import Section
from ...validators.basic import valid_stripped_string_not_empty
from . import BaseUserGpioDriver from . import BaseUserGpioDriver
@ -44,19 +48,24 @@ class Plugin(BaseUserGpioDriver):
instance_name: str, instance_name: str,
notifier: aiotools.AioNotifier, notifier: aiotools.AioNotifier,
udc: str, # XXX: Not from options, see /kvmd/apps/kvmd/__init__.py for details otg_config: Section, # XXX: Not from options, see /kvmd/apps/kvmd/__init__.py for details
gadget: str, # ditto
) -> None: ) -> None:
super().__init__(instance_name, notifier) super().__init__(instance_name, notifier)
self.__udc = udc self.__udc: str = otg_config.udc
self.__init_delay: float = otg_config.init_delay
gadget: str = otg_config.gadget
self.__udc_path = usb.get_gadget_path(gadget, usb.G_UDC) self.__udc_path = usb.get_gadget_path(gadget, usb.G_UDC)
self.__functions_path = usb.get_gadget_path(gadget, usb.G_FUNCTIONS)
self.__profile_path = usb.get_gadget_path(gadget, usb.G_PROFILE)
self.__lock = asyncio.Lock()
@classmethod @classmethod
def get_pin_validator(cls) -> Callable[[Any], Any]: def get_pin_validator(cls) -> Callable[[Any], Any]:
return str return valid_stripped_string_not_empty
def prepare(self) -> None: def prepare(self) -> None:
self.__udc = usb.find_udc(self.__udc) self.__udc = usb.find_udc(self.__udc)
@ -74,6 +83,7 @@ class Plugin(BaseUserGpioDriver):
with Inotify() as inotify: with Inotify() as inotify:
inotify.watch(os.path.dirname(self.__udc_path), InotifyMask.ALL_MODIFY_EVENTS) inotify.watch(os.path.dirname(self.__udc_path), InotifyMask.ALL_MODIFY_EVENTS)
inotify.watch(self.__profile_path, InotifyMask.ALL_MODIFY_EVENTS)
await self._notifier.notify() await self._notifier.notify()
while True: while True:
need_restart = False need_restart = False
@ -90,16 +100,39 @@ class Plugin(BaseUserGpioDriver):
await self._notifier.notify() await self._notifier.notify()
except Exception: except Exception:
logger.exception("Unexpected OTG-bind watcher error") logger.exception("Unexpected OTG-bind watcher error")
await asyncio.sleep(1)
async def read(self, pin: str) -> bool: async def read(self, pin: str) -> bool:
_ = pin if pin == "udc":
with open(self.__udc_path) as udc_file: return self.__is_udc_enabled()
return bool(udc_file.read().strip()) return os.path.exists(os.path.join(self.__profile_path, pin))
async def write(self, pin: str, state: bool) -> None: async def write(self, pin: str, state: bool) -> None:
_ = pin async with self.__lock:
if pin == "udc":
self.__set_udc_enabled(state)
else:
if self.__is_udc_enabled():
self.__set_udc_enabled(False)
try:
if state:
os.symlink(
os.path.join(self.__functions_path, pin),
os.path.join(self.__profile_path, pin),
)
else:
os.unlink(os.path.join(self.__profile_path, pin))
finally:
await asyncio.sleep(self.__init_delay)
self.__set_udc_enabled(True)
def __set_udc_enabled(self, enabled: bool) -> None:
with open(self.__udc_path, "w") as udc_file: with open(self.__udc_path, "w") as udc_file:
udc_file.write(self.__udc if state else "\n") udc_file.write(self.__udc if enabled else "\n")
def __is_udc_enabled(self) -> bool:
with open(self.__udc_path) as udc_file:
return bool(udc_file.read().strip())
def __str__(self) -> str: def __str__(self) -> str:
return f"GPIO({self._instance_name})" return f"GPIO({self._instance_name})"

View File

@ -42,7 +42,7 @@ def valid_ugpio_driver(arg: Any, variants: Optional[Set[str]]=None) -> str:
def valid_ugpio_channel(arg: Any) -> str: def valid_ugpio_channel(arg: Any) -> str:
name = "GPIO channel" name = "GPIO channel"
return check_len(check_re_match(arg, name, r"^[a-zA-Z_][a-zA-Z0-9_-]*$"), name, 255) return check_len(check_re_match(arg, name, r"^[a-zA-Z_][a-zA-Z0-9_.-]*$"), name, 255)
def valid_ugpio_mode(arg: Any, variants: Set[str]) -> str: def valid_ugpio_mode(arg: Any, variants: Set[str]) -> str: