diff --git a/src/hid/mod.rs b/src/hid/mod.rs index 46e4c45b..611bdea8 100644 --- a/src/hid/mod.rs +++ b/src/hid/mod.rs @@ -43,24 +43,52 @@ pub struct HidInfo { } use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; use tokio::sync::RwLock; use tracing::{info, warn}; use crate::error::{AppError, Result}; use crate::otg::OtgService; +use tokio::sync::mpsc; +use tokio::sync::Mutex; +use tokio::task::JoinHandle; +use std::time::Duration; + +const HID_EVENT_QUEUE_CAPACITY: usize = 64; +const HID_EVENT_SEND_TIMEOUT_MS: u64 = 30; + +#[derive(Debug)] +enum HidEvent { + Keyboard(KeyboardEvent), + Mouse(MouseEvent), + Consumer(ConsumerEvent), + Reset, +} /// HID controller managing keyboard and mouse input pub struct HidController { /// OTG Service reference (only used when backend is OTG) otg_service: Option>, /// Active backend - backend: Arc>>>, + backend: Arc>>>, /// Backend type (mutable for reload) - backend_type: RwLock, + backend_type: Arc>, /// Event bus for broadcasting state changes (optional) events: tokio::sync::RwLock>>, /// Health monitor for error tracking and recovery monitor: Arc, + /// HID event queue sender (non-blocking) + hid_tx: mpsc::Sender, + /// HID event queue receiver (moved into worker on first start) + hid_rx: Mutex>>, + /// Coalesced mouse move (latest) + pending_move: Arc>>, + /// Pending move flag (fast path) + pending_move_flag: Arc, + /// Worker task handle + hid_worker: Mutex>>, + /// Backend availability fast flag + backend_available: AtomicBool, } impl HidController { @@ -68,12 +96,19 @@ impl HidController { /// /// For OTG backend, otg_service should be provided to support hot-reload pub fn new(backend_type: HidBackendType, otg_service: Option>) -> Self { + let (hid_tx, hid_rx) = mpsc::channel(HID_EVENT_QUEUE_CAPACITY); Self { otg_service, backend: Arc::new(RwLock::new(None)), - backend_type: RwLock::new(backend_type), + backend_type: Arc::new(RwLock::new(backend_type)), events: tokio::sync::RwLock::new(None), monitor: Arc::new(HidHealthMonitor::with_defaults()), + hid_tx, + hid_rx: Mutex::new(Some(hid_rx)), + pending_move: Arc::new(parking_lot::Mutex::new(None)), + pending_move_flag: Arc::new(AtomicBool::new(false)), + hid_worker: Mutex::new(None), + backend_available: AtomicBool::new(false), } } @@ -87,7 +122,7 @@ impl HidController { /// Initialize the HID backend pub async fn init(&self) -> Result<()> { let backend_type = self.backend_type.read().await.clone(); - let backend: Box = match backend_type { + let backend: Arc = match backend_type { HidBackendType::Otg => { // Request HID functions from OtgService let otg_service = self @@ -100,7 +135,7 @@ impl HidController { // Create OtgBackend from handles (no longer manages gadget itself) info!("Creating OTG HID backend from device paths"); - Box::new(otg::OtgBackend::from_handles(handles)?) + Arc::new(otg::OtgBackend::from_handles(handles)?) } HidBackendType::Ch9329 { ref port, @@ -110,7 +145,7 @@ impl HidController { "Initializing CH9329 HID backend on {} @ {} baud", port, baud_rate ); - Box::new(ch9329::Ch9329Backend::with_baud_rate(port, baud_rate)?) + Arc::new(ch9329::Ch9329Backend::with_baud_rate(port, baud_rate)?) } HidBackendType::None => { warn!("HID backend disabled"); @@ -120,6 +155,10 @@ impl HidController { backend.init().await?; *self.backend.write().await = Some(backend); + self.backend_available.store(true, Ordering::Release); + + // Start HID event worker (once) + self.start_event_worker().await; info!("HID backend initialized: {:?}", backend_type); Ok(()) @@ -131,6 +170,7 @@ impl HidController { // Close the backend *self.backend.write().await = None; + self.backend_available.store(false, Ordering::Release); // If OTG backend, notify OtgService to disable HID let backend_type = self.backend_type.read().await.clone(); @@ -147,125 +187,47 @@ impl HidController { /// Send keyboard event pub async fn send_keyboard(&self, event: KeyboardEvent) -> Result<()> { - let backend = self.backend.read().await; - match backend.as_ref() { - Some(b) => { - match b.send_keyboard(event).await { - Ok(_) => { - // Check if we were in an error state and now recovered - if self.monitor.is_error().await { - let backend_type = self.backend_type.read().await; - self.monitor.report_recovered(backend_type.name_str()).await; - } - Ok(()) - } - Err(e) => { - // Report error to monitor, but skip temporary EAGAIN retries - // - "eagain_retry": within threshold, just temporary busy - // - "eagain": exceeded threshold, report as error - if let AppError::HidError { - ref backend, - ref reason, - ref error_code, - } = e - { - if error_code != "eagain_retry" { - self.monitor - .report_error(backend, None, reason, error_code) - .await; - } - } - Err(e) - } - } - } - None => Err(AppError::BadRequest( + if !self.backend_available.load(Ordering::Acquire) { + return Err(AppError::BadRequest( "HID backend not available".to_string(), - )), + )); } + self.enqueue_event(HidEvent::Keyboard(event)).await } /// Send mouse event pub async fn send_mouse(&self, event: MouseEvent) -> Result<()> { - let backend = self.backend.read().await; - match backend.as_ref() { - Some(b) => { - match b.send_mouse(event).await { - Ok(_) => { - // Check if we were in an error state and now recovered - if self.monitor.is_error().await { - let backend_type = self.backend_type.read().await; - self.monitor.report_recovered(backend_type.name_str()).await; - } - Ok(()) - } - Err(e) => { - // Report error to monitor, but skip temporary EAGAIN retries - // - "eagain_retry": within threshold, just temporary busy - // - "eagain": exceeded threshold, report as error - if let AppError::HidError { - ref backend, - ref reason, - ref error_code, - } = e - { - if error_code != "eagain_retry" { - self.monitor - .report_error(backend, None, reason, error_code) - .await; - } - } - Err(e) - } - } - } - None => Err(AppError::BadRequest( + if !self.backend_available.load(Ordering::Acquire) { + return Err(AppError::BadRequest( "HID backend not available".to_string(), - )), + )); + } + + if matches!(event.event_type, MouseEventType::Move | MouseEventType::MoveAbs) { + // Best-effort: drop/merge move events if queue is full + self.enqueue_mouse_move(event) + } else { + self.enqueue_event(HidEvent::Mouse(event)).await } } /// Send consumer control event (multimedia keys) pub async fn send_consumer(&self, event: ConsumerEvent) -> Result<()> { - let backend = self.backend.read().await; - match backend.as_ref() { - Some(b) => match b.send_consumer(event).await { - Ok(_) => { - if self.monitor.is_error().await { - let backend_type = self.backend_type.read().await; - self.monitor.report_recovered(backend_type.name_str()).await; - } - Ok(()) - } - Err(e) => { - if let AppError::HidError { - ref backend, - ref reason, - ref error_code, - } = e - { - if error_code != "eagain_retry" { - self.monitor - .report_error(backend, None, reason, error_code) - .await; - } - } - Err(e) - } - }, - None => Err(AppError::BadRequest( + if !self.backend_available.load(Ordering::Acquire) { + return Err(AppError::BadRequest( "HID backend not available".to_string(), - )), + )); } + self.enqueue_event(HidEvent::Consumer(event)).await } /// Reset all keys (release all pressed keys) pub async fn reset(&self) -> Result<()> { - let backend = self.backend.read().await; - match backend.as_ref() { - Some(b) => b.reset().await, - None => Ok(()), + if !self.backend_available.load(Ordering::Acquire) { + return Ok(()); } + // Reset is important but best-effort; enqueue to avoid blocking + self.enqueue_event(HidEvent::Reset).await } /// Check if backend is available @@ -332,6 +294,7 @@ impl HidController { /// Reload the HID backend with new type pub async fn reload(&self, new_backend_type: HidBackendType) -> Result<()> { info!("Reloading HID backend: {:?}", new_backend_type); + self.backend_available.store(false, Ordering::Release); // Shutdown existing backend first if let Some(backend) = self.backend.write().await.take() { @@ -341,7 +304,7 @@ impl HidController { } // Create and initialize new backend - let new_backend: Option> = match new_backend_type { + let new_backend: Option> = match new_backend_type { HidBackendType::Otg => { info!("Initializing OTG HID backend"); @@ -362,11 +325,11 @@ impl HidController { // Create OtgBackend from handles match otg::OtgBackend::from_handles(handles) { Ok(backend) => { - let boxed: Box = Box::new(backend); - match boxed.init().await { + let backend = Arc::new(backend); + match backend.init().await { Ok(_) => { info!("OTG backend initialized successfully"); - Some(boxed) + Some(backend) } Err(e) => { warn!("Failed to initialize OTG backend: {}", e); @@ -407,9 +370,9 @@ impl HidController { ); match ch9329::Ch9329Backend::with_baud_rate(port, baud_rate) { Ok(b) => { - let boxed = Box::new(b); - match boxed.init().await { - Ok(_) => Some(boxed), + let backend = Arc::new(b); + match backend.init().await { + Ok(_) => Some(backend), Err(e) => { warn!("Failed to initialize CH9329 backend: {}", e); None @@ -432,6 +395,8 @@ impl HidController { if self.backend.read().await.is_some() { info!("HID backend reloaded successfully: {:?}", new_backend_type); + self.backend_available.store(true, Ordering::Release); + self.start_event_worker().await; // Update backend_type on success *self.backend_type.write().await = new_backend_type.clone(); @@ -452,6 +417,7 @@ impl HidController { Ok(()) } else { warn!("HID backend reload resulted in no active backend"); + self.backend_available.store(false, Ordering::Release); // Update backend_type even on failure (to reflect the attempted change) *self.backend_type.write().await = new_backend_type.clone(); @@ -477,6 +443,148 @@ impl HidController { events.publish(event); } } + + async fn start_event_worker(&self) { + let mut worker_guard = self.hid_worker.lock().await; + if worker_guard.is_some() { + return; + } + + let mut rx_guard = self.hid_rx.lock().await; + let rx = match rx_guard.take() { + Some(rx) => rx, + None => return, + }; + + let backend = self.backend.clone(); + let monitor = self.monitor.clone(); + let backend_type = self.backend_type.clone(); + let pending_move = self.pending_move.clone(); + let pending_move_flag = self.pending_move_flag.clone(); + + let handle = tokio::spawn(async move { + let mut rx = rx; + loop { + let event = match rx.recv().await { + Some(ev) => ev, + None => break, + }; + + process_hid_event( + event, + &backend, + &monitor, + &backend_type, + ) + .await; + + // After each event, flush latest move if pending + if pending_move_flag.swap(false, Ordering::AcqRel) { + let move_event = { pending_move.lock().take() }; + if let Some(move_event) = move_event { + process_hid_event( + HidEvent::Mouse(move_event), + &backend, + &monitor, + &backend_type, + ) + .await; + } + } + } + }); + + *worker_guard = Some(handle); + } + + fn enqueue_mouse_move(&self, event: MouseEvent) -> Result<()> { + match self.hid_tx.try_send(HidEvent::Mouse(event.clone())) { + Ok(_) => Ok(()), + Err(mpsc::error::TrySendError::Full(_)) => { + *self.pending_move.lock() = Some(event); + self.pending_move_flag.store(true, Ordering::Release); + Ok(()) + } + Err(mpsc::error::TrySendError::Closed(_)) => Err(AppError::BadRequest( + "HID event queue closed".to_string(), + )), + } + } + + async fn enqueue_event(&self, event: HidEvent) -> Result<()> { + match self.hid_tx.try_send(event) { + Ok(_) => Ok(()), + Err(mpsc::error::TrySendError::Full(ev)) => { + // For non-move events, wait briefly to avoid dropping critical input + let tx = self.hid_tx.clone(); + let send_result = + tokio::time::timeout(Duration::from_millis(HID_EVENT_SEND_TIMEOUT_MS), tx.send(ev)) + .await; + if send_result.is_ok() { + Ok(()) + } else { + warn!("HID event queue full, dropping event"); + Ok(()) + } + } + Err(mpsc::error::TrySendError::Closed(_)) => Err(AppError::BadRequest( + "HID event queue closed".to_string(), + )), + } + } +} + +async fn process_hid_event( + event: HidEvent, + backend: &Arc>>>, + monitor: &Arc, + backend_type: &Arc>, +) { + let backend_opt = backend.read().await.clone(); + let backend = match backend_opt { + Some(b) => b, + None => return, + }; + + let result = tokio::task::spawn_blocking(move || { + futures::executor::block_on(async move { + match event { + HidEvent::Keyboard(ev) => backend.send_keyboard(ev).await, + HidEvent::Mouse(ev) => backend.send_mouse(ev).await, + HidEvent::Consumer(ev) => backend.send_consumer(ev).await, + HidEvent::Reset => backend.reset().await, + } + }) + }) + .await; + + let result = match result { + Ok(r) => r, + Err(_) => return, + }; + + match result { + Ok(_) => { + if monitor.is_error().await { + let backend_type = backend_type.read().await; + monitor.report_recovered(backend_type.name_str()).await; + } + } + Err(e) => { + if let AppError::HidError { + ref backend, + ref reason, + ref error_code, + } = e + { + if error_code != "eagain_retry" { + monitor + .report_error(backend, None, reason, error_code) + .await; + } + } + } + } } impl Default for HidController { diff --git a/src/hid/otg.rs b/src/hid/otg.rs index 0db55cc0..b21917d4 100644 --- a/src/hid/otg.rs +++ b/src/hid/otg.rs @@ -145,7 +145,7 @@ pub struct OtgBackend { } /// Write timeout in milliseconds (same as JetKVM's hidWriteTimeout) -const HID_WRITE_TIMEOUT_MS: i32 = 500; +const HID_WRITE_TIMEOUT_MS: i32 = 20; impl OtgBackend { /// Create OTG backend from device paths provided by OtgService