Merge pull request #8 from pikvm/libgpiod

Libgpiod
This commit is contained in:
Maxim Devaev 2020-09-17 01:12:09 +03:00 committed by GitHub
commit 1f3cdd03be
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 458 additions and 406 deletions

View File

@ -3,8 +3,11 @@
TESTENV_IMAGE ?= kvmd-testenv TESTENV_IMAGE ?= kvmd-testenv
TESTENV_HID ?= /dev/ttyS10 TESTENV_HID ?= /dev/ttyS10
TESTENV_VIDEO ?= /dev/video0 TESTENV_VIDEO ?= /dev/video0
TESTENV_GPIO ?= /dev/gpiochip0
TESTENV_RELAY ?= $(if $(shell ls /dev/hidraw0 2>/dev/null || true),/dev/hidraw0,) TESTENV_RELAY ?= $(if $(shell ls /dev/hidraw0 2>/dev/null || true),/dev/hidraw0,)
LIBGPIOD_VERSION ?= 1.5.2
USTREAMER_MIN_VERSION ?= $(shell grep -o 'ustreamer>=[^"]\+' PKGBUILD | sed 's/ustreamer>=//g') USTREAMER_MIN_VERSION ?= $(shell grep -o 'ustreamer>=[^"]\+' PKGBUILD | sed 's/ustreamer>=//g')
DEFAULT_PLATFORM ?= v2-hdmi-rpi4 DEFAULT_PLATFORM ?= v2-hdmi-rpi4
@ -23,6 +26,7 @@ all:
@ echo " make textenv # Build test environment" @ echo " make textenv # Build test environment"
@ echo " make tox # Run tests and linters" @ echo " make tox # Run tests and linters"
@ echo " make tox E=pytest # Run selected test environment" @ echo " make tox E=pytest # Run selected test environment"
@ echo " make gpio # Create gpio mockup"
@ echo " make run # Run kvmd" @ echo " make run # Run kvmd"
@ echo " make run CMD=... # Run specified command inside kvmd environment" @ echo " make run CMD=... # Run specified command inside kvmd environment"
@ echo " make run-ipmi # Run kvmd-ipmi" @ echo " make run-ipmi # Run kvmd-ipmi"
@ -44,6 +48,7 @@ testenv:
$(if $(call optbool,$(NC)),--no-cache,) \ $(if $(call optbool,$(NC)),--no-cache,) \
--rm \ --rm \
--tag $(TESTENV_IMAGE) \ --tag $(TESTENV_IMAGE) \
--build-arg LIBGPIOD_VERSION=$(LIBGPIOD_VERSION) \
--build-arg USTREAMER_MIN_VERSION=$(USTREAMER_MIN_VERSION) \ --build-arg USTREAMER_MIN_VERSION=$(USTREAMER_MIN_VERSION) \
-f testenv/Dockerfile . -f testenv/Dockerfile .
@ -66,8 +71,15 @@ tox: testenv
" "
run: testenv $(TESTENV_GPIO):
test ! -e $(TESTENV_GPIO)
sudo modprobe gpio-mockup gpio_mockup_ranges=0,40
test -c $(TESTENV_GPIO)
run: testenv $(TESTENV_GPIO)
- docker run --rm --name kvmd \ - docker run --rm --name kvmd \
--cap-add SYS_ADMIN \
--volume `pwd`/testenv/run:/run/kvmd:rw \ --volume `pwd`/testenv/run:/run/kvmd:rw \
--volume `pwd`/testenv:/testenv:ro \ --volume `pwd`/testenv:/testenv:ro \
--volume `pwd`/kvmd:/kvmd:ro \ --volume `pwd`/kvmd:/kvmd:ro \
@ -76,10 +88,14 @@ run: testenv
--volume `pwd`/configs:/usr/share/kvmd/configs.default:ro \ --volume `pwd`/configs:/usr/share/kvmd/configs.default:ro \
--volume `pwd`/contrib/keymaps:/usr/share/kvmd/keymaps:ro \ --volume `pwd`/contrib/keymaps:/usr/share/kvmd/keymaps:ro \
--device $(TESTENV_VIDEO):$(TESTENV_VIDEO) \ --device $(TESTENV_VIDEO):$(TESTENV_VIDEO) \
--device $(TESTENV_GPIO):$(TESTENV_GPIO) \
--env KVMD_GPIO_DEVICE_PATH=$(TESTENV_GPIO) \
$(if $(TESTENV_RELAY),--device $(TESTENV_RELAY):$(TESTENV_RELAY),) \ $(if $(TESTENV_RELAY),--device $(TESTENV_RELAY):$(TESTENV_RELAY),) \
--publish 8080:80/tcp \ --publish 8080:80/tcp \
-it $(TESTENV_IMAGE) /bin/bash -c " \ -it $(TESTENV_IMAGE) /bin/bash -c " \
(socat PTY,link=$(TESTENV_HID) PTY,link=/dev/ttyS11 &) \ mount -t debugfs none /sys/kernel/debug \
&& test -d /sys/kernel/debug/gpio-mockup/`basename $(TESTENV_GPIO)`/ \
&& (socat PTY,link=$(TESTENV_HID) PTY,link=/dev/ttyS11 &) \
&& cp -r /usr/share/kvmd/configs.default/nginx/* /etc/kvmd/nginx \ && cp -r /usr/share/kvmd/configs.default/nginx/* /etc/kvmd/nginx \
&& cp /usr/share/kvmd/configs.default/kvmd/*.yaml /etc/kvmd \ && cp /usr/share/kvmd/configs.default/kvmd/*.yaml /etc/kvmd \
&& cp /usr/share/kvmd/configs.default/kvmd/*passwd /etc/kvmd \ && cp /usr/share/kvmd/configs.default/kvmd/*passwd /etc/kvmd \

View File

@ -39,7 +39,6 @@ depends=(
python-aiohttp python-aiohttp
python-aiofiles python-aiofiles
python-passlib python-passlib
python-raspberry-gpio
python-pyserial python-pyserial
python-setproctitle python-setproctitle
python-psutil python-psutil
@ -51,6 +50,7 @@ depends=(
python-pillow python-pillow
python-xlib python-xlib
python-hidapi python-hidapi
libgpiod
freetype2 freetype2
v4l-utils v4l-utils
nginx-mainline nginx-mainline
@ -59,7 +59,7 @@ depends=(
make make
patch patch
sudo sudo
raspberrypi-io-access "raspberrypi-io-access>=0.5"
"ustreamer>=1.19" "ustreamer>=1.19"
) )
makedepends=(python-setuptools) makedepends=(python-setuptools)

180
kvmd/aiogp.py Normal file
View File

@ -0,0 +1,180 @@
# ========================================================================== #
# #
# KVMD - The main Pi-KVM daemon. #
# #
# Copyright (C) 2018 Maxim Devaev <mdevaev@gmail.com> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
# ========================================================================== #
import os
import asyncio
import asyncio.queues
import threading
import dataclasses
from typing import Tuple
from typing import Dict
from typing import Optional
import gpiod
from . import aiotools
# =====
# XXX: Do not use this variable for any purpose other than testing.
# It can be removed at any time.
DEVICE_PATH = os.getenv("KVMD_GPIO_DEVICE_PATH", "/dev/gpiochip0")
# =====
async def pulse(line: gpiod.Line, delay: float, final: float) -> None:
try:
line.set_value(1)
await asyncio.sleep(delay)
finally:
line.set_value(0)
await asyncio.sleep(final)
# =====
@dataclasses.dataclass(frozen=True)
class AioReaderPinParams:
inverted: bool
debounce: float
class AioReader: # pylint: disable=too-many-instance-attributes
def __init__(
self,
path: str,
consumer: str,
pins: Dict[int, AioReaderPinParams],
notifier: aiotools.AioNotifier,
) -> None:
self.__path = path
self.__consumer = consumer
self.__pins = pins
self.__notifier = notifier
self.__values: Optional[Dict[int, _DebouncedValue]] = None
self.__thread = threading.Thread(target=self.__run, daemon=True)
self.__stop_event = threading.Event()
self.__loop: Optional[asyncio.AbstractEventLoop] = None
def get(self, pin: int) -> bool:
value = (self.__values[pin].get() if self.__values is not None else False)
return (value ^ self.__pins[pin].inverted)
async def poll(self) -> None:
if not self.__pins:
await aiotools.wait_infinite()
else:
assert self.__loop is None
self.__loop = asyncio.get_running_loop()
self.__thread.start()
try:
await aiotools.run_async(self.__thread.join)
finally:
self.__stop_event.set()
await aiotools.run_async(self.__thread.join)
def __run(self) -> None:
assert self.__values is None
assert self.__loop
with gpiod.Chip(self.__path) as chip:
pins = sorted(self.__pins)
lines = chip.get_lines(pins)
lines.request(self.__consumer, gpiod.LINE_REQ_EV_BOTH_EDGES)
lines.event_wait(nsec=1)
self.__values = {
pin: _DebouncedValue(
initial=bool(value),
debounce=self.__pins[pin].debounce,
notifier=self.__notifier,
loop=self.__loop,
)
for (pin, value) in zip(pins, lines.get_values())
}
self.__loop.call_soon_threadsafe(self.__notifier.notify_sync)
while not self.__stop_event.is_set():
ev_lines = lines.event_wait(1)
if ev_lines:
for ev_line in ev_lines:
events = ev_line.event_read_multiple()
if events:
(pin, value) = self.__parse_event(events[-1])
self.__values[pin].set(bool(value))
else: # Timeout
# Размер буфера ядра - 16 эвентов на линии. При превышении этого числа,
# новые эвенты потеряются. Это не баг, это фича, как мне объяснили в LKML.
# Штош. Будем с этим жить и синхронизировать состояния при таймауте.
for (pin, value) in zip(pins, lines.get_values()):
self.__values[pin].set(bool(value))
def __parse_event(self, event: gpiod.LineEvent) -> Tuple[int, int]:
pin = event.source.offset()
if event.type == gpiod.LineEvent.RISING_EDGE:
return (pin, 1)
elif event.type == gpiod.LineEvent.FALLING_EDGE:
return (pin, 0)
raise RuntimeError(f"Invalid event {event} type: {event.type}")
class _DebouncedValue:
def __init__(
self,
initial: bool,
debounce: float,
notifier: aiotools.AioNotifier,
loop: asyncio.AbstractEventLoop,
) -> None:
self.__value = initial
self.__debounce = debounce
self.__notifier = notifier
self.__loop = loop
self.__queue: asyncio.queues.Queue = asyncio.Queue(loop=loop)
self.__task = loop.create_task(self.__consumer_task_loop())
def set(self, value: bool) -> None:
if self.__loop.is_running():
self.__check_alive()
self.__loop.call_soon_threadsafe(self.__queue.put_nowait, value)
def get(self) -> bool:
return self.__value
def __check_alive(self) -> None:
if self.__task.done() and not self.__task.cancelled():
raise RuntimeError("Dead debounce consumer")
async def __consumer_task_loop(self) -> None:
while True:
value = await self.__queue.get()
while not self.__queue.empty():
value = await self.__queue.get()
if self.__value != value:
self.__value = value
await self.__notifier.notify()
await asyncio.sleep(self.__debounce)

View File

@ -97,6 +97,9 @@ class AioNotifier:
async def notify(self) -> None: async def notify(self) -> None:
await self.__queue.put(None) await self.__queue.put(None)
def notify_sync(self) -> None:
self.__queue.put_nowait(None)
async def wait(self) -> None: async def wait(self) -> None:
await self.__queue.get() await self.__queue.get()
while not self.__queue.empty(): while not self.__queue.empty():

View File

@ -227,7 +227,9 @@ def _patch_dynamic( # pylint: disable=too-many-locals
"min_delay": Option(0.1, type=valid_float_f01), "min_delay": Option(0.1, type=valid_float_f01),
"max_delay": Option(0.1, type=valid_float_f01), "max_delay": Option(0.1, type=valid_float_f01),
}, },
} if mode == UserGpioModes.OUTPUT else {}) } if mode == UserGpioModes.OUTPUT else { # input
"debounce": Option(0.1, type=valid_float_f0),
})
} }
rebuild = True rebuild = True

View File

@ -33,39 +33,10 @@ from ...logging import get_logger
from ...yamlconf import Section from ...yamlconf import Section
from ... import gpio
from .. import init from .. import init
# ===== # =====
def _clear_gpio(config: Section) -> None:
logger = get_logger(0)
with gpio.bcm():
for (name, pin) in [
*([
("hid_serial/reset", config.hid.reset_pin),
] if config.hid.type == "serial" else []),
*([
("atx_gpio/power_switch", config.atx.power_switch_pin),
("atx_gpio/reset_switch", config.atx.reset_switch_pin),
] if config.atx.type == "gpio" else []),
*([
("msd_relay/target", config.msd.target_pin),
("msd_relay/reset", config.msd.reset_pin),
] if config.msd.type == "relay" else []),
]:
if pin >= 0:
logger.info("Writing 0 to GPIO pin=%d (%s)", pin, name)
try:
gpio.set_output(pin, False)
except Exception:
logger.exception("Can't clear GPIO pin=%d (%s)", pin, name)
def _kill_streamer(config: Section) -> None: def _kill_streamer(config: Section) -> None:
logger = get_logger(0) logger = get_logger(0)
@ -108,17 +79,12 @@ def main(argv: Optional[List[str]]=None) -> None:
prog="kvmd-cleanup", prog="kvmd-cleanup",
description="Kill KVMD and clear resources", description="Kill KVMD and clear resources",
argv=argv, argv=argv,
load_hid=True,
load_atx=True,
load_msd=True,
load_gpio=True,
)[2].kvmd )[2].kvmd
logger = get_logger(0) logger = get_logger(0)
logger.info("Cleaning up ...") logger.info("Cleaning up ...")
for method in [ for method in [
_clear_gpio,
_kill_streamer, _kill_streamer,
_remove_sockets, _remove_sockets,
]: ]:

View File

@ -25,8 +25,6 @@ from typing import Optional
from ...logging import get_logger from ...logging import get_logger
from ... import gpio
from ...plugins.hid import get_hid_class from ...plugins.hid import get_hid_class
from ...plugins.atx import get_atx_class from ...plugins.atx import get_atx_class
from ...plugins.msd import get_msd_class from ...plugins.msd import get_msd_class
@ -45,6 +43,8 @@ from .server import KvmdServer
# ===== # =====
def main(argv: Optional[List[str]]=None) -> None: def main(argv: Optional[List[str]]=None) -> None:
# pylint: disable=protected-access
config = init( config = init(
prog="kvmd", prog="kvmd",
description="The main Pi-KVM daemon", description="The main Pi-KVM daemon",
@ -56,48 +56,45 @@ def main(argv: Optional[List[str]]=None) -> None:
load_gpio=True, load_gpio=True,
)[2] )[2]
with gpio.bcm(): msd_kwargs = config.kvmd.msd._unpack(ignore=["type"])
# pylint: disable=protected-access if config.kvmd.msd.type == "otg":
msd_kwargs["gadget"] = config.otg.gadget # XXX: Small crutch to pass gadget name to plugin
msd_kwargs = config.kvmd.msd._unpack(ignore=["type"]) global_config = config
if config.kvmd.msd.type == "otg": config = config.kvmd
msd_kwargs["gadget"] = config.otg.gadget # XXX: Small crutch to pass gadget name to plugin
global_config = config hid = get_hid_class(config.hid.type)(**config.hid._unpack(ignore=["type", "keymap"]))
config = config.kvmd streamer = Streamer(**config.streamer._unpack())
hid = get_hid_class(config.hid.type)(**config.hid._unpack(ignore=["type", "keymap"])) KvmdServer(
streamer = Streamer(**config.streamer._unpack()) auth_manager=AuthManager(
internal_type=config.auth.internal.type,
internal_kwargs=config.auth.internal._unpack(ignore=["type", "force_users"]),
external_type=config.auth.external.type,
external_kwargs=(config.auth.external._unpack(ignore=["type"]) if config.auth.external.type else {}),
force_internal_users=config.auth.internal.force_users,
enabled=config.auth.enabled,
),
info_manager=InfoManager(global_config),
log_reader=LogReader(),
wol=WakeOnLan(**config.wol._unpack()),
user_gpio=UserGpio(config.gpio),
KvmdServer( hid=hid,
auth_manager=AuthManager( atx=get_atx_class(config.atx.type)(**config.atx._unpack(ignore=["type"])),
internal_type=config.auth.internal.type, msd=get_msd_class(config.msd.type)(**msd_kwargs),
internal_kwargs=config.auth.internal._unpack(ignore=["type", "force_users"]), streamer=streamer,
external_type=config.auth.external.type,
external_kwargs=(config.auth.external._unpack(ignore=["type"]) if config.auth.external.type else {}),
force_internal_users=config.auth.internal.force_users,
enabled=config.auth.enabled,
),
info_manager=InfoManager(global_config),
log_reader=LogReader(),
wol=WakeOnLan(**config.wol._unpack()),
user_gpio=UserGpio(config.gpio),
snapshoter=Snapshoter(
hid=hid, hid=hid,
atx=get_atx_class(config.atx.type)(**config.atx._unpack(ignore=["type"])),
msd=get_msd_class(config.msd.type)(**msd_kwargs),
streamer=streamer, streamer=streamer,
**config.snapshot._unpack(),
),
snapshoter=Snapshoter( heartbeat=config.server.heartbeat,
hid=hid, sync_chunk_size=config.server.sync_chunk_size,
streamer=streamer,
**config.snapshot._unpack(),
),
heartbeat=config.server.heartbeat, keymap_path=config.hid.keymap,
sync_chunk_size=config.server.sync_chunk_size, ).run(**config.server._unpack(ignore=["heartbeat", "sync_chunk_size"]))
keymap_path=config.hid.keymap,
).run(**config.server._unpack(ignore=["heartbeat", "sync_chunk_size"]))
get_logger(0).info("Bye-bye") get_logger(0).info("Bye-bye")

View File

@ -81,7 +81,7 @@ class _GpioInput:
self.__inverted: bool = config.inverted self.__inverted: bool = config.inverted
self.__driver = driver self.__driver = driver
self.__driver.register_input(self.__pin) self.__driver.register_input(self.__pin, config.debounce)
def get_scheme(self) -> Dict: def get_scheme(self) -> Dict:
return { return {
@ -201,10 +201,9 @@ class _GpioOutput: # pylint: disable=too-many-instance-attributes
@aiotools.atomic @aiotools.atomic
async def __inner_switch(self, state: bool) -> None: async def __inner_switch(self, state: bool) -> None:
if state != self.__read(): self.__write(state)
self.__write(state) get_logger(0).info("Ensured switch %s to state=%d", self, state)
get_logger(0).info("Switched %s to state=%d", self, state) await asyncio.sleep(self.__busy_delay)
await asyncio.sleep(self.__busy_delay)
@aiotools.atomic @aiotools.atomic
async def __inner_pulse(self, delay: float) -> None: async def __inner_pulse(self, delay: float) -> None:

View File

@ -1,101 +0,0 @@
# ========================================================================== #
# #
# KVMD - The main Pi-KVM daemon. #
# #
# Copyright (C) 2018 Maxim Devaev <mdevaev@gmail.com> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
# ========================================================================== #
import asyncio
import contextlib
from typing import Tuple
from typing import Set
from typing import Generator
from typing import Optional
from RPi import GPIO
from .logging import get_logger
from . import aiotools
# =====
@contextlib.contextmanager
def bcm() -> Generator[None, None, None]:
logger = get_logger(2)
GPIO.setmode(GPIO.BCM)
logger.info("Configured GPIO mode as BCM")
try:
yield
finally:
GPIO.cleanup()
logger.info("GPIO cleaned")
def set_output(pin: int, initial: Optional[bool]) -> int:
assert pin >= 0, pin
GPIO.setup(pin, GPIO.OUT, initial=initial)
return pin
def set_input(pin: int) -> int:
assert pin >= 0, pin
GPIO.setup(pin, GPIO.IN)
return pin
def read(pin: int) -> bool:
assert pin >= 0, pin
return bool(GPIO.input(pin))
def write(pin: int, state: bool) -> None:
assert pin >= 0, pin
GPIO.output(pin, state)
class BatchReader:
def __init__(
self,
pins: Set[int],
interval: float,
notifier: aiotools.AioNotifier,
) -> None:
self.__pins = sorted(pins)
self.__interval = interval
self.__notifier = notifier
self.__state = {pin: read(pin) for pin in self.__pins}
self.__flags: Tuple[Optional[bool], ...] = (None,) * len(self.__pins)
def get(self, pin: int) -> bool:
return self.__state[pin]
async def poll(self) -> None:
if not self.__pins:
await aiotools.wait_infinite()
else:
while True:
flags = tuple(map(read, self.__pins))
if flags != self.__flags:
self.__flags = flags
self.__state = dict(zip(self.__pins, flags))
await self.__notifier.notify()
await asyncio.sleep(self.__interval)

View File

@ -20,24 +20,25 @@
# ========================================================================== # # ========================================================================== #
import asyncio
from typing import Dict from typing import Dict
from typing import AsyncGenerator from typing import AsyncGenerator
from typing import Optional
import gpiod
from ...logging import get_logger from ...logging import get_logger
from ... import aiotools from ... import aiotools
from ... import gpio from ... import aiogp
from ...yamlconf import Option from ...yamlconf import Option
from ...validators.basic import valid_bool from ...validators.basic import valid_bool
from ...validators.basic import valid_float_f0
from ...validators.basic import valid_float_f01 from ...validators.basic import valid_float_f01
from ...validators.hw import valid_gpio_pin from ...validators.hw import valid_gpio_pin
from . import AtxIsBusyError from . import AtxIsBusyError
from . import BaseAtx from . import BaseAtx
@ -46,27 +47,24 @@ from . import BaseAtx
class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes
def __init__( # pylint: disable=too-many-arguments,super-init-not-called def __init__( # pylint: disable=too-many-arguments,super-init-not-called
self, self,
power_led_pin: int, power_led_pin: int,
hdd_led_pin: int,
power_led_inverted: bool, power_led_inverted: bool,
power_led_debounce: float,
hdd_led_pin: int,
hdd_led_inverted: bool, hdd_led_inverted: bool,
hdd_led_debounce: float,
power_switch_pin: int, power_switch_pin: int,
reset_switch_pin: int, reset_switch_pin: int,
click_delay: float, click_delay: float,
long_click_delay: float, long_click_delay: float,
state_poll: float,
) -> None: ) -> None:
self.__power_led_pin = gpio.set_input(power_led_pin) self.__power_led_pin = power_led_pin
self.__hdd_led_pin = gpio.set_input(hdd_led_pin) self.__hdd_led_pin = hdd_led_pin
self.__power_switch_pin = gpio.set_output(power_switch_pin, False) self.__power_switch_pin = power_switch_pin
self.__reset_switch_pin = gpio.set_output(reset_switch_pin, False) self.__reset_switch_pin = reset_switch_pin
self.__power_led_inverted = power_led_inverted
self.__hdd_led_inverted = hdd_led_inverted
self.__click_delay = click_delay self.__click_delay = click_delay
self.__long_click_delay = long_click_delay self.__long_click_delay = long_click_delay
@ -74,9 +72,17 @@ class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes
self.__notifier = aiotools.AioNotifier() self.__notifier = aiotools.AioNotifier()
self.__region = aiotools.AioExclusiveRegion(AtxIsBusyError, self.__notifier) self.__region = aiotools.AioExclusiveRegion(AtxIsBusyError, self.__notifier)
self.__reader = gpio.BatchReader( self.__chip: Optional[gpiod.Chip] = None
pins=set([self.__power_led_pin, self.__hdd_led_pin]), self.__power_switch_line: Optional[gpiod.Line] = None
interval=state_poll, self.__reset_switch_line: Optional[gpiod.Line] = None
self.__reader = aiogp.AioReader(
path=aiogp.DEVICE_PATH,
consumer="kvmd/atx-gpio/leds",
pins={
power_led_pin: aiogp.AioReaderPinParams(power_led_inverted, power_led_debounce),
hdd_led_pin: aiogp.AioReaderPinParams(hdd_led_inverted, hdd_led_debounce),
},
notifier=self.__notifier, notifier=self.__notifier,
) )
@ -84,25 +90,39 @@ class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes
def get_plugin_options(cls) -> Dict: def get_plugin_options(cls) -> Dict:
return { return {
"power_led_pin": Option(-1, type=valid_gpio_pin), "power_led_pin": Option(-1, type=valid_gpio_pin),
"hdd_led_pin": Option(-1, type=valid_gpio_pin),
"power_led_inverted": Option(False, type=valid_bool), "power_led_inverted": Option(False, type=valid_bool),
"hdd_led_inverted": Option(False, type=valid_bool), "power_led_debounce": Option(0.1, type=valid_float_f0),
"hdd_led_pin": Option(-1, type=valid_gpio_pin),
"hdd_led_inverted": Option(False, type=valid_bool),
"hdd_led_debounce": Option(0.1, type=valid_float_f0),
"power_switch_pin": Option(-1, type=valid_gpio_pin), "power_switch_pin": Option(-1, type=valid_gpio_pin),
"reset_switch_pin": Option(-1, type=valid_gpio_pin), "reset_switch_pin": Option(-1, type=valid_gpio_pin),
"click_delay": Option(0.1, type=valid_float_f01), "click_delay": Option(0.1, type=valid_float_f01),
"long_click_delay": Option(5.5, type=valid_float_f01), "long_click_delay": Option(5.5, type=valid_float_f01),
"state_poll": Option(0.1, type=valid_float_f01),
} }
def sysprep(self) -> None:
assert self.__chip is None
assert self.__power_switch_line is None
assert self.__reset_switch_line is None
self.__chip = gpiod.Chip(aiogp.DEVICE_PATH)
self.__power_switch_line = self.__chip.get_line(self.__power_switch_pin)
self.__power_switch_line.request("kvmd/atx-gpio/power_switch", gpiod.LINE_REQ_DIR_OUT, default_val=0)
self.__reset_switch_line = self.__chip.get_line(self.__reset_switch_pin)
self.__reset_switch_line.request("kvmd/atx-gpio/reset_switch", gpiod.LINE_REQ_DIR_OUT, default_val=0)
async def get_state(self) -> Dict: async def get_state(self) -> Dict:
return { return {
"enabled": True, "enabled": True,
"busy": self.__region.is_busy(), "busy": self.__region.is_busy(),
"leds": { "leds": {
"power": (self.__reader.get(self.__power_led_pin) ^ self.__power_led_inverted), "power": self.__reader.get(self.__power_led_pin),
"hdd": (self.__reader.get(self.__hdd_led_pin) ^ self.__hdd_led_inverted), "hdd": self.__reader.get(self.__hdd_led_pin),
}, },
} }
@ -119,14 +139,11 @@ class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes
await self.__reader.poll() await self.__reader.poll()
async def cleanup(self) -> None: async def cleanup(self) -> None:
for (name, pin) in [ if self.__chip:
("power", self.__power_switch_pin),
("reset", self.__reset_switch_pin),
]:
try: try:
gpio.write(pin, False) self.__chip.close()
except Exception: except Exception:
get_logger(0).exception("Can't cleanup %s pin %d", name, pin) pass
# ===== # =====
@ -149,13 +166,13 @@ class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes
# ===== # =====
async def click_power(self, wait: bool) -> None: async def click_power(self, wait: bool) -> None:
await self.__click("power", self.__power_switch_pin, self.__click_delay, wait) await self.__click("power", self.__power_switch_line, self.__click_delay, wait)
async def click_power_long(self, wait: bool) -> None: async def click_power_long(self, wait: bool) -> None:
await self.__click("power_long", self.__power_switch_pin, self.__long_click_delay, wait) await self.__click("power_long", self.__power_switch_line, self.__long_click_delay, wait)
async def click_reset(self, wait: bool) -> None: async def click_reset(self, wait: bool) -> None:
await self.__click("reset", self.__reset_switch_pin, self.__click_delay, wait) await self.__click("reset", self.__reset_switch_line, self.__click_delay, wait)
# ===== # =====
@ -163,22 +180,17 @@ class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes
return (await self.get_state())["leds"]["power"] return (await self.get_state())["leds"]["power"]
@aiotools.atomic @aiotools.atomic
async def __click(self, name: str, pin: int, delay: float, wait: bool) -> None: async def __click(self, name: str, line: gpiod.Line, delay: float, wait: bool) -> None:
if wait: if wait:
async with self.__region: async with self.__region:
await self.__inner_click(name, pin, delay) await self.__inner_click(name, line, delay)
else: else:
await aiotools.run_region_task( await aiotools.run_region_task(
"Can't perform ATX click or operation was not completed", f"Can't perform ATX {name} click or operation was not completed",
self.__region, self.__inner_click, name, pin, delay, self.__region, self.__inner_click, name, line, delay,
) )
@aiotools.atomic @aiotools.atomic
async def __inner_click(self, name: str, pin: int, delay: float) -> None: async def __inner_click(self, name: str, line: gpiod.Line, delay: float) -> None:
try: await aiogp.pulse(line, delay, 1)
gpio.write(pin, True)
await asyncio.sleep(delay)
finally:
gpio.write(pin, False)
await asyncio.sleep(1)
get_logger(0).info("Clicked ATX button %r", name) get_logger(0).info("Clicked ATX button %r", name)

View File

@ -21,7 +21,6 @@
import os import os
import asyncio
import multiprocessing import multiprocessing
import multiprocessing.queues import multiprocessing.queues
import dataclasses import dataclasses
@ -35,7 +34,9 @@ from typing import List
from typing import Dict from typing import Dict
from typing import Iterable from typing import Iterable
from typing import AsyncGenerator from typing import AsyncGenerator
from typing import Optional
import gpiod
import serial import serial
from ...logging import get_logger from ...logging import get_logger
@ -45,7 +46,7 @@ from ...keyboard.mappings import KEYMAP
from ... import aiotools from ... import aiotools
from ... import aiomulti from ... import aiomulti
from ... import aioproc from ... import aioproc
from ... import gpio from ... import aiogp
from ...yamlconf import Option from ...yamlconf import Option
@ -57,7 +58,7 @@ from ...validators.basic import valid_float_f01
from ...validators.os import valid_abs_path from ...validators.os import valid_abs_path
from ...validators.hw import valid_tty_speed from ...validators.hw import valid_tty_speed
from ...validators.hw import valid_gpio_pin from ...validators.hw import valid_gpio_pin_optional
from . import BaseHid from . import BaseHid
@ -156,6 +157,45 @@ class _MouseWheelEvent(_BaseEvent):
return struct.pack(">Bxbxx", 0x14, self.delta_y) return struct.pack(">Bxbxx", 0x14, self.delta_y)
class _Gpio:
def __init__(self, reset_pin: int, reset_delay: float) -> None:
self.__reset_pin = reset_pin
self.__reset_delay = reset_delay
self.__chip: Optional[gpiod.Chip] = None
self.__reset_line: Optional[gpiod.Line] = None
self.__reset_wip = False
def open(self) -> None:
if self.__reset_pin >= 0:
assert self.__chip is None
assert self.__reset_line is None
self.__chip = gpiod.Chip(aiogp.DEVICE_PATH)
self.__reset_line = self.__chip.get_line(self.__reset_pin)
self.__reset_line.request("kvmd/hid-serial/reset", gpiod.LINE_REQ_DIR_OUT, default_val=0)
def close(self) -> None:
if self.__chip:
try:
self.__chip.close()
except Exception:
pass
@aiotools.atomic
async def reset(self) -> None:
if self.__reset_pin >= 0:
assert self.__reset_line
if not self.__reset_wip:
self.__reset_wip = True
try:
await aiogp.pulse(self.__reset_line, self.__reset_delay, 1)
finally:
self.__reset_wip = False
get_logger(0).info("Reset HID performed")
else:
get_logger(0).info("Another reset HID in progress")
# ===== # =====
class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-instance-attributes class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-instance-attributes
def __init__( # pylint: disable=too-many-arguments,super-init-not-called def __init__( # pylint: disable=too-many-arguments,super-init-not-called
@ -175,9 +215,6 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
multiprocessing.Process.__init__(self, daemon=True) multiprocessing.Process.__init__(self, daemon=True)
self.__reset_pin = gpio.set_output(reset_pin, False)
self.__reset_delay = reset_delay
self.__device_path = device_path self.__device_path = device_path
self.__speed = speed self.__speed = speed
self.__read_timeout = read_timeout self.__read_timeout = read_timeout
@ -187,7 +224,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
self.__errors_threshold = errors_threshold self.__errors_threshold = errors_threshold
self.__noop = noop self.__noop = noop
self.__reset_wip = False self.__gpio = _Gpio(reset_pin, reset_delay)
self.__events_queue: multiprocessing.queues.Queue = multiprocessing.Queue() self.__events_queue: multiprocessing.queues.Queue = multiprocessing.Queue()
@ -204,7 +241,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
@classmethod @classmethod
def get_plugin_options(cls) -> Dict: def get_plugin_options(cls) -> Dict:
return { return {
"reset_pin": Option(-1, type=valid_gpio_pin), "reset_pin": Option(-1, type=valid_gpio_pin_optional),
"reset_delay": Option(0.1, type=valid_float_f01), "reset_delay": Option(0.1, type=valid_float_f01),
"device": Option("", type=valid_abs_path, unpack_as="device_path"), "device": Option("", type=valid_abs_path, unpack_as="device_path"),
@ -218,6 +255,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
} }
def sysprep(self) -> None: def sysprep(self) -> None:
self.__gpio.open()
get_logger(0).info("Starting HID daemon ...") get_logger(0).info("Starting HID daemon ...")
self.start() self.start()
@ -247,20 +285,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
@aiotools.atomic @aiotools.atomic
async def reset(self) -> None: async def reset(self) -> None:
if not self.__reset_wip: await self.__gpio.reset()
try:
self.__reset_wip = True
gpio.write(self.__reset_pin, True)
await asyncio.sleep(self.__reset_delay)
finally:
try:
gpio.write(self.__reset_pin, False)
await asyncio.sleep(1)
finally:
self.__reset_wip = False
get_logger().info("Reset HID performed")
else:
get_logger().info("Another reset HID in progress")
@aiotools.atomic @aiotools.atomic
async def cleanup(self) -> None: async def cleanup(self) -> None:
@ -279,7 +304,7 @@ class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-inst
except Exception: except Exception:
logger.exception("Can't clear HID events") logger.exception("Can't clear HID events")
finally: finally:
gpio.write(self.__reset_pin, False) self.__gpio.close()
# ===== # =====

View File

@ -35,12 +35,13 @@ from typing import Optional
import aiofiles import aiofiles
import aiofiles.base import aiofiles.base
import gpiod
from ...logging import get_logger from ...logging import get_logger
from ... import aiotools from ... import aiotools
from ... import aiofs from ... import aiofs
from ... import gpio from ... import aiogp
from ...yamlconf import Option from ...yamlconf import Option
@ -152,6 +153,55 @@ def _explore_device(device_path: str) -> _DeviceInfo:
) )
class _Gpio:
def __init__(
self,
target_pin: int,
reset_pin: int,
reset_delay: float,
) -> None:
self.__target_pin = target_pin
self.__reset_pin = reset_pin
self.__reset_delay = reset_delay
self.__chip: Optional[gpiod.Chip] = None
self.__target_line: Optional[gpiod.Line] = None
self.__reset_line: Optional[gpiod.Line] = None
def open(self) -> None:
assert self.__chip is None
assert self.__target_line is None
assert self.__reset_line is None
self.__chip = gpiod.Chip(aiogp.DEVICE_PATH)
self.__target_line = self.__chip.get_line(self.__target_pin)
self.__target_line.request("kvmd/msd-relay/target", gpiod.LINE_REQ_DIR_OUT, default_val=0)
self.__reset_line = self.__chip.get_line(self.__reset_pin)
self.__reset_line.request("kvmd/msd-relay/reset", gpiod.LINE_REQ_DIR_OUT, default_val=0)
def close(self) -> None:
if self.__chip:
try:
self.__chip.close()
except Exception:
pass
def switch_to_local(self) -> None:
assert self.__target_line
self.__target_line.set_value(0)
def switch_to_server(self) -> None:
assert self.__target_line
self.__target_line.set_value(1)
async def reset(self) -> None:
assert self.__reset_line
await aiogp.pulse(self.__reset_line, self.__reset_delay, 0)
# ===== # =====
class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
def __init__( # pylint: disable=super-init-not-called def __init__( # pylint: disable=super-init-not-called
@ -165,13 +215,11 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
reset_delay: float, reset_delay: float,
) -> None: ) -> None:
self.__target_pin = gpio.set_output(target_pin, False)
self.__reset_pin = gpio.set_output(reset_pin, False)
self.__device_path = device_path self.__device_path = device_path
self.__init_delay = init_delay self.__init_delay = init_delay
self.__init_retries = init_retries self.__init_retries = init_retries
self.__reset_delay = reset_delay
self.__gpio = _Gpio(target_pin, reset_pin, reset_delay)
self.__device_info: Optional[_DeviceInfo] = None self.__device_info: Optional[_DeviceInfo] = None
self.__connected = False self.__connected = False
@ -202,6 +250,9 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
"reset_delay": Option(1.0, type=valid_float_f01), "reset_delay": Option(1.0, type=valid_float_f01),
} }
def sysprep(self) -> None:
self.__gpio.open()
async def get_state(self) -> Dict: async def get_state(self) -> Dict:
storage: Optional[Dict] = None storage: Optional[Dict] = None
drive: Optional[Dict] = None drive: Optional[Dict] = None
@ -245,26 +296,18 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
@aiotools.atomic @aiotools.atomic
async def __inner_reset(self) -> None: async def __inner_reset(self) -> None:
try: await self.__gpio.reset()
gpio.write(self.__reset_pin, True) self.__gpio.switch_to_local()
await asyncio.sleep(self.__reset_delay) self.__connected = False
gpio.write(self.__reset_pin, False) await self.__load_device_info()
get_logger(0).info("MSD reset has been successful")
gpio.write(self.__target_pin, False)
self.__connected = False
await self.__load_device_info()
get_logger(0).info("MSD reset has been successful")
finally:
gpio.write(self.__reset_pin, False)
@aiotools.atomic @aiotools.atomic
async def cleanup(self) -> None: async def cleanup(self) -> None:
try: try:
await self.__close_device_file() await self.__close_device_file()
finally: finally:
gpio.write(self.__target_pin, False) self.__gpio.close()
gpio.write(self.__reset_pin, False)
# ===== # =====
@ -283,7 +326,7 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
if self.__connected: if self.__connected:
raise MsdConnectedError() raise MsdConnectedError()
gpio.write(self.__target_pin, True) self.__gpio.switch_to_server()
self.__connected = True self.__connected = True
get_logger(0).info("MSD switched to Server") get_logger(0).info("MSD switched to Server")
@ -294,12 +337,12 @@ class Plugin(BaseMsd): # pylint: disable=too-many-instance-attributes
if not self.__connected: if not self.__connected:
raise MsdDisconnectedError() raise MsdDisconnectedError()
gpio.write(self.__target_pin, False) self.__gpio.switch_to_local()
try: try:
await self.__load_device_info() await self.__load_device_info()
except Exception: except Exception:
if self.__connected: if self.__connected:
gpio.write(self.__target_pin, True) self.__gpio.switch_to_server()
raise raise
self.__connected = False self.__connected = False
get_logger(0).info("MSD switched to KVM: %s", self.__device_info) get_logger(0).info("MSD switched to KVM: %s", self.__device_info)

View File

@ -74,7 +74,7 @@ class BaseUserGpioDriver(BasePlugin):
def get_modes(cls) -> Set[str]: def get_modes(cls) -> Set[str]:
return set(UserGpioModes.ALL) return set(UserGpioModes.ALL)
def register_input(self, pin: int) -> None: def register_input(self, pin: int, debounce: float) -> None:
raise NotImplementedError raise NotImplementedError
def register_output(self, pin: int, initial: Optional[bool]) -> None: def register_output(self, pin: int, initial: Optional[bool]) -> None:

View File

@ -21,15 +21,12 @@
from typing import Dict from typing import Dict
from typing import Set
from typing import Optional from typing import Optional
import gpiod
from ... import aiotools from ... import aiotools
from ... import gpio from ... import aiogp
from ...yamlconf import Option
from ...validators.basic import valid_float_f01
from . import BaseUserGpioDriver from . import BaseUserGpioDriver
@ -40,59 +37,58 @@ class Plugin(BaseUserGpioDriver):
self, self,
instance_name: str, instance_name: str,
notifier: aiotools.AioNotifier, notifier: aiotools.AioNotifier,
state_poll: float,
) -> None: ) -> None:
super().__init__(instance_name, notifier) super().__init__(instance_name, notifier)
self.__state_poll = state_poll self.__input_pins: Dict[int, aiogp.AioReaderPinParams] = {}
self.__input_pins: Set[int] = set()
self.__output_pins: Dict[int, Optional[bool]] = {} self.__output_pins: Dict[int, Optional[bool]] = {}
self.__reader: Optional[gpio.BatchReader] = None self.__reader: Optional[aiogp.AioReader] = None
@classmethod self.__chip: Optional[gpiod.Chip] = None
def get_plugin_options(cls) -> Dict: self.__output_lines: Dict[int, gpiod.Line] = {}
return {
"state_poll": Option(0.1, type=valid_float_f01),
}
def register_input(self, pin: int) -> None: def register_input(self, pin: int, debounce: float) -> None:
self.__input_pins.add(pin) self.__input_pins[pin] = aiogp.AioReaderPinParams(False, debounce)
def register_output(self, pin: int, initial: Optional[bool]) -> None: def register_output(self, pin: int, initial: Optional[bool]) -> None:
self.__output_pins[pin] = initial self.__output_pins[pin] = initial
def prepare(self) -> None: def prepare(self) -> None:
assert self.__reader is None assert self.__reader is None
self.__reader = gpio.BatchReader( self.__reader = aiogp.AioReader(
pins=set([ path=aiogp.DEVICE_PATH,
*map(gpio.set_input, self.__input_pins), consumer="kvmd/ugpio-gpio/inputs",
*[ pins=self.__input_pins,
gpio.set_output(pin, initial)
for (pin, initial) in self.__output_pins.items()
],
]),
interval=self.__state_poll,
notifier=self._notifier, notifier=self._notifier,
) )
self.__chip = gpiod.Chip(aiogp.DEVICE_PATH)
for (pin, initial) in self.__output_pins.items():
line = self.__chip.get_line(pin)
line.request("kvmd/ugpio-gpio/outputs", gpiod.LINE_REQ_DIR_OUT, default_val=int(initial or False))
self.__output_lines[pin] = line
async def run(self) -> None: async def run(self) -> None:
assert self.__reader assert self.__reader
await self.__reader.poll() await self.__reader.poll()
def cleanup(self) -> None: def cleanup(self) -> None:
for (pin, initial) in self.__output_pins.items(): if self.__chip:
if initial is not None: try:
gpio.write(pin, initial) self.__chip.close()
except Exception:
pass
def read(self, pin: int) -> bool: def read(self, pin: int) -> bool:
return gpio.read(pin) assert self.__reader
if pin in self.__input_pins:
return self.__reader.get(pin)
return bool(self.__output_lines[pin].get_value())
def write(self, pin: int, state: bool) -> None: def write(self, pin: int, state: bool) -> None:
gpio.write(pin, state) self.__output_lines[pin].set_value(int(state))
def __str__(self) -> str: def __str__(self) -> str:
return f"GPIO({self._instance_name})" return f"GPIO({self._instance_name})"

View File

@ -79,7 +79,7 @@ class Plugin(BaseUserGpioDriver):
def get_modes(cls) -> Set[str]: def get_modes(cls) -> Set[str]:
return set([UserGpioModes.OUTPUT]) return set([UserGpioModes.OUTPUT])
def register_input(self, pin: int) -> None: def register_input(self, pin: int, debounce: float) -> None:
raise RuntimeError(f"Unsupported mode 'input' for pin={pin} on {self}") raise RuntimeError(f"Unsupported mode 'input' for pin={pin} on {self}")
def register_output(self, pin: int, initial: Optional[bool]) -> None: def register_output(self, pin: int, initial: Optional[bool]) -> None:

View File

@ -6,6 +6,9 @@ RUN pacman -Syu --noconfirm \
&& pacman -S --needed --noconfirm \ && pacman -S --needed --noconfirm \
base \ base \
base-devel \ base-devel \
autoconf-archive \
help2man \
m4 \
vim \ vim \
git \ git \
libjpeg \ libjpeg \
@ -30,6 +33,18 @@ RUN npm install htmlhint -g \
&& npm install pug \ && npm install pug \
&& npm install pug-cli -g && npm install pug-cli -g
ARG LIBGPIOD_VERSION
ENV LIBGPIOD_PKG libgpiod-$LIBGPIOD_VERSION
RUN curl \
-o $LIBGPIOD_PKG.tar.gz \
https://git.kernel.org/pub/scm/libs/libgpiod/libgpiod.git/snapshot/$LIBGPIOD_PKG.tar.gz \
&& tar -xzvf $LIBGPIOD_PKG.tar.gz \
&& cd $LIBGPIOD_PKG \
&& ./autogen.sh --prefix=/usr --enable-tools=yes --enable-bindings-python \
&& make PREFIX=/usr install \
&& cd - \
&& rm -rf $LIBGPIOD_PKG{,.tar.gz}
ARG USTREAMER_MIN_VERSION ARG USTREAMER_MIN_VERSION
ENV USTREAMER_MIN_VERSION $USTREAMER_MIN_VERSION ENV USTREAMER_MIN_VERSION $USTREAMER_MIN_VERSION
RUN echo $USTREAMER_MIN_VERSION RUN echo $USTREAMER_MIN_VERSION

View File

@ -20,8 +20,6 @@ IpmiServer.handle_raw_request
_AtxApiPart.switch_power _AtxApiPart.switch_power
fake_rpi.RPi.GPIO
_KeyMapping.web_name _KeyMapping.web_name
_KeyMapping.serial_code _KeyMapping.serial_code
_KeyMapping.arduino_name _KeyMapping.arduino_name

View File

@ -1,5 +1,3 @@
git+git://github.com/willbuckner/rpi-gpio-development-mock@master#egg=rpi
fake_rpi
aiohttp aiohttp
aiofiles aiofiles
passlib passlib

View File

@ -18,42 +18,3 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>. # # along with this program. If not, see <https://www.gnu.org/licenses/>. #
# # # #
# ========================================================================== # # ========================================================================== #
import sys
from typing import Dict
from typing import Optional
import fake_rpi.RPi
# =====
class _GPIO(fake_rpi.RPi._GPIO): # pylint: disable=protected-access
def __init__(self) -> None:
super().__init__()
self.__states: Dict[int, int] = {}
@fake_rpi.RPi.printf
def setup(self, channel: int, state: int, initial: int=0, pull_up_down: Optional[int]=None) -> None:
_ = state # Makes linter happy
_ = pull_up_down # Makes linter happy
self.__states[int(channel)] = int(initial)
@fake_rpi.RPi.printf
def output(self, channel: int, state: int) -> None:
self.__states[int(channel)] = int(state)
@fake_rpi.RPi.printf
def input(self, channel: int) -> int: # pylint: disable=arguments-differ
return self.__states[int(channel)]
@fake_rpi.RPi.printf
def cleanup(self, channel: Optional[int]=None) -> None: # pylint: disable=arguments-differ
_ = channel # Makes linter happy
self.__states = {}
# =====
fake_rpi.RPi.GPIO = _GPIO()
sys.modules["RPi"] = fake_rpi.RPi

View File

@ -1,58 +0,0 @@
# ========================================================================== #
# #
# KVMD - The main Pi-KVM daemon. #
# #
# Copyright (C) 2018 Maxim Devaev <mdevaev@gmail.com> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
# ========================================================================== #
import pytest
from kvmd import gpio
# =====
@pytest.mark.parametrize("pin", [0, 1, 13])
def test_ok__loopback_initial_false(pin: int) -> None:
with gpio.bcm():
assert gpio.set_output(pin, False) == pin
assert gpio.read(pin) is False
gpio.write(pin, True)
assert gpio.read(pin) is True
@pytest.mark.parametrize("pin", [0, 1, 13])
def test_ok__loopback_initial_true(pin: int) -> None:
with gpio.bcm():
assert gpio.set_output(pin, True) == pin
assert gpio.read(pin) is True
gpio.write(pin, False)
assert gpio.read(pin) is False
@pytest.mark.parametrize("pin", [0, 1, 13])
def test_ok__input(pin: int) -> None:
with gpio.bcm():
assert gpio.set_input(pin) == pin
assert gpio.read(pin) is False
def test_fail__invalid_pin() -> None:
with pytest.raises(AssertionError):
gpio.set_output(-1, False)
with pytest.raises(AssertionError):
gpio.set_input(-1)