From 099f0b1ca2c9181dff526a3c4782965bed72af62 Mon Sep 17 00:00:00 2001 From: mofeng-git Date: Sat, 11 Apr 2026 21:20:54 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E4=BC=98=E5=8C=96=E8=A7=86=E9=A2=91?= =?UTF-8?q?=E5=88=87=E6=8D=A2=E6=B5=81=E7=95=85=E6=80=A7=EF=BC=9B=E4=BF=AE?= =?UTF-8?q?=E5=A4=8D=20OTG=20HID=20=E5=8A=9F=E8=83=BD=E6=97=A0=E6=B3=95?= =?UTF-8?q?=E4=B8=80=E6=AC=A1=E4=BF=9D=E5=AD=98=E6=88=90=E5=8A=9F=E5=92=8C?= =?UTF-8?q?=E9=A1=B5=E9=9D=A2=E6=9C=AA=E5=8D=B3=E5=88=BB=E7=94=9F=E6=95=88?= =?UTF-8?q?=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/config/store.rs | 11 ++++-- src/video/shared_video_pipeline.rs | 35 ++++++++++++++++- src/video/stream_manager.rs | 62 +++++++++++++++++++++++++++++- src/web/handlers/mod.rs | 55 ++++++++++++++++++++------ src/webrtc/webrtc_streamer.rs | 16 ++++++++ web/src/composables/useWebRTC.ts | 16 +++----- web/src/views/ConsoleView.vue | 37 ++++++++++++------ web/src/views/SettingsView.vue | 60 +++++++++++++---------------- 8 files changed, 219 insertions(+), 73 deletions(-) diff --git a/src/config/store.rs b/src/config/store.rs index 2a69ade0..5547cbae 100644 --- a/src/config/store.rs +++ b/src/config/store.rs @@ -4,6 +4,7 @@ use std::path::Path; use std::sync::Arc; use std::time::Duration; use tokio::sync::broadcast; +use tokio::sync::Mutex; use super::AppConfig; use crate::error::{AppError, Result}; @@ -18,6 +19,8 @@ pub struct ConfigStore { /// Lock-free cache using ArcSwap for zero-cost reads cache: Arc>, change_tx: broadcast::Sender, + /// Serializes `set` / `update` so concurrent PATCH handlers cannot clobber each other + write_lock: Arc>, } /// Configuration change event @@ -59,6 +62,7 @@ impl ConfigStore { pool, cache, change_tx, + write_lock: Arc::new(Mutex::new(())), }) } @@ -191,6 +195,7 @@ impl ConfigStore { /// Set entire configuration pub async fn set(&self, config: AppConfig) -> Result<()> { + let _guard = self.write_lock.lock().await; Self::save_config_to_db(&self.pool, &config).await?; self.cache.store(Arc::new(config)); @@ -204,13 +209,13 @@ impl ConfigStore { /// Update configuration with a closure /// - /// Note: This uses a read-modify-write pattern. For concurrent updates, - /// the last write wins. This is acceptable for configuration changes - /// which are infrequent and typically user-initiated. + /// Uses read-modify-write under a mutex so concurrent `update` / `set` calls are serialized + /// and merged correctly (each closure sees the latest stored config). pub async fn update(&self, f: F) -> Result<()> where F: FnOnce(&mut AppConfig), { + let _guard = self.write_lock.lock().await; // Load current config, clone it for modification let current = self.cache.load(); let mut config = (**current).clone(); diff --git a/src/video/shared_video_pipeline.rs b/src/video/shared_video_pipeline.rs index cf84d1a2..23f8ed8b 100644 --- a/src/video/shared_video_pipeline.rs +++ b/src/video/shared_video_pipeline.rs @@ -844,7 +844,7 @@ impl SharedVideoPipeline { } } - /// Stop the pipeline + /// Stop the pipeline (non-blocking, does not wait for capture thread to exit) pub fn stop(&self) { if *self.running_rx.borrow() { let _ = self.running.send(false); @@ -854,6 +854,39 @@ impl SharedVideoPipeline { } } + /// Stop the pipeline and wait for the capture thread to fully exit. + /// + /// This ensures the V4L2 device is released before returning, which is + /// necessary when another consumer (e.g. MJPEG streamer) needs to open + /// the same device immediately after. + pub async fn stop_and_wait(&self, timeout: std::time::Duration) { + self.stop(); + let mut rx = self.running_watch(); + if !*rx.borrow() { + // Capture thread may still be running from a previous `stop()` call. + // Wait for the "Video pipeline stopped" log (thread sets running=false + // at exit), unless it already happened. + } + let deadline = tokio::time::Instant::now() + timeout; + loop { + if !self.running_flag.load(Ordering::Acquire) { + // Flag is cleared, but the capture thread may still be unwinding + // (dropping the V4L2 stream). Give it a brief moment. + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + break; + } + let remaining = deadline.saturating_duration_since(tokio::time::Instant::now()); + if remaining.is_zero() { + warn!( + "Timed out waiting for video pipeline to stop after {:?}", + timeout + ); + break; + } + let _ = tokio::time::timeout(remaining, rx.changed()).await; + } + } + /// Set bitrate using preset pub async fn set_bitrate_preset( &self, diff --git a/src/video/stream_manager.rs b/src/video/stream_manager.rs index 095c8af0..7ffe755e 100644 --- a/src/video/stream_manager.rs +++ b/src/video/stream_manager.rs @@ -404,8 +404,11 @@ impl VideoStreamManager { } } StreamMode::WebRTC => { - info!("Closing all WebRTC sessions"); - let closed = self.webrtc_streamer.close_all_sessions().await; + info!("Closing all WebRTC sessions and releasing capture device"); + let closed = self + .webrtc_streamer + .close_all_sessions_and_release_device() + .await; if closed > 0 { info!("Closed {} WebRTC sessions", closed); } @@ -781,6 +784,61 @@ impl VideoStreamManager { self.webrtc_streamer.request_keyframe().await } + /// Notify frontend about a codec-only switch (WebRTC mode unchanged, codec changed). + /// + /// `set_video_codec` already rebuilt the pipeline synchronously, so we just + /// emit the events the frontend waits on: `StreamModeChanged`, `WebRTCReady`, + /// and `StreamModeReady`. + /// + /// Events are spawned asynchronously so the HTTP response (carrying the + /// `transition_id`) reaches the client before the WebSocket events, giving + /// the frontend time to call `registerTransition()` first. + pub async fn notify_codec_switch( + self: &Arc, + transition_id: &str, + new_codec_str: &str, + previous_codec_str: &str, + ) { + let manager = Arc::clone(self); + let transition_id = transition_id.to_string(); + let new_codec = new_codec_str.to_string(); + let prev_codec = previous_codec_str.to_string(); + + tokio::spawn(async move { + // Small yield to ensure the HTTP response is flushed first. + tokio::task::yield_now().await; + + manager + .publish_event(SystemEvent::StreamModeChanged { + transition_id: Some(transition_id.clone()), + mode: new_codec.clone(), + previous_mode: prev_codec.clone(), + }) + .await; + + let is_hardware = manager.webrtc_streamer.is_hardware_encoding().await; + manager + .publish_event(SystemEvent::WebRTCReady { + transition_id: Some(transition_id.clone()), + codec: new_codec.clone(), + hardware: is_hardware, + }) + .await; + + manager + .publish_event(SystemEvent::StreamModeReady { + transition_id: transition_id.clone(), + mode: new_codec.clone(), + }) + .await; + + info!( + "Codec switch notified: {} -> {} (transition: {})", + prev_codec, new_codec, transition_id + ); + }); + } + /// Publish event to event bus async fn publish_event(&self, event: SystemEvent) { if let Some(ref events) = *self.events.read().await { diff --git a/src/web/handlers/mod.rs b/src/web/handlers/mod.rs index f6480705..aa004d8b 100644 --- a/src/web/handlers/mod.rs +++ b/src/web/handlers/mod.rs @@ -1282,7 +1282,26 @@ pub async fn stream_mode_set( } } - // Set video codec if switching to WebRTC mode with specific codec + let requested_mode_str = match (&new_mode, &video_codec) { + (StreamMode::Mjpeg, _) => "mjpeg", + (StreamMode::WebRTC, Some(VideoCodecType::H264)) => "h264", + (StreamMode::WebRTC, Some(VideoCodecType::H265)) => "h265", + (StreamMode::WebRTC, Some(VideoCodecType::VP8)) => "vp8", + (StreamMode::WebRTC, Some(VideoCodecType::VP9)) => "vp9", + (StreamMode::WebRTC, None) => "webrtc", + }; + + // Detect codec-only switch: already in WebRTC mode, just changing codec. + // switch_mode_transaction treats this as "no switch needed" since StreamMode + // is still WebRTC, so we handle codec change + event emission here. + let current_mode = state.stream_manager.current_mode().await; + let prev_codec = state.stream_manager.webrtc_streamer().current_video_codec().await; + + let codec_changed = video_codec.is_some_and(|c| c != prev_codec); + let is_codec_only_switch = current_mode == StreamMode::WebRTC + && new_mode == StreamMode::WebRTC + && codec_changed; + if let Some(codec) = video_codec { info!("Setting WebRTC video codec to {:?}", codec); if let Err(e) = state @@ -1295,22 +1314,34 @@ pub async fn stream_mode_set( } } + // For codec-only switch, emit events directly instead of going through + // switch_mode_transaction (which short-circuits when mode is unchanged). + if is_codec_only_switch { + let transition_id = uuid::Uuid::new_v4().to_string(); + + state + .stream_manager + .notify_codec_switch( + &transition_id, + requested_mode_str, + &codec_to_id(prev_codec), + ) + .await; + + return Ok(Json(StreamModeResponse { + success: true, + mode: requested_mode_str.to_string(), + transition_id: Some(transition_id), + switching: false, + message: Some(format!("Codec switched to {}", requested_mode_str)), + })); + } + let tx = state .stream_manager .switch_mode_transaction(new_mode.clone()) .await?; - // Return the requested codec identifier (for UI display). The actual active mode - // may differ if the request was rejected due to an in-progress switch. - let requested_mode_str = match (&new_mode, &video_codec) { - (StreamMode::Mjpeg, _) => "mjpeg", - (StreamMode::WebRTC, Some(VideoCodecType::H264)) => "h264", - (StreamMode::WebRTC, Some(VideoCodecType::H265)) => "h265", - (StreamMode::WebRTC, Some(VideoCodecType::VP8)) => "vp8", - (StreamMode::WebRTC, Some(VideoCodecType::VP9)) => "vp9", - (StreamMode::WebRTC, None) => "webrtc", - }; - let active_mode_str = match state.stream_manager.current_mode().await { StreamMode::Mjpeg => "mjpeg".to_string(), StreamMode::WebRTC => { diff --git a/src/webrtc/webrtc_streamer.rs b/src/webrtc/webrtc_streamer.rs index 2844f990..cabab679 100644 --- a/src/webrtc/webrtc_streamer.rs +++ b/src/webrtc/webrtc_streamer.rs @@ -787,6 +787,22 @@ impl WebRtcStreamer { count } + /// Close all sessions and wait for the video pipeline to fully release the + /// capture device. Use this when the caller needs the V4L2 device immediately + /// afterwards (e.g. switching to MJPEG mode). + pub async fn close_all_sessions_and_release_device(&self) -> usize { + let count = self.close_all_sessions().await; + + if let Some(ref pipeline) = *self.video_pipeline.read().await { + pipeline + .stop_and_wait(std::time::Duration::from_secs(3)) + .await; + } + *self.video_pipeline.write().await = None; + + count + } + /// Get session count pub async fn session_count(&self) -> usize { self.sessions.read().await.len() diff --git a/web/src/composables/useWebRTC.ts b/web/src/composables/useWebRTC.ts index 941d7520..01765f9b 100644 --- a/web/src/composables/useWebRTC.ts +++ b/web/src/composables/useWebRTC.ts @@ -334,15 +334,14 @@ async function addRemoteIceCandidate(candidate: IceCandidate) { async function flushPendingRemoteIce() { if (!peerConnection || !sessionId || !peerConnection.remoteDescription) return - const remaining: WebRTCIceCandidateEvent[] = [] - for (const event of pendingRemoteCandidates) { + const queued = pendingRemoteCandidates + pendingRemoteCandidates = [] + + for (const event of queued) { if (event.session_id === sessionId) { await addRemoteIceCandidate(event.candidate) - } else { - // Drop candidates for old sessions } } - pendingRemoteCandidates = remaining if (pendingRemoteIceComplete.has(sessionId)) { pendingRemoteIceComplete.delete(sessionId) @@ -546,10 +545,8 @@ async function connect(): Promise { } } - // 等待连接真正建立(最多等待 15 秒) - // 直接检查 peerConnection.connectionState 而不是 reactive state - // 因为 TypeScript 不知道 state 会被 onconnectionstatechange 回调异步修改 - const connectionTimeout = 15000 + // Wait for connection to establish (5s for LAN, sufficient for most scenarios) + const connectionTimeout = 5000 const pollInterval = 100 let waited = 0 connectStage.value = 'waiting_connection' @@ -568,7 +565,6 @@ async function connect(): Promise { waited += pollInterval } - // 超时 throw new Error('Connection timeout waiting for ICE negotiation') } catch (err) { state.value = 'failed' diff --git a/web/src/views/ConsoleView.vue b/web/src/views/ConsoleView.vue index 0325c1f9..37b84a43 100644 --- a/web/src/views/ConsoleView.vue +++ b/web/src/views/ConsoleView.vue @@ -1312,18 +1312,17 @@ async function switchToWebRTC(codec: VideoMode = 'h264') { } } - // Step 3: Connect WebRTC with retry - let retries = 3 + // Step 3: Connect WebRTC with retry (backoff between retries) + const MAX_ATTEMPTS = 3 + const RETRY_DELAYS = [200, 800] let success = false - while (retries > 0 && !success) { - success = await connectWebRTCSerial('switchToWebRTC') - if (!success) { - retries-- - if (retries > 0) { - console.log(`[WebRTC] Connection failed, retrying (${retries} attempts left)`) - await new Promise(resolve => setTimeout(resolve, 500)) - } + for (let attempt = 0; attempt < MAX_ATTEMPTS && !success; attempt++) { + if (attempt > 0) { + const delay = RETRY_DELAYS[attempt - 1] ?? RETRY_DELAYS[RETRY_DELAYS.length - 1] + console.log(`[WebRTC] Connection failed, retrying in ${delay}ms (${MAX_ATTEMPTS - attempt} attempts left)`) + await new Promise(resolve => setTimeout(resolve, delay)) } + success = await connectWebRTCSerial('switchToWebRTC') } if (success) { toast.success(t('console.webrtcConnected'), { @@ -1526,10 +1525,22 @@ watch(() => webrtc.state.value, (newState, oldState) => { }, 1000) } + // Handle direct 'failed' state (ICE or DTLS failure) + // Allow one automatic retry before marking as failed, consistent with + // the disconnected->reconnect path that allows 2 failures. if (newState === 'failed' && videoMode.value !== 'mjpeg') { webrtcReconnectFailures += 1 - if (webrtcReconnectFailures >= 1) { + if (webrtcReconnectFailures >= 2) { markWebRTCFailure(t('console.webrtcFailed')) + } else { + webrtcReconnectTimeout = setTimeout(async () => { + if (videoMode.value !== 'mjpeg' && webrtc.state.value !== 'connected') { + const success = await connectWebRTCSerial('auto reconnect after failed') + if (!success) { + markWebRTCFailure(t('console.webrtcFailed')) + } + } + }, 1000) } } }) @@ -2155,6 +2166,10 @@ async function activateConsoleView() { isConsoleActive.value = true registerInteractionListeners() + // REST snapshot: returning from Settings (or other routes) may have missed WS device_info + void systemStore.fetchAllStates() + void configStore.refreshHid().then(() => syncMouseModeFromConfig()).catch(() => {}) + // Ensure HID WebSocket is connected when console becomes active if (!hidWs.connected.value) { hidWs.connect().catch(() => {}) diff --git a/web/src/views/SettingsView.vue b/web/src/views/SettingsView.vue index e42623f8..80f0b038 100644 --- a/web/src/views/SettingsView.vue +++ b/web/src/views/SettingsView.vue @@ -974,33 +974,29 @@ async function saveConfig() { saved.value = false try { - // Save only config related to the active section - const savePromises: Promise[] = [] + // Save only config related to the active section. + // Sequential awaits: backend ConfigStore uses read-modify-write; parallel PATCH + // requests could overwrite each other's section (last writer wins on full JSON). // Video config (including encoder and WebRTC/STUN/TURN settings) if (activeSection.value === 'video') { - savePromises.push( - configStore.updateVideo({ - device: config.value.video_device || undefined, - format: config.value.video_format || undefined, - width: config.value.video_width, - height: config.value.video_height, - fps: toConfigFps(config.value.video_fps), - }) - ) - // Save Stream/Encoder and STUN/TURN config together - savePromises.push( - configStore.updateStream({ - encoder: config.value.encoder_backend as any, - stun_server: config.value.stun_server || undefined, - turn_server: config.value.turn_server || undefined, - turn_username: config.value.turn_username || undefined, - turn_password: config.value.turn_password || undefined, - }) - ) + await configStore.updateVideo({ + device: config.value.video_device || undefined, + format: config.value.video_format || undefined, + width: config.value.video_width, + height: config.value.video_height, + fps: toConfigFps(config.value.video_fps), + }) + await configStore.updateStream({ + encoder: config.value.encoder_backend as any, + stun_server: config.value.stun_server || undefined, + turn_server: config.value.turn_server || undefined, + turn_username: config.value.turn_username || undefined, + turn_password: config.value.turn_password || undefined, + }) } - // HID config + // HID config (includes MSD enable — same gadget; must not race with updateHid) if (activeSection.value === 'hid') { if (!isHidFunctionSelectionValid.value || !isOtgEndpointBudgetValid.value) { return @@ -1024,24 +1020,20 @@ async function saveConfig() { hidUpdate.otg_functions = { ...config.value.hid_otg_functions } hidUpdate.otg_keyboard_leds = config.value.hid_otg_keyboard_leds } - savePromises.push(configStore.updateHid(hidUpdate)) - savePromises.push( - configStore.updateMsd({ - enabled: config.value.msd_enabled, - }) - ) + await configStore.updateHid(hidUpdate) + await configStore.updateMsd({ + enabled: config.value.msd_enabled, + }) } // MSD config if (activeSection.value === 'msd') { - savePromises.push( - configStore.updateMsd({ - msd_dir: config.value.msd_dir || undefined, - }) - ) + await configStore.updateMsd({ + msd_dir: config.value.msd_dir || undefined, + }) } - await Promise.all(savePromises) + await loadConfig() saved.value = true setTimeout(() => (saved.value = false), 2000) } catch (e) {