refactoring

This commit is contained in:
Maxim Devaev 2024-09-18 04:37:43 +03:00
parent 45270a09d7
commit 7a53f14456
83 changed files with 517 additions and 516 deletions

View File

@ -83,9 +83,9 @@ class AioReader: # pylint: disable=too-many-instance-attributes
self.__path,
consumer=self.__consumer,
config={tuple(pins): gpiod.LineSettings(edge_detection=gpiod.line.Edge.BOTH)},
) as line_request:
) as line_req:
line_request.wait_edge_events(0.1)
line_req.wait_edge_events(0.1)
self.__values = {
pin: _DebouncedValue(
initial=bool(value.value),
@ -93,14 +93,14 @@ class AioReader: # pylint: disable=too-many-instance-attributes
notifier=self.__notifier,
loop=self.__loop,
)
for (pin, value) in zip(pins, line_request.get_values(pins))
for (pin, value) in zip(pins, line_req.get_values(pins))
}
self.__loop.call_soon_threadsafe(self.__notifier.notify)
while not self.__stop_event.is_set():
if line_request.wait_edge_events(1):
if line_req.wait_edge_events(1):
new: dict[int, bool] = {}
for event in line_request.read_edge_events():
for event in line_req.read_edge_events():
(pin, value) = self.__parse_event(event)
new[pin] = value
for (pin, value) in new.items():
@ -110,7 +110,7 @@ class AioReader: # pylint: disable=too-many-instance-attributes
# Размер буфера ядра - 16 эвентов на линии. При превышении этого числа,
# новые эвенты потеряются. Это не баг, это фича, как мне объяснили в LKML.
# Штош. Будем с этим жить и синхронизировать состояния при таймауте.
for (pin, value) in zip(pins, line_request.get_values(pins)):
for (pin, value) in zip(pins, line_req.get_values(pins)):
self.__values[pin].set(bool(value.value)) # type: ignore
def __parse_event(self, event: gpiod.EdgeEvent) -> tuple[int, bool]:

View File

@ -42,7 +42,7 @@ async def remount(name: str, base_cmd: list[str], rw: bool) -> bool:
if proc.returncode != 0:
assert proc.returncode is not None
raise subprocess.CalledProcessError(proc.returncode, cmd)
except Exception as err:
logger.error("Can't remount %s storage: %s", name, tools.efmt(err))
except Exception as ex:
logger.error("Can't remount %s storage: %s", name, tools.efmt(ex))
return False
return True

View File

@ -112,9 +112,9 @@ def shield_fg(aw: Awaitable): # type: ignore
if inner.cancelled():
outer.forced_cancel()
else:
err = inner.exception()
if err is not None:
outer.set_exception(err)
ex = inner.exception()
if ex is not None:
outer.set_exception(ex)
else:
outer.set_result(inner.result())

View File

@ -171,8 +171,8 @@ def _init_config(config_path: str, override_options: list[str], **load_flags: bo
config_path = os.path.expanduser(config_path)
try:
raw_config: dict = load_yaml_file(config_path)
except Exception as err:
raise SystemExit(f"ConfigError: Can't read config file {config_path!r}:\n{tools.efmt(err)}")
except Exception as ex:
raise SystemExit(f"ConfigError: Can't read config file {config_path!r}:\n{tools.efmt(ex)}")
if not isinstance(raw_config, dict):
raise SystemExit(f"ConfigError: Top-level of the file {config_path!r} must be a dictionary")
@ -187,8 +187,8 @@ def _init_config(config_path: str, override_options: list[str], **load_flags: bo
config = make_config(raw_config, scheme)
return config
except (ConfigError, UnknownPluginError) as err:
raise SystemExit(f"ConfigError: {err}")
except (ConfigError, UnknownPluginError) as ex:
raise SystemExit(f"ConfigError: {ex}")
def _patch_raw(raw_config: dict) -> None: # pylint: disable=too-many-branches

View File

@ -402,5 +402,5 @@ def main(argv: (list[str] | None)=None) -> None: # pylint: disable=too-many-bra
f"--set-edid=file={orig_edid_path}",
"--info-edid",
], stdout=sys.stderr, check=True)
except subprocess.CalledProcessError as err:
raise SystemExit(str(err))
except subprocess.CalledProcessError as ex:
raise SystemExit(str(ex))

View File

@ -155,5 +155,5 @@ def main(argv: (list[str] | None)=None) -> None:
options = parser.parse_args(argv[1:])
try:
options.cmd(config, options)
except ValidatorError as err:
raise SystemExit(str(err))
except ValidatorError as ex:
raise SystemExit(str(ex))

View File

@ -101,6 +101,7 @@ class IpmiServer(BaseIpmiServer): # pylint: disable=too-many-instance-attribute
# =====
def handle_raw_request(self, request: dict, session: IpmiServerSession) -> None:
# Parameter 'request' has been renamed to 'req' in overriding method
handler = {
(6, 1): (lambda _, session: self.send_device_id(session)), # Get device ID
(6, 7): self.__get_power_state_handler, # Power state
@ -145,13 +146,13 @@ class IpmiServer(BaseIpmiServer): # pylint: disable=too-many-instance-attribute
data = [int(result["leds"]["power"]), 0, 0]
session.send_ipmi_response(data=data)
def __chassis_control_handler(self, request: dict, session: IpmiServerSession) -> None:
def __chassis_control_handler(self, req: dict, session: IpmiServerSession) -> None:
action = {
0: "off_hard",
1: "on",
3: "reset_hard",
5: "off",
}.get(request["data"][0], "")
}.get(req["data"][0], "")
if action:
if not self.__make_request(session, f"atx.switch_power({action})", "atx.switch_power", action=action):
code = 0xC0 # Try again later
@ -171,8 +172,8 @@ class IpmiServer(BaseIpmiServer): # pylint: disable=too-many-instance-attribute
async with self.__kvmd.make_session(credentials.kvmd_user, credentials.kvmd_passwd) as kvmd_session:
func = functools.reduce(getattr, func_path.split("."), kvmd_session)
return (await func(**kwargs))
except (aiohttp.ClientError, asyncio.TimeoutError) as err:
logger.error("[%s]: Can't perform request %s: %s", session.sockaddr[0], name, err)
except (aiohttp.ClientError, asyncio.TimeoutError) as ex:
logger.error("[%s]: Can't perform request %s: %s", session.sockaddr[0], name, ex)
raise
return aiotools.run_sync(runner())

View File

@ -113,8 +113,8 @@ class JanusRunner: # pylint: disable=too-many-instance-attributes
for proto in [socket.AF_INET, socket.AF_INET6]:
if proto in addrs:
return addrs[proto][0]["addr"]
except Exception as err:
get_logger().error("Can't get default IP: %s", tools.efmt(err))
except Exception as ex:
get_logger().error("Can't get default IP: %s", tools.efmt(ex))
return ""
# =====

View File

@ -93,8 +93,8 @@ class Stun:
self.__sock.bind(src_addr)
(nat_type, resp) = await self.__get_nat_type(src_ip)
ext_ip = (resp.ext.ip if resp.ext is not None else "")
except Exception as err:
get_logger(0).error("Can't get STUN info: %s", tools.efmt(err))
except Exception as ex:
get_logger(0).error("Can't get STUN info: %s", tools.efmt(ex))
finally:
self.__sock = None
@ -201,16 +201,16 @@ class Stun:
try:
await aiotools.run_async(self.__sock.sendto, req, addr)
except Exception as err:
return (b"", f"Send error: {tools.efmt(err)}")
except Exception as ex:
return (b"", f"Send error: {tools.efmt(ex)}")
try:
resp = (await aiotools.run_async(self.__sock.recvfrom, 2048))[0]
except Exception as err:
return (b"", f"Recv error: {tools.efmt(err)}")
except Exception as ex:
return (b"", f"Recv error: {tools.efmt(ex)}")
(response_type, payload_len) = struct.unpack(">HH", resp[:4])
if response_type != 0x0101:
return (b"", f"Invalid response type: {response_type:#06x}")
(resp_type, payload_len) = struct.unpack(">HH", resp[:4])
if resp_type != 0x0101:
return (b"", f"Invalid response type: {resp_type:#06x}")
if trans_id != resp[4:20]:
return (b"", "Transaction ID mismatch")

View File

@ -45,9 +45,9 @@ class AtxApi:
return make_json_response(await self.__atx.get_state())
@exposed_http("POST", "/atx/power")
async def __power_handler(self, request: Request) -> Response:
action = valid_atx_power_action(request.query.get("action"))
wait = valid_bool(request.query.get("wait", False))
async def __power_handler(self, req: Request) -> Response:
action = valid_atx_power_action(req.query.get("action"))
wait = valid_bool(req.query.get("wait", False))
await ({
"on": self.__atx.power_on,
"off": self.__atx.power_off,
@ -57,9 +57,9 @@ class AtxApi:
return make_json_response()
@exposed_http("POST", "/atx/click")
async def __click_handler(self, request: Request) -> Response:
button = valid_atx_button(request.query.get("button"))
wait = valid_bool(request.query.get("wait", False))
async def __click_handler(self, req: Request) -> Response:
button = valid_atx_button(req.query.get("button"))
wait = valid_bool(req.query.get("wait", False))
await ({
"power": self.__atx.click_power,
"power_long": self.__atx.click_power_long,

View File

@ -43,34 +43,34 @@ from ..auth import AuthManager
_COOKIE_AUTH_TOKEN = "auth_token"
async def check_request_auth(auth_manager: AuthManager, exposed: HttpExposed, request: Request) -> None:
async def check_request_auth(auth_manager: AuthManager, exposed: HttpExposed, req: Request) -> None:
if auth_manager.is_auth_required(exposed):
user = request.headers.get("X-KVMD-User", "")
user = req.headers.get("X-KVMD-User", "")
if user:
user = valid_user(user)
passwd = request.headers.get("X-KVMD-Passwd", "")
set_request_auth_info(request, f"{user} (xhdr)")
passwd = req.headers.get("X-KVMD-Passwd", "")
set_request_auth_info(req, f"{user} (xhdr)")
if not (await auth_manager.authorize(user, valid_passwd(passwd))):
raise ForbiddenError()
return
token = request.cookies.get(_COOKIE_AUTH_TOKEN, "")
token = req.cookies.get(_COOKIE_AUTH_TOKEN, "")
if token:
user = auth_manager.check(valid_auth_token(token)) # type: ignore
if not user:
set_request_auth_info(request, "- (token)")
set_request_auth_info(req, "- (token)")
raise ForbiddenError()
set_request_auth_info(request, f"{user} (token)")
set_request_auth_info(req, f"{user} (token)")
return
basic_auth = request.headers.get("Authorization", "")
basic_auth = req.headers.get("Authorization", "")
if basic_auth and basic_auth[:6].lower() == "basic ":
try:
(user, passwd) = base64.b64decode(basic_auth[6:]).decode("utf-8").split(":")
except Exception:
raise UnauthorizedError()
user = valid_user(user)
set_request_auth_info(request, f"{user} (basic)")
set_request_auth_info(req, f"{user} (basic)")
if not (await auth_manager.authorize(user, valid_passwd(passwd))):
raise ForbiddenError()
return
@ -85,9 +85,9 @@ class AuthApi:
# =====
@exposed_http("POST", "/auth/login", auth_required=False)
async def __login_handler(self, request: Request) -> Response:
async def __login_handler(self, req: Request) -> Response:
if self.__auth_manager.is_auth_enabled():
credentials = await request.post()
credentials = await req.post()
token = await self.__auth_manager.login(
user=valid_user(credentials.get("user", "")),
passwd=valid_passwd(credentials.get("passwd", "")),
@ -98,9 +98,9 @@ class AuthApi:
return make_json_response()
@exposed_http("POST", "/auth/logout")
async def __logout_handler(self, request: Request) -> Response:
async def __logout_handler(self, req: Request) -> Response:
if self.__auth_manager.is_auth_enabled():
token = valid_auth_token(request.cookies.get(_COOKIE_AUTH_TOKEN, ""))
token = valid_auth_token(req.cookies.get(_COOKIE_AUTH_TOKEN, ""))
self.__auth_manager.logout(token)
return make_json_response()

View File

@ -85,22 +85,22 @@ class HidApi:
return make_json_response(await self.__hid.get_state())
@exposed_http("POST", "/hid/set_params")
async def __set_params_handler(self, request: Request) -> Response:
async def __set_params_handler(self, req: Request) -> Response:
params = {
key: validator(request.query.get(key))
key: validator(req.query.get(key))
for (key, validator) in [
("keyboard_output", valid_hid_keyboard_output),
("mouse_output", valid_hid_mouse_output),
("jiggler", valid_bool),
]
if request.query.get(key) is not None
if req.query.get(key) is not None
}
self.__hid.set_params(**params) # type: ignore
return make_json_response()
@exposed_http("POST", "/hid/set_connected")
async def __set_connected_handler(self, request: Request) -> Response:
self.__hid.set_connected(valid_bool(request.query.get("connected")))
async def __set_connected_handler(self, req: Request) -> Response:
self.__hid.set_connected(valid_bool(req.query.get("connected")))
return make_json_response()
@exposed_http("POST", "/hid/reset")
@ -128,12 +128,12 @@ class HidApi:
return make_json_response(await self.get_keymaps())
@exposed_http("POST", "/hid/print")
async def __print_handler(self, request: Request) -> Response:
text = await request.text()
limit = int(valid_int_f0(request.query.get("limit", 1024)))
async def __print_handler(self, req: Request) -> Response:
text = await req.text()
limit = int(valid_int_f0(req.query.get("limit", 1024)))
if limit > 0:
text = text[:limit]
symmap = self.__ensure_symmap(request.query.get("keymap", self.__default_keymap_name))
symmap = self.__ensure_symmap(req.query.get("keymap", self.__default_keymap_name))
self.__hid.send_key_events(text_to_web_keys(text, symmap))
return make_json_response()
@ -257,21 +257,21 @@ class HidApi:
# =====
@exposed_http("POST", "/hid/events/send_key")
async def __events_send_key_handler(self, request: Request) -> Response:
key = valid_hid_key(request.query.get("key"))
async def __events_send_key_handler(self, req: Request) -> Response:
key = valid_hid_key(req.query.get("key"))
if key not in self.__ignore_keys:
if "state" in request.query:
state = valid_bool(request.query["state"])
if "state" in req.query:
state = valid_bool(req.query["state"])
self.__hid.send_key_events([(key, state)])
else:
self.__hid.send_key_events([(key, True), (key, False)])
return make_json_response()
@exposed_http("POST", "/hid/events/send_mouse_button")
async def __events_send_mouse_button_handler(self, request: Request) -> Response:
button = valid_hid_mouse_button(request.query.get("button"))
if "state" in request.query:
state = valid_bool(request.query["state"])
async def __events_send_mouse_button_handler(self, req: Request) -> Response:
button = valid_hid_mouse_button(req.query.get("button"))
if "state" in req.query:
state = valid_bool(req.query["state"])
self.__hid.send_mouse_button_event(button, state)
else:
self.__hid.send_mouse_button_event(button, True)
@ -279,23 +279,23 @@ class HidApi:
return make_json_response()
@exposed_http("POST", "/hid/events/send_mouse_move")
async def __events_send_mouse_move_handler(self, request: Request) -> Response:
to_x = valid_hid_mouse_move(request.query.get("to_x"))
to_y = valid_hid_mouse_move(request.query.get("to_y"))
async def __events_send_mouse_move_handler(self, req: Request) -> Response:
to_x = valid_hid_mouse_move(req.query.get("to_x"))
to_y = valid_hid_mouse_move(req.query.get("to_y"))
self.__send_mouse_move_event(to_x, to_y)
return make_json_response()
@exposed_http("POST", "/hid/events/send_mouse_relative")
async def __events_send_mouse_relative_handler(self, request: Request) -> Response:
return self.__process_http_delta_event(request, self.__hid.send_mouse_relative_event)
async def __events_send_mouse_relative_handler(self, req: Request) -> Response:
return self.__process_http_delta_event(req, self.__hid.send_mouse_relative_event)
@exposed_http("POST", "/hid/events/send_mouse_wheel")
async def __events_send_mouse_wheel_handler(self, request: Request) -> Response:
return self.__process_http_delta_event(request, self.__hid.send_mouse_wheel_event)
async def __events_send_mouse_wheel_handler(self, req: Request) -> Response:
return self.__process_http_delta_event(req, self.__hid.send_mouse_wheel_event)
def __process_http_delta_event(self, request: Request, handler: Callable[[int, int], None]) -> Response:
delta_x = valid_hid_mouse_delta(request.query.get("delta_x"))
delta_y = valid_hid_mouse_delta(request.query.get("delta_y"))
def __process_http_delta_event(self, req: Request, handler: Callable[[int, int], None]) -> Response:
delta_x = valid_hid_mouse_delta(req.query.get("delta_x"))
delta_y = valid_hid_mouse_delta(req.query.get("delta_y"))
handler(delta_x, delta_y)
return make_json_response()

View File

@ -41,17 +41,17 @@ class InfoApi:
# =====
@exposed_http("GET", "/info")
async def __common_state_handler(self, request: Request) -> Response:
fields = self.__valid_info_fields(request)
async def __common_state_handler(self, req: Request) -> Response:
fields = self.__valid_info_fields(req)
results = dict(zip(fields, await asyncio.gather(*[
self.__info_manager.get_submanager(field).get_state()
for field in fields
])))
return make_json_response(results)
def __valid_info_fields(self, request: Request) -> list[str]:
def __valid_info_fields(self, req: Request) -> list[str]:
subs = self.__info_manager.get_subs()
return sorted(valid_info_fields(
arg=request.query.get("fields", ",".join(subs)),
arg=req.query.get("fields", ",".join(subs)),
variants=subs,
) or subs)

View File

@ -47,12 +47,12 @@ class LogApi:
# =====
@exposed_http("GET", "/log")
async def __log_handler(self, request: Request) -> StreamResponse:
async def __log_handler(self, req: Request) -> StreamResponse:
if self.__log_reader is None:
raise LogReaderDisabledError()
seek = valid_log_seek(request.query.get("seek", 0))
follow = valid_bool(request.query.get("follow", False))
response = await start_streaming(request, "text/plain")
seek = valid_log_seek(req.query.get("seek", 0))
follow = valid_bool(req.query.get("follow", False))
response = await start_streaming(req, "text/plain")
async for record in self.__log_reader.poll_log(seek, follow):
await response.write(("[%s %s] --- %s" % (
record["dt"].strftime("%Y-%m-%d %H:%M:%S"),

View File

@ -66,29 +66,29 @@ class MsdApi:
return make_json_response(await self.__msd.get_state())
@exposed_http("POST", "/msd/set_params")
async def __set_params_handler(self, request: Request) -> Response:
async def __set_params_handler(self, req: Request) -> Response:
params = {
key: validator(request.query.get(param))
key: validator(req.query.get(param))
for (param, key, validator) in [
("image", "name", (lambda arg: str(arg).strip() and valid_msd_image_name(arg))),
("cdrom", "cdrom", valid_bool),
("rw", "rw", valid_bool),
]
if request.query.get(param) is not None
if req.query.get(param) is not None
}
await self.__msd.set_params(**params) # type: ignore
return make_json_response()
@exposed_http("POST", "/msd/set_connected")
async def __set_connected_handler(self, request: Request) -> Response:
await self.__msd.set_connected(valid_bool(request.query.get("connected")))
async def __set_connected_handler(self, req: Request) -> Response:
await self.__msd.set_connected(valid_bool(req.query.get("connected")))
return make_json_response()
# =====
@exposed_http("GET", "/msd/read")
async def __read_handler(self, request: Request) -> StreamResponse:
name = valid_msd_image_name(request.query.get("image"))
async def __read_handler(self, req: Request) -> StreamResponse:
name = valid_msd_image_name(req.query.get("image"))
compressors = {
"": ("", None),
"none": ("", None),
@ -96,7 +96,7 @@ class MsdApi:
"zstd": (".zst", (lambda: zstandard.ZstdCompressor().compressobj())), # pylint: disable=unnecessary-lambda
}
(suffix, make_compressor) = compressors[check_string_in_list(
arg=request.query.get("compress", ""),
arg=req.query.get("compress", ""),
name="Compression mode",
variants=set(compressors),
)]
@ -127,7 +127,7 @@ class MsdApi:
src = compressed()
size = -1
response = await start_streaming(request, "application/octet-stream", size, name + suffix)
response = await start_streaming(req, "application/octet-stream", size, name + suffix)
async for chunk in src:
await response.write(chunk)
return response
@ -135,28 +135,28 @@ class MsdApi:
# =====
@exposed_http("POST", "/msd/write")
async def __write_handler(self, request: Request) -> Response:
unsafe_prefix = request.query.get("prefix", "") + "/"
name = valid_msd_image_name(unsafe_prefix + request.query.get("image", ""))
size = valid_int_f0(request.content_length)
remove_incomplete = self.__get_remove_incomplete(request)
async def __write_handler(self, req: Request) -> Response:
unsafe_prefix = req.query.get("prefix", "") + "/"
name = valid_msd_image_name(unsafe_prefix + req.query.get("image", ""))
size = valid_int_f0(req.content_length)
remove_incomplete = self.__get_remove_incomplete(req)
written = 0
async with self.__msd.write_image(name, size, remove_incomplete) as writer:
chunk_size = writer.get_chunk_size()
while True:
chunk = await request.content.read(chunk_size)
chunk = await req.content.read(chunk_size)
if not chunk:
break
written = await writer.write_chunk(chunk)
return make_json_response(self.__make_write_info(name, size, written))
@exposed_http("POST", "/msd/write_remote")
async def __write_remote_handler(self, request: Request) -> (Response | StreamResponse): # pylint: disable=too-many-locals
unsafe_prefix = request.query.get("prefix", "") + "/"
url = valid_url(request.query.get("url"))
insecure = valid_bool(request.query.get("insecure", False))
timeout = valid_float_f01(request.query.get("timeout", 10.0))
remove_incomplete = self.__get_remove_incomplete(request)
async def __write_remote_handler(self, req: Request) -> (Response | StreamResponse): # pylint: disable=too-many-locals
unsafe_prefix = req.query.get("prefix", "") + "/"
url = valid_url(req.query.get("url"))
insecure = valid_bool(req.query.get("insecure", False))
timeout = valid_float_f01(req.query.get("timeout", 10.0))
remove_incomplete = self.__get_remove_incomplete(req)
name = ""
size = written = 0
@ -174,7 +174,7 @@ class MsdApi:
read_timeout=(7 * 24 * 3600),
) as remote:
name = str(request.query.get("image", "")).strip()
name = str(req.query.get("image", "")).strip()
if len(name) == 0:
name = htclient.get_filename(remote)
name = valid_msd_image_name(unsafe_prefix + name)
@ -184,7 +184,7 @@ class MsdApi:
get_logger(0).info("Downloading image %r as %r to MSD ...", url, name)
async with self.__msd.write_image(name, size, remove_incomplete) as writer:
chunk_size = writer.get_chunk_size()
response = await start_streaming(request, "application/x-ndjson")
response = await start_streaming(req, "application/x-ndjson")
await stream_write_info()
last_report_ts = 0
async for chunk in remote.content.iter_chunked(chunk_size):
@ -197,16 +197,16 @@ class MsdApi:
await stream_write_info()
return response
except Exception as err:
except Exception as ex:
if response is not None:
await stream_write_info()
await stream_json_exception(response, err)
elif isinstance(err, aiohttp.ClientError):
return make_json_exception(err, 400)
await stream_json_exception(response, ex)
elif isinstance(ex, aiohttp.ClientError):
return make_json_exception(ex, 400)
raise
def __get_remove_incomplete(self, request: Request) -> (bool | None):
flag: (str | None) = request.query.get("remove_incomplete")
def __get_remove_incomplete(self, req: Request) -> (bool | None):
flag: (str | None) = req.query.get("remove_incomplete")
return (valid_bool(flag) if flag is not None else None)
def __make_write_info(self, name: str, size: int, written: int) -> dict:
@ -215,8 +215,8 @@ class MsdApi:
# =====
@exposed_http("POST", "/msd/remove")
async def __remove_handler(self, request: Request) -> Response:
await self.__msd.remove(valid_msd_image_name(request.query.get("image")))
async def __remove_handler(self, req: Request) -> Response:
await self.__msd.remove(valid_msd_image_name(req.query.get("image")))
return make_json_response()
@exposed_http("POST", "/msd/reset")

View File

@ -111,10 +111,10 @@ class RedfishApi:
}, wrap_result=False)
@exposed_http("POST", "/redfish/v1/Systems/0/Actions/ComputerSystem.Reset")
async def __power_handler(self, request: Request) -> Response:
async def __power_handler(self, req: Request) -> Response:
try:
action = check_string_in_list(
arg=(await request.json())["ResetType"],
arg=(await req.json()).get("ResetType"),
name="Redfish ResetType",
variants=set(self.__actions),
lower=False,

View File

@ -52,36 +52,36 @@ class StreamerApi:
return make_json_response(await self.__streamer.get_state())
@exposed_http("GET", "/streamer/snapshot")
async def __take_snapshot_handler(self, request: Request) -> Response:
async def __take_snapshot_handler(self, req: Request) -> Response:
snapshot = await self.__streamer.take_snapshot(
save=valid_bool(request.query.get("save", False)),
load=valid_bool(request.query.get("load", False)),
allow_offline=valid_bool(request.query.get("allow_offline", False)),
save=valid_bool(req.query.get("save", False)),
load=valid_bool(req.query.get("load", False)),
allow_offline=valid_bool(req.query.get("allow_offline", False)),
)
if snapshot:
if valid_bool(request.query.get("ocr", False)):
if valid_bool(req.query.get("ocr", False)):
langs = self.__ocr.get_available_langs()
return Response(
body=(await self.__ocr.recognize(
data=snapshot.data,
langs=valid_string_list(
arg=str(request.query.get("ocr_langs", "")).strip(),
arg=str(req.query.get("ocr_langs", "")).strip(),
subval=(lambda lang: check_string_in_list(lang, "OCR lang", langs)),
name="OCR langs list",
),
left=int(valid_number(request.query.get("ocr_left", -1))),
top=int(valid_number(request.query.get("ocr_top", -1))),
right=int(valid_number(request.query.get("ocr_right", -1))),
bottom=int(valid_number(request.query.get("ocr_bottom", -1))),
left=int(valid_number(req.query.get("ocr_left", -1))),
top=int(valid_number(req.query.get("ocr_top", -1))),
right=int(valid_number(req.query.get("ocr_right", -1))),
bottom=int(valid_number(req.query.get("ocr_bottom", -1))),
)),
headers=dict(snapshot.headers),
content_type="text/plain",
)
elif valid_bool(request.query.get("preview", False)):
elif valid_bool(req.query.get("preview", False)):
data = await snapshot.make_preview(
max_width=valid_int_f0(request.query.get("preview_max_width", 0)),
max_height=valid_int_f0(request.query.get("preview_max_height", 0)),
quality=valid_stream_quality(request.query.get("preview_quality", 80)),
max_width=valid_int_f0(req.query.get("preview_max_width", 0)),
max_height=valid_int_f0(req.query.get("preview_max_height", 0)),
quality=valid_stream_quality(req.query.get("preview_quality", 80)),
)
else:
data = snapshot.data

View File

@ -48,17 +48,17 @@ class UserGpioApi:
})
@exposed_http("POST", "/gpio/switch")
async def __switch_handler(self, request: Request) -> Response:
channel = valid_ugpio_channel(request.query.get("channel"))
state = valid_bool(request.query.get("state"))
wait = valid_bool(request.query.get("wait", False))
async def __switch_handler(self, req: Request) -> Response:
channel = valid_ugpio_channel(req.query.get("channel"))
state = valid_bool(req.query.get("state"))
wait = valid_bool(req.query.get("wait", False))
await self.__user_gpio.switch(channel, state, wait)
return make_json_response()
@exposed_http("POST", "/gpio/pulse")
async def __pulse_handler(self, request: Request) -> Response:
channel = valid_ugpio_channel(request.query.get("channel"))
delay = valid_float_f0(request.query.get("delay", 0.0))
wait = valid_bool(request.query.get("wait", False))
async def __pulse_handler(self, req: Request) -> Response:
channel = valid_ugpio_channel(req.query.get("channel"))
delay = valid_float_f0(req.query.get("delay", 0.0))
wait = valid_bool(req.query.get("wait", False))
await self.__user_gpio.pulse(channel, delay, wait)
return make_json_response()

View File

@ -46,8 +46,8 @@ class ExtrasInfoSubmanager(BaseInfoSubmanager):
try:
sui = sysunit.SystemdUnitInfo()
await sui.open()
except Exception as err:
get_logger(0).error("Can't open systemd bus to get extras state: %s", tools.efmt(err))
except Exception as ex:
get_logger(0).error("Can't open systemd bus to get extras state: %s", tools.efmt(ex))
sui = None
try:
extras: dict[str, dict] = {}
@ -85,8 +85,8 @@ class ExtrasInfoSubmanager(BaseInfoSubmanager):
if sui is not None:
try:
(extra["enabled"], extra["started"]) = await sui.get_status(daemon)
except Exception as err:
get_logger(0).error("Can't get info about the service %r: %s", daemon, tools.efmt(err))
except Exception as ex:
get_logger(0).error("Can't get info about the service %r: %s", daemon, tools.efmt(ex))
def __rewrite_app_port(self, extra: dict) -> None:
port_path = extra.get("port", "")

View File

@ -87,8 +87,8 @@ class FanInfoSubmanager(BaseInfoSubmanager):
async with sysunit.SystemdUnitInfo() as sui:
status = await sui.get_status(self.__daemon)
return (status[0] or status[1])
except Exception as err:
get_logger(0).error("Can't get info about the service %r: %s", self.__daemon, tools.efmt(err))
except Exception as ex:
get_logger(0).error("Can't get info about the service %r: %s", self.__daemon, tools.efmt(ex))
return False
async def __get_fan_state(self) -> (dict | None):
@ -97,8 +97,8 @@ class FanInfoSubmanager(BaseInfoSubmanager):
async with session.get("http://localhost/state") as response:
htclient.raise_not_200(response)
return (await response.json())["result"]
except Exception as err:
get_logger(0).error("Can't read fan state: %s", err)
except Exception as ex:
get_logger(0).error("Can't read fan state: %s", ex)
return None
def __make_http_session(self) -> aiohttp.ClientSession:

View File

@ -114,8 +114,8 @@ class HwInfoSubmanager(BaseInfoSubmanager):
try:
value = (await aiotools.read_file(path)).strip(" \t\r\n\0")
self.__dt_cache[name] = (value.upper() if upper else value)
except Exception as err:
get_logger(0).error("Can't read DT %s from %s: %s", name, path, err)
except Exception as ex:
get_logger(0).error("Can't read DT %s from %s: %s", name, path, ex)
return None
return self.__dt_cache[name]
@ -141,8 +141,8 @@ class HwInfoSubmanager(BaseInfoSubmanager):
temp_path = f"{env.SYSFS_PREFIX}/sys/class/thermal/thermal_zone0/temp"
try:
return int((await aiotools.read_file(temp_path)).strip()) / 1000
except Exception as err:
get_logger(0).error("Can't read CPU temp from %s: %s", temp_path, err)
except Exception as ex:
get_logger(0).error("Can't read CPU temp from %s: %s", temp_path, ex)
return None
async def __get_cpu_percent(self) -> (float | None):
@ -160,8 +160,8 @@ class HwInfoSubmanager(BaseInfoSubmanager):
+ system_all / total * 100
+ (st.steal + st.guest) / total * 100
)
except Exception as err:
get_logger(0).error("Can't get CPU percent: %s", err)
except Exception as ex:
get_logger(0).error("Can't get CPU percent: %s", ex)
return None
async def __get_mem(self) -> dict:
@ -172,8 +172,8 @@ class HwInfoSubmanager(BaseInfoSubmanager):
"total": st.total,
"available": st.available,
}
except Exception as err:
get_logger(0).error("Can't get memory info: %s", err)
except Exception as ex:
get_logger(0).error("Can't get memory info: %s", ex)
return {
"percent": None,
"total": None,
@ -216,6 +216,6 @@ class HwInfoSubmanager(BaseInfoSubmanager):
return None
try:
return parser(text)
except Exception as err:
get_logger(0).error("Can't parse [ %s ] output: %r: %s", tools.cmdfmt(cmd), text, tools.efmt(err))
except Exception as ex:
get_logger(0).error("Can't parse [ %s ] output: %r: %s", tools.cmdfmt(cmd), text, tools.efmt(ex))
return None

View File

@ -76,8 +76,8 @@ def _load_libtesseract() -> (ctypes.CDLL | None):
setattr(func, "restype", restype)
setattr(func, "argtypes", argtypes)
return lib
except Exception as err:
warnings.warn(f"Can't load libtesseract: {err}", RuntimeWarning)
except Exception as ex:
warnings.warn(f"Can't load libtesseract: {ex}", RuntimeWarning)
return None

View File

@ -213,7 +213,7 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
# ===== STREAMER CONTROLLER
@exposed_http("POST", "/streamer/set_params")
async def __streamer_set_params_handler(self, request: Request) -> Response:
async def __streamer_set_params_handler(self, req: Request) -> Response:
current_params = self.__streamer.get_params()
for (name, validator, exc_cls) in [
("quality", valid_stream_quality, StreamerQualityNotSupported),
@ -222,7 +222,7 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
("h264_bitrate", valid_stream_h264_bitrate, StreamerH264NotSupported),
("h264_gop", valid_stream_h264_gop, StreamerH264NotSupported),
]:
value = request.query.get(name)
value = req.query.get(name)
if value:
if name not in current_params:
assert exc_cls is not None, name
@ -242,9 +242,9 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
# ===== WEBSOCKET
@exposed_http("GET", "/ws")
async def __ws_handler(self, request: Request) -> WebSocketResponse:
stream = valid_bool(request.query.get("stream", True))
async with self._ws_session(request, stream=stream) as ws:
async def __ws_handler(self, req: Request) -> WebSocketResponse:
stream = valid_bool(req.query.get("stream", True))
async with self._ws_session(req, stream=stream) as ws:
states = [
(event_type, src.get_state())
for sub in self.__subsystems
@ -275,8 +275,8 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
aioproc.rename_process("main")
super().run(**kwargs)
async def _check_request_auth(self, exposed: HttpExposed, request: Request) -> None:
await check_request_auth(self.__auth_manager, exposed, request)
async def _check_request_auth(self, exposed: HttpExposed, req: Request) -> None:
await check_request_auth(self.__auth_manager, exposed, req)
async def _init_app(self) -> None:
aiotools.create_deadly_task("Stream controller", self.__stream_controller())

View File

@ -386,8 +386,8 @@ class Streamer: # pylint: disable=too-many-instance-attributes
return snapshot
logger.error("Stream is offline, no signal or so")
except (aiohttp.ClientConnectionError, aiohttp.ServerConnectionError) as err:
logger.error("Can't connect to streamer: %s", tools.efmt(err))
except (aiohttp.ClientConnectionError, aiohttp.ServerConnectionError) as ex:
logger.error("Can't connect to streamer: %s", tools.efmt(ex))
except Exception:
logger.exception("Invalid streamer response from /snapshot")
return None
@ -473,8 +473,8 @@ class Streamer: # pylint: disable=too-many-instance-attributes
logger.info("%s: %s", name, tools.cmdfmt(cmd))
try:
await aioproc.log_process(cmd, logger, prefix=name)
except Exception as err:
logger.exception("Can't execute command: %s", err)
except Exception as ex:
logger.exception("Can't execute command: %s", ex)
async def __start_streamer_proc(self) -> None:
assert self.__streamer_proc is None

View File

@ -51,8 +51,8 @@ class SystemdUnitInfo:
unit_props = unit.get_interface("org.freedesktop.DBus.Properties")
started = ((await unit_props.call_get("org.freedesktop.systemd1.Unit", "ActiveState")).value == "active") # type: ignore
self.__requested = True
except dbus_next.errors.DBusError as err:
if err.type != "org.freedesktop.systemd1.NoSuchUnit":
except dbus_next.errors.DBusError as ex:
if ex.type != "org.freedesktop.systemd1.NoSuchUnit":
raise
started = False
enabled = ((await self.__manager.call_get_unit_file_state(name)) in [ # type: ignore

View File

@ -346,5 +346,5 @@ def main(argv: (list[str] | None)=None) -> None:
options = parser.parse_args(argv[1:])
try:
options.cmd(config)
except ValidatorError as err:
raise SystemExit(str(err))
except ValidatorError as ex:
raise SystemExit(str(ex))

View File

@ -47,9 +47,9 @@ def _set_param(gadget: str, instance: int, param: str, value: str) -> None:
try:
with open(_get_param_path(gadget, instance, param), "w") as file:
file.write(value + "\n")
except OSError as err:
if err.errno == errno.EBUSY:
raise SystemExit(f"Can't change {param!r} value because device is locked: {err}")
except OSError as ex:
if ex.errno == errno.EBUSY:
raise SystemExit(f"Can't change {param!r} value because device is locked: {ex}")
raise

View File

@ -133,8 +133,8 @@ class _Service: # pylint: disable=too-many-instance-attributes
logger.info("CMD: %s", tools.cmdfmt(cmd))
try:
return (not (await aioproc.log_process(cmd, logger)).returncode)
except Exception as err:
logger.exception("Can't execute command: %s", err)
except Exception as ex:
logger.exception("Can't execute command: %s", ex)
return False
# =====

View File

@ -60,8 +60,8 @@ class PstServer(HttpServer): # pylint: disable=too-many-arguments,too-many-inst
# ===== WEBSOCKET
@exposed_http("GET", "/ws")
async def __ws_handler(self, request: Request) -> WebSocketResponse:
async with self._ws_session(request) as ws:
async def __ws_handler(self, req: Request) -> WebSocketResponse:
async with self._ws_session(req) as ws:
await ws.send_event("loop", {})
return (await self._ws_loop(ws))
@ -128,9 +128,9 @@ class PstServer(HttpServer): # pylint: disable=too-many-arguments,too-many-inst
def __is_write_available(self) -> bool:
try:
return (not (os.statvfs(self.__data_path).f_flag & os.ST_RDONLY))
except Exception as err:
except Exception as ex:
get_logger(0).info("Can't get filesystem state of PST (%s): %s",
self.__data_path, tools.efmt(err))
self.__data_path, tools.efmt(ex))
return False
async def __remount_storage(self, rw: bool) -> bool:

View File

@ -46,8 +46,8 @@ def _preexec() -> None:
if os.isatty(0):
try:
os.tcsetpgrp(0, os.getpgid(0))
except Exception as err:
get_logger(0).info("Can't perform tcsetpgrp(0): %s", tools.efmt(err))
except Exception as ex:
get_logger(0).info("Can't perform tcsetpgrp(0): %s", tools.efmt(ex))
async def _run_process(cmd: list[str], data_path: str) -> asyncio.subprocess.Process: # pylint: disable=no-member

View File

@ -120,10 +120,10 @@ class RfbClient(RfbClientStream): # pylint: disable=too-many-instance-attribute
except asyncio.CancelledError:
logger.info("%s [%s]: Cancelling subtask ...", self._remote, name)
raise
except RfbConnectionError as err:
logger.info("%s [%s]: Gone: %s", self._remote, name, err)
except (RfbError, ssl.SSLError) as err:
logger.error("%s [%s]: Error: %s", self._remote, name, err)
except RfbConnectionError as ex:
logger.info("%s [%s]: Gone: %s", self._remote, name, ex)
except (RfbError, ssl.SSLError) as ex:
logger.error("%s [%s]: Error: %s", self._remote, name, ex)
except Exception:
logger.exception("%s [%s]: Unhandled exception", self._remote, name)

View File

@ -29,5 +29,5 @@ class RfbError(Exception):
class RfbConnectionError(RfbError):
def __init__(self, msg: str, err: Exception) -> None:
super().__init__(f"{msg}: {tools.efmt(err)}")
def __init__(self, msg: str, ex: Exception) -> None:
super().__init__(f"{msg}: {tools.efmt(ex)}")

View File

@ -51,22 +51,22 @@ class RfbClientStream:
else:
fmt = f">{fmt}"
return struct.unpack(fmt, await self.__reader.readexactly(struct.calcsize(fmt)))[0]
except (ConnectionError, asyncio.IncompleteReadError) as err:
raise RfbConnectionError(f"Can't read {msg}", err)
except (ConnectionError, asyncio.IncompleteReadError) as ex:
raise RfbConnectionError(f"Can't read {msg}", ex)
async def _read_struct(self, msg: str, fmt: str) -> tuple[int, ...]:
assert len(fmt) > 1
try:
fmt = f">{fmt}"
return struct.unpack(fmt, (await self.__reader.readexactly(struct.calcsize(fmt))))
except (ConnectionError, asyncio.IncompleteReadError) as err:
raise RfbConnectionError(f"Can't read {msg}", err)
except (ConnectionError, asyncio.IncompleteReadError) as ex:
raise RfbConnectionError(f"Can't read {msg}", ex)
async def _read_text(self, msg: str, length: int) -> str:
try:
return (await self.__reader.readexactly(length)).decode("utf-8", errors="ignore")
except (ConnectionError, asyncio.IncompleteReadError) as err:
raise RfbConnectionError(f"Can't read {msg}", err)
except (ConnectionError, asyncio.IncompleteReadError) as ex:
raise RfbConnectionError(f"Can't read {msg}", ex)
# =====
@ -84,8 +84,8 @@ class RfbClientStream:
self.__writer.write(struct.pack(f">{fmt}", *values))
if drain:
await self.__writer.drain()
except ConnectionError as err:
raise RfbConnectionError(f"Can't write {msg}", err)
except ConnectionError as ex:
raise RfbConnectionError(f"Can't write {msg}", ex)
async def _write_reason(self, msg: str, text: str, drain: bool=True) -> None:
encoded = text.encode("utf-8", errors="ignore")
@ -94,8 +94,8 @@ class RfbClientStream:
self.__writer.write(encoded)
if drain:
await self.__writer.drain()
except ConnectionError as err:
raise RfbConnectionError(f"Can't write {msg}", err)
except ConnectionError as ex:
raise RfbConnectionError(f"Can't write {msg}", ex)
async def _write_fb_update(self, msg: str, width: int, height: int, encoding: int, drain: bool=True) -> None:
await self._write_struct(
@ -123,8 +123,8 @@ class RfbClientStream:
server_side=True,
ssl_handshake_timeout=ssl_timeout,
)
except ConnectionError as err:
raise RfbConnectionError("Can't start TLS", err)
except ConnectionError as ex:
raise RfbConnectionError("Can't start TLS", ex)
ssl_reader.set_transport(transport) # type: ignore
ssl_writer = asyncio.StreamWriter(

View File

@ -210,12 +210,12 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
await self.__queue_frame(frame)
else:
await self.__queue_frame("No signal")
except StreamerError as err:
if isinstance(err, StreamerPermError):
except StreamerError as ex:
if isinstance(ex, StreamerPermError):
streamer = self.__get_default_streamer()
logger.info("%s [streamer]: Permanent error: %s; switching to %s ...", self._remote, err, streamer)
logger.info("%s [streamer]: Permanent error: %s; switching to %s ...", self._remote, ex, streamer)
else:
logger.info("%s [streamer]: Waiting for stream: %s", self._remote, err)
logger.info("%s [streamer]: Waiting for stream: %s", self._remote, ex)
await self.__queue_frame("Waiting for stream ...")
await asyncio.sleep(1)
@ -481,8 +481,8 @@ class VncServer: # pylint: disable=too-many-instance-attributes
try:
async with kvmd.make_session("", "") as kvmd_session:
none_auth_only = await kvmd_session.auth.check()
except (aiohttp.ClientError, asyncio.TimeoutError) as err:
logger.error("%s [entry]: Can't check KVMD auth mode: %s", remote, tools.efmt(err))
except (aiohttp.ClientError, asyncio.TimeoutError) as ex:
logger.error("%s [entry]: Can't check KVMD auth mode: %s", remote, tools.efmt(ex))
return
await _Client(

View File

@ -54,8 +54,8 @@ class VncAuthManager:
if self.__enabled:
try:
return (await self.__inner_read_credentials(), True)
except VncAuthError as err:
get_logger(0).error(str(err))
except VncAuthError as ex:
get_logger(0).error(str(ex))
except Exception:
get_logger(0).exception("Unhandled exception while reading VNCAuth passwd file")
return ({}, (not self.__enabled))

View File

@ -56,8 +56,8 @@ def _write_int(rtc: int, key: str, value: int) -> None:
def _reset_alarm(rtc: int, timeout: int) -> None:
try:
now = _read_int(rtc, "since_epoch")
except OSError as err:
if err.errno != errno.EINVAL:
except OSError as ex:
if ex.errno != errno.EINVAL:
raise
raise RtcIsNotAvailableError("Can't read since_epoch right now")
if now == 0:
@ -65,8 +65,8 @@ def _reset_alarm(rtc: int, timeout: int) -> None:
try:
for wake in [0, now + timeout]:
_write_int(rtc, "wakealarm", wake)
except OSError as err:
if err.errno != errno.EIO:
except OSError as ex:
if ex.errno != errno.EIO:
raise
raise RtcIsNotAvailableError("IO error, probably the supercapacitor is not charged")
@ -80,9 +80,9 @@ def _cmd_run(config: Section) -> None:
while True:
try:
_reset_alarm(config.rtc, config.timeout)
except RtcIsNotAvailableError as err:
except RtcIsNotAvailableError as ex:
if not fail:
logger.error("RTC%d is not available now: %s; waiting ...", config.rtc, err)
logger.error("RTC%d is not available now: %s; waiting ...", config.rtc, ex)
fail = True
else:
if fail:

View File

@ -66,8 +66,8 @@ class _AuthApiPart(_BaseApiPart):
async with session.get(self._make_url("auth/check")) as response:
htclient.raise_not_200(response)
return True
except aiohttp.ClientResponseError as err:
if err.status in [400, 401, 403]:
except aiohttp.ClientResponseError as ex:
if ex.status in [400, 401, 403]:
return False
raise
@ -128,8 +128,8 @@ class _AtxApiPart(_BaseApiPart):
) as response:
htclient.raise_not_200(response)
return True
except aiohttp.ClientResponseError as err:
if err.status == 409:
except aiohttp.ClientResponseError as ex:
if ex.status == 409:
return False
raise

View File

@ -72,10 +72,10 @@ class BaseStreamerClient:
def _http_handle_errors() -> Generator[None, None, None]:
try:
yield
except Exception as err: # Тут бывают и ассерты, и KeyError, и прочая херня
if isinstance(err, StreamerTempError):
except Exception as ex: # Тут бывают и ассерты, и KeyError, и прочая херня
if isinstance(ex, StreamerTempError):
raise
raise StreamerTempError(tools.efmt(err))
raise StreamerTempError(tools.efmt(ex))
class HttpStreamerClient(BaseStreamerClient):
@ -167,10 +167,10 @@ def _memsink_handle_errors() -> Generator[None, None, None]:
yield
except StreamerPermError:
raise
except FileNotFoundError as err:
raise StreamerTempError(tools.efmt(err))
except Exception as err:
raise StreamerPermError(tools.efmt(err))
except FileNotFoundError as ex:
raise StreamerTempError(tools.efmt(ex))
except Exception as ex:
raise StreamerPermError(tools.efmt(ex))
class MemsinkStreamerClient(BaseStreamerClient):

View File

@ -46,8 +46,8 @@ def _remount(path: str, rw: bool) -> None:
_log(f"Remounting {path} to {mode.upper()}-mode ...")
try:
subprocess.check_call(["/bin/mount", "--options", f"remount,{mode}", path])
except subprocess.CalledProcessError as err:
raise SystemExit(f"Can't remount: {err}")
except subprocess.CalledProcessError as ex:
raise SystemExit(f"Can't remount: {ex}")
def _mkdir(path: str) -> None:
@ -55,8 +55,8 @@ def _mkdir(path: str) -> None:
_log(f"MKDIR --- {path}")
try:
os.mkdir(path)
except Exception as err:
raise SystemExit(f"Can't create directory: {err}")
except Exception as ex:
raise SystemExit(f"Can't create directory: {ex}")
def _rmtree(path: str) -> None:
@ -64,8 +64,8 @@ def _rmtree(path: str) -> None:
_log(f"RMALL --- {path}")
try:
shutil.rmtree(path)
except Exception as err:
raise SystemExit(f"Can't remove directory: {err}")
except Exception as ex:
raise SystemExit(f"Can't remove directory: {ex}")
def _rm(path: str) -> None:
@ -73,16 +73,16 @@ def _rm(path: str) -> None:
_log(f"RM --- {path}")
try:
os.remove(path)
except Exception as err:
raise SystemExit(f"Can't remove file: {err}")
except Exception as ex:
raise SystemExit(f"Can't remove file: {ex}")
def _move(src: str, dest: str) -> None:
_log(f"MOVE --- {src} --> {dest}")
try:
os.rename(src, dest)
except Exception as err:
raise SystemExit(f"Can't move file: {err}")
except Exception as ex:
raise SystemExit(f"Can't move file: {ex}")
def _chown(path: str, user: str) -> None:
@ -90,8 +90,8 @@ def _chown(path: str, user: str) -> None:
_log(f"CHOWN --- {user} - {path}")
try:
shutil.chown(path, user=user)
except Exception as err:
raise SystemExit(f"Can't change ownership: {err}")
except Exception as ex:
raise SystemExit(f"Can't change ownership: {ex}")
def _chgrp(path: str, group: str) -> None:
@ -99,8 +99,8 @@ def _chgrp(path: str, group: str) -> None:
_log(f"CHGRP --- {group} - {path}")
try:
shutil.chown(path, group=group)
except Exception as err:
raise SystemExit(f"Can't change group: {err}")
except Exception as ex:
raise SystemExit(f"Can't change group: {ex}")
def _chmod(path: str, mode: int) -> None:
@ -108,8 +108,8 @@ def _chmod(path: str, mode: int) -> None:
_log(f"CHMOD --- 0o{mode:o} - {path}")
try:
os.chmod(path, mode)
except Exception as err:
raise SystemExit(f"Can't change permissions: {err}")
except Exception as ex:
raise SystemExit(f"Can't change permissions: {ex}")
# =====

View File

@ -36,27 +36,27 @@ def make_user_agent(app: str) -> str:
return f"{app}/{__version__}"
def raise_not_200(response: aiohttp.ClientResponse) -> None:
if response.status != 200:
assert response.reason is not None
response.release()
def raise_not_200(resp: aiohttp.ClientResponse) -> None:
if resp.status != 200:
assert resp.reason is not None
resp.release()
raise aiohttp.ClientResponseError(
response.request_info,
response.history,
status=response.status,
message=response.reason,
headers=response.headers,
resp.request_info,
resp.history,
status=resp.status,
message=resp.reason,
headers=resp.headers,
)
def get_filename(response: aiohttp.ClientResponse) -> str:
def get_filename(resp: aiohttp.ClientResponse) -> str:
try:
disp = response.headers["Content-Disposition"]
disp = resp.headers["Content-Disposition"]
parsed = aiohttp.multipart.parse_content_disposition(disp)
return str(parsed[1]["filename"])
except Exception:
try:
return os.path.basename(response.url.path)
return os.path.basename(resp.url.path)
except Exception:
raise aiohttp.ClientError("Can't determine filename")
@ -79,6 +79,6 @@ async def download(
),
}
async with aiohttp.ClientSession(**kwargs) as session:
async with session.get(url, verify_ssl=verify) as response: # type: ignore
raise_not_200(response)
yield response
async with session.get(url, verify_ssl=verify) as resp: # type: ignore
raise_not_200(resp)
yield resp

View File

@ -157,7 +157,7 @@ def make_json_response(
wrap_result: bool=True,
) -> Response:
response = Response(
resp = Response(
text=json.dumps(({
"ok": (status == 200),
"result": (result or {}),
@ -167,18 +167,18 @@ def make_json_response(
)
if set_cookies:
for (key, value) in set_cookies.items():
response.set_cookie(key, value, httponly=True, samesite="Strict")
return response
resp.set_cookie(key, value, httponly=True, samesite="Strict")
return resp
def make_json_exception(err: Exception, status: (int | None)=None) -> Response:
name = type(err).__name__
msg = str(err)
if isinstance(err, HttpError):
status = err.status
def make_json_exception(ex: Exception, status: (int | None)=None) -> Response:
name = type(ex).__name__
msg = str(ex)
if isinstance(ex, HttpError):
status = ex.status
else:
get_logger().error("API error: %s: %s", name, msg)
assert status is not None, err
assert status is not None, ex
return make_json_response({
"error": name,
"error_msg": msg,
@ -186,35 +186,35 @@ def make_json_exception(err: Exception, status: (int | None)=None) -> Response:
async def start_streaming(
request: Request,
req: Request,
content_type: str,
content_length: int=-1,
file_name: str="",
) -> StreamResponse:
response = StreamResponse(status=200, reason="OK")
response.content_type = content_type
resp = StreamResponse(status=200, reason="OK")
resp.content_type = content_type
if content_length >= 0: # pylint: disable=consider-using-min-builtin
response.content_length = content_length
resp.content_length = content_length
if file_name:
file_name = urllib.parse.quote(file_name, safe="")
response.headers["Content-Disposition"] = f"attachment; filename*=UTF-8''{file_name}"
await response.prepare(request)
return response
resp.headers["Content-Disposition"] = f"attachment; filename*=UTF-8''{file_name}"
await resp.prepare(req)
return resp
async def stream_json(response: StreamResponse, result: dict, ok: bool=True) -> None:
await response.write(json.dumps({
async def stream_json(resp: StreamResponse, result: dict, ok: bool=True) -> None:
await resp.write(json.dumps({
"ok": ok,
"result": result,
}).encode("utf-8") + b"\r\n")
async def stream_json_exception(response: StreamResponse, err: Exception) -> None:
name = type(err).__name__
msg = str(err)
async def stream_json_exception(resp: StreamResponse, ex: Exception) -> None:
name = type(ex).__name__
msg = str(ex)
get_logger().error("API error: %s: %s", name, msg)
await stream_json(response, {
await stream_json(resp, {
"error": name,
"error_msg": msg,
}, False)
@ -249,15 +249,15 @@ def parse_ws_event(msg: str) -> tuple[str, dict]:
_REQUEST_AUTH_INFO = "_kvmd_auth_info"
def _format_P(request: BaseRequest, *_, **__) -> str: # type: ignore # pylint: disable=invalid-name
return (getattr(request, _REQUEST_AUTH_INFO, None) or "-")
def _format_P(req: BaseRequest, *_, **__) -> str: # type: ignore # pylint: disable=invalid-name
return (getattr(req, _REQUEST_AUTH_INFO, None) or "-")
AccessLogger._format_P = staticmethod(_format_P) # type: ignore # pylint: disable=protected-access
def set_request_auth_info(request: BaseRequest, info: str) -> None:
setattr(request, _REQUEST_AUTH_INFO, info)
def set_request_auth_info(req: BaseRequest, info: str) -> None:
setattr(req, _REQUEST_AUTH_INFO, info)
# =====
@ -318,16 +318,16 @@ class HttpServer:
self.__add_exposed_ws(ws_exposed)
def __add_exposed_http(self, exposed: HttpExposed) -> None:
async def wrapper(request: Request) -> Response:
async def wrapper(req: Request) -> Response:
try:
await self._check_request_auth(exposed, request)
return (await exposed.handler(request))
except IsBusyError as err:
return make_json_exception(err, 409)
except (ValidatorError, OperationError) as err:
return make_json_exception(err, 400)
except HttpError as err:
return make_json_exception(err)
await self._check_request_auth(exposed, req)
return (await exposed.handler(req))
except IsBusyError as ex:
return make_json_exception(ex, 409)
except (ValidatorError, OperationError) as ex:
return make_json_exception(ex, 400)
except HttpError as ex:
return make_json_exception(ex)
self.__app.router.add_route(exposed.method, exposed.path, wrapper)
def __add_exposed_ws(self, exposed: WsExposed) -> None:
@ -342,10 +342,10 @@ class HttpServer:
# =====
@contextlib.asynccontextmanager
async def _ws_session(self, request: Request, **kwargs: Any) -> AsyncGenerator[WsSession, None]:
async def _ws_session(self, req: Request, **kwargs: Any) -> AsyncGenerator[WsSession, None]:
assert self.__ws_heartbeat is not None
wsr = WebSocketResponse(heartbeat=self.__ws_heartbeat)
await wsr.prepare(request)
await wsr.prepare(req)
ws = WsSession(wsr, kwargs)
async with self.__ws_sessions_lock:
@ -364,8 +364,8 @@ class HttpServer:
if msg.type == WSMsgType.TEXT:
try:
(event_type, event) = parse_ws_event(msg.data)
except Exception as err:
logger.error("Can't parse JSON event from websocket: %r", err)
except Exception as ex:
logger.error("Can't parse JSON event from websocket: %r", ex)
else:
handler = self.__ws_handlers.get(event_type)
if handler:
@ -417,7 +417,7 @@ class HttpServer:
# =====
async def _check_request_auth(self, exposed: HttpExposed, request: Request) -> None:
async def _check_request_auth(self, exposed: HttpExposed, req: Request) -> None:
pass
async def _init_app(self) -> None:

View File

@ -286,8 +286,8 @@ class Inotify:
while True:
try:
return os.read(self.__fd, _EVENTS_BUFFER_LENGTH)
except OSError as err:
if err.errno == errno.EINTR:
except OSError as ex:
if ex.errno == errno.EINTR:
pass
def __enter__(self) -> "Inotify":

View File

@ -135,8 +135,8 @@ def _read_keyboard_layout(path: str) -> dict[int, list[At1Key]]: # Keysym to ev
try:
at1_code = int(parts[1], 16)
except ValueError as err:
logger.error("Syntax error at %s:%d: %s", path, lineno, err)
except ValueError as ex:
logger.error("Syntax error at %s:%d: %s", path, lineno, ex)
continue
rest = parts[2:]

View File

@ -34,10 +34,10 @@ def is_ipv6_enabled() -> bool:
with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as sock:
sock.bind(("::1", 0))
return True
except OSError as err:
if err.errno in [errno.EADDRNOTAVAIL, errno.EAFNOSUPPORT]:
except OSError as ex:
if ex.errno in [errno.EADDRNOTAVAIL, errno.EAFNOSUPPORT]:
return False
if err.errno == errno.EADDRINUSE:
if ex.errno == errno.EADDRINUSE:
return True
raise

View File

@ -76,7 +76,7 @@ class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes
self.__notifier = aiotools.AioNotifier()
self.__region = aiotools.AioExclusiveRegion(AtxIsBusyError, self.__notifier)
self.__line_request: (gpiod.LineRequest | None) = None
self.__line_req: (gpiod.LineRequest | None) = None
self.__reader = aiogp.AioReader(
path=self.__device_path,
@ -108,8 +108,8 @@ class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes
}
def sysprep(self) -> None:
assert self.__line_request is None
self.__line_request = gpiod.request_lines(
assert self.__line_req is None
self.__line_req = gpiod.request_lines(
self.__device_path,
consumer="kvmd::atx",
config={
@ -143,9 +143,9 @@ class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes
await self.__reader.poll()
async def cleanup(self) -> None:
if self.__line_request:
if self.__line_req:
try:
self.__line_request.release()
self.__line_req.release()
except Exception:
pass
@ -196,11 +196,11 @@ class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes
@aiotools.atomic_fg
async def __inner_click(self, name: str, pin: int, delay: float) -> None:
assert self.__line_request
assert self.__line_req
try:
self.__line_request.set_value(pin, gpiod.line.Value(True))
self.__line_req.set_value(pin, gpiod.line.Value(True))
await asyncio.sleep(delay)
finally:
self.__line_request.set_value(pin, gpiod.line.Value(False))
self.__line_req.set_value(pin, gpiod.line.Value(False))
await asyncio.sleep(1)
get_logger(0).info("Clicked ATX button %r", name)

View File

@ -85,8 +85,8 @@ class Plugin(BaseAuthService):
"User-Agent": htclient.make_user_agent("KVMD"),
"X-KVMD-User": user,
},
) as response:
htclient.raise_not_200(response)
) as resp:
htclient.raise_not_200(resp)
return True
except Exception:
get_logger().exception("Failed HTTP auth request for user %r", user)

View File

@ -100,10 +100,10 @@ class Plugin(BaseAuthService):
return True
except ldap.INVALID_CREDENTIALS:
pass
except ldap.SERVER_DOWN as err:
get_logger().error("LDAP server is down: %s", tools.efmt(err))
except Exception as err:
get_logger().error("Unexpected LDAP error: %s", tools.efmt(err))
except ldap.SERVER_DOWN as ex:
get_logger().error("LDAP server is down: %s", tools.efmt(ex))
except Exception as ex:
get_logger().error("Unexpected LDAP error: %s", tools.efmt(ex))
finally:
if conn is not None:
try:

View File

@ -435,10 +435,10 @@ class Plugin(BaseAuthService):
timeout=self.__timeout,
dict=dct,
)
request = client.CreateAuthPacket(code=pyrad.packet.AccessRequest, User_Name=user)
request["User-Password"] = request.PwCrypt(passwd)
response = client.SendPacket(request)
return (response.code == pyrad.packet.AccessAccept)
req = client.CreateAuthPacket(code=pyrad.packet.AccessRequest, User_Name=user)
req["User-Password"] = req.PwCrypt(passwd)
resp = client.SendPacket(req)
return (resp.code == pyrad.packet.AccessAccept)
except Exception:
get_logger().exception("Failed RADIUS auth request for user %r", user)
return False

View File

@ -91,7 +91,7 @@ class _TempRequestError(_RequestError):
# =====
class BasePhyConnection:
def send(self, request: bytes) -> bytes:
def send(self, req: bytes) -> bytes:
raise NotImplementedError
@ -374,7 +374,7 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
self.__set_state_online(False)
return False
def __process_request(self, conn: BasePhyConnection, request: bytes) -> bool: # pylint: disable=too-many-branches
def __process_request(self, conn: BasePhyConnection, req: bytes) -> bool: # pylint: disable=too-many-branches
logger = get_logger()
error_messages: list[str] = []
live_log_errors = False
@ -384,47 +384,47 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
error_retval = False
while self.__gpio.is_powered() and common_retries and read_retries:
response = (RESPONSE_LEGACY_OK if self.__noop else conn.send(request))
resp = (RESPONSE_LEGACY_OK if self.__noop else conn.send(req))
try:
if len(response) < 4:
if len(resp) < 4:
read_retries -= 1
raise _TempRequestError(f"No response from HID: request={request!r}")
raise _TempRequestError(f"No response from HID: request={req!r}")
if not check_response(response):
request = REQUEST_REPEAT
if not check_response(resp):
req = REQUEST_REPEAT
raise _TempRequestError("Invalid response CRC; requesting response again ...")
code = response[1]
code = resp[1]
if code == 0x48: # Request timeout # pylint: disable=no-else-raise
raise _TempRequestError(f"Got request timeout from HID: request={request!r}")
raise _TempRequestError(f"Got request timeout from HID: request={req!r}")
elif code == 0x40: # CRC Error
raise _TempRequestError(f"Got CRC error of request from HID: request={request!r}")
raise _TempRequestError(f"Got CRC error of request from HID: request={req!r}")
elif code == 0x45: # Unknown command
raise _PermRequestError(f"HID did not recognize the request={request!r}")
raise _PermRequestError(f"HID did not recognize the request={req!r}")
elif code == 0x24: # Rebooted?
raise _PermRequestError("No previous command state inside HID, seems it was rebooted")
elif code == 0x20: # Legacy done
self.__set_state_online(True)
return True
elif code & 0x80: # Pong/Done with state
self.__set_state_pong(response)
self.__set_state_pong(resp)
return True
raise _TempRequestError(f"Invalid response from HID: request={request!r}, response=0x{response!r}")
raise _TempRequestError(f"Invalid response from HID: request={req!r}, response=0x{resp!r}")
except _RequestError as err:
except _RequestError as ex:
common_retries -= 1
if live_log_errors:
logger.error(err.msg)
logger.error(ex.msg)
else:
error_messages.append(err.msg)
error_messages.append(ex.msg)
if len(error_messages) > self.__errors_threshold:
for msg in error_messages:
logger.error(msg)
error_messages = []
live_log_errors = True
if isinstance(err, _PermRequestError):
if isinstance(ex, _PermRequestError):
error_retval = True
break
@ -440,7 +440,7 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
for msg in error_messages:
logger.error(msg)
if not (common_retries and read_retries):
logger.error("Can't process HID request due many errors: %r", request)
logger.error("Can't process HID request due many errors: %r", req)
return error_retval
def __set_state_online(self, online: bool) -> None:
@ -449,11 +449,11 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
def __set_state_busy(self, busy: bool) -> None:
self.__state_flags.update(busy=int(busy))
def __set_state_pong(self, response: bytes) -> None:
status = response[1] << 16
if len(response) > 4:
status |= (response[2] << 8) | response[3]
reset_required = (1 if response[1] & 0b01000000 else 0)
def __set_state_pong(self, resp: bytes) -> None:
status = resp[1] << 16
if len(resp) > 4:
status |= (resp[2] << 8) | resp[3]
reset_required = (1 if resp[1] & 0b01000000 else 0)
self.__state_flags.update(online=1, busy=reset_required, status=status)
if reset_required:
if self.__reset_self:

View File

@ -47,12 +47,12 @@ class Gpio: # pylint: disable=too-many-instance-attributes
self.__reset_inverted = reset_inverted
self.__reset_delay = reset_delay
self.__line_request: (gpiod.LineRequest | None) = None
self.__line_req: (gpiod.LineRequest | None) = None
self.__last_power: (bool | None) = None
def __enter__(self) -> None:
if self.__power_detect_pin >= 0 or self.__reset_pin >= 0:
assert self.__line_request is None
assert self.__line_req is None
config: dict[int, gpiod.LineSettings] = {}
if self.__power_detect_pin >= 0:
config[self.__power_detect_pin] = gpiod.LineSettings(
@ -65,7 +65,7 @@ class Gpio: # pylint: disable=too-many-instance-attributes
output_value=gpiod.line.Value(self.__reset_inverted),
)
assert len(config) > 0
self.__line_request = gpiod.request_lines(
self.__line_req = gpiod.request_lines(
self.__device_path,
consumer="kvmd::hid",
config=config,
@ -78,18 +78,18 @@ class Gpio: # pylint: disable=too-many-instance-attributes
_tb: types.TracebackType,
) -> None:
if self.__line_request:
if self.__line_req:
try:
self.__line_request.release()
self.__line_req.release()
except Exception:
pass
self.__last_power = None
self.__line_request = None
self.__line_req = None
def is_powered(self) -> bool:
if self.__power_detect_pin >= 0:
assert self.__line_request
power = bool(self.__line_request.get_value(self.__power_detect_pin).value)
assert self.__line_req
power = bool(self.__line_req.get_value(self.__power_detect_pin).value)
if power != self.__last_power:
get_logger(0).info("HID power state changed: %s -> %s", self.__last_power, power)
self.__last_power = power
@ -98,11 +98,11 @@ class Gpio: # pylint: disable=too-many-instance-attributes
def reset(self) -> None:
if self.__reset_pin >= 0:
assert self.__line_request
assert self.__line_req
try:
self.__line_request.set_value(self.__reset_pin, gpiod.line.Value(not self.__reset_inverted))
self.__line_req.set_value(self.__reset_pin, gpiod.line.Value(not self.__reset_inverted))
time.sleep(self.__reset_delay)
finally:
self.__line_request.set_value(self.__reset_pin, gpiod.line.Value(self.__reset_inverted))
self.__line_req.set_value(self.__reset_pin, gpiod.line.Value(self.__reset_inverted))
time.sleep(1)
get_logger(0).info("Reset HID performed")

View File

@ -184,17 +184,17 @@ class MouseWheelEvent(BaseEvent):
# =====
def check_response(response: bytes) -> bool:
assert len(response) in (4, 8), response
return (bitbang.make_crc16(response[:-2]) == struct.unpack(">H", response[-2:])[0])
def check_response(resp: bytes) -> bool:
assert len(resp) in (4, 8), resp
return (bitbang.make_crc16(resp[:-2]) == struct.unpack(">H", resp[-2:])[0])
def _make_request(command: bytes) -> bytes:
assert len(command) == 5, command
request = b"\x33" + command
request += struct.pack(">H", bitbang.make_crc16(request))
assert len(request) == 8, request
return request
def _make_request(cmd: bytes) -> bytes:
assert len(cmd) == 5, cmd
req = b"\x33" + cmd
req += struct.pack(">H", bitbang.make_crc16(req))
assert len(req) == 8, req
return req
# =====

View File

@ -182,8 +182,8 @@ class BtServer: # pylint: disable=too-many-instance-attributes
self.__close_client("CTL", client, "ctl_sock")
elif data == b"\x71":
sock.send(b"\x00")
except Exception as err:
get_logger(0).exception("CTL socket error on %s: %s", client.addr, tools.efmt(err))
except Exception as ex:
get_logger(0).exception("CTL socket error on %s: %s", client.addr, tools.efmt(ex))
self.__close_client("CTL", client, "ctl_sock")
continue
@ -196,8 +196,8 @@ class BtServer: # pylint: disable=too-many-instance-attributes
self.__close_client("INT", client, "int_sock")
elif data[:2] == b"\xA2\x01":
self.__process_leds(data[2])
except Exception as err:
get_logger(0).exception("INT socket error on %s: %s", client.addr, tools.efmt(err))
except Exception as ex:
get_logger(0).exception("INT socket error on %s: %s", client.addr, tools.efmt(ex))
self.__close_client("INT", client, "ctl_sock")
if qr in ready_read:
@ -279,8 +279,8 @@ class BtServer: # pylint: disable=too-many-instance-attributes
assert client.int_sock is not None
try:
client.int_sock.send(report)
except Exception as err:
get_logger(0).info("Can't send %s report to %s: %s", name, client.addr, tools.efmt(err))
except Exception as ex:
get_logger(0).info("Can't send %s report to %s: %s", name, client.addr, tools.efmt(ex))
self.__close_client_pair(client)
def __clear_modifiers(self) -> None:
@ -371,13 +371,13 @@ class BtServer: # pylint: disable=too-many-instance-attributes
logger.info("Publishing ..." if public else "Unpublishing ...")
try:
self.__iface.set_public(public)
except Exception as err:
logger.error("Can't change public mode: %s", tools.efmt(err))
except Exception as ex:
logger.error("Can't change public mode: %s", tools.efmt(ex))
def __unpair_client(self, client: _BtClient) -> None:
logger = get_logger(0)
logger.info("Unpairing %s ...", client.addr)
try:
self.__iface.unpair(client.addr)
except Exception as err:
logger.error("Can't unpair %s: %s", client.addr, tools.efmt(err))
except Exception as ex:
logger.error("Can't unpair %s: %s", client.addr, tools.efmt(ex))

View File

@ -230,9 +230,9 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
def __process_cmd(self, conn: ChipConnection, cmd: bytes) -> bool: # pylint: disable=too-many-branches
try:
led_byte = conn.xfer(cmd)
except ChipResponseError as err:
except ChipResponseError as ex:
self.__set_state_online(False)
get_logger(0).info(err)
get_logger(0).error("Invalid chip response: %s", tools.efmt(ex))
time.sleep(2)
else:
if led_byte >= 0:

View File

@ -192,13 +192,13 @@ class BaseDeviceProcess(multiprocessing.Process): # pylint: disable=too-many-in
else:
logger.error("HID-%s write() error: written (%s) != report length (%d)",
self.__name, written, len(report))
except Exception as err:
if isinstance(err, OSError) and (
except Exception as ex:
if isinstance(ex, OSError) and (
# https://github.com/raspberrypi/linux/commit/61b7f805dc2fd364e0df682de89227e94ce88e25
err.errno == errno.EAGAIN # pylint: disable=no-member
or err.errno == errno.ESHUTDOWN # pylint: disable=no-member
ex.errno == errno.EAGAIN # pylint: disable=no-member
or ex.errno == errno.ESHUTDOWN # pylint: disable=no-member
):
logger.debug("HID-%s busy/unplugged (write): %s", self.__name, tools.efmt(err))
logger.debug("HID-%s busy/unplugged (write): %s", self.__name, tools.efmt(ex))
else:
logger.exception("Can't write report to HID-%s", self.__name)
@ -216,16 +216,16 @@ class BaseDeviceProcess(multiprocessing.Process): # pylint: disable=too-many-in
while read:
try:
read = bool(select.select([self.__fd], [], [], 0)[0])
except Exception as err:
logger.error("Can't select() for read HID-%s: %s", self.__name, tools.efmt(err))
except Exception as ex:
logger.error("Can't select() for read HID-%s: %s", self.__name, tools.efmt(ex))
break
if read:
try:
report = os.read(self.__fd, self.__read_size)
except Exception as err:
if isinstance(err, OSError) and err.errno == errno.EAGAIN: # pylint: disable=no-member
logger.debug("HID-%s busy/unplugged (read): %s", self.__name, tools.efmt(err))
except Exception as ex:
if isinstance(ex, OSError) and ex.errno == errno.EAGAIN: # pylint: disable=no-member
logger.debug("HID-%s busy/unplugged (read): %s", self.__name, tools.efmt(ex))
else:
logger.exception("Can't read report from HID-%s", self.__name)
else:
@ -255,9 +255,9 @@ class BaseDeviceProcess(multiprocessing.Process): # pylint: disable=too-many-in
flags = os.O_NONBLOCK
flags |= (os.O_RDWR if self.__read_size else os.O_WRONLY)
self.__fd = os.open(self.__device_path, flags)
except Exception as err:
except Exception as ex:
logger.error("Can't open HID-%s device %s: %s",
self.__name, self.__device_path, tools.efmt(err))
self.__name, self.__device_path, tools.efmt(ex))
if self.__fd >= 0:
try:
@ -268,8 +268,8 @@ class BaseDeviceProcess(multiprocessing.Process): # pylint: disable=too-many-in
else:
# Если запись недоступна, то скорее всего устройство отключено
logger.debug("HID-%s is busy/unplugged (write select)", self.__name)
except Exception as err:
logger.error("Can't select() for write HID-%s: %s", self.__name, tools.efmt(err))
except Exception as ex:
logger.error("Can't select() for write HID-%s: %s", self.__name, tools.efmt(ex))
self.__state_flags.update(online=False)
return False

View File

@ -44,12 +44,12 @@ class _SerialPhyConnection(BasePhyConnection):
def __init__(self, tty: serial.Serial) -> None:
self.__tty = tty
def send(self, request: bytes) -> bytes:
assert len(request) == 8
assert request[0] == 0x33
def send(self, req: bytes) -> bytes:
assert len(req) == 8
assert req[0] == 0x33
if self.__tty.in_waiting:
self.__tty.read_all()
assert self.__tty.write(request) == 8
assert self.__tty.write(req) == 8
data = self.__tty.read(4)
if len(data) == 4:
if data[0] == 0x34: # New response protocol

View File

@ -57,9 +57,9 @@ class _SpiPhyConnection(BasePhyConnection):
self.__xfer = xfer
self.__read_timeout = read_timeout
def send(self, request: bytes) -> bytes:
assert len(request) == 8
assert request[0] == 0x33
def send(self, req: bytes) -> bytes:
assert len(req) == 8
assert req[0] == 0x33
deadline_ts = time.monotonic() + self.__read_timeout
dummy = b"\x00" * 10
@ -70,26 +70,26 @@ class _SpiPhyConnection(BasePhyConnection):
get_logger(0).error("SPI timeout reached while garbage reading")
return b""
self.__xfer(request)
self.__xfer(req)
response: list[int] = []
resp: list[int] = []
deadline_ts = time.monotonic() + self.__read_timeout
found = False
while time.monotonic() < deadline_ts:
for byte in self.__xfer(b"\x00" * (9 - len(response))):
for byte in self.__xfer(b"\x00" * (9 - len(resp))):
if not found:
if byte == 0:
continue
found = True
response.append(byte)
if len(response) == 8:
resp.append(byte)
if len(resp) == 8:
break
if len(response) == 8:
if len(resp) == 8:
break
else:
get_logger(0).error("SPI timeout reached while responce waiting")
return b""
return bytes(response)
return bytes(resp)
class _SpiPhy(BasePhy): # pylint: disable=too-many-instance-attributes

View File

@ -82,7 +82,7 @@ class Drive:
try:
with open(os.path.join(self.__lun_path, param), "w") as file:
file.write(value + "\n")
except OSError as err:
if err.errno == errno.EBUSY:
except OSError as ex:
if ex.errno == errno.EBUSY:
raise MsdDriveLockedError()
raise

View File

@ -113,13 +113,13 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute
while True:
session = self.__ensure_http_session()
try:
async with session.get(f"{self.__url}/strg.cfg") as response:
htclient.raise_not_200(response)
parts = (await response.text()).split(";")
async with session.get(f"{self.__url}/strg.cfg") as resp:
htclient.raise_not_200(resp)
parts = (await resp.text()).split(";")
for pin in self.__state:
self.__state[pin] = (parts[1 + int(pin) * 5] == "1")
except Exception as err:
get_logger().error("Failed ANELPWR bulk GET request: %s", tools.efmt(err))
except Exception as ex:
get_logger().error("Failed ANELPWR bulk GET request: %s", tools.efmt(ex))
self.__state = dict.fromkeys(self.__state, None)
if self.__state != prev_state:
self._notifier.notify()
@ -143,10 +143,10 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute
url=f"{self.__url}/ctrl.htm",
data=f"F{pin}={int(state)}",
headers={"Content-Type": "text/plain"},
) as response:
htclient.raise_not_200(response)
except Exception as err:
get_logger().error("Failed ANELPWR POST request to pin %s: %s", pin, tools.efmt(err))
) as resp:
htclient.raise_not_200(resp)
except Exception as ex:
get_logger().error("Failed ANELPWR POST request to pin %s: %s", pin, tools.efmt(ex))
raise GpioDriverOfflineError(self)
self.__update_notifier.notify()

View File

@ -78,9 +78,9 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute
proc = await aioproc.log_process(self.__cmd, logger=get_logger(0), prefix=str(self))
if proc.returncode != 0:
raise RuntimeError(f"Custom command error: retcode={proc.returncode}")
except Exception as err:
except Exception as ex:
get_logger(0).error("Can't run custom command [ %s ]: %s",
tools.cmdfmt(self.__cmd), tools.efmt(err))
tools.cmdfmt(self.__cmd), tools.efmt(ex))
raise GpioDriverOfflineError(self)
def __str__(self) -> str:

View File

@ -71,9 +71,9 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute
try:
proc = await aioproc.log_process(self.__cmd, logger=get_logger(0), prefix=str(self))
return (proc.returncode == 0)
except Exception as err:
except Exception as ex:
get_logger(0).error("Can't run custom command [ %s ]: %s",
tools.cmdfmt(self.__cmd), tools.efmt(err))
tools.cmdfmt(self.__cmd), tools.efmt(ex))
raise GpioDriverOfflineError(self)
async def write(self, pin: str, state: bool) -> None:

View File

@ -150,9 +150,9 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute
assert channel is not None
self.__send_channel(tty, channel)
except Exception as err:
except Exception as ex:
self.__channel_queue.put_nowait(None)
if isinstance(err, serial.SerialException) and err.errno == errno.ENOENT: # pylint: disable=no-member
if isinstance(ex, serial.SerialException) and ex.errno == errno.ENOENT: # pylint: disable=no-member
logger.error("Missing %s serial device: %s", self, self.__device_path)
else:
logger.exception("Unexpected %s error", self)

View File

@ -150,9 +150,9 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute
assert channel is not None
self.__send_channel(tty, channel)
except Exception as err:
except Exception as ex:
self.__channel_queue.put_nowait(None)
if isinstance(err, serial.SerialException) and err.errno == errno.ENOENT: # pylint: disable=no-member
if isinstance(ex, serial.SerialException) and ex.errno == errno.ENOENT: # pylint: disable=no-member
logger.error("Missing %s serial device: %s", self, self.__device_path)
else:
logger.exception("Unexpected %s error", self)

View File

@ -54,7 +54,7 @@ class Plugin(BaseUserGpioDriver):
self.__output_pins: dict[int, (bool | None)] = {}
self.__reader: (aiogp.AioReader | None) = None
self.__outputs_request: (gpiod.LineRequest | None) = None
self.__outputs_req: (gpiod.LineRequest | None) = None
@classmethod
def get_plugin_options(cls) -> dict:
@ -74,7 +74,7 @@ class Plugin(BaseUserGpioDriver):
def prepare(self) -> None:
assert self.__reader is None
assert self.__outputs_request is None
assert self.__outputs_req is None
self.__reader = aiogp.AioReader(
path=self.__device_path,
consumer="kvmd::gpio::inputs",
@ -82,7 +82,7 @@ class Plugin(BaseUserGpioDriver):
notifier=self._notifier,
)
if self.__output_pins:
self.__outputs_request = gpiod.request_lines(
self.__outputs_req = gpiod.request_lines(
self.__device_path,
consumer="kvmd::gpiod::outputs",
config={
@ -99,9 +99,9 @@ class Plugin(BaseUserGpioDriver):
await self.__reader.poll()
async def cleanup(self) -> None:
if self.__outputs_request:
if self.__outputs_req:
try:
self.__outputs_request.release()
self.__outputs_req.release()
except Exception:
pass
@ -110,15 +110,15 @@ class Plugin(BaseUserGpioDriver):
pin_int = int(pin)
if pin_int in self.__input_pins:
return self.__reader.get(pin_int)
assert self.__outputs_request
assert self.__outputs_req
assert pin_int in self.__output_pins
return bool(self.__outputs_request.get_value(pin_int).value)
return bool(self.__outputs_req.get_value(pin_int).value)
async def write(self, pin: str, state: bool) -> None:
assert self.__outputs_request
assert self.__outputs_req
pin_int = int(pin)
assert pin_int in self.__output_pins
self.__outputs_request.set_value(pin_int, gpiod.line.Value(state))
self.__outputs_req.set_value(pin_int, gpiod.line.Value(state))
def __str__(self) -> str:
return f"GPIO({self._instance_name})"

View File

@ -93,9 +93,9 @@ class Plugin(BaseUserGpioDriver):
try:
with self.__ensure_device("probing"):
pass
except Exception as err:
except Exception as ex:
logger.error("Can't probe %s on %s: %s",
self, self.__device_path, tools.efmt(err))
self, self.__device_path, tools.efmt(ex))
self.__reset_pins()
async def run(self) -> None:
@ -137,9 +137,9 @@ class Plugin(BaseUserGpioDriver):
pin, state, self, self.__device_path)
try:
self.__inner_write(pin, state)
except Exception as err:
except Exception as ex:
logger.error("Can't reset pin=%d of %s on %s: %s",
pin, self, self.__device_path, tools.efmt(err))
pin, self, self.__device_path, tools.efmt(ex))
def __inner_read(self, pin: int) -> bool:
assert 0 <= pin <= 7
@ -168,9 +168,9 @@ class Plugin(BaseUserGpioDriver):
get_logger(0).info("Opened %s on %s while %s", self, self.__device_path, context)
try:
yield self.__device
except Exception as err:
except Exception as ex:
get_logger(0).error("Error occured on %s on %s while %s: %s",
self, self.__device_path, context, tools.efmt(err))
self, self.__device_path, context, tools.efmt(ex))
self.__close_device()
raise

View File

@ -111,13 +111,13 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute
while True:
session = self.__ensure_http_session()
try:
async with session.get(f"{self.__url}/api/{self.__token}/lights") as response:
results = await response.json()
async with session.get(f"{self.__url}/api/{self.__token}/lights") as resp:
results = await resp.json()
for pin in self.__state:
if pin in results:
self.__state[pin] = bool(results[pin]["state"]["on"])
except Exception as err:
get_logger().error("Failed Hue bulk GET request: %s", tools.efmt(err))
except Exception as ex:
get_logger().error("Failed Hue bulk GET request: %s", tools.efmt(ex))
self.__state = dict.fromkeys(self.__state, None)
if self.__state != prev_state:
self._notifier.notify()
@ -140,10 +140,10 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute
async with session.put(
url=f"{self.__url}/api/{self.__token}/lights/{pin}/state",
json={"on": state},
) as response:
htclient.raise_not_200(response)
except Exception as err:
get_logger().error("Failed Hue PUT request to pin %s: %s", pin, tools.efmt(err))
) as resp:
htclient.raise_not_200(resp)
except Exception as ex:
get_logger().error("Failed Hue PUT request to pin %s: %s", pin, tools.efmt(ex))
raise GpioDriverOfflineError(self)
self.__update_notifier.notify()

View File

@ -153,9 +153,9 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute
proc = await aioproc.log_process(**self.__make_ipmitool_kwargs(action), logger=get_logger(0), prefix=str(self))
if proc.returncode != 0:
raise RuntimeError(f"Ipmitool error: retcode={proc.returncode}")
except Exception as err:
except Exception as ex:
get_logger(0).error("Can't send IPMI power-%s request to %s:%d: %s",
action, self.__host, self.__port, tools.efmt(err))
action, self.__host, self.__port, tools.efmt(ex))
raise GpioDriverOfflineError(self)
# =====
@ -171,9 +171,9 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute
self.__online = True
return
raise RuntimeError(f"Invalid ipmitool response: {text}")
except Exception as err:
except Exception as ex:
get_logger(0).error("Can't fetch IPMI power status from %s:%d: %s",
self.__host, self.__port, tools.efmt(err))
self.__host, self.__port, tools.efmt(ex))
self.__power = False
self.__online = False

View File

@ -53,7 +53,7 @@ class Plugin(BaseUserGpioDriver):
self.__device_path = device_path
self.__tasks: dict[int, (asyncio.Task | None)] = {}
self.__line_request: (gpiod.LineRequest | None) = None
self.__line_req: (gpiod.LineRequest | None) = None
@classmethod
def get_plugin_options(cls) -> dict:
@ -74,7 +74,7 @@ class Plugin(BaseUserGpioDriver):
self.__tasks[int(pin)] = None
def prepare(self) -> None:
self.__line_request = gpiod.request_lines(
self.__line_req = gpiod.request_lines(
self.__device_path,
consumer="kvmd::locator",
config={
@ -94,9 +94,9 @@ class Plugin(BaseUserGpioDriver):
for task in tasks:
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
if self.__line_request:
if self.__line_req:
try:
self.__line_request.release()
self.__line_req.release()
except Exception:
pass
@ -115,17 +115,17 @@ class Plugin(BaseUserGpioDriver):
async def __blink(self, pin: int) -> None:
assert pin in self.__tasks
assert self.__line_request
assert self.__line_req
try:
state = True
while True:
self.__line_request.set_value(pin, gpiod.line.Value(state))
self.__line_req.set_value(pin, gpiod.line.Value(state))
state = (not state)
await asyncio.sleep(0.1)
except asyncio.CancelledError:
pass
finally:
self.__line_request.set_value(pin, gpiod.line.Value(False))
self.__line_req.set_value(pin, gpiod.line.Value(False))
def __str__(self) -> str:
return f"Locator({self._instance_name})"

View File

@ -91,9 +91,9 @@ class Plugin(BaseUserGpioDriver):
try:
with self.__ensure_device("probing"):
pass
except Exception as err:
except Exception as ex:
logger.error("Can't probe %s on %s: %s",
self, self.__device_path, tools.efmt(err))
self, self.__device_path, tools.efmt(ex))
self.__reset_pins()
async def cleanup(self) -> None:
@ -119,9 +119,9 @@ class Plugin(BaseUserGpioDriver):
pin, state, self, self.__device_path)
try:
self.__inner_write(pin, state)
except Exception as err:
except Exception as ex:
logger.error("Can't reset pin=%d of %s on %s: %s",
pin, self, self.__device_path, tools.efmt(err))
pin, self, self.__device_path, tools.efmt(ex))
def __inner_write(self, pin: int, state: bool) -> None:
assert 0 <= pin <= 7
@ -144,9 +144,9 @@ class Plugin(BaseUserGpioDriver):
get_logger(0).info("Opened %s on %s while %s", self, self.__device_path, context)
try:
yield self.__device
except Exception as err:
except Exception as ex:
get_logger(0).error("Error occured on %s on %s while %s: %s",
self, self.__device_path, context, tools.efmt(err))
self, self.__device_path, context, tools.efmt(ex))
self.__close_device()
raise

View File

@ -153,9 +153,9 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute
assert channel is not None
self.__send_channel(tty, channel)
except Exception as err:
except Exception as ex:
self.__channel_queue.put_nowait(None)
if isinstance(err, serial.SerialException) and err.errno == errno.ENOENT: # pylint: disable=no-member
if isinstance(ex, serial.SerialException) and ex.errno == errno.ENOENT: # pylint: disable=no-member
logger.error("Missing %s serial device: %s", self, self.__device_path)
else:
logger.exception("Unexpected %s error", self)

View File

@ -94,18 +94,18 @@ class Plugin(BaseUserGpioDriver):
pwm.period_ns = self.__period
pwm.duty_cycle_ns = self.__get_duty_cycle(bool(initial))
pwm.enable()
except Exception as err:
except Exception as ex:
logger.error("Can't get PWM chip %d channel %d: %s",
self.__chip, pin, tools.efmt(err))
self.__chip, pin, tools.efmt(ex))
async def cleanup(self) -> None:
for (pin, pwm) in self.__pwms.items():
try:
pwm.disable()
pwm.close()
except Exception as err:
except Exception as ex:
get_logger(0).error("Can't cleanup PWM chip %d channel %d: %s",
self.__chip, pin, tools.efmt(err))
self.__chip, pin, tools.efmt(ex))
async def read(self, pin: str) -> bool:
try:

View File

@ -146,9 +146,9 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute
asyncio.ensure_future(self.__reader.readexactly(6)),
timeout=self.__timeout,
))[4]
except Exception as err:
except Exception as ex:
get_logger(0).error("Can't send command to TESmart KVM [%s]:%d: %s",
self.__host, self.__port, tools.efmt(err))
self.__host, self.__port, tools.efmt(ex))
await self.__close_device()
self.__active = -1
raise GpioDriverOfflineError(self)
@ -168,9 +168,9 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute
asyncio.ensure_future(asyncio.open_connection(self.__host, self.__port)),
timeout=self.__timeout,
)
except Exception as err:
except Exception as ex:
get_logger(0).error("Can't connect to TESmart KVM [%s]:%d: %s",
self.__host, self.__port, tools.efmt(err))
self.__host, self.__port, tools.efmt(ex))
raise GpioDriverOfflineError(self)
async def __ensure_device_serial(self) -> None:
@ -179,9 +179,9 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute
serial_asyncio.open_serial_connection(url=self.__device_path, baudrate=self.__speed),
timeout=self.__timeout,
)
except Exception as err:
except Exception as ex:
get_logger(0).error("Can't connect to TESmart KVM [%s]:%d: %s",
self.__device_path, self.__speed, tools.efmt(err))
self.__device_path, self.__speed, tools.efmt(ex))
raise GpioDriverOfflineError(self)
async def __close_device(self) -> None:

View File

@ -157,9 +157,9 @@ class Plugin(BaseUserGpioDriver): # pylint: disable=too-many-instance-attribute
if self.__protocol == 2:
self.__channel_queue.put_nowait(channel)
except Exception as err:
except Exception as ex:
self.__channel_queue.put_nowait(None)
if isinstance(err, serial.SerialException) and err.errno == errno.ENOENT: # pylint: disable=no-member
if isinstance(ex, serial.SerialException) and ex.errno == errno.ENOENT: # pylint: disable=no-member
logger.error("Missing %s serial device: %s", self, self.__device_path)
else:
logger.exception("Unexpected %s error", self)

View File

@ -39,8 +39,8 @@ def cmdfmt(cmd: list[str]) -> str:
return " ".join(map(shlex.quote, cmd))
def efmt(err: Exception) -> str:
return f"{type(err).__name__}: {err}"
def efmt(ex: Exception) -> str:
return f"{type(ex).__name__}: {ex}"
# =====

View File

@ -112,8 +112,8 @@ def valid_ssl_ciphers(arg: Any) -> str:
arg = valid_stripped_string_not_empty(arg, name)
try:
ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER).set_ciphers(arg)
except Exception as err:
raise ValidatorError(f"The argument {arg!r} is not a valid {name}: {err}")
except Exception as ex:
raise ValidatorError(f"The argument {arg!r} is not a valid {name}: {ex}")
return arg

View File

@ -55,8 +55,8 @@ def valid_abs_path(arg: Any, type: str="", name: str="") -> str: # pylint: disa
if type:
try:
st = os.stat(arg)
except Exception as err:
raise_error(arg, f"{name}: {err}")
except Exception as ex:
raise_error(arg, f"{name}: {ex}")
else:
if not getattr(stat, f"S_IS{type.upper()}")(st.st_mode):
raise_error(arg, name)

View File

@ -143,8 +143,8 @@ class Option:
def manual_validated(value: Any, *path: str) -> Generator[None, None, None]:
try:
yield
except (TypeError, ValueError) as err:
raise ConfigError(f"Invalid value {value!r} for key {'/'.join(path)!r}: {err}")
except (TypeError, ValueError) as ex:
raise ConfigError(f"Invalid value {value!r} for key {'/'.join(path)!r}: {ex}")
def make_config(raw: dict[str, Any], scheme: dict[str, Any], _keys: tuple[str, ...]=()) -> Section:
@ -185,8 +185,8 @@ def make_config(raw: dict[str, Any], scheme: dict[str, Any], _keys: tuple[str, .
else:
try:
value = option.type(value)
except (TypeError, ValueError) as err:
raise ConfigError(f"Invalid value {value!r} for key {make_full_name(key)!r}: {err}")
except (TypeError, ValueError) as ex:
raise ConfigError(f"Invalid value {value!r} for key {make_full_name(key)!r}: {ex}")
config[key] = value
config._set_meta( # pylint: disable=protected-access

View File

@ -40,9 +40,9 @@ def load_yaml_file(path: str) -> Any:
with open(path) as file:
try:
return yaml.load(file, _YamlLoader)
except Exception as err:
except Exception as ex:
# Reraise internal exception as standard ValueError and show the incorrect file
raise ValueError(f"Invalid YAML in the file {path!r}:\n{tools.efmt(err)}") from None
raise ValueError(f"Invalid YAML in the file {path!r}:\n{tools.efmt(ex)}") from None
# =====

View File

@ -32,10 +32,10 @@ from . import get_configured_auth_service
# =====
async def _handle_auth(request: aiohttp.web.BaseRequest) -> aiohttp.web.Response:
async def _handle_auth(req: aiohttp.web.BaseRequest) -> aiohttp.web.Response:
status = 400
if request.method == "POST":
credentials = (await request.json())
if req.method == "POST":
credentials = (await req.json())
if credentials["user"] == "admin" and credentials["passwd"] == "pass":
status = 200
return aiohttp.web.Response(text=str(status), status=status)

View File

@ -141,8 +141,8 @@ export function Msd() {
if (!result.ok) {
msg = `Can't upload image to the Mass Storage Drive:<br>${result_str}`;
}
} catch (err) {
msg = `Can't parse upload result:<br>${err}`;
} catch (ex) {
msg = `Can't parse upload result:<br>${ex}`;
}
if (msg.length > 0) {
wm.error(msg);

View File

@ -214,8 +214,8 @@ export function Recorder() {
__events = events;
__events_time = events_time;
} catch (err) {
wm.error(`Invalid script: ${err}`);
} catch (ex) {
wm.error(`Invalid script: ${ex}`);
}
el_input.value = "";

View File

@ -411,8 +411,8 @@ export function Session() {
throw new Error("Too many missed heartbeats");
}
__ws.send("{\"event_type\": \"ping\", \"event\": {}}");
} catch (err) {
__wsErrorHandler(err.message);
} catch (ex) {
__wsErrorHandler(ex.message);
}
};

View File

@ -435,8 +435,8 @@ JanusStreamer.ensure_janus = function(callback) {
callback(true);
},
});
}).catch((err) => {
tools.error("Stream: Can't import Janus module:", err);
}).catch((ex) => {
tools.error("Stream: Can't import Janus module:", ex);
callback(false);
});
} else {

View File

@ -145,10 +145,10 @@ function __WindowManager() {
/************************************************************************/
self.copyTextToClipboard = function(text) {
let workaround = function(err) {
let workaround = function(ex) {
// https://stackoverflow.com/questions/60317969/document-execcommandcopy-not-working-even-though-the-dom-element-is-created
let callback = function() {
tools.error("copyTextToClipboard(): navigator.clipboard.writeText() is not working:", err);
tools.error("copyTextToClipboard(): navigator.clipboard.writeText() is not working:", ex);
tools.info("copyTextToClipboard(): Trying a workaround...");
let el = document.createElement("textarea");
@ -164,16 +164,16 @@ function __WindowManager() {
el.setSelectionRange(0, el.value.length); // iOS
try {
err = (document.execCommand("copy") ? null : "Unknown error");
} catch (err) { // eslint-disable-line no-unused-vars
ex = (document.execCommand("copy") ? null : "Unknown error");
} catch (ex) { // eslint-disable-line no-unused-vars
}
// Remove the added textarea again:
document.body.removeChild(el);
if (err) {
tools.error("copyTextToClipboard(): Workaround failed:", err);
wm.error("Can't copy text to the clipboard:<br>", err);
if (ex) {
tools.error("copyTextToClipboard(): Workaround failed:", ex);
wm.error("Can't copy text to the clipboard:<br>", ex);
}
};
__modalDialog("Info", "Press OK to copy the text to the clipboard", true, false, callback);
@ -181,8 +181,8 @@ function __WindowManager() {
if (navigator.clipboard) {
navigator.clipboard.writeText(text).then(function() {
wm.info("The text has been copied to the clipboard");
}, function(err) {
workaround(err);
}, function(ex) {
workaround(ex);
});
} else {
workaround("navigator.clipboard is not available");