mirror of
https://github.com/mofeng-git/One-KVM.git
synced 2025-12-12 09:10:30 +08:00
f-strings
This commit is contained in:
parent
ff270591b0
commit
ef3c62a7af
@ -64,7 +64,7 @@ def _render_keymap(keymap: List[_KeyMapping], template_path: str, out_path: str)
|
|||||||
|
|
||||||
# =====
|
# =====
|
||||||
def main() -> None:
|
def main() -> None:
|
||||||
assert len(sys.argv) == 4, "%s <keymap.in> <template> <out>" % (sys.argv[0])
|
assert len(sys.argv) == 4, f"{sys.argv[0]} <keymap.in> <template> <out>"
|
||||||
|
|
||||||
keymap = _read_keymap_in(sys.argv[1])
|
keymap = _read_keymap_in(sys.argv[1])
|
||||||
_render_keymap(keymap, sys.argv[2], sys.argv[3])
|
_render_keymap(keymap, sys.argv[2], sys.argv[3])
|
||||||
|
|||||||
@ -119,7 +119,7 @@ def _init_config(config_path: str, sections: List[str], override_options: List[s
|
|||||||
|
|
||||||
return config
|
return config
|
||||||
except (ConfigError, UnknownPluginError) as err:
|
except (ConfigError, UnknownPluginError) as err:
|
||||||
raise SystemExit("Config error: %s" % (str(err)))
|
raise SystemExit(f"Config error: {err}")
|
||||||
|
|
||||||
|
|
||||||
def _dump_config(config: Section) -> None:
|
def _dump_config(config: Section) -> None:
|
||||||
|
|||||||
@ -45,7 +45,8 @@ from .. import init
|
|||||||
# =====
|
# =====
|
||||||
def _get_htpasswd_path(config: Section) -> str:
|
def _get_htpasswd_path(config: Section) -> str:
|
||||||
if config.kvmd.auth.internal_type != "htpasswd":
|
if config.kvmd.auth.internal_type != "htpasswd":
|
||||||
raise SystemExit("Error: KVMD internal auth not using 'htpasswd' (now configured %r)" % (config.kvmd.auth.internal_type))
|
raise SystemExit(f"Error: KVMD internal auth not using 'htpasswd'"
|
||||||
|
f" (now configured {config.kvmd.auth.internal_type!r})")
|
||||||
return config.kvmd.auth.internal.file
|
return config.kvmd.auth.internal.file
|
||||||
|
|
||||||
|
|
||||||
@ -53,7 +54,7 @@ def _get_htpasswd_path(config: Section) -> str:
|
|||||||
def _get_htpasswd_for_write(config: Section) -> Generator[passlib.apache.HtpasswdFile, None, None]:
|
def _get_htpasswd_for_write(config: Section) -> Generator[passlib.apache.HtpasswdFile, None, None]:
|
||||||
path = _get_htpasswd_path(config)
|
path = _get_htpasswd_path(config)
|
||||||
(tmp_fd, tmp_path) = tempfile.mkstemp(
|
(tmp_fd, tmp_path) = tempfile.mkstemp(
|
||||||
prefix=".%s." % (os.path.basename(path)),
|
prefix=f".{os.path.basename(path)}.",
|
||||||
dir=os.path.dirname(path),
|
dir=os.path.dirname(path),
|
||||||
)
|
)
|
||||||
try:
|
try:
|
||||||
|
|||||||
@ -61,12 +61,12 @@ class IpmiAuthManager:
|
|||||||
continue
|
continue
|
||||||
|
|
||||||
if " -> " not in line:
|
if " -> " not in line:
|
||||||
raise IpmiPasswdError("Missing ' -> ' operator at line #%d" % (number))
|
raise IpmiPasswdError(f"Missing ' -> ' operator at line #{number}")
|
||||||
|
|
||||||
(left, right) = map(str.lstrip, line.split(" -> ", 1))
|
(left, right) = map(str.lstrip, line.split(" -> ", 1))
|
||||||
for (name, pair) in [("left", left), ("right", right)]:
|
for (name, pair) in [("left", left), ("right", right)]:
|
||||||
if ":" not in pair:
|
if ":" not in pair:
|
||||||
raise IpmiPasswdError("Missing ':' operator in %s credentials at line #%d" % (name, number))
|
raise IpmiPasswdError(f"Missing ':' operator in {name} credentials at line #{number}")
|
||||||
|
|
||||||
(ipmi_user, ipmi_passwd) = left.split(":")
|
(ipmi_user, ipmi_passwd) = left.split(":")
|
||||||
ipmi_user = ipmi_user.strip()
|
ipmi_user = ipmi_user.strip()
|
||||||
@ -75,7 +75,7 @@ class IpmiAuthManager:
|
|||||||
kvmd_user = kvmd_user.strip()
|
kvmd_user = kvmd_user.strip()
|
||||||
|
|
||||||
if ipmi_user in credentials:
|
if ipmi_user in credentials:
|
||||||
raise IpmiPasswdError("Found duplicating user %r (left) at line #%d" % (ipmi_user, number))
|
raise IpmiPasswdError(f"Found duplicating user {ipmi_user!r} (left) at line #{number}")
|
||||||
|
|
||||||
credentials[ipmi_user] = IpmiUserCredentials(
|
credentials[ipmi_user] = IpmiUserCredentials(
|
||||||
ipmi_user=ipmi_user,
|
ipmi_user=ipmi_user,
|
||||||
|
|||||||
@ -155,7 +155,7 @@ class IpmiServer(BaseIpmiServer): # pylint: disable=too-many-instance-attribute
|
|||||||
logger = get_logger(0)
|
logger = get_logger(0)
|
||||||
|
|
||||||
assert handle.startswith("/")
|
assert handle.startswith("/")
|
||||||
url = "http://%s:%d%s" % (self.__kvmd_host, self.__kvmd_port, handle)
|
url = f"http://{self.__kvmd_host}:{self.__kvmd_port}{handle}"
|
||||||
|
|
||||||
credentials = self.__auth_manager.get_credentials(ipmi_session.username.decode())
|
credentials = self.__auth_manager.get_credentials(ipmi_session.username.decode())
|
||||||
logger.info("Performing %r request to %r from user %r (IPMI) as %r (KVMD)",
|
logger.info("Performing %r request to %r from user %r (IPMI) as %r (KVMD)",
|
||||||
@ -169,7 +169,7 @@ class IpmiServer(BaseIpmiServer): # pylint: disable=too-many-instance-attribute
|
|||||||
headers={
|
headers={
|
||||||
"X-KVMD-User": credentials.kvmd_user,
|
"X-KVMD-User": credentials.kvmd_user,
|
||||||
"X-KVMD-Passwd": credentials.kvmd_passwd,
|
"X-KVMD-Passwd": credentials.kvmd_passwd,
|
||||||
"User-Agent": "KVMD-IPMI/%s" % (__version__),
|
"User-Agent": f"KVMD-IPMI/{__version__}",
|
||||||
},
|
},
|
||||||
timeout=self.__kvmd_timeout,
|
timeout=self.__kvmd_timeout,
|
||||||
) as response:
|
) as response:
|
||||||
|
|||||||
@ -156,7 +156,7 @@ def _ioctl_uint32(device_file: IO, request: int) -> int:
|
|||||||
|
|
||||||
def _explore_device(device_path: str) -> _MassStorageDeviceInfo:
|
def _explore_device(device_path: str) -> _MassStorageDeviceInfo:
|
||||||
if not stat.S_ISBLK(os.stat(device_path).st_mode):
|
if not stat.S_ISBLK(os.stat(device_path).st_mode):
|
||||||
raise RuntimeError("Not a block device: %s" % (device_path))
|
raise RuntimeError(f"Not a block device: {device_path}")
|
||||||
|
|
||||||
with open(device_path, "rb") as device_file:
|
with open(device_path, "rb") as device_file:
|
||||||
# size = BLKGETSIZE * BLKSSZGET
|
# size = BLKGETSIZE * BLKSSZGET
|
||||||
|
|||||||
@ -142,7 +142,7 @@ def _json_exception(err: Exception, status: int) -> aiohttp.web.Response:
|
|||||||
async def _get_multipart_field(reader: aiohttp.MultipartReader, name: str) -> aiohttp.BodyPartReader:
|
async def _get_multipart_field(reader: aiohttp.MultipartReader, name: str) -> aiohttp.BodyPartReader:
|
||||||
field = await reader.next()
|
field = await reader.next()
|
||||||
if not field or field.name != name:
|
if not field or field.name != name:
|
||||||
raise ValidatorError("Missing %r field" % (name))
|
raise ValidatorError(f"Missing {name!r} field")
|
||||||
return field
|
return field
|
||||||
|
|
||||||
|
|
||||||
@ -168,7 +168,7 @@ def _exposed(http_method: str, path: str, auth_required: bool=True) -> Callable:
|
|||||||
|
|
||||||
if user:
|
if user:
|
||||||
user = valid_user(user)
|
user = valid_user(user)
|
||||||
setattr(request, _ATTR_KVMD_AUTH_INFO, "%s (xhdr)" % (user))
|
setattr(request, _ATTR_KVMD_AUTH_INFO, f"{user} (xhdr)")
|
||||||
if not (await self._auth_manager.authorize(user, valid_passwd(passwd))):
|
if not (await self._auth_manager.authorize(user, valid_passwd(passwd))):
|
||||||
raise ForbiddenError("Forbidden")
|
raise ForbiddenError("Forbidden")
|
||||||
|
|
||||||
@ -177,7 +177,7 @@ def _exposed(http_method: str, path: str, auth_required: bool=True) -> Callable:
|
|||||||
if not user:
|
if not user:
|
||||||
setattr(request, _ATTR_KVMD_AUTH_INFO, "- (token)")
|
setattr(request, _ATTR_KVMD_AUTH_INFO, "- (token)")
|
||||||
raise ForbiddenError("Forbidden")
|
raise ForbiddenError("Forbidden")
|
||||||
setattr(request, _ATTR_KVMD_AUTH_INFO, "%s (token)" % (user))
|
setattr(request, _ATTR_KVMD_AUTH_INFO, f"{user} (token)")
|
||||||
|
|
||||||
else:
|
else:
|
||||||
raise UnauthorizedError("Unauthorized")
|
raise UnauthorizedError("Unauthorized")
|
||||||
@ -204,7 +204,7 @@ def _system_task(method: Callable) -> Callable:
|
|||||||
async def wrapper(self: "Server") -> None:
|
async def wrapper(self: "Server") -> None:
|
||||||
try:
|
try:
|
||||||
await method(self)
|
await method(self)
|
||||||
raise RuntimeError("Dead system task: %s" % (method))
|
raise RuntimeError(f"Dead system task: {method}")
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
pass
|
pass
|
||||||
except Exception:
|
except Exception:
|
||||||
|
|||||||
@ -121,8 +121,8 @@ class Streamer: # pylint: disable=too-many-instance-attributes
|
|||||||
state = None
|
state = None
|
||||||
try:
|
try:
|
||||||
async with session.get(
|
async with session.get(
|
||||||
url="http://%s:%d/state" % (self.__host, self.__port),
|
url=f"http://{self.__host}:{self.__port}/state",
|
||||||
headers={"User-Agent": "KVMD/%s" % (__version__)},
|
headers={"User-Agent": f"KVMD/{__version__}"},
|
||||||
timeout=self.__timeout,
|
timeout=self.__timeout,
|
||||||
) as response:
|
) as response:
|
||||||
response.raise_for_status()
|
response.raise_for_status()
|
||||||
|
|||||||
@ -53,7 +53,7 @@ class BasePlugin:
|
|||||||
@functools.lru_cache()
|
@functools.lru_cache()
|
||||||
def get_plugin_class(sub: str, name: str) -> Type[BasePlugin]:
|
def get_plugin_class(sub: str, name: str) -> Type[BasePlugin]:
|
||||||
try:
|
try:
|
||||||
module = importlib.import_module("kvmd.plugins.{}.{}".format(sub, name))
|
module = importlib.import_module(f"kvmd.plugins.{sub}.{name}")
|
||||||
except ModuleNotFoundError:
|
except ModuleNotFoundError:
|
||||||
raise UnknownPluginError("Unknown plugin '%s/%s'" % (sub, name))
|
raise UnknownPluginError(f"Unknown plugin '{sub}/{name}'")
|
||||||
return getattr(module, "Plugin")
|
return getattr(module, "Plugin")
|
||||||
|
|||||||
@ -81,7 +81,7 @@ class Plugin(BaseAuthService):
|
|||||||
"passwd": passwd
|
"passwd": passwd
|
||||||
},
|
},
|
||||||
headers={
|
headers={
|
||||||
"User-Agent": "KVMD/%s" % (__version__),
|
"User-Agent": f"KVMD/{__version__}",
|
||||||
"X-KVMD-User": user,
|
"X-KVMD-User": user,
|
||||||
},
|
},
|
||||||
) as response:
|
) as response:
|
||||||
|
|||||||
@ -40,13 +40,13 @@ class ValidatorError(ValueError):
|
|||||||
def raise_error(arg: Any, name: str, hide: bool=False) -> NoReturn:
|
def raise_error(arg: Any, name: str, hide: bool=False) -> NoReturn:
|
||||||
arg_str = " "
|
arg_str = " "
|
||||||
if not hide:
|
if not hide:
|
||||||
arg_str = (" %r " if isinstance(arg, (str, bytes)) else " '%s' ") % (arg)
|
arg_str = (f" {arg!r} " if isinstance(arg, (str, bytes)) else f" '{arg}' ")
|
||||||
raise ValidatorError("The argument" + arg_str + "is not a valid " + name)
|
raise ValidatorError(f"The argument{arg_str}is not a valid {name}")
|
||||||
|
|
||||||
|
|
||||||
def check_not_none(arg: Any, name: str) -> Any:
|
def check_not_none(arg: Any, name: str) -> Any:
|
||||||
if arg is None:
|
if arg is None:
|
||||||
raise ValidatorError("Empty argument is not a valid %s" % (name))
|
raise ValidatorError(f"Empty argument is not a valid {name}")
|
||||||
return arg
|
return arg
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -40,7 +40,7 @@ def valid_bool(arg: Any) -> bool:
|
|||||||
true_args = ["1", "true", "yes"]
|
true_args = ["1", "true", "yes"]
|
||||||
false_args = ["0", "false", "no"]
|
false_args = ["0", "false", "no"]
|
||||||
|
|
||||||
name = "bool (%r or %r)" % (true_args, false_args)
|
name = f"bool ({true_args!r} or {false_args!r})"
|
||||||
|
|
||||||
arg = check_not_none_string(arg, name).lower()
|
arg = check_not_none_string(arg, name).lower()
|
||||||
arg = check_in_list(arg, name, true_args + false_args)
|
arg = check_in_list(arg, name, true_args + false_args)
|
||||||
@ -64,9 +64,9 @@ def valid_number(
|
|||||||
raise_error(arg, name)
|
raise_error(arg, name)
|
||||||
|
|
||||||
if min is not None and arg < min:
|
if min is not None and arg < min:
|
||||||
raise ValidatorError("The argument '%s' must be %s and greater or equial than %s" % (arg, name, min))
|
raise ValidatorError(f"The argument '{arg}' must be {name} and greater or equial than {min}")
|
||||||
if max is not None and arg > max:
|
if max is not None and arg > max:
|
||||||
raise ValidatorError("The argument '%s' must be %s and lesser or equal then %s" % (arg, name, max))
|
raise ValidatorError(f"The argument '{arg}' must be {name} and lesser or equal then {max}")
|
||||||
return arg
|
return arg
|
||||||
|
|
||||||
|
|
||||||
@ -98,5 +98,5 @@ def valid_string_list(
|
|||||||
try:
|
try:
|
||||||
arg = list(map(subval, arg))
|
arg = list(map(subval, arg))
|
||||||
except Exception:
|
except Exception:
|
||||||
raise ValidatorError("Failed sub-validator on one of the item of %r" % (arg))
|
raise ValidatorError(f"Failed sub-validator on one of the item of {arg!r}")
|
||||||
return arg
|
return arg
|
||||||
|
|||||||
@ -41,9 +41,9 @@ def build_raw_from_options(options: List[str]) -> Dict[str, Any]:
|
|||||||
for option in options:
|
for option in options:
|
||||||
(key, value) = (option.split("=", 1) + [None])[:2] # type: ignore
|
(key, value) = (option.split("=", 1) + [None])[:2] # type: ignore
|
||||||
if len(key.strip()) == 0:
|
if len(key.strip()) == 0:
|
||||||
raise ConfigError("Empty option key (required 'key=value' instead of %r)" % (option))
|
raise ConfigError(f"Empty option key (required 'key=value' instead of {option!r})")
|
||||||
if value is None:
|
if value is None:
|
||||||
raise ConfigError("No value for key %r" % (key))
|
raise ConfigError(f"No value for key {key!r}")
|
||||||
|
|
||||||
section = raw
|
section = raw
|
||||||
subs = list(filter(None, map(str.strip, key.split("/"))))
|
subs = list(filter(None, map(str.strip, key.split("/"))))
|
||||||
@ -61,7 +61,7 @@ def _parse_value(value: str) -> Any:
|
|||||||
and value not in ["true", "false", "null"]
|
and value not in ["true", "false", "null"]
|
||||||
and not value.startswith(("{", "[", "\""))
|
and not value.startswith(("{", "[", "\""))
|
||||||
):
|
):
|
||||||
value = "\"%s\"" % (value)
|
value = f"\"{value}\""
|
||||||
return json.loads(value)
|
return json.loads(value)
|
||||||
|
|
||||||
|
|
||||||
@ -124,13 +124,13 @@ class Option:
|
|||||||
self.help = help
|
self.help = help
|
||||||
|
|
||||||
def __repr__(self) -> str:
|
def __repr__(self) -> str:
|
||||||
return "<Option(default={0.default}, type={0.type}, only_if={0.only_if}, unpack_as={0.unpack_as})>".format(self)
|
return f"<Option(default={self.default}, type={self.type}, only_if={self.only_if}, unpack_as={self.unpack_as})>"
|
||||||
|
|
||||||
|
|
||||||
# =====
|
# =====
|
||||||
def make_config(raw: Dict[str, Any], scheme: Dict[str, Any], _keys: Tuple[str, ...]=()) -> Section:
|
def make_config(raw: Dict[str, Any], scheme: Dict[str, Any], _keys: Tuple[str, ...]=()) -> Section:
|
||||||
if not isinstance(raw, dict):
|
if not isinstance(raw, dict):
|
||||||
raise ConfigError("The node %r must be a dictionary" % ("/".join(_keys) or "/"))
|
raise ConfigError(f"The node {('/'.join(_keys) or '/')!r} must be a dictionary")
|
||||||
|
|
||||||
config = Section()
|
config = Section()
|
||||||
|
|
||||||
@ -150,7 +150,7 @@ def make_config(raw: Dict[str, Any], scheme: Dict[str, Any], _keys: Tuple[str, .
|
|||||||
|
|
||||||
if only_if and no_only_if: # pylint: disable=no-else-raise
|
if only_if and no_only_if: # pylint: disable=no-else-raise
|
||||||
# Перекрестный only_if запрещен
|
# Перекрестный only_if запрещен
|
||||||
raise RuntimeError("Found only_if recursuon on key %r" % (make_full_name(key)))
|
raise RuntimeError(f"Found only_if recursuon on key {make_full_name(key)!r}")
|
||||||
elif only_if and (
|
elif only_if and (
|
||||||
(not only_if_negative and not process_option(only_if, no_only_if=True))
|
(not only_if_negative and not process_option(only_if, no_only_if=True))
|
||||||
or (only_if_negative and process_option(only_if, no_only_if=True))
|
or (only_if_negative and process_option(only_if, no_only_if=True))
|
||||||
@ -162,7 +162,7 @@ def make_config(raw: Dict[str, Any], scheme: Dict[str, Any], _keys: Tuple[str, .
|
|||||||
try:
|
try:
|
||||||
value = option.type(value)
|
value = option.type(value)
|
||||||
except ValueError as err:
|
except ValueError as err:
|
||||||
raise ConfigError("Invalid value %r for key %r: %s" % (value, make_full_name(key), str(err)))
|
raise ConfigError(f"Invalid value {value!r} for key {make_full_name(key)!r}: {err}")
|
||||||
|
|
||||||
config[key] = value
|
config[key] = value
|
||||||
config._set_meta( # pylint: disable=protected-access
|
config._set_meta( # pylint: disable=protected-access
|
||||||
@ -179,6 +179,6 @@ def make_config(raw: Dict[str, Any], scheme: Dict[str, Any], _keys: Tuple[str, .
|
|||||||
elif isinstance(scheme[key], dict):
|
elif isinstance(scheme[key], dict):
|
||||||
config[key] = make_config(raw.get(key, {}), scheme[key], make_full_key(key))
|
config[key] = make_config(raw.get(key, {}), scheme[key], make_full_key(key))
|
||||||
else:
|
else:
|
||||||
raise RuntimeError("Incorrect scheme definition for key %r:"
|
raise RuntimeError(f"Incorrect scheme definition for key {make_full_name(key)!r}:"
|
||||||
" the value is %r, not dict() or Option()" % (make_full_name(key), type(scheme[key])))
|
f" the value is {type(scheme[key])!r}, not dict() or Option()")
|
||||||
return config
|
return config
|
||||||
|
|||||||
@ -39,7 +39,8 @@ def make_config_dump(config: Section, indent: int=4) -> str:
|
|||||||
def _inner_make_dump(config: Section, indent: int, _level: int=0) -> Generator[str, None, None]:
|
def _inner_make_dump(config: Section, indent: int, _level: int=0) -> Generator[str, None, None]:
|
||||||
for (key, value) in sorted(config.items(), key=operator.itemgetter(0)):
|
for (key, value) in sorted(config.items(), key=operator.itemgetter(0)):
|
||||||
if isinstance(value, Section):
|
if isinstance(value, Section):
|
||||||
yield "%s%s:" % (" " * indent * _level, key)
|
prefix = " " * indent * _level
|
||||||
|
yield f"{prefix}{key}:"
|
||||||
yield from _inner_make_dump(value, indent, _level + 1)
|
yield from _inner_make_dump(value, indent, _level + 1)
|
||||||
yield ""
|
yield ""
|
||||||
else:
|
else:
|
||||||
@ -66,7 +67,7 @@ def _make_yaml_kv(key: str, value: Any, indent: int, level: int, comment: str=""
|
|||||||
prefix = " " * indent * level
|
prefix = " " * indent * level
|
||||||
if commented:
|
if commented:
|
||||||
prefix = prefix + "# "
|
prefix = prefix + "# "
|
||||||
text = textwrap.indent("%s:%s" % (key, text), prefix=prefix)
|
text = textwrap.indent(f"{key}:{text}", prefix=prefix)
|
||||||
|
|
||||||
if comment:
|
if comment:
|
||||||
lines = text.split("\n")
|
lines = text.split("\n")
|
||||||
|
|||||||
@ -36,7 +36,7 @@ def load_yaml_file(path: str) -> Any:
|
|||||||
return yaml.load(yaml_file, _YamlLoader)
|
return yaml.load(yaml_file, _YamlLoader)
|
||||||
except Exception:
|
except Exception:
|
||||||
# Reraise internal exception as standard ValueError and show the incorrect file
|
# Reraise internal exception as standard ValueError and show the incorrect file
|
||||||
raise ValueError("Incorrect YAML syntax in file %r" % (path))
|
raise ValueError(f"Incorrect YAML syntax in file {path!r}")
|
||||||
|
|
||||||
|
|
||||||
class _YamlLoader(yaml.SafeLoader):
|
class _YamlLoader(yaml.SafeLoader):
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user