fix: 修复 USB HID 端点异常时鼠标事件写入会阻塞把服务端音视频线程拖死的问题

This commit is contained in:
mofeng
2026-01-30 12:33:19 +08:00
parent 1a0b285fe6
commit b2b99115ec
2 changed files with 222 additions and 114 deletions

View File

@@ -43,24 +43,52 @@ pub struct HidInfo {
}
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use tokio::sync::RwLock;
use tracing::{info, warn};
use crate::error::{AppError, Result};
use crate::otg::OtgService;
use tokio::sync::mpsc;
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use std::time::Duration;
const HID_EVENT_QUEUE_CAPACITY: usize = 64;
const HID_EVENT_SEND_TIMEOUT_MS: u64 = 30;
#[derive(Debug)]
enum HidEvent {
Keyboard(KeyboardEvent),
Mouse(MouseEvent),
Consumer(ConsumerEvent),
Reset,
}
/// HID controller managing keyboard and mouse input
pub struct HidController {
/// OTG Service reference (only used when backend is OTG)
otg_service: Option<Arc<OtgService>>,
/// Active backend
backend: Arc<RwLock<Option<Box<dyn HidBackend>>>>,
backend: Arc<RwLock<Option<Arc<dyn HidBackend>>>>,
/// Backend type (mutable for reload)
backend_type: RwLock<HidBackendType>,
backend_type: Arc<RwLock<HidBackendType>>,
/// Event bus for broadcasting state changes (optional)
events: tokio::sync::RwLock<Option<Arc<crate::events::EventBus>>>,
/// Health monitor for error tracking and recovery
monitor: Arc<HidHealthMonitor>,
/// HID event queue sender (non-blocking)
hid_tx: mpsc::Sender<HidEvent>,
/// HID event queue receiver (moved into worker on first start)
hid_rx: Mutex<Option<mpsc::Receiver<HidEvent>>>,
/// Coalesced mouse move (latest)
pending_move: Arc<parking_lot::Mutex<Option<MouseEvent>>>,
/// Pending move flag (fast path)
pending_move_flag: Arc<AtomicBool>,
/// Worker task handle
hid_worker: Mutex<Option<JoinHandle<()>>>,
/// Backend availability fast flag
backend_available: AtomicBool,
}
impl HidController {
@@ -68,12 +96,19 @@ impl HidController {
///
/// For OTG backend, otg_service should be provided to support hot-reload
pub fn new(backend_type: HidBackendType, otg_service: Option<Arc<OtgService>>) -> Self {
let (hid_tx, hid_rx) = mpsc::channel(HID_EVENT_QUEUE_CAPACITY);
Self {
otg_service,
backend: Arc::new(RwLock::new(None)),
backend_type: RwLock::new(backend_type),
backend_type: Arc::new(RwLock::new(backend_type)),
events: tokio::sync::RwLock::new(None),
monitor: Arc::new(HidHealthMonitor::with_defaults()),
hid_tx,
hid_rx: Mutex::new(Some(hid_rx)),
pending_move: Arc::new(parking_lot::Mutex::new(None)),
pending_move_flag: Arc::new(AtomicBool::new(false)),
hid_worker: Mutex::new(None),
backend_available: AtomicBool::new(false),
}
}
@@ -87,7 +122,7 @@ impl HidController {
/// Initialize the HID backend
pub async fn init(&self) -> Result<()> {
let backend_type = self.backend_type.read().await.clone();
let backend: Box<dyn HidBackend> = match backend_type {
let backend: Arc<dyn HidBackend> = match backend_type {
HidBackendType::Otg => {
// Request HID functions from OtgService
let otg_service = self
@@ -100,7 +135,7 @@ impl HidController {
// Create OtgBackend from handles (no longer manages gadget itself)
info!("Creating OTG HID backend from device paths");
Box::new(otg::OtgBackend::from_handles(handles)?)
Arc::new(otg::OtgBackend::from_handles(handles)?)
}
HidBackendType::Ch9329 {
ref port,
@@ -110,7 +145,7 @@ impl HidController {
"Initializing CH9329 HID backend on {} @ {} baud",
port, baud_rate
);
Box::new(ch9329::Ch9329Backend::with_baud_rate(port, baud_rate)?)
Arc::new(ch9329::Ch9329Backend::with_baud_rate(port, baud_rate)?)
}
HidBackendType::None => {
warn!("HID backend disabled");
@@ -120,6 +155,10 @@ impl HidController {
backend.init().await?;
*self.backend.write().await = Some(backend);
self.backend_available.store(true, Ordering::Release);
// Start HID event worker (once)
self.start_event_worker().await;
info!("HID backend initialized: {:?}", backend_type);
Ok(())
@@ -131,6 +170,7 @@ impl HidController {
// Close the backend
*self.backend.write().await = None;
self.backend_available.store(false, Ordering::Release);
// If OTG backend, notify OtgService to disable HID
let backend_type = self.backend_type.read().await.clone();
@@ -147,125 +187,47 @@ impl HidController {
/// Send keyboard event
pub async fn send_keyboard(&self, event: KeyboardEvent) -> Result<()> {
let backend = self.backend.read().await;
match backend.as_ref() {
Some(b) => {
match b.send_keyboard(event).await {
Ok(_) => {
// Check if we were in an error state and now recovered
if self.monitor.is_error().await {
let backend_type = self.backend_type.read().await;
self.monitor.report_recovered(backend_type.name_str()).await;
}
Ok(())
}
Err(e) => {
// Report error to monitor, but skip temporary EAGAIN retries
// - "eagain_retry": within threshold, just temporary busy
// - "eagain": exceeded threshold, report as error
if let AppError::HidError {
ref backend,
ref reason,
ref error_code,
} = e
{
if error_code != "eagain_retry" {
self.monitor
.report_error(backend, None, reason, error_code)
.await;
}
}
Err(e)
}
}
}
None => Err(AppError::BadRequest(
if !self.backend_available.load(Ordering::Acquire) {
return Err(AppError::BadRequest(
"HID backend not available".to_string(),
)),
));
}
self.enqueue_event(HidEvent::Keyboard(event)).await
}
/// Send mouse event
pub async fn send_mouse(&self, event: MouseEvent) -> Result<()> {
let backend = self.backend.read().await;
match backend.as_ref() {
Some(b) => {
match b.send_mouse(event).await {
Ok(_) => {
// Check if we were in an error state and now recovered
if self.monitor.is_error().await {
let backend_type = self.backend_type.read().await;
self.monitor.report_recovered(backend_type.name_str()).await;
}
Ok(())
}
Err(e) => {
// Report error to monitor, but skip temporary EAGAIN retries
// - "eagain_retry": within threshold, just temporary busy
// - "eagain": exceeded threshold, report as error
if let AppError::HidError {
ref backend,
ref reason,
ref error_code,
} = e
{
if error_code != "eagain_retry" {
self.monitor
.report_error(backend, None, reason, error_code)
.await;
}
}
Err(e)
}
}
}
None => Err(AppError::BadRequest(
if !self.backend_available.load(Ordering::Acquire) {
return Err(AppError::BadRequest(
"HID backend not available".to_string(),
)),
));
}
if matches!(event.event_type, MouseEventType::Move | MouseEventType::MoveAbs) {
// Best-effort: drop/merge move events if queue is full
self.enqueue_mouse_move(event)
} else {
self.enqueue_event(HidEvent::Mouse(event)).await
}
}
/// Send consumer control event (multimedia keys)
pub async fn send_consumer(&self, event: ConsumerEvent) -> Result<()> {
let backend = self.backend.read().await;
match backend.as_ref() {
Some(b) => match b.send_consumer(event).await {
Ok(_) => {
if self.monitor.is_error().await {
let backend_type = self.backend_type.read().await;
self.monitor.report_recovered(backend_type.name_str()).await;
}
Ok(())
}
Err(e) => {
if let AppError::HidError {
ref backend,
ref reason,
ref error_code,
} = e
{
if error_code != "eagain_retry" {
self.monitor
.report_error(backend, None, reason, error_code)
.await;
}
}
Err(e)
}
},
None => Err(AppError::BadRequest(
if !self.backend_available.load(Ordering::Acquire) {
return Err(AppError::BadRequest(
"HID backend not available".to_string(),
)),
));
}
self.enqueue_event(HidEvent::Consumer(event)).await
}
/// Reset all keys (release all pressed keys)
pub async fn reset(&self) -> Result<()> {
let backend = self.backend.read().await;
match backend.as_ref() {
Some(b) => b.reset().await,
None => Ok(()),
if !self.backend_available.load(Ordering::Acquire) {
return Ok(());
}
// Reset is important but best-effort; enqueue to avoid blocking
self.enqueue_event(HidEvent::Reset).await
}
/// Check if backend is available
@@ -332,6 +294,7 @@ impl HidController {
/// Reload the HID backend with new type
pub async fn reload(&self, new_backend_type: HidBackendType) -> Result<()> {
info!("Reloading HID backend: {:?}", new_backend_type);
self.backend_available.store(false, Ordering::Release);
// Shutdown existing backend first
if let Some(backend) = self.backend.write().await.take() {
@@ -341,7 +304,7 @@ impl HidController {
}
// Create and initialize new backend
let new_backend: Option<Box<dyn HidBackend>> = match new_backend_type {
let new_backend: Option<Arc<dyn HidBackend>> = match new_backend_type {
HidBackendType::Otg => {
info!("Initializing OTG HID backend");
@@ -362,11 +325,11 @@ impl HidController {
// Create OtgBackend from handles
match otg::OtgBackend::from_handles(handles) {
Ok(backend) => {
let boxed: Box<dyn HidBackend> = Box::new(backend);
match boxed.init().await {
let backend = Arc::new(backend);
match backend.init().await {
Ok(_) => {
info!("OTG backend initialized successfully");
Some(boxed)
Some(backend)
}
Err(e) => {
warn!("Failed to initialize OTG backend: {}", e);
@@ -407,9 +370,9 @@ impl HidController {
);
match ch9329::Ch9329Backend::with_baud_rate(port, baud_rate) {
Ok(b) => {
let boxed = Box::new(b);
match boxed.init().await {
Ok(_) => Some(boxed),
let backend = Arc::new(b);
match backend.init().await {
Ok(_) => Some(backend),
Err(e) => {
warn!("Failed to initialize CH9329 backend: {}", e);
None
@@ -432,6 +395,8 @@ impl HidController {
if self.backend.read().await.is_some() {
info!("HID backend reloaded successfully: {:?}", new_backend_type);
self.backend_available.store(true, Ordering::Release);
self.start_event_worker().await;
// Update backend_type on success
*self.backend_type.write().await = new_backend_type.clone();
@@ -452,6 +417,7 @@ impl HidController {
Ok(())
} else {
warn!("HID backend reload resulted in no active backend");
self.backend_available.store(false, Ordering::Release);
// Update backend_type even on failure (to reflect the attempted change)
*self.backend_type.write().await = new_backend_type.clone();
@@ -477,6 +443,148 @@ impl HidController {
events.publish(event);
}
}
async fn start_event_worker(&self) {
let mut worker_guard = self.hid_worker.lock().await;
if worker_guard.is_some() {
return;
}
let mut rx_guard = self.hid_rx.lock().await;
let rx = match rx_guard.take() {
Some(rx) => rx,
None => return,
};
let backend = self.backend.clone();
let monitor = self.monitor.clone();
let backend_type = self.backend_type.clone();
let pending_move = self.pending_move.clone();
let pending_move_flag = self.pending_move_flag.clone();
let handle = tokio::spawn(async move {
let mut rx = rx;
loop {
let event = match rx.recv().await {
Some(ev) => ev,
None => break,
};
process_hid_event(
event,
&backend,
&monitor,
&backend_type,
)
.await;
// After each event, flush latest move if pending
if pending_move_flag.swap(false, Ordering::AcqRel) {
let move_event = { pending_move.lock().take() };
if let Some(move_event) = move_event {
process_hid_event(
HidEvent::Mouse(move_event),
&backend,
&monitor,
&backend_type,
)
.await;
}
}
}
});
*worker_guard = Some(handle);
}
fn enqueue_mouse_move(&self, event: MouseEvent) -> Result<()> {
match self.hid_tx.try_send(HidEvent::Mouse(event.clone())) {
Ok(_) => Ok(()),
Err(mpsc::error::TrySendError::Full(_)) => {
*self.pending_move.lock() = Some(event);
self.pending_move_flag.store(true, Ordering::Release);
Ok(())
}
Err(mpsc::error::TrySendError::Closed(_)) => Err(AppError::BadRequest(
"HID event queue closed".to_string(),
)),
}
}
async fn enqueue_event(&self, event: HidEvent) -> Result<()> {
match self.hid_tx.try_send(event) {
Ok(_) => Ok(()),
Err(mpsc::error::TrySendError::Full(ev)) => {
// For non-move events, wait briefly to avoid dropping critical input
let tx = self.hid_tx.clone();
let send_result =
tokio::time::timeout(Duration::from_millis(HID_EVENT_SEND_TIMEOUT_MS), tx.send(ev))
.await;
if send_result.is_ok() {
Ok(())
} else {
warn!("HID event queue full, dropping event");
Ok(())
}
}
Err(mpsc::error::TrySendError::Closed(_)) => Err(AppError::BadRequest(
"HID event queue closed".to_string(),
)),
}
}
}
async fn process_hid_event(
event: HidEvent,
backend: &Arc<RwLock<Option<Arc<dyn HidBackend>>>>,
monitor: &Arc<HidHealthMonitor>,
backend_type: &Arc<RwLock<HidBackendType>>,
) {
let backend_opt = backend.read().await.clone();
let backend = match backend_opt {
Some(b) => b,
None => return,
};
let result = tokio::task::spawn_blocking(move || {
futures::executor::block_on(async move {
match event {
HidEvent::Keyboard(ev) => backend.send_keyboard(ev).await,
HidEvent::Mouse(ev) => backend.send_mouse(ev).await,
HidEvent::Consumer(ev) => backend.send_consumer(ev).await,
HidEvent::Reset => backend.reset().await,
}
})
})
.await;
let result = match result {
Ok(r) => r,
Err(_) => return,
};
match result {
Ok(_) => {
if monitor.is_error().await {
let backend_type = backend_type.read().await;
monitor.report_recovered(backend_type.name_str()).await;
}
}
Err(e) => {
if let AppError::HidError {
ref backend,
ref reason,
ref error_code,
} = e
{
if error_code != "eagain_retry" {
monitor
.report_error(backend, None, reason, error_code)
.await;
}
}
}
}
}
impl Default for HidController {

View File

@@ -145,7 +145,7 @@ pub struct OtgBackend {
}
/// Write timeout in milliseconds (same as JetKVM's hidWriteTimeout)
const HID_WRITE_TIMEOUT_MS: i32 = 500;
const HID_WRITE_TIMEOUT_MS: i32 = 20;
impl OtgBackend {
/// Create OTG backend from device paths provided by OtgService