diff --git a/src/video/capture.rs b/src/video/capture.rs index 5b0edfa2..f8fa2294 100644 --- a/src/video/capture.rs +++ b/src/video/capture.rs @@ -143,7 +143,7 @@ impl VideoCapturer { /// Create a new video capturer pub fn new(config: CaptureConfig) -> Self { let (state_tx, state_rx) = watch::channel(CaptureState::Stopped); - let (frame_tx, _) = broadcast::channel(64); // Buffer up to 64 frames for software encoding + let (frame_tx, _) = broadcast::channel(4); // Reduced from 64 for lower latency Self { config, diff --git a/src/video/shared_video_pipeline.rs b/src/video/shared_video_pipeline.rs index 772ae172..5b253597 100644 --- a/src/video/shared_video_pipeline.rs +++ b/src/video/shared_video_pipeline.rs @@ -17,7 +17,7 @@ //! ``` use bytes::Bytes; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::sync::{broadcast, watch, Mutex, RwLock}; @@ -291,13 +291,13 @@ pub struct SharedVideoPipeline { /// YUV420P buffer for turbojpeg decoder output yuv420p_buffer: Mutex>, /// Whether the encoder needs YUV420P (true) or NV12 (false) - encoder_needs_yuv420p: Mutex, + encoder_needs_yuv420p: AtomicBool, frame_tx: broadcast::Sender, stats: Mutex, running: watch::Sender, running_rx: watch::Receiver, - encode_times: Mutex>, - sequence: Mutex, + /// Frame sequence counter (atomic for lock-free access) + sequence: AtomicU64, /// Atomic flag for keyframe request (avoids lock contention) keyframe_requested: AtomicBool, } @@ -314,7 +314,7 @@ impl SharedVideoPipeline { config.input_format ); - let (frame_tx, _) = broadcast::channel(64); // Increased from 16 for software encoding + let (frame_tx, _) = broadcast::channel(8); // Reduced from 64 for lower latency let (running_tx, running_rx) = watch::channel(false); let nv12_size = (config.resolution.width * config.resolution.height * 3 / 2) as usize; let yuv420p_size = nv12_size; // Same size as NV12 @@ -328,13 +328,12 @@ impl SharedVideoPipeline { mjpeg_turbo_decoder: Mutex::new(None), nv12_buffer: Mutex::new(vec![0u8; nv12_size]), yuv420p_buffer: Mutex::new(vec![0u8; yuv420p_size]), - encoder_needs_yuv420p: Mutex::new(false), + encoder_needs_yuv420p: AtomicBool::new(false), frame_tx, stats: Mutex::new(SharedVideoPipelineStats::default()), running: running_tx, running_rx, - encode_times: Mutex::new(Vec::with_capacity(100)), - sequence: Mutex::new(0), + sequence: AtomicU64::new(0), keyframe_requested: AtomicBool::new(false), }); @@ -527,7 +526,7 @@ impl SharedVideoPipeline { *self.yuv420p_converter.lock().await = yuv420p_converter; *self.mjpeg_decoder.lock().await = mjpeg_decoder; *self.mjpeg_turbo_decoder.lock().await = mjpeg_turbo_decoder; - *self.encoder_needs_yuv420p.lock().await = needs_yuv420p; + self.encoder_needs_yuv420p.store(needs_yuv420p, Ordering::Release); Ok(()) } @@ -598,7 +597,7 @@ impl SharedVideoPipeline { *self.yuv420p_converter.lock().await = None; *self.mjpeg_decoder.lock().await = None; *self.mjpeg_turbo_decoder.lock().await = None; - *self.encoder_needs_yuv420p.lock().await = false; + self.encoder_needs_yuv420p.store(false, Ordering::Release); info!("Switched to {} codec", codec); Ok(()) @@ -625,6 +624,13 @@ impl SharedVideoPipeline { let mut fps_frame_count: u64 = 0; let mut running_rx = pipeline.running_rx.clone(); + // Local counters for batch stats update (reduce lock contention) + let mut local_frames_encoded: u64 = 0; + let mut local_bytes_encoded: u64 = 0; + let mut local_keyframes: u64 = 0; + let mut local_errors: u64 = 0; + let mut local_dropped: u64 = 0; + // Track when we last had subscribers for auto-stop feature let mut no_subscribers_since: Option = None; let grace_period = Duration::from_secs(AUTO_STOP_GRACE_PERIOD_SECS); @@ -642,8 +648,6 @@ impl SharedVideoPipeline { result = frame_rx.recv() => { match result { Ok(video_frame) => { - pipeline.stats.lock().await.frames_captured += 1; - let subscriber_count = pipeline.frame_tx.receiver_count(); if subscriber_count == 0 { @@ -676,32 +680,15 @@ impl SharedVideoPipeline { } } - let start = Instant::now(); - match pipeline.encode_frame(&video_frame, frame_count).await { Ok(Some(encoded_frame)) => { - let encode_time = start.elapsed().as_secs_f32() * 1000.0; let _ = pipeline.frame_tx.send(encoded_frame.clone()); - let is_keyframe = encoded_frame.is_keyframe; - - // Update stats - { - let mut s = pipeline.stats.lock().await; - s.frames_encoded += 1; - s.bytes_encoded += encoded_frame.data.len() as u64; - if is_keyframe { - s.keyframes_encoded += 1; - } - - let mut times = pipeline.encode_times.lock().await; - times.push(encode_time); - if times.len() > 100 { - times.remove(0); - } - if !times.is_empty() { - s.avg_encode_time_ms = times.iter().sum::() / times.len() as f32; - } + // Update local counters (no lock) + local_frames_encoded += 1; + local_bytes_encoded += encoded_frame.data.len() as u64; + if encoded_frame.is_keyframe { + local_keyframes += 1; } frame_count += 1; @@ -710,19 +697,35 @@ impl SharedVideoPipeline { Ok(None) => {} Err(e) => { error!("Encoding failed: {}", e); - pipeline.stats.lock().await.errors += 1; + local_errors += 1; } } + // Batch update stats every second (reduces lock contention) if last_fps_time.elapsed() >= Duration::from_secs(1) { - let mut s = pipeline.stats.lock().await; - s.current_fps = fps_frame_count as f32 / last_fps_time.elapsed().as_secs_f32(); + let current_fps = fps_frame_count as f32 / last_fps_time.elapsed().as_secs_f32(); fps_frame_count = 0; last_fps_time = Instant::now(); + + // Single lock acquisition for all stats + let mut s = pipeline.stats.lock().await; + s.frames_encoded += local_frames_encoded; + s.bytes_encoded += local_bytes_encoded; + s.keyframes_encoded += local_keyframes; + s.errors += local_errors; + s.frames_dropped += local_dropped; + s.current_fps = current_fps; + + // Reset local counters + local_frames_encoded = 0; + local_bytes_encoded = 0; + local_keyframes = 0; + local_errors = 0; + local_dropped = 0; } } Err(broadcast::error::RecvError::Lagged(n)) => { - pipeline.stats.lock().await.frames_dropped += n; + local_dropped += n; } Err(broadcast::error::RecvError::Closed) => { break; @@ -762,7 +765,7 @@ impl SharedVideoPipeline { let mut mjpeg_turbo_decoder = self.mjpeg_turbo_decoder.lock().await; let mut nv12_converter = self.nv12_converter.lock().await; let mut yuv420p_converter = self.yuv420p_converter.lock().await; - let needs_yuv420p = *self.encoder_needs_yuv420p.lock().await; + let needs_yuv420p = self.encoder_needs_yuv420p.load(Ordering::Acquire); let mut encoder_guard = self.encoder.lock().await; let encoder = encoder_guard.as_mut().ok_or_else(|| { @@ -835,11 +838,7 @@ impl SharedVideoPipeline { let encoded = frames.into_iter().next().unwrap(); let is_keyframe = encoded.key == 1; - let sequence = { - let mut seq = self.sequence.lock().await; - *seq += 1; - *seq - }; + let sequence = self.sequence.fetch_add(1, Ordering::Relaxed) + 1; // Debug log for H265 encoded frame if codec == VideoEncoderType::H265 && (is_keyframe || frame_count % 30 == 1) { diff --git a/src/webrtc/video_track.rs b/src/webrtc/video_track.rs index 5427c422..8142838e 100644 --- a/src/webrtc/video_track.rs +++ b/src/webrtc/video_track.rs @@ -421,9 +421,11 @@ impl UniversalVideoTrack { /// Write VP8 frame async fn write_vp8_frame(&self, data: &[u8], is_keyframe: bool) -> Result<()> { // VP8 frames are sent directly without NAL parsing + // Calculate frame duration based on configured FPS + let frame_duration = Duration::from_micros(1_000_000 / self.config.fps.max(1) as u64); let sample = Sample { data: Bytes::copy_from_slice(data), - duration: Duration::from_secs(1), + duration: frame_duration, ..Default::default() }; @@ -452,9 +454,11 @@ impl UniversalVideoTrack { /// Write VP9 frame async fn write_vp9_frame(&self, data: &[u8], is_keyframe: bool) -> Result<()> { // VP9 frames are sent directly without NAL parsing + // Calculate frame duration based on configured FPS + let frame_duration = Duration::from_micros(1_000_000 / self.config.fps.max(1) as u64); let sample = Sample { data: Bytes::copy_from_slice(data), - duration: Duration::from_secs(1), + duration: frame_duration, ..Default::default() }; @@ -483,13 +487,15 @@ impl UniversalVideoTrack { /// Send NAL units as samples (H264 only) async fn send_nals(&self, nals: Vec, is_keyframe: bool) -> Result<()> { let mut total_bytes = 0u64; + // Calculate frame duration based on configured FPS + let frame_duration = Duration::from_micros(1_000_000 / self.config.fps.max(1) as u64); match &self.track { TrackType::Sample(track) => { for nal_data in nals { let sample = Sample { data: nal_data.clone(), - duration: Duration::from_secs(1), + duration: frame_duration, ..Default::default() }; diff --git a/src/webrtc/webrtc_streamer.rs b/src/webrtc/webrtc_streamer.rs index 7e0e94e1..e5504e03 100644 --- a/src/webrtc/webrtc_streamer.rs +++ b/src/webrtc/webrtc_streamer.rs @@ -533,16 +533,12 @@ impl WebRtcStreamer { info!("Closed {} existing sessions due to config change", session_count); } - // Update config + // Update config (preserve user-configured bitrate) let mut config = self.config.write().await; config.resolution = resolution; config.input_format = format; config.fps = fps; - - // Scale bitrate based on resolution - let base_pixels: u64 = 1280 * 720; - let actual_pixels: u64 = resolution.width as u64 * resolution.height as u64; - config.bitrate_kbps = ((8000u64 * actual_pixels / base_pixels).max(1000).min(15000)) as u32; + // Note: bitrate is NOT auto-scaled here - use set_bitrate() or config to change it info!( "WebRTC config updated: {}x{} {:?} @ {} fps, {} kbps",