mirror of
https://github.com/mofeng-git/One-KVM.git
synced 2026-01-29 00:51:53 +08:00
refactoring
This commit is contained in:
@@ -20,12 +20,13 @@
|
|||||||
# #
|
# #
|
||||||
# ========================================================================== #
|
# ========================================================================== #
|
||||||
|
|
||||||
from periphery import PWM
|
|
||||||
|
|
||||||
from typing import Dict
|
from typing import Dict
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
from typing import Set
|
from typing import Set
|
||||||
|
|
||||||
|
from periphery import PWM
|
||||||
|
|
||||||
from ...logging import get_logger
|
from ...logging import get_logger
|
||||||
|
|
||||||
from ... import tools
|
from ... import tools
|
||||||
@@ -42,36 +43,34 @@ from . import BaseUserGpioDriver
|
|||||||
|
|
||||||
# =====
|
# =====
|
||||||
class Plugin(BaseUserGpioDriver):
|
class Plugin(BaseUserGpioDriver):
|
||||||
|
|
||||||
def __init__( # pylint: disable=super-init-not-called
|
def __init__( # pylint: disable=super-init-not-called
|
||||||
self,
|
self,
|
||||||
instance_name: str,
|
instance_name: str,
|
||||||
notifier: aiotools.AioNotifier,
|
notifier: aiotools.AioNotifier,
|
||||||
|
|
||||||
pwm_chip: int,
|
chip: int,
|
||||||
pwm_period: int,
|
period: int,
|
||||||
duty_cycle_push: int,
|
duty_cycle_push: int,
|
||||||
duty_cycle_release: int,
|
duty_cycle_release: int,
|
||||||
) -> None:
|
) -> None:
|
||||||
|
|
||||||
super().__init__(instance_name, notifier)
|
super().__init__(instance_name, notifier)
|
||||||
|
|
||||||
self.__pwm_chip = pwm_chip
|
self.__chip = chip
|
||||||
self.__pwm_period = pwm_period
|
self.__period = period
|
||||||
self.__duty_cycle_push = duty_cycle_push
|
self.__duty_cycle_push = duty_cycle_push
|
||||||
self.__duty_cycle_release = duty_cycle_release
|
self.__duty_cycle_release = duty_cycle_release
|
||||||
|
|
||||||
self.__channels: Dict[int, Optional[bool]] = {}
|
self.__channels: Dict[int, Optional[bool]] = {}
|
||||||
|
self.__pwms: Dict[int, PWM] = {}
|
||||||
self.__channel_pwm: Dict[int, PWM] = {}
|
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def get_plugin_options(cls) -> Dict:
|
def get_plugin_options(cls) -> Dict:
|
||||||
return {
|
return {
|
||||||
"pwm_chip": Option(0, type=valid_int_f0),
|
"chip": Option(0, type=valid_int_f0),
|
||||||
"pwm_period": Option(20000000, type=valid_int_f0),
|
"period": Option(20000000, type=valid_int_f0),
|
||||||
"duty_cycle_push": Option(1500000, type=valid_int_f0),
|
"duty_cycle_push": Option(1500000, type=valid_int_f0),
|
||||||
"duty_cycle_release": Option(1000000, type=valid_int_f0),
|
"duty_cycle_release": Option(1000000, type=valid_int_f0),
|
||||||
}
|
}
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
@@ -86,40 +85,45 @@ class Plugin(BaseUserGpioDriver):
|
|||||||
|
|
||||||
def prepare(self) -> None:
|
def prepare(self) -> None:
|
||||||
logger = get_logger(0)
|
logger = get_logger(0)
|
||||||
|
|
||||||
for (pin, initial) in self.__channels.items():
|
for (pin, initial) in self.__channels.items():
|
||||||
try:
|
try:
|
||||||
logger.info("Probing pwm chip %d channel %d ...", self.__pwm_chip, pin)
|
logger.info("Probing pwm chip %d channel %d ...", self.__chip, pin)
|
||||||
pwm = PWM(self.__pwm_chip, pin)
|
pwm = PWM(self.__chip, pin)
|
||||||
self.__channel_pwm[pin] = pwm
|
self.__pwms[pin] = pwm
|
||||||
pwm.period_ns = self.__pwm_period
|
pwm.period_ns = self.__period
|
||||||
pwm.duty_cycle_ns = self.__duty_cycle_push if initial else self.__duty_cycle_release
|
pwm.duty_cycle_ns = self.__get_duty_cycle(bool(initial))
|
||||||
pwm.enable()
|
pwm.enable()
|
||||||
|
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
logger.error("Can't get pwm chip %d channel %d: %s",
|
logger.error("Can't get PWM chip %d channel %d: %s",
|
||||||
self.__pwm_chip, pin, tools.efmt(err))
|
self.__chip, pin, tools.efmt(err))
|
||||||
|
|
||||||
async def run(self) -> None:
|
async def run(self) -> None:
|
||||||
await aiotools.wait_infinite()
|
await aiotools.wait_infinite()
|
||||||
|
|
||||||
async def cleanup(self) -> None:
|
async def cleanup(self) -> None:
|
||||||
for (pin, _) in self.__channels.items():
|
for (pin, pwm) in self.__pwms.items():
|
||||||
self.__channel_pwm[pin].disable()
|
try:
|
||||||
self.__channel_pwm[pin].close()
|
pwm.disable()
|
||||||
|
pwm.close()
|
||||||
|
except Exception as err:
|
||||||
|
get_logger(0).error("Can't cleanup PWM chip %d channel %d: %s",
|
||||||
|
self.__chip, pin, tools.efmt(err))
|
||||||
|
|
||||||
async def read(self, pin: int) -> bool:
|
async def read(self, pin: int) -> bool:
|
||||||
try:
|
try:
|
||||||
return self.__channel_pwm[pin].duty_cycle_ns == self.__duty_cycle_push
|
return (self.__pwms[pin].duty_cycle_ns == self.__duty_cycle_push)
|
||||||
except Exception:
|
except Exception:
|
||||||
raise GpioDriverOfflineError(self)
|
raise GpioDriverOfflineError(self)
|
||||||
|
|
||||||
async def write(self, pin: int, state: bool) -> None:
|
async def write(self, pin: int, state: bool) -> None:
|
||||||
try:
|
try:
|
||||||
self.__channel_pwm[pin].duty_cycle_ns = self.__duty_cycle_push if state else self.__duty_cycle_release
|
self.__pwms[pin].duty_cycle_ns = self.__get_duty_cycle(state)
|
||||||
except Exception:
|
except Exception:
|
||||||
raise GpioDriverOfflineError(self)
|
raise GpioDriverOfflineError(self)
|
||||||
|
|
||||||
|
def __get_duty_cycle(self, state: bool) -> int:
|
||||||
|
return (self.__duty_cycle_push if state else self.__duty_cycle_release)
|
||||||
|
|
||||||
def __str__(self) -> str:
|
def __str__(self) -> str:
|
||||||
return f"PWM({self._instance_name})"
|
return f"PWM({self._instance_name})"
|
||||||
|
|
||||||
|
|||||||
@@ -41,4 +41,4 @@ _Netcfg.dhcp_option_3
|
|||||||
|
|
||||||
_ScriptWriter.get_args
|
_ScriptWriter.get_args
|
||||||
|
|
||||||
_pwm.period_ns
|
_pwm.period_ns
|
||||||
|
|||||||
Reference in New Issue
Block a user