From 6740c411887b527e90d0c7ee4256cc208e31c942 Mon Sep 17 00:00:00 2001
From: mofeng-git
Date: Thu, 1 Jan 2026 10:36:30 +0800
Subject: [PATCH] feat(video): auto-stop the video pipeline when there are no subscribers
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- SharedVideoPipeline: add a 3-second grace period, then stop automatically
  once there are no subscribers
- Streamer: add 5-second idle detection and stop frame distribution once
  there are no MJPEG or other consumers
- WebRtcStreamer: add a pipeline monitor task that automatically cleans up
  the resources of a stopped pipeline
- Change method signatures to take self: &Arc<Self> to support
  weak-reference callbacks
---
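Note (reviewer aid, not part of the commit message): the auto-stop behaviour
described above reduces to the pattern sketched below: a producer that polls
broadcast::Sender::receiver_count() and stops after a grace period with no
subscribers, plus a monitor task that observes the producer's running flag
through a weak reference and clears the shared handle once it stops. This is
a minimal, self-contained illustration built on plain tokio primitives; the
names (Pipeline, slot, GRACE_PERIOD, and so on) are invented for the sketch
and are not the APIs touched by this patch.

use std::sync::{Arc, Weak};
use std::time::{Duration, Instant};
use tokio::sync::{broadcast, watch, Mutex};
use tokio::time::sleep;

const GRACE_PERIOD: Duration = Duration::from_secs(3);

struct Pipeline {
    frame_tx: broadcast::Sender<u64>,
    running_tx: watch::Sender<bool>,
}

#[tokio::main]
async fn main() {
    let (frame_tx, rx0) = broadcast::channel(16);
    drop(rx0); // keep receiver_count() at zero until someone subscribes
    let (running_tx, mut running_rx) = watch::channel(true);

    let pipeline = Arc::new(Pipeline { frame_tx, running_tx });
    // Shared slot that owns the pipeline, playing the role of the
    // streamer's video_pipeline field in this patch.
    let slot: Arc<Mutex<Option<Arc<Pipeline>>>> =
        Arc::new(Mutex::new(Some(pipeline.clone())));

    // Producer: fan frames out, auto-stop after GRACE_PERIOD with no subscribers.
    let producer = {
        let p = pipeline.clone();
        tokio::spawn(async move {
            let mut no_subscribers_since: Option<Instant> = None;
            let mut frame = 0u64;
            loop {
                sleep(Duration::from_millis(33)).await; // stand-in for capture/encode
                frame += 1;
                if p.frame_tx.receiver_count() == 0 {
                    // Arm (or keep) the grace-period timer while nobody listens.
                    let since = *no_subscribers_since.get_or_insert_with(Instant::now);
                    if since.elapsed() >= GRACE_PERIOD {
                        let _ = p.running_tx.send(false); // announce "stopped"
                        break;
                    }
                    continue; // skip the expensive work during the grace period
                }
                no_subscribers_since = None; // a subscriber is back: reset the timer
                let _ = p.frame_tx.send(frame);
            }
        })
    };

    // Monitor: hold only a Weak reference; clear the slot once the pipeline stops.
    let weak: Weak<Pipeline> = Arc::downgrade(&pipeline);
    let slot_for_monitor = slot.clone();
    let monitor = tokio::spawn(async move {
        while running_rx.changed().await.is_ok() {
            if !*running_rx.borrow() {
                let mut guard = slot_for_monitor.lock().await;
                // Only clear the slot if it still holds the pipeline that just stopped.
                if let (Some(current), Some(stopped)) = (guard.as_ref(), weak.upgrade()) {
                    if Arc::ptr_eq(current, &stopped) {
                        *guard = None;
                    }
                }
                break;
            }
        }
    });

    // One short-lived subscriber, then nobody: the producer stops about 3 s later.
    let mut rx = pipeline.frame_tx.subscribe();
    for _ in 0..5 {
        let _ = rx.recv().await;
    }
    drop(rx);
    drop(pipeline); // only `slot` keeps the pipeline alive now

    producer.await.unwrap();
    monitor.await.unwrap();
    assert!(slot.lock().await.is_none()); // monitor released the stopped pipeline
}

Polling receiver_count() on the producer side keeps consumers completely
passive: dropping a Receiver is the only "unsubscribe" signal needed.
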
 src/video/shared_video_pipeline.rs | 47 ++++++++++++++++++++++++++++++++++---
 src/video/streamer.rs              | 40 ++++++++++++++++++++++++++++++
 src/webrtc/webrtc_streamer.rs      | 48 ++++++++++++++++++++++++++++++++++---
 3 files changed, 128 insertions(+), 7 deletions(-)

diff --git a/src/video/shared_video_pipeline.rs b/src/video/shared_video_pipeline.rs
index e13d63c0..772ae172 100644
--- a/src/video/shared_video_pipeline.rs
+++ b/src/video/shared_video_pipeline.rs
@@ -21,7 +21,10 @@ use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
 use std::time::{Duration, Instant};
 use tokio::sync::{broadcast, watch, Mutex, RwLock};
-use tracing::{debug, error, info, warn};
+use tracing::{debug, error, info, trace, warn};
+
+/// Grace period before auto-stopping pipeline when no subscribers (in seconds)
+const AUTO_STOP_GRACE_PERIOD_SECS: u64 = 3;
 
 use crate::error::{AppError, Result};
 use crate::video::convert::{Nv12Converter, PixelConverter};
@@ -562,6 +565,14 @@ impl SharedVideoPipeline {
         *self.running_rx.borrow()
     }
 
+    /// Subscribe to running state changes
+    ///
+    /// Returns a watch receiver that can be used to detect when the pipeline stops.
+    /// This is useful for auto-cleanup when the pipeline auto-stops due to no subscribers.
+    pub fn running_watch(&self) -> watch::Receiver<bool> {
+        self.running_rx.clone()
+    }
+
     /// Get current codec
     pub async fn current_codec(&self) -> VideoEncoderType {
         self.config.read().await.output_codec
@@ -614,6 +625,10 @@
         let mut fps_frame_count: u64 = 0;
         let mut running_rx = pipeline.running_rx.clone();
 
+        // Track when we last had subscribers for auto-stop feature
+        let mut no_subscribers_since: Option<Instant> = None;
+        let grace_period = Duration::from_secs(AUTO_STOP_GRACE_PERIOD_SECS);
+
         loop {
             tokio::select! {
                 biased;
@@ -629,8 +644,36 @@ impl SharedVideoPipeline {
                     Ok(video_frame) => {
                         pipeline.stats.lock().await.frames_captured += 1;
 
-                        if pipeline.frame_tx.receiver_count() == 0 {
+                        let subscriber_count = pipeline.frame_tx.receiver_count();
+
+                        if subscriber_count == 0 {
+                            // Track when we started having no subscribers
+                            if no_subscribers_since.is_none() {
+                                no_subscribers_since = Some(Instant::now());
+                                trace!("No subscribers, starting grace period timer");
+                            }
+
+                            // Check if grace period has elapsed
+                            if let Some(since) = no_subscribers_since {
+                                if since.elapsed() >= grace_period {
+                                    info!(
+                                        "No subscribers for {}s, auto-stopping video pipeline",
+                                        grace_period.as_secs()
+                                    );
+                                    // Signal stop and break out of loop
+                                    let _ = pipeline.running.send(false);
+                                    break;
+                                }
+                            }
+
+                            // Skip encoding but continue loop (within grace period)
                             continue;
+                        } else {
+                            // Reset the no-subscriber timer when we have subscribers again
+                            if no_subscribers_since.is_some() {
+                                trace!("Subscriber connected, resetting grace period timer");
+                                no_subscribers_since = None;
+                            }
                         }
 
                         let start = Instant::now();
diff --git a/src/video/streamer.rs b/src/video/streamer.rs
index 423f873b..861421fb 100644
--- a/src/video/streamer.rs
+++ b/src/video/streamer.rs
@@ -496,13 +496,53 @@ impl Streamer {
         let mjpeg_handler = self.mjpeg_handler.clone();
         let mut frame_rx = capturer.subscribe();
         let state_ref = Arc::downgrade(self);
+        let frame_tx = capturer.frame_sender();
 
         tokio::spawn(async move {
             info!("Frame distribution task started");
+
+            // Track when we started having no active consumers
+            let mut idle_since: Option<std::time::Instant> = None;
+            const IDLE_STOP_DELAY_SECS: u64 = 5;
+
             loop {
                 match frame_rx.recv().await {
                     Ok(frame) => {
                         mjpeg_handler.update_frame(frame);
+
+                        // Check if there are any active consumers:
+                        // - MJPEG clients via mjpeg_handler
+                        // - Other subscribers (WebRTC/RustDesk) via frame_tx receiver_count
+                        // Note: receiver_count includes this task, so > 1 means other subscribers
+                        let mjpeg_clients = mjpeg_handler.client_count();
+                        let other_subscribers = frame_tx.receiver_count().saturating_sub(1);
+
+                        if mjpeg_clients == 0 && other_subscribers == 0 {
+                            if idle_since.is_none() {
+                                idle_since = Some(std::time::Instant::now());
+                                trace!("No active video consumers, starting idle timer");
+                            } else if let Some(since) = idle_since {
+                                if since.elapsed().as_secs() >= IDLE_STOP_DELAY_SECS {
+                                    info!(
+                                        "No active video consumers for {}s, stopping frame distribution",
+                                        IDLE_STOP_DELAY_SECS
+                                    );
+                                    // Stop the streamer
+                                    if let Some(streamer) = state_ref.upgrade() {
+                                        if let Err(e) = streamer.stop().await {
+                                            warn!("Failed to stop streamer during idle cleanup: {}", e);
+                                        }
+                                    }
+                                    break;
+                                }
+                            }
+                        } else {
+                            // Reset idle timer when we have consumers
+                            if idle_since.is_some() {
+                                trace!("Video consumers active, resetting idle timer");
+                                idle_since = None;
+                            }
+                        }
                     }
                     Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
                         trace!("Frame distribution lagged by {} frames", n);
diff --git a/src/webrtc/webrtc_streamer.rs b/src/webrtc/webrtc_streamer.rs
index 46b74eb4..7e0e94e1 100644
--- a/src/webrtc/webrtc_streamer.rs
+++ b/src/webrtc/webrtc_streamer.rs
@@ -199,7 +199,7 @@ impl WebRtcStreamer {
     ///
     /// Supports H264, H265, VP8, VP9. This will restart the video pipeline
     /// and close all existing sessions.
-    pub async fn set_video_codec(&self, codec: VideoCodecType) -> Result<()> {
+    pub async fn set_video_codec(self: &Arc<Self>, codec: VideoCodecType) -> Result<()> {
         let current = *self.video_codec.read().await;
         if current == codec {
             return Ok(());
         }
@@ -263,7 +263,10 @@ impl WebRtcStreamer {
     }
 
     /// Ensure video pipeline is initialized and running
-    async fn ensure_video_pipeline(&self, tx: broadcast::Sender<VideoFrame>) -> Result<Arc<SharedVideoPipeline>> {
+    async fn ensure_video_pipeline(
+        self: &Arc<Self>,
+        tx: broadcast::Sender<VideoFrame>,
+    ) -> Result<Arc<SharedVideoPipeline>> {
         let mut pipeline_guard = self.video_pipeline.write().await;
 
         if let Some(ref pipeline) = *pipeline_guard {
@@ -289,6 +292,41 @@ impl WebRtcStreamer {
         let pipeline = SharedVideoPipeline::new(pipeline_config)?;
         pipeline.start(tx.subscribe()).await?;
 
+        // Start a monitor task to detect when pipeline auto-stops
+        let pipeline_weak = Arc::downgrade(&pipeline);
+        let streamer_weak = Arc::downgrade(self);
+        let mut running_rx = pipeline.running_watch();
+
+        tokio::spawn(async move {
+            // Wait for pipeline to stop (running becomes false)
+            while running_rx.changed().await.is_ok() {
+                if !*running_rx.borrow() {
+                    info!("Video pipeline auto-stopped, cleaning up resources");
+
+                    // Clear pipeline reference in WebRtcStreamer
+                    if let Some(streamer) = streamer_weak.upgrade() {
+                        let mut pipeline_guard = streamer.video_pipeline.write().await;
+                        // Only clear if it's the same pipeline that stopped
+                        if let Some(ref current) = *pipeline_guard {
+                            if let Some(stopped_pipeline) = pipeline_weak.upgrade() {
+                                if Arc::ptr_eq(current, &stopped_pipeline) {
+                                    *pipeline_guard = None;
+                                    info!("Cleared stopped video pipeline reference");
+                                }
+                            }
+                        }
+                        drop(pipeline_guard);
+
+                        // Clear video frame source to signal upstream to stop
+                        *streamer.video_frame_tx.write().await = None;
+                        info!("Cleared video frame source");
+                    }
+                    break;
+                }
+            }
+            debug!("Video pipeline monitor task ended");
+        });
+
         *pipeline_guard = Some(pipeline.clone());
         Ok(pipeline)
     }
@@ -298,7 +336,7 @@ impl WebRtcStreamer {
     /// This is a public wrapper around ensure_video_pipeline for external
     /// components (like RustDesk) that need to share the encoded video stream.
     pub async fn ensure_video_pipeline_for_external(
-        &self,
+        self: &Arc<Self>,
         tx: broadcast::Sender<VideoFrame>,
     ) -> Result<Arc<SharedVideoPipeline>> {
         self.ensure_video_pipeline(tx).await
     }
@@ -586,7 +624,7 @@ impl WebRtcStreamer {
     // === Session Management ===
 
     /// Create a new WebRTC session
-    pub async fn create_session(&self) -> Result<String> {
+    pub async fn create_session(self: &Arc<Self>) -> Result<String> {
         let session_id = uuid::Uuid::new_v4().to_string();
         let codec = *self.video_codec.read().await;
 
@@ -845,7 +883,7 @@ impl WebRtcStreamer {
     ///
     /// Note: Hardware encoders (VAAPI, NVENC, etc.) don't support dynamic bitrate changes.
     /// This method restarts the pipeline to apply the new bitrate.
-    pub async fn set_bitrate(&self, bitrate_kbps: u32) -> Result<()> {
+    pub async fn set_bitrate(self: &Arc<Self>, bitrate_kbps: u32) -> Result<()> {
         // Update config first
         self.config.write().await.bitrate_kbps = bitrate_kbps;