mirror of
https://github.com/mofeng-git/One-KVM.git
synced 2025-12-12 01:00:29 +08:00
removed edge detection
This commit is contained in:
parent
ff36ff203e
commit
44c50aa4de
@ -97,9 +97,6 @@ class AioNotifier:
|
||||
async def notify(self) -> None:
|
||||
await self.__queue.put(None)
|
||||
|
||||
def notify_sync(self) -> None:
|
||||
self.__queue.put_nowait(None)
|
||||
|
||||
async def wait(self) -> None:
|
||||
await self.__queue.get()
|
||||
while not self.__queue.empty():
|
||||
|
||||
41
kvmd/gpio.py
41
kvmd/gpio.py
@ -23,7 +23,6 @@
|
||||
import asyncio
|
||||
import contextlib
|
||||
|
||||
from typing import List
|
||||
from typing import Tuple
|
||||
from typing import Set
|
||||
from typing import Generator
|
||||
@ -75,59 +74,21 @@ class BatchReader:
|
||||
def __init__(
|
||||
self,
|
||||
pins: Set[int],
|
||||
edge_detection: bool,
|
||||
interval: float,
|
||||
notifier: aiotools.AioNotifier,
|
||||
) -> None:
|
||||
|
||||
self.__pins = sorted(pins)
|
||||
self.__edge_detection = edge_detection
|
||||
self.__interval = interval
|
||||
self.__notifier = notifier
|
||||
|
||||
self.__state = {pin: read(pin) for pin in self.__pins}
|
||||
|
||||
self.__loop: Optional[asyncio.AbstractEventLoop] = None # Only for edge detection
|
||||
|
||||
self.__flags: Tuple[Optional[bool], ...] = (None,) * len(self.__pins) # Only for busyloop
|
||||
self.__flags: Tuple[Optional[bool], ...] = (None,) * len(self.__pins)
|
||||
|
||||
def get(self, pin: int) -> bool:
|
||||
return self.__state[pin]
|
||||
|
||||
async def poll(self) -> None:
|
||||
if self.__edge_detection:
|
||||
await self.__poll_edge()
|
||||
else:
|
||||
await self.__poll_busyloop()
|
||||
|
||||
# =====
|
||||
|
||||
async def __poll_edge(self) -> None:
|
||||
assert self.__loop is None
|
||||
self.__loop = asyncio.get_running_loop()
|
||||
watched: List[int] = []
|
||||
try:
|
||||
for pin in self.__pins:
|
||||
GPIO.add_event_detect(
|
||||
pin, GPIO.BOTH,
|
||||
callback=self.__poll_edge_callback,
|
||||
bouncetime=int(self.__interval * 1000),
|
||||
)
|
||||
watched.append(pin)
|
||||
await self.__notifier.notify()
|
||||
await aiotools.wait_infinite()
|
||||
finally:
|
||||
for pin in watched:
|
||||
GPIO.remove_event_detect(pin)
|
||||
|
||||
def __poll_edge_callback(self, pin: int) -> None:
|
||||
assert self.__loop
|
||||
self.__state[pin] = read(pin)
|
||||
self.__loop.call_soon_threadsafe(self.__notifier.notify_sync)
|
||||
|
||||
# =====
|
||||
|
||||
async def __poll_busyloop(self) -> None:
|
||||
if not self.__pins:
|
||||
await aiotools.wait_infinite()
|
||||
else:
|
||||
|
||||
@ -57,7 +57,6 @@ class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes
|
||||
click_delay: float,
|
||||
long_click_delay: float,
|
||||
|
||||
edge_detection: bool,
|
||||
state_poll: float,
|
||||
) -> None:
|
||||
|
||||
@ -77,7 +76,6 @@ class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes
|
||||
|
||||
self.__reader = gpio.BatchReader(
|
||||
pins=set([self.__power_led_pin, self.__hdd_led_pin]),
|
||||
edge_detection=edge_detection,
|
||||
interval=state_poll,
|
||||
notifier=self.__notifier,
|
||||
)
|
||||
@ -95,7 +93,6 @@ class Plugin(BaseAtx): # pylint: disable=too-many-instance-attributes
|
||||
"click_delay": Option(0.1, type=valid_float_f01),
|
||||
"long_click_delay": Option(5.5, type=valid_float_f01),
|
||||
|
||||
"edge_detection": Option(False, type=valid_bool),
|
||||
"state_poll": Option(0.1, type=valid_float_f01),
|
||||
}
|
||||
|
||||
|
||||
@ -75,7 +75,6 @@ class Plugin(BaseUserGpioDriver):
|
||||
for (pin, initial) in self.__output_pins.items()
|
||||
],
|
||||
]),
|
||||
edge_detection=False,
|
||||
interval=self.__state_poll,
|
||||
notifier=self._notifier,
|
||||
)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user