This commit is contained in:
Devaev Maxim
2020-11-03 10:15:19 +03:00
parent 5f407fd4a0
commit 52a6eb3d98
7 changed files with 879 additions and 5 deletions

View File

@@ -0,0 +1,208 @@
# ========================================================================== #
# #
# KVMD - The main Pi-KVM daemon. #
# #
# Copyright (C) 2018 Maxim Devaev <mdevaev@gmail.com> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
# ========================================================================== #
import os
import multiprocessing
import time
from typing import Tuple
from typing import Dict
from typing import Iterable
from typing import AsyncGenerator
from typing import Optional
from ....logging import get_logger
from ....yamlconf import Option
from ....validators.basic import valid_bool
from ....validators.basic import valid_stripped_string_not_empty
from ....validators.basic import valid_int_f1
from ....validators.basic import valid_float_f01
from .... import aiotools
from .... import aiomulti
from .... import aioproc
from .. import BaseHid
from ..otg.events import ResetEvent
from ..otg.events import make_keyboard_event
from ..otg.events import MouseButtonEvent
from ..otg.events import MouseRelativeEvent
from ..otg.events import MouseWheelEvent
from .sdp import make_sdp_record
from .bluez import BluezIface
from .server import BtServer
# =====
class Plugin(BaseHid): # pylint: disable=too-many-instance-attributes
# https://github.com/SySS-Research/bluetooth-keyboard-emulator
# https://github.com/nutki/bt-keyboard-switcher
# https://gist.github.com/whitelynx/9f9bd4cb266b3924c64dfdff14bce2e8
# https://archlinuxarm.org/forum/viewtopic.php?f=67&t=14244
def __init__( # pylint: disable=too-many-arguments,too-many-locals,super-init-not-called
self,
manufacturer: str,
product: str,
description: str,
iface: str,
alias: str,
pairing_required: bool,
auth_required: bool,
control_public: bool,
unpair_on_close: bool,
max_clients: int,
socket_timeout: float,
select_timeout: float,
) -> None:
self.__proc: Optional[multiprocessing.Process] = None
self.__stop_event = multiprocessing.Event()
self.__notifier = aiomulti.AioProcessNotifier()
self.__server = BtServer(
iface=BluezIface(
iface=iface,
alias=alias,
sdp_record=make_sdp_record(manufacturer, product, description),
pairing_required=pairing_required,
auth_required=auth_required,
),
control_public=control_public,
unpair_on_close=unpair_on_close,
max_clients=max_clients,
socket_timeout=socket_timeout,
select_timeout=select_timeout,
notifier=self.__notifier,
stop_event=self.__stop_event,
)
@classmethod
def get_plugin_options(cls) -> Dict:
return {
"manufacturer": Option("Pi-KVM"),
"product": Option("HID Device"),
"description": Option("Bluetooth Keyboard & Mouse"),
"iface": Option("hci0", type=valid_stripped_string_not_empty),
"alias": Option("Pi-KVM HID"),
"pairing_required": Option(True, type=valid_bool),
"auth_required": Option(False, type=valid_bool),
"control_public": Option(True, type=valid_bool),
"unpair_on_close": Option(True, type=valid_bool),
"max_clients": Option(1, type=valid_int_f1),
"socket_timeout": Option(5.0, type=valid_float_f01),
"select_timeout": Option(1.0, type=valid_float_f01),
}
def sysprep(self) -> None:
get_logger(0).info("Starting HID daemon ...")
self.__proc = multiprocessing.Process(target=self.__server_worker, daemon=True)
self.__proc.start()
async def get_state(self) -> Dict:
state = await self.__server.get_state()
return {
"online": state["online"],
"keyboard": {
"online": state["online"],
"leds": {
"caps": state["caps"],
"scroll": state["scroll"],
"num": state["num"],
},
},
"mouse": {
"online": state["online"],
"absolute": False,
},
}
async def poll_state(self) -> AsyncGenerator[Dict, None]:
prev_state: Dict = {}
while True:
state = await self.get_state()
if state != prev_state:
yield state
prev_state = state
await self.__notifier.wait()
async def reset(self) -> None:
self.clear_events()
self.__server.queue_event(ResetEvent())
@aiotools.atomic
async def cleanup(self) -> None:
if self.__proc is not None:
if self.__proc.is_alive():
get_logger(0).info("Stopping HID daemon ...")
self.__stop_event.set()
if self.__proc.exitcode is not None:
self.__proc.join()
# =====
def send_key_events(self, keys: Iterable[Tuple[str, bool]]) -> None:
for (key, state) in keys:
self.__server.queue_event(make_keyboard_event(key, state))
def send_mouse_button_event(self, button: str, state: bool) -> None:
self.__server.queue_event(MouseButtonEvent(button, state))
def send_mouse_move_event(self, to_x: int, to_y: int) -> None:
_ = to_x # No absolute events
_ = to_y
def send_mouse_relative_event(self, delta_x: int, delta_y: int) -> None:
self.__server.queue_event(MouseRelativeEvent(delta_x, delta_y))
def send_mouse_wheel_event(self, delta_x: int, delta_y: int) -> None:
self.__server.queue_event(MouseWheelEvent(delta_x, delta_y))
def clear_events(self) -> None:
self.__server.clear_events()
# =====
def __server_worker(self) -> None: # pylint: disable=too-many-branches
logger = get_logger(0)
logger.info("Started HID pid=%d", os.getpid())
aioproc.ignore_sigint()
aioproc.rename_process("hid")
while not self.__stop_event.is_set():
try:
self.__server.run()
except Exception:
logger.exception("Unexpected HID error")
time.sleep(5)

View File

@@ -0,0 +1,110 @@
# ========================================================================== #
# #
# KVMD - The main Pi-KVM daemon. #
# #
# Copyright (C) 2018 Maxim Devaev <mdevaev@gmail.com> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
# ========================================================================== #
import types
from typing import Type
from typing import Optional
from typing import Any
import dbus
import dbus.proxies
# =====
HID_CTL_PORT = 17
HID_INT_PORT = 19
# =====
class BluezIface:
# https://git.kernel.org/pub/scm/bluetooth/bluez.git/tree/doc/profile-api.txt
# https://git.kernel.org/pub/scm/bluetooth/bluez.git/tree/doc/adapter-api.txt
# qdbus --system org.bluez /org/bluez/hci0 org.bluez.Adapter1.Address
def __init__(
self,
iface: str,
alias: str,
sdp_record: str,
pairing_required: bool,
auth_required: bool,
) -> None:
self.__iface = iface
self.__alias = alias
self.__sdp_record = sdp_record
self.__pairing_required = pairing_required
self.__auth_required = auth_required
self.__bus: Optional[dbus.SystemBus] = None
def get_address(self) -> str:
return self.__get_prop("Address")
def configure(self) -> None:
self.__set_prop("Alias", self.__alias)
manager = dbus.Interface(self.__get_object("/org/bluez"), "org.bluez.ProfileManager1")
manager.RegisterProfile(f"/org/bluez/{self.__iface}", "00001124-0000-1000-8000-00805F9B34FB", {
"ServiceRecord": self.__sdp_record,
"Role": "server",
"RequireAuthentication": self.__pairing_required,
"RequireAuthorization": self.__auth_required,
})
self.__set_prop("Powered", True)
def set_public(self, public: bool) -> None:
self.__set_prop("Pairable", public)
self.__set_prop("Discoverable", public)
def unpair(self, addr: str) -> None:
adapter = dbus.Interface(self.__get_object(f"/org/bluez/{self.__iface}"), "org.bluez.Adapter1")
adapter.RemoveDevice(f"/org/bluez/hci0/dev_{addr.upper().replace(':', '_')}")
def __get_prop(self, key: str) -> Any:
return self.__get_props().Get("org.bluez.Adapter1", key)
def __set_prop(self, key: str, value: Any) -> None:
self.__get_props().Set("org.bluez.Adapter1", key, value)
def __get_props(self) -> dbus.Interface:
return dbus.Interface(self.__get_object(f"/org/bluez/{self.__iface}"), "org.freedesktop.DBus.Properties")
def __get_object(self, path: str) -> dbus.proxies.ProxyObject:
assert self.__bus is not None
return self.__bus.get_object("org.bluez", path)
def __enter__(self) -> "BluezIface":
assert self.__bus is None
self.__bus = dbus.SystemBus()
return self
def __exit__(
self,
_exc_type: Type[BaseException],
_exc: BaseException,
_tb: types.TracebackType,
) -> None:
assert self.__bus is not None
self.__bus.close()
self.__bus = None

152
kvmd/plugins/hid/bt/sdp.py Normal file
View File

@@ -0,0 +1,152 @@
# ========================================================================== #
# #
# KVMD - The main Pi-KVM daemon. #
# #
# Copyright (C) 2018 Maxim Devaev <mdevaev@gmail.com> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
# ========================================================================== #
from xml.sax.saxutils import escape
from ....apps.otg.hid.keyboard import make_keyboard_hid
from ....apps.otg.hid.mouse import make_mouse_hid
# =====
def make_sdp_record(manufacturer: str, product: str, description: str) -> str:
manufacturer = escape(manufacturer)
product = escape(product)
description = escape(description)
keyboard_descriptor = make_keyboard_hid(0x01).report_descriptor.hex().upper()
mouse_descriptor = make_mouse_hid(False, False, 0x02).report_descriptor.hex().upper()
return f"""
<?xml version="1.0" encoding="UTF-8" ?>
<record>
<attribute id="0x0001">
<sequence>
<uuid value="0x1124" />
</sequence>
</attribute>
<attribute id="0x0004">
<sequence>
<sequence>
<uuid value="0x0100" />
<uint16 value="0x0011" />
</sequence>
<sequence>
<uuid value="0x0011" />
</sequence>
</sequence>
</attribute>
<attribute id="0x0005">
<sequence>
<uuid value="0x1002" />
</sequence>
</attribute>
<attribute id="0x0006">
<sequence>
<uint16 value="0x656E" />
<uint16 value="0x006A" />
<uint16 value="0x0100" />
</sequence>
</attribute>
<attribute id="0x0009">
<sequence>
<sequence>
<uuid value="0x1124" />
<uint16 value="0x0100" />
</sequence>
</sequence>
</attribute>
<attribute id="0x000D">
<sequence>
<sequence>
<sequence>
<uuid value="0x0100" />
<uint16 value="0x0013" />
</sequence>
<sequence>
<uuid value="0x0011" />
</sequence>
</sequence>
</sequence>
</attribute>
<attribute id="0x0100">
<text value="{product}" />
</attribute>
<attribute id="0x0101">
<text value="{description}" />
</attribute>
<attribute id="0x0102">
<text value="{manufacturer}" />
</attribute>
<attribute id="0x0200">
<uint16 value="0x0100" />
</attribute>
<attribute id="0x0201">
<uint16 value="0x0111" />
</attribute>
<attribute id="0x0202">
<uint8 value="0xC0" />
</attribute>
<attribute id="0x0203">
<uint8 value="0x00" />
</attribute>
<attribute id="0x0204">
<boolean value="false" />
</attribute>
<attribute id="0x0205">
<boolean value="false" />
</attribute>
<attribute id="0x0206">
<sequence>
<sequence>
<uint8 value="0x22" />
<text encoding="hex" value="{keyboard_descriptor}{mouse_descriptor}" />
</sequence>
</sequence>
</attribute>
<attribute id="0x0207">
<sequence>
<sequence>
<uint16 value="0x0409" />
<uint16 value="0x0100" />
</sequence>
</sequence>
</attribute>
<attribute id="0x020B">
<uint16 value="0x0100" />
</attribute>
<attribute id="0x020C">
<uint16 value="0x0C80" />
</attribute>
<attribute id="0x020D">
<boolean value="false" />
</attribute>
<attribute id="0x020E">
<boolean value="false" />
</attribute>
<attribute id="0x020F">
<uint16 value="0x0640" />
</attribute>
<attribute id="0x0210">
<uint16 value="0x0320" />
</attribute>
</record>
"""

View File

@@ -0,0 +1,390 @@
# ========================================================================== #
# #
# KVMD - The main Pi-KVM daemon. #
# #
# Copyright (C) 2018 Maxim Devaev <mdevaev@gmail.com> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
# ========================================================================== #
import socket
import select
import multiprocessing
import multiprocessing.synchronize
import dataclasses
import contextlib
import queue
from typing import Literal
from typing import List
from typing import Dict
from typing import Set
from typing import Generator
from typing import Optional
from ....logging import get_logger
from .... import tools
from .... import aiomulti
from ....keyboard.mappings import OtgKey
from ..otg.events import BaseEvent
from ..otg.events import ClearEvent
from ..otg.events import ResetEvent
from ..otg.events import get_led_caps
from ..otg.events import get_led_scroll
from ..otg.events import get_led_num
from ..otg.events import MouseButtonEvent
from ..otg.events import MouseRelativeEvent
from ..otg.events import MouseWheelEvent
from ..otg.events import make_mouse_report
from ..otg.events import KeyEvent
from ..otg.events import ModifierEvent
from ..otg.events import make_keyboard_report
from .bluez import HID_CTL_PORT
from .bluez import HID_INT_PORT
from .bluez import BluezIface
# =====
_RoleT = Literal["CTL", "INT"]
_SockAttrT = Literal["ctl_sock", "int_sock"]
@dataclasses.dataclass
class _BtClient:
addr: str
ctl_sock: Optional[socket.socket] = None
int_sock: Optional[socket.socket] = None
# =====
class BtServer: # pylint: disable=too-many-instance-attributes
def __init__(
self,
iface: BluezIface,
control_public: bool,
unpair_on_close: bool,
max_clients: int,
socket_timeout: float,
select_timeout: float,
notifier: aiomulti.AioProcessNotifier,
stop_event: multiprocessing.synchronize.Event,
) -> None:
self.__iface = iface
self.__control_public = control_public
self.__unpair_on_close = unpair_on_close
self.__max_clients = max_clients
self.__socket_timeout = socket_timeout
self.__select_timeout = select_timeout
self.__stop_event = stop_event
self.__clients: Dict[str, _BtClient] = {}
self.__to_read: Set[socket.socket] = set()
self.__events_queue: "multiprocessing.Queue[BaseEvent]" = multiprocessing.Queue()
self.__state_flags = aiomulti.AioSharedFlags({
"online": False,
"caps": False,
"scroll": False,
"num": False,
}, notifier)
self.__modifiers: Set[OtgKey] = set()
self.__keys: List[Optional[OtgKey]] = [None] * 6
self.__mouse_buttons = 0
def run(self) -> None:
with self.__iface:
self.__iface.configure()
self.__set_public(True)
addr = self.__iface.get_address()
try:
with self.__listen("CTL", addr, HID_CTL_PORT) as ctl_sock:
with self.__listen("INT", addr, HID_INT_PORT) as int_sock:
self.__main_loop(ctl_sock, int_sock)
finally:
self.__close_all_clients(no_change_public=True)
self.__set_public(False)
async def get_state(self) -> Dict:
return (await self.__state_flags.get())
def queue_event(self, event: BaseEvent) -> None:
if not self.__stop_event.is_set():
self.__events_queue.put_nowait(event)
def clear_events(self) -> None:
# FIXME: Если очистка производится со стороны процесса хида, то возможна гонка между
# очисткой и добавлением события ClearEvent. Неприятно, но не смертельно.
# Починить блокировкой после перехода на асинхронные очереди.
tools.clear_queue(self.__events_queue)
self.queue_event(ClearEvent())
# =====
@contextlib.contextmanager
def __listen(self, role: _RoleT, addr: str, port: int) -> Generator[socket.socket, None, None]:
get_logger(0).info("Listening [%s]:%d for %s ...", addr, port, role)
with socket.socket(socket.AF_BLUETOOTH, socket.SOCK_SEQPACKET, socket.BTPROTO_L2CAP) as sock:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.settimeout(self.__socket_timeout)
sock.bind((addr, port))
sock.listen(5)
yield sock
def __main_loop( # pylint: disable=too-many-branches
self,
server_ctl_sock: socket.socket,
server_int_sock: socket.socket,
) -> None:
qr = self.__events_queue._reader # type: ignore # pylint: disable=protected-access
self.__to_read = set([qr, server_ctl_sock, server_int_sock])
self.__clients = {}
while not self.__stop_event.is_set():
(ready_read, _, _) = select.select(self.__to_read, [], [], self.__select_timeout)
if server_ctl_sock in ready_read:
self.__accept_client("CTL", server_ctl_sock, "ctl_sock")
if server_int_sock in ready_read:
self.__accept_client("INT", server_int_sock, "int_sock")
for client in list(self.__clients.values()):
sock = client.ctl_sock
if sock in ready_read:
assert sock is not None
try:
data = sock.recv(1024)
if not data:
self.__close_client("CTL", client, "ctl_sock")
elif data == b"\x71":
sock.send(b"\x00")
except Exception as err:
get_logger(0).exception("CTL socket error on %s: %s: %s",
client.addr, type(err).__name__, err)
self.__close_client("CTL", client, "ctl_sock")
continue
sock = client.int_sock
if sock in ready_read:
assert sock is not None
try:
data = sock.recv(1024)
if not data:
self.__close_client("INT", client, "int_sock")
elif data[:2] == b"\xA2\x01":
self.__process_leds(data[2])
except Exception as err:
get_logger(0).exception("INT socket error on %s: %s: %s",
client.addr, type(err).__name__, err)
self.__close_client("INT", client, "ctl_sock")
if qr in ready_read:
self.__process_events()
# =====
def __process_leds(self, leds: int) -> None:
self.__state_flags.update(
caps=get_led_caps(leds),
scroll=get_led_scroll(leds),
num=get_led_num(leds),
)
def __process_events(self) -> None:
for _ in range(self.__events_queue.qsize()):
try:
event = self.__events_queue.get_nowait()
except queue.Empty:
break
else:
if isinstance(event, ResetEvent):
self.__close_all_clients()
return
elif isinstance(event, ClearEvent):
self.__clear_modifiers()
self.__clear_keys()
self.__mouse_buttons = 0
self.__send_keyboard_state()
self.__send_mouse_state(0, 0, 0)
elif isinstance(event, ModifierEvent):
if event.modifier in self.__modifiers: # Ранее нажатый модификатор отжимаем
self.__modifiers.remove(event.modifier)
self.__send_keyboard_state()
if event.state: # Нажимаем если нужно
self.__modifiers.add(event.modifier)
self.__send_keyboard_state()
elif isinstance(event, KeyEvent):
if event.key in self.__keys: # Ранее нажатую клавишу отжимаем
self.__keys[self.__keys.index(event.key)] = None
self.__send_keyboard_state()
elif event.state and None not in self.__keys: # Если слоты полны - отжимаем всё
self.__clear_keys()
self.__send_keyboard_state()
if event.state: # Нажимаем если нужно
self.__keys[self.__keys.index(None)] = event.key
self.__send_keyboard_state()
elif isinstance(event, MouseButtonEvent):
if event.code & self.__mouse_buttons: # Ранее нажатую кнопку отжимаем
self.__mouse_buttons &= ~event.code
self.__send_mouse_state(0, 0, 0)
if event.state: # Нажимаем если нужно
self.__mouse_buttons |= event.code
self.__send_mouse_state(0, 0, 0)
elif isinstance(event, MouseRelativeEvent):
self.__send_mouse_state(event.delta_x, event.delta_y, 0)
elif isinstance(event, MouseWheelEvent):
self.__send_mouse_state(0, 0, event.delta_y)
def __send_keyboard_state(self) -> None:
for client in list(self.__clients.values()):
if client.int_sock is not None:
report = make_keyboard_report(self.__modifiers, self.__keys)
self.__send_report(client, "keyboard", b"\xA1\x01" + report)
def __send_mouse_state(self, move_x: int, move_y: int, wheel_y: int) -> None:
for client in list(self.__clients.values()):
if client.int_sock is not None:
report = make_mouse_report(False, self.__mouse_buttons, move_x, move_y, None, wheel_y)
self.__send_report(client, "mouse", b"\xA1\x02" + report)
def __send_report(self, client: _BtClient, name: str, report: bytes) -> None:
assert client.int_sock is not None
try:
client.int_sock.send(report)
except Exception as err:
get_logger(0).info("Can't send %s report to %s: %s: %s",
name, client.addr, type(err).__name__, err)
self.__close_client_pair(client)
def __clear_modifiers(self) -> None:
self.__modifiers.clear()
def __clear_keys(self) -> None:
self.__keys = [None] * 6
def __clear_state(self) -> None:
self.__state_flags.update(
online=False,
caps=False,
scroll=False,
num=False,
)
self.__clear_modifiers()
self.__clear_keys()
self.__mouse_buttons = 0
# =====
def __accept_client(self, role: _RoleT, server_sock: socket.socket, sock_attr: _SockAttrT) -> None:
try:
(sock, peer) = server_sock.accept()
sock.setblocking(True)
except Exception:
get_logger(0).exception("Can't accept %s client", role)
else:
if peer[0] not in self.__clients:
if len(self.__clients) >= self.__max_clients:
self.__close_sock(sock)
get_logger(0).info("Refused %s client: %s: max clients reached", role, peer[0])
return
self.__clients[peer[0]] = _BtClient(peer[0])
client = self.__clients[peer[0]]
assert hasattr(client, sock_attr)
setattr(client, sock_attr, sock)
self.__to_read.add(sock)
get_logger(0).info("Accepted %s client: %s", role, peer[0])
self.__state_flags.update(online=True)
self.__set_public(len(self.__clients) < self.__max_clients)
def __close_client(self, role: _RoleT, client: _BtClient, sock_attr: _SockAttrT, no_change_public: bool=False) -> None:
sock = getattr(client, sock_attr)
if sock is not None:
self.__close_sock(sock)
setattr(client, sock_attr, None)
self.__to_read.remove(sock)
get_logger(0).info("Closed %s client %s", role, client.addr)
if client.ctl_sock is None and client.int_sock is None:
self.__clients.pop(client.addr)
if self.__unpair_on_close:
self.__unpair_client(client)
if len(self.__clients) == 0:
self.__clear_state()
if not no_change_public:
self.__set_public(len(self.__clients) < self.__max_clients)
def __close_client_pair(self, client: _BtClient, no_change_public: bool=False) -> None:
self.__close_client("CTL", client, "ctl_sock", no_change_public)
self.__close_client("INT", client, "int_sock", no_change_public)
def __close_all_clients(self, no_change_public: bool=False) -> None:
for client in list(self.__clients.values()):
self.__close_client_pair(client, no_change_public)
self.__clear_state()
if not no_change_public:
self.__set_public(True)
def __close_sock(self, sock: socket.socket) -> None:
try:
sock.close()
except Exception:
pass
# =====
def __set_public(self, public: bool) -> None:
logger = get_logger(0)
if self.__control_public:
logger.info("Publishing ..." if public else "Unpublishing ...")
try:
self.__iface.set_public(public)
except Exception as err:
logger.error("Can't change public mode: %s: %s", type(err).__name__, err)
def __unpair_client(self, client: _BtClient) -> None:
logger = get_logger(0)
logger.info("Unpairing %s ...", client.addr)
try:
self.__iface.unpair(client.addr)
except Exception as err:
logger.error("Can't unpair %s: %s: %s", client.addr, type(err).__name__, err)