Merge branch 'spi'

This commit is contained in:
Devaev Maxim 2020-11-12 21:03:28 +03:00
commit 87cc8cf7b0
33 changed files with 1328 additions and 745 deletions

1
.gitignore vendored
View File

@ -1,3 +1,4 @@
/hid/.platformio/
/hid/.pio/
/pkg/
/src/

View File

@ -40,6 +40,7 @@ depends=(
python-aiofiles
python-passlib
python-pyserial
python-spidev
python-setproctitle
python-psutil
python-systemd
@ -56,6 +57,7 @@ depends=(
nginx-mainline
openssl
platformio
avrdude-svn
make
patch
sudo

View File

@ -10,6 +10,7 @@ u kvmd-nginx - "Pi-KVM - HTTP entrypoint" -
m kvmd gpio
m kvmd uucp
m kvmd spi
m kvmd systemd-journal
m kvmd-ipmi kvmd

View File

@ -61,7 +61,7 @@ class _X11Key:
@dataclasses.dataclass(frozen=True)
class _KeyMapping:
web_name: str
serial_code: int
mcu_code: int
arduino_name: str
otg_key: _OtgKey
ps2_key: _Ps2Key
@ -115,7 +115,7 @@ def _read_keymap_csv(path: str) -> List[_KeyMapping]:
if len(row) >= 6:
keymap.append(_KeyMapping(
web_name=row["web_name"],
serial_code=int(row["serial_code"]),
mcu_code=int(row["mcu_code"]),
arduino_name=row["arduino_name"],
otg_key=_parse_otg_key(row["otg_key"]),
ps2_key=_parse_ps2_key(row["ps2_key"]),
@ -144,7 +144,7 @@ def main() -> None:
# Fields list:
# - Web
# - Serial code
# - MCU code
# - Arduino name
# - OTG code (^ for mod)
# - PS/2 key

View File

@ -4,6 +4,12 @@ ps2:
make _build E=ps2
mixed:
make _build E=mixed
usb-spi:
make _build E=usb_spi
ps2-spi:
make _build E=ps2_spi
mixed-spi:
make _build E=mixed_spi
_build:
rm -f .current
platformio run --environment $(E)
@ -15,11 +21,18 @@ upload:
platformio run --environment $(shell cat .current) --target upload
bootloader-spi: install-bootloader-spi
install-bootloader-spi: upload-bootloader-spi
upload-bootloader-spi:
platformio run --environment bootloader_spi --target bootloader
update:
platformio platform update
clean-all: clean
rm -rf .platformio
clean:
rm -rf .pio .current

7
hid/avrdude-rpi.conf Normal file
View File

@ -0,0 +1,7 @@
programmer
id = "rpi";
desc = "RPi SPI programmer";
type = "linuxspi";
reset = 25;
baudrate = 400000;
;

54
hid/avrdude.py Normal file
View File

@ -0,0 +1,54 @@
# https://docs.platformio.org/en/latest/projectconf/advanced_scripting.html
from os import rename
from os import symlink
from os.path import exists
from os.path import join
import platform
Import("env")
# =====
def _get_tool_path() -> str:
path = env.PioPlatform().get_package_dir("tool-avrdude")
assert exists(path)
return path
def _fix_ld_arm() -> None:
tool_path = _get_tool_path()
flag_path = join(tool_path, ".fix-ld-arm.done")
if not exists(flag_path):
def patch(*_, **__) -> None:
symlink("/usr/lib/libtinfo.so.6", join(tool_path, "libtinfo.so.5"))
open(flag_path, "w").close()
env.Execute(patch)
def _replace_to_system(new_path: str) -> None:
tool_path = _get_tool_path()
flag_path = join(tool_path, ".replace-to-system.done")
if not exists(flag_path):
def patch(*_, **__) -> None:
old_path = join(tool_path, "avrdude")
bak_path = join(tool_path, "_avrdude_bak")
rename(old_path, bak_path)
symlink(new_path, old_path)
open(flag_path, "w").close()
env.Execute(patch)
# =====
if "arm" in platform.machine():
_fix_ld_arm()
_path = "/usr/bin/avrdude"
if exists(_path):
_replace_to_system(_path)

0
hid/lib/.gitignore vendored Normal file
View File

View File

@ -1,20 +1,40 @@
from os.path import join
# https://docs.platformio.org/en/latest/projectconf/advanced_scripting.html
from os.path import exists
from os.path import join
from os.path import basename
from typing import Dict
Import("env")
# =====
deps_path = env.get("PROJECT_LIBDEPS_DIR", env.get("PROJECTLIBDEPS_DIR"))
assert deps_path, deps_path
env_path = join(deps_path, env["PIOENV"])
flag_path = join(env_path, ".patched")
def _get_pkg_path(name: str) -> str:
path = env.PioPlatform().get_package_dir(name)
assert exists(path)
return path
if not exists(flag_path):
env.Execute(f"patch -p1 -d {join(env_path, 'HID-Project')} < {join('patches', 'absmouse.patch')}")
def touch_flag(*_, **__) -> None:
with open(flag_path, "w") as flag_file:
pass
def _get_libs() -> Dict[str, str]:
return {
builder.name: builder.path
for builder in env.GetLibBuilders()
}
env.Execute(touch_flag)
def _patch(path: str, patch_path: str) -> None:
assert exists(path)
flag_path: str = join(path, f".{basename(patch_path)}.done")
if not exists(flag_path):
env.Execute(f"patch -p1 -d {path} < {patch_path}")
env.Execute(lambda *_, **__: open(flag_path, "w").close())
# =====
_patch(_get_pkg_path("framework-arduino-avr"), "patches/serial.patch")
_libs = _get_libs()
if "HID-Project" in _libs:
_patch(_libs["HID-Project"], "patches/absmouse.patch")

24
hid/patches/serial.patch Normal file
View File

@ -0,0 +1,24 @@
https://github.com/arduino/Arduino/issues/6387
--- a/cores/arduino/USBCore.cpp 2019-09-20 15:48:38.000000000 +0300
+++ b/cores/arduino/USBCore.cpp 2020-11-11 19:56:49.233690476 +0300
@@ -375,8 +375,10 @@
{
u8 i = setup.wIndex;
+#ifndef NO_SERIAL
if (CDC_ACM_INTERFACE == i)
return CDC_Setup(setup);
+#endif
#ifdef PLUGGABLE_USB_ENABLED
return PluggableUSB().setup(setup);
@@ -466,7 +468,9 @@
{
u8 interfaces = 0;
+#ifndef NO_SERIAL
CDC_GetInterface(&interfaces);
+#endif
#ifdef PLUGGABLE_USB_ENABLED
PluggableUSB().getInterface(&interfaces);

View File

@ -1,59 +1,142 @@
; PlatformIO Project Configuration File
;
; Build options: build flags, source filter
; Upload options: custom upload port, speed and extra flags
; Library options: dependencies, extra library storages
; Advanced options: extra scripting
;
; Please visit documentation for the other options and examples
; http://docs.platformio.org/page/projectconf.html
# http://docs.platformio.org/page/projectconf.html
[platformio]
core_dir = ./.platformio/
[common]
[env]
platform = atmelavr
board = micro
framework = arduino
extra_scripts =
pre:avrdude.py
post:patch.py
platform_packages =
tool-avrdude
[_parts_usb_kbd]
lib_deps =
TimerOne@1.1
HID-Project@2.6.1
build_flags =
-DHID_USB_KBD
[_parts_usb_mouse]
lib_deps =
HID-Project@2.6.1
build_flags =
-DHID_USB_MOUSE
[_parts_ps2_kbd]
lib_deps =
git+https://github.com/Harvie/ps2dev#v0.0.3
build_flags =
-DHID_PS2_KBD
-DPS2_KBD_CLOCK_PIN=7
-DPS2_KBD_DATA_PIN=5
[_usb]
lib_deps =
${_parts_usb_kbd.lib_deps}
# ${_parts_usb_mouse.lib_deps}
build_flags =
${_parts_usb_kbd.build_flags}
${_parts_usb_mouse.build_flags}
[_ps2]
lib_deps =
${_parts_ps2_kbd.lib_deps}
build_flags =
${_parts_ps2_kbd.build_flags}
[_mixed]
lib_deps =
${_parts_ps2_kbd.lib_deps}
${_parts_usb_mouse.lib_deps}
build_flags =
${_parts_ps2_kbd.build_flags}
${_parts_usb_mouse.build_flags}
# ===== Serial =====
[_cmd_serial]
build_flags =
-DCMD_SERIAL=Serial1
-DCMD_SERIAL_SPEED=115200
-DCMD_SERIAL_TIMEOUT=100000
upload_port = /dev/ttyACM0
[env:usb]
platform = atmelavr
board = micro
framework = arduino
upload_port = /dev/ttyACM0
lib_deps =
${common.lib_deps}
HID-Project@2.6.1
extends =
_usb
_cmd_serial
build_flags =
${common.build_flags}
-DHID_USB_KBD
-DHID_USB_MOUSE
extra_scripts = post:patch.py
${_usb.build_flags}
${_cmd_serial.build_flags}
[env:ps2]
platform = atmelavr
board = micro
framework = arduino
upload_port = /dev/ttyACM0
lib_deps =
${common.lib_deps}
git+https://github.com/Harvie/ps2dev#v0.0.3
extends =
_ps2
_cmd_serial
build_flags =
${common.build_flags}
-DHID_PS2_KBD
-DPS2_KBD_CLOCK_PIN=7
-DPS2_KBD_DATA_PIN=5
${_ps2.build_flags}
${_cmd_serial.build_flags}
[env:mixed]
platform = atmelavr
board = micro
framework = arduino
upload_port = /dev/ttyACM0
lib_deps =
${common.lib_deps}
HID-Project@2.6.1
git+https://github.com/Harvie/ps2dev#v0.0.3
extends =
_mixed
_cmd_serial
build_flags =
${common.build_flags}
-DHID_PS2_KBD
-DHID_USB_MOUSE
-DPS2_KBD_CLOCK_PIN=7
-DPS2_KBD_DATA_PIN=5
${_mixed.build_flags}
${_cmd_serial.build_flags}
# ===== RPi SPI =====
[env:bootloader_spi]
upload_protocol = rpi
upload_flags =
-C
+avrdude-rpi.conf
-P
/dev/spidev0.0:/dev/gpiochip0
extra_scripts =
pre:avrdude.py
[_cmd_spi]
build_flags =
-DCMD_SPI
-DNO_SERIAL
upload_protocol = custom
upload_flags =
-C
$PROJECT_PACKAGES_DIR/tool-avrdude/avrdude.conf
-C
+avrdude-rpi.conf
-P
/dev/spidev0.0:/dev/gpiochip0
-c
rpi
-p
$BOARD_MCU
upload_command = avrdude $UPLOAD_FLAGS -U flash:w:$SOURCE:i
[env:usb_spi]
extends =
_usb
_cmd_spi
build_flags =
${_usb.build_flags}
${_cmd_spi.build_flags}
[env:ps2_spi]
extends =
_ps2
_cmd_spi
build_flags =
${_ps2.build_flags}
${_cmd_spi.build_flags}
[env:mixed_spi]
extends =
_mixed
_cmd_spi
build_flags =
${_mixed.build_flags}
${_cmd_spi.build_flags}

View File

@ -1,26 +0,0 @@
/*****************************************************************************
# #
# KVMD - The main Pi-KVM daemon. #
# #
# Copyright (C) 2018 Maxim Devaev <mdevaev@gmail.com> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
*****************************************************************************/
#pragma once
#define INLINE inline __attribute__((always_inline))

View File

@ -20,10 +20,17 @@
*****************************************************************************/
#include <Arduino.h>
#include <TimerOne.h>
#if !(defined(CMD_SERIAL) || defined(CMD_SPI))
# error CMD phy is not defined
#endif
#include "inline.h"
#include <Arduino.h>
#ifdef CMD_SPI
# include <SPI.h>
#endif
#include "proto.h"
#if defined(HID_USB_KBD) || defined(HID_USB_MOUSE)
# include "usb/hid.h"
@ -34,42 +41,10 @@
// #define CMD_SERIAL Serial1
#define CMD_SERIAL_SPEED 115200
#define CMD_RECV_TIMEOUT 100000
#define PROTO_MAGIC 0x33
#define PROTO_CRC_POLINOM 0xA001
#define PROTO_RESP_OK 0x20
#define PROTO_RESP_NONE 0x24
#define PROTO_RESP_CRC_ERROR 0x40
#define PROTO_RESP_INVALID_ERROR 0x45
#define PROTO_RESP_TIMEOUT_ERROR 0x48
#define PROTO_RESP_PONG_PREFIX 0x80
#define PROTO_RESP_PONG_CAPS 0b00000001
#define PROTO_RESP_PONG_SCROLL 0b00000010
#define PROTO_RESP_PONG_NUM 0b00000100
#define PROTO_CMD_PING 0x01
#define PROTO_CMD_REPEAT 0x02
#define PROTO_CMD_RESET_HID 0x10
#define PROTO_CMD_KEY_EVENT 0x11
#define PROTO_CMD_MOUSE_BUTTON_EVENT 0x13 // Legacy sequence
#define PROTO_CMD_MOUSE_MOVE_EVENT 0x12
#define PROTO_CMD_MOUSE_WHEEL_EVENT 0x14
#define PROTO_CMD_MOUSE_BUTTON_LEFT_SELECT 0b10000000
#define PROTO_CMD_MOUSE_BUTTON_LEFT_STATE 0b00001000
#define PROTO_CMD_MOUSE_BUTTON_RIGHT_SELECT 0b01000000
#define PROTO_CMD_MOUSE_BUTTON_RIGHT_STATE 0b00000100
#define PROTO_CMD_MOUSE_BUTTON_MIDDLE_SELECT 0b00100000
#define PROTO_CMD_MOUSE_BUTTON_MIDDLE_STATE 0b00000010
#define PROTO_CMD_MOUSE_BUTTON_EXTRA_UP_SELECT 0b10000000
#define PROTO_CMD_MOUSE_BUTTON_EXTRA_UP_STATE 0b00001000
#define PROTO_CMD_MOUSE_BUTTON_EXTRA_DOWN_SELECT 0b01000000
#define PROTO_CMD_MOUSE_BUTTON_EXTRA_DOWN_STATE 0b00000100
// #define CMD_SERIAL_SPEED 115200
// #define CMD_SERIAL_TIMEOUT 100000
// -- OR --
// #define CMD_SPI
// -----------------------------------------------------------------------------
@ -84,7 +59,7 @@
// -----------------------------------------------------------------------------
INLINE uint8_t cmdResetHid(const uint8_t *buffer) { // 0 bytes
uint8_t cmdResetHid(const uint8_t *buffer) { // 0 bytes
# ifdef HID_USB_KBD
hid_kbd.reset();
# endif
@ -94,28 +69,32 @@ INLINE uint8_t cmdResetHid(const uint8_t *buffer) { // 0 bytes
return PROTO_RESP_OK;
}
INLINE uint8_t cmdKeyEvent(const uint8_t *buffer) { // 2 bytes
uint8_t cmdKeyEvent(const uint8_t *buffer) { // 2 bytes
hid_kbd.sendKey(buffer[0], buffer[1]);
return PROTO_RESP_OK;
}
INLINE uint8_t cmdMouseButtonEvent(const uint8_t *buffer) { // 2 bytes
uint8_t cmdMouseButtonEvent(const uint8_t *buffer) { // 2 bytes
# ifdef HID_USB_MOUSE
uint8_t main_state = buffer[0];
uint8_t extra_state = buffer[1];
# define MOUSE_PAIR(_state, _button) \
_state & PROTO_CMD_MOUSE_BUTTON_##_button##_SELECT, \
_state & PROTO_CMD_MOUSE_BUTTON_##_button##_STATE
hid_mouse.sendMouseButtons(
main_state & PROTO_CMD_MOUSE_BUTTON_LEFT_SELECT, main_state & PROTO_CMD_MOUSE_BUTTON_LEFT_STATE,
main_state & PROTO_CMD_MOUSE_BUTTON_RIGHT_SELECT, main_state & PROTO_CMD_MOUSE_BUTTON_RIGHT_STATE,
main_state & PROTO_CMD_MOUSE_BUTTON_MIDDLE_SELECT, main_state & PROTO_CMD_MOUSE_BUTTON_MIDDLE_STATE,
extra_state & PROTO_CMD_MOUSE_BUTTON_EXTRA_UP_SELECT, extra_state & PROTO_CMD_MOUSE_BUTTON_EXTRA_UP_STATE,
extra_state & PROTO_CMD_MOUSE_BUTTON_EXTRA_DOWN_SELECT, extra_state & PROTO_CMD_MOUSE_BUTTON_EXTRA_DOWN_STATE
MOUSE_PAIR(main_state, LEFT),
MOUSE_PAIR(main_state, RIGHT),
MOUSE_PAIR(main_state, MIDDLE),
MOUSE_PAIR(extra_state, EXTRA_UP),
MOUSE_PAIR(extra_state, EXTRA_DOWN)
);
# undef MOUSE_PAIR
# endif
return PROTO_RESP_OK;
}
INLINE uint8_t cmdMouseMoveEvent(const uint8_t *buffer) { // 4 bytes
uint8_t cmdMouseMoveEvent(const uint8_t *buffer) { // 4 bytes
# ifdef HID_USB_MOUSE
int x = (int)buffer[0] << 8;
x |= (int)buffer[1];
@ -130,14 +109,14 @@ INLINE uint8_t cmdMouseMoveEvent(const uint8_t *buffer) { // 4 bytes
return PROTO_RESP_OK;
}
INLINE uint8_t cmdMouseWheelEvent(const uint8_t *buffer) { // 2 bytes
uint8_t cmdMouseWheelEvent(const uint8_t *buffer) { // 2 bytes
# ifdef HID_USB_MOUSE
hid_mouse.sendMouseWheel(buffer[1]); // Y only, X is not supported
# endif
return PROTO_RESP_OK;
}
INLINE uint8_t cmdPongLeds(const uint8_t *buffer) { // 0 bytes
uint8_t cmdPongLeds(const uint8_t *buffer) { // 0 bytes
return ((uint8_t) PROTO_RESP_PONG_PREFIX) | hid_kbd.getLedsAs(
PROTO_RESP_PONG_CAPS,
PROTO_RESP_PONG_SCROLL,
@ -145,40 +124,79 @@ INLINE uint8_t cmdPongLeds(const uint8_t *buffer) { // 0 bytes
);
}
uint8_t handleCmdBuffer(const uint8_t *buffer) { // 8 bytes
uint16_t crc = (uint16_t)buffer[6] << 8;
crc |= (uint16_t)buffer[7];
if (protoCrc16(buffer, 6) == crc) {
# define HANDLE(_handler) { return _handler(buffer + 2); }
switch (buffer[1]) {
case PROTO_CMD_RESET_HID: HANDLE(cmdResetHid);
case PROTO_CMD_KEY_EVENT: HANDLE(cmdKeyEvent);
case PROTO_CMD_MOUSE_BUTTON_EVENT: HANDLE(cmdMouseButtonEvent);
case PROTO_CMD_MOUSE_MOVE_EVENT: HANDLE(cmdMouseMoveEvent);
case PROTO_CMD_MOUSE_WHEEL_EVENT: HANDLE(cmdMouseWheelEvent);
case PROTO_CMD_PING: HANDLE(cmdPongLeds);
case PROTO_CMD_REPEAT: return 0;
default: return PROTO_RESP_INVALID_ERROR;
}
# undef HANDLE
}
return PROTO_RESP_CRC_ERROR;
}
// -----------------------------------------------------------------------------
INLINE uint16_t makeCrc16(const uint8_t *buffer, unsigned length) {
uint16_t crc = 0xFFFF;
#ifdef CMD_SPI
volatile uint8_t spi_in[8] = {0};
volatile uint8_t spi_in_index = 0;
for (unsigned byte_count = 0; byte_count < length; ++byte_count) {
crc = crc ^ buffer[byte_count];
for (unsigned bit_count = 0; bit_count < 8; ++bit_count) {
if ((crc & 0x0001) == 0) {
crc = crc >> 1;
volatile uint8_t spi_out[4] = {0};
volatile uint8_t spi_out_index = 0;
bool spiReady() {
return (!spi_out[0] && spi_in_index == 8);
}
void spiWrite(const uint8_t *buffer) {
spi_out[3] = buffer[3];
spi_out[2] = buffer[2];
spi_out[1] = buffer[1];
spi_out[0] = buffer[0]; // Меджик разрешает начать ответ
}
ISR(SPI_STC_vect) {
uint8_t in = SPDR;
if (spi_out[0] && spi_out_index < 4) {
SPDR = spi_out[spi_out_index];
if (!(SPSR & (1 << WCOL))) {
++spi_out_index;
if (spi_out_index == 4) {
spi_out_index = 0;
spi_in_index = 0;
spi_out[0] = 0;
}
}
} else {
crc = crc >> 1;
crc = crc ^ PROTO_CRC_POLINOM;
static bool receiving = false;
if (!receiving && in == PROTO_MAGIC) {
receiving = true;
}
if (receiving && spi_in_index < 8) {
spi_in[spi_in_index] = in;
++spi_in_index;
}
if (spi_in_index == 8) {
receiving = false;
}
SPDR = 0;
}
return crc;
}
#endif
// -----------------------------------------------------------------------------
volatile bool cmd_recv_timed_out = false;
INLINE void recvTimerStop(bool flag) {
Timer1.stop();
cmd_recv_timed_out = flag;
}
INLINE void resetCmdRecvTimeout() {
recvTimerStop(false);
Timer1.initialize(CMD_RECV_TIMEOUT);
}
INLINE void sendCmdResponse(uint8_t code=0) {
void sendCmdResponse(uint8_t code) {
static uint8_t prev_code = PROTO_RESP_NONE;
if (code == 0) {
code = prev_code; // Repeat the last code
@ -189,16 +207,15 @@ INLINE void sendCmdResponse(uint8_t code=0) {
uint8_t buffer[4];
buffer[0] = PROTO_MAGIC;
buffer[1] = code;
uint16_t crc = makeCrc16(buffer, 2);
uint16_t crc = protoCrc16(buffer, 2);
buffer[2] = (uint8_t)(crc >> 8);
buffer[3] = (uint8_t)(crc & 0xFF);
recvTimerStop(false);
# ifdef CMD_SERIAL
CMD_SERIAL.write(buffer, 4);
}
void intRecvTimedOut() {
recvTimerStop(true);
# elif defined(CMD_SPI)
spiWrite(buffer);
# endif
}
void setup() {
@ -207,49 +224,50 @@ void setup() {
hid_mouse.begin();
# endif
Timer1.attachInterrupt(intRecvTimedOut);
# ifdef CMD_SERIAL
CMD_SERIAL.begin(CMD_SERIAL_SPEED);
# elif defined(CMD_SPI)
pinMode(MISO, OUTPUT);
SPCR = (1 << SPE) | (1 << SPIE); // Slave, SPI En, IRQ En
# endif
}
void loop() {
# ifdef CMD_SERIAL
unsigned long last = micros();
uint8_t buffer[8];
unsigned index = 0;
uint8_t index = 0;
# endif
while (true) {
# ifdef HID_PS2_KBD
hid_kbd.periodic();
# endif
# ifdef CMD_SERIAL
if (CMD_SERIAL.available() > 0) {
buffer[index] = (uint8_t)CMD_SERIAL.read();
if (index == 7) {
uint16_t crc = (uint16_t)buffer[6] << 8;
crc |= (uint16_t)buffer[7];
if (makeCrc16(buffer, 6) == crc) {
# define HANDLE(_handler) { sendCmdResponse(_handler(buffer + 2)); break; }
switch (buffer[1]) {
case PROTO_CMD_RESET_HID: HANDLE(cmdResetHid);
case PROTO_CMD_KEY_EVENT: HANDLE(cmdKeyEvent);
case PROTO_CMD_MOUSE_BUTTON_EVENT: HANDLE(cmdMouseButtonEvent);
case PROTO_CMD_MOUSE_MOVE_EVENT: HANDLE(cmdMouseMoveEvent);
case PROTO_CMD_MOUSE_WHEEL_EVENT: HANDLE(cmdMouseWheelEvent);
case PROTO_CMD_PING: HANDLE(cmdPongLeds);
case PROTO_CMD_REPEAT: sendCmdResponse(); break;
default: sendCmdResponse(PROTO_RESP_INVALID_ERROR); break;
}
# undef HANDLE
} else {
sendCmdResponse(PROTO_RESP_CRC_ERROR);
}
sendCmdResponse(handleCmdBuffer(buffer));
index = 0;
} else {
resetCmdRecvTimeout();
index += 1;
last = micros();
++index;
}
} else if (index > 0 && cmd_recv_timed_out) {
} else if (index > 0) {
unsigned long now = micros();
if (
(now >= last && now - last > CMD_SERIAL_TIMEOUT)
|| (now < last && ((unsigned long)-1) - last + now > CMD_SERIAL_TIMEOUT)
) {
sendCmdResponse(PROTO_RESP_TIMEOUT_ERROR);
index = 0;
}
}
# elif defined(CMD_SPI)
if (spiReady()) {
sendCmdResponse(handleCmdBuffer(spi_in));
}
# endif
}
}

76
hid/src/proto.h Normal file
View File

@ -0,0 +1,76 @@
/*****************************************************************************
# #
# KVMD - The main Pi-KVM daemon. #
# #
# Copyright (C) 2018 Maxim Devaev <mdevaev@gmail.com> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
*****************************************************************************/
#pragma once
#define PROTO_MAGIC 0x33
#define PROTO_CRC_POLINOM 0xA001
#define PROTO_RESP_OK 0x20
#define PROTO_RESP_NONE 0x24
#define PROTO_RESP_CRC_ERROR 0x40
#define PROTO_RESP_INVALID_ERROR 0x45
#define PROTO_RESP_TIMEOUT_ERROR 0x48
#define PROTO_RESP_PONG_PREFIX 0x80
#define PROTO_RESP_PONG_CAPS 0b00000001
#define PROTO_RESP_PONG_SCROLL 0b00000010
#define PROTO_RESP_PONG_NUM 0b00000100
#define PROTO_CMD_PING 0x01
#define PROTO_CMD_REPEAT 0x02
#define PROTO_CMD_RESET_HID 0x10
#define PROTO_CMD_KEY_EVENT 0x11
#define PROTO_CMD_MOUSE_BUTTON_EVENT 0x13 // Legacy sequence
#define PROTO_CMD_MOUSE_MOVE_EVENT 0x12
#define PROTO_CMD_MOUSE_WHEEL_EVENT 0x14
#define PROTO_CMD_MOUSE_BUTTON_LEFT_SELECT 0b10000000
#define PROTO_CMD_MOUSE_BUTTON_LEFT_STATE 0b00001000
#define PROTO_CMD_MOUSE_BUTTON_RIGHT_SELECT 0b01000000
#define PROTO_CMD_MOUSE_BUTTON_RIGHT_STATE 0b00000100
#define PROTO_CMD_MOUSE_BUTTON_MIDDLE_SELECT 0b00100000
#define PROTO_CMD_MOUSE_BUTTON_MIDDLE_STATE 0b00000010
#define PROTO_CMD_MOUSE_BUTTON_EXTRA_UP_SELECT 0b10000000
#define PROTO_CMD_MOUSE_BUTTON_EXTRA_UP_STATE 0b00001000
#define PROTO_CMD_MOUSE_BUTTON_EXTRA_DOWN_SELECT 0b01000000
#define PROTO_CMD_MOUSE_BUTTON_EXTRA_DOWN_STATE 0b00000100
uint16_t protoCrc16(const uint8_t *buffer, unsigned length) {
uint16_t crc = 0xFFFF;
for (unsigned byte_count = 0; byte_count < length; ++byte_count) {
crc = crc ^ buffer[byte_count];
for (unsigned bit_count = 0; bit_count < 8; ++bit_count) {
if ((crc & 0x0001) == 0) {
crc = crc >> 1;
} else {
crc = crc >> 1;
crc = crc ^ PROTO_CRC_POLINOM;
}
}
}
return crc;
}

View File

@ -25,8 +25,6 @@
#include <Arduino.h>
#include <ps2dev.h>
#include "../inline.h"
#include "keymap.h"
// #define PS2_KBD_CLOCK_PIN 7
@ -43,11 +41,11 @@ class Ps2HidKeyboard {
_dev.keyboard_init();
}
INLINE void periodic() {
void periodic() {
_dev.keyboard_handle(&_leds);
}
INLINE void sendKey(uint8_t code, bool state) {
void sendKey(uint8_t code, bool state) {
Ps2KeyType ps2_type;
uint8_t ps2_code;
@ -76,7 +74,7 @@ class Ps2HidKeyboard {
}
}
INLINE uint8_t getLedsAs(uint8_t caps, uint8_t scroll, uint8_t num) {
uint8_t getLedsAs(uint8_t caps, uint8_t scroll, uint8_t num) {
uint8_t result = 0;
periodic();

View File

@ -22,8 +22,6 @@
#pragma once
#include "../inline.h"
enum Ps2KeyType : uint8_t {
PS2_KEY_TYPE_UNKNOWN = 0,
@ -34,7 +32,7 @@ enum Ps2KeyType : uint8_t {
};
INLINE void keymapPs2(uint8_t code, Ps2KeyType *ps2_type, uint8_t *ps2_code) {
void keymapPs2(uint8_t code, Ps2KeyType *ps2_type, uint8_t *ps2_code) {
*ps2_type = PS2_KEY_TYPE_UNKNOWN;
*ps2_code = 0;

View File

@ -22,8 +22,6 @@
#pragma once
#include "../inline.h"
enum Ps2KeyType : uint8_t {
PS2_KEY_TYPE_UNKNOWN = 0,
@ -34,13 +32,13 @@ enum Ps2KeyType : uint8_t {
};
<%! import operator %>
INLINE void keymapPs2(uint8_t code, Ps2KeyType *ps2_type, uint8_t *ps2_code) {
void keymapPs2(uint8_t code, Ps2KeyType *ps2_type, uint8_t *ps2_code) {
*ps2_type = PS2_KEY_TYPE_UNKNOWN;
*ps2_code = 0;
switch (code) {
% for km in sorted(keymap, key=operator.attrgetter("serial_code")):
case ${km.serial_code}: *ps2_type = PS2_KEY_TYPE_${km.ps2_key.type.upper()}; *ps2_code = ${km.ps2_key.code}; return; // ${km.arduino_name}
% for km in sorted(keymap, key=operator.attrgetter("mcu_code")):
case ${km.mcu_code}: *ps2_type = PS2_KEY_TYPE_${km.ps2_key.type.upper()}; *ps2_code = ${km.ps2_key.code}; return; // ${km.arduino_name}
% endfor
}
}

View File

@ -24,8 +24,6 @@
#include <HID-Project.h>
#include "../inline.h"
#include "keymap.h"
@ -38,11 +36,11 @@ class UsbHidKeyboard {
BootKeyboard.begin();
}
INLINE void reset() {
void reset() {
BootKeyboard.releaseAll();
}
INLINE void sendKey(uint8_t code, bool state) {
void sendKey(uint8_t code, bool state) {
KeyboardKeycode usb_code = keymapUsb(code);
if (usb_code != KEY_ERROR_UNDEFINED) {
if (state) BootKeyboard.press(usb_code);
@ -50,7 +48,7 @@ class UsbHidKeyboard {
}
}
INLINE uint8_t getLedsAs(uint8_t caps, uint8_t scroll, uint8_t num) {
uint8_t getLedsAs(uint8_t caps, uint8_t scroll, uint8_t num) {
uint8_t leds = BootKeyboard.getLeds();
uint8_t result = 0;
@ -69,11 +67,11 @@ class UsbHidMouse {
SingleAbsoluteMouse.begin();
}
INLINE void reset() {
void reset() {
SingleAbsoluteMouse.releaseAll();
}
INLINE void sendMouseButtons(
void sendMouseButtons(
bool left_select, bool left_state,
bool right_select, bool right_state,
bool middle_select, bool middle_state,
@ -87,17 +85,17 @@ class UsbHidMouse {
if (down_select) _sendMouseButton(MOUSE_NEXT, down_state);
}
INLINE void sendMouseMove(int x, int y) {
void sendMouseMove(int x, int y) {
SingleAbsoluteMouse.moveTo(x, y);
}
INLINE void sendMouseWheel(int delta_y) {
void sendMouseWheel(int delta_y) {
// delta_x is not supported by hid-project now
SingleAbsoluteMouse.move(0, 0, delta_y);
}
private:
INLINE void _sendMouseButton(uint8_t button, bool state) {
void _sendMouseButton(uint8_t button, bool state) {
if (state) SingleAbsoluteMouse.press(button);
else SingleAbsoluteMouse.release(button);
}

View File

@ -24,10 +24,8 @@
#include <HID-Project.h>
#include "../inline.h"
INLINE KeyboardKeycode keymapUsb(uint8_t code) {
KeyboardKeycode keymapUsb(uint8_t code) {
switch (code) {
case 1: return KEY_A;
case 2: return KEY_B;

View File

@ -24,13 +24,11 @@
#include <HID-Project.h>
#include "../inline.h"
<%! import operator %>
INLINE KeyboardKeycode keymapUsb(uint8_t code) {
KeyboardKeycode keymapUsb(uint8_t code) {
switch (code) {
% for km in sorted(keymap, key=operator.attrgetter("serial_code")):
case ${km.serial_code}: return ${km.arduino_name};
% for km in sorted(keymap, key=operator.attrgetter("mcu_code")):
case ${km.mcu_code}: return ${km.arduino_name};
% endfor
default: return KEY_ERROR_UNDEFINED;
}

View File

@ -1,4 +1,4 @@
web_name,serial_code,arduino_name,otg_key,ps2_key,at1_code,x11_names
web_name,mcu_code,arduino_name,otg_key,ps2_key,at1_code,x11_names
KeyA,1,KEY_A,0x04,reg:0x1c,0x1e,"^XK_A,XK_a"
KeyB,2,KEY_B,0x05,reg:0x32,0x30,"^XK_B,XK_b"
KeyC,3,KEY_C,0x06,reg:0x21,0x2e,"^XK_C,XK_c"

1 web_name serial_code mcu_code arduino_name otg_key ps2_key at1_code x11_names
2 KeyA 1 KEY_A 0x04 reg:0x1c 0x1e ^XK_A,XK_a
3 KeyB 2 KEY_B 0x05 reg:0x32 0x30 ^XK_B,XK_b
4 KeyC 3 KEY_C 0x06 reg:0x21 0x2e ^XK_C,XK_c

View File

@ -34,12 +34,12 @@ from . import aiotools
# =====
async def pulse(line: gpiod.Line, delay: float, final: float) -> None:
async def pulse(line: gpiod.Line, delay: float, final: float, inverted: bool=False) -> None:
try:
line.set_value(1)
line.set_value(int(not inverted))
await asyncio.sleep(delay)
finally:
line.set_value(0)
line.set_value(int(inverted))
await asyncio.sleep(final)

View File

@ -27,7 +27,7 @@ from typing import Dict
# =====
@dataclasses.dataclass(frozen=True)
class SerialKey:
class McuKey:
code: int
@ -39,117 +39,117 @@ class OtgKey:
@dataclasses.dataclass(frozen=True)
class Key:
serial: SerialKey
mcu: McuKey
otg: OtgKey
KEYMAP: Dict[str, Key] = {
"KeyA": Key(serial=SerialKey(code=1), otg=OtgKey(code=4, is_modifier=False)),
"KeyB": Key(serial=SerialKey(code=2), otg=OtgKey(code=5, is_modifier=False)),
"KeyC": Key(serial=SerialKey(code=3), otg=OtgKey(code=6, is_modifier=False)),
"KeyD": Key(serial=SerialKey(code=4), otg=OtgKey(code=7, is_modifier=False)),
"KeyE": Key(serial=SerialKey(code=5), otg=OtgKey(code=8, is_modifier=False)),
"KeyF": Key(serial=SerialKey(code=6), otg=OtgKey(code=9, is_modifier=False)),
"KeyG": Key(serial=SerialKey(code=7), otg=OtgKey(code=10, is_modifier=False)),
"KeyH": Key(serial=SerialKey(code=8), otg=OtgKey(code=11, is_modifier=False)),
"KeyI": Key(serial=SerialKey(code=9), otg=OtgKey(code=12, is_modifier=False)),
"KeyJ": Key(serial=SerialKey(code=10), otg=OtgKey(code=13, is_modifier=False)),
"KeyK": Key(serial=SerialKey(code=11), otg=OtgKey(code=14, is_modifier=False)),
"KeyL": Key(serial=SerialKey(code=12), otg=OtgKey(code=15, is_modifier=False)),
"KeyM": Key(serial=SerialKey(code=13), otg=OtgKey(code=16, is_modifier=False)),
"KeyN": Key(serial=SerialKey(code=14), otg=OtgKey(code=17, is_modifier=False)),
"KeyO": Key(serial=SerialKey(code=15), otg=OtgKey(code=18, is_modifier=False)),
"KeyP": Key(serial=SerialKey(code=16), otg=OtgKey(code=19, is_modifier=False)),
"KeyQ": Key(serial=SerialKey(code=17), otg=OtgKey(code=20, is_modifier=False)),
"KeyR": Key(serial=SerialKey(code=18), otg=OtgKey(code=21, is_modifier=False)),
"KeyS": Key(serial=SerialKey(code=19), otg=OtgKey(code=22, is_modifier=False)),
"KeyT": Key(serial=SerialKey(code=20), otg=OtgKey(code=23, is_modifier=False)),
"KeyU": Key(serial=SerialKey(code=21), otg=OtgKey(code=24, is_modifier=False)),
"KeyV": Key(serial=SerialKey(code=22), otg=OtgKey(code=25, is_modifier=False)),
"KeyW": Key(serial=SerialKey(code=23), otg=OtgKey(code=26, is_modifier=False)),
"KeyX": Key(serial=SerialKey(code=24), otg=OtgKey(code=27, is_modifier=False)),
"KeyY": Key(serial=SerialKey(code=25), otg=OtgKey(code=28, is_modifier=False)),
"KeyZ": Key(serial=SerialKey(code=26), otg=OtgKey(code=29, is_modifier=False)),
"Digit1": Key(serial=SerialKey(code=27), otg=OtgKey(code=30, is_modifier=False)),
"Digit2": Key(serial=SerialKey(code=28), otg=OtgKey(code=31, is_modifier=False)),
"Digit3": Key(serial=SerialKey(code=29), otg=OtgKey(code=32, is_modifier=False)),
"Digit4": Key(serial=SerialKey(code=30), otg=OtgKey(code=33, is_modifier=False)),
"Digit5": Key(serial=SerialKey(code=31), otg=OtgKey(code=34, is_modifier=False)),
"Digit6": Key(serial=SerialKey(code=32), otg=OtgKey(code=35, is_modifier=False)),
"Digit7": Key(serial=SerialKey(code=33), otg=OtgKey(code=36, is_modifier=False)),
"Digit8": Key(serial=SerialKey(code=34), otg=OtgKey(code=37, is_modifier=False)),
"Digit9": Key(serial=SerialKey(code=35), otg=OtgKey(code=38, is_modifier=False)),
"Digit0": Key(serial=SerialKey(code=36), otg=OtgKey(code=39, is_modifier=False)),
"Enter": Key(serial=SerialKey(code=37), otg=OtgKey(code=40, is_modifier=False)),
"Escape": Key(serial=SerialKey(code=38), otg=OtgKey(code=41, is_modifier=False)),
"Backspace": Key(serial=SerialKey(code=39), otg=OtgKey(code=42, is_modifier=False)),
"Tab": Key(serial=SerialKey(code=40), otg=OtgKey(code=43, is_modifier=False)),
"Space": Key(serial=SerialKey(code=41), otg=OtgKey(code=44, is_modifier=False)),
"Minus": Key(serial=SerialKey(code=42), otg=OtgKey(code=45, is_modifier=False)),
"Equal": Key(serial=SerialKey(code=43), otg=OtgKey(code=46, is_modifier=False)),
"BracketLeft": Key(serial=SerialKey(code=44), otg=OtgKey(code=47, is_modifier=False)),
"BracketRight": Key(serial=SerialKey(code=45), otg=OtgKey(code=48, is_modifier=False)),
"Backslash": Key(serial=SerialKey(code=46), otg=OtgKey(code=49, is_modifier=False)),
"Semicolon": Key(serial=SerialKey(code=47), otg=OtgKey(code=51, is_modifier=False)),
"Quote": Key(serial=SerialKey(code=48), otg=OtgKey(code=52, is_modifier=False)),
"Backquote": Key(serial=SerialKey(code=49), otg=OtgKey(code=53, is_modifier=False)),
"Comma": Key(serial=SerialKey(code=50), otg=OtgKey(code=54, is_modifier=False)),
"Period": Key(serial=SerialKey(code=51), otg=OtgKey(code=55, is_modifier=False)),
"Slash": Key(serial=SerialKey(code=52), otg=OtgKey(code=56, is_modifier=False)),
"CapsLock": Key(serial=SerialKey(code=53), otg=OtgKey(code=57, is_modifier=False)),
"F1": Key(serial=SerialKey(code=54), otg=OtgKey(code=58, is_modifier=False)),
"F2": Key(serial=SerialKey(code=55), otg=OtgKey(code=59, is_modifier=False)),
"F3": Key(serial=SerialKey(code=56), otg=OtgKey(code=60, is_modifier=False)),
"F4": Key(serial=SerialKey(code=57), otg=OtgKey(code=61, is_modifier=False)),
"F5": Key(serial=SerialKey(code=58), otg=OtgKey(code=62, is_modifier=False)),
"F6": Key(serial=SerialKey(code=59), otg=OtgKey(code=63, is_modifier=False)),
"F7": Key(serial=SerialKey(code=60), otg=OtgKey(code=64, is_modifier=False)),
"F8": Key(serial=SerialKey(code=61), otg=OtgKey(code=65, is_modifier=False)),
"F9": Key(serial=SerialKey(code=62), otg=OtgKey(code=66, is_modifier=False)),
"F10": Key(serial=SerialKey(code=63), otg=OtgKey(code=67, is_modifier=False)),
"F11": Key(serial=SerialKey(code=64), otg=OtgKey(code=68, is_modifier=False)),
"F12": Key(serial=SerialKey(code=65), otg=OtgKey(code=69, is_modifier=False)),
"PrintScreen": Key(serial=SerialKey(code=66), otg=OtgKey(code=70, is_modifier=False)),
"Insert": Key(serial=SerialKey(code=67), otg=OtgKey(code=73, is_modifier=False)),
"Home": Key(serial=SerialKey(code=68), otg=OtgKey(code=74, is_modifier=False)),
"PageUp": Key(serial=SerialKey(code=69), otg=OtgKey(code=75, is_modifier=False)),
"Delete": Key(serial=SerialKey(code=70), otg=OtgKey(code=76, is_modifier=False)),
"End": Key(serial=SerialKey(code=71), otg=OtgKey(code=77, is_modifier=False)),
"PageDown": Key(serial=SerialKey(code=72), otg=OtgKey(code=78, is_modifier=False)),
"ArrowRight": Key(serial=SerialKey(code=73), otg=OtgKey(code=79, is_modifier=False)),
"ArrowLeft": Key(serial=SerialKey(code=74), otg=OtgKey(code=80, is_modifier=False)),
"ArrowDown": Key(serial=SerialKey(code=75), otg=OtgKey(code=81, is_modifier=False)),
"ArrowUp": Key(serial=SerialKey(code=76), otg=OtgKey(code=82, is_modifier=False)),
"ControlLeft": Key(serial=SerialKey(code=77), otg=OtgKey(code=1, is_modifier=True)),
"ShiftLeft": Key(serial=SerialKey(code=78), otg=OtgKey(code=2, is_modifier=True)),
"AltLeft": Key(serial=SerialKey(code=79), otg=OtgKey(code=4, is_modifier=True)),
"MetaLeft": Key(serial=SerialKey(code=80), otg=OtgKey(code=8, is_modifier=True)),
"ControlRight": Key(serial=SerialKey(code=81), otg=OtgKey(code=16, is_modifier=True)),
"ShiftRight": Key(serial=SerialKey(code=82), otg=OtgKey(code=32, is_modifier=True)),
"AltRight": Key(serial=SerialKey(code=83), otg=OtgKey(code=64, is_modifier=True)),
"MetaRight": Key(serial=SerialKey(code=84), otg=OtgKey(code=128, is_modifier=True)),
"Pause": Key(serial=SerialKey(code=85), otg=OtgKey(code=72, is_modifier=False)),
"ScrollLock": Key(serial=SerialKey(code=86), otg=OtgKey(code=71, is_modifier=False)),
"NumLock": Key(serial=SerialKey(code=87), otg=OtgKey(code=83, is_modifier=False)),
"ContextMenu": Key(serial=SerialKey(code=88), otg=OtgKey(code=101, is_modifier=False)),
"NumpadDivide": Key(serial=SerialKey(code=89), otg=OtgKey(code=84, is_modifier=False)),
"NumpadMultiply": Key(serial=SerialKey(code=90), otg=OtgKey(code=85, is_modifier=False)),
"NumpadSubtract": Key(serial=SerialKey(code=91), otg=OtgKey(code=86, is_modifier=False)),
"NumpadAdd": Key(serial=SerialKey(code=92), otg=OtgKey(code=87, is_modifier=False)),
"NumpadEnter": Key(serial=SerialKey(code=93), otg=OtgKey(code=88, is_modifier=False)),
"Numpad1": Key(serial=SerialKey(code=94), otg=OtgKey(code=89, is_modifier=False)),
"Numpad2": Key(serial=SerialKey(code=95), otg=OtgKey(code=90, is_modifier=False)),
"Numpad3": Key(serial=SerialKey(code=96), otg=OtgKey(code=91, is_modifier=False)),
"Numpad4": Key(serial=SerialKey(code=97), otg=OtgKey(code=92, is_modifier=False)),
"Numpad5": Key(serial=SerialKey(code=98), otg=OtgKey(code=93, is_modifier=False)),
"Numpad6": Key(serial=SerialKey(code=99), otg=OtgKey(code=94, is_modifier=False)),
"Numpad7": Key(serial=SerialKey(code=100), otg=OtgKey(code=95, is_modifier=False)),
"Numpad8": Key(serial=SerialKey(code=101), otg=OtgKey(code=96, is_modifier=False)),
"Numpad9": Key(serial=SerialKey(code=102), otg=OtgKey(code=97, is_modifier=False)),
"Numpad0": Key(serial=SerialKey(code=103), otg=OtgKey(code=98, is_modifier=False)),
"NumpadDecimal": Key(serial=SerialKey(code=104), otg=OtgKey(code=99, is_modifier=False)),
"Power": Key(serial=SerialKey(code=105), otg=OtgKey(code=102, is_modifier=False)),
"IntlBackslash": Key(serial=SerialKey(code=106), otg=OtgKey(code=100, is_modifier=False)),
"KeyA": Key(mcu=McuKey(code=1), otg=OtgKey(code=4, is_modifier=False)),
"KeyB": Key(mcu=McuKey(code=2), otg=OtgKey(code=5, is_modifier=False)),
"KeyC": Key(mcu=McuKey(code=3), otg=OtgKey(code=6, is_modifier=False)),
"KeyD": Key(mcu=McuKey(code=4), otg=OtgKey(code=7, is_modifier=False)),
"KeyE": Key(mcu=McuKey(code=5), otg=OtgKey(code=8, is_modifier=False)),
"KeyF": Key(mcu=McuKey(code=6), otg=OtgKey(code=9, is_modifier=False)),
"KeyG": Key(mcu=McuKey(code=7), otg=OtgKey(code=10, is_modifier=False)),
"KeyH": Key(mcu=McuKey(code=8), otg=OtgKey(code=11, is_modifier=False)),
"KeyI": Key(mcu=McuKey(code=9), otg=OtgKey(code=12, is_modifier=False)),
"KeyJ": Key(mcu=McuKey(code=10), otg=OtgKey(code=13, is_modifier=False)),
"KeyK": Key(mcu=McuKey(code=11), otg=OtgKey(code=14, is_modifier=False)),
"KeyL": Key(mcu=McuKey(code=12), otg=OtgKey(code=15, is_modifier=False)),
"KeyM": Key(mcu=McuKey(code=13), otg=OtgKey(code=16, is_modifier=False)),
"KeyN": Key(mcu=McuKey(code=14), otg=OtgKey(code=17, is_modifier=False)),
"KeyO": Key(mcu=McuKey(code=15), otg=OtgKey(code=18, is_modifier=False)),
"KeyP": Key(mcu=McuKey(code=16), otg=OtgKey(code=19, is_modifier=False)),
"KeyQ": Key(mcu=McuKey(code=17), otg=OtgKey(code=20, is_modifier=False)),
"KeyR": Key(mcu=McuKey(code=18), otg=OtgKey(code=21, is_modifier=False)),
"KeyS": Key(mcu=McuKey(code=19), otg=OtgKey(code=22, is_modifier=False)),
"KeyT": Key(mcu=McuKey(code=20), otg=OtgKey(code=23, is_modifier=False)),
"KeyU": Key(mcu=McuKey(code=21), otg=OtgKey(code=24, is_modifier=False)),
"KeyV": Key(mcu=McuKey(code=22), otg=OtgKey(code=25, is_modifier=False)),
"KeyW": Key(mcu=McuKey(code=23), otg=OtgKey(code=26, is_modifier=False)),
"KeyX": Key(mcu=McuKey(code=24), otg=OtgKey(code=27, is_modifier=False)),
"KeyY": Key(mcu=McuKey(code=25), otg=OtgKey(code=28, is_modifier=False)),
"KeyZ": Key(mcu=McuKey(code=26), otg=OtgKey(code=29, is_modifier=False)),
"Digit1": Key(mcu=McuKey(code=27), otg=OtgKey(code=30, is_modifier=False)),
"Digit2": Key(mcu=McuKey(code=28), otg=OtgKey(code=31, is_modifier=False)),
"Digit3": Key(mcu=McuKey(code=29), otg=OtgKey(code=32, is_modifier=False)),
"Digit4": Key(mcu=McuKey(code=30), otg=OtgKey(code=33, is_modifier=False)),
"Digit5": Key(mcu=McuKey(code=31), otg=OtgKey(code=34, is_modifier=False)),
"Digit6": Key(mcu=McuKey(code=32), otg=OtgKey(code=35, is_modifier=False)),
"Digit7": Key(mcu=McuKey(code=33), otg=OtgKey(code=36, is_modifier=False)),
"Digit8": Key(mcu=McuKey(code=34), otg=OtgKey(code=37, is_modifier=False)),
"Digit9": Key(mcu=McuKey(code=35), otg=OtgKey(code=38, is_modifier=False)),
"Digit0": Key(mcu=McuKey(code=36), otg=OtgKey(code=39, is_modifier=False)),
"Enter": Key(mcu=McuKey(code=37), otg=OtgKey(code=40, is_modifier=False)),
"Escape": Key(mcu=McuKey(code=38), otg=OtgKey(code=41, is_modifier=False)),
"Backspace": Key(mcu=McuKey(code=39), otg=OtgKey(code=42, is_modifier=False)),
"Tab": Key(mcu=McuKey(code=40), otg=OtgKey(code=43, is_modifier=False)),
"Space": Key(mcu=McuKey(code=41), otg=OtgKey(code=44, is_modifier=False)),
"Minus": Key(mcu=McuKey(code=42), otg=OtgKey(code=45, is_modifier=False)),
"Equal": Key(mcu=McuKey(code=43), otg=OtgKey(code=46, is_modifier=False)),
"BracketLeft": Key(mcu=McuKey(code=44), otg=OtgKey(code=47, is_modifier=False)),
"BracketRight": Key(mcu=McuKey(code=45), otg=OtgKey(code=48, is_modifier=False)),
"Backslash": Key(mcu=McuKey(code=46), otg=OtgKey(code=49, is_modifier=False)),
"Semicolon": Key(mcu=McuKey(code=47), otg=OtgKey(code=51, is_modifier=False)),
"Quote": Key(mcu=McuKey(code=48), otg=OtgKey(code=52, is_modifier=False)),
"Backquote": Key(mcu=McuKey(code=49), otg=OtgKey(code=53, is_modifier=False)),
"Comma": Key(mcu=McuKey(code=50), otg=OtgKey(code=54, is_modifier=False)),
"Period": Key(mcu=McuKey(code=51), otg=OtgKey(code=55, is_modifier=False)),
"Slash": Key(mcu=McuKey(code=52), otg=OtgKey(code=56, is_modifier=False)),
"CapsLock": Key(mcu=McuKey(code=53), otg=OtgKey(code=57, is_modifier=False)),
"F1": Key(mcu=McuKey(code=54), otg=OtgKey(code=58, is_modifier=False)),
"F2": Key(mcu=McuKey(code=55), otg=OtgKey(code=59, is_modifier=False)),
"F3": Key(mcu=McuKey(code=56), otg=OtgKey(code=60, is_modifier=False)),
"F4": Key(mcu=McuKey(code=57), otg=OtgKey(code=61, is_modifier=False)),
"F5": Key(mcu=McuKey(code=58), otg=OtgKey(code=62, is_modifier=False)),
"F6": Key(mcu=McuKey(code=59), otg=OtgKey(code=63, is_modifier=False)),
"F7": Key(mcu=McuKey(code=60), otg=OtgKey(code=64, is_modifier=False)),
"F8": Key(mcu=McuKey(code=61), otg=OtgKey(code=65, is_modifier=False)),
"F9": Key(mcu=McuKey(code=62), otg=OtgKey(code=66, is_modifier=False)),
"F10": Key(mcu=McuKey(code=63), otg=OtgKey(code=67, is_modifier=False)),
"F11": Key(mcu=McuKey(code=64), otg=OtgKey(code=68, is_modifier=False)),
"F12": Key(mcu=McuKey(code=65), otg=OtgKey(code=69, is_modifier=False)),
"PrintScreen": Key(mcu=McuKey(code=66), otg=OtgKey(code=70, is_modifier=False)),
"Insert": Key(mcu=McuKey(code=67), otg=OtgKey(code=73, is_modifier=False)),
"Home": Key(mcu=McuKey(code=68), otg=OtgKey(code=74, is_modifier=False)),
"PageUp": Key(mcu=McuKey(code=69), otg=OtgKey(code=75, is_modifier=False)),
"Delete": Key(mcu=McuKey(code=70), otg=OtgKey(code=76, is_modifier=False)),
"End": Key(mcu=McuKey(code=71), otg=OtgKey(code=77, is_modifier=False)),
"PageDown": Key(mcu=McuKey(code=72), otg=OtgKey(code=78, is_modifier=False)),
"ArrowRight": Key(mcu=McuKey(code=73), otg=OtgKey(code=79, is_modifier=False)),
"ArrowLeft": Key(mcu=McuKey(code=74), otg=OtgKey(code=80, is_modifier=False)),
"ArrowDown": Key(mcu=McuKey(code=75), otg=OtgKey(code=81, is_modifier=False)),
"ArrowUp": Key(mcu=McuKey(code=76), otg=OtgKey(code=82, is_modifier=False)),
"ControlLeft": Key(mcu=McuKey(code=77), otg=OtgKey(code=1, is_modifier=True)),
"ShiftLeft": Key(mcu=McuKey(code=78), otg=OtgKey(code=2, is_modifier=True)),
"AltLeft": Key(mcu=McuKey(code=79), otg=OtgKey(code=4, is_modifier=True)),
"MetaLeft": Key(mcu=McuKey(code=80), otg=OtgKey(code=8, is_modifier=True)),
"ControlRight": Key(mcu=McuKey(code=81), otg=OtgKey(code=16, is_modifier=True)),
"ShiftRight": Key(mcu=McuKey(code=82), otg=OtgKey(code=32, is_modifier=True)),
"AltRight": Key(mcu=McuKey(code=83), otg=OtgKey(code=64, is_modifier=True)),
"MetaRight": Key(mcu=McuKey(code=84), otg=OtgKey(code=128, is_modifier=True)),
"Pause": Key(mcu=McuKey(code=85), otg=OtgKey(code=72, is_modifier=False)),
"ScrollLock": Key(mcu=McuKey(code=86), otg=OtgKey(code=71, is_modifier=False)),
"NumLock": Key(mcu=McuKey(code=87), otg=OtgKey(code=83, is_modifier=False)),
"ContextMenu": Key(mcu=McuKey(code=88), otg=OtgKey(code=101, is_modifier=False)),
"NumpadDivide": Key(mcu=McuKey(code=89), otg=OtgKey(code=84, is_modifier=False)),
"NumpadMultiply": Key(mcu=McuKey(code=90), otg=OtgKey(code=85, is_modifier=False)),
"NumpadSubtract": Key(mcu=McuKey(code=91), otg=OtgKey(code=86, is_modifier=False)),
"NumpadAdd": Key(mcu=McuKey(code=92), otg=OtgKey(code=87, is_modifier=False)),
"NumpadEnter": Key(mcu=McuKey(code=93), otg=OtgKey(code=88, is_modifier=False)),
"Numpad1": Key(mcu=McuKey(code=94), otg=OtgKey(code=89, is_modifier=False)),
"Numpad2": Key(mcu=McuKey(code=95), otg=OtgKey(code=90, is_modifier=False)),
"Numpad3": Key(mcu=McuKey(code=96), otg=OtgKey(code=91, is_modifier=False)),
"Numpad4": Key(mcu=McuKey(code=97), otg=OtgKey(code=92, is_modifier=False)),
"Numpad5": Key(mcu=McuKey(code=98), otg=OtgKey(code=93, is_modifier=False)),
"Numpad6": Key(mcu=McuKey(code=99), otg=OtgKey(code=94, is_modifier=False)),
"Numpad7": Key(mcu=McuKey(code=100), otg=OtgKey(code=95, is_modifier=False)),
"Numpad8": Key(mcu=McuKey(code=101), otg=OtgKey(code=96, is_modifier=False)),
"Numpad9": Key(mcu=McuKey(code=102), otg=OtgKey(code=97, is_modifier=False)),
"Numpad0": Key(mcu=McuKey(code=103), otg=OtgKey(code=98, is_modifier=False)),
"NumpadDecimal": Key(mcu=McuKey(code=104), otg=OtgKey(code=99, is_modifier=False)),
"Power": Key(mcu=McuKey(code=105), otg=OtgKey(code=102, is_modifier=False)),
"IntlBackslash": Key(mcu=McuKey(code=106), otg=OtgKey(code=100, is_modifier=False)),
}

View File

@ -27,7 +27,7 @@ from typing import Dict
# =====
@dataclasses.dataclass(frozen=True)
class SerialKey:
class McuKey:
code: int
@ -39,13 +39,13 @@ class OtgKey:
@dataclasses.dataclass(frozen=True)
class Key:
serial: SerialKey
mcu: McuKey
otg: OtgKey
<%! import operator %>
KEYMAP: Dict[str, Key] = {
% for km in sorted(keymap, key=operator.attrgetter("serial_code")):
"${km.web_name}": Key(serial=SerialKey(code=${km.serial_code}), otg=OtgKey(code=${km.otg_key.code}, is_modifier=${km.otg_key.is_modifier})),
% for km in sorted(keymap, key=operator.attrgetter("mcu_code")):
"${km.web_name}": Key(mcu=McuKey(code=${km.mcu_code}), otg=OtgKey(code=${km.otg_key.code}, is_modifier=${km.otg_key.is_modifier})),
% endfor
}

View File

@ -52,6 +52,8 @@ class BasePlugin:
def get_plugin_class(sub: str, name: str) -> Type[BasePlugin]:
assert sub
assert name
if name.startswith("_"):
raise UnknownPluginError(f"Unknown plugin '{sub}/{name}'")
try:
module = importlib.import_module(f"kvmd.plugins.{sub}.{name}")
except ModuleNotFoundError:

View File

@ -0,0 +1,423 @@
# ========================================================================== #
# #
# KVMD - The main Pi-KVM daemon. #
# #
# Copyright (C) 2018 Maxim Devaev <mdevaev@gmail.com> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
# ========================================================================== #
import os
import multiprocessing
import dataclasses
import contextlib
import queue
import struct
import time
from typing import Tuple
from typing import List
from typing import Dict
from typing import Iterable
from typing import Generator
from typing import AsyncGenerator
from ....logging import get_logger
from ....keyboard.mappings import KEYMAP
from .... import tools
from .... import aiotools
from .... import aiomulti
from .... import aioproc
from ....yamlconf import Option
from ....validators.basic import valid_bool
from ....validators.basic import valid_int_f0
from ....validators.basic import valid_int_f1
from ....validators.basic import valid_float_f01
from ....validators.hw import valid_gpio_pin_optional
from .. import BaseHid
from .gpio import Gpio
# =====
class _RequestError(Exception):
def __init__(self, msg: str, online: bool=False) -> None:
super().__init__(msg)
self.msg = msg
self.online = online
class _PermRequestError(_RequestError):
pass
class _TempRequestError(_RequestError):
pass
# =====
class _BaseEvent:
def make_command(self) -> bytes:
raise NotImplementedError
class _ClearEvent(_BaseEvent):
def make_command(self) -> bytes:
return b"\x10\x00\x00\x00\x00"
@dataclasses.dataclass(frozen=True)
class _KeyEvent(_BaseEvent):
name: str
state: bool
def __post_init__(self) -> None:
assert self.name in KEYMAP
def make_command(self) -> bytes:
code = KEYMAP[self.name].mcu.code
return struct.pack(">BBBxx", 0x11, code, int(self.state))
@dataclasses.dataclass(frozen=True)
class _MouseButtonEvent(_BaseEvent):
name: str
state: bool
def __post_init__(self) -> None:
assert self.name in ["left", "right", "middle", "up", "down"]
def make_command(self) -> bytes:
(code, state_pressed, is_main) = {
"left": (0b10000000, 0b00001000, True),
"right": (0b01000000, 0b00000100, True),
"middle": (0b00100000, 0b00000010, True),
"up": (0b10000000, 0b00001000, False), # Back
"down": (0b01000000, 0b00000100, False), # Forward
}[self.name]
if self.state:
code |= state_pressed
if is_main:
main_code = code
extra_code = 0
else:
main_code = 0
extra_code = code
return struct.pack(">BBBxx", 0x13, main_code, extra_code)
@dataclasses.dataclass(frozen=True)
class _MouseMoveEvent(_BaseEvent):
to_x: int
to_y: int
def __post_init__(self) -> None:
assert -32768 <= self.to_x <= 32767
assert -32768 <= self.to_y <= 32767
def make_command(self) -> bytes:
return struct.pack(">Bhh", 0x12, self.to_x, self.to_y)
@dataclasses.dataclass(frozen=True)
class _MouseWheelEvent(_BaseEvent):
delta_x: int
delta_y: int
def __post_init__(self) -> None:
assert -127 <= self.delta_x <= 127
assert -127 <= self.delta_y <= 127
def make_command(self) -> bytes:
# Горизонтальная прокрутка пока не поддерживается
return struct.pack(">Bxbxx", 0x14, self.delta_y)
# =====
class BasePhyConnection:
def send(self, request: bytes) -> bytes:
raise NotImplementedError
class BasePhy:
def has_device(self) -> bool:
raise NotImplementedError
@contextlib.contextmanager
def connected(self) -> Generator[BasePhyConnection, None, None]:
raise NotImplementedError
class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-instance-attributes
def __init__( # pylint: disable=too-many-arguments,super-init-not-called
self,
phy: BasePhy,
reset_pin: int,
reset_inverted: bool,
reset_delay: float,
read_retries: int,
common_retries: int,
retries_delay: float,
errors_threshold: int,
noop: bool,
) -> None:
multiprocessing.Process.__init__(self, daemon=True)
self.__read_retries = read_retries
self.__common_retries = common_retries
self.__retries_delay = retries_delay
self.__errors_threshold = errors_threshold
self.__noop = noop
self.__phy = phy
self.__gpio = Gpio(reset_pin, reset_inverted, reset_delay)
self.__events_queue: "multiprocessing.Queue[_BaseEvent]" = multiprocessing.Queue()
self.__notifier = aiomulti.AioProcessNotifier()
self.__state_flags = aiomulti.AioSharedFlags({
"online": True,
"caps": False,
"scroll": False,
"num": False,
}, self.__notifier)
self.__stop_event = multiprocessing.Event()
@classmethod
def get_plugin_options(cls) -> Dict:
return {
"reset_pin": Option(-1, type=valid_gpio_pin_optional),
"reset_inverted": Option(False, type=valid_bool),
"reset_delay": Option(0.1, type=valid_float_f01),
"read_retries": Option(10, type=valid_int_f1),
"common_retries": Option(100, type=valid_int_f1),
"retries_delay": Option(0.1, type=valid_float_f01),
"errors_threshold": Option(5, type=valid_int_f0),
"noop": Option(False, type=valid_bool),
}
def sysprep(self) -> None:
self.__gpio.open()
get_logger(0).info("Starting HID daemon ...")
self.start()
async def get_state(self) -> Dict:
state = await self.__state_flags.get()
return {
"online": state["online"],
"keyboard": {
"online": state["online"],
"leds": {
"caps": state["caps"],
"scroll": state["scroll"],
"num": state["num"],
},
},
"mouse": {"online": state["online"]},
}
async def poll_state(self) -> AsyncGenerator[Dict, None]:
prev_state: Dict = {}
while True:
state = await self.get_state()
if state != prev_state:
yield state
prev_state = state
await self.__notifier.wait()
@aiotools.atomic
async def reset(self) -> None:
await self.__gpio.reset()
@aiotools.atomic
async def cleanup(self) -> None:
logger = get_logger(0)
try:
if self.is_alive():
logger.info("Stopping HID daemon ...")
self.__stop_event.set()
if self.exitcode is not None:
self.join()
if self.__phy.has_device():
get_logger().info("Clearing HID events ...")
try:
with self.__phy.connected() as conn:
self.__process_command(conn, b"\x10\x00\x00\x00\x00")
except Exception:
logger.exception("Can't clear HID events")
finally:
self.__gpio.close()
# =====
def send_key_events(self, keys: Iterable[Tuple[str, bool]]) -> None:
for (key, state) in keys:
self.__queue_event(_KeyEvent(key, state))
def send_mouse_button_event(self, button: str, state: bool) -> None:
self.__queue_event(_MouseButtonEvent(button, state))
def send_mouse_move_event(self, to_x: int, to_y: int) -> None:
self.__queue_event(_MouseMoveEvent(to_x, to_y))
def send_mouse_wheel_event(self, delta_x: int, delta_y: int) -> None:
self.__queue_event(_MouseWheelEvent(delta_x, delta_y))
def clear_events(self) -> None:
# FIXME: Если очистка производится со стороны процесса хида, то возможна гонка между
# очисткой и добавлением события _ClearEvent. Неприятно, но не смертельно.
# Починить блокировкой после перехода на асинхронные очереди.
tools.clear_queue(self.__events_queue)
self.__queue_event(_ClearEvent())
def __queue_event(self, event: _BaseEvent) -> None:
if not self.__stop_event.is_set():
self.__events_queue.put_nowait(event)
def run(self) -> None: # pylint: disable=too-many-branches
logger = get_logger(0)
logger.info("Started HID pid=%d", os.getpid())
aioproc.ignore_sigint()
aioproc.rename_process("hid")
while not self.__stop_event.is_set():
try:
if self.__phy.has_device():
with self.__phy.connected() as conn:
while not (self.__stop_event.is_set() and self.__events_queue.qsize() == 0):
try:
event = self.__events_queue.get(timeout=0.1)
except queue.Empty:
self.__process_command(conn, b"\x01\x00\x00\x00\x00") # Ping
else:
if not self.__process_command(conn, event.make_command()):
self.clear_events()
else:
logger.error("Missing HID device")
time.sleep(1)
except Exception:
self.clear_events()
logger.exception("Unexpected HID error")
time.sleep(1)
def __process_command(self, conn: BasePhyConnection, command: bytes) -> bool:
return self.__process_request(conn, self.__make_request(command))
def __process_request(self, conn: BasePhyConnection, request: bytes) -> bool: # pylint: disable=too-many-branches
logger = get_logger()
error_messages: List[str] = []
live_log_errors = False
common_retries = self.__common_retries
read_retries = self.__read_retries
error_retval = False
while common_retries and read_retries:
response = self.__send_request(conn, request)
try:
if len(response) < 4:
read_retries -= 1
raise _TempRequestError(f"No response from HID: request={request!r}")
assert len(response) == 4, response
if self.__make_crc16(response[-4:-2]) != struct.unpack(">H", response[-2:])[0]:
request = self.__make_request(b"\x02\x00\x00\x00\x00") # Repeat an answer
raise _TempRequestError("Invalid response CRC; requesting response again ...")
code = response[1]
if code == 0x48: # Request timeout # pylint: disable=no-else-raise
raise _TempRequestError(f"Got request timeout from HID: request={request!r}")
elif code == 0x40: # CRC Error
raise _TempRequestError(f"Got CRC error of request from HID: request={request!r}")
elif code == 0x45: # Unknown command
raise _PermRequestError(f"HID did not recognize the request={request!r}", online=True)
elif code == 0x24: # Rebooted?
raise _PermRequestError("No previous command state inside HID, seems it was rebooted", online=True)
elif code == 0x20: # Done
self.__state_flags.update(online=True)
return True
elif code & 0x80: # Pong with leds
self.__state_flags.update(
online=True,
caps=bool(code & 0b00000001),
scroll=bool(code & 0x00000010),
num=bool(code & 0x00000100),
)
return True
raise _TempRequestError(f"Invalid response from HID: request={request!r}; code=0x{code:02X}")
except _RequestError as err:
common_retries -= 1
self.__state_flags.update(online=err.online)
error_retval = err.online
if live_log_errors:
logger.error(err.msg)
else:
error_messages.append(err.msg)
if len(error_messages) > self.__errors_threshold:
for msg in error_messages:
logger.error(msg)
error_messages = []
live_log_errors = True
if isinstance(err, _PermRequestError):
break
if common_retries and read_retries:
time.sleep(self.__retries_delay)
for msg in error_messages:
logger.error(msg)
if not (common_retries and read_retries):
logger.error("Can't process HID request due many errors: %r", request)
return error_retval
def __send_request(self, conn: BasePhyConnection, request: bytes) -> bytes:
if not self.__noop:
response = conn.send(request)
else:
response = b"\x33\x20" # Magic + OK
response += struct.pack(">H", self.__make_crc16(response))
return response
def __make_request(self, command: bytes) -> bytes:
request = b"\x33" + command
request += struct.pack(">H", self.__make_crc16(request))
assert len(request) == 8, (request, command)
return request
def __make_crc16(self, data: bytes) -> int:
crc = 0xFFFF
for byte in data:
crc = crc ^ byte
for _ in range(8):
if crc & 0x0001 == 0:
crc = crc >> 1
else:
crc = crc >> 1
crc = crc ^ 0xA001
return crc

View File

@ -0,0 +1,78 @@
# ========================================================================== #
# #
# KVMD - The main Pi-KVM daemon. #
# #
# Copyright (C) 2018 Maxim Devaev <mdevaev@gmail.com> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
# ========================================================================== #
from typing import Optional
import gpiod
from ....logging import get_logger
from .... import env
from .... import aiotools
from .... import aiogp
# =====
class Gpio:
def __init__(
self,
reset_pin: int,
reset_inverted: bool,
reset_delay: float,
) -> None:
self.__reset_pin = reset_pin
self.__reset_inverted = reset_inverted
self.__reset_delay = reset_delay
self.__chip: Optional[gpiod.Chip] = None
self.__reset_line: Optional[gpiod.Line] = None
self.__reset_wip = False
def open(self) -> None:
if self.__reset_pin >= 0:
assert self.__chip is None
assert self.__reset_line is None
self.__chip = gpiod.Chip(env.GPIO_DEVICE_PATH)
self.__reset_line = self.__chip.get_line(self.__reset_pin)
self.__reset_line.request("kvmd::hid-mcu::reset", gpiod.LINE_REQ_DIR_OUT, default_vals=[int(self.__reset_inverted)])
def close(self) -> None:
if self.__chip:
try:
self.__chip.close()
except Exception:
pass
@aiotools.atomic
async def reset(self) -> None:
if self.__reset_pin >= 0:
assert self.__reset_line
if not self.__reset_wip:
self.__reset_wip = True
try:
await aiogp.pulse(self.__reset_line, self.__reset_delay, 1, self.__reset_inverted)
finally:
self.__reset_wip = False
get_logger(0).info("Reset HID performed")
else:
get_logger(0).info("Another reset HID in progress")

View File

@ -21,443 +21,77 @@
import os
import multiprocessing
import dataclasses
import queue
import struct
import errno
import time
import contextlib
from typing import Tuple
from typing import List
from typing import Dict
from typing import Iterable
from typing import AsyncGenerator
from typing import Optional
from typing import Generator
from typing import Any
import gpiod
import serial
from ...logging import get_logger
from ...keyboard.mappings import KEYMAP
from ... import env
from ... import tools
from ... import aiotools
from ... import aiomulti
from ... import aioproc
from ... import aiogp
from ...yamlconf import Option
from ...validators.basic import valid_bool
from ...validators.basic import valid_int_f0
from ...validators.basic import valid_int_f1
from ...validators.basic import valid_float_f01
from ...validators.os import valid_abs_path
from ...validators.hw import valid_tty_speed
from ...validators.hw import valid_gpio_pin_optional
from . import BaseHid
from ._mcu import BasePhyConnection
from ._mcu import BasePhy
from ._mcu import BaseMcuHid
# =====
class _RequestError(Exception):
def __init__(self, msg: str, online: bool=False) -> None:
super().__init__(msg)
self.msg = msg
self.online = online
class _SerialPhyConnection(BasePhyConnection):
def __init__(self, tty: serial.Serial) -> None:
self.__tty = tty
def send(self, request: bytes) -> bytes:
assert len(request) == 8
assert request[0] == 0x33
if self.__tty.in_waiting:
self.__tty.read_all()
assert self.__tty.write(request) == 8
return self.__tty.read(4)
class _PermRequestError(_RequestError):
pass
class _TempRequestError(_RequestError):
pass
# =====
class _BaseEvent:
def make_command(self) -> bytes:
raise NotImplementedError
class _ClearEvent(_BaseEvent):
def make_command(self) -> bytes:
return b"\x10\x00\x00\x00\x00"
@dataclasses.dataclass(frozen=True)
class _KeyEvent(_BaseEvent):
name: str
state: bool
def __post_init__(self) -> None:
assert self.name in KEYMAP
def make_command(self) -> bytes:
code = KEYMAP[self.name].serial.code
return struct.pack(">BBBxx", 0x11, code, int(self.state))
@dataclasses.dataclass(frozen=True)
class _MouseButtonEvent(_BaseEvent):
name: str
state: bool
def __post_init__(self) -> None:
assert self.name in ["left", "right", "middle", "up", "down"]
def make_command(self) -> bytes:
(code, state_pressed, is_main) = {
"left": (0b10000000, 0b00001000, True),
"right": (0b01000000, 0b00000100, True),
"middle": (0b00100000, 0b00000010, True),
"up": (0b10000000, 0b00001000, False), # Back
"down": (0b01000000, 0b00000100, False), # Forward
}[self.name]
if self.state:
code |= state_pressed
if is_main:
main_code = code
extra_code = 0
else:
main_code = 0
extra_code = code
return struct.pack(">BBBxx", 0x13, main_code, extra_code)
@dataclasses.dataclass(frozen=True)
class _MouseMoveEvent(_BaseEvent):
to_x: int
to_y: int
def __post_init__(self) -> None:
assert -32768 <= self.to_x <= 32767
assert -32768 <= self.to_y <= 32767
def make_command(self) -> bytes:
return struct.pack(">Bhh", 0x12, self.to_x, self.to_y)
@dataclasses.dataclass(frozen=True)
class _MouseWheelEvent(_BaseEvent):
delta_x: int
delta_y: int
def __post_init__(self) -> None:
assert -127 <= self.delta_x <= 127
assert -127 <= self.delta_y <= 127
def make_command(self) -> bytes:
# Горизонтальная прокрутка пока не поддерживается
return struct.pack(">Bxbxx", 0x14, self.delta_y)
class _Gpio:
def __init__(self, reset_pin: int, reset_delay: float) -> None:
self.__reset_pin = reset_pin
self.__reset_delay = reset_delay
self.__chip: Optional[gpiod.Chip] = None
self.__reset_line: Optional[gpiod.Line] = None
self.__reset_wip = False
def open(self) -> None:
if self.__reset_pin >= 0:
assert self.__chip is None
assert self.__reset_line is None
self.__chip = gpiod.Chip(env.GPIO_DEVICE_PATH)
self.__reset_line = self.__chip.get_line(self.__reset_pin)
self.__reset_line.request("kvmd::hid-serial::reset", gpiod.LINE_REQ_DIR_OUT, default_vals=[0])
def close(self) -> None:
if self.__chip:
try:
self.__chip.close()
except Exception:
pass
@aiotools.atomic
async def reset(self) -> None:
if self.__reset_pin >= 0:
assert self.__reset_line
if not self.__reset_wip:
self.__reset_wip = True
try:
await aiogp.pulse(self.__reset_line, self.__reset_delay, 1)
finally:
self.__reset_wip = False
get_logger(0).info("Reset HID performed")
else:
get_logger(0).info("Another reset HID in progress")
# =====
class Plugin(BaseHid, multiprocessing.Process): # pylint: disable=too-many-instance-attributes
def __init__( # pylint: disable=too-many-arguments,super-init-not-called
class _SerialPhy(BasePhy):
def __init__(
self,
reset_pin: int,
reset_delay: float,
device_path: str,
speed: int,
read_timeout: float,
read_retries: int,
common_retries: int,
retries_delay: float,
errors_threshold: int,
noop: bool,
) -> None:
multiprocessing.Process.__init__(self, daemon=True)
self.__device_path = device_path
self.__speed = speed
self.__read_timeout = read_timeout
self.__read_retries = read_retries
self.__common_retries = common_retries
self.__retries_delay = retries_delay
self.__errors_threshold = errors_threshold
self.__noop = noop
self.__gpio = _Gpio(reset_pin, reset_delay)
def has_device(self) -> bool:
return os.path.exists(self.__device_path)
self.__events_queue: "multiprocessing.Queue[_BaseEvent]" = multiprocessing.Queue()
@contextlib.contextmanager
def connected(self) -> Generator[_SerialPhyConnection, None, None]: # type: ignore
with serial.Serial(self.__device_path, self.__speed, timeout=self.__read_timeout) as tty:
yield _SerialPhyConnection(tty)
self.__notifier = aiomulti.AioProcessNotifier()
self.__state_flags = aiomulti.AioSharedFlags({
"online": True,
"caps": False,
"scroll": False,
"num": False,
}, self.__notifier)
self.__stop_event = multiprocessing.Event()
# =====
class Plugin(BaseMcuHid):
def __init__(self, **kwargs: Any) -> None:
phy_kwargs: Dict = {key: kwargs.pop(key) for key in self.__get_phy_options()}
super().__init__(phy=_SerialPhy(**phy_kwargs), **kwargs)
@classmethod
def get_plugin_options(cls) -> Dict:
return {
"reset_pin": Option(-1, type=valid_gpio_pin_optional),
"reset_delay": Option(0.1, type=valid_float_f01),
**cls.__get_phy_options(),
**BaseMcuHid.get_plugin_options(),
}
@classmethod
def __get_phy_options(cls) -> Dict:
return {
"device": Option("", type=valid_abs_path, unpack_as="device_path"),
"speed": Option(115200, type=valid_tty_speed),
"read_timeout": Option(2.0, type=valid_float_f01),
"read_retries": Option(10, type=valid_int_f1),
"common_retries": Option(100, type=valid_int_f1),
"retries_delay": Option(0.1, type=valid_float_f01),
"errors_threshold": Option(5, type=valid_int_f0),
"noop": Option(False, type=valid_bool),
}
def sysprep(self) -> None:
self.__gpio.open()
get_logger(0).info("Starting HID daemon ...")
self.start()
async def get_state(self) -> Dict:
state = await self.__state_flags.get()
return {
"online": state["online"],
"keyboard": {
"online": state["online"],
"leds": {
"caps": state["caps"],
"scroll": state["scroll"],
"num": state["num"],
},
},
"mouse": {"online": state["online"]},
}
async def poll_state(self) -> AsyncGenerator[Dict, None]:
prev_state: Dict = {}
while True:
state = await self.get_state()
if state != prev_state:
yield state
prev_state = state
await self.__notifier.wait()
@aiotools.atomic
async def reset(self) -> None:
await self.__gpio.reset()
@aiotools.atomic
async def cleanup(self) -> None:
logger = get_logger(0)
try:
if self.is_alive():
logger.info("Stopping HID daemon ...")
self.__stop_event.set()
if self.exitcode is not None:
self.join()
if os.path.exists(self.__device_path):
get_logger().info("Clearing HID events ...")
try:
with self.__get_serial() as tty:
self.__process_command(tty, b"\x10\x00\x00\x00\x00")
except Exception:
logger.exception("Can't clear HID events")
finally:
self.__gpio.close()
# =====
def send_key_events(self, keys: Iterable[Tuple[str, bool]]) -> None:
for (key, state) in keys:
self.__queue_event(_KeyEvent(key, state))
def send_mouse_button_event(self, button: str, state: bool) -> None:
self.__queue_event(_MouseButtonEvent(button, state))
def send_mouse_move_event(self, to_x: int, to_y: int) -> None:
self.__queue_event(_MouseMoveEvent(to_x, to_y))
def send_mouse_wheel_event(self, delta_x: int, delta_y: int) -> None:
self.__queue_event(_MouseWheelEvent(delta_x, delta_y))
def clear_events(self) -> None:
# FIXME: Если очистка производится со стороны процесса хида, то возможна гонка между
# очисткой и добавлением события _ClearEvent. Неприятно, но не смертельно.
# Починить блокировкой после перехода на асинхронные очереди.
tools.clear_queue(self.__events_queue)
self.__queue_event(_ClearEvent())
def __queue_event(self, event: _BaseEvent) -> None:
if not self.__stop_event.is_set():
self.__events_queue.put_nowait(event)
def run(self) -> None: # pylint: disable=too-many-branches
logger = get_logger(0)
logger.info("Started HID pid=%d", os.getpid())
aioproc.ignore_sigint()
aioproc.rename_process("hid")
while not self.__stop_event.is_set():
try:
with self.__get_serial() as tty:
while not (self.__stop_event.is_set() and self.__events_queue.qsize() == 0):
try:
event = self.__events_queue.get(timeout=0.1)
except queue.Empty:
self.__process_command(tty, b"\x01\x00\x00\x00\x00") # Ping
else:
if not self.__process_command(tty, event.make_command()):
self.clear_events()
except Exception as err:
self.clear_events()
if isinstance(err, serial.SerialException) and err.errno == errno.ENOENT: # pylint: disable=no-member
logger.error("Missing HID serial device: %s", self.__device_path)
else:
logger.exception("Unexpected HID error")
time.sleep(1)
def __get_serial(self) -> serial.Serial:
return serial.Serial(self.__device_path, self.__speed, timeout=self.__read_timeout)
def __process_command(self, tty: serial.Serial, command: bytes) -> bool:
return self.__process_request(tty, self.__make_request(command))
def __process_request(self, tty: serial.Serial, request: bytes) -> bool: # pylint: disable=too-many-branches
logger = get_logger()
error_messages: List[str] = []
live_log_errors = False
common_retries = self.__common_retries
read_retries = self.__read_retries
error_retval = False
while common_retries and read_retries:
response = self.__send_request(tty, request)
try:
if len(response) < 4:
read_retries -= 1
raise _TempRequestError(f"No response from HID: request={request!r}")
assert len(response) == 4, response
if self.__make_crc16(response[-4:-2]) != struct.unpack(">H", response[-2:])[0]:
request = self.__make_request(b"\x02\x00\x00\x00\x00") # Repeat an answer
raise _TempRequestError("Invalid response CRC; requesting response again ...")
code = response[1]
if code == 0x48: # Request timeout # pylint: disable=no-else-raise
raise _TempRequestError(f"Got request timeout from HID: request={request!r}")
elif code == 0x40: # CRC Error
raise _TempRequestError(f"Got CRC error of request from HID: request={request!r}")
elif code == 0x45: # Unknown command
raise _PermRequestError(f"HID did not recognize the request={request!r}", online=True)
elif code == 0x24: # Rebooted?
raise _PermRequestError("No previous command state inside HID, seems it was rebooted", online=True)
elif code == 0x20: # Done
self.__state_flags.update(online=True)
return True
elif code & 0x80: # Pong with leds
self.__state_flags.update(
online=True,
caps=bool(code & 0b00000001),
scroll=bool(code & 0x00000010),
num=bool(code & 0x00000100),
)
return True
raise _TempRequestError(f"Invalid response from HID: request={request!r}; code=0x{code:02X}")
except _RequestError as err:
common_retries -= 1
self.__state_flags.update(online=err.online)
error_retval = err.online
if live_log_errors:
logger.error(err.msg)
else:
error_messages.append(err.msg)
if len(error_messages) > self.__errors_threshold:
for msg in error_messages:
logger.error(msg)
error_messages = []
live_log_errors = True
if isinstance(err, _PermRequestError):
break
if common_retries and read_retries:
time.sleep(self.__retries_delay)
for msg in error_messages:
logger.error(msg)
if not (common_retries and read_retries):
logger.error("Can't process HID request due many errors: %r", request)
return error_retval
def __send_request(self, tty: serial.Serial, request: bytes) -> bytes:
if not self.__noop:
if tty.in_waiting:
tty.read(tty.in_waiting)
assert tty.write(request) == len(request)
response = tty.read(4)
else:
response = b"\x33\x20" # Magic + OK
response += struct.pack(">H", self.__make_crc16(response))
return response
def __make_request(self, command: bytes) -> bytes:
request = b"\x33" + command
request += struct.pack(">H", self.__make_crc16(request))
assert len(request) == 8, (request, command)
return request
def __make_crc16(self, data: bytes) -> int:
crc = 0xFFFF
for byte in data:
crc = crc ^ byte
for _ in range(8):
if crc & 0x0001 == 0:
crc = crc >> 1
else:
crc = crc >> 1
crc = crc ^ 0xA001
return crc

179
kvmd/plugins/hid/spi.py Normal file
View File

@ -0,0 +1,179 @@
# ========================================================================== #
# #
# KVMD - The main Pi-KVM daemon. #
# #
# Copyright (C) 2018 Maxim Devaev <mdevaev@gmail.com> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
# ========================================================================== #
import os
import contextlib
import time
from typing import List
from typing import Dict
from typing import Generator
from typing import Callable
from typing import Optional
from typing import Any
import spidev
import gpiod
from ...logging import get_logger
from ...yamlconf import Option
from ...validators.basic import valid_bool
from ...validators.basic import valid_int_f0
from ...validators.basic import valid_int_f1
from ...validators.basic import valid_float_f01
from ...validators.hw import valid_gpio_pin_optional
from ... import env
from ._mcu import BasePhyConnection
from ._mcu import BasePhy
from ._mcu import BaseMcuHid
# =====
class _SpiPhyConnection(BasePhyConnection):
def __init__(
self,
xfer: Callable[[bytes], bytes],
read_timeout: float,
) -> None:
self.__xfer = xfer
self.__read_timeout = read_timeout
def send(self, request: bytes) -> bytes:
assert len(request) == 8
assert request[0] == 0x33
deadline_ts = time.time() + self.__read_timeout
dummy = b"\x00" * 8
while time.time() < deadline_ts:
if bytes(self.__xfer(dummy)) == dummy:
break
else:
get_logger(0).error("SPI timeout reached while garbage reading")
return b""
self.__xfer(request)
response: List[int] = []
deadline_ts = time.time() + self.__read_timeout
found = False
while time.time() < deadline_ts:
for byte in self.__xfer(b"\x00" * (5 - len(response))):
if not found:
if byte != 0x33:
continue
found = True
response.append(byte)
if len(response) == 4:
break
if len(response) == 4:
break
else:
get_logger(0).error("SPI timeout reached while responce waiting")
return b""
return bytes(response)
class _SpiPhy(BasePhy): # pylint: disable=too-many-instance-attributes
def __init__(
self,
bus: int,
chip: int,
hw_cs: bool,
sw_cs_pin: int,
max_freq: int,
block_usec: int,
read_timeout: float,
) -> None:
self.__bus = bus
self.__chip = chip
self.__hw_cs = hw_cs
self.__sw_cs_pin = sw_cs_pin
self.__max_freq = max_freq
self.__block_usec = block_usec
self.__read_timeout = read_timeout
def has_device(self) -> bool:
return os.path.exists(f"/dev/spidev{self.__bus}.{self.__chip}")
@contextlib.contextmanager
def connected(self) -> Generator[_SpiPhyConnection, None, None]: # type: ignore
with self.__sw_cs_connected() as sw_cs_line:
with contextlib.closing(spidev.SpiDev(self.__bus, self.__chip)) as spi:
spi.mode = 0
spi.no_cs = (not self.__hw_cs)
spi.max_speed_hz = self.__max_freq
def xfer(data: bytes) -> bytes:
try:
if sw_cs_line is not None:
sw_cs_line.set_value(0)
return spi.xfer(data, self.__max_freq, self.__block_usec)
finally:
if sw_cs_line is not None:
sw_cs_line.set_value(1)
yield _SpiPhyConnection(
xfer=xfer,
read_timeout=self.__read_timeout,
)
@contextlib.contextmanager
def __sw_cs_connected(self) -> Generator[Optional[gpiod.Line], None, None]:
if self.__sw_cs_pin > 0:
with contextlib.closing(gpiod.Chip(env.GPIO_DEVICE_PATH)) as chip:
line = chip.get_line(self.__sw_cs_pin)
line.request("kvmd::hid-mcu::sw_cs", gpiod.LINE_REQ_DIR_OUT, default_vals=[1])
yield line
else:
yield None
# =====
class Plugin(BaseMcuHid):
def __init__(self, **kwargs: Any) -> None:
phy_kwargs: Dict = {key: kwargs.pop(key) for key in self.__get_phy_options()}
super().__init__(phy=_SpiPhy(**phy_kwargs), **kwargs)
@classmethod
def get_plugin_options(cls) -> Dict:
return {
**cls.__get_phy_options(),
**BaseMcuHid.get_plugin_options(),
}
@classmethod
def __get_phy_options(cls) -> Dict:
return {
"bus": Option(-1, type=valid_int_f0),
"chip": Option(-1, type=valid_int_f0),
"hw_cs": Option(False, type=valid_bool),
"sw_cs_pin": Option(-1, type=valid_gpio_pin_optional),
"max_freq": Option(200000, type=valid_int_f1),
"block_usec": Option(1, type=valid_int_f0),
"read_timeout": Option(0.5, type=valid_float_f01),
}

View File

@ -83,6 +83,7 @@ def main() -> None:
"kvmd.plugins",
"kvmd.plugins.auth",
"kvmd.plugins.hid",
"kvmd.plugins.hid._mcu",
"kvmd.plugins.hid.otg",
"kvmd.plugins.hid.bt",
"kvmd.plugins.atx",

View File

@ -18,10 +18,14 @@ InotifyMask.UNMOUNT
IpmiServer.handle_raw_request
SpiDev.no_cs
SpiDev.cshigh
SpiDev.max_speed_hz
_AtxApiPart.switch_power
_KeyMapping.web_name
_KeyMapping.serial_code
_KeyMapping.mcu_code
_KeyMapping.arduino_name
_KeyMapping.otg_key
_KeyMapping.ps2_key

View File

@ -1 +1,2 @@
pyghmi
spidev

View File

@ -27,7 +27,7 @@ from kvmd.keyboard.mappings import KEYMAP
# =====
def test_ok__keymap() -> None:
assert KEYMAP["KeyA"].serial.code == 1
assert KEYMAP["KeyA"].mcu.code == 1
def test_fail__keymap() -> None: