mirror of
https://github.com/mofeng-git/One-KVM.git
synced 2025-12-12 09:10:30 +08:00
experimental edge mode
This commit is contained in:
parent
23ad910606
commit
1d98f5ed04
@ -97,6 +97,9 @@ class AioNotifier:
|
|||||||
async def notify(self) -> None:
|
async def notify(self) -> None:
|
||||||
await self.__queue.put(None)
|
await self.__queue.put(None)
|
||||||
|
|
||||||
|
def notify_sync(self) -> None:
|
||||||
|
self.__queue.put_nowait(None)
|
||||||
|
|
||||||
async def wait(self) -> None:
|
async def wait(self) -> None:
|
||||||
await self.__queue.get()
|
await self.__queue.get()
|
||||||
while not self.__queue.empty():
|
while not self.__queue.empty():
|
||||||
|
|||||||
53
kvmd/gpio.py
53
kvmd/gpio.py
@ -23,6 +23,7 @@
|
|||||||
import asyncio
|
import asyncio
|
||||||
import contextlib
|
import contextlib
|
||||||
|
|
||||||
|
from typing import List
|
||||||
from typing import Tuple
|
from typing import Tuple
|
||||||
from typing import Set
|
from typing import Set
|
||||||
from typing import Generator
|
from typing import Generator
|
||||||
@ -71,18 +72,62 @@ def write(pin: int, state: bool) -> None:
|
|||||||
|
|
||||||
|
|
||||||
class BatchReader:
|
class BatchReader:
|
||||||
def __init__(self, pins: Set[int], interval: float, notifier: aiotools.AioNotifier) -> None:
|
def __init__(
|
||||||
self.__pins = sorted(pins)
|
self,
|
||||||
self.__flags: Tuple[Optional[bool], ...] = (None,) * len(self.__pins)
|
pins: Set[int],
|
||||||
self.__state = {pin: read(pin) for pin in self.__pins}
|
edge_detection: bool,
|
||||||
|
interval: float,
|
||||||
|
notifier: aiotools.AioNotifier,
|
||||||
|
) -> None:
|
||||||
|
|
||||||
|
self.__pins = sorted(pins)
|
||||||
|
self.__edge_detection = edge_detection
|
||||||
self.__interval = interval
|
self.__interval = interval
|
||||||
self.__notifier = notifier
|
self.__notifier = notifier
|
||||||
|
|
||||||
|
self.__state = {pin: read(pin) for pin in self.__pins}
|
||||||
|
|
||||||
|
self.__loop: Optional[asyncio.AbstractEventLoop] = None # Only for edge detection
|
||||||
|
|
||||||
|
self.__flags: Tuple[Optional[bool], ...] = (None,) * len(self.__pins) # Only for busyloop
|
||||||
|
|
||||||
def get(self, pin: int) -> bool:
|
def get(self, pin: int) -> bool:
|
||||||
return self.__state[pin]
|
return self.__state[pin]
|
||||||
|
|
||||||
async def poll(self) -> None:
|
async def poll(self) -> None:
|
||||||
|
if self.__edge_detection:
|
||||||
|
await self.__poll_edge()
|
||||||
|
else:
|
||||||
|
await self.__poll_busyloop()
|
||||||
|
|
||||||
|
# =====
|
||||||
|
|
||||||
|
async def __poll_edge(self) -> None:
|
||||||
|
assert self.__loop is None
|
||||||
|
self.__loop = asyncio.get_running_loop()
|
||||||
|
watched: List[int] = []
|
||||||
|
try:
|
||||||
|
for pin in self.__pins:
|
||||||
|
GPIO.add_event_detect(
|
||||||
|
pin, GPIO.BOTH,
|
||||||
|
callback=self.__poll_edge_callback,
|
||||||
|
bouncetime=int(self.__interval * 1000),
|
||||||
|
)
|
||||||
|
watched.append(pin)
|
||||||
|
await self.__notifier.notify()
|
||||||
|
await aiotools.wait_infinite()
|
||||||
|
finally:
|
||||||
|
for pin in watched:
|
||||||
|
GPIO.remove_event_detect(pin)
|
||||||
|
|
||||||
|
def __poll_edge_callback(self, pin: int) -> None:
|
||||||
|
assert self.__loop
|
||||||
|
self.__state[pin] = read(pin)
|
||||||
|
self.__loop.call_soon_threadsafe(self.__notifier.notify_sync)
|
||||||
|
|
||||||
|
# =====
|
||||||
|
|
||||||
|
async def __poll_busyloop(self) -> None:
|
||||||
if not self.__pins:
|
if not self.__pins:
|
||||||
await aiotools.wait_infinite()
|
await aiotools.wait_infinite()
|
||||||
else:
|
else:
|
||||||
|
|||||||
@ -57,6 +57,7 @@ class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes
|
|||||||
click_delay: float,
|
click_delay: float,
|
||||||
long_click_delay: float,
|
long_click_delay: float,
|
||||||
|
|
||||||
|
edge_detection: bool,
|
||||||
state_poll: float,
|
state_poll: float,
|
||||||
) -> None:
|
) -> None:
|
||||||
|
|
||||||
@ -76,6 +77,7 @@ class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes
|
|||||||
|
|
||||||
self.__reader = gpio.BatchReader(
|
self.__reader = gpio.BatchReader(
|
||||||
pins=set([self.__power_led_pin, self.__hdd_led_pin]),
|
pins=set([self.__power_led_pin, self.__hdd_led_pin]),
|
||||||
|
edge_detection=edge_detection,
|
||||||
interval=state_poll,
|
interval=state_poll,
|
||||||
notifier=self.__notifier,
|
notifier=self.__notifier,
|
||||||
)
|
)
|
||||||
@ -93,6 +95,7 @@ class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes
|
|||||||
"click_delay": Option(0.1, type=valid_float_f01),
|
"click_delay": Option(0.1, type=valid_float_f01),
|
||||||
"long_click_delay": Option(5.5, type=valid_float_f01),
|
"long_click_delay": Option(5.5, type=valid_float_f01),
|
||||||
|
|
||||||
|
"edge_detection": Option(False, type=valid_bool),
|
||||||
"state_poll": Option(0.1, type=valid_float_f01),
|
"state_poll": Option(0.1, type=valid_float_f01),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -29,6 +29,7 @@ from ... import gpio
|
|||||||
|
|
||||||
from ...yamlconf import Option
|
from ...yamlconf import Option
|
||||||
|
|
||||||
|
from ...validators.basic import valid_bool
|
||||||
from ...validators.basic import valid_float_f01
|
from ...validators.basic import valid_float_f01
|
||||||
|
|
||||||
from . import BaseUserGpioDriver
|
from . import BaseUserGpioDriver
|
||||||
@ -41,11 +42,13 @@ class Plugin(BaseUserGpioDriver):
|
|||||||
instance_name: str,
|
instance_name: str,
|
||||||
notifier: aiotools.AioNotifier,
|
notifier: aiotools.AioNotifier,
|
||||||
|
|
||||||
|
edge_detection: bool,
|
||||||
state_poll: float,
|
state_poll: float,
|
||||||
) -> None:
|
) -> None:
|
||||||
|
|
||||||
super().__init__(instance_name, notifier)
|
super().__init__(instance_name, notifier)
|
||||||
|
|
||||||
|
self.__edge_detection = edge_detection
|
||||||
self.__state_poll = state_poll
|
self.__state_poll = state_poll
|
||||||
|
|
||||||
self.__input_pins: Set[int] = set()
|
self.__input_pins: Set[int] = set()
|
||||||
@ -56,6 +59,7 @@ class Plugin(BaseUserGpioDriver):
|
|||||||
@classmethod
|
@classmethod
|
||||||
def get_plugin_options(cls) -> Dict:
|
def get_plugin_options(cls) -> Dict:
|
||||||
return {
|
return {
|
||||||
|
"edge_detection": Option(False, type=valid_bool),
|
||||||
"state_poll": Option(0.1, type=valid_float_f01),
|
"state_poll": Option(0.1, type=valid_float_f01),
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -75,6 +79,7 @@ class Plugin(BaseUserGpioDriver):
|
|||||||
for (pin, initial) in self.__output_pins.items()
|
for (pin, initial) in self.__output_pins.items()
|
||||||
],
|
],
|
||||||
]),
|
]),
|
||||||
|
edge_detection=self.__edge_detection,
|
||||||
interval=self.__state_poll,
|
interval=self.__state_poll,
|
||||||
notifier=self._notifier,
|
notifier=self._notifier,
|
||||||
)
|
)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user