mirror of
https://github.com/mofeng-git/One-KVM.git
synced 2026-01-29 00:51:53 +08:00
validators, tests
This commit is contained in:
@@ -30,15 +30,20 @@ from typing import Optional
|
||||
from typing import Any
|
||||
|
||||
|
||||
# =====
|
||||
class ConfigError(ValueError):
|
||||
pass
|
||||
|
||||
|
||||
# =====
|
||||
def build_raw_from_options(options: List[str]) -> Dict[str, Any]:
|
||||
raw: Dict[str, Any] = {}
|
||||
for option in options:
|
||||
(key, value) = (option.split("=", 1) + [None])[:2] # type: ignore
|
||||
if len(key.strip()) == 0:
|
||||
raise ValueError("Empty option key (required 'key=value' instead of '{}')".format(option))
|
||||
raise ConfigError("Empty option key (required 'key=value' instead of %r)" % (option))
|
||||
if value is None:
|
||||
raise ValueError("No value for key '{}'".format(key))
|
||||
raise ConfigError("No value for key %r" % (key))
|
||||
|
||||
section = raw
|
||||
subs = list(map(str.strip, key.split("/")))
|
||||
@@ -56,7 +61,7 @@ def _parse_value(value: str) -> Any:
|
||||
and value not in ["true", "false", "null"]
|
||||
and not value.startswith(("{", "[", "\""))
|
||||
):
|
||||
value = "\"{}\"".format(value)
|
||||
value = "\"%s\"" % (value)
|
||||
return json.loads(value)
|
||||
|
||||
|
||||
@@ -66,33 +71,33 @@ class Section(dict):
|
||||
dict.__init__(self)
|
||||
self.__meta: Dict[str, Dict[str, Any]] = {}
|
||||
|
||||
def _unpack_renamed(self, _section: Optional["Section"]=None) -> Dict[str, Any]:
|
||||
def _unpack(self, _section: Optional["Section"]=None) -> Dict[str, Any]:
|
||||
if _section is None:
|
||||
_section = self
|
||||
unpacked: Dict[str, Any] = {}
|
||||
for (key, value) in _section.items():
|
||||
if isinstance(value, Section):
|
||||
unpacked[key] = value._unpack_renamed() # pylint: disable=protected-access
|
||||
unpacked[key] = value._unpack() # pylint: disable=protected-access
|
||||
else: # Option
|
||||
unpacked[_section._get_rename(key)] = value # pylint: disable=protected-access
|
||||
unpacked[_section._get_unpack_as(key)] = value # pylint: disable=protected-access
|
||||
return unpacked
|
||||
|
||||
def _set_meta(self, key: str, default: Any, help: str, rename: str) -> None: # pylint: disable=redefined-builtin
|
||||
def _set_meta(self, key: str, default: Any, unpack_as: str, help: str) -> None: # pylint: disable=redefined-builtin
|
||||
self.__meta[key] = {
|
||||
"default": default,
|
||||
"help": help,
|
||||
"rename": rename,
|
||||
"unpack_as": unpack_as,
|
||||
"help": help,
|
||||
}
|
||||
|
||||
def _get_default(self, key: str) -> Any:
|
||||
return self.__meta[key]["default"]
|
||||
|
||||
def _get_unpack_as(self, key: str) -> str:
|
||||
return (self.__meta[key]["unpack_as"] or key)
|
||||
|
||||
def _get_help(self, key: str) -> str:
|
||||
return self.__meta[key]["help"]
|
||||
|
||||
def _get_rename(self, key: str) -> str:
|
||||
return (self.__meta[key]["rename"] or key)
|
||||
|
||||
def __getattribute__(self, key: str) -> Any:
|
||||
if key in self:
|
||||
return self[key]
|
||||
@@ -106,46 +111,74 @@ class Option:
|
||||
def __init__(
|
||||
self,
|
||||
default: Any,
|
||||
help: str="", # pylint: disable=redefined-builtin
|
||||
type: Optional[Callable[[Any], Any]]=None, # pylint: disable=redefined-builtin
|
||||
rename: str="",
|
||||
only_if: str="",
|
||||
unpack_as: str="",
|
||||
help: str="", # pylint: disable=redefined-builtin
|
||||
) -> None:
|
||||
|
||||
self.default = default
|
||||
self.help = help
|
||||
self.type: Callable[[Any], Any] = (type or (self.__type(default) if default is not None else str)) # type: ignore
|
||||
self.rename = rename
|
||||
self.only_if = only_if
|
||||
self.unpack_as = unpack_as
|
||||
self.help = help
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return "<Option(default={self.default}, type={self.type}, help={self.help}, rename={self.rename})>".format(self=self)
|
||||
return "<Option(default={0.default}, type={0.type}, only_if={0.only_if}, unpack_as={0.unpack_as})>".format(self)
|
||||
|
||||
|
||||
# =====
|
||||
def make_config(raw: Dict[str, Any], scheme: Dict[str, Any], _keys: Tuple[str, ...]=()) -> Section:
|
||||
if not isinstance(raw, dict):
|
||||
raise ValueError("The node '{}' must be a dictionary".format("/".join(_keys) or "/"))
|
||||
raise ConfigError("The node %r must be a dictionary" % ("/".join(_keys) or "/"))
|
||||
|
||||
config = Section()
|
||||
for (key, option) in scheme.items():
|
||||
full_key = _keys + (key,)
|
||||
full_name = "/".join(full_key)
|
||||
|
||||
if isinstance(option, Option):
|
||||
value = raw.get(key, option.default)
|
||||
try:
|
||||
value = option.type(value)
|
||||
except Exception:
|
||||
raise ValueError("Invalid value '{value}' for key '{key}'".format(key=full_name, value=value))
|
||||
def make_full_key(key: str) -> Tuple[str, ...]:
|
||||
return _keys + (key,)
|
||||
|
||||
def make_full_name(key: str) -> str:
|
||||
return "/".join(make_full_key(key))
|
||||
|
||||
def process_option(key: str, no_only_if: bool=False) -> Any:
|
||||
if key not in config:
|
||||
option: Option = scheme[key]
|
||||
only_if = option.only_if
|
||||
only_if_negative = option.only_if.startswith("!")
|
||||
if only_if_negative:
|
||||
only_if = only_if[1:]
|
||||
|
||||
if only_if and no_only_if: # pylint: disable=no-else-raise
|
||||
# Перекрестный only_if запрещен
|
||||
raise RuntimeError("Found only_if recursuon on key %r" % (make_full_name(key)))
|
||||
elif only_if and (
|
||||
(not only_if_negative and not process_option(only_if, no_only_if=True))
|
||||
or (only_if_negative and process_option(only_if, no_only_if=True))
|
||||
):
|
||||
# Если есть условие и оно ложно - ставим дефолт и не валидируем
|
||||
value = option.default
|
||||
else:
|
||||
value = raw.get(key, option.default)
|
||||
try:
|
||||
value = option.type(value)
|
||||
except ValueError as err:
|
||||
raise ConfigError("Invalid value %r for key %r: %s" % (value, make_full_name(key), str(err)))
|
||||
|
||||
config[key] = value
|
||||
config._set_meta( # pylint: disable=protected-access
|
||||
key=key,
|
||||
default=option.default,
|
||||
unpack_as=option.unpack_as,
|
||||
help=option.help,
|
||||
rename=option.rename,
|
||||
)
|
||||
elif isinstance(option, dict):
|
||||
config[key] = make_config(raw.get(key, {}), option, full_key)
|
||||
return config[key]
|
||||
|
||||
for key in scheme:
|
||||
if isinstance(scheme[key], Option):
|
||||
process_option(key)
|
||||
elif isinstance(scheme[key], dict):
|
||||
config[key] = make_config(raw.get(key, {}), scheme[key], make_full_key(key))
|
||||
else:
|
||||
raise RuntimeError("Incorrect scheme definition for key '{}':"
|
||||
" the value is {}, not dict or Option()".format(full_name, type(option)))
|
||||
raise RuntimeError("Incorrect scheme definition for key %r:"
|
||||
" the value is %r, not dict() or Option()" % (make_full_name(key), type(scheme[key])))
|
||||
return config
|
||||
|
||||
@@ -44,17 +44,17 @@ def _inner_make_dump(config: Section, _level: int=0) -> List[str]:
|
||||
for (key, value) in sorted(config.items(), key=operator.itemgetter(0)):
|
||||
indent = " " * _INDENT * _level
|
||||
if isinstance(value, Section):
|
||||
lines.append("{}{}:".format(indent, key))
|
||||
lines.append("%s%s:" % (indent, key))
|
||||
lines += _inner_make_dump(value, _level + 1)
|
||||
lines.append("")
|
||||
else:
|
||||
default = config._get_default(key) # pylint: disable=protected-access
|
||||
comment = config._get_help(key) # pylint: disable=protected-access
|
||||
if default == value:
|
||||
lines.append("{}{}: {} # {}".format(indent, key, _make_yaml(value, _level), comment))
|
||||
lines.append("%s%s: %s # %s" % (indent, key, _make_yaml(value, _level), comment))
|
||||
else:
|
||||
lines.append("{}# {}: {} # {}".format(indent, key, _make_yaml(default, _level), comment))
|
||||
lines.append("{}{}: {}".format(indent, key, _make_yaml(value, _level)))
|
||||
lines.append("%s# %s: %s # %s" % (indent, key, _make_yaml(default, _level), comment))
|
||||
lines.append("%s%s: %s" % (indent, key, _make_yaml(value, _level)))
|
||||
return lines
|
||||
|
||||
|
||||
|
||||
@@ -26,7 +26,6 @@ from typing import IO
|
||||
from typing import Any
|
||||
|
||||
import yaml
|
||||
import yaml.loader
|
||||
import yaml.nodes
|
||||
|
||||
|
||||
@@ -37,17 +36,17 @@ def load_yaml_file(path: str) -> Any:
|
||||
return yaml.load(yaml_file, _YamlLoader)
|
||||
except Exception:
|
||||
# Reraise internal exception as standard ValueError and show the incorrect file
|
||||
raise ValueError("Incorrect YAML syntax in file '{}'".format(path))
|
||||
raise ValueError("Incorrect YAML syntax in file %r" % (path))
|
||||
|
||||
|
||||
class _YamlLoader(yaml.loader.Loader): # pylint: disable=too-many-ancestors
|
||||
class _YamlLoader(yaml.SafeLoader):
|
||||
def __init__(self, yaml_file: IO) -> None:
|
||||
yaml.loader.Loader.__init__(self, yaml_file)
|
||||
super().__init__(yaml_file)
|
||||
self.__root = os.path.dirname(yaml_file.name)
|
||||
|
||||
def include(self, node: yaml.nodes.Node) -> str:
|
||||
path = os.path.join(self.__root, self.construct_scalar(node)) # pylint: disable=no-member
|
||||
path = os.path.join(self.__root, self.construct_scalar(node))
|
||||
return load_yaml_file(path)
|
||||
|
||||
|
||||
_YamlLoader.add_constructor("!include", _YamlLoader.include) # pylint: disable=no-member
|
||||
_YamlLoader.add_constructor("!include", _YamlLoader.include)
|
||||
|
||||
Reference in New Issue
Block a user