ps/2 keyboard prototype

This commit is contained in:
Devaev Maxim 2018-06-28 18:58:52 +03:00
parent 65bee96fef
commit 54430fed31
7 changed files with 92 additions and 7 deletions

View File

@ -8,7 +8,7 @@ kvmd:
pinout: pinout:
clock: 17 clock: 17
data: 4 data: 4
delay: 0.0002 pulse: 0.0002
atx: atx:
leds: leds:

View File

@ -5,6 +5,7 @@ from .application import init
from .atx import Atx from .atx import Atx
from .streamer import Streamer from .streamer import Streamer
from .ps2 import Ps2Keyboard
from .server import Server from .server import Server
from . import gpio from . import gpio
@ -33,9 +34,16 @@ def main() -> None:
loop=loop, loop=loop,
) )
keyboard = Ps2Keyboard(
clock=config["keyboard"]["pinout"]["clock"],
data=config["keyboard"]["pinout"]["data"],
pulse=config["keyboard"]["pulse"],
)
Server( Server(
atx=atx, atx=atx,
streamer=streamer, streamer=streamer,
keyboard=keyboard,
heartbeat=config["server"]["heartbeat"], heartbeat=config["server"]["heartbeat"],
atx_leds_poll=config["atx"]["leds"]["poll"], atx_leds_poll=config["atx"]["leds"]["poll"],
video_shutdown_delay=config["video"]["shutdown_delay"], video_shutdown_delay=config["video"]["shutdown_delay"],

View File

@ -24,8 +24,8 @@ class Atx:
self.__power_led = gpio.set_input(power_led) self.__power_led = gpio.set_input(power_led)
self.__hdd_led = gpio.set_input(hdd_led) self.__hdd_led = gpio.set_input(hdd_led)
self.__power_switch = gpio.set_output_zeroed(power_switch) self.__power_switch = gpio.set_output(power_switch)
self.__reset_switch = gpio.set_output_zeroed(reset_switch) self.__reset_switch = gpio.set_output(reset_switch)
self.__click_delay = click_delay self.__click_delay = click_delay
self.__long_click_delay = long_click_delay self.__long_click_delay = long_click_delay

View File

@ -21,8 +21,8 @@ def bcm() -> Generator[None, None, None]:
_logger.info("GPIO cleaned") _logger.info("GPIO cleaned")
def set_output_zeroed(pin: int) -> int: def set_output(pin: int, initial: bool=False) -> int:
GPIO.setup(pin, GPIO.OUT) GPIO.setup(pin, GPIO.OUT, initial=initial)
GPIO.output(pin, False) GPIO.output(pin, False)
return pin return pin

63
kvmd/kvmd/ps2.py Normal file
View File

@ -0,0 +1,63 @@
import multiprocessing
import multiprocessing.queues
import queue
import logging
import time
from . import gpio
# =====
_logger = logging.getLogger(__name__)
class Ps2Keyboard(multiprocessing.Process):
def __init__(self, clock: int, data: int, pulse: float) -> None:
super().__init__(daemon=True)
self.__clock = gpio.set_output(clock, initial=True)
self.__data = gpio.set_output(data, initial=True)
self.__pulse = pulse
self.__queue: multiprocessing.queues.Queue = multiprocessing.Queue()
self.__event = multiprocessing.Event()
def start(self) -> None:
_logger.info("Starting keyboard daemon ...")
super().start()
def stop(self) -> None:
_logger.info("Stopping keyboard daemon ...")
self.__event.set()
self.join()
def send_byte(self, code: int) -> None:
self.__queue.put(code)
def run(self) -> None:
try:
while not self.__event.is_set():
try:
code = self.__queue.get(timeout=0.1)
except queue.Empty:
pass
else:
self.__send_byte(code)
except Exception:
_logger.exception("Unhandled exception")
raise
def __send_byte(self, code: int) -> None:
code_bits = list(map(bool, bin(code)[2:].zfill(8)))
code_bits.reverse()
message = [False] + code_bits + [(not sum(code_bits) % 2), True]
for bit in message:
self.__send_bit(bit)
def __send_bit(self, bit: bool) -> None:
gpio.write(self.__clock, True)
gpio.write(self.__data, bool(bit))
time.sleep(self.__pulse)
gpio.write(self.__clock, False)
time.sleep(self.__pulse)
gpio.write(self.__clock, True)

View File

@ -13,6 +13,7 @@ import aiohttp
from .atx import Atx from .atx import Atx
from .streamer import Streamer from .streamer import Streamer
from .ps2 import Ps2Keyboard
# ===== # =====
@ -36,6 +37,7 @@ class Server: # pylint: disable=too-many-instance-attributes
self, self,
atx: Atx, atx: Atx,
streamer: Streamer, streamer: Streamer,
keyboard: Ps2Keyboard,
heartbeat: float, heartbeat: float,
atx_leds_poll: float, atx_leds_poll: float,
video_shutdown_delay: float, video_shutdown_delay: float,
@ -45,6 +47,7 @@ class Server: # pylint: disable=too-many-instance-attributes
self.__atx = atx self.__atx = atx
self.__streamer = streamer self.__streamer = streamer
self.__heartbeat = heartbeat self.__heartbeat = heartbeat
self.__keyboard = keyboard
self.__video_shutdown_delay = video_shutdown_delay self.__video_shutdown_delay = video_shutdown_delay
self.__atx_leds_poll = atx_leds_poll self.__atx_leds_poll = atx_leds_poll
self.__loop = loop self.__loop = loop
@ -55,6 +58,8 @@ class Server: # pylint: disable=too-many-instance-attributes
self.__system_tasks: List[asyncio.Task] = [] self.__system_tasks: List[asyncio.Task] = []
def run(self, host: str, port: int) -> None: def run(self, host: str, port: int) -> None:
self.__keyboard.start()
app = aiohttp.web.Application(loop=self.__loop) app = aiohttp.web.Application(loop=self.__loop)
app.router.add_get("/", self.__root_handler) app.router.add_get("/", self.__root_handler)
app.router.add_get("/ws", self.__ws_handler) app.router.add_get("/ws", self.__ws_handler)
@ -62,6 +67,7 @@ class Server: # pylint: disable=too-many-instance-attributes
app.on_cleanup.append(self.__on_cleanup) app.on_cleanup.append(self.__on_cleanup)
self.__system_tasks.extend([ self.__system_tasks.extend([
self.__loop.create_task(self.__monitor_keyboard()),
self.__loop.create_task(self.__stream_controller()), self.__loop.create_task(self.__stream_controller()),
self.__loop.create_task(self.__poll_dead_sockets()), self.__loop.create_task(self.__poll_dead_sockets()),
self.__loop.create_task(self.__poll_atx_leds()), self.__loop.create_task(self.__poll_atx_leds()),
@ -101,9 +107,17 @@ class Server: # pylint: disable=too-many-instance-attributes
await self.__remove_socket(ws) await self.__remove_socket(ws)
async def __on_cleanup(self, _: aiohttp.web.Application) -> None: async def __on_cleanup(self, _: aiohttp.web.Application) -> None:
if self.__keyboard.is_alive():
self.__keyboard.stop()
if self.__streamer.is_running(): if self.__streamer.is_running():
await self.__streamer.stop() await self.__streamer.stop()
@_system_task
async def __monitor_keyboard(self) -> None:
while self.__keyboard.is_alive():
await asyncio.sleep(0.1)
raise RuntimeError("Keyboard dead")
@_system_task @_system_task
async def __stream_controller(self) -> None: async def __stream_controller(self) -> None:
prev = 0 prev = 0

View File

@ -22,8 +22,8 @@ class Streamer: # pylint: disable=too-many-instance-attributes
loop: asyncio.AbstractEventLoop, loop: asyncio.AbstractEventLoop,
) -> None: ) -> None:
self.__cap_power = gpio.set_output_zeroed(cap_power) self.__cap_power = gpio.set_output(cap_power)
self.__vga_power = gpio.set_output_zeroed(vga_power) self.__vga_power = gpio.set_output(vga_power)
self.__sync_delay = sync_delay self.__sync_delay = sync_delay
self.__cmd = ( self.__cmd = (