refactoring

This commit is contained in:
Maxim Devaev 2023-03-07 23:54:05 +02:00
parent 002031baf1
commit f652eca9c2
15 changed files with 49 additions and 49 deletions

View File

@ -117,8 +117,8 @@ def _parse_ps2_key(key: str) -> _Ps2Key:
def _read_keymap_csv(path: str) -> list[_KeyMapping]: def _read_keymap_csv(path: str) -> list[_KeyMapping]:
keymap: list[_KeyMapping] = [] keymap: list[_KeyMapping] = []
with open(path) as keymap_file: with open(path) as file:
for row in csv.DictReader(keymap_file): for row in csv.DictReader(file):
if len(row) >= 6: if len(row) >= 6:
keymap.append(_KeyMapping( keymap.append(_KeyMapping(
web_name=row["web_name"], web_name=row["web_name"],

View File

@ -59,8 +59,8 @@ def _get_htpasswd_for_write(config: Section) -> Generator[passlib.apache.Htpassw
try: try:
try: try:
st = os.stat(path) st = os.stat(path)
with open(path, "rb") as htpasswd_file: with open(path, "rb") as file:
os.write(tmp_fd, htpasswd_file.read()) os.write(tmp_fd, file.read())
os.fchown(tmp_fd, st.st_uid, st.st_gid) os.fchown(tmp_fd, st.st_uid, st.st_gid)
os.fchmod(tmp_fd, st.st_mode) os.fchmod(tmp_fd, st.st_mode)
finally: finally:

View File

@ -40,8 +40,8 @@ class IpmiUserCredentials:
class IpmiAuthManager: class IpmiAuthManager:
def __init__(self, path: str) -> None: def __init__(self, path: str) -> None:
self.__path = path self.__path = path
with open(path) as passwd_file: with open(path) as file:
self.__credentials = self.__parse_passwd_file(passwd_file.read().split("\n")) self.__credentials = self.__parse_passwd_file(file.read().split("\n"))
def __contains__(self, ipmi_user: str) -> bool: def __contains__(self, ipmi_user: str) -> bool:
return (ipmi_user in self.__credentials) return (ipmi_user in self.__credentials)

View File

@ -77,8 +77,8 @@ class AuthManager:
assert self.__internal_service assert self.__internal_service
if self.__totp_secret_path: if self.__totp_secret_path:
with open(self.__totp_secret_path) as secret_file: with open(self.__totp_secret_path) as file:
secret = secret_file.read().strip() secret = file.read().strip()
if secret: if secret:
code = passwd[-6:] code = passwd[-6:]
if not pyotp.TOTP(secret).verify(code): if not pyotp.TOTP(secret).verify(code):

View File

@ -80,14 +80,14 @@ def _write(path: str, value: (str | int), optional: bool=False) -> None:
logger.info("WRITE --- [SKIPPED] %s", path) logger.info("WRITE --- [SKIPPED] %s", path)
return return
logger.info("WRITE --- %s", path) logger.info("WRITE --- %s", path)
with open(path, "w") as param_file: with open(path, "w") as file:
param_file.write(str(value)) file.write(str(value))
def _write_bytes(path: str, data: bytes) -> None: def _write_bytes(path: str, data: bytes) -> None:
get_logger().info("WRITE --- %s", path) get_logger().info("WRITE --- %s", path)
with open(path, "wb") as param_file: with open(path, "wb") as file:
param_file.write(data) file.write(data)
def _check_config(config: Section) -> None: def _check_config(config: Section) -> None:

View File

@ -49,22 +49,22 @@ class _GadgetControl:
def __udc_stopped(self) -> Generator[None, None, None]: def __udc_stopped(self) -> Generator[None, None, None]:
udc = usb.find_udc(self.__udc) udc = usb.find_udc(self.__udc)
udc_path = usb.get_gadget_path(self.__gadget, usb.G_UDC) udc_path = usb.get_gadget_path(self.__gadget, usb.G_UDC)
with open(udc_path) as udc_file: with open(udc_path) as file:
enabled = bool(udc_file.read().strip()) enabled = bool(file.read().strip())
if enabled: if enabled:
with open(udc_path, "w") as udc_file: with open(udc_path, "w") as file:
udc_file.write("\n") file.write("\n")
try: try:
yield yield
finally: finally:
time.sleep(self.__init_delay) time.sleep(self.__init_delay)
with open(udc_path, "w") as udc_file: with open(udc_path, "w") as file:
udc_file.write(udc) file.write(udc)
def __read_metas(self) -> Generator[dict, None, None]: def __read_metas(self) -> Generator[dict, None, None]:
for meta_name in sorted(os.listdir(self.__meta_path)): for meta_name in sorted(os.listdir(self.__meta_path)):
with open(os.path.join(self.__meta_path, meta_name)) as meta_file: with open(os.path.join(self.__meta_path, meta_name)) as file:
yield json.loads(meta_file.read()) yield json.loads(file.read())
def enable_function(self, func: str) -> None: def enable_function(self, func: str) -> None:
with self.__udc_stopped(): with self.__udc_stopped():

View File

@ -39,14 +39,14 @@ def _get_param_path(gadget: str, instance: int, param: str) -> str:
def _get_param(gadget: str, instance: int, param: str) -> str: def _get_param(gadget: str, instance: int, param: str) -> str:
with open(_get_param_path(gadget, instance, param)) as param_file: with open(_get_param_path(gadget, instance, param)) as file:
return param_file.read().strip() return file.read().strip()
def _set_param(gadget: str, instance: int, param: str, value: str) -> None: def _set_param(gadget: str, instance: int, param: str, value: str) -> None:
try: try:
with open(_get_param_path(gadget, instance, param), "w") as param_file: with open(_get_param_path(gadget, instance, param), "w") as file:
param_file.write(value + "\n") file.write(value + "\n")
except OSError as err: except OSError as err:
if err.errno == errno.EBUSY: if err.errno == errno.EBUSY:
raise SystemExit(f"Can't change {param!r} value because device is locked: {err}") raise SystemExit(f"Can't change {param!r} value because device is locked: {err}")

View File

@ -174,8 +174,8 @@ class _Service: # pylint: disable=too-many-instance-attributes
real_driver = "rndis" real_driver = "rndis"
path = usb.get_gadget_path(self.__gadget, usb.G_FUNCTIONS, f"{real_driver}.usb0/ifname") path = usb.get_gadget_path(self.__gadget, usb.G_FUNCTIONS, f"{real_driver}.usb0/ifname")
logger.info("Using OTG gadget %r ...", self.__gadget) logger.info("Using OTG gadget %r ...", self.__gadget)
with open(path) as iface_file: with open(path) as file:
iface = iface_file.read().strip() iface = file.read().strip()
logger.info("Using OTG Ethernet interface %r ...", iface) logger.info("Using OTG Ethernet interface %r ...", iface)
assert iface assert iface
return iface return iface

View File

@ -44,13 +44,13 @@ def _join_rtc(rtc: int, key: str) -> str:
def _read_int(rtc: int, key: str) -> int: def _read_int(rtc: int, key: str) -> int:
with open(_join_rtc(rtc, key)) as value_file: with open(_join_rtc(rtc, key)) as file:
return int(value_file.read().strip() or "0") return int(file.read().strip() or "0")
def _write_int(rtc: int, key: str, value: int) -> None: def _write_int(rtc: int, key: str, value: int) -> None:
with open(_join_rtc(rtc, key), "w") as value_file: with open(_join_rtc(rtc, key), "w") as file:
value_file.write(str(value)) file.write(str(value))
def _reset_alarm(rtc: int, timeout: int) -> None: def _reset_alarm(rtc: int, timeout: int) -> None:

View File

@ -110,8 +110,8 @@ def _read_keyboard_layout(path: str) -> dict[int, list[At1Key]]: # Keysym to ev
logger = get_logger(0) logger = get_logger(0)
logger.info("Reading keyboard layout %s ...", path) logger.info("Reading keyboard layout %s ...", path)
with open(path) as layout_file: with open(path) as file:
lines = list(map(str.strip, layout_file.read().split("\n"))) lines = list(map(str.strip, file.read().split("\n")))
layout: dict[int, list[At1Key]] = {} layout: dict[int, list[At1Key]] = {}
for (lineno, line) in enumerate(lines): for (lineno, line) in enumerate(lines):

View File

@ -426,8 +426,8 @@ class Plugin(BaseAuthService):
assert user == user.strip() assert user == user.strip()
assert user assert user
try: try:
with io.StringIO(_FREERADUIS_DICT) as dct_file: with io.StringIO(_FREERADUIS_DICT) as file:
dct = pyrad.dictionary.Dictionary(dct_file) dct = pyrad.dictionary.Dictionary(file)
client = pyrad.client.Client( client = pyrad.client.Client(
server=self.__host, server=self.__host,
authport=self.__port, authport=self.__port,

View File

@ -172,8 +172,8 @@ class BaseDeviceProcess(multiprocessing.Process): # pylint: disable=too-many-in
return get_logger() return get_logger()
def __is_udc_configured(self) -> bool: def __is_udc_configured(self) -> bool:
with open(self.__udc_state_path) as udc_state_file: with open(self.__udc_state_path) as file:
return (udc_state_file.read().strip().lower() == "configured") return (file.read().strip().lower() == "configured")
def __write_report(self, report: bytes) -> bool: def __write_report(self, report: bytes) -> bool:
assert report assert report

View File

@ -75,13 +75,13 @@ class Drive:
# ===== # =====
def __get_param(self, param: str) -> str: def __get_param(self, param: str) -> str:
with open(os.path.join(self.__lun_path, param)) as param_file: with open(os.path.join(self.__lun_path, param)) as file:
return param_file.read().strip() return file.read().strip()
def __set_param(self, param: str, value: str) -> None: def __set_param(self, param: str, value: str) -> None:
try: try:
with open(os.path.join(self.__lun_path, param), "w") as param_file: with open(os.path.join(self.__lun_path, param), "w") as file:
param_file.write(value + "\n") file.write(value + "\n")
except OSError as err: except OSError as err:
if err.errno == errno.EBUSY: if err.errno == errno.EBUSY:
raise MsdDriveLockedError() raise MsdDriveLockedError()

View File

@ -129,12 +129,12 @@ class Plugin(BaseUserGpioDriver):
self.__set_udc_enabled(True) self.__set_udc_enabled(True)
def __set_udc_enabled(self, enabled: bool) -> None: def __set_udc_enabled(self, enabled: bool) -> None:
with open(self.__udc_path, "w") as udc_file: with open(self.__udc_path, "w") as file:
udc_file.write(self.__udc if enabled else "\n") file.write(self.__udc if enabled else "\n")
def __is_udc_enabled(self) -> bool: def __is_udc_enabled(self) -> bool:
with open(self.__udc_path) as udc_file: with open(self.__udc_path) as file:
return bool(udc_file.read().strip()) return bool(file.read().strip())
def __str__(self) -> str: def __str__(self) -> str:
return f"GPIO({self._instance_name})" return f"GPIO({self._instance_name})"

View File

@ -35,9 +35,9 @@ from .. import tools
# ===== # =====
def load_yaml_file(path: str) -> Any: def load_yaml_file(path: str) -> Any:
with open(path) as yaml_file: with open(path) as file:
try: try:
return yaml.load(yaml_file, _YamlLoader) return yaml.load(file, _YamlLoader)
except Exception as err: except Exception as err:
# Reraise internal exception as standard ValueError and show the incorrect file # Reraise internal exception as standard ValueError and show the incorrect file
raise ValueError(f"Invalid YAML in the file {path!r}:\n{tools.efmt(err)}") from None raise ValueError(f"Invalid YAML in the file {path!r}:\n{tools.efmt(err)}") from None
@ -45,9 +45,9 @@ def load_yaml_file(path: str) -> Any:
# ===== # =====
class _YamlLoader(yaml.SafeLoader): class _YamlLoader(yaml.SafeLoader):
def __init__(self, yaml_file: IO) -> None: def __init__(self, file: IO) -> None:
super().__init__(yaml_file) super().__init__(file)
self.__root = os.path.dirname(yaml_file.name) self.__root = os.path.dirname(file.name)
def include(self, node: yaml.nodes.Node) -> Any: def include(self, node: yaml.nodes.Node) -> Any:
incs: list[str] incs: list[str]