mirror of
https://github.com/mofeng-git/One-KVM.git
synced 2026-01-28 16:41:52 +08:00
feat(video): 添加视频管道无订阅者自动停止功能
- SharedVideoPipeline: 添加 3 秒宽限期,无订阅者后自动停止 - Streamer: 添加 5 秒空闲检测,无 MJPEG/其他消费者后停止分发 - WebRtcStreamer: 添加管道监控任务,自动清理已停止的管道资源 - 修改方法签名使用 Arc<Self> 以支持弱引用回调
This commit is contained in:
@@ -21,7 +21,10 @@ use std::sync::atomic::{AtomicBool, Ordering};
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
use tokio::sync::{broadcast, watch, Mutex, RwLock};
|
use tokio::sync::{broadcast, watch, Mutex, RwLock};
|
||||||
use tracing::{debug, error, info, warn};
|
use tracing::{debug, error, info, trace, warn};
|
||||||
|
|
||||||
|
/// Grace period before auto-stopping pipeline when no subscribers (in seconds)
|
||||||
|
const AUTO_STOP_GRACE_PERIOD_SECS: u64 = 3;
|
||||||
|
|
||||||
use crate::error::{AppError, Result};
|
use crate::error::{AppError, Result};
|
||||||
use crate::video::convert::{Nv12Converter, PixelConverter};
|
use crate::video::convert::{Nv12Converter, PixelConverter};
|
||||||
@@ -562,6 +565,14 @@ impl SharedVideoPipeline {
|
|||||||
*self.running_rx.borrow()
|
*self.running_rx.borrow()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Subscribe to running state changes
|
||||||
|
///
|
||||||
|
/// Returns a watch receiver that can be used to detect when the pipeline stops.
|
||||||
|
/// This is useful for auto-cleanup when the pipeline auto-stops due to no subscribers.
|
||||||
|
pub fn running_watch(&self) -> watch::Receiver<bool> {
|
||||||
|
self.running_rx.clone()
|
||||||
|
}
|
||||||
|
|
||||||
/// Get current codec
|
/// Get current codec
|
||||||
pub async fn current_codec(&self) -> VideoEncoderType {
|
pub async fn current_codec(&self) -> VideoEncoderType {
|
||||||
self.config.read().await.output_codec
|
self.config.read().await.output_codec
|
||||||
@@ -614,6 +625,10 @@ impl SharedVideoPipeline {
|
|||||||
let mut fps_frame_count: u64 = 0;
|
let mut fps_frame_count: u64 = 0;
|
||||||
let mut running_rx = pipeline.running_rx.clone();
|
let mut running_rx = pipeline.running_rx.clone();
|
||||||
|
|
||||||
|
// Track when we last had subscribers for auto-stop feature
|
||||||
|
let mut no_subscribers_since: Option<Instant> = None;
|
||||||
|
let grace_period = Duration::from_secs(AUTO_STOP_GRACE_PERIOD_SECS);
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
biased;
|
biased;
|
||||||
@@ -629,8 +644,36 @@ impl SharedVideoPipeline {
|
|||||||
Ok(video_frame) => {
|
Ok(video_frame) => {
|
||||||
pipeline.stats.lock().await.frames_captured += 1;
|
pipeline.stats.lock().await.frames_captured += 1;
|
||||||
|
|
||||||
if pipeline.frame_tx.receiver_count() == 0 {
|
let subscriber_count = pipeline.frame_tx.receiver_count();
|
||||||
|
|
||||||
|
if subscriber_count == 0 {
|
||||||
|
// Track when we started having no subscribers
|
||||||
|
if no_subscribers_since.is_none() {
|
||||||
|
no_subscribers_since = Some(Instant::now());
|
||||||
|
trace!("No subscribers, starting grace period timer");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if grace period has elapsed
|
||||||
|
if let Some(since) = no_subscribers_since {
|
||||||
|
if since.elapsed() >= grace_period {
|
||||||
|
info!(
|
||||||
|
"No subscribers for {}s, auto-stopping video pipeline",
|
||||||
|
grace_period.as_secs()
|
||||||
|
);
|
||||||
|
// Signal stop and break out of loop
|
||||||
|
let _ = pipeline.running.send(false);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Skip encoding but continue loop (within grace period)
|
||||||
continue;
|
continue;
|
||||||
|
} else {
|
||||||
|
// Reset the no-subscriber timer when we have subscribers again
|
||||||
|
if no_subscribers_since.is_some() {
|
||||||
|
trace!("Subscriber connected, resetting grace period timer");
|
||||||
|
no_subscribers_since = None;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let start = Instant::now();
|
let start = Instant::now();
|
||||||
|
|||||||
@@ -496,13 +496,53 @@ impl Streamer {
|
|||||||
let mjpeg_handler = self.mjpeg_handler.clone();
|
let mjpeg_handler = self.mjpeg_handler.clone();
|
||||||
let mut frame_rx = capturer.subscribe();
|
let mut frame_rx = capturer.subscribe();
|
||||||
let state_ref = Arc::downgrade(self);
|
let state_ref = Arc::downgrade(self);
|
||||||
|
let frame_tx = capturer.frame_sender();
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
info!("Frame distribution task started");
|
info!("Frame distribution task started");
|
||||||
|
|
||||||
|
// Track when we started having no active consumers
|
||||||
|
let mut idle_since: Option<std::time::Instant> = None;
|
||||||
|
const IDLE_STOP_DELAY_SECS: u64 = 5;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
match frame_rx.recv().await {
|
match frame_rx.recv().await {
|
||||||
Ok(frame) => {
|
Ok(frame) => {
|
||||||
mjpeg_handler.update_frame(frame);
|
mjpeg_handler.update_frame(frame);
|
||||||
|
|
||||||
|
// Check if there are any active consumers:
|
||||||
|
// - MJPEG clients via mjpeg_handler
|
||||||
|
// - Other subscribers (WebRTC/RustDesk) via frame_tx receiver_count
|
||||||
|
// Note: receiver_count includes this task, so > 1 means other subscribers
|
||||||
|
let mjpeg_clients = mjpeg_handler.client_count();
|
||||||
|
let other_subscribers = frame_tx.receiver_count().saturating_sub(1);
|
||||||
|
|
||||||
|
if mjpeg_clients == 0 && other_subscribers == 0 {
|
||||||
|
if idle_since.is_none() {
|
||||||
|
idle_since = Some(std::time::Instant::now());
|
||||||
|
trace!("No active video consumers, starting idle timer");
|
||||||
|
} else if let Some(since) = idle_since {
|
||||||
|
if since.elapsed().as_secs() >= IDLE_STOP_DELAY_SECS {
|
||||||
|
info!(
|
||||||
|
"No active video consumers for {}s, stopping frame distribution",
|
||||||
|
IDLE_STOP_DELAY_SECS
|
||||||
|
);
|
||||||
|
// Stop the streamer
|
||||||
|
if let Some(streamer) = state_ref.upgrade() {
|
||||||
|
if let Err(e) = streamer.stop().await {
|
||||||
|
warn!("Failed to stop streamer during idle cleanup: {}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Reset idle timer when we have consumers
|
||||||
|
if idle_since.is_some() {
|
||||||
|
trace!("Video consumers active, resetting idle timer");
|
||||||
|
idle_since = None;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
|
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
|
||||||
trace!("Frame distribution lagged by {} frames", n);
|
trace!("Frame distribution lagged by {} frames", n);
|
||||||
|
|||||||
@@ -199,7 +199,7 @@ impl WebRtcStreamer {
|
|||||||
///
|
///
|
||||||
/// Supports H264, H265, VP8, VP9. This will restart the video pipeline
|
/// Supports H264, H265, VP8, VP9. This will restart the video pipeline
|
||||||
/// and close all existing sessions.
|
/// and close all existing sessions.
|
||||||
pub async fn set_video_codec(&self, codec: VideoCodecType) -> Result<()> {
|
pub async fn set_video_codec(self: &Arc<Self>, codec: VideoCodecType) -> Result<()> {
|
||||||
let current = *self.video_codec.read().await;
|
let current = *self.video_codec.read().await;
|
||||||
if current == codec {
|
if current == codec {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
@@ -263,7 +263,10 @@ impl WebRtcStreamer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Ensure video pipeline is initialized and running
|
/// Ensure video pipeline is initialized and running
|
||||||
async fn ensure_video_pipeline(&self, tx: broadcast::Sender<VideoFrame>) -> Result<Arc<SharedVideoPipeline>> {
|
async fn ensure_video_pipeline(
|
||||||
|
self: &Arc<Self>,
|
||||||
|
tx: broadcast::Sender<VideoFrame>,
|
||||||
|
) -> Result<Arc<SharedVideoPipeline>> {
|
||||||
let mut pipeline_guard = self.video_pipeline.write().await;
|
let mut pipeline_guard = self.video_pipeline.write().await;
|
||||||
|
|
||||||
if let Some(ref pipeline) = *pipeline_guard {
|
if let Some(ref pipeline) = *pipeline_guard {
|
||||||
@@ -289,6 +292,41 @@ impl WebRtcStreamer {
|
|||||||
let pipeline = SharedVideoPipeline::new(pipeline_config)?;
|
let pipeline = SharedVideoPipeline::new(pipeline_config)?;
|
||||||
pipeline.start(tx.subscribe()).await?;
|
pipeline.start(tx.subscribe()).await?;
|
||||||
|
|
||||||
|
// Start a monitor task to detect when pipeline auto-stops
|
||||||
|
let pipeline_weak = Arc::downgrade(&pipeline);
|
||||||
|
let streamer_weak = Arc::downgrade(self);
|
||||||
|
let mut running_rx = pipeline.running_watch();
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
// Wait for pipeline to stop (running becomes false)
|
||||||
|
while running_rx.changed().await.is_ok() {
|
||||||
|
if !*running_rx.borrow() {
|
||||||
|
info!("Video pipeline auto-stopped, cleaning up resources");
|
||||||
|
|
||||||
|
// Clear pipeline reference in WebRtcStreamer
|
||||||
|
if let Some(streamer) = streamer_weak.upgrade() {
|
||||||
|
let mut pipeline_guard = streamer.video_pipeline.write().await;
|
||||||
|
// Only clear if it's the same pipeline that stopped
|
||||||
|
if let Some(ref current) = *pipeline_guard {
|
||||||
|
if let Some(stopped_pipeline) = pipeline_weak.upgrade() {
|
||||||
|
if Arc::ptr_eq(current, &stopped_pipeline) {
|
||||||
|
*pipeline_guard = None;
|
||||||
|
info!("Cleared stopped video pipeline reference");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
drop(pipeline_guard);
|
||||||
|
|
||||||
|
// Clear video frame source to signal upstream to stop
|
||||||
|
*streamer.video_frame_tx.write().await = None;
|
||||||
|
info!("Cleared video frame source");
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
debug!("Video pipeline monitor task ended");
|
||||||
|
});
|
||||||
|
|
||||||
*pipeline_guard = Some(pipeline.clone());
|
*pipeline_guard = Some(pipeline.clone());
|
||||||
Ok(pipeline)
|
Ok(pipeline)
|
||||||
}
|
}
|
||||||
@@ -298,7 +336,7 @@ impl WebRtcStreamer {
|
|||||||
/// This is a public wrapper around ensure_video_pipeline for external
|
/// This is a public wrapper around ensure_video_pipeline for external
|
||||||
/// components (like RustDesk) that need to share the encoded video stream.
|
/// components (like RustDesk) that need to share the encoded video stream.
|
||||||
pub async fn ensure_video_pipeline_for_external(
|
pub async fn ensure_video_pipeline_for_external(
|
||||||
&self,
|
self: &Arc<Self>,
|
||||||
tx: broadcast::Sender<VideoFrame>,
|
tx: broadcast::Sender<VideoFrame>,
|
||||||
) -> Result<Arc<SharedVideoPipeline>> {
|
) -> Result<Arc<SharedVideoPipeline>> {
|
||||||
self.ensure_video_pipeline(tx).await
|
self.ensure_video_pipeline(tx).await
|
||||||
@@ -586,7 +624,7 @@ impl WebRtcStreamer {
|
|||||||
// === Session Management ===
|
// === Session Management ===
|
||||||
|
|
||||||
/// Create a new WebRTC session
|
/// Create a new WebRTC session
|
||||||
pub async fn create_session(&self) -> Result<String> {
|
pub async fn create_session(self: &Arc<Self>) -> Result<String> {
|
||||||
let session_id = uuid::Uuid::new_v4().to_string();
|
let session_id = uuid::Uuid::new_v4().to_string();
|
||||||
let codec = *self.video_codec.read().await;
|
let codec = *self.video_codec.read().await;
|
||||||
|
|
||||||
@@ -845,7 +883,7 @@ impl WebRtcStreamer {
|
|||||||
///
|
///
|
||||||
/// Note: Hardware encoders (VAAPI, NVENC, etc.) don't support dynamic bitrate changes.
|
/// Note: Hardware encoders (VAAPI, NVENC, etc.) don't support dynamic bitrate changes.
|
||||||
/// This method restarts the pipeline to apply the new bitrate.
|
/// This method restarts the pipeline to apply the new bitrate.
|
||||||
pub async fn set_bitrate(&self, bitrate_kbps: u32) -> Result<()> {
|
pub async fn set_bitrate(self: &Arc<Self>, bitrate_kbps: u32) -> Result<()> {
|
||||||
// Update config first
|
// Update config first
|
||||||
self.config.write().await.bitrate_kbps = bitrate_kbps;
|
self.config.write().await.bitrate_kbps = bitrate_kbps;
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user