From 3133db9c8677f971fa86cfd00283943451964caa Mon Sep 17 00:00:00 2001 From: mofeng-git Date: Wed, 11 Feb 2026 20:00:33 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8D=20rtsp=20=E6=9C=8D?= =?UTF-8?q?=E5=8A=A1=E8=BF=9E=E6=8E=A5=E9=94=99=E8=AF=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/rtsp/service.rs | 216 +++++++++++++++------- src/video/shared_video_pipeline.rs | 21 ++- src/webrtc/webrtc_streamer.rs | 59 ++++-- web/src/components/VideoConfigPopover.vue | 75 +++++++- web/src/i18n/en-US.ts | 2 + web/src/i18n/zh-CN.ts | 2 + 6 files changed, 287 insertions(+), 88 deletions(-) diff --git a/src/rtsp/service.rs b/src/rtsp/service.rs index 1737ec5f..f15ff4d4 100644 --- a/src/rtsp/service.rs +++ b/src/rtsp/service.rs @@ -1,8 +1,10 @@ -use bytes::Bytes; use base64::Engine; +use bytes::Bytes; use rand::Rng; use rtp::packet::Packet; use rtp::packetizer::Payloader; +use rtsp_types as rtsp; +use sdp_types as sdp; use std::collections::HashMap; use std::io; use std::net::SocketAddr; @@ -10,9 +12,8 @@ use std::sync::Arc; use tokio::io::{AsyncReadExt, AsyncWrite, AsyncWriteExt}; use tokio::net::{TcpListener, TcpStream}; use tokio::sync::{broadcast, Mutex, RwLock}; +use tokio::time::{sleep, Duration}; use webrtc::util::Marshal; -use rtsp_types as rtsp; -use sdp_types as sdp; use crate::config::{RtspCodec, RtspConfig}; use crate::error::{AppError, Result}; @@ -26,6 +27,7 @@ use crate::webrtc::rtp::parse_profile_level_id_from_sps; const RTP_CLOCK_RATE: u32 = 90_000; const RTP_MTU: usize = 1200; const RTSP_BUF_SIZE: usize = 8192; +const RTSP_RESUBSCRIBE_DELAY_MS: u64 = 300; #[derive(Debug, Clone, PartialEq)] pub enum RtspServiceStatus { @@ -150,9 +152,9 @@ impl RtspService { .parse() .map_err(|e| AppError::BadRequest(format!("Invalid RTSP bind address: {}", e)))?; - let listener = TcpListener::bind(bind_addr) - .await - .map_err(|e| AppError::Io(io::Error::new(e.kind(), format!("RTSP bind failed: {}", e))))?; + let listener = TcpListener::bind(bind_addr).await.map_err(|e| { + AppError::Io(io::Error::new(e.kind(), format!("RTSP bind failed: {}", e))) + })?; let service_config = self.config.clone(); let video_manager = self.video_manager.clone(); @@ -245,8 +247,14 @@ async fn handle_client( ) -> Result<()> { let cfg_snapshot = config.read().await.clone(); - let auth_enabled = cfg_snapshot.username.as_ref().is_some_and(|u| !u.is_empty()) - || cfg_snapshot.password.as_ref().is_some_and(|p| !p.is_empty()); + let auth_enabled = cfg_snapshot + .username + .as_ref() + .is_some_and(|u| !u.is_empty()) + || cfg_snapshot + .password + .as_ref() + .is_some_and(|p| !p.is_empty()); if cfg_snapshot.allow_one_client { let mut active_guard = shared.active_client.lock().await; @@ -288,17 +296,8 @@ async fn handle_client( } }; - if !is_valid_rtsp_path(&req.uri, &cfg_snapshot.path) { - send_response( - &mut stream, - &req, - 404, - "Not Found", - vec![], - "", - "", - ) - .await?; + if !is_valid_rtsp_path(&req.method, &req.uri, &cfg_snapshot.path) { + send_response(&mut stream, &req, 404, "Not Found", vec![], "", "").await?; continue; } @@ -368,21 +367,31 @@ async fn handle_client( &req, 200, "OK", - vec![( - "Content-Type".to_string(), - "application/sdp".to_string(), - )], + vec![("Content-Type".to_string(), "application/sdp".to_string())], &sdp, &state.session_id, ) .await?; } rtsp::Method::Setup => { - let transport = req - .headers - .get("transport") - .cloned() - .unwrap_or_default(); + let transport = req.headers.get("transport").cloned().unwrap_or_default(); + + if !is_tcp_transport_request(&transport) { + send_response( + &mut stream, + &req, + 461, + "Unsupported Transport", + vec![( + "Transport".to_string(), + "RTP/AVP/TCP;unicast;interleaved=0-1".to_string(), + )], + "", + &state.session_id, + ) + .await?; + continue; + } let interleaved = parse_interleaved_channel(&transport).unwrap_or(0); state.setup_done = true; @@ -420,16 +429,8 @@ async fn handle_client( continue; } - send_response( - &mut stream, - &req, - 200, - "OK", - vec![], - "", - &state.session_id, - ) - .await?; + send_response(&mut stream, &req, 200, "OK", vec![], "", &state.session_id) + .await?; if let Err(e) = stream_video_interleaved( stream, @@ -447,16 +448,8 @@ async fn handle_client( break 'client_loop; } rtsp::Method::Teardown => { - send_response( - &mut stream, - &req, - 200, - "OK", - vec![], - "", - &state.session_id, - ) - .await?; + send_response(&mut stream, &req, 200, "OK", vec![], "", &state.session_id) + .await?; break 'client_loop; } _ => { @@ -498,7 +491,9 @@ async fn stream_video_interleaved( let mut rx = video_manager .subscribe_encoded_frames() .await - .ok_or_else(|| AppError::VideoError("RTSP failed to subscribe encoded frames".to_string()))?; + .ok_or_else(|| { + AppError::VideoError("RTSP failed to subscribe encoded frames".to_string()) + })?; video_manager.request_keyframe().await.ok(); @@ -518,7 +513,21 @@ async fn stream_video_interleaved( tokio::select! { maybe_frame = rx.recv() => { let Some(frame) = maybe_frame else { - break; + tracing::warn!("RTSP encoded frame subscription ended, attempting to restart pipeline"); + + if let Some(new_rx) = video_manager.subscribe_encoded_frames().await { + rx = new_rx; + let _ = video_manager.request_keyframe().await; + tracing::info!("RTSP frame subscription recovered"); + } else { + tracing::warn!( + "RTSP failed to resubscribe encoded frames, retrying in {}ms", + RTSP_RESUBSCRIBE_DELAY_MS + ); + sleep(Duration::from_millis(RTSP_RESUBSCRIBE_DELAY_MS)).await; + } + + continue; }; if !is_frame_codec_match(&frame, &rtsp_codec) { @@ -689,11 +698,14 @@ fn take_rtsp_request_from_buffer(buffer: &mut Vec) -> Option { } fn find_bytes(haystack: &[u8], needle: &[u8]) -> Option { - haystack.windows(needle.len()).position(|window| window == needle) + haystack + .windows(needle.len()) + .position(|window| window == needle) } fn parse_rtsp_request(raw: &str) -> Option { - let (message, consumed): (rtsp::Message>, usize) = rtsp::Message::parse(raw.as_bytes()).ok()?; + let (message, consumed): (rtsp::Message>, usize) = + rtsp::Message::parse(raw.as_bytes()).ok()?; if consumed != raw.len() { return None; } @@ -745,6 +757,14 @@ fn parse_interleaved_channel(transport: &str) -> Option { None } +fn is_tcp_transport_request(transport: &str) -> bool { + transport + .split(',') + .map(str::trim) + .map(str::to_ascii_lowercase) + .any(|item| item.contains("rtp/avp/tcp") || item.contains("interleaved=")) +} + fn update_parameter_sets(params: &mut ParameterSets, frame: &EncodedVideoFrame) { let nal_units = split_annexb_nal_units(frame.data.as_ref()); @@ -1036,18 +1056,33 @@ fn status_code_from_u16(code: u16) -> rtsp::StatusCode { 405 => rtsp::StatusCode::MethodNotAllowed, 453 => rtsp::StatusCode::NotEnoughBandwidth, 455 => rtsp::StatusCode::MethodNotValidInThisState, + 461 => rtsp::StatusCode::UnsupportedTransport, _ => rtsp::StatusCode::InternalServerError, } } -fn is_valid_rtsp_path(uri: &str, configured_path: &str) -> bool { +fn is_valid_rtsp_path(method: &rtsp::Method, uri: &str, configured_path: &str) -> bool { + if matches!(method, rtsp::Method::Options) && uri.trim() == "*" { + return true; + } + let normalized_cfg = configured_path.trim_matches('/'); if normalized_cfg.is_empty() { return false; } let request_path = extract_rtsp_path(uri); - request_path == normalized_cfg + + if request_path == normalized_cfg { + return true; + } + + if !matches!(method, rtsp::Method::Setup | rtsp::Method::Teardown) { + return false; + } + + let control_track_path = format!("{}/trackID=0", normalized_cfg); + request_path == "trackID=0" || request_path == control_track_path } fn extract_rtsp_path(uri: &str) -> String { @@ -1074,8 +1109,13 @@ fn extract_rtsp_path(uri: &str) -> String { fn is_frame_codec_match(frame: &EncodedVideoFrame, codec: &RtspCodec) -> bool { matches!( (frame.codec, codec), - (crate::video::encoder::registry::VideoEncoderType::H264, RtspCodec::H264) - | (crate::video::encoder::registry::VideoEncoderType::H265, RtspCodec::H265) + ( + crate::video::encoder::registry::VideoEncoderType::H264, + RtspCodec::H264 + ) | ( + crate::video::encoder::registry::VideoEncoderType::H265, + RtspCodec::H265 + ) ) } @@ -1092,7 +1132,6 @@ fn generate_session_id() -> String { format!("{:016x}", value) } - #[cfg(test)] mod tests { use super::*; @@ -1109,9 +1148,14 @@ mod tests { } } - async fn read_response_from_duplex(mut client: tokio::io::DuplexStream) -> rtsp::Response> { + async fn read_response_from_duplex( + mut client: tokio::io::DuplexStream, + ) -> rtsp::Response> { let mut buf = vec![0u8; 4096]; - let n = client.read(&mut buf).await.expect("failed to read rtsp response"); + let n = client + .read(&mut buf) + .await + .expect("failed to read rtsp response"); assert!(n > 0); let (message, consumed): (rtsp::Message>, usize) = rtsp::Message::parse(&buf[..n]).expect("failed to parse rtsp response"); @@ -1188,13 +1232,57 @@ mod tests { assert!(fmtp_value.contains("sprop-parameter-sets=")); } + #[test] + fn rtsp_path_matching_follows_sdp_control_rules() { + assert!(is_valid_rtsp_path( + &rtsp::Method::Describe, + "rtsp://127.0.0.1/live", + "live" + )); + assert!(is_valid_rtsp_path( + &rtsp::Method::Describe, + "rtsp://127.0.0.1/live/?token=1", + "/live/" + )); + assert!(!is_valid_rtsp_path( + &rtsp::Method::Describe, + "rtsp://127.0.0.1/live2", + "live" + )); + assert!(!is_valid_rtsp_path( + &rtsp::Method::Describe, + "rtsp://127.0.0.1/", + "/" + )); + + assert!(is_valid_rtsp_path( + &rtsp::Method::Setup, + "rtsp://127.0.0.1/live/trackID=0", + "live" + )); + assert!(is_valid_rtsp_path( + &rtsp::Method::Setup, + "rtsp://127.0.0.1/trackID=0", + "live" + )); + assert!(!is_valid_rtsp_path( + &rtsp::Method::Describe, + "rtsp://127.0.0.1/live/trackID=0", + "live" + )); + + assert!(is_valid_rtsp_path(&rtsp::Method::Options, "*", "live")); + } #[test] - fn rtsp_path_matching_is_exact_after_normalization() { - assert!(is_valid_rtsp_path("rtsp://127.0.0.1/live", "live")); - assert!(is_valid_rtsp_path("rtsp://127.0.0.1/live/?token=1", "/live/")); - assert!(!is_valid_rtsp_path("rtsp://127.0.0.1/live2", "live")); - assert!(!is_valid_rtsp_path("rtsp://127.0.0.1/", "/")); + fn transport_parsing_detects_tcp_interleaved_requests() { + assert!(is_tcp_transport_request( + "RTP/AVP/TCP;unicast;interleaved=0-1" + )); + assert!(is_tcp_transport_request("RTP/AVP;unicast;interleaved=2-3")); + assert!(!is_tcp_transport_request( + "RTP/AVP;unicast;client_port=8000-8001" + )); } #[test] diff --git a/src/video/shared_video_pipeline.rs b/src/video/shared_video_pipeline.rs index bfb2acc9..f34725b9 100644 --- a/src/video/shared_video_pipeline.rs +++ b/src/video/shared_video_pipeline.rs @@ -27,6 +27,8 @@ use tracing::{debug, error, info, trace, warn}; /// Grace period before auto-stopping pipeline when no subscribers (in seconds) const AUTO_STOP_GRACE_PERIOD_SECS: u64 = 3; +/// Restart capture stream after this many consecutive timeouts. +const CAPTURE_TIMEOUT_RESTART_THRESHOLD: u32 = 5; /// Minimum valid frame size for capture const MIN_CAPTURE_FRAME_SIZE: usize = 128; /// Validate JPEG header every N frames to reduce overhead @@ -1319,6 +1321,7 @@ impl SharedVideoPipeline { let grace_period = Duration::from_secs(AUTO_STOP_GRACE_PERIOD_SECS); let mut sequence: u64 = 0; let mut validate_counter: u64 = 0; + let mut consecutive_timeouts: u32 = 0; let capture_error_throttler = LogThrottler::with_secs(5); let mut suppressed_capture_errors: HashMap = HashMap::new(); @@ -1363,11 +1366,27 @@ impl SharedVideoPipeline { let mut owned = buffer_pool.take(MIN_CAPTURE_FRAME_SIZE); let meta = match stream.next_into(&mut owned) { - Ok(meta) => meta, + Ok(meta) => { + consecutive_timeouts = 0; + meta + } Err(e) => { if e.kind() == std::io::ErrorKind::TimedOut { + consecutive_timeouts = consecutive_timeouts.saturating_add(1); warn!("Capture timeout - no signal?"); + + if consecutive_timeouts >= CAPTURE_TIMEOUT_RESTART_THRESHOLD { + warn!( + "Capture timed out {} consecutive times, restarting video pipeline", + consecutive_timeouts + ); + let _ = pipeline.running.send(false); + pipeline.running_flag.store(false, Ordering::Release); + let _ = frame_seq_tx.send(sequence.wrapping_add(1)); + break; + } } else { + consecutive_timeouts = 0; let key = classify_capture_error(&e); if capture_error_throttler.should_log(&key) { let suppressed = diff --git a/src/webrtc/webrtc_streamer.rs b/src/webrtc/webrtc_streamer.rs index 8b74e256..b3e6e276 100644 --- a/src/webrtc/webrtc_streamer.rs +++ b/src/webrtc/webrtc_streamer.rs @@ -250,6 +250,33 @@ impl WebRtcStreamer { } } + fn should_stop_pipeline(session_count: usize, subscriber_count: usize) -> bool { + session_count == 0 && subscriber_count == 0 + } + + async fn stop_pipeline_if_idle(&self, reason: &str) { + let session_count = self.sessions.read().await.len(); + let pipeline = self.video_pipeline.read().await.clone(); + + let Some(pipeline) = pipeline else { + return; + }; + + let subscriber_count = pipeline.subscriber_count(); + if Self::should_stop_pipeline(session_count, subscriber_count) { + info!( + "{} stopping video pipeline (sessions={}, subscribers={})", + reason, session_count, subscriber_count + ); + pipeline.stop(); + } else { + debug!( + "Keeping video pipeline alive (reason={}, sessions={}, subscribers={})", + reason, session_count, subscriber_count + ); + } + } + /// Ensure video pipeline is initialized and running async fn ensure_video_pipeline(self: &Arc) -> Result> { let mut pipeline_guard = self.video_pipeline.write().await; @@ -740,13 +767,7 @@ impl WebRtcStreamer { session.close().await?; } - // Stop pipeline if no more sessions - if self.sessions.read().await.is_empty() { - if let Some(ref pipeline) = *self.video_pipeline.read().await { - info!("No more sessions, stopping video pipeline"); - pipeline.stop(); - } - } + self.stop_pipeline_if_idle("After close_session").await; Ok(()) } @@ -763,11 +784,8 @@ impl WebRtcStreamer { } } - // Stop pipeline drop(sessions); - if let Some(ref pipeline) = *self.video_pipeline.read().await { - pipeline.stop(); - } + self.stop_pipeline_if_idle("After close_all_sessions").await; count } @@ -826,14 +844,9 @@ impl WebRtcStreamer { sessions.remove(id); } - // Stop pipeline if no more sessions - if sessions.is_empty() { - drop(sessions); - if let Some(ref pipeline) = *self.video_pipeline.read().await { - info!("No more sessions after cleanup, stopping video pipeline"); - pipeline.stop(); - } - } + drop(sessions); + self.stop_pipeline_if_idle("After cleanup_closed_sessions") + .await; } } @@ -990,4 +1003,12 @@ mod tests { let codecs = streamer.supported_video_codecs(); assert!(codecs.contains(&VideoCodecType::H264)); } + + #[test] + fn stop_pipeline_requires_no_sessions_and_no_subscribers() { + assert!(WebRtcStreamer::should_stop_pipeline(0, 0)); + assert!(!WebRtcStreamer::should_stop_pipeline(1, 0)); + assert!(!WebRtcStreamer::should_stop_pipeline(0, 1)); + assert!(!WebRtcStreamer::should_stop_pipeline(2, 3)); + } } diff --git a/web/src/components/VideoConfigPopover.vue b/web/src/components/VideoConfigPopover.vue index 96625bb6..82dec97d 100644 --- a/web/src/components/VideoConfigPopover.vue +++ b/web/src/components/VideoConfigPopover.vue @@ -17,7 +17,7 @@ import { SelectTrigger, SelectValue, } from '@/components/ui/select' -import { Monitor, RefreshCw, Loader2, Settings, Zap, Scale, Image } from 'lucide-vue-next' +import { Monitor, RefreshCw, Loader2, Settings, Zap, Scale, Image, AlertTriangle } from 'lucide-vue-next' import HelpTooltip from '@/components/HelpTooltip.vue' import { configApi, @@ -73,6 +73,48 @@ const loadingCodecs = ref(false) const backends = ref([]) const constraints = ref(null) const currentEncoderBackend = computed(() => configStore.stream?.encoder || 'auto') +const isRtspEnabled = computed(() => { + if (typeof configStore.rtspStatus?.config?.enabled === 'boolean') { + return configStore.rtspStatus.config.enabled + } + return !!configStore.rtspConfig?.enabled +}) +const isRustdeskEnabled = computed(() => { + if (typeof configStore.rustdeskStatus?.config?.enabled === 'boolean') { + return configStore.rustdeskStatus.config.enabled + } + return !!configStore.rustdeskConfig?.enabled +}) +const isRtspCodecLocked = computed(() => isRtspEnabled.value) +const isRustdeskWebrtcLocked = computed(() => !isRtspEnabled.value && isRustdeskEnabled.value) +const codecLockSources = computed(() => { + if (isRtspCodecLocked.value) { + return isRustdeskEnabled.value ? 'RTSP/RustDesk' : 'RTSP' + } + if (isRustdeskWebrtcLocked.value) return 'RustDesk' + return '' +}) +const codecLockMessage = computed(() => { + if (!codecLockSources.value) return '' + return t('actionbar.multiSourceCodecLocked', { sources: codecLockSources.value }) +}) +const videoParamWarningSources = computed(() => { + if (isRtspEnabled.value && isRustdeskEnabled.value) return 'RTSP/RustDesk' + if (isRtspEnabled.value) return 'RTSP' + if (isRustdeskEnabled.value) return 'RustDesk' + return '' +}) +const videoParamWarningMessage = computed(() => { + if (!videoParamWarningSources.value) return '' + return t('actionbar.multiSourceVideoParamsWarning', { sources: videoParamWarningSources.value }) +}) +const isCodecLocked = computed(() => !!codecLockMessage.value) + +const isCodecOptionDisabled = (codecId: string): boolean => { + if (!isBrowserSupported(codecId)) return true + if (isRustdeskWebrtcLocked.value && codecId === 'mjpeg') return true + return false +} // Browser supported codecs (WebRTC receive capabilities) const browserSupportedCodecs = ref>(new Set()) @@ -363,6 +405,16 @@ function syncFromCurrentIfChanged() { function handleVideoModeChange(mode: unknown) { if (typeof mode !== 'string') return + if (isRtspCodecLocked.value) { + toast.warning(codecLockMessage.value) + return + } + + if (isRustdeskWebrtcLocked.value && mode === 'mjpeg') { + toast.warning(codecLockMessage.value) + return + } + if (constraints.value?.allowed_codecs?.length && !constraints.value.allowed_codecs.includes(mode)) { toast.error(constraints.value.reason || t('actionbar.selectMode')) return @@ -500,6 +552,8 @@ watch(() => props.open, (isOpen) => { Promise.all([ configStore.refreshVideo(), configStore.refreshStream(), + configStore.refreshRtspStatus(), + configStore.refreshRustdeskStatus(), ]).then(() => { initializeFromCurrent() }).catch(() => { @@ -539,7 +593,7 @@ watch(currentConfig, () => {