2020-11-11 22:24:25 +03:00

155 lines
5.1 KiB
Python

# ========================================================================== #
# #
# KVMD - The main Pi-KVM daemon. #
# #
# Copyright (C) 2018 Maxim Devaev <mdevaev@gmail.com> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
# ========================================================================== #
import os
import contextlib
import time
from typing import List
from typing import Dict
from typing import Generator
from typing import Any
import spidev
from ...yamlconf import Option
from ...validators.basic import valid_int_f0
from ...validators.basic import valid_int_f1
from ...validators.basic import valid_float_f0
from ...validators.basic import valid_float_f01
from ._mcu import BasePhyConnection
from ._mcu import BasePhy
from ._mcu import BaseMcuHid
# =====
class SpiPhyError(Exception):
pass
class _SpiPhyConnection(BasePhyConnection):
def __init__(
self,
spi: spidev.SpiDev,
read_timeout: float,
read_delay: float,
) -> None:
self.__spi = spi
self.__read_timeout = read_timeout
self.__read_delay = read_delay
def send(self, request: bytes, receive: int) -> bytes:
assert 0 < receive <= len(request)
dummy = b"\x00" * len(request)
deadline_ts = time.time() + self.__read_timeout
while time.time() < deadline_ts:
garbage = bytes(self.__spi.xfer(dummy))
if garbage == dummy:
break
else:
raise SpiPhyError("Timeout reached while reading a garbage")
self.__spi.xfer(request)
response: List[int] = []
dummy = b"\x00" * receive
deadline_ts = time.time() + self.__read_timeout
found = False
while time.time() < deadline_ts:
if not found:
time.sleep(self.__read_delay)
for byte in self.__spi.xfer(dummy):
if not found:
if byte == 0:
continue
found = True
response.append(byte)
if len(response) >= receive:
break
if len(response) >= receive:
break
else:
raise SpiPhyError("Timeout reached while responce waiting")
assert len(response) == receive
return bytes(response)
class _SpiPhy(BasePhy):
def __init__(
self,
bus: int,
chip: int,
max_freq: int,
read_timeout: float,
read_delay: float,
) -> None:
self.__bus = bus
self.__chip = chip
self.__max_freq = max_freq
self.__read_timeout = read_timeout
self.__read_delay = read_delay
def has_device(self) -> bool:
return os.path.exists(f"/dev/spidev{self.__bus}.{self.__chip}")
@contextlib.contextmanager
def connected(self) -> Generator[_SpiPhyConnection, None, None]: # type: ignore
with contextlib.closing(spidev.SpiDev(self.__bus, self.__chip)) as spi:
spi.mode = 0
spi.max_speed_hz = self.__max_freq
yield _SpiPhyConnection(spi, self.__read_timeout, self.__read_delay)
# =====
class Plugin(BaseMcuHid):
def __init__(
self,
bus: int,
chip: int,
max_freq: int,
read_timeout: float,
read_delay: float,
**kwargs: Any,
) -> None:
super().__init__(
phy=_SpiPhy(bus, chip, max_freq, read_timeout, read_delay),
**kwargs,
)
@classmethod
def get_plugin_options(cls) -> Dict:
return {
"bus": Option(0, type=valid_int_f0),
"chip": Option(0, type=valid_int_f0),
"max_freq": Option(1000000, type=valid_int_f1),
"read_timeout": Option(2.0, type=valid_float_f01),
"read_delay": Option(0.001, type=valid_float_f0),
**BaseMcuHid.get_plugin_options(),
}