atx plugin

This commit is contained in:
Devaev Maxim 2019-09-11 20:54:27 +03:00
parent 2535892723
commit ca2eabc01f
8 changed files with 216 additions and 86 deletions

View File

@ -38,6 +38,7 @@ import pygments.formatters
from ..plugins import UnknownPluginError
from ..plugins.auth import get_auth_service_class
from ..plugins.hid import get_hid_class
from ..plugins.atx import get_atx_class
from ..yamlconf import ConfigError
from ..yamlconf import make_config
@ -117,6 +118,7 @@ def _init_config(config_path: str, sections: List[str], override_options: List[s
scheme["kvmd"]["auth"]["external"].update(get_auth_service_class(config.kvmd.auth.external.type).get_plugin_options())
scheme["kvmd"]["hid"].update(get_hid_class(config.kvmd.hid.type).get_plugin_options())
scheme["kvmd"]["atx"].update(get_atx_class(config.kvmd.atx.type).get_plugin_options())
config = make_config(raw_config, scheme)
@ -183,19 +185,7 @@ def _get_config_scheme(sections: List[str]) -> Dict:
},
"atx": {
"enabled": Option(True, type=valid_bool),
"power_led_pin": Option(-1, type=valid_gpio_pin, only_if="enabled"),
"hdd_led_pin": Option(-1, type=valid_gpio_pin, only_if="enabled"),
"power_led_inverted": Option(True, type=valid_bool),
"hdd_led_inverted": Option(True, type=valid_bool),
"power_switch_pin": Option(-1, type=valid_gpio_pin, only_if="enabled"),
"reset_switch_pin": Option(-1, type=valid_gpio_pin, only_if="enabled"),
"click_delay": Option(0.1, type=valid_float_f01),
"long_click_delay": Option(5.5, type=valid_float_f01),
"state_poll": Option(0.1, type=valid_float_f01),
"type": Option("gpio"),
},
"msd": {

View File

@ -51,8 +51,10 @@ def main(argv: Optional[List[str]]=None) -> None:
*([
("hid_reset_pin", config.hid.reset_pin, True),
] if config.hid.type == "tty" else []),
("atx_power_switch_pin", config.atx.power_switch_pin, config.atx.enabled),
("atx_reset_switch_pin", config.atx.reset_switch_pin, config.atx.enabled),
*([
("atx_power_switch_pin", config.atx.power_switch_pin, True),
("atx_reset_switch_pin", config.atx.reset_switch_pin, True),
] if config.atx.type == "gpio" else []),
("msd_target_pin", config.msd.target_pin, config.msd.enabled),
("msd_reset_pin", config.msd.reset_pin, config.msd.enabled),
("streamer_cap_pin", config.streamer.cap_pin, True),

View File

@ -28,13 +28,13 @@ from ...logging import get_logger
from ... import gpio
from ...plugins.hid import get_hid_class
from ...plugins.atx import get_atx_class
from .. import init
from .auth import AuthManager
from .info import InfoManager
from .logreader import LogReader
from .atx import Atx
from .msd import MassStorageDevice
from .streamer import Streamer
from .server import Server
@ -63,7 +63,7 @@ def main(argv: Optional[List[str]]=None) -> None:
log_reader=LogReader(),
hid=get_hid_class(config.hid.type)(**config.hid._unpack(ignore=["type"])),
atx=Atx(**config.atx._unpack()),
atx=get_atx_class(config.atx.type)(**config.atx._unpack(ignore=["type"])),
msd=MassStorageDevice(**config.msd._unpack()),
streamer=Streamer(**config.streamer._unpack()),
).run(**config.server._unpack())

View File

@ -46,6 +46,9 @@ from ...aioregion import RegionIsBusyError
from ...plugins.hid import BaseHid
from ...plugins.atx import AtxOperationError
from ...plugins.atx import BaseAtx
from ...validators import ValidatorError
from ...validators.basic import valid_bool
@ -72,8 +75,6 @@ from ... import __version__
from .auth import AuthManager
from .info import InfoManager
from .logreader import LogReader
from .atx import AtxOperationError
from .atx import Atx
from .msd import MsdOperationError
from .msd import MassStorageDevice
from .streamer import Streamer
@ -232,7 +233,7 @@ class Server: # pylint: disable=too-many-instance-attributes
log_reader: LogReader,
hid: BaseHid,
atx: Atx,
atx: BaseAtx,
msd: MassStorageDevice,
streamer: Streamer,
) -> None:

View File

@ -0,0 +1,82 @@
# ========================================================================== #
# #
# KVMD - The main Pi-KVM daemon. #
# #
# Copyright (C) 2018 Maxim Devaev <mdevaev@gmail.com> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
# ========================================================================== #
from typing import Dict
from typing import AsyncGenerator
from typing import Type
from ... import aioregion
from .. import BasePlugin
from .. import get_plugin_class
# =====
class AtxError(Exception):
pass
class AtxOperationError(AtxError):
pass
class AtxIsBusyError(AtxOperationError, aioregion.RegionIsBusyError):
pass
# =====
class BaseAtx(BasePlugin):
def get_state(self) -> Dict:
raise NotImplementedError
async def poll_state(self) -> AsyncGenerator[Dict, None]:
yield {}
raise NotImplementedError
async def power_on(self) -> bool:
raise NotImplementedError
async def power_off(self) -> bool:
raise NotImplementedError
async def power_off_hard(self) -> bool:
raise NotImplementedError
async def power_reset_hard(self) -> bool:
raise NotImplementedError
async def click_power(self) -> None:
raise NotImplementedError
async def click_power_long(self) -> None:
raise NotImplementedError
async def click_reset(self) -> None:
raise NotImplementedError
async def cleanup(self) -> None:
pass
# =====
def get_atx_class(name: str) -> Type[BaseAtx]:
return get_plugin_class("atx", (name or "none")) # type: ignore

View File

@ -24,9 +24,7 @@ import asyncio
import operator
from typing import Dict
from typing import Callable
from typing import AsyncGenerator
from typing import Any
from ...logging import get_logger
@ -34,37 +32,22 @@ from ... import aiotools
from ... import aioregion
from ... import gpio
from ...yamlconf import Option
from ...validators.basic import valid_bool
from ...validators.basic import valid_float_f01
from ...validators.hw import valid_gpio_pin
from . import AtxIsBusyError
from . import BaseAtx
# =====
class AtxError(Exception):
pass
class AtxOperationError(AtxError):
pass
class AtxDisabledError(AtxOperationError):
def __init__(self) -> None:
super().__init__("ATX is disabled")
class AtxIsBusyError(AtxOperationError, aioregion.RegionIsBusyError):
pass
def _atx_working(method: Callable) -> Callable:
async def wrapper(self: "Atx", *args: Any, **kwargs: Any) -> Any:
if not self._enabled: # pylint: disable=protected-access
raise AtxDisabledError()
return (await method(self, *args, **kwargs))
return wrapper
class Atx: # pylint: disable=too-many-instance-attributes
def __init__( # pylint: disable=too-many-arguments
class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes
def __init__( # pylint: disable=too-many-arguments,super-init-not-called
self,
enabled: bool,
power_led_pin: int,
hdd_led_pin: int,
@ -79,18 +62,10 @@ class Atx: # pylint: disable=too-many-instance-attributes
state_poll: float,
) -> None:
self._enabled = enabled
if self._enabled:
self.__power_led_pin = gpio.set_input(power_led_pin)
self.__hdd_led_pin = gpio.set_input(hdd_led_pin)
self.__power_switch_pin = gpio.set_output(power_switch_pin)
self.__reset_switch_pin = gpio.set_output(reset_switch_pin)
else:
self.__power_led_pin = -1
self.__hdd_led_pin = -1
self.__power_switch_pin = -1
self.__reset_switch_pin = -1
self.__power_led_inverted = power_led_inverted
self.__hdd_led_inverted = hdd_led_inverted
@ -102,32 +77,40 @@ class Atx: # pylint: disable=too-many-instance-attributes
self.__region = aioregion.AioExclusiveRegion(AtxIsBusyError)
def get_state(self) -> Dict:
if self._enabled:
power_led_state = operator.xor(self.__power_led_inverted, gpio.read(self.__power_led_pin))
hdd_led_state = operator.xor(self.__hdd_led_inverted, gpio.read(self.__hdd_led_pin))
else:
power_led_state = hdd_led_state = False
@classmethod
def get_plugin_options(cls) -> Dict[str, Option]:
return {
"enabled": self._enabled,
"power_led_pin": Option(-1, type=valid_gpio_pin),
"hdd_led_pin": Option(-1, type=valid_gpio_pin),
"power_led_inverted": Option(True, type=valid_bool),
"hdd_led_inverted": Option(True, type=valid_bool),
"power_switch_pin": Option(-1, type=valid_gpio_pin),
"reset_switch_pin": Option(-1, type=valid_gpio_pin),
"click_delay": Option(0.1, type=valid_float_f01),
"long_click_delay": Option(5.5, type=valid_float_f01),
"state_poll": Option(0.1, type=valid_float_f01),
}
def get_state(self) -> Dict:
return {
"enabled": True,
"busy": self.__region.is_busy(),
"leds": {
"power": power_led_state,
"hdd": hdd_led_state,
"power": operator.xor(self.__power_led_inverted, gpio.read(self.__power_led_pin)),
"hdd": operator.xor(self.__hdd_led_inverted, gpio.read(self.__hdd_led_pin)),
},
}
async def poll_state(self) -> AsyncGenerator[Dict, None]:
prev_state: Dict = {}
while True:
if self._enabled:
state = self.get_state()
if state != prev_state:
yield state
prev_state = state
await asyncio.sleep(self.__state_poll)
else:
await asyncio.sleep(60)
async def cleanup(self) -> None:
for (name, pin) in [
@ -141,28 +124,24 @@ class Atx: # pylint: disable=too-many-instance-attributes
# =====
@_atx_working
async def power_on(self) -> bool:
if not self.get_state()["leds"]["power"]:
await self.click_power()
return True
return False
@_atx_working
async def power_off(self) -> bool:
if self.get_state()["leds"]["power"]:
await self.click_power()
return True
return False
@_atx_working
async def power_off_hard(self) -> bool:
if self.get_state()["leds"]["power"]:
await self.click_power_long()
return True
return False
@_atx_working
async def power_reset_hard(self) -> bool:
if self.get_state()["leds"]["power"]:
await self.click_reset()
@ -171,15 +150,12 @@ class Atx: # pylint: disable=too-many-instance-attributes
# =====
@_atx_working
async def click_power(self) -> None:
await self.__click("power", self.__power_switch_pin, self.__click_delay)
@_atx_working
async def click_power_long(self) -> None:
await self.__click("power_long", self.__power_switch_pin, self.__long_click_delay)
@_atx_working
async def click_reset(self) -> None:
await self.__click("reset", self.__reset_switch_pin, self.__click_delay)

78
kvmd/plugins/atx/none.py Normal file
View File

@ -0,0 +1,78 @@
# ========================================================================== #
# #
# KVMD - The main Pi-KVM daemon. #
# #
# Copyright (C) 2018 Maxim Devaev <mdevaev@gmail.com> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
# ========================================================================== #
import asyncio
from typing import Dict
from typing import AsyncGenerator
from . import AtxOperationError
from . import BaseAtx
# =====
class AtxDisabledError(AtxOperationError):
def __init__(self) -> None:
super().__init__("ATX is disabled")
# =====
class Plugin(BaseAtx):
def get_state(self) -> Dict:
return {
"enabled": False,
"busy": False,
"leds": {
"power": False,
"hdd": False,
},
}
async def poll_state(self) -> AsyncGenerator[Dict, None]:
while True:
yield self.get_state()
await asyncio.sleep(60)
# =====
async def power_on(self) -> bool:
raise AtxDisabledError()
async def power_off(self) -> bool:
raise AtxDisabledError()
async def power_off_hard(self) -> bool:
raise AtxDisabledError()
async def power_reset_hard(self) -> bool:
raise AtxDisabledError()
# =====
async def click_power(self) -> None:
raise AtxDisabledError()
async def click_power_long(self) -> None:
raise AtxDisabledError()
async def click_reset(self) -> None:
raise AtxDisabledError()

View File

@ -82,6 +82,7 @@ def main() -> None:
"kvmd.plugins",
"kvmd.plugins.auth",
"kvmd.plugins.hid",
"kvmd.plugins.atx",
"kvmd.apps",
"kvmd.apps.kvmd",
"kvmd.apps.htpasswd",