From 12a3f1c9476e951be25a5501d35e3830ed6e013a Mon Sep 17 00:00:00 2001 From: mofeng-git Date: Sat, 2 May 2026 10:54:31 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=A2=9E=E5=8A=A0=E8=AE=BE=E5=A4=87?= =?UTF-8?q?=E4=B8=A2=E5=A4=B1=E8=87=AA=E6=81=A2=E5=A4=8D=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 增加音频设备丢失自恢复机制,完善视频设备丢失自恢复机制 降级部分日志级别,GOSTC key打印脱敏 代码格式化 --- src/audio/capture.rs | 20 +- src/audio/controller.rs | 351 ++++++++++++++++++++++++-- src/audio/encoder.rs | 4 +- src/audio/streamer.rs | 55 +++- src/extensions/manager.rs | 30 ++- src/hid/otg.rs | 14 +- src/main.rs | 18 +- src/otg/manager.rs | 8 +- src/otg/service.rs | 6 +- src/video/capture_status.rs | 9 + src/video/device.rs | 165 +++++++++++- src/video/shared_video_pipeline.rs | 28 +- src/video/streamer.rs | 55 +++- src/web/handlers/config/apply.rs | 2 +- src/webrtc/webrtc_streamer.rs | 185 +++++++++++++- web/src/composables/useAudioPlayer.ts | 39 +++ 16 files changed, 929 insertions(+), 60 deletions(-) diff --git a/src/audio/capture.rs b/src/audio/capture.rs index c1dfc415..14a3510b 100644 --- a/src/audio/capture.rs +++ b/src/audio/capture.rs @@ -128,7 +128,7 @@ impl AudioCapturer { return Ok(()); } - info!( + debug!( "Starting audio capture on {} at {}Hz {}ch", self.config.device_name, self.config.sample_rate, self.config.channels ); @@ -243,7 +243,7 @@ fn run_capture( actual_ch ))); } - info!("Audio capture: 48000 Hz, 2 ch"); + debug!("Audio capture: 48000 Hz, 2 ch"); pcm.prepare() .map_err(|e| AppError::AudioError(format!("Failed to prepare PCM: {}", e)))?; @@ -307,11 +307,14 @@ fn run_capture( } Err(e) => { let desc = e.to_string(); - if desc.contains("EPIPE") || desc.contains("Broken pipe") { + if is_device_lost_error(&desc) { + return Err(AppError::AudioError(format!( + "Audio device lost while reading {}: {}", + config.device_name, e + ))); + } else if desc.contains("EPIPE") || desc.contains("Broken pipe") { warn_throttled!(log_throttler, "buffer_overrun", "Audio buffer overrun"); let _ = pcm.prepare(); - } else if desc.contains("No such device") || desc.contains("ENODEV") { - error_throttled!(log_throttler, "no_device", "Audio read error: {}", e); } else { error_throttled!(log_throttler, "read_error", "Audio read error: {}", e); } @@ -322,3 +325,10 @@ fn run_capture( info!("Audio capture stopped"); Ok(()) } + +fn is_device_lost_error(desc: &str) -> bool { + desc.contains("No such device") + || desc.contains("ENODEV") + || desc.contains("ENXIO") + || desc.contains("ESHUTDOWN") +} diff --git a/src/audio/controller.rs b/src/audio/controller.rs index d6346e4e..1c570985 100644 --- a/src/audio/controller.rs +++ b/src/audio/controller.rs @@ -2,19 +2,26 @@ use serde::{Deserialize, Serialize}; use std::str::FromStr; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; +use std::time::Duration; use tokio::sync::RwLock; -use tracing::info; +use tracing::{debug, info, warn}; use super::capture::AudioConfig; use super::device::{ - enumerate_audio_devices_with_current, find_best_audio_device, AudioDeviceInfo, + enumerate_audio_devices, enumerate_audio_devices_with_current, find_best_audio_device, + AudioDeviceInfo, }; use super::encoder::{OpusConfig, OpusFrame}; use super::monitor::AudioHealthMonitor; -use super::streamer::{AudioStreamer, AudioStreamerConfig}; +use super::streamer::{AudioStreamState, AudioStreamer, AudioStreamerConfig}; use crate::error::{AppError, Result}; -use crate::events::EventBus; +use crate::events::{EventBus, SystemEvent}; + +const AUDIO_RECOVERY_RETRY_DELAY: Duration = Duration::from_secs(1); + +type AudioRecoveredCallback = Arc; #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)] #[serde(rename_all = "lowercase")] @@ -97,21 +104,25 @@ pub struct AudioStatus { } pub struct AudioController { - config: RwLock, - streamer: RwLock>>, - devices: RwLock>, - event_bus: RwLock>>, + config: Arc>, + streamer: Arc>>>, + devices: Arc>>, + event_bus: Arc>>>, monitor: Arc, + recovery_in_progress: Arc, + recovered_callback: Arc>>, } impl AudioController { pub fn new(config: AudioControllerConfig) -> Self { Self { - config: RwLock::new(config), - streamer: RwLock::new(None), - devices: RwLock::new(Vec::new()), - event_bus: RwLock::new(None), + config: Arc::new(RwLock::new(config)), + streamer: Arc::new(RwLock::new(None)), + devices: Arc::new(RwLock::new(Vec::new())), + event_bus: Arc::new(RwLock::new(None)), monitor: Arc::new(AudioHealthMonitor::new()), + recovery_in_progress: Arc::new(AtomicBool::new(false)), + recovered_callback: Arc::new(RwLock::new(None)), } } @@ -119,12 +130,302 @@ impl AudioController { *self.event_bus.write().await = Some(event_bus); } + pub async fn set_recovered_callback(&self, callback: Arc) { + *self.recovered_callback.write().await = Some(callback); + } + async fn mark_device_info_dirty(&self) { if let Some(ref bus) = *self.event_bus.read().await { bus.mark_device_info_dirty(); } } + async fn publish_state( + event_bus: &Arc>>>, + state: &str, + device: Option, + reason: Option<&str>, + next_retry_ms: Option, + ) { + if let Some(ref bus) = *event_bus.read().await { + bus.publish(SystemEvent::StreamStateChanged { + state: state.to_string(), + device, + reason: reason.map(str::to_string), + next_retry_ms, + }); + bus.mark_device_info_dirty(); + } + } + + async fn publish_device_lost( + event_bus: &Arc>>>, + device: &str, + reason: &str, + ) { + if let Some(ref bus) = *event_bus.read().await { + bus.publish(SystemEvent::StreamDeviceLost { + device: device.to_string(), + reason: reason.to_string(), + }); + } + } + + async fn publish_reconnecting( + event_bus: &Arc>>>, + device: &str, + attempt: u32, + ) { + if let Some(ref bus) = *event_bus.read().await { + bus.publish(SystemEvent::StreamReconnecting { + device: device.to_string(), + attempt, + }); + } + } + + async fn publish_recovered(event_bus: &Arc>>>, device: &str) { + if let Some(ref bus) = *event_bus.read().await { + bus.publish(SystemEvent::StreamRecovered { + device: device.to_string(), + }); + } + } + + fn select_recovery_device( + devices: &[AudioDeviceInfo], + preferred: &str, + ) -> Option { + if !preferred.trim().is_empty() { + if let Some(device) = devices.iter().find(|d| d.name == preferred) { + return Some(device.clone()); + } + } + + devices + .iter() + .find(|d| d.is_hdmi && d.sample_rates.contains(&48_000) && d.channels.contains(&2)) + .or_else(|| { + devices + .iter() + .find(|d| d.sample_rates.contains(&48_000) && d.channels.contains(&2)) + }) + .or_else(|| devices.first()) + .cloned() + } + + fn spawn_stream_monitor_from_parts( + config: Arc>, + streamer_slot: Arc>>>, + event_bus: Arc>>>, + monitor: Arc, + recovery_in_progress: Arc, + recovered_callback: Arc>>, + streamer: Arc, + device: String, + ) { + let mut state_rx = streamer.state_watch(); + + tokio::spawn(async move { + loop { + if state_rx.changed().await.is_err() { + return; + } + + if *state_rx.borrow() != AudioStreamState::Error { + continue; + } + + { + let current = streamer_slot.read().await; + if !current + .as_ref() + .is_some_and(|current| Arc::ptr_eq(current, &streamer)) + { + return; + } + } + + let reason = format!("Audio device lost: {}", device); + monitor.report_error(&reason, "device_lost").await; + Self::spawn_recovery_task_from_parts( + config, + streamer_slot, + event_bus, + monitor, + recovery_in_progress, + recovered_callback, + device, + reason, + ); + return; + } + }); + } + + fn spawn_recovery_task_from_parts( + config: Arc>, + streamer_slot: Arc>>>, + event_bus: Arc>>>, + monitor: Arc, + recovery_in_progress: Arc, + recovered_callback: Arc>>, + lost_device: String, + reason: String, + ) { + if recovery_in_progress.swap(true, Ordering::SeqCst) { + debug!("Audio recovery already in progress"); + return; + } + + tokio::spawn(async move { + warn!("Audio recovery started for {}: {}", lost_device, reason); + Self::publish_device_lost(&event_bus, &lost_device, &reason).await; + Self::publish_state( + &event_bus, + "device_lost", + Some(lost_device.clone()), + Some("audio_device_lost"), + Some(AUDIO_RECOVERY_RETRY_DELAY.as_millis() as u64), + ) + .await; + + let mut attempt = 0u32; + + loop { + if !recovery_in_progress.load(Ordering::SeqCst) { + debug!("Audio recovery canceled"); + return; + } + + if streamer_slot + .read() + .await + .as_ref() + .is_some_and(|s| s.is_running()) + { + recovery_in_progress.store(false, Ordering::SeqCst); + return; + } + + let cfg = config.read().await.clone(); + if !cfg.enabled { + recovery_in_progress.store(false, Ordering::SeqCst); + return; + } + + attempt = attempt.saturating_add(1); + Self::publish_reconnecting(&event_bus, &lost_device, attempt).await; + Self::publish_state( + &event_bus, + "device_lost", + Some(lost_device.clone()), + Some("audio_reconnecting"), + Some(AUDIO_RECOVERY_RETRY_DELAY.as_millis() as u64), + ) + .await; + + tokio::time::sleep(AUDIO_RECOVERY_RETRY_DELAY).await; + + let devices = match enumerate_audio_devices() { + Ok(devices) => devices, + Err(e) => { + debug!( + "Audio recovery enumerate failed (attempt {}): {}", + attempt, e + ); + continue; + } + }; + + let Some(device) = Self::select_recovery_device(&devices, &cfg.device) else { + debug!("No audio devices found during recovery attempt {}", attempt); + continue; + }; + + let streamer_config = AudioStreamerConfig { + capture: AudioConfig { + device_name: device.name.clone(), + ..Default::default() + }, + opus: cfg.quality.to_opus_config(), + }; + let new_streamer = Arc::new(AudioStreamer::with_config(streamer_config)); + + match new_streamer.start().await { + Ok(()) => { + { + let mut cfg = config.write().await; + cfg.device = device.name.clone(); + } + *streamer_slot.write().await = Some(new_streamer.clone()); + monitor.report_recovered().await; + Self::publish_recovered(&event_bus, &device.name).await; + if let Some(callback) = recovered_callback.read().await.clone() { + callback(); + } + Self::publish_state( + &event_bus, + "streaming", + Some(device.name.clone()), + None, + None, + ) + .await; + recovery_in_progress.store(false, Ordering::SeqCst); + info!( + "Audio device recovered with {} after {} attempts", + device.name, attempt + ); + Self::spawn_stream_monitor_from_parts( + config, + streamer_slot, + event_bus, + monitor, + recovery_in_progress, + recovered_callback, + new_streamer, + device.name, + ); + return; + } + Err(e) => { + debug!( + "Audio recovery start failed with {} (attempt {}): {}", + device.name, attempt, e + ); + } + } + } + }); + } + + fn spawn_recovery_task(&self, lost_device: String, reason: String) { + Self::spawn_recovery_task_from_parts( + self.config.clone(), + self.streamer.clone(), + self.event_bus.clone(), + self.monitor.clone(), + self.recovery_in_progress.clone(), + self.recovered_callback.clone(), + lost_device, + reason, + ); + } + + fn spawn_stream_monitor(&self, streamer: Arc, device: String) { + Self::spawn_stream_monitor_from_parts( + self.config.clone(), + self.streamer.clone(), + self.event_bus.clone(), + self.monitor.clone(), + self.recovery_in_progress.clone(), + self.recovered_callback.clone(), + streamer, + device, + ); + } + pub async fn list_devices(&self) -> Result> { let current_device = if self.is_streaming().await { Some(self.config.read().await.device.clone()) @@ -199,16 +500,28 @@ impl AudioController { return Ok(()); } + let mut select_error = None; let (device_name, quality) = { let mut cfg = self.config.write().await; if cfg.device.trim().is_empty() { - let best = find_best_audio_device()?; - cfg.device = best.name; + match find_best_audio_device() { + Ok(best) => cfg.device = best.name, + Err(e) => { + select_error = Some(format!("Failed to select audio device: {}", e)); + } + } } (cfg.device.clone(), cfg.quality) }; - info!("Starting audio streaming with device: {}", device_name); + if let Some(error_msg) = select_error { + self.monitor.report_error(&error_msg, "start_failed").await; + self.spawn_recovery_task("auto".to_string(), error_msg.clone()); + self.mark_device_info_dirty().await; + return Err(AppError::AudioError(error_msg)); + } + + debug!("Starting audio streaming with device: {}", device_name); self.monitor.prepare_retry_attempt(); @@ -226,18 +539,23 @@ impl AudioController { let error_msg = format!("Failed to start audio: {}", e); self.monitor.report_error(&error_msg, "start_failed").await; + self.spawn_recovery_task(device_name.clone(), error_msg.clone()); self.mark_device_info_dirty().await; return Err(AppError::AudioError(error_msg)); } + let streamer_for_monitor = streamer.clone(); *self.streamer.write().await = Some(streamer); + self.spawn_stream_monitor(streamer_for_monitor, device_name.clone()); if self.monitor.is_error().await { self.monitor.report_recovered().await; } + self.recovery_in_progress.store(false, Ordering::SeqCst); + self.mark_device_info_dirty().await; info!("Audio streaming started"); @@ -245,10 +563,13 @@ impl AudioController { } pub async fn stop_streaming(&self) -> Result<()> { + self.recovery_in_progress.store(false, Ordering::SeqCst); + if let Some(streamer) = self.streamer.write().await.take() { streamer.stop().await?; } + self.monitor.reset().await; self.mark_device_info_dirty().await; info!("Audio streaming stopped"); diff --git a/src/audio/encoder.rs b/src/audio/encoder.rs index 3095a9cc..4f214677 100644 --- a/src/audio/encoder.rs +++ b/src/audio/encoder.rs @@ -3,7 +3,7 @@ use audiopus::coder::GenericCtl; use audiopus::{coder::Encoder, Application, Bitrate, Channels, SampleRate}; use bytes::Bytes; -use tracing::info; +use tracing::debug; use super::capture::AudioFrame; use crate::error::{AppError, Result}; @@ -123,7 +123,7 @@ impl OpusEncoder { .map_err(|e| AppError::AudioError(format!("Failed to enable FEC: {:?}", e)))?; } - info!( + debug!( "Opus encoder created: {}Hz {}ch {}bps", config.sample_rate, config.channels, config.bitrate ); diff --git a/src/audio/streamer.rs b/src/audio/streamer.rs index d7864484..bdc1a207 100644 --- a/src/audio/streamer.rs +++ b/src/audio/streamer.rs @@ -3,13 +3,14 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; use tokio::sync::{broadcast, mpsc, watch, Mutex as AsyncMutex, RwLock}; -use tracing::{error, info, warn}; +use tracing::{debug, error, info, warn}; use super::capture::{AudioCapturer, AudioConfig, AudioFrame, CaptureState}; use super::encoder::{OpusConfig, OpusEncoder, OpusFrame}; use crate::error::{AppError, Result}; use bytemuck; use bytes::Bytes; +use std::time::Duration; /// 48 kHz stereo: 20 ms = 960 × 2 samples (S16LE). const OPUS_STEREO_SAMPLES: usize = 960 * 2; @@ -156,6 +157,49 @@ impl AudioStreamer { capturer.start().await?; + let mut capture_state = capturer.state_watch(); + let startup_result = tokio::time::timeout(Duration::from_secs(2), async { + loop { + let current_state = *capture_state.borrow(); + match current_state { + CaptureState::Running => return Ok(()), + CaptureState::Error => { + return Err(AppError::AudioError( + "Audio capture failed to start".to_string(), + )) + } + CaptureState::Stopped => { + if capture_state.changed().await.is_err() { + return Err(AppError::AudioError( + "Audio capture stopped during startup".to_string(), + )); + } + } + } + } + }) + .await; + + match startup_result { + Ok(Ok(())) => {} + Ok(Err(e)) => { + let _ = capturer.stop().await; + *self.capturer.write().await = None; + *self.encoder.lock().await = None; + let _ = self.state.send(AudioStreamState::Error); + return Err(e); + } + Err(_) => { + let _ = capturer.stop().await; + *self.capturer.write().await = None; + *self.encoder.lock().await = None; + let _ = self.state.send(AudioStreamState::Error); + return Err(AppError::AudioError( + "Timed out waiting for audio capture to start".to_string(), + )); + } + } + let capturer_for_task = capturer.clone(); let encoder = self.encoder.clone(); let opus_subscribers = self.opus_subscribers.clone(); @@ -232,7 +276,7 @@ impl AudioStreamer { let mut pcm_rx = capturer.subscribe(); let _ = state.send(AudioStreamState::Running); - info!("Audio stream task started (48 kHz stereo → Opus, mpsc fan-out)"); + debug!("Audio stream task started (48 kHz stereo → Opus, mpsc fan-out)"); let mut pending: Vec = Vec::new(); @@ -310,13 +354,18 @@ impl AudioStreamer { Err(_) => { if capturer.state() != CaptureState::Running { info!("Audio capture stopped, ending stream task"); + let _ = state.send(AudioStreamState::Error); break; } } } } - let _ = state.send(AudioStreamState::Stopped); + if stop_flag.load(Ordering::Relaxed) { + let _ = state.send(AudioStreamState::Stopped); + } else { + opus_subscribers.lock().unwrap().clear(); + } info!("Audio stream task ended"); } } diff --git a/src/extensions/manager.rs b/src/extensions/manager.rs index 732e2102..7cb9d9a2 100644 --- a/src/extensions/manager.rs +++ b/src/extensions/manager.rs @@ -116,7 +116,7 @@ impl ExtensionManager { "Starting extension {}: {} {}", id, id.binary_path(), - args.join(" ") + Self::redact_args_for_log(&args).join(" ") ); let mut child = Command::new(id.binary_path()) @@ -302,6 +302,34 @@ impl ExtensionManager { } } + fn redact_args_for_log(args: &[String]) -> Vec { + let mut redacted = Vec::with_capacity(args.len()); + let mut redact_next = false; + + for arg in args { + if redact_next { + redacted.push("****".to_string()); + redact_next = false; + continue; + } + + if arg == "-key" || arg == "--key" { + redacted.push(arg.clone()); + redact_next = true; + } else if let Some((flag, _)) = arg.split_once('=') { + if flag == "-key" || flag == "--key" { + redacted.push(format!("{}=****", flag)); + } else { + redacted.push(arg.clone()); + } + } else { + redacted.push(arg.clone()); + } + } + + redacted + } + async fn prepare_ttyd_socket() -> Result<(), String> { let socket_path = Path::new(TTYD_SOCKET_PATH); diff --git a/src/hid/otg.rs b/src/hid/otg.rs index 6a25e1e4..5acd91e3 100644 --- a/src/hid/otg.rs +++ b/src/hid/otg.rs @@ -827,15 +827,15 @@ impl OtgBackend { #[async_trait] impl HidBackend for OtgBackend { async fn init(&self) -> Result<()> { - info!("Initializing OTG HID backend"); + debug!("Initializing OTG HID backend"); if self.udc_name.read().is_none() { if let Some(udc) = Self::find_udc() { - info!("Auto-detected UDC: {}", udc); + debug!("Auto-detected UDC: {}", udc); self.set_udc_name(&udc); } } else if let Some(udc) = self.udc_name.read().clone() { - info!("Using configured UDC: {}", udc); + debug!("Using configured UDC: {}", udc); } let mut device_paths = Vec::new(); @@ -866,7 +866,7 @@ impl HidBackend for OtgBackend { if path.exists() { let file = Self::open_device(path)?; *self.keyboard_dev.lock() = Some(file); - info!("Keyboard device opened: {}", path.display()); + debug!("Keyboard device opened: {}", path.display()); } else { warn!("Keyboard device not found: {}", path.display()); } @@ -876,7 +876,7 @@ impl HidBackend for OtgBackend { if path.exists() { let file = Self::open_device(path)?; *self.mouse_rel_dev.lock() = Some(file); - info!("Relative mouse device opened: {}", path.display()); + debug!("Relative mouse device opened: {}", path.display()); } else { warn!("Relative mouse device not found: {}", path.display()); } @@ -886,7 +886,7 @@ impl HidBackend for OtgBackend { if path.exists() { let file = Self::open_device(path)?; *self.mouse_abs_dev.lock() = Some(file); - info!("Absolute mouse device opened: {}", path.display()); + debug!("Absolute mouse device opened: {}", path.display()); } else { warn!("Absolute mouse device not found: {}", path.display()); } @@ -896,7 +896,7 @@ impl HidBackend for OtgBackend { if path.exists() { let file = Self::open_device(path)?; *self.consumer_dev.lock() = Some(file); - info!("Consumer control device opened: {}", path.display()); + debug!("Consumer control device opened: {}", path.display()); } else { debug!("Consumer control device not found: {}", path.display()); } diff --git a/src/main.rs b/src/main.rs index 392f37ac..ac0f74dc 100644 --- a/src/main.rs +++ b/src/main.rs @@ -424,13 +424,13 @@ async fn main() -> anyhow::Result<()> { if let Err(e) = webrtc_streamer.set_audio_enabled(true).await { tracing::warn!("Failed to enable WebRTC audio: {}", e); } else { - tracing::info!("WebRTC audio enabled"); + tracing::debug!("WebRTC audio enabled"); } } let (device_path, actual_resolution, actual_format, actual_fps, jpeg_quality) = streamer.current_capture_config().await; - tracing::info!( + tracing::debug!( "Initial video config: {}x{} {:?} @ {}fps", actual_resolution.width, actual_resolution.height, @@ -461,7 +461,7 @@ async fn main() -> anyhow::Result<()> { v4l2_driver, ) .await; - tracing::info!("WebRTC streamer configured for direct capture"); + tracing::debug!("WebRTC streamer configured for direct capture"); } else { tracing::warn!("No capture device configured for WebRTC"); } @@ -472,6 +472,18 @@ async fn main() -> anyhow::Result<()> { ); stream_manager.set_event_bus(events.clone()).await; stream_manager.set_config_store(config_store.clone()).await; + { + let stream_manager_weak = Arc::downgrade(&stream_manager); + audio + .set_recovered_callback(Arc::new(move || { + if let Some(stream_manager) = stream_manager_weak.upgrade() { + tokio::spawn(async move { + stream_manager.reconnect_webrtc_audio_sources().await; + }); + } + })) + .await; + } let initial_mode = config.stream.mode.clone(); if let Err(e) = stream_manager.init_with_mode(initial_mode.clone()).await { diff --git a/src/otg/manager.rs b/src/otg/manager.rs index c507147e..8f1679dc 100644 --- a/src/otg/manager.rs +++ b/src/otg/manager.rs @@ -163,7 +163,7 @@ impl OtgGadgetManager { } pub fn setup(&mut self) -> Result<()> { - info!("Setting up OTG USB Gadget: {}", self.gadget_name); + debug!("Setting up OTG USB Gadget: {}", self.gadget_name); if !Self::is_available() { return Err(AppError::Internal( @@ -173,7 +173,7 @@ impl OtgGadgetManager { if self.gadget_exists() { if self.is_bound() { - info!("Gadget already exists and is bound, skipping setup"); + debug!("Gadget already exists and is bound, skipping setup"); return Ok(()); } warn!("Gadget exists but not bound, will reconfigure"); @@ -194,7 +194,7 @@ impl OtgGadgetManager { func.link(&self.config_path, &self.gadget_path)?; } - info!("OTG USB Gadget setup complete"); + debug!("OTG USB Gadget setup complete"); Ok(()) } @@ -203,7 +203,7 @@ impl OtgGadgetManager { warn!("Failed to recreate gadget config links before bind: {}", e); } - info!("Binding gadget to UDC: {}", udc); + debug!("Binding gadget to UDC: {}", udc); write_file(&self.gadget_path.join("UDC"), &udc)?; self.bound_udc = Some(udc.to_string()); std::thread::sleep(std::time::Duration::from_millis(REBIND_DELAY_MS)); diff --git a/src/otg/service.rs b/src/otg/service.rs index 8988a04a..fe9b6f6d 100644 --- a/src/otg/service.rs +++ b/src/otg/service.rs @@ -148,7 +148,7 @@ impl OtgService { async fn reconcile_gadget(&self) -> Result<()> { let desired = self.desired.read().await.clone(); - info!( + debug!( "Reconciling OTG gadget: HID={}, MSD={}, UDC={:?}", desired.hid_enabled(), desired.msd_enabled, @@ -166,7 +166,7 @@ impl OtgService { && state.max_endpoints == desired.max_endpoints && state.descriptor.as_ref() == Some(&desired.descriptor) { - info!("OTG gadget already matches desired state"); + debug!("OTG gadget already matches desired state"); return Ok(()); } } @@ -174,7 +174,7 @@ impl OtgService { { let mut manager = self.manager.lock().await; if let Some(mut m) = manager.take() { - info!("Cleaning up existing gadget before OTG reconcile"); + debug!("Cleaning up existing gadget before OTG reconcile"); if let Err(e) = m.cleanup() { warn!("Error cleaning up existing gadget: {}", e); } diff --git a/src/video/capture_status.rs b/src/video/capture_status.rs index 1b9f7ea8..565510cc 100644 --- a/src/video/capture_status.rs +++ b/src/video/capture_status.rs @@ -28,6 +28,15 @@ pub fn classify_capture_io_error(err: &io::Error) -> CaptureIoErrorKind { } } +pub fn is_device_lost_message(message: &str) -> bool { + message.contains("No such file or directory") + || message.contains("No such device") + || message.contains("os error 2") + || message.contains("ENODEV") + || message.contains("ENXIO") + || message.contains("ESHUTDOWN") +} + pub fn capture_error_log_key(err: &io::Error) -> String { let message = err.to_string(); if message.contains("dqbuf failed") && message.contains("EINVAL") { diff --git a/src/video/device.rs b/src/video/device.rs index 3a61f7a0..30e2a796 100644 --- a/src/video/device.rs +++ b/src/video/device.rs @@ -61,6 +61,29 @@ pub struct VideoDeviceInfo { pub bridge_kind: Option, } +#[derive(Debug, Clone)] +pub struct VideoDeviceRecoveryHint { + pub path: PathBuf, + pub name: String, + pub driver: String, + pub bus_info: String, + pub card: String, + pub is_capture_card: bool, +} + +impl From<&VideoDeviceInfo> for VideoDeviceRecoveryHint { + fn from(device: &VideoDeviceInfo) -> Self { + Self { + path: device.path.clone(), + name: device.name.clone(), + driver: device.driver.clone(), + bus_info: device.bus_info.clone(), + card: device.card.clone(), + is_capture_card: device.is_capture_card, + } + } +} + /// Information about a supported format #[derive(Debug, Clone, Serialize, Deserialize)] pub struct FormatInfo { @@ -850,7 +873,7 @@ impl VideoDevice { /// Enumerate all video capture devices pub fn enumerate_devices() -> Result> { - info!("Enumerating video devices..."); + debug!("Enumerating video devices..."); // First pass: collect candidates that pass the sysfs-based pre-filter. // This avoids opening orphan /dev/videoN nodes (ENODEV) and m2m codec @@ -934,6 +957,51 @@ pub fn enumerate_devices() -> Result> { Ok(devices) } +pub fn select_recovery_device( + devices: &[VideoDeviceInfo], + hint: &VideoDeviceRecoveryHint, +) -> Option { + devices + .iter() + .find(|device| device.path == hint.path) + .or_else(|| { + if hint.bus_info.trim().is_empty() { + None + } else { + devices + .iter() + .find(|device| device.bus_info == hint.bus_info) + } + }) + .or_else(|| { + if hint.driver.trim().is_empty() || hint.card.trim().is_empty() { + None + } else { + devices + .iter() + .find(|device| device.driver == hint.driver && device.card == hint.card) + } + }) + .or_else(|| { + if hint.driver.trim().is_empty() || hint.name.trim().is_empty() { + None + } else { + devices + .iter() + .find(|device| device.driver == hint.driver && device.name == hint.name) + } + }) + .or_else(|| { + if hint.is_capture_card { + devices.iter().find(|device| device.is_capture_card) + } else { + None + } + }) + .or_else(|| devices.first()) + .cloned() +} + /// Collapse platform sub-device nodes that share the same driver + bus_info /// into a single entry (the one with the highest priority / most formats). /// Currently applies to the `rkcif` driver on Rockchip SoCs where each @@ -1215,6 +1283,35 @@ pub fn find_best_device() -> Result { mod tests { use super::*; + fn test_device( + path: &str, + name: &str, + driver: &str, + bus_info: &str, + card: &str, + is_capture_card: bool, + priority: u32, + ) -> VideoDeviceInfo { + VideoDeviceInfo { + path: PathBuf::from(path), + name: name.to_string(), + driver: driver.to_string(), + bus_info: bus_info.to_string(), + card: card.to_string(), + formats: Vec::new(), + capabilities: DeviceCapabilities { + video_capture: true, + streaming: true, + ..Default::default() + }, + is_capture_card, + priority, + has_signal: true, + subdev_path: None, + bridge_kind: None, + } + } + #[test] fn test_pixel_format_conversion() { let format = PixelFormat::Mjpeg; @@ -1230,4 +1327,70 @@ mod tests { assert_eq!(res.height, 1080); assert!(res.is_valid()); } + + #[test] + fn recovery_selection_prefers_original_path() { + let original = test_device( + "/dev/video0", + "USB Capture", + "uvcvideo", + "usb-1", + "USB Capture", + true, + 100, + ); + let other = test_device( + "/dev/video2", + "USB Capture", + "uvcvideo", + "usb-1", + "USB Capture", + true, + 200, + ); + let hint = VideoDeviceRecoveryHint::from(&original); + let selected = select_recovery_device(&[other, original.clone()], &hint).unwrap(); + assert_eq!(selected.path, original.path); + } + + #[test] + fn recovery_selection_matches_bus_info_after_path_change() { + let original = test_device( + "/dev/video0", + "USB Capture", + "uvcvideo", + "usb-1", + "USB Capture", + true, + 100, + ); + let recovered = test_device( + "/dev/video3", + "USB Capture", + "uvcvideo", + "usb-1", + "USB Capture", + true, + 100, + ); + let hint = VideoDeviceRecoveryHint::from(&original); + let selected = select_recovery_device(&[recovered.clone()], &hint).unwrap(); + assert_eq!(selected.path, recovered.path); + } + + #[test] + fn recovery_selection_falls_back_to_capture_priority() { + let hint = VideoDeviceRecoveryHint { + path: PathBuf::from("/dev/video9"), + name: "Gone".to_string(), + driver: "gone".to_string(), + bus_info: String::new(), + card: "Gone".to_string(), + is_capture_card: true, + }; + let lower = test_device("/dev/video1", "A", "uvcvideo", "usb-a", "A", true, 10); + let higher = test_device("/dev/video2", "B", "uvcvideo", "usb-b", "B", true, 20); + let selected = select_recovery_device(&[higher.clone(), lower], &hint).unwrap(); + assert_eq!(selected.path, higher.path); + } } diff --git a/src/video/shared_video_pipeline.rs b/src/video/shared_video_pipeline.rs index a5603a75..8d9402a3 100644 --- a/src/video/shared_video_pipeline.rs +++ b/src/video/shared_video_pipeline.rs @@ -45,8 +45,8 @@ use crate::error::{AppError, Result}; use crate::utils::LogThrottler; use crate::video::capture_limits::{should_validate_jpeg_frame, MIN_CAPTURE_FRAME_SIZE}; use crate::video::capture_status::{ - capture_error_log_key, classify_capture_io_error, signal_status_from_capture_kind, - CaptureIoErrorKind, + capture_error_log_key, classify_capture_io_error, is_device_lost_message, + signal_status_from_capture_kind, CaptureIoErrorKind, }; use crate::video::csi_bridge::{self, ProbeResult}; use crate::video::device::parse_bridge_kind; @@ -272,6 +272,7 @@ pub struct SharedVideoPipeline { /// Uses AtomicI64 instead of Mutex for lock-free access pipeline_start_time_ms: AtomicI64, pending_sync_geometry: ParkingMutex>, + device_lost_reason: ParkingMutex>, state_notifier: ParkingRwLock>>, last_state_notification: ParkingMutex>, } @@ -377,6 +378,7 @@ impl SharedVideoPipeline { keyframe_requested: AtomicBool::new(false), pipeline_start_time_ms: AtomicI64::new(0), pending_sync_geometry: ParkingMutex::new(None), + device_lost_reason: ParkingMutex::new(None), state_notifier: ParkingRwLock::new(None), last_state_notification: ParkingMutex::new(None), }); @@ -388,6 +390,14 @@ impl SharedVideoPipeline { self.pending_sync_geometry.lock().take() } + pub fn take_device_lost_reason(&self) -> Option { + self.device_lost_reason.lock().take() + } + + fn mark_device_lost(&self, reason: String) { + *self.device_lost_reason.lock() = Some(reason); + } + pub fn set_state_notifier( &self, notifier: Option>, @@ -783,6 +793,7 @@ impl SharedVideoPipeline { enum OpenResult { Opened(V4l2rCaptureStream), NoSignal(SignalStatus), + DeviceLost(String), Fatal, } @@ -807,6 +818,11 @@ impl SharedVideoPipeline { OpenResult::NoSignal(signal_status_from_capture_kind(&kind)) } Err(e) => { + let reason = e.to_string(); + if is_device_lost_message(&reason) { + error!("Capture device lost during soft-restart: {}", e); + return OpenResult::DeviceLost(reason); + } error!("Capture soft-restart failed: {}", e); OpenResult::Fatal } @@ -950,6 +966,13 @@ impl SharedVideoPipeline { std::thread::sleep(Duration::from_millis(wait_ms)); continue; } + OpenResult::DeviceLost(reason) => { + pipeline.mark_device_lost(reason); + let _ = pipeline.running.send(false); + pipeline.running_flag.store(false, Ordering::Release); + let _ = frame_seq_tx.send(sequence.wrapping_add(1)); + break; + } OpenResult::Fatal => { let _ = pipeline.running.send(false); pipeline.running_flag.store(false, Ordering::Release); @@ -1137,6 +1160,7 @@ impl SharedVideoPipeline { } CaptureIoErrorKind::DeviceLost => { error!("Capture device lost: {}", e); + pipeline.mark_device_lost(e.to_string()); let _ = pipeline.running.send(false); pipeline.running_flag.store(false, Ordering::Release); let _ = frame_seq_tx.send(sequence.wrapping_add(1)); diff --git a/src/video/streamer.rs b/src/video/streamer.rs index 2c1ca194..29783c72 100644 --- a/src/video/streamer.rs +++ b/src/video/streamer.rs @@ -13,7 +13,8 @@ use tracing::{debug, error, info, trace, warn}; use super::csi_bridge; use super::device::{ - enumerate_devices, find_best_device, parse_bridge_kind, VideoDevice, VideoDeviceInfo, + enumerate_devices, find_best_device, parse_bridge_kind, select_recovery_device, VideoDevice, + VideoDeviceInfo, VideoDeviceRecoveryHint, }; use super::format::{PixelFormat, Resolution}; use super::frame::{FrameBuffer, FrameBufferPool, VideoFrame}; @@ -366,7 +367,7 @@ impl Streamer { // IMPORTANT: Disconnect all MJPEG clients FIRST before stopping capture // This prevents race conditions where clients try to reconnect and reopen the device - info!("Disconnecting all MJPEG clients before config change..."); + debug!("Disconnecting all MJPEG clients before config change..."); self.mjpeg_handler.disconnect_all_clients(); // Give clients time to receive the disconnect signal and close their connections @@ -392,7 +393,7 @@ impl Streamer { *self.state.write().await = StreamerState::Ready; // Publish "config applied" event - info!( + debug!( "Publishing StreamConfigApplied event: {}x{} {:?} @ {}fps", resolution.width, resolution.height, format, fps ); @@ -408,7 +409,7 @@ impl Streamer { // Note: We don't auto-start here anymore. // The stream will be started when MJPEG client connects (handlers.rs:790) // This avoids race conditions between config change and client reconnection. - info!("Config applied, stream will start when client connects"); + debug!("Config applied, stream will start when client connects"); Ok(()) } @@ -1305,6 +1306,7 @@ impl Streamer { { let mut cfg = self.config.write().await; + cfg.device_path = Some(device_info.path.clone()); cfg.format = format; cfg.resolution = resolution; } @@ -1392,6 +1394,20 @@ impl Streamer { .await .clone() .unwrap_or_else(|| "Device lost".to_string()); + let recovery_hint = self + .current_device + .read() + .await + .as_ref() + .map(VideoDeviceRecoveryHint::from) + .unwrap_or_else(|| VideoDeviceRecoveryHint { + path: PathBuf::from(&device), + name: String::new(), + driver: String::new(), + bus_info: String::new(), + card: String::new(), + is_capture_card: true, + }); // Store error info *self.last_lost_device.write().await = Some(device.clone()); @@ -1409,7 +1425,7 @@ impl Streamer { // Start recovery task let streamer = Arc::clone(self); tokio::spawn(async move { - let device_path = device.clone(); + let original_device_path = device.clone(); loop { let attempt = streamer @@ -1433,13 +1449,13 @@ impl Streamer { if attempt == 1 || attempt.is_multiple_of(5) { streamer .publish_event(SystemEvent::StreamReconnecting { - device: device_path.clone(), + device: original_device_path.clone(), attempt, }) .await; info!( "Attempting to recover video device {} (attempt {})", - device_path, attempt + original_device_path, attempt ); } @@ -1450,9 +1466,28 @@ impl Streamer { }; tokio::time::sleep(wait).await; - // Check if device file exists - let device_exists = std::path::Path::new(&device_path).exists(); - if !device_exists { + let devices = match enumerate_devices() { + Ok(devices) => devices, + Err(e) => { + debug!("Failed to enumerate devices during recovery: {}", e); + continue; + } + }; + + let Some(device) = select_recovery_device(&devices, &recovery_hint) else { + debug!("No matching video device present yet for recovery"); + continue; + }; + + let device_path = device.path.display().to_string(); + if device_path != original_device_path { + info!( + "Recovered video device path changed: {} -> {}", + original_device_path, device_path + ); + } + + if !std::path::Path::new(&device_path).exists() { debug!("Device {} not present yet", device_path); continue; } diff --git a/src/web/handlers/config/apply.rs b/src/web/handlers/config/apply.rs index 3e0677ac..25f7c114 100644 --- a/src/web/handlers/config/apply.rs +++ b/src/web/handlers/config/apply.rs @@ -365,7 +365,7 @@ pub async fn apply_audio_config( .stream_manager .set_webrtc_audio_enabled(new_config.enabled) .await?; - tracing::info!("WebRTC audio enabled: {}", new_config.enabled); + tracing::debug!("WebRTC audio enabled: {}", new_config.enabled); if new_config.enabled { state.stream_manager.reconnect_webrtc_audio_sources().await; diff --git a/src/webrtc/webrtc_streamer.rs b/src/webrtc/webrtc_streamer.rs index a67fefa4..c69beed1 100644 --- a/src/webrtc/webrtc_streamer.rs +++ b/src/webrtc/webrtc_streamer.rs @@ -2,6 +2,7 @@ use std::collections::HashMap; use std::path::PathBuf; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, RwLock as StdRwLock}; use tokio::sync::RwLock; use tracing::{debug, info, trace, warn}; @@ -10,6 +11,9 @@ use crate::audio::{AudioController, OpusFrame}; use crate::error::{AppError, Result}; use crate::events::{EventBus, SystemEvent}; use crate::hid::HidController; +use crate::video::device::{ + enumerate_devices, select_recovery_device, VideoDevice, VideoDeviceRecoveryHint, +}; use crate::video::types::{ BitratePreset, EncoderBackend, PipelineStateNotification, PixelFormat, Resolution, SharedVideoPipeline, SharedVideoPipelineConfig, SharedVideoPipelineStats, VideoCodecType, @@ -56,6 +60,7 @@ pub struct CaptureDeviceConfig { pub bridge_kind: Option, /// V4L2 driver name (e.g. `uvcvideo`) for UVC-specific recovery hints. pub v4l2_driver: Option, + pub recovery_hint: VideoDeviceRecoveryHint, } #[derive(Debug, Clone, Default)] @@ -88,6 +93,7 @@ pub struct WebRtcStreamer { audio_controller: RwLock>>, hid_controller: RwLock>>, events: RwLock>>, + recovery_in_progress: AtomicBool, self_weak: StdRwLock>>, } @@ -107,6 +113,7 @@ impl WebRtcStreamer { audio_controller: RwLock::new(None), hid_controller: RwLock::new(None), events: RwLock::new(None), + recovery_in_progress: AtomicBool::new(false), self_weak: StdRwLock::new(None), }); let weak = Arc::downgrade(&streamer); @@ -283,6 +290,156 @@ impl WebRtcStreamer { Ok(sessions_to_reconnect.len()) } + async fn publish_stream_event(&self, event: SystemEvent) { + if let Some(events) = self.events.read().await.as_ref() { + events.publish(event); + events.mark_device_info_dirty(); + } + } + + async fn update_recovered_capture_device( + &self, + device: crate::video::device::VideoDeviceInfo, + ) -> Result<()> { + let (format, resolution, fps, jpeg_quality, buffer_count) = { + let config = self.config.read().await; + let current_capture = self.capture_device.read().await.clone(); + ( + config.input_format, + config.resolution, + config.fps, + current_capture + .as_ref() + .map(|capture| capture.jpeg_quality) + .unwrap_or(80), + current_capture + .as_ref() + .map(|capture| capture.buffer_count) + .unwrap_or(2), + ) + }; + + let pipeline_config = CaptureDeviceConfig { + device_path: device.path.clone(), + buffer_count, + jpeg_quality, + subdev_path: device.subdev_path.clone(), + bridge_kind: device.bridge_kind.clone(), + v4l2_driver: Some(device.driver.clone()), + recovery_hint: VideoDeviceRecoveryHint::from(&device), + }; + + *self.capture_device.write().await = Some(pipeline_config); + let mut config = self.config.write().await; + config.input_format = format; + config.resolution = resolution; + config.fps = fps; + Ok(()) + } + + fn start_device_recovery(self: &Arc, hint: VideoDeviceRecoveryHint, reason: String) { + if self.recovery_in_progress.swap(true, Ordering::SeqCst) { + debug!("WebRTC video recovery already in progress"); + return; + } + + let streamer = self.clone(); + tokio::spawn(async move { + let original_device = hint.path.display().to_string(); + warn!( + "WebRTC video recovery started for {}: {}", + original_device, reason + ); + streamer + .publish_stream_event(SystemEvent::StreamDeviceLost { + device: original_device.clone(), + reason: reason.clone(), + }) + .await; + + let mut attempt = 0u32; + loop { + attempt = attempt.saturating_add(1); + streamer + .publish_stream_event(SystemEvent::StreamReconnecting { + device: original_device.clone(), + attempt, + }) + .await; + streamer + .publish_stream_event(SystemEvent::StreamStateChanged { + state: "device_lost".to_string(), + device: Some(original_device.clone()), + reason: Some("recovering".to_string()), + next_retry_ms: Some(1_000), + }) + .await; + + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + + let devices = match enumerate_devices() { + Ok(devices) => devices, + Err(e) => { + debug!("WebRTC video recovery enumerate failed: {}", e); + continue; + } + }; + + let Some(device) = select_recovery_device(&devices, &hint) else { + debug!("No matching WebRTC video device found during recovery"); + continue; + }; + + if let Err(e) = streamer + .update_recovered_capture_device(device.clone()) + .await + { + debug!("Failed to update recovered capture device: {}", e); + continue; + } + + match streamer.ensure_video_pipeline().await { + Ok(_) => { + match streamer + .reconnect_sessions_to_current_pipeline("device recovery") + .await + { + Ok(reconnected) => { + info!( + "WebRTC video recovered with {} after {} attempts, reconnected {} sessions", + device.path.display(), + attempt, + reconnected + ); + streamer + .publish_stream_event(SystemEvent::StreamRecovered { + device: device.path.display().to_string(), + }) + .await; + streamer + .publish_stream_event(SystemEvent::StreamStateChanged { + state: "streaming".to_string(), + device: Some(device.path.display().to_string()), + reason: None, + next_retry_ms: None, + }) + .await; + streamer.recovery_in_progress.store(false, Ordering::SeqCst); + return; + } + Err(e) => { + debug!("Failed to reconnect WebRTC sessions after recovery: {}", e); + } + } + } + Err(e) => { + debug!("Failed to restart WebRTC video pipeline: {}", e); + } + } + } + }); + } + /// Ensure video pipeline is initialized and running async fn ensure_video_pipeline(&self) -> Result> { let mut pipeline_guard = self.video_pipeline.write().await; @@ -344,6 +501,7 @@ impl WebRtcStreamer { // Clear pipeline reference in WebRtcStreamer if let Some(streamer) = streamer_weak.upgrade() { let mut pending_geometry: Option<(Resolution, PixelFormat)> = None; + let mut device_lost_reason: Option = None; let mut pipeline_guard = streamer.video_pipeline.write().await; // Only clear if it's the same pipeline that stopped if let Some(ref current) = *pipeline_guard { @@ -351,6 +509,7 @@ impl WebRtcStreamer { if Arc::ptr_eq(current, &stopped_pipeline) { pending_geometry = stopped_pipeline.take_pending_sync_geometry(); + device_lost_reason = stopped_pipeline.take_device_lost_reason(); *pipeline_guard = None; info!("Cleared stopped video pipeline reference"); } @@ -358,6 +517,13 @@ impl WebRtcStreamer { } drop(pipeline_guard); + if let Some(reason) = device_lost_reason { + if let Some(capture) = streamer.capture_device.read().await.clone() { + streamer.start_device_recovery(capture.recovery_hint, reason); + } + continue; + } + let should_reconnect = pending_geometry.is_some(); if let Some((r, f)) = pending_geometry { streamer.sync_video_geometry_from_negotiated(r, f).await; @@ -466,13 +632,13 @@ impl WebRtcStreamer { } } - info!("WebRTC audio enabled: {}", enabled); + debug!("WebRTC audio enabled: {}", enabled); Ok(()) } /// Set audio controller reference pub async fn set_audio_controller(&self, controller: Arc) { - info!("Setting audio controller for WebRTC streamer"); + debug!("Setting audio controller for WebRTC streamer"); *self.audio_controller.write().await = Some(controller.clone()); // Reconnect audio for existing sessions if audio is enabled @@ -516,10 +682,21 @@ impl WebRtcStreamer { bridge_kind: Option, v4l2_driver: Option, ) { - info!( + debug!( "Setting direct capture device for WebRTC: {:?} (subdev={:?}, kind={:?}, driver={:?})", device_path, subdev_path, bridge_kind, v4l2_driver ); + let recovery_hint = VideoDevice::open_readonly(&device_path) + .and_then(|device| device.info()) + .map(|info| VideoDeviceRecoveryHint::from(&info)) + .unwrap_or_else(|_| VideoDeviceRecoveryHint { + path: device_path.clone(), + name: String::new(), + driver: v4l2_driver.clone().unwrap_or_default(), + bus_info: String::new(), + card: String::new(), + is_capture_card: true, + }); *self.capture_device.write().await = Some(CaptureDeviceConfig { device_path, buffer_count: 2, @@ -527,6 +704,7 @@ impl WebRtcStreamer { subdev_path, bridge_kind, v4l2_driver, + recovery_hint, }); } @@ -1164,6 +1342,7 @@ impl Default for WebRtcStreamer { audio_controller: RwLock::new(None), hid_controller: RwLock::new(None), events: RwLock::new(None), + recovery_in_progress: AtomicBool::new(false), self_weak: StdRwLock::new(None), } } diff --git a/web/src/composables/useAudioPlayer.ts b/web/src/composables/useAudioPlayer.ts index 70f03b90..97b66937 100644 --- a/web/src/composables/useAudioPlayer.ts +++ b/web/src/composables/useAudioPlayer.ts @@ -16,6 +16,31 @@ export function useAudioPlayer() { let decoder: any = null let nextPlayTime = 0 let isConnecting = false // Prevent concurrent connection attempts + let reconnectTimer: number | null = null + let shouldReconnect = false + + function clearReconnectTimer() { + if (reconnectTimer !== null) { + clearTimeout(reconnectTimer) + reconnectTimer = null + } + } + + function scheduleReconnect() { + if (!shouldReconnect || volume.value <= 0 || reconnectTimer !== null) { + return + } + + reconnectTimer = window.setTimeout(() => { + reconnectTimer = null + if (!shouldReconnect || volume.value <= 0) { + return + } + connect().catch(() => { + scheduleReconnect() + }) + }, 1000) + } async function initDecoder() { const opusDecoder = new OpusDecoder({ @@ -34,6 +59,8 @@ export function useAudioPlayer() { } async function connect() { + shouldReconnect = true + // Prevent concurrent connection attempts (critical fix for multiple WS connections) if (isConnecting) { return @@ -52,6 +79,7 @@ export function useAudioPlayer() { } isConnecting = true + clearReconnectTimer() try { if (!decoder) await initDecoder() @@ -72,6 +100,7 @@ export function useAudioPlayer() { connected.value = true playing.value = true error.value = null + clearReconnectTimer() nextPlayTime = audioContext!.currentTime } @@ -83,8 +112,10 @@ export function useAudioPlayer() { ws.onclose = () => { isConnecting = false + ws = null connected.value = false playing.value = false + scheduleReconnect() } ws.onerror = () => { @@ -94,10 +125,13 @@ export function useAudioPlayer() { } catch (e) { isConnecting = false error.value = e instanceof Error ? e.message : 'Failed to initialize audio' + scheduleReconnect() } } function disconnect() { + shouldReconnect = false + clearReconnectTimer() if (ws) { ws.close() ws = null @@ -172,6 +206,11 @@ export function useAudioPlayer() { function setVolume(v: number) { volume.value = Math.max(0, Math.min(1, v)) updateVolume() + if (volume.value <= 0) { + clearReconnectTimer() + } else if (shouldReconnect && !connected.value && !isConnecting) { + scheduleReconnect() + } } watch(volume, updateVolume)