moved wol to gpio

This commit is contained in:
Devaev Maxim
2021-07-17 01:57:01 +03:00
parent 688ddca549
commit 0c500aa0c9
10 changed files with 80 additions and 166 deletions

View File

@@ -79,7 +79,6 @@ from ..validators.os import valid_options
from ..validators.os import valid_command
from ..validators.net import valid_ip_or_host
from ..validators.net import valid_ip
from ..validators.net import valid_net
from ..validators.net import valid_port
from ..validators.net import valid_ports_list
@@ -183,7 +182,7 @@ def _init_config(config_path: str, override_options: List[str], **load_flags: bo
raise SystemExit(f"ConfigError: {err}")
def _patch_raw(raw_config: Dict) -> None:
def _patch_raw(raw_config: Dict) -> None: # pylint: disable=too-many-branches
if isinstance(raw_config.get("otg"), dict):
for (old, new) in [
("msd", "msd"),
@@ -195,6 +194,23 @@ def _patch_raw(raw_config: Dict) -> None:
raw_config["otg"]["devices"] = {}
raw_config["otg"]["devices"][new] = raw_config["otg"].pop(old)
if isinstance(raw_config.get("kvmd"), dict) and isinstance(raw_config["kvmd"].get("wol"), dict):
if not isinstance(raw_config["kvmd"].get("gpio"), dict):
raw_config["kvmd"]["gpio"] = {}
for section in ["drivers", "scheme"]:
if not isinstance(raw_config["kvmd"]["gpio"].get(section), dict):
raw_config["kvmd"]["gpio"][section] = {}
raw_config["kvmd"]["gpio"]["drivers"]["__wol__"] = {
"type": "wol",
**raw_config["kvmd"].pop("wol"),
}
raw_config["kvmd"]["gpio"]["scheme"]["__wol__"] = {
"driver": "__wol__",
"pin": 0,
"mode": "output",
"switch": False,
}
if isinstance(raw_config.get("kvmd"), dict) and isinstance(raw_config["kvmd"].get("streamer"), dict):
streamer_config = raw_config["kvmd"]["streamer"]
@@ -359,12 +375,6 @@ def _get_config_scheme() -> Dict:
},
},
"wol": {
"ip": Option("255.255.255.255", type=functools.partial(valid_ip, v6=False)),
"port": Option(9, type=valid_port),
"mac": Option("", type=_make_ifarg(valid_mac, "")),
},
"hid": {
"type": Option("", type=valid_stripped_string_not_empty),

View File

@@ -34,7 +34,6 @@ from .. import init
from .auth import AuthManager
from .info import InfoManager
from .logreader import LogReader
from .wol import WakeOnLan
from .ugpio import UserGpio
from .streamer import Streamer
from .snapshoter import Snapshoter
@@ -86,7 +85,6 @@ def main(argv: Optional[List[str]]=None) -> None:
),
info_manager=InfoManager(global_config),
log_reader=LogReader(),
wol=WakeOnLan(**config.wol._unpack()),
user_gpio=UserGpio(config.gpio, global_config.otg.udc),
hid=hid,

View File

@@ -1,46 +0,0 @@
# ========================================================================== #
# #
# KVMD - The main Pi-KVM daemon. #
# #
# Copyright (C) 2018-2021 Maxim Devaev <mdevaev@gmail.com> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
# ========================================================================== #
from aiohttp.web import Request
from aiohttp.web import Response
from ..wol import WakeOnLan
from ..http import exposed_http
from ..http import make_json_response
# =====
class WolApi:
def __init__(self, wol: WakeOnLan) -> None:
self.__wol = wol
# =====
@exposed_http("GET", "/wol")
async def __state_handler(self, _: Request) -> Response:
return make_json_response(await self.__wol.get_state())
@exposed_http("POST", "/wol/wakeup")
async def __wakeup_handler(self, _: Request) -> Response:
await self.__wol.wakeup()
return make_json_response()

View File

@@ -65,7 +65,6 @@ from ... import aioproc
from .auth import AuthManager
from .info import InfoManager
from .logreader import LogReader
from .wol import WakeOnLan
from .ugpio import UserGpio
from .streamer import Streamer
from .snapshoter import Snapshoter
@@ -85,7 +84,6 @@ from .api.auth import check_request_auth
from .api.info import InfoApi
from .api.log import LogApi
from .api.wol import WolApi
from .api.ugpio import UserGpioApi
from .api.hid import HidApi
from .api.atx import AtxApi
@@ -148,7 +146,6 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
auth_manager: AuthManager,
info_manager: InfoManager,
log_reader: LogReader,
wol: WakeOnLan,
user_gpio: UserGpio,
hid: BaseHid,
@@ -186,7 +183,6 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
for sub in sorted(info_manager.get_subs())
],
*[
_Component("Wake-on-LAN", "wol_state", wol),
_Component("User-GPIO", "gpio_state", user_gpio),
_Component("HID", "hid_state", hid),
_Component("ATX", "atx_state", atx),
@@ -201,7 +197,6 @@ class KvmdServer(HttpServer): # pylint: disable=too-many-arguments,too-many-ins
AuthApi(auth_manager),
InfoApi(info_manager),
LogApi(log_reader),
WolApi(wol),
UserGpioApi(user_gpio),
self.__hid_api,
AtxApi(atx),

View File

@@ -1,87 +0,0 @@
# ========================================================================== #
# #
# KVMD - The main Pi-KVM daemon. #
# #
# Copyright (C) 2018-2021 Maxim Devaev <mdevaev@gmail.com> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
# ========================================================================== #
import socket
from typing import Dict
from typing import Optional
from ...logging import get_logger
from ...errors import OperationError
from ... import aiotools
# =====
class WolDisabledError(OperationError):
def __init__(self) -> None:
super().__init__("WoL is disabled")
# =====
class WakeOnLan:
def __init__(self, ip: str, port: int, mac: str) -> None:
self.__ip = ip
self.__port = port
self.__mac = mac
self.__magic = b""
if mac:
assert len(mac) == 17, mac
self.__magic = bytes.fromhex("FF" * 6 + mac.replace(":", "") * 16)
async def get_state(self) -> Dict:
return {
"enabled": bool(self.__magic),
"target": {
"ip": self.__ip,
"port": self.__port,
"mac": self.__mac,
},
}
@aiotools.atomic
async def wakeup(self) -> None:
if not self.__magic:
raise WolDisabledError()
logger = get_logger(0)
logger.info("Waking up %s (%s:%s) using Wake-on-LAN ...", self.__mac, self.__ip, self.__port)
sock: Optional[socket.socket] = None
try:
# TODO: IPv6 support: http://lists.cluenet.de/pipermail/ipv6-ops/2014-September/010139.html
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
sock.connect((self.__ip, self.__port))
sock.send(self.__magic)
except Exception:
logger.exception("Can't send Wake-on-LAN packet")
else:
logger.info("Wake-on-LAN packet sent")
finally:
if sock:
try:
sock.close()
except Exception:
pass