gpio wrapper

This commit is contained in:
Devaev Maxim 2018-06-28 06:23:19 +03:00
parent cac56cd92f
commit e8595665c0
5 changed files with 67 additions and 48 deletions

View File

@ -8,8 +8,6 @@ from typing import Set
from typing import Callable from typing import Callable
from typing import Optional from typing import Optional
from RPi import GPIO
import aiohttp import aiohttp
from .application import init from .application import init
@ -17,6 +15,8 @@ from .application import init
from .atx import Atx from .atx import Atx
from .streamer import Streamer from .streamer import Streamer
from . import gpio
# ===== # =====
_logger = logging.getLogger(__name__) _logger = logging.getLogger(__name__)
@ -42,8 +42,6 @@ class _Application:
self.__sockets: Set[aiohttp.web.WebSocketResponse] = set() self.__sockets: Set[aiohttp.web.WebSocketResponse] = set()
self.__sockets_lock = asyncio.Lock() self.__sockets_lock = asyncio.Lock()
GPIO.setmode(GPIO.BCM)
self.__atx = Atx( self.__atx = Atx(
power_led=self.__config["atx"]["leds"]["pinout"]["power"], power_led=self.__config["atx"]["leds"]["pinout"]["power"],
hdd_led=self.__config["atx"]["leds"]["pinout"]["hdd"], hdd_led=self.__config["atx"]["leds"]["pinout"]["hdd"],
@ -113,11 +111,6 @@ class _Application:
if self.__streamer.is_running(): if self.__streamer.is_running():
await self.__streamer.stop() await self.__streamer.stop()
_logger.info("Cleaning up GPIO ...")
GPIO.cleanup()
_logger.info("Bye-bye")
@_system_task @_system_task
async def __stream_controller(self) -> None: async def __stream_controller(self) -> None:
prev = 0 prev = 0
@ -189,4 +182,7 @@ class _Application:
def main() -> None: def main() -> None:
_Application(init()).run() config = init()
with gpio.bcm():
_Application(config).run()
_logger.info("Bye-bye")

View File

@ -3,7 +3,7 @@ import logging
from typing import Tuple from typing import Tuple
from RPi import GPIO from . import gpio
# ===== # =====
@ -21,29 +21,20 @@ class Atx:
long_click_delay: float, long_click_delay: float,
) -> None: ) -> None:
self.__power_led = self.__set_input_pin(power_led) self.__power_led = gpio.set_input(power_led)
self.__hdd_led = self.__set_input_pin(hdd_led) self.__hdd_led = gpio.set_input(hdd_led)
self.__power_switch = self.__set_output_pin(power_switch) self.__power_switch = gpio.set_output_zeroed(power_switch)
self.__reset_switch = self.__set_output_pin(reset_switch) self.__reset_switch = gpio.set_output_zeroed(reset_switch)
self.__click_delay = click_delay self.__click_delay = click_delay
self.__long_click_delay = long_click_delay self.__long_click_delay = long_click_delay
self.__lock = asyncio.Lock() self.__lock = asyncio.Lock()
def __set_input_pin(self, pin: int) -> int:
GPIO.setup(pin, GPIO.IN)
return pin
def __set_output_pin(self, pin: int) -> int:
GPIO.setup(pin, GPIO.OUT)
GPIO.output(pin, False)
return pin
def get_leds(self) -> Tuple[bool, bool]: def get_leds(self) -> Tuple[bool, bool]:
return ( return (
not GPIO.input(self.__power_led), not gpio.read(self.__power_led),
not GPIO.input(self.__hdd_led), not gpio.read(self.__hdd_led),
) )
async def click_power(self) -> None: async def click_power(self) -> None:
@ -62,7 +53,7 @@ class Atx:
if not self.__lock.locked(): if not self.__lock.locked():
async with self.__lock: async with self.__lock:
for flag in (True, False): for flag in (True, False):
GPIO.output(pin, flag) gpio.write(pin, flag)
await asyncio.sleep(delay) await asyncio.sleep(delay)
return True return True
return False return False

View File

@ -1,9 +1,9 @@
import logging import logging
from RPi import GPIO
from ...application import init from ...application import init
from ... import gpio
# ===== # =====
_logger = logging.getLogger(__name__) _logger = logging.getLogger(__name__)
@ -12,12 +12,11 @@ _logger = logging.getLogger(__name__)
def main() -> None: def main() -> None:
config = init() config = init()
_logger.info("Cleaning up ...") _logger.info("Cleaning up ...")
GPIO.setmode(GPIO.BCM) with gpio.bcm():
for (key, pin) in [ for (key, pin) in [
*config["atx"]["switches"]["pinout"].items(), *config["atx"]["switches"]["pinout"].items(),
*config["video"]["pinout"].items(), *config["video"]["pinout"].items(),
]: ]:
_logger.info("Writing value=0 to pin=%d (%s)", pin, key) _logger.info("Writing value=0 to pin=%d (%s)", pin, key)
GPIO.output(pin, False) gpio.write(pin, False)
GPIO.cleanup() _logger.info("Bye-bye")
_logger.info("Done!")

38
kvmd/kvmd/gpio.py Normal file
View File

@ -0,0 +1,38 @@
import contextlib
import logging
from typing import Generator
from RPi import GPIO
# =====
_logger = logging.getLogger(__name__)
@contextlib.contextmanager
def bcm() -> Generator[None, None, None]:
GPIO.setmode(GPIO.BCM)
_logger.info("Configured GPIO mode as BCM")
yield
GPIO.cleanup()
_logger.info("GPIO cleaned")
def set_output_zeroed(pin: int) -> int:
GPIO.setup(pin, GPIO.OUT)
GPIO.output(pin, False)
return pin
def set_input(pin: int) -> int:
GPIO.setup(pin, GPIO.IN)
return pin
def read(pin: int) -> bool:
return bool(GPIO.input(pin))
def write(pin: int, flag: bool) -> None:
GPIO.output(pin, flag)

View File

@ -5,7 +5,7 @@ import logging
from typing import Dict from typing import Dict
from typing import Optional from typing import Optional
from RPi import GPIO from . import gpio
# ===== # =====
@ -22,8 +22,8 @@ class Streamer: # pylint: disable=too-many-instance-attributes
loop: asyncio.AbstractEventLoop, loop: asyncio.AbstractEventLoop,
) -> None: ) -> None:
self.__cap_power = self.__set_output_pin(cap_power) self.__cap_power = gpio.set_output_zeroed(cap_power)
self.__vga_power = self.__set_output_pin(vga_power) self.__vga_power = gpio.set_output_zeroed(vga_power)
self.__sync_delay = sync_delay self.__sync_delay = sync_delay
self.__cmd = ( self.__cmd = (
@ -36,11 +36,6 @@ class Streamer: # pylint: disable=too-many-instance-attributes
self.__proc_task: Optional[asyncio.Task] = None self.__proc_task: Optional[asyncio.Task] = None
def __set_output_pin(self, pin: int) -> int:
GPIO.setup(pin, GPIO.OUT)
GPIO.output(pin, False)
return pin
async def start(self) -> None: async def start(self) -> None:
assert not self.__proc_task assert not self.__proc_task
_logger.info("Starting mjpg_streamer ...") _logger.info("Starting mjpg_streamer ...")
@ -60,10 +55,10 @@ class Streamer: # pylint: disable=too-many-instance-attributes
async def __set_hw_enabled(self, enabled: bool) -> None: async def __set_hw_enabled(self, enabled: bool) -> None:
# XXX: This sequence is very important for enable # XXX: This sequence is very important for enable
GPIO.output(self.__cap_power, enabled) gpio.write(self.__cap_power, enabled)
if enabled: if enabled:
await asyncio.sleep(self.__sync_delay) await asyncio.sleep(self.__sync_delay)
GPIO.output(self.__vga_power, enabled) gpio.write(self.__vga_power, enabled)
await asyncio.sleep(self.__sync_delay) await asyncio.sleep(self.__sync_delay)
async def __process(self) -> None: async def __process(self) -> None: