Merge branch 'multihid'

This commit is contained in:
Devaev Maxim 2020-11-22 14:33:18 +03:00
commit b7e0ee3300
28 changed files with 1166 additions and 648 deletions

View File

@ -1,15 +1,7 @@
usb:
make _build E=usb
ps2:
make _build E=ps2
mixed:
make _build E=mixed
usb-spi:
make _build E=usb_spi
ps2-spi:
make _build E=ps2_spi
mixed-spi:
make _build E=mixed_spi
serial:
make _build E=serial
spi:
make _build E=spi
_build:
rm -f .current
platformio run --environment $(E)

View File

@ -33,10 +33,11 @@ def _patch(path: str, patch_path: str) -> None:
# =====
_patch(_get_pkg_path("framework-arduino-avr"), "patches/optional-serial.patch")
_patch(_get_pkg_path("framework-arduino-avr"), "patches/no-main.patch")
_patch(_get_pkg_path("framework-arduino-avr"), "patches/optional-usb-serial.patch")
_patch(_get_pkg_path("framework-arduino-avr"), "patches/get-plugged-endpoint.patch")
_libs = _get_libs()
if "HID-Project" in _libs:
_patch(_libs["HID-Project"], "patches/absmouse.patch")
_patch(_libs["HID-Project"], "patches/shut-up.patch")
_patch(_libs["HID-Project"], "patches/shut-up.patch")
_patch(_libs["HID-Project"], "patches/no-hid-singletones.patch")
_patch(_libs["HID-Project"], "patches/absmouse-win-fix.patch")

View File

@ -0,0 +1,66 @@
diff -u -r HID-Project/src/SingleReport/BootKeyboard.cpp _HID-Project/src/SingleReport/BootKeyboard.cpp
--- HID-Project/src/SingleReport/BootKeyboard.cpp 2019-07-13 21:16:23.000000000 +0300
+++ _HID-Project/src/SingleReport/BootKeyboard.cpp 2020-11-17 18:59:36.618815374 +0300
@@ -206,6 +206,6 @@
}
-BootKeyboard_ BootKeyboard;
+//BootKeyboard_ BootKeyboard;
diff -u -r HID-Project/src/SingleReport/BootKeyboard.h _HID-Project/src/SingleReport/BootKeyboard.h
--- HID-Project/src/SingleReport/BootKeyboard.h 2019-07-13 21:16:23.000000000 +0300
+++ _HID-Project/src/SingleReport/BootKeyboard.h 2020-11-17 19:00:54.967113649 +0300
@@ -80,6 +80,6 @@
uint8_t* featureReport;
int featureLength;
};
-extern BootKeyboard_ BootKeyboard;
+//extern BootKeyboard_ BootKeyboard;
diff -u -r HID-Project/src/SingleReport/BootMouse.cpp _HID-Project/src/SingleReport/BootMouse.cpp
--- HID-Project/src/SingleReport/BootMouse.cpp 2019-07-13 21:16:23.000000000 +0300
+++ _HID-Project/src/SingleReport/BootMouse.cpp 2020-11-17 18:59:22.859113905 +0300
@@ -139,6 +139,6 @@
}
}
-BootMouse_ BootMouse;
+//BootMouse_ BootMouse;
diff -u -r HID-Project/src/SingleReport/BootMouse.h _HID-Project/src/SingleReport/BootMouse.h
--- HID-Project/src/SingleReport/BootMouse.h 2019-07-13 21:16:23.000000000 +0300
+++ _HID-Project/src/SingleReport/BootMouse.h 2020-11-17 19:01:04.076915591 +0300
@@ -48,6 +48,6 @@
virtual void SendReport(void* data, int length) override;
};
-extern BootMouse_ BootMouse;
+//extern BootMouse_ BootMouse;
diff -u -r HID-Project/src/SingleReport/SingleAbsoluteMouse.cpp _HID-Project/src/SingleReport/SingleAbsoluteMouse.cpp
--- HID-Project/src/SingleReport/SingleAbsoluteMouse.cpp 2020-11-17 18:39:35.314843889 +0300
+++ _HID-Project/src/SingleReport/SingleAbsoluteMouse.cpp 2020-11-17 18:59:12.189345326 +0300
@@ -139,6 +139,6 @@
USB_Send(pluggedEndpoint | TRANSFER_RELEASE, data, length);
}
-SingleAbsoluteMouse_ SingleAbsoluteMouse;
+//SingleAbsoluteMouse_ SingleAbsoluteMouse;
diff -u -r HID-Project/src/SingleReport/SingleAbsoluteMouse.h _HID-Project/src/SingleReport/SingleAbsoluteMouse.h
--- HID-Project/src/SingleReport/SingleAbsoluteMouse.h 2019-07-13 21:16:23.000000000 +0300
+++ _HID-Project/src/SingleReport/SingleAbsoluteMouse.h 2020-11-17 19:01:21.356539808 +0300
@@ -49,6 +49,6 @@
virtual inline void SendReport(void* data, int length) override;
};
-extern SingleAbsoluteMouse_ SingleAbsoluteMouse;
+//extern SingleAbsoluteMouse_ SingleAbsoluteMouse;

17
hid/patches/no-main.patch Normal file
View File

@ -0,0 +1,17 @@
diff -u -r framework-arduino-avr/cores/arduino/main.cpp _framework-arduino-avr/cores/arduino/main.cpp
--- framework-arduino-avr/cores/arduino/main.cpp 2019-05-16 15:52:01.000000000 +0300
+++ _framework-arduino-avr/cores/arduino/main.cpp 2020-11-17 18:56:01.243474508 +0300
@@ -30,6 +30,7 @@
void setupUSB() __attribute__((weak));
void setupUSB() { }
+/*
int main(void)
{
init();
@@ -49,4 +50,5 @@
return 0;
}
+*/

View File

@ -5,7 +5,7 @@ https://github.com/arduino-libraries/MIDIUSB/issues/50#issuecomment-451427496
return obj;
}
+#ifndef NO_SERIAL
+#ifndef NO_USB_SERIAL
PluggableUSB_::PluggableUSB_() : lastIf(CDC_ACM_INTERFACE + CDC_INTERFACE_COUNT),
lastEp(CDC_FIRST_ENDPOINT + CDC_ENPOINT_COUNT),
+#else
@ -22,7 +22,7 @@ diff -u -r a/cores/arduino/USBCore.cpp b/cores/arduino/USBCore.cpp
{
0, // Control Endpoint
+#ifndef NO_SERIAL
+#ifndef NO_USB_SERIAL
EP_TYPE_INTERRUPT_IN, // CDC_ENDPOINT_ACM
EP_TYPE_BULK_OUT, // CDC_ENDPOINT_OUT
EP_TYPE_BULK_IN, // CDC_ENDPOINT_IN
@ -34,7 +34,7 @@ diff -u -r a/cores/arduino/USBCore.cpp b/cores/arduino/USBCore.cpp
{
u8 i = setup.wIndex;
+#ifndef NO_SERIAL
+#ifndef NO_USB_SERIAL
if (CDC_ACM_INTERFACE == i)
return CDC_Setup(setup);
+#endif
@ -45,7 +45,7 @@ diff -u -r a/cores/arduino/USBCore.cpp b/cores/arduino/USBCore.cpp
{
u8 interfaces = 0;
+#ifndef NO_SERIAL
+#ifndef NO_USB_SERIAL
CDC_GetInterface(&interfaces);
+#endif

View File

@ -6,87 +6,51 @@ core_dir = ./.platformio/
platform = atmelavr
board = micro
framework = arduino
lib_deps =
HID-Project@2.6.1
git+https://github.com/Harvie/ps2dev#v0.0.3
extra_scripts =
pre:avrdude.py
post:patch.py
platform_packages =
tool-avrdude
[_parts_usb_kbd]
lib_deps =
HID-Project@2.6.1
[_common]
build_flags =
-DHID_USB_KBD
[_parts_usb_mouse]
lib_deps =
HID-Project@2.6.1
build_flags =
-DHID_USB_MOUSE
[_parts_ps2_kbd]
lib_deps =
git+https://github.com/Harvie/ps2dev#v0.0.3
build_flags =
-DHID_PS2_KBD
-DPS2_KBD_CLOCK_PIN=7
-DPS2_KBD_DATA_PIN=5
[_usb]
lib_deps =
${_parts_usb_kbd.lib_deps}
# ${_parts_usb_mouse.lib_deps}
build_flags =
${_parts_usb_kbd.build_flags}
${_parts_usb_mouse.build_flags}
[_ps2]
lib_deps =
${_parts_ps2_kbd.lib_deps}
build_flags =
${_parts_ps2_kbd.build_flags}
[_mixed]
lib_deps =
${_parts_ps2_kbd.lib_deps}
${_parts_usb_mouse.lib_deps}
build_flags =
${_parts_ps2_kbd.build_flags}
${_parts_usb_mouse.build_flags}
-DHID_PS2_KBD_CLOCK_PIN=7
-DHID_PS2_KBD_DATA_PIN=5
-DHID_USB_CHECK_ENDPOINT
# ----- The default config with dynamic switching -----
-DHID_DYNAMIC
-DHID_WITH_USB
-DHID_SET_USB_KBD
-DHID_SET_USB_MOUSE_ABS
# ----- PS2 keyboard only -----
# -DHID_WITH_PS2
# -DHID_SET_PS2_KBD
# ----- PS2 keyboard + USB absolute mouse -----
# -DHID_WITH_USB
# -DHID_WITH_PS2
# -DHID_SET_PS2_KBD
# -DHID_SET_USB_MOUSE_ABS
# ----- PS2 keyboard + USB relative mouse -----
# -DHID_WITH_USB
# -DHID_WITH_PS2
# -DHID_SET_PS2_KBD
# -DHID_SET_USB_MOUSE_REL
# ===== Serial =====
[_cmd_serial]
[env:serial]
extends =
_common
build_flags =
${_common.build_flags}
-DCMD_SERIAL=Serial1
-DCMD_SERIAL_SPEED=115200
-DCMD_SERIAL_TIMEOUT=100000
upload_port = /dev/ttyACM0
[env:usb]
extends =
_usb
_cmd_serial
build_flags =
${_usb.build_flags}
${_cmd_serial.build_flags}
[env:ps2]
extends =
_ps2
_cmd_serial
build_flags =
${_ps2.build_flags}
${_cmd_serial.build_flags}
[env:mixed]
extends =
_mixed
_cmd_serial
build_flags =
${_mixed.build_flags}
${_cmd_serial.build_flags}
# ===== RPi SPI =====
[env:bootloader_spi]
@ -99,11 +63,13 @@ upload_flags =
extra_scripts =
pre:avrdude.py
[_cmd_spi]
[env:spi]
extends =
_common
build_flags =
${_common.build_flags}
-DCMD_SPI
-DNO_SERIAL
-DCHECK_ENDPOINT
-DNO_USB_SERIAL
upload_protocol = custom
upload_flags =
-C
@ -117,27 +83,3 @@ upload_flags =
-p
$BOARD_MCU
upload_command = avrdude $UPLOAD_FLAGS -U flash:w:$SOURCE:i
[env:usb_spi]
extends =
_usb
_cmd_spi
build_flags =
${_usb.build_flags}
${_cmd_spi.build_flags}
[env:ps2_spi]
extends =
_ps2
_cmd_spi
build_flags =
${_ps2.build_flags}
${_cmd_spi.build_flags}
[env:mixed_spi]
extends =
_mixed
_cmd_spi
build_flags =
${_mixed.build_flags}
${_cmd_spi.build_flags}

View File

@ -20,126 +20,222 @@
*****************************************************************************/
#if !(defined(CMD_SERIAL) || defined(CMD_SPI))
# error CMD phy is not defined
#endif
#include <Arduino.h>
#ifdef CMD_SPI
# include <SPI.h>
#endif
#include "proto.h"
#if defined(HID_USB_KBD) || defined(HID_USB_MOUSE)
# include "usb/hid.h"
#endif
#ifdef HID_PS2_KBD
# include "ps2/hid.h"
#endif
// #define CMD_SERIAL Serial1
// #define CMD_SERIAL_SPEED 115200
// #define CMD_SERIAL_TIMEOUT 100000
// -- OR --
// #define CMD_SPI
// -----------------------------------------------------------------------------
#ifdef HID_USB_KBD
UsbHidKeyboard hid_kbd;
#elif defined(HID_PS2_KBD)
Ps2HidKeyboard hid_kbd;
#endif
#ifdef HID_USB_MOUSE
UsbHidMouse hid_mouse;
#if !(defined(CMD_SERIAL) || defined(CMD_SPI))
# error CMD phy is not defined
#endif
#include <Arduino.h>
#ifdef HID_DYNAMIC
# include <avr/eeprom.h>
#endif
#include "proto.h"
#ifdef CMD_SPI
# include "spi.h"
#endif
#include "usb/hid.h"
#include "ps2/hid.h"
// -----------------------------------------------------------------------------
uint8_t cmdPong(const uint8_t *_=NULL) { // 0 bytes
return (
PROTO::PONG::PREFIX
| hid_kbd.getLedsAs(PROTO::PONG::CAPS, PROTO::PONG::SCROLL, PROTO::PONG::NUM)
| (hid_kbd.isOnline() ? 0 : PROTO::PONG::KEYBOARD_OFFLINE)
# ifdef HID_USB_MOUSE
| (hid_mouse.isOnline() ? 0 : PROTO::PONG::MOUSE_OFFLINE)
# endif
);
static UsbKeyboard *_usb_kbd = NULL;
static UsbMouseAbsolute *_usb_mouse_abs = NULL;
static UsbMouseRelative *_usb_mouse_rel = NULL;
static Ps2Keyboard *_ps2_kbd = NULL;
#ifdef HID_DYNAMIC
static bool _reset_required = false;
static int _readOutputs(void) {
uint8_t data[8];
eeprom_read_block(data, 0, 8);
if (data[0] != PROTO::MAGIC || PROTO::crc16(data, 6) != PROTO::merge8(data[6], data[7])) {
return -1;
}
return data[1];
}
uint8_t cmdResetHid(const uint8_t *_) { // 0 bytes
# ifdef HID_USB_KBD
hid_kbd.reset();
static void _writeOutputs(uint8_t mask, uint8_t outputs, bool force) {
int old = 0;
if (!force) {
old = _readOutputs();
if (old < 0) {
old = 0;
}
}
uint8_t data[8] = {0};
data[0] = PROTO::MAGIC;
data[1] = (old & ~mask) | outputs;
PROTO::split16(PROTO::crc16(data, 6), &data[6], &data[7]);
eeprom_update_block(data, 0, 8);
}
#endif
static void _initOutputs() {
int outputs;
# ifdef HID_DYNAMIC
outputs = _readOutputs();
if (outputs < 0) {
# endif
# ifdef HID_USB_MOUSE
hid_mouse.reset();
outputs = 0;
# if defined(HID_WITH_USB) && defined(HID_SET_USB_KBD)
outputs |= PROTO::OUTPUTS::KEYBOARD::USB;
# elif defined(HID_WITH_PS2) && defined(HID_SET_PS2_KBD)
outputs |= PROTO::OUTPUTS::KEYBOARD::PS2;
# endif
return cmdPong();
# if defined(HID_WITH_USB) && defined(HID_SET_USB_MOUSE_ABS)
outputs |= PROTO::OUTPUTS::MOUSE::USB_ABS;
# elif defined(HID_WITH_USB) && defined(HID_SET_USB_MOUSE_REL)
outputs |= PROTO::OUTPUTS::MOUSE::USB_REL;
# elif defined(HID_WITH_PS2) && defined(HID_SET_PS2_MOUSE)
outputs |= PROTO::OUTPUTS::MOUSE::PS2;
# endif
# ifdef HID_DYNAMIC
_writeOutputs(0xFF, outputs, true);
}
# endif
uint8_t kbd = outputs & PROTO::OUTPUTS::KEYBOARD::MASK;
switch (kbd) {
# ifdef HID_WITH_USB
case PROTO::OUTPUTS::KEYBOARD::USB: _usb_kbd = new UsbKeyboard(); break;
# endif
# ifdef HID_WITH_PS2
case PROTO::OUTPUTS::KEYBOARD::PS2: _ps2_kbd = new Ps2Keyboard(); break;
# endif
}
uint8_t mouse = outputs & PROTO::OUTPUTS::MOUSE::MASK;
switch (mouse) {
# ifdef HID_WITH_USB
case PROTO::OUTPUTS::MOUSE::USB_ABS: _usb_mouse_abs = new UsbMouseAbsolute(); break;
case PROTO::OUTPUTS::MOUSE::USB_REL: _usb_mouse_rel = new UsbMouseRelative(); break;
# endif
}
USBDevice.attach();
switch (kbd) {
# ifdef HID_WITH_USB
case PROTO::OUTPUTS::KEYBOARD::USB: _usb_kbd->begin(); break;
# endif
# ifdef HID_WITH_PS2
case PROTO::OUTPUTS::KEYBOARD::PS2: _ps2_kbd->begin(); break;
# endif
}
switch (mouse) {
# ifdef HID_WITH_USB
case PROTO::OUTPUTS::MOUSE::USB_ABS: _usb_mouse_abs->begin(); break;
case PROTO::OUTPUTS::MOUSE::USB_REL: _usb_mouse_rel->begin(); break;
# endif
}
}
uint8_t cmdKeyEvent(const uint8_t *buffer) { // 2 bytes
hid_kbd.sendKey(buffer[0], buffer[1]);
return cmdPong();
// -----------------------------------------------------------------------------
static void _cmdSetKeyboard(const uint8_t *data) { // 1 bytes
# ifdef HID_DYNAMIC
_writeOutputs(PROTO::OUTPUTS::KEYBOARD::MASK, data[0], false);
_reset_required = true;
# endif
}
uint8_t cmdMouseButtonEvent(const uint8_t *buffer) { // 2 bytes
# ifdef HID_USB_MOUSE
uint8_t main_state = buffer[0];
uint8_t extra_state = buffer[1];
static void _cmdSetMouse(const uint8_t *data) { // 1 bytes
# ifdef HID_DYNAMIC
_writeOutputs(PROTO::OUTPUTS::MOUSE::MASK, data[0], false);
_reset_required = true;
# endif
}
static void _cmdClearHid(const uint8_t *_) { // 0 bytes
if (_usb_kbd) {
_usb_kbd->clear();
}
if (_usb_mouse_abs) {
_usb_mouse_abs->clear();
} else if (_usb_mouse_rel) {
_usb_mouse_rel->clear();
}
}
static void _cmdKeyEvent(const uint8_t *data) { // 2 bytes
if (_usb_kbd) {
_usb_kbd->sendKey(data[0], data[1]);
} else if (_ps2_kbd) {
_ps2_kbd->sendKey(data[0], data[1]);
}
}
static void _cmdMouseButtonEvent(const uint8_t *data) { // 2 bytes
# define MOUSE_PAIR(_state, _button) \
_state & PROTO::CMD::MOUSE::_button::SELECT, \
_state & PROTO::CMD::MOUSE::_button::STATE
hid_mouse.sendButtons(
MOUSE_PAIR(main_state, LEFT),
MOUSE_PAIR(main_state, RIGHT),
MOUSE_PAIR(main_state, MIDDLE),
MOUSE_PAIR(extra_state, EXTRA_UP),
MOUSE_PAIR(extra_state, EXTRA_DOWN)
);
# define SEND_BUTTONS(_hid) \
_hid->sendButtons( \
MOUSE_PAIR(data[0], LEFT), \
MOUSE_PAIR(data[0], RIGHT), \
MOUSE_PAIR(data[0], MIDDLE), \
MOUSE_PAIR(data[1], EXTRA_UP), \
MOUSE_PAIR(data[1], EXTRA_DOWN) \
);
if (_usb_mouse_abs) {
SEND_BUTTONS(_usb_mouse_abs);
} else if (_usb_mouse_rel) {
SEND_BUTTONS(_usb_mouse_rel);
}
# undef SEND_BUTTONS
# undef MOUSE_PAIR
# endif
return cmdPong();
}
uint8_t cmdMouseMoveEvent(const uint8_t *buffer) { // 4 bytes
# ifdef HID_USB_MOUSE
int x = (int)buffer[0] << 8;
x |= (int)buffer[1];
x = (x + 32768) / 2; // See /kvmd/apps/otg/hid/keyboard.py for details
int y = (int)buffer[2] << 8;
y |= (int)buffer[3];
y = (y + 32768) / 2; // See /kvmd/apps/otg/hid/keyboard.py for details
hid_mouse.sendMove(x, y);
# endif
return cmdPong();
static void _cmdMouseMoveEvent(const uint8_t *data) { // 4 bytes
// See /kvmd/apps/otg/hid/keyboard.py for details
if (_usb_mouse_abs) {
_usb_mouse_abs->sendMove(
(PROTO::merge8_int(data[0], data[1]) + 32768) / 2,
(PROTO::merge8_int(data[2], data[3]) + 32768) / 2
);
}
}
uint8_t cmdMouseWheelEvent(const uint8_t *buffer) { // 2 bytes
# ifdef HID_USB_MOUSE
hid_mouse.sendWheel(buffer[1]); // Y only, X is not supported
# endif
return cmdPong();
static void _cmdMouseRelativeEvent(const uint8_t *data) { // 2 bytes
if (_usb_mouse_rel) {
_usb_mouse_rel->sendRelative(data[0], data[1]);
}
}
uint8_t handleCmdBuffer(const uint8_t *buffer) { // 8 bytes
uint16_t crc = (uint16_t)buffer[6] << 8;
crc |= (uint16_t)buffer[7];
static void _cmdMouseWheelEvent(const uint8_t *data) { // 2 bytes
// Y only, X is not supported
if (_usb_mouse_abs) {
_usb_mouse_abs->sendWheel(data[1]);
} else if (_usb_mouse_rel) {
_usb_mouse_rel->sendWheel(data[1]);
}
}
if (protoCrc16(buffer, 6) == crc) {
# define HANDLE(_handler) { return _handler(buffer + 2); }
switch (buffer[1]) {
case PROTO::CMD::RESET_HID: HANDLE(cmdResetHid);
case PROTO::CMD::KEYBOARD::KEY: HANDLE(cmdKeyEvent);
case PROTO::CMD::MOUSE::BUTTON: HANDLE(cmdMouseButtonEvent);
case PROTO::CMD::MOUSE::MOVE: HANDLE(cmdMouseMoveEvent);
case PROTO::CMD::MOUSE::WHEEL: HANDLE(cmdMouseWheelEvent);
case PROTO::CMD::PING: HANDLE(cmdPong);
static uint8_t _handleRequest(const uint8_t *data) { // 8 bytes
if (PROTO::crc16(data, 6) == PROTO::merge8(data[6], data[7])) {
# define HANDLE(_handler) { _handler(data + 2); return PROTO::PONG::OK; }
switch (data[1]) {
case PROTO::CMD::PING: return PROTO::PONG::OK;
case PROTO::CMD::SET_KEYBOARD: HANDLE(_cmdSetKeyboard);
case PROTO::CMD::SET_MOUSE: HANDLE(_cmdSetMouse);
case PROTO::CMD::CLEAR_HID: HANDLE(_cmdClearHid);
case PROTO::CMD::KEYBOARD::KEY: HANDLE(_cmdKeyEvent);
case PROTO::CMD::MOUSE::BUTTON: HANDLE(_cmdMouseButtonEvent);
case PROTO::CMD::MOUSE::MOVE: HANDLE(_cmdMouseMoveEvent);
case PROTO::CMD::MOUSE::RELATIVE: HANDLE(_cmdMouseRelativeEvent);
case PROTO::CMD::MOUSE::WHEEL: HANDLE(_cmdMouseWheelEvent);
case PROTO::CMD::REPEAT: return 0;
default: return PROTO::RESP::INVALID_ERROR;
}
@ -150,56 +246,7 @@ uint8_t handleCmdBuffer(const uint8_t *buffer) { // 8 bytes
// -----------------------------------------------------------------------------
#ifdef CMD_SPI
volatile uint8_t spi_in[8] = {0};
volatile uint8_t spi_in_index = 0;
volatile uint8_t spi_out[4] = {0};
volatile uint8_t spi_out_index = 0;
bool spiReady() {
return (!spi_out[0] && spi_in_index == 8);
}
void spiWrite(const uint8_t *buffer) {
spi_out[3] = buffer[3];
spi_out[2] = buffer[2];
spi_out[1] = buffer[1];
spi_out[0] = buffer[0]; // Меджик разрешает начать ответ
}
ISR(SPI_STC_vect) {
uint8_t in = SPDR;
if (spi_out[0] && spi_out_index < 4) {
SPDR = spi_out[spi_out_index];
if (!(SPSR & (1 << WCOL))) {
++spi_out_index;
if (spi_out_index == 4) {
spi_out_index = 0;
spi_in_index = 0;
spi_out[0] = 0;
}
}
} else {
static bool receiving = false;
if (!receiving && in == PROTO::MAGIC) {
receiving = true;
}
if (receiving && spi_in_index < 8) {
spi_in[spi_in_index] = in;
++spi_in_index;
}
if (spi_in_index == 8) {
receiving = false;
}
SPDR = 0;
}
}
#endif
// -----------------------------------------------------------------------------
void sendCmdResponse(uint8_t code) {
static void _sendResponse(uint8_t code) {
static uint8_t prev_code = PROTO::RESP::NONE;
if (code == 0) {
code = prev_code; // Repeat the last code
@ -207,51 +254,75 @@ void sendCmdResponse(uint8_t code) {
prev_code = code;
}
uint8_t buffer[4];
buffer[0] = PROTO::MAGIC;
buffer[1] = code;
uint16_t crc = protoCrc16(buffer, 2);
buffer[2] = (uint8_t)(crc >> 8);
buffer[3] = (uint8_t)(crc & 0xFF);
uint8_t response[8] = {0};
response[0] = PROTO::MAGIC;
if (code & PROTO::PONG::OK) {
response[1] = PROTO::PONG::OK;
# ifdef HID_DYNAMIC
if (_reset_required) {
response[1] |= PROTO::PONG::RESET_REQUIRED;
}
response[2] = PROTO::OUTPUTS::DYNAMIC;
# endif
if (_usb_kbd) {
response[1] |= _usb_kbd->getOfflineAs(PROTO::PONG::KEYBOARD_OFFLINE);
response[1] |= _usb_kbd->getLedsAs(PROTO::PONG::CAPS, PROTO::PONG::SCROLL, PROTO::PONG::NUM);
response[2] |= PROTO::OUTPUTS::KEYBOARD::USB;
} else if (_ps2_kbd) {
response[1] |= _ps2_kbd->getOfflineAs(PROTO::PONG::KEYBOARD_OFFLINE);
response[1] |= _ps2_kbd->getLedsAs(PROTO::PONG::CAPS, PROTO::PONG::SCROLL, PROTO::PONG::NUM);
response[2] |= PROTO::OUTPUTS::KEYBOARD::PS2;
}
if (_usb_mouse_abs) {
response[1] |= _usb_mouse_abs->getOfflineAs(PROTO::PONG::MOUSE_OFFLINE);
response[2] |= PROTO::OUTPUTS::MOUSE::USB_ABS;
} else if (_usb_mouse_rel) {
response[1] |= _usb_mouse_rel->getOfflineAs(PROTO::PONG::MOUSE_OFFLINE);
response[2] |= PROTO::OUTPUTS::MOUSE::USB_REL;
} // TODO: ps2
# ifdef HID_WITH_USB
response[3] |= PROTO::FEATURES::HAS_USB;
# endif
# ifdef HID_WITH_PS2
response[3] |= PROTO::FEATURES::HAS_PS2;
# endif
} else {
response[1] = code;
}
PROTO::split16(PROTO::crc16(response, 6), &response[6], &response[7]);
# ifdef CMD_SERIAL
CMD_SERIAL.write(buffer, 4);
CMD_SERIAL.write(response, 8);
# elif defined(CMD_SPI)
spiWrite(buffer);
spiWrite(response);
# endif
}
void setup() {
hid_kbd.begin();
# ifdef HID_USB_MOUSE
hid_mouse.begin();
# endif
int main() {
init(); // Embedded
initVariant(); // Arduino
_initOutputs();
# ifdef CMD_SERIAL
CMD_SERIAL.begin(CMD_SERIAL_SPEED);
# elif defined(CMD_SPI)
pinMode(MISO, OUTPUT);
SPCR = (1 << SPE) | (1 << SPIE); // Slave, SPI En, IRQ En
# endif
}
void loop() {
# ifdef CMD_SERIAL
unsigned long last = micros();
uint8_t buffer[8];
uint8_t index = 0;
# elif defined(CMD_SPI)
spiBegin();
# endif
while (true) {
# ifdef HID_PS2_KBD
hid_kbd.periodic();
# ifdef HID_WITH_PS2
if (_ps2_kbd) {
_ps2_kbd->periodic();
}
# endif
# ifdef CMD_SERIAL
if (CMD_SERIAL.available() > 0) {
buffer[index] = (uint8_t)CMD_SERIAL.read();
if (index == 7) {
sendCmdResponse(handleCmdBuffer(buffer));
_sendResponse(_handleRequest(buffer));
index = 0;
} else {
last = micros();
@ -263,14 +334,15 @@ void loop() {
(now >= last && now - last > CMD_SERIAL_TIMEOUT)
|| (now < last && ((unsigned long)-1) - last + now > CMD_SERIAL_TIMEOUT)
) {
sendCmdResponse(PROTO::RESP::TIMEOUT_ERROR);
_sendResponse(PROTO::RESP::TIMEOUT_ERROR);
index = 0;
}
}
# elif defined(CMD_SPI)
if (spiReady()) {
sendCmdResponse(handleCmdBuffer(spi_in));
_sendResponse(_handleRequest(spiGet()));
}
# endif
}
return 0;
}

View File

@ -25,7 +25,6 @@
namespace PROTO {
const uint8_t MAGIC = 0x33;
const uint16_t CRC_POLINOM = 0xA001;
namespace RESP { // Plain responses
// const uint8_t OK = 0x20; // Legacy
@ -36,27 +35,51 @@ namespace PROTO {
};
namespace PONG { // Complex response
const uint8_t PREFIX = 0x80;
const uint8_t OK = 0x80;
const uint8_t CAPS = 0b00000001;
const uint8_t SCROLL = 0b00000010;
const uint8_t NUM = 0b00000100;
const uint8_t KEYBOARD_OFFLINE = 0b00001000;
const uint8_t MOUSE_OFFLINE = 0b00010000;
const uint8_t RESET_REQUIRED = 0b01000000;
};
namespace OUTPUTS { // Complex request/responce flags
const uint8_t DYNAMIC = 0b10000000;
namespace KEYBOARD {
const uint8_t MASK = 0b00000111;
const uint8_t USB = 0b00000001;
const uint8_t PS2 = 0b00000011;
};
namespace MOUSE {
const uint8_t MASK = 0b00111000;
const uint8_t USB_ABS = 0b00001000;
const uint8_t USB_REL = 0b00010000;
const uint8_t PS2 = 0b00011000;
};
};
namespace FEATURES {
const uint8_t HAS_USB = 0b00000001;
const uint8_t HAS_PS2 = 0b00000010;
}
namespace CMD {
const uint8_t PING = 0x01;
const uint8_t REPEAT = 0x02;
const uint8_t RESET_HID = 0x10;
const uint8_t PING = 0x01;
const uint8_t REPEAT = 0x02;
const uint8_t SET_KEYBOARD = 0x03;
const uint8_t SET_MOUSE = 0x04;
const uint8_t CLEAR_HID = 0x10;
namespace KEYBOARD {
const uint8_t KEY = 0x11;
};
namespace MOUSE {
const uint8_t MOVE = 0x12;
const uint8_t BUTTON = 0x13;
const uint8_t WHEEL = 0x14;
const uint8_t MOVE = 0x12;
const uint8_t BUTTON = 0x13;
const uint8_t WHEEL = 0x14;
const uint8_t RELATIVE = 0x15;
namespace LEFT {
const uint8_t SELECT = 0b10000000;
const uint8_t STATE = 0b00001000;
@ -79,22 +102,35 @@ namespace PROTO {
};
};
};
};
uint16_t crc16(const uint8_t *buffer, unsigned length) {
const uint16_t polinom = 0xA001;
uint16_t crc = 0xFFFF;
uint16_t protoCrc16(const uint8_t *buffer, unsigned length) {
uint16_t crc = 0xFFFF;
for (unsigned byte_count = 0; byte_count < length; ++byte_count) {
crc = crc ^ buffer[byte_count];
for (unsigned bit_count = 0; bit_count < 8; ++bit_count) {
if ((crc & 0x0001) == 0) {
crc = crc >> 1;
} else {
crc = crc >> 1;
crc = crc ^ PROTO::CRC_POLINOM;
for (unsigned byte_count = 0; byte_count < length; ++byte_count) {
crc = crc ^ buffer[byte_count];
for (unsigned bit_count = 0; bit_count < 8; ++bit_count) {
if ((crc & 0x0001) == 0) {
crc = crc >> 1;
} else {
crc = crc >> 1;
crc = crc ^ polinom;
}
}
}
return crc;
}
return crc;
}
inline int merge8_int(uint8_t from_a, uint8_t from_b) {
return (((int)from_a << 8) | (int)from_b);
}
inline uint16_t merge8(uint8_t from_a, uint8_t from_b) {
return (((uint16_t)from_a << 8) | (uint16_t)from_b);
}
inline void split16(uint16_t from, uint8_t *to_a, uint8_t *to_b) {
*to_a = (uint8_t)(from >> 8);
*to_b = (uint8_t)(from & 0xFF);
}
};

View File

@ -27,24 +27,20 @@
#include "keymap.h"
// #define PS2_KBD_CLOCK_PIN 7
// #define PS2_KBD_DATA_PIN 5
// #define HID_PS2_KBD_CLOCK_PIN 7
// #define HID_PS2_KBD_DATA_PIN 5
class Ps2HidKeyboard {
class Ps2Keyboard {
// https://wiki.osdev.org/PS/2_Keyboard
public:
Ps2HidKeyboard() : _dev(PS2_KBD_CLOCK_PIN, PS2_KBD_DATA_PIN) {}
Ps2Keyboard() : _dev(HID_PS2_KBD_CLOCK_PIN, HID_PS2_KBD_DATA_PIN) {}
void begin() {
_dev.keyboard_init();
}
bool isOnline() {
return true;
}
void periodic() {
_dev.keyboard_handle(&_leds);
}
@ -57,7 +53,7 @@ class Ps2HidKeyboard {
if (ps2_type != PS2_KEY_TYPE_UNKNOWN) {
// Не отправлялась часть нажатий. Когда clock на нуле, комп не принимает ничего от клавы.
// Этот костыль понижает процент пропущенных нажатий.
while (digitalRead(PS2_KBD_CLOCK_PIN) == 0) {};
while (digitalRead(HID_PS2_KBD_CLOCK_PIN) == 0) {};
if (state) {
switch (ps2_type) {
case PS2_KEY_TYPE_REG: _dev.keyboard_press(ps2_code); break;
@ -78,6 +74,10 @@ class Ps2HidKeyboard {
}
}
uint8_t getOfflineAs(uint8_t offline) {
return 0;
}
uint8_t getLedsAs(uint8_t caps, uint8_t scroll, uint8_t num) {
uint8_t result = 0;

83
hid/src/spi.cpp Normal file
View File

@ -0,0 +1,83 @@
/*****************************************************************************
# #
# KVMD - The main Pi-KVM daemon. #
# #
# Copyright (C) 2018 Maxim Devaev <mdevaev@gmail.com> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
*****************************************************************************/
#include "spi.h"
#include <Arduino.h>
#include <SPI.h>
static volatile uint8_t _spi_in[8] = {0};
static volatile uint8_t _spi_in_index = 0;
static volatile uint8_t _spi_out[8] = {0};
static volatile uint8_t _spi_out_index = 0;
void spiBegin() {
pinMode(MISO, OUTPUT);
SPCR = (1 << SPE) | (1 << SPIE); // Slave, SPI En, IRQ En
}
bool spiReady() {
return (!_spi_out[0] && _spi_in_index == 8);
}
const uint8_t *spiGet() {
return (const uint8_t *)_spi_in;
}
void spiWrite(const uint8_t *data) {
// Меджик в нулевом байте разрешает начать ответ
for (int index = 7; index >= 0; --index) {
_spi_out[index] = data[index];
}
}
ISR(SPI_STC_vect) {
uint8_t in = SPDR;
if (_spi_out[0] && _spi_out_index < 8) {
SPDR = _spi_out[_spi_out_index];
if (!(SPSR & (1 << WCOL))) {
++_spi_out_index;
if (_spi_out_index == 8) {
_spi_out_index = 0;
_spi_in_index = 0;
_spi_out[0] = 0;
}
}
} else {
static bool receiving = false;
if (!receiving && in != 0) {
receiving = true;
}
if (receiving && _spi_in_index < 8) {
_spi_in[_spi_in_index] = in;
++_spi_in_index;
}
if (_spi_in_index == 8) {
receiving = false;
}
SPDR = 0;
}
}

31
hid/src/spi.h Normal file
View File

@ -0,0 +1,31 @@
/*****************************************************************************
# #
# KVMD - The main Pi-KVM daemon. #
# #
# Copyright (C) 2018 Maxim Devaev <mdevaev@gmail.com> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
*****************************************************************************/
#pragma once
#include <Arduino.h>
void spiBegin();
bool spiReady();
const uint8_t *spiGet();
void spiWrite(const uint8_t *data);

View File

@ -29,113 +29,158 @@
// -----------------------------------------------------------------------------
#ifdef CHECK_ENDPOINT
static bool _checkEndpoint(uint8_t ep) {
// https://github.com/arduino/ArduinoCore-avr/blob/2f67c916f6ab6193c404eebe22efe901e0f9542d/cores/arduino/USBCore.cpp#L249
// https://sourceforge.net/p/arduinomidilib/svn/41/tree/branch/3.1/Teensy/teensy_core/usb_midi/usb_api.cpp#l103
uint8_t intr_state = SREG;
cli();
UENUM = ep & 7;
bool rw_allowed = UEINTX & (1 << RWAL);
SREG = intr_state;
return rw_allowed;
}
# define CHECK_HID_EP { if (!isOnline()) return; }
#ifdef HID_USB_CHECK_ENDPOINT
// https://github.com/arduino/ArduinoCore-avr/blob/2f67c916f6ab6193c404eebe22efe901e0f9542d/cores/arduino/USBCore.cpp#L249
// https://sourceforge.net/p/arduinomidilib/svn/41/tree/branch/3.1/Teensy/teensy_core/usb_midi/usb_api.cpp#l103
# define CLS_GET_OFFLINE_AS(_hid) \
uint8_t getOfflineAs(uint8_t offline) { \
uint8_t ep = _hid.getPluggedEndpoint(); \
uint8_t intr_state = SREG; \
cli(); \
UENUM = ep & 7; \
bool rw_allowed = UEINTX & (1 << RWAL); \
SREG = intr_state; \
if (rw_allowed) { \
return 0; \
} \
return offline; \
}
# define CHECK_HID_EP { if (getOfflineAs(1)) return; }
#else
# define CLS_GET_OFFLINE_AS(_hid) \
uint8_t getOfflineAs(uint8_t offline) { \
return 0; \
}
# define CHECK_HID_EP
#endif
class UsbHidKeyboard {
class UsbKeyboard {
public:
UsbHidKeyboard() {}
UsbKeyboard() {}
void begin() {
BootKeyboard.begin();
_kbd.begin();
}
bool isOnline() {
# ifdef CHECK_ENDPOINT
return _checkEndpoint(BootKeyboard.getPluggedEndpoint());
# else
return true;
# endif
}
void reset() {
BootKeyboard.releaseAll();
void clear() {
_kbd.releaseAll();
}
void sendKey(uint8_t code, bool state) {
CHECK_HID_EP;
KeyboardKeycode usb_code = keymapUsb(code);
if (usb_code != KEY_ERROR_UNDEFINED) {
if (state) BootKeyboard.press(usb_code);
else BootKeyboard.release(usb_code);
if (state) _kbd.press(usb_code);
else _kbd.release(usb_code);
}
}
uint8_t getLedsAs(uint8_t caps, uint8_t scroll, uint8_t num) {
uint8_t leds = BootKeyboard.getLeds();
uint8_t result = 0;
CLS_GET_OFFLINE_AS(_kbd)
uint8_t getLedsAs(uint8_t caps, uint8_t scroll, uint8_t num) {
uint8_t leds = _kbd.getLeds();
uint8_t result = 0;
if (leds & LED_CAPS_LOCK) result |= caps;
if (leds & LED_SCROLL_LOCK) result |= scroll;
if (leds & LED_NUM_LOCK) result |= num;
return result;
}
private:
BootKeyboard_ _kbd;
};
class UsbHidMouse {
#define CLS_SEND_BUTTONS \
void sendButtons( \
bool left_select, bool left_state, \
bool right_select, bool right_state, \
bool middle_select, bool middle_state, \
bool up_select, bool up_state, \
bool down_select, bool down_state \
) { \
if (left_select) _sendButton(MOUSE_LEFT, left_state); \
if (right_select) _sendButton(MOUSE_RIGHT, right_state); \
if (middle_select) _sendButton(MOUSE_MIDDLE, middle_state); \
if (up_select) _sendButton(MOUSE_PREV, up_state); \
if (down_select) _sendButton(MOUSE_NEXT, down_state); \
}
class UsbMouseAbsolute {
public:
UsbHidMouse() {}
UsbMouseAbsolute() {}
void begin() {
SingleAbsoluteMouse.begin();
_mouse.begin();
}
bool isOnline() {
# ifdef CHECK_ENDPOINT
return _checkEndpoint(SingleAbsoluteMouse.getPluggedEndpoint());
# else
return true;
# endif
void clear() {
_mouse.releaseAll();
}
void reset() {
SingleAbsoluteMouse.releaseAll();
}
void sendButtons(
bool left_select, bool left_state,
bool right_select, bool right_state,
bool middle_select, bool middle_state,
bool up_select, bool up_state,
bool down_select, bool down_state
) {
if (left_select) _sendButton(MOUSE_LEFT, left_state);
if (right_select) _sendButton(MOUSE_RIGHT, right_state);
if (middle_select) _sendButton(MOUSE_MIDDLE, middle_state);
if (up_select) _sendButton(MOUSE_PREV, up_state);
if (down_select) _sendButton(MOUSE_NEXT, down_state);
}
CLS_SEND_BUTTONS
void sendMove(int x, int y) {
CHECK_HID_EP;
SingleAbsoluteMouse.moveTo(x, y);
_mouse.moveTo(x, y);
}
void sendWheel(int delta_y) {
CHECK_HID_EP;
// delta_x is not supported by hid-project now
SingleAbsoluteMouse.move(0, 0, delta_y);
CHECK_HID_EP;
_mouse.move(0, 0, delta_y);
}
CLS_GET_OFFLINE_AS(_mouse)
private:
SingleAbsoluteMouse_ _mouse;
void _sendButton(uint8_t button, bool state) {
CHECK_HID_EP;
if (state) SingleAbsoluteMouse.press(button);
else SingleAbsoluteMouse.release(button);
if (state) _mouse.press(button);
else _mouse.release(button);
}
};
class UsbMouseRelative {
public:
UsbMouseRelative() {}
void begin() {
_mouse.begin();
}
void clear() {
_mouse.releaseAll();
}
CLS_SEND_BUTTONS
void sendRelative(int x, int y) {
CHECK_HID_EP;
_mouse.move(x, y, 0);
}
void sendWheel(int delta_y) {
// delta_x is not supported by hid-project now
CHECK_HID_EP;
_mouse.move(0, 0, delta_y);
}
CLS_GET_OFFLINE_AS(_mouse)
private:
BootMouse_ _mouse;
void _sendButton(uint8_t button, bool state) {
CHECK_HID_EP;
if (state) _mouse.press(button);
else _mouse.release(button);
}
};
#undef CLS_SEND_BUTTONS
#undef CLS_GET_OFFLINE_AS
#undef CHECK_HID_EP

View File

@ -25,7 +25,9 @@ import queue
from typing import Tuple
from typing import Dict
from typing import Type
from typing import TypeVar
from typing import Generic
from typing import Optional
from . import aiotools
@ -71,14 +73,19 @@ class AioProcessNotifier:
# =====
class AioSharedFlags:
_SharedFlagT = TypeVar("_SharedFlagT", int, bool)
class AioSharedFlags(Generic[_SharedFlagT]):
def __init__(
self,
initial: Dict[str, bool],
initial: Dict[str, _SharedFlagT],
notifier: AioProcessNotifier,
type: Type[_SharedFlagT]=bool, # pylint: disable=redefined-builtin
) -> None:
self.__notifier = notifier
self.__type: Type[_SharedFlagT] = type
self.__flags = {
key: multiprocessing.RawValue("i", int(value)) # type: ignore
@ -87,7 +94,7 @@ class AioSharedFlags:
self.__lock = multiprocessing.Lock()
def update(self, **kwargs: bool) -> None:
def update(self, **kwargs: _SharedFlagT) -> None:
changed = False
with self.__lock:
for (key, value) in kwargs.items():
@ -98,12 +105,12 @@ class AioSharedFlags:
if changed:
self.__notifier.notify()
async def get(self) -> Dict[str, bool]:
async def get(self) -> Dict[str, _SharedFlagT]:
return (await aiotools.run_async(self.__inner_get))
def __inner_get(self) -> Dict[str, bool]:
def __inner_get(self) -> Dict[str, _SharedFlagT]:
with self.__lock:
return {
key: bool(shared.value)
key: self.__type(shared.value)
for (key, shared) in self.__flags.items()
}

View File

@ -84,11 +84,12 @@ from ..validators.net import valid_ports_list
from ..validators.net import valid_mac
from ..validators.net import valid_ssl_ciphers
from ..validators.hid import valid_hid_key
from ..validators.hid import valid_hid_mouse_move
from ..validators.kvm import valid_stream_quality
from ..validators.kvm import valid_stream_fps
from ..validators.kvm import valid_stream_resolution
from ..validators.kvm import valid_hid_key
from ..validators.kvm import valid_hid_mouse_move
from ..validators.kvm import valid_ugpio_driver
from ..validators.kvm import valid_ugpio_channel
from ..validators.kvm import valid_ugpio_mode

View File

@ -38,10 +38,12 @@ from ....validators import raise_error
from ....validators.basic import valid_bool
from ....validators.basic import valid_int_f0
from ....validators.os import valid_printable_filename
from ....validators.kvm import valid_hid_key
from ....validators.kvm import valid_hid_mouse_move
from ....validators.kvm import valid_hid_mouse_button
from ....validators.kvm import valid_hid_mouse_delta
from ....validators.hid import valid_hid_keyboard_output
from ....validators.hid import valid_hid_mouse_output
from ....validators.hid import valid_hid_key
from ....validators.hid import valid_hid_mouse_move
from ....validators.hid import valid_hid_mouse_button
from ....validators.hid import valid_hid_mouse_delta
from ....keyboard.keysym import build_symmap
from ....keyboard.printer import text_to_web_keys
@ -67,6 +69,16 @@ class HidApi:
async def __state_handler(self, _: Request) -> Response:
return make_json_response(await self.__hid.get_state())
@exposed_http("POST", "/hid/keyboard/set_params")
async def __keyboard_set_params_handler(self, request: Request) -> Response:
self.__hid.set_keyboard_output(valid_hid_keyboard_output(request.query.get("output")))
return make_json_response()
@exposed_http("POST", "/hid/mouse/set_params")
async def __mouse_set_params_handler(self, request: Request) -> Response:
self.__hid.set_mouse_output(valid_hid_mouse_output(request.query.get("output")))
return make_json_response()
@exposed_http("POST", "/hid/reset")
async def __reset_handler(self, _: Request) -> Response:
await self.__hid.reset()

View File

@ -57,14 +57,22 @@ class BaseHid(BasePlugin):
raise NotImplementedError
def send_mouse_move_event(self, to_x: int, to_y: int) -> None:
raise NotImplementedError
_ = to_x
_ = to_y
def send_mouse_relative_event(self, delta_x: int, delta_y: int) -> None:
raise NotImplementedError
_ = delta_x
_ = delta_y
def send_mouse_wheel_event(self, delta_x: int, delta_y: int) -> None:
raise NotImplementedError
def set_keyboard_output(self, output: str) -> None:
_ = output
def set_mouse_output(self, output: str) -> None:
_ = output
def clear_events(self) -> None:
raise NotImplementedError

View File

@ -22,10 +22,8 @@
import os
import multiprocessing
import dataclasses
import contextlib
import queue
import struct
import time
from typing import Tuple
@ -37,8 +35,6 @@ from typing import AsyncGenerator
from ....logging import get_logger
from ....keyboard.mappings import KEYMAP
from .... import tools
from .... import aiotools
from .... import aiomulti
@ -56,6 +52,22 @@ from .. import BaseHid
from .gpio import Gpio
from .proto import REQUEST_PING
from .proto import REQUEST_REPEAT
from .proto import RESPONSE_LEGACY_OK
from .proto import BaseEvent
from .proto import SetKeyboardOutputEvent
from .proto import SetMouseOutputEvent
from .proto import ClearEvent
from .proto import KeyEvent
from .proto import MouseButtonEvent
from .proto import MouseMoveEvent
from .proto import MouseRelativeEvent
from .proto import MouseWheelEvent
from .proto import get_active_keyboard
from .proto import get_active_mouse
from .proto import check_response
# =====
class _RequestError(Exception):
@ -72,84 +84,6 @@ class _TempRequestError(_RequestError):
pass
# =====
class _BaseEvent:
def make_command(self) -> bytes:
raise NotImplementedError
class _ClearEvent(_BaseEvent):
def make_command(self) -> bytes:
return b"\x10\x00\x00\x00\x00"
@dataclasses.dataclass(frozen=True)
class _KeyEvent(_BaseEvent):
name: str
state: bool
def __post_init__(self) -> None:
assert self.name in KEYMAP
def make_command(self) -> bytes:
code = KEYMAP[self.name].mcu.code
return struct.pack(">BBBxx", 0x11, code, int(self.state))
@dataclasses.dataclass(frozen=True)
class _MouseButtonEvent(_BaseEvent):
name: str
state: bool
def __post_init__(self) -> None:
assert self.name in ["left", "right", "middle", "up", "down"]
def make_command(self) -> bytes:
(code, state_pressed, is_main) = {
"left": (0b10000000, 0b00001000, True),
"right": (0b01000000, 0b00000100, True),
"middle": (0b00100000, 0b00000010, True),
"up": (0b10000000, 0b00001000, False), # Back
"down": (0b01000000, 0b00000100, False), # Forward
}[self.name]
if self.state:
code |= state_pressed
if is_main:
main_code = code
extra_code = 0
else:
main_code = 0
extra_code = code
return struct.pack(">BBBxx", 0x13, main_code, extra_code)
@dataclasses.dataclass(frozen=True)
class _MouseMoveEvent(_BaseEvent):
to_x: int
to_y: int
def __post_init__(self) -> None:
assert -32768 <= self.to_x <= 32767
assert -32768 <= self.to_y <= 32767
def make_command(self) -> bytes:
return struct.pack(">Bhh", 0x12, self.to_x, self.to_y)
@dataclasses.dataclass(frozen=True)
class _MouseWheelEvent(_BaseEvent):
delta_x: int
delta_y: int
def __post_init__(self) -> None:
assert -127 <= self.delta_x <= 127
assert -127 <= self.delta_y <= 127
def make_command(self) -> bytes:
# Горизонтальная прокрутка пока не поддерживается
return struct.pack(">Bxbxx", 0x14, self.delta_y)
# =====
class BasePhyConnection:
def send(self, request: bytes) -> bytes:
@ -192,16 +126,13 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
self.__phy = phy
self.__gpio = Gpio(reset_pin, reset_inverted, reset_delay)
self.__events_queue: "multiprocessing.Queue[_BaseEvent]" = multiprocessing.Queue()
self.__events_queue: "multiprocessing.Queue[BaseEvent]" = multiprocessing.Queue()
self.__notifier = aiomulti.AioProcessNotifier()
self.__state_flags = aiomulti.AioSharedFlags({
"keyboard_online": True,
"mouse_online": True,
"caps": False,
"scroll": False,
"num": False,
}, self.__notifier)
"online": 0,
"status": 0,
}, self.__notifier, type=int)
self.__stop_event = multiprocessing.Event()
@ -226,19 +157,51 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
async def get_state(self) -> Dict:
state = await self.__state_flags.get()
online = bool(state["online"])
pong = (state["status"] >> 16) & 0xFF
outputs = (state["status"] >> 8) & 0xFF
features = state["status"] & 0xFF
absolute = True
active_mouse = get_active_mouse(outputs)
if online and active_mouse in ["usb_rel", "ps2"]:
absolute = False
keyboard_outputs: Dict = {"available": {}, "active": ""}
mouse_outputs: Dict = {"available": {}, "active": ""}
if outputs & 0b10000000: # Dynamic
if features & 0b00000001: # USB
keyboard_outputs["available"]["usb"] = {"name": "USB"}
mouse_outputs["available"]["usb"] = {"name": "USB", "absolute": True}
mouse_outputs["available"]["usb_rel"] = {"name": "USB Relative", "absolute": False}
if features & 0b00000010: # PS/2
keyboard_outputs["available"]["ps2"] = {"name": "PS/2"}
mouse_outputs["available"]["ps2"] = {"name": "PS/2"}
active_keyboard = get_active_keyboard(outputs)
if active_keyboard in keyboard_outputs["available"]:
keyboard_outputs["active"] = active_keyboard
if active_mouse in mouse_outputs["available"]:
mouse_outputs["active"] = active_mouse
return {
"online": (state["keyboard_online"] and state["mouse_online"]),
"online": online,
"keyboard": {
"online": state["keyboard_online"],
"online": (online and not (pong & 0b00001000)),
"leds": {
"caps": state["caps"],
"scroll": state["scroll"],
"num": state["num"],
"caps": bool(pong & 0b00000001),
"scroll": bool(pong & 0b00000010),
"num": bool(pong & 0b00000100),
},
"outputs": keyboard_outputs,
},
"mouse": {
"online": state["mouse_online"],
"absolute": True,
"online": (online and not (pong & 0b00010000)),
"absolute": absolute,
"outputs": mouse_outputs,
},
}
@ -268,7 +231,7 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
get_logger().info("Clearing HID events ...")
try:
with self.__phy.connected() as conn:
self.__process_command(conn, b"\x10\x00\x00\x00\x00")
self.__process_request(conn, ClearEvent().make_request())
except Exception:
logger.exception("Can't clear HID events")
finally:
@ -278,30 +241,36 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
def send_key_events(self, keys: Iterable[Tuple[str, bool]]) -> None:
for (key, state) in keys:
self.__queue_event(_KeyEvent(key, state))
self.__queue_event(KeyEvent(key, state))
def send_mouse_button_event(self, button: str, state: bool) -> None:
self.__queue_event(_MouseButtonEvent(button, state))
self.__queue_event(MouseButtonEvent(button, state))
def send_mouse_move_event(self, to_x: int, to_y: int) -> None:
self.__queue_event(_MouseMoveEvent(to_x, to_y))
self.__queue_event(MouseMoveEvent(to_x, to_y))
def send_mouse_relative_event(self, delta_x: int, delta_y: int) -> None:
_ = delta_x # No relative events yet
_ = delta_y
self.__queue_event(MouseRelativeEvent(delta_x, delta_y))
def send_mouse_wheel_event(self, delta_x: int, delta_y: int) -> None:
self.__queue_event(_MouseWheelEvent(delta_x, delta_y))
self.__queue_event(MouseWheelEvent(delta_x, delta_y))
def set_keyboard_output(self, output: str) -> None:
self.__queue_event(SetKeyboardOutputEvent(output), clear=True)
def set_mouse_output(self, output: str) -> None:
self.__queue_event(SetMouseOutputEvent(output), clear=True)
def clear_events(self) -> None:
# FIXME: Если очистка производится со стороны процесса хида, то возможна гонка между
# очисткой и добавлением события _ClearEvent. Неприятно, но не смертельно.
# Починить блокировкой после перехода на асинхронные очереди.
tools.clear_queue(self.__events_queue)
self.__queue_event(_ClearEvent())
self.__queue_event(ClearEvent(), clear=True)
def __queue_event(self, event: _BaseEvent) -> None:
def __queue_event(self, event: BaseEvent, clear: bool=False) -> None:
if not self.__stop_event.is_set():
if clear:
# FIXME: Если очистка производится со стороны процесса хида, то возможна гонка между
# очисткой и добавлением нового события. Неприятно, но не смертельно.
# Починить блокировкой после перехода на асинхронные очереди.
tools.clear_queue(self.__events_queue)
self.__events_queue.put_nowait(event)
def run(self) -> None: # pylint: disable=too-many-branches
@ -319,9 +288,9 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
try:
event = self.__events_queue.get(timeout=0.1)
except queue.Empty:
self.__process_command(conn, b"\x01\x00\x00\x00\x00") # Ping
self.__process_request(conn, REQUEST_PING)
else:
if not self.__process_command(conn, event.make_command()):
if not self.__process_request(conn, event.make_request()):
self.clear_events()
else:
logger.error("Missing HID device")
@ -331,9 +300,6 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
logger.exception("Unexpected HID error")
time.sleep(1)
def __process_command(self, conn: BasePhyConnection, command: bytes) -> bool:
return self.__process_request(conn, self.__make_request(command))
def __process_request(self, conn: BasePhyConnection, request: bytes) -> bool: # pylint: disable=too-many-branches
logger = get_logger()
error_messages: List[str] = []
@ -344,15 +310,14 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
error_retval = False
while common_retries and read_retries:
response = self.__send_request(conn, request)
response = (RESPONSE_LEGACY_OK if self.__noop else conn.send(request))
try:
if len(response) < 4:
read_retries -= 1
raise _TempRequestError(f"No response from HID: request={request!r}")
assert len(response) == 4, response
if self.__make_crc16(response[-4:-2]) != struct.unpack(">H", response[-2:])[0]:
request = self.__make_request(b"\x02\x00\x00\x00\x00") # Repeat an answer
if not check_response(response):
request = REQUEST_REPEAT
raise _TempRequestError("Invalid response CRC; requesting response again ...")
code = response[1]
@ -368,9 +333,9 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
self.__set_state_online(True)
return True
elif code & 0x80: # Pong/Done with state
self.__set_state_code(code)
self.__set_state_pong(response)
return True
raise _TempRequestError(f"Invalid response from HID: request={request!r}; code=0x{code:02X}")
raise _TempRequestError(f"Invalid response from HID: request={request!r}, response=0x{response!r}")
except _RequestError as err:
common_retries -= 1
@ -401,42 +366,10 @@ class BaseMcuHid(BaseHid, multiprocessing.Process): # pylint: disable=too-many-
return error_retval
def __set_state_online(self, online: bool) -> None:
self.__state_flags.update(
keyboard_online=online,
mouse_online=online,
)
self.__state_flags.update(online=int(online))
def __set_state_code(self, code: int) -> None:
self.__state_flags.update(
keyboard_online=(not (code & 0b00001000)),
mouse_online=(not (code & 0b00010000)),
caps=bool(code & 0b00000001),
scroll=bool(code & 0b00000010),
num=bool(code & 0b00000100),
)
def __send_request(self, conn: BasePhyConnection, request: bytes) -> bytes:
if not self.__noop:
response = conn.send(request)
else:
response = b"\x33\x20" # Magic + OK
response += struct.pack(">H", self.__make_crc16(response))
return response
def __make_request(self, command: bytes) -> bytes:
request = b"\x33" + command
request += struct.pack(">H", self.__make_crc16(request))
assert len(request) == 8, (request, command)
return request
def __make_crc16(self, data: bytes) -> int:
crc = 0xFFFF
for byte in data:
crc = crc ^ byte
for _ in range(8):
if crc & 0x0001 == 0:
crc = crc >> 1
else:
crc = crc >> 1
crc = crc ^ 0xA001
return crc
def __set_state_pong(self, response: bytes) -> None:
status = response[1] << 16
if len(response) > 4:
status |= (response[2] << 8) | response[3]
self.__state_flags.update(online=1, status=status)

View File

@ -0,0 +1,202 @@
# ========================================================================== #
# #
# KVMD - The main Pi-KVM daemon. #
# #
# Copyright (C) 2018 Maxim Devaev <mdevaev@gmail.com> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
# ========================================================================== #
import dataclasses
import struct
from ....keyboard.mappings import KEYMAP
from .... import tools
# =====
class BaseEvent:
def make_request(self) -> bytes:
raise NotImplementedError
# =====
_KEYBOARD_NAMES_TO_CODES = {
"usb": 0b00000001,
"ps2": 0b00000011,
}
_KEYBOARD_CODES_TO_NAMES = tools.swapped_kvs(_KEYBOARD_NAMES_TO_CODES)
def get_active_keyboard(outputs: int) -> str:
return _KEYBOARD_CODES_TO_NAMES.get(outputs & 0b00000111, "")
@dataclasses.dataclass(frozen=True)
class SetKeyboardOutputEvent(BaseEvent):
keyboard: str
def __post_init__(self) -> None:
assert not self.keyboard or self.keyboard in _KEYBOARD_NAMES_TO_CODES
def make_request(self) -> bytes:
code = _KEYBOARD_NAMES_TO_CODES.get(self.keyboard, 0)
return _make_request(struct.pack(">BBxxx", 0x03, code))
# =====
_MOUSE_NAMES_TO_CODES = {
"usb": 0b00001000,
"usb_rel": 0b00010000,
"ps2": 0b00011000,
}
_MOUSE_CODES_TO_NAMES = tools.swapped_kvs(_MOUSE_NAMES_TO_CODES)
def get_active_mouse(outputs: int) -> str:
return _MOUSE_CODES_TO_NAMES.get(outputs & 0b00111000, "")
@dataclasses.dataclass(frozen=True)
class SetMouseOutputEvent(BaseEvent):
mouse: str
def __post_init__(self) -> None:
assert not self.mouse or self.mouse in _MOUSE_NAMES_TO_CODES
def make_request(self) -> bytes:
return _make_request(struct.pack(">BBxxx", 0x04, _MOUSE_NAMES_TO_CODES.get(self.mouse, 0)))
# =====
class ClearEvent(BaseEvent):
def make_request(self) -> bytes:
return _make_request(b"\x10\x00\x00\x00\x00")
@dataclasses.dataclass(frozen=True)
class KeyEvent(BaseEvent):
name: str
state: bool
def __post_init__(self) -> None:
assert self.name in KEYMAP
def make_request(self) -> bytes:
code = KEYMAP[self.name].mcu.code
return _make_request(struct.pack(">BBBxx", 0x11, code, int(self.state)))
@dataclasses.dataclass(frozen=True)
class MouseButtonEvent(BaseEvent):
name: str
state: bool
def __post_init__(self) -> None:
assert self.name in ["left", "right", "middle", "up", "down"]
def make_request(self) -> bytes:
(code, state_pressed, is_main) = {
"left": (0b10000000, 0b00001000, True),
"right": (0b01000000, 0b00000100, True),
"middle": (0b00100000, 0b00000010, True),
"up": (0b10000000, 0b00001000, False), # Back
"down": (0b01000000, 0b00000100, False), # Forward
}[self.name]
if self.state:
code |= state_pressed
if is_main:
main_code = code
extra_code = 0
else:
main_code = 0
extra_code = code
return _make_request(struct.pack(">BBBxx", 0x13, main_code, extra_code))
@dataclasses.dataclass(frozen=True)
class MouseMoveEvent(BaseEvent):
to_x: int
to_y: int
def __post_init__(self) -> None:
assert -32768 <= self.to_x <= 32767
assert -32768 <= self.to_y <= 32767
def make_request(self) -> bytes:
return _make_request(struct.pack(">Bhh", 0x12, self.to_x, self.to_y))
@dataclasses.dataclass(frozen=True)
class MouseRelativeEvent(BaseEvent):
delta_x: int
delta_y: int
def __post_init__(self) -> None:
assert -127 <= self.delta_x <= 127
assert -127 <= self.delta_y <= 127
def make_request(self) -> bytes:
return _make_request(struct.pack(">Bbbxx", 0x15, self.delta_x, self.delta_y))
@dataclasses.dataclass(frozen=True)
class MouseWheelEvent(BaseEvent):
delta_x: int
delta_y: int
def __post_init__(self) -> None:
assert -127 <= self.delta_x <= 127
assert -127 <= self.delta_y <= 127
def make_request(self) -> bytes:
# Горизонтальная прокрутка пока не поддерживается
return _make_request(struct.pack(">Bxbxx", 0x14, self.delta_y))
# =====
def check_response(response: bytes) -> bool:
assert len(response) in (4, 8), response
return (_make_crc16(response[:-2]) == struct.unpack(">H", response[-2:])[0])
def _make_request(command: bytes) -> bytes:
assert len(command) == 5, command
request = b"\x33" + command
request += struct.pack(">H", _make_crc16(request))
assert len(request) == 8, request
return request
def _make_crc16(data: bytes) -> int:
crc = 0xFFFF
for byte in data:
crc = crc ^ byte
for _ in range(8):
if crc & 0x0001 == 0:
crc = crc >> 1
else:
crc = crc >> 1
crc = crc ^ 0xA001
return crc
# =====
REQUEST_PING = _make_request(b"\x01\x00\x00\x00\x00")
REQUEST_REPEAT = _make_request(b"\x02\x00\x00\x00\x00")
RESPONSE_LEGACY_OK = b"\x33\x20" + struct.pack(">H", _make_crc16(b"\x33\x20"))

View File

@ -131,8 +131,9 @@ class Plugin(BaseHid): # pylint: disable=too-many-instance-attributes
async def get_state(self) -> Dict:
state = await self.__server.get_state()
outputs: Dict = {"available": {}, "active": ""}
return {
"online": state["online"],
"online": True,
"keyboard": {
"online": state["online"],
"leds": {
@ -140,10 +141,12 @@ class Plugin(BaseHid): # pylint: disable=too-many-instance-attributes
"scroll": state["scroll"],
"num": state["num"],
},
"outputs": outputs,
},
"mouse": {
"online": state["online"],
"absolute": False,
"outputs": outputs,
},
}
@ -178,10 +181,6 @@ class Plugin(BaseHid): # pylint: disable=too-many-instance-attributes
def send_mouse_button_event(self, button: str, state: bool) -> None:
self.__server.queue_event(MouseButtonEvent(button, state))
def send_mouse_move_event(self, to_x: int, to_y: int) -> None:
_ = to_x # No absolute events
_ = to_y
def send_mouse_relative_event(self, delta_x: int, delta_y: int) -> None:
self.__server.queue_event(MouseRelativeEvent(delta_x, delta_y))

View File

@ -89,8 +89,9 @@ class Plugin(BaseHid):
async def get_state(self) -> Dict:
keyboard_state = await self.__keyboard_proc.get_state()
mouse_state = await self.__mouse_proc.get_state()
outputs: Dict = {"available": {}, "active": ""}
return {
"online": (keyboard_state["online"] and mouse_state["online"]),
"online": True,
"keyboard": {
"online": keyboard_state["online"],
"leds": {
@ -98,8 +99,9 @@ class Plugin(BaseHid):
"scroll": keyboard_state["scroll"],
"num": keyboard_state["num"],
},
"outputs": outputs,
},
"mouse": mouse_state,
"mouse": {**mouse_state, "outputs": outputs},
}
async def poll_state(self) -> AsyncGenerator[Dict, None]:

View File

@ -51,7 +51,10 @@ class _SerialPhyConnection(BasePhyConnection):
if self.__tty.in_waiting:
self.__tty.read_all()
assert self.__tty.write(request) == 8
return self.__tty.read(4)
data = self.__tty.read(4)
if data[0] == 0x34: # New response protocol
data += self.__tty.read(4)
return data
class _SerialPhy(BasePhy):

View File

@ -67,7 +67,7 @@ class _SpiPhyConnection(BasePhyConnection):
assert request[0] == 0x33
deadline_ts = time.monotonic() + self.__read_timeout
dummy = b"\x00" * 8
dummy = b"\x00" * 10
while time.monotonic() < deadline_ts:
if bytes(self.__xfer(dummy)) == dummy:
break
@ -81,15 +81,15 @@ class _SpiPhyConnection(BasePhyConnection):
deadline_ts = time.monotonic() + self.__read_timeout
found = False
while time.monotonic() < deadline_ts:
for byte in self.__xfer(b"\x00" * (5 - len(response))):
for byte in self.__xfer(b"\x00" * (9 - len(response))):
if not found:
if byte != 0x33:
if byte == 0:
continue
found = True
response.append(byte)
if len(response) == 4:
if len(response) == 8:
break
if len(response) == 4:
if len(response) == 8:
break
else:
get_logger(0).error("SPI timeout reached while responce waiting")

View File

@ -57,6 +57,10 @@ def sorted_kvs(dct: Dict[_DictKeyT, _DictValueT]) -> List[Tuple[_DictKeyT, _Dict
return sorted(dct.items(), key=operator.itemgetter(0))
def swapped_kvs(dct: Dict[_DictKeyT, _DictValueT]) -> Dict[_DictValueT, _DictKeyT]:
return {value: key for (key, value) in dct.items()}
# =====
def clear_queue(q: multiprocessing.queues.Queue) -> None: # pylint: disable=invalid-name
for _ in range(q.qsize()):

56
kvmd/validators/hid.py Normal file
View File

@ -0,0 +1,56 @@
# ========================================================================== #
# #
# KVMD - The main Pi-KVM daemon. #
# #
# Copyright (C) 2018 Maxim Devaev <mdevaev@gmail.com> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
# ========================================================================== #
from typing import Any
from ..keyboard.mappings import KEYMAP
from . import check_string_in_list
from .basic import valid_number
# =====
def valid_hid_keyboard_output(arg: Any) -> str:
return check_string_in_list(arg, "Keyboard output", ["usb", "ps2", ""])
def valid_hid_mouse_output(arg: Any) -> str:
return check_string_in_list(arg, "Mouse output", ["usb", "usb_rel", "ps2", ""])
def valid_hid_key(arg: Any) -> str:
return check_string_in_list(arg, "Keyboard key", KEYMAP, lower=False)
def valid_hid_mouse_move(arg: Any) -> int:
arg = valid_number(arg, name="Mouse move")
return min(max(-32768, arg), 32767)
def valid_hid_mouse_button(arg: Any) -> str:
return check_string_in_list(arg, "Mouse button", ["left", "right", "middle", "up", "down"])
def valid_hid_mouse_delta(arg: Any) -> int:
arg = valid_number(arg, name="Mouse delta")
return min(max(-127, arg), 127)

View File

@ -25,8 +25,6 @@ from typing import Set
from typing import Optional
from typing import Any
from ..keyboard.mappings import KEYMAP
from . import raise_error
from . import check_string_in_list
from . import check_re_match
@ -82,25 +80,6 @@ def valid_stream_resolution(arg: Any) -> str:
return f"{width}x{height}"
# =====
def valid_hid_key(arg: Any) -> str:
return check_string_in_list(arg, "HID key", KEYMAP, lower=False)
def valid_hid_mouse_move(arg: Any) -> int:
arg = valid_number(arg, name="HID mouse move")
return min(max(-32768, arg), 32767)
def valid_hid_mouse_button(arg: Any) -> str:
return check_string_in_list(arg, "HID mouse button", ["left", "right", "middle", "up", "down"])
def valid_hid_mouse_delta(arg: Any) -> int:
arg = valid_number(arg, name="HID mouse delta")
return min(max(-127, arg), 127)
# =====
def valid_ugpio_driver(arg: Any, variants: Optional[Set[str]]=None) -> str:
name = "GPIO driver"

View File

@ -0,0 +1,98 @@
# ========================================================================== #
# #
# KVMD - The main Pi-KVM daemon. #
# #
# Copyright (C) 2018 Maxim Devaev <mdevaev@gmail.com> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
# #
# ========================================================================== #
from typing import Any
import pytest
from kvmd.keyboard.mappings import KEYMAP
from kvmd.validators import ValidatorError
from kvmd.validators.hid import valid_hid_key
from kvmd.validators.hid import valid_hid_mouse_move
from kvmd.validators.hid import valid_hid_mouse_button
from kvmd.validators.hid import valid_hid_mouse_delta
# =====
def test_ok__valid_hid_key() -> None:
for key in KEYMAP:
print(valid_hid_key(key))
print(valid_hid_key(key + " "))
@pytest.mark.parametrize("arg", ["test", "", None, "keya"])
def test_fail__valid_hid_key(arg: Any) -> None:
with pytest.raises(ValidatorError):
print(valid_hid_key(arg))
# =====
@pytest.mark.parametrize("arg", [-20000, "1 ", "-1", 1, -1, 0, "20000 "])
def test_ok__valid_hid_mouse_move(arg: Any) -> None:
assert valid_hid_mouse_move(arg) == int(str(arg).strip())
def test_ok__valid_hid_mouse_move__m50000() -> None:
assert valid_hid_mouse_move(-50000) == -32768
def test_ok__valid_hid_mouse_move__p50000() -> None:
assert valid_hid_mouse_move(50000) == 32767
@pytest.mark.parametrize("arg", ["test", "", None, 1.1])
def test_fail__valid_hid_mouse_move(arg: Any) -> None:
with pytest.raises(ValidatorError):
print(valid_hid_mouse_move(arg))
# =====
@pytest.mark.parametrize("arg", ["LEFT ", "RIGHT ", "Up ", " Down", " MiDdLe "])
def test_ok__valid_hid_mouse_button(arg: Any) -> None:
assert valid_hid_mouse_button(arg) == arg.strip().lower()
@pytest.mark.parametrize("arg", ["test", "", None])
def test_fail__valid_hid_mouse_button(arg: Any) -> None:
with pytest.raises(ValidatorError):
print(valid_hid_mouse_button(arg))
# =====
@pytest.mark.parametrize("arg", [-100, "1 ", "-1", 1, -1, 0, "100 "])
def test_ok__valid_hid_mouse_delta(arg: Any) -> None:
assert valid_hid_mouse_delta(arg) == int(str(arg).strip())
def test_ok__valid_hid_mouse_delta__m200() -> None:
assert valid_hid_mouse_delta(-200) == -127
def test_ok__valid_hid_mouse_delta__p200() -> None:
assert valid_hid_mouse_delta(200) == 127
@pytest.mark.parametrize("arg", ["test", "", None, 1.1])
def test_fail__valid_hid_mouse_delta(arg: Any) -> None:
with pytest.raises(ValidatorError):
print(valid_hid_mouse_delta(arg))

View File

@ -25,8 +25,6 @@ from typing import Any
import pytest
from kvmd.keyboard.mappings import KEYMAP
from kvmd.validators import ValidatorError
from kvmd.validators.kvm import valid_atx_power_action
from kvmd.validators.kvm import valid_atx_button
@ -35,10 +33,6 @@ from kvmd.validators.kvm import valid_log_seek
from kvmd.validators.kvm import valid_stream_quality
from kvmd.validators.kvm import valid_stream_fps
from kvmd.validators.kvm import valid_stream_resolution
from kvmd.validators.kvm import valid_hid_key
from kvmd.validators.kvm import valid_hid_mouse_move
from kvmd.validators.kvm import valid_hid_mouse_button
from kvmd.validators.kvm import valid_hid_mouse_delta
from kvmd.validators.kvm import valid_ugpio_driver
from kvmd.validators.kvm import valid_ugpio_channel
from kvmd.validators.kvm import valid_ugpio_mode
@ -141,71 +135,6 @@ def test_fail__valid_stream_resolution(arg: Any) -> None:
print(valid_stream_resolution(arg))
# =====
def test_ok__valid_hid_key() -> None:
for key in KEYMAP:
print(valid_hid_key(key))
print(valid_hid_key(key + " "))
@pytest.mark.parametrize("arg", ["test", "", None, "keya"])
def test_fail__valid_hid_key(arg: Any) -> None:
with pytest.raises(ValidatorError):
print(valid_hid_key(arg))
# =====
@pytest.mark.parametrize("arg", [-20000, "1 ", "-1", 1, -1, 0, "20000 "])
def test_ok__valid_hid_mouse_move(arg: Any) -> None:
assert valid_hid_mouse_move(arg) == int(str(arg).strip())
def test_ok__valid_hid_mouse_move__m50000() -> None:
assert valid_hid_mouse_move(-50000) == -32768
def test_ok__valid_hid_mouse_move__p50000() -> None:
assert valid_hid_mouse_move(50000) == 32767
@pytest.mark.parametrize("arg", ["test", "", None, 1.1])
def test_fail__valid_hid_mouse_move(arg: Any) -> None:
with pytest.raises(ValidatorError):
print(valid_hid_mouse_move(arg))
# =====
@pytest.mark.parametrize("arg", ["LEFT ", "RIGHT ", "Up ", " Down", " MiDdLe "])
def test_ok__valid_hid_mouse_button(arg: Any) -> None:
assert valid_hid_mouse_button(arg) == arg.strip().lower()
@pytest.mark.parametrize("arg", ["test", "", None])
def test_fail__valid_hid_mouse_button(arg: Any) -> None:
with pytest.raises(ValidatorError):
print(valid_hid_mouse_button(arg))
# =====
@pytest.mark.parametrize("arg", [-100, "1 ", "-1", 1, -1, 0, "100 "])
def test_ok__valid_hid_mouse_delta(arg: Any) -> None:
assert valid_hid_mouse_delta(arg) == int(str(arg).strip())
def test_ok__valid_hid_mouse_delta__m200() -> None:
assert valid_hid_mouse_delta(-200) == -127
def test_ok__valid_hid_mouse_delta__p200() -> None:
assert valid_hid_mouse_delta(200) == 127
@pytest.mark.parametrize("arg", ["test", "", None, 1.1])
def test_fail__valid_hid_mouse_delta(arg: Any) -> None:
with pytest.raises(ValidatorError):
print(valid_hid_mouse_delta(arg))
# =====
@pytest.mark.parametrize("validator", [valid_ugpio_driver, valid_ugpio_channel])
@pytest.mark.parametrize("arg", [