mirror of
https://github.com/mofeng-git/One-KVM.git
synced 2025-12-12 17:20:30 +08:00
Remove all uses of assignment expressions.
This is needed to port to Python 3.7 because Raspbian 10 doesn't have Python 3.8. Signed-off-by: Oleg Girko <ol@infoserver.lv>
This commit is contained in:
parent
5307765399
commit
2dbf11428f
@ -177,5 +177,6 @@ async def run_region_task(
|
|||||||
|
|
||||||
if entered.done():
|
if entered.done():
|
||||||
return
|
return
|
||||||
if (exc := task.exception()) is not None:
|
exc = task.exception()
|
||||||
|
if exc is not None:
|
||||||
raise exc
|
raise exc
|
||||||
|
|||||||
@ -45,7 +45,8 @@ _COOKIE_AUTH_TOKEN = "auth_token"
|
|||||||
|
|
||||||
async def check_request_auth(auth_manager: AuthManager, exposed: HttpExposed, request: Request) -> None:
|
async def check_request_auth(auth_manager: AuthManager, exposed: HttpExposed, request: Request) -> None:
|
||||||
if exposed.auth_required and auth_manager.is_auth_enabled():
|
if exposed.auth_required and auth_manager.is_auth_enabled():
|
||||||
if (user := request.headers.get("X-KVMD-User", "")):
|
user = request.headers.get("X-KVMD-User", "")
|
||||||
|
if user:
|
||||||
user = valid_user(user)
|
user = valid_user(user)
|
||||||
passwd = request.headers.get("X-KVMD-Passwd", "")
|
passwd = request.headers.get("X-KVMD-Passwd", "")
|
||||||
set_request_auth_info(request, f"{user} (xhdr)")
|
set_request_auth_info(request, f"{user} (xhdr)")
|
||||||
@ -53,7 +54,8 @@ async def check_request_auth(auth_manager: AuthManager, exposed: HttpExposed, re
|
|||||||
raise ForbiddenError()
|
raise ForbiddenError()
|
||||||
return
|
return
|
||||||
|
|
||||||
elif (token := request.cookies.get(_COOKIE_AUTH_TOKEN, "")):
|
token = request.cookies.get(_COOKIE_AUTH_TOKEN, "")
|
||||||
|
if token:
|
||||||
user = auth_manager.check(valid_auth_token(token))
|
user = auth_manager.check(valid_auth_token(token))
|
||||||
if not user:
|
if not user:
|
||||||
set_request_auth_info(request, "- (token)")
|
set_request_auth_info(request, "- (token)")
|
||||||
@ -61,7 +63,8 @@ async def check_request_auth(auth_manager: AuthManager, exposed: HttpExposed, re
|
|||||||
set_request_auth_info(request, f"{user} (token)")
|
set_request_auth_info(request, f"{user} (token)")
|
||||||
return
|
return
|
||||||
|
|
||||||
elif (basic_auth := request.headers.get("Authorization", "")):
|
basic_auth = request.headers.get("Authorization", "")
|
||||||
|
if basic_auth:
|
||||||
if basic_auth[:6].lower() == "basic ":
|
if basic_auth[:6].lower() == "basic ":
|
||||||
try:
|
try:
|
||||||
(user, passwd) = base64.b64decode(basic_auth[6:]).decode("utf-8").split(":")
|
(user, passwd) = base64.b64decode(basic_auth[6:]).decode("utf-8").split(":")
|
||||||
|
|||||||
@ -56,11 +56,12 @@ class StreamerApi:
|
|||||||
|
|
||||||
@exposed_http("GET", "/streamer/snapshot")
|
@exposed_http("GET", "/streamer/snapshot")
|
||||||
async def __take_snapshot_handler(self, request: Request) -> Response:
|
async def __take_snapshot_handler(self, request: Request) -> Response:
|
||||||
if (snapshot := await self.__streamer.take_snapshot(
|
snapshot = await self.__streamer.take_snapshot(
|
||||||
save=valid_bool(request.query.get("save", "false")),
|
save=valid_bool(request.query.get("save", "false")),
|
||||||
load=valid_bool(request.query.get("load", "false")),
|
load=valid_bool(request.query.get("load", "false")),
|
||||||
allow_offline=valid_bool(request.query.get("allow_offline", "false")),
|
allow_offline=valid_bool(request.query.get("allow_offline", "false")),
|
||||||
)):
|
)
|
||||||
|
if snapshot:
|
||||||
if valid_bool(request.query.get("preview", "false")):
|
if valid_bool(request.query.get("preview", "false")):
|
||||||
data = await self.__make_preview(
|
data = await self.__make_preview(
|
||||||
snapshot=snapshot,
|
snapshot=snapshot,
|
||||||
|
|||||||
@ -77,8 +77,8 @@ def get_exposed_http(obj: object) -> List[HttpExposed]:
|
|||||||
auth_required=getattr(handler, _HTTP_AUTH_REQUIRED),
|
auth_required=getattr(handler, _HTTP_AUTH_REQUIRED),
|
||||||
handler=handler,
|
handler=handler,
|
||||||
)
|
)
|
||||||
for name in dir(obj)
|
for handler in [getattr(obj, name) for name in dir(obj)]
|
||||||
if inspect.ismethod(handler := getattr(obj, name)) and getattr(handler, _HTTP_EXPOSED, False)
|
if inspect.ismethod(handler) and getattr(handler, _HTTP_EXPOSED, False)
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
@ -107,8 +107,8 @@ def get_exposed_ws(obj: object) -> List[WsExposed]:
|
|||||||
event_type=getattr(handler, _WS_EVENT_TYPE),
|
event_type=getattr(handler, _WS_EVENT_TYPE),
|
||||||
handler=handler,
|
handler=handler,
|
||||||
)
|
)
|
||||||
for name in dir(obj)
|
for handler in [getattr(obj, name) for name in dir(obj)]
|
||||||
if inspect.ismethod(handler := getattr(obj, name)) and getattr(handler, _WS_EXPOSED, False)
|
if inspect.ismethod(handler) and getattr(handler, _WS_EXPOSED, False)
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -109,10 +109,11 @@ class HwInfoSubmanager(BaseInfoSubmanager):
|
|||||||
|
|
||||||
async def __get_throttling(self) -> Optional[Dict]:
|
async def __get_throttling(self) -> Optional[Dict]:
|
||||||
# https://www.raspberrypi.org/forums/viewtopic.php?f=63&t=147781&start=50#p972790
|
# https://www.raspberrypi.org/forums/viewtopic.php?f=63&t=147781&start=50#p972790
|
||||||
if (flags := await self.__parse_vcgencmd(
|
flags = await self.__parse_vcgencmd(
|
||||||
arg="get_throttled",
|
arg="get_throttled",
|
||||||
parser=(lambda text: int(text.split("=")[-1].strip(), 16)),
|
parser=(lambda text: int(text.split("=")[-1].strip(), 16)),
|
||||||
)) is not None:
|
)
|
||||||
|
if flags is not None:
|
||||||
return {
|
return {
|
||||||
"raw_flags": flags,
|
"raw_flags": flags,
|
||||||
"parsed_flags": {
|
"parsed_flags": {
|
||||||
|
|||||||
@ -215,7 +215,8 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
|
|||||||
("desired_fps", valid_stream_fps, None),
|
("desired_fps", valid_stream_fps, None),
|
||||||
("resolution", valid_stream_resolution, StreamerResolutionNotSupported),
|
("resolution", valid_stream_resolution, StreamerResolutionNotSupported),
|
||||||
]:
|
]:
|
||||||
if (value := request.query.get(name)):
|
value = request.query.get(name)
|
||||||
|
if (value):
|
||||||
if name not in current_params:
|
if name not in current_params:
|
||||||
assert exc_cls is not None, name
|
assert exc_cls is not None, name
|
||||||
raise exc_cls()
|
raise exc_cls()
|
||||||
|
|||||||
@ -363,7 +363,8 @@ class RfbClient(RfbClientStream): # pylint: disable=too-many-instance-attribute
|
|||||||
}
|
}
|
||||||
while True:
|
while True:
|
||||||
msg_type = await self._read_number("B")
|
msg_type = await self._read_number("B")
|
||||||
if (handler := handlers.get(msg_type)) is not None:
|
handler = handlers.get(msg_type)
|
||||||
|
if handler is not None:
|
||||||
await handler() # type: ignore # mypy bug
|
await handler() # type: ignore # mypy bug
|
||||||
else:
|
else:
|
||||||
raise RfbError(f"Unknown message type: {msg_type}")
|
raise RfbError(f"Unknown message type: {msg_type}")
|
||||||
|
|||||||
@ -239,7 +239,8 @@ class _Client(RfbClient): # pylint: disable=too-many-instance-attributes
|
|||||||
|
|
||||||
async def _on_key_event(self, code: int, state: bool) -> None:
|
async def _on_key_event(self, code: int, state: bool) -> None:
|
||||||
if self.__kvmd_ws:
|
if self.__kvmd_ws:
|
||||||
if (web_key := self.__symmap.get(code)) is not None:
|
web_key = self.__symmap.get(code)
|
||||||
|
if web_key is not None:
|
||||||
await self.__kvmd_ws.send_key_event(web_key.name, state)
|
await self.__kvmd_ws.send_key_event(web_key.name, state)
|
||||||
|
|
||||||
async def _on_pointer_event(self, buttons: Dict[str, bool], wheel: Dict[str, int], move: Dict[str, int]) -> None:
|
async def _on_pointer_event(self, buttons: Dict[str, bool], wheel: Dict[str, int], move: Dict[str, int]) -> None:
|
||||||
|
|||||||
@ -54,7 +54,8 @@ def build_symmap(path: str) -> Dict[int, SymmapWebKey]:
|
|||||||
(path, list(_read_keyboard_layout(path).items())),
|
(path, list(_read_keyboard_layout(path).items())),
|
||||||
]:
|
]:
|
||||||
for (code, key) in items:
|
for (code, key) in items:
|
||||||
if (web_name := AT1_TO_WEB.get(key.code)) is not None:
|
web_name = AT1_TO_WEB.get(key.code)
|
||||||
|
if web_name is not None:
|
||||||
if (
|
if (
|
||||||
(web_name in ["ShiftLeft", "ShiftRight"] and key.shift) # pylint: disable=too-many-boolean-expressions
|
(web_name in ["ShiftLeft", "ShiftRight"] and key.shift) # pylint: disable=too-many-boolean-expressions
|
||||||
or (web_name in ["AltLeft", "AltRight"] and key.altgr)
|
or (web_name in ["AltLeft", "AltRight"] and key.altgr)
|
||||||
@ -113,7 +114,8 @@ def _read_keyboard_layout(path: str) -> Dict[int, At1Key]: # Keysym to evdev (a
|
|||||||
|
|
||||||
parts = line.split()
|
parts = line.split()
|
||||||
if len(parts) >= 2:
|
if len(parts) >= 2:
|
||||||
if (x11_code := _resolve_keysym(parts[0])) != 0:
|
x11_code = _resolve_keysym(parts[0])
|
||||||
|
if x11_code != 0:
|
||||||
try:
|
try:
|
||||||
at1_code = int(parts[1], 16)
|
at1_code = int(parts[1], 16)
|
||||||
except ValueError as err:
|
except ValueError as err:
|
||||||
@ -128,7 +130,8 @@ def _read_keyboard_layout(path: str) -> Dict[int, At1Key]: # Keysym to evdev (a
|
|||||||
ctrl=("ctrl" in rest),
|
ctrl=("ctrl" in rest),
|
||||||
)
|
)
|
||||||
|
|
||||||
if "addupper" in rest and (x11_code := _resolve_keysym(parts[0].upper())) != 0:
|
x11_code = _resolve_keysym(parts[0].upper())
|
||||||
|
if "addupper" in rest and x11_code != 0:
|
||||||
layout[x11_code] = At1Key(
|
layout[x11_code] = At1Key(
|
||||||
code=at1_code,
|
code=at1_code,
|
||||||
shift=True,
|
shift=True,
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user