removed rpi.gpio

This commit is contained in:
Devaev Maxim
2020-09-13 21:43:52 +03:00
parent 0ad0d17528
commit 5ed0c27f1f
7 changed files with 33 additions and 239 deletions

View File

@@ -25,8 +25,6 @@ from typing import Optional
from ...logging import get_logger
from ... import gpio
from ...plugins.hid import get_hid_class
from ...plugins.atx import get_atx_class
from ...plugins.msd import get_msd_class
@@ -45,6 +43,8 @@ from .server import KvmdServer
# =====
def main(argv: Optional[List[str]]=None) -> None:
# pylint: disable=protected-access
config = init(
prog="kvmd",
description="The main Pi-KVM daemon",
@@ -56,48 +56,45 @@ def main(argv: Optional[List[str]]=None) -> None:
load_gpio=True,
)[2]
with gpio.bcm():
# pylint: disable=protected-access
msd_kwargs = config.kvmd.msd._unpack(ignore=["type"])
if config.kvmd.msd.type == "otg":
msd_kwargs["gadget"] = config.otg.gadget # XXX: Small crutch to pass gadget name to plugin
msd_kwargs = config.kvmd.msd._unpack(ignore=["type"])
if config.kvmd.msd.type == "otg":
msd_kwargs["gadget"] = config.otg.gadget # XXX: Small crutch to pass gadget name to plugin
global_config = config
config = config.kvmd
global_config = config
config = config.kvmd
hid = get_hid_class(config.hid.type)(**config.hid._unpack(ignore=["type", "keymap"]))
streamer = Streamer(**config.streamer._unpack())
hid = get_hid_class(config.hid.type)(**config.hid._unpack(ignore=["type", "keymap"]))
streamer = Streamer(**config.streamer._unpack())
KvmdServer(
auth_manager=AuthManager(
internal_type=config.auth.internal.type,
internal_kwargs=config.auth.internal._unpack(ignore=["type", "force_users"]),
external_type=config.auth.external.type,
external_kwargs=(config.auth.external._unpack(ignore=["type"]) if config.auth.external.type else {}),
force_internal_users=config.auth.internal.force_users,
enabled=config.auth.enabled,
),
info_manager=InfoManager(global_config),
log_reader=LogReader(),
wol=WakeOnLan(**config.wol._unpack()),
user_gpio=UserGpio(config.gpio),
KvmdServer(
auth_manager=AuthManager(
internal_type=config.auth.internal.type,
internal_kwargs=config.auth.internal._unpack(ignore=["type", "force_users"]),
external_type=config.auth.external.type,
external_kwargs=(config.auth.external._unpack(ignore=["type"]) if config.auth.external.type else {}),
force_internal_users=config.auth.internal.force_users,
enabled=config.auth.enabled,
),
info_manager=InfoManager(global_config),
log_reader=LogReader(),
wol=WakeOnLan(**config.wol._unpack()),
user_gpio=UserGpio(config.gpio),
hid=hid,
atx=get_atx_class(config.atx.type)(**config.atx._unpack(ignore=["type"])),
msd=get_msd_class(config.msd.type)(**msd_kwargs),
streamer=streamer,
snapshoter=Snapshoter(
hid=hid,
atx=get_atx_class(config.atx.type)(**config.atx._unpack(ignore=["type"])),
msd=get_msd_class(config.msd.type)(**msd_kwargs),
streamer=streamer,
**config.snapshot._unpack(),
),
snapshoter=Snapshoter(
hid=hid,
streamer=streamer,
**config.snapshot._unpack(),
),
heartbeat=config.server.heartbeat,
sync_chunk_size=config.server.sync_chunk_size,
heartbeat=config.server.heartbeat,
sync_chunk_size=config.server.sync_chunk_size,
keymap_path=config.hid.keymap,
).run(**config.server._unpack(ignore=["heartbeat", "sync_chunk_size"]))
keymap_path=config.hid.keymap,
).run(**config.server._unpack(ignore=["heartbeat", "sync_chunk_size"]))
get_logger(0).info("Bye-bye")

View File

@@ -1,101 +0,0 @@
# ========================================================================== #
# #
# KVMD - The main Pi-KVM daemon. #
# #
# Copyright (C) 2018 Maxim Devaev <mdevaev@gmail.com> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
# ========================================================================== #
import asyncio
import contextlib
from typing import Tuple
from typing import Set
from typing import Generator
from typing import Optional
from RPi import GPIO
from .logging import get_logger
from . import aiotools
# =====
@contextlib.contextmanager
def bcm() -> Generator[None, None, None]:
logger = get_logger(2)
GPIO.setmode(GPIO.BCM)
logger.info("Configured GPIO mode as BCM")
try:
yield
finally:
GPIO.cleanup()
logger.info("GPIO cleaned")
def set_output(pin: int, initial: Optional[bool]) -> int:
assert pin >= 0, pin
GPIO.setup(pin, GPIO.OUT, initial=initial)
return pin
def set_input(pin: int) -> int:
assert pin >= 0, pin
GPIO.setup(pin, GPIO.IN)
return pin
def read(pin: int) -> bool:
assert pin >= 0, pin
return bool(GPIO.input(pin))
def write(pin: int, state: bool) -> None:
assert pin >= 0, pin
GPIO.output(pin, state)
class BatchReader:
def __init__(
self,
pins: Set[int],
interval: float,
notifier: aiotools.AioNotifier,
) -> None:
self.__pins = sorted(pins)
self.__interval = interval
self.__notifier = notifier
self.__state = {pin: read(pin) for pin in self.__pins}
self.__flags: Tuple[Optional[bool], ...] = (None,) * len(self.__pins)
def get(self, pin: int) -> bool:
return self.__state[pin]
async def poll(self) -> None:
if not self.__pins:
await aiotools.wait_infinite()
else:
while True:
flags = tuple(map(read, self.__pins))
if flags != self.__flags:
self.__flags = flags
self.__state = dict(zip(self.__pins, flags))
await self.__notifier.notify()
await asyncio.sleep(self.__interval)