From 04e62d1e3f89e9345aca24e77094f7f3fd7db17e Mon Sep 17 00:00:00 2001
From: mofeng-git
Date: Fri, 2 Jan 2026 11:58:55 +0800
Subject: [PATCH] perf(video): fix video stuttering and optimize encoding
 performance
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Changes:

1. Fix the NAL-unit duration accumulation bug
   - Touches video_track.rs and unified_video_track.rs
   - Only the last NAL unit of a frame gets frame_duration; all others get ZERO
   - Ensures all NAL units of the same frame share one RTP timestamp

2. Fix the hard-coded 1-second duration for VP8/VP9
   - Replace Duration::from_secs(1) with a frame_duration derived from the
     configured FPS

3. Optimize PTS calculation (shared_video_pipeline.rs)
   - Change pipeline_start_time from a Mutex-guarded optional timestamp to an
     AtomicI64
   - Eliminates one async mutex lock per frame
   - Use compare_exchange for lock-free first-frame time initialization

4. Avoid re-reading the config
   - Cache fps in encode_frame so the config lock is not taken again at the end

5. Zero-copy in the encoders
   - The H264/H265/VP8/VP9 encoders use drain() instead of clone()
   - Reduces memory allocation and copy overhead

6. MJPEG handler optimization
   - Skip JPEG encoding when no clients are connected (WebRTC-only mode)

7. RKMPP direct YUYV input support
   - Add YUYV422 format support to the hwcodec C++ layer
   - Add a Yuyv422 input-format option to the H264 encoder
---
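Notes (illustrative sketches, not part of the commit):

Item 1 in miniature: a minimal Rust sketch of the duration rule, assuming
webrtc-rs-style Sample semantics where a sample's duration advances the RTP
clock only after that sample is written; the helper name is hypothetical:

    use std::time::Duration;

    /// Pair each NAL unit with a sample duration so that every NAL of one
    /// frame is stamped with the same RTP timestamp: only the frame's last
    /// NAL carries the full frame duration, all earlier ones carry ZERO.
    fn nal_durations(nals: Vec<Vec<u8>>, fps: u32) -> Vec<(Vec<u8>, Duration)> {
        let frame_duration = Duration::from_micros(1_000_000 / fps.max(1) as u64);
        let last = nals.len().saturating_sub(1);
        nals.into_iter()
            .enumerate()
            .map(|(i, nal)| (nal, if i == last { frame_duration } else { Duration::ZERO }))
            .collect()
    }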
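Item 3 in miniature: the lock-free first-frame handoff, assuming the same
0-means-unset sentinel as the patch; now_ms stands in for the pipeline's
monotonic timeline (a negated capture_ts.elapsed() in the real code):

    use std::sync::atomic::{AtomicI64, Ordering};

    /// Compute a frame PTS without taking a mutex. The first caller wins the
    /// CAS and defines t=0; every later frame subtracts the stored start time.
    fn pts_ms(start_ms: &AtomicI64, now_ms: i64) -> i64 {
        let start = start_ms.load(Ordering::Acquire);
        if start == 0 {
            let _ = start_ms.compare_exchange(0, now_ms, Ordering::AcqRel, Ordering::Acquire);
            0
        } else {
            now_ms - start
        }
    }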
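Item 5 in miniature: drain() moves each frame's Vec<u8> out of the encoder's
reusable buffer instead of cloning it; EncodeFrame here is a stand-in for the
hwcodec frame type:

    struct EncodeFrame { data: Vec<u8>, pts: i64, key: i32 }

    /// Empty the encoder-owned buffer, taking ownership of the payloads with
    /// no copy; the buffer stays allocated and is refilled on the next encode.
    fn take_frames(buf: &mut Vec<EncodeFrame>) -> Vec<EncodeFrame> {
        buf.drain(..).collect()
    }
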
data_length:") + + std::to_string(data_length) + + ", linesize[0]:" + std::to_string(frame->linesize[0]) + + ", height:" + std::to_string(frame->height)); + return -1; + } + frame->data[0] = data; + break; default: LOG_ERROR(std::string("fill_frame: unsupported format, ") + std::to_string(frame->format)); diff --git a/src/stream/mjpeg.rs b/src/stream/mjpeg.rs index e5f5d3b1..ccf4c352 100644 --- a/src/stream/mjpeg.rs +++ b/src/stream/mjpeg.rs @@ -184,6 +184,14 @@ impl MjpegStreamHandler { /// Update current frame pub fn update_frame(&self, frame: VideoFrame) { + // Skip JPEG encoding if no clients are connected (optimization for WebRTC-only mode) + // This avoids unnecessary libyuv conversion when only WebRTC is active + if self.clients.read().is_empty() && !frame.format.is_compressed() { + // Still update the online status and sequence for monitoring purposes + // but skip the expensive JPEG encoding + return; + } + // If frame is not JPEG, encode it let frame = if !frame.format.is_compressed() { match self.encode_to_jpeg(&frame) { diff --git a/src/video/encoder/h264.rs b/src/video/encoder/h264.rs index 4c8d642a..21886f97 100644 --- a/src/video/encoder/h264.rs +++ b/src/video/encoder/h264.rs @@ -99,6 +99,8 @@ pub enum H264InputFormat { Yuv420p, /// NV12 - Y plane + interleaved UV plane (optimal for VAAPI) Nv12, + /// YUYV422 - packed YUV 4:2:2 format (optimal for RKMPP direct input) + Yuyv422, } impl Default for H264InputFormat { @@ -157,6 +159,17 @@ impl H264Config { } } + /// Create config for low latency streaming with YUYV422 input (optimal for RKMPP direct input) + pub fn low_latency_yuyv422(resolution: Resolution, bitrate_kbps: u32) -> Self { + Self { + base: EncoderConfig::h264(resolution, bitrate_kbps), + bitrate_kbps, + gop_size: 30, + fps: 30, + input_format: H264InputFormat::Yuyv422, + } + } + /// Create config for quality streaming pub fn quality(resolution: Resolution, bitrate_kbps: u32) -> Self { Self { @@ -275,6 +288,7 @@ impl H264Encoder { let pixfmt = match config.input_format { H264InputFormat::Nv12 => AVPixelFormat::AV_PIX_FMT_NV12, H264InputFormat::Yuv420p => AVPixelFormat::AV_PIX_FMT_YUV420P, + H264InputFormat::Yuyv422 => AVPixelFormat::AV_PIX_FMT_YUYV422, }; info!( @@ -367,11 +381,12 @@ impl H264Encoder { match self.inner.encode(data, pts_ms) { Ok(frames) => { - // Copy frame data to owned HwEncodeFrame + // Zero-copy: drain frames from hwcodec buffer instead of cloning + // hwcodec returns &mut Vec, so we can take ownership via drain let owned_frames: Vec = frames - .iter() + .drain(..) 
                     .map(|f| HwEncodeFrame {
-                        data: f.data.clone(),
+                        data: f.data, // Move, not clone
                         pts: f.pts,
                         key: f.key,
                     })
@@ -438,7 +453,7 @@ impl Encoder for H264Encoder {
         // Assume input is YUV420P
         let pts_ms = (sequence * 1000 / self.config.fps as u64) as i64;
 
-        let frames = self.encode_yuv420p(data, pts_ms)?;
+        let mut frames = self.encode_yuv420p(data, pts_ms)?;
 
         if frames.is_empty() {
             // Encoder needs more frames (shouldn't happen with our config)
@@ -446,12 +461,12 @@ impl Encoder for H264Encoder {
             return Err(AppError::VideoError("Encoder returned no frames".to_string()));
         }
 
-        // Take the first frame
-        let frame = &frames[0];
+        // Take ownership of the first frame (zero-copy)
+        let frame = frames.remove(0);
         let key_frame = frame.key == 1;
 
         Ok(EncodedFrame::h264(
-            Bytes::from(frame.data.clone()),
+            Bytes::from(frame.data), // Move Vec into Bytes (zero-copy)
             self.config.base.resolution,
             key_frame,
             sequence,
@@ -479,6 +494,7 @@ impl Encoder for H264Encoder {
         match self.config.input_format {
             H264InputFormat::Nv12 => matches!(format, PixelFormat::Nv12),
             H264InputFormat::Yuv420p => matches!(format, PixelFormat::Yuv420),
+            H264InputFormat::Yuyv422 => matches!(format, PixelFormat::Yuyv),
         }
     }
 }
diff --git a/src/video/encoder/h265.rs b/src/video/encoder/h265.rs
index fb73bcdd..c2782c49 100644
--- a/src/video/encoder/h265.rs
+++ b/src/video/encoder/h265.rs
@@ -426,10 +426,11 @@ impl H265Encoder {
 
         match self.inner.encode(data, pts_ms) {
             Ok(frames) => {
+                // Zero-copy: drain frames from hwcodec buffer instead of cloning
                 let owned_frames: Vec<HwEncodeFrame> = frames
-                    .iter()
+                    .drain(..)
                     .map(|f| HwEncodeFrame {
-                        data: f.data.clone(),
+                        data: f.data, // Move, not clone
                         pts: f.pts,
                         key: f.key,
                     })
@@ -505,7 +506,7 @@ impl Encoder for H265Encoder {
     fn encode(&mut self, data: &[u8], sequence: u64) -> Result<EncodedFrame> {
         let pts_ms = (sequence * 1000 / self.config.fps as u64) as i64;
 
-        let frames = self.encode_raw(data, pts_ms)?;
+        let mut frames = self.encode_raw(data, pts_ms)?;
 
         if frames.is_empty() {
             warn!("H.265 encoder returned no frames");
             return Err(AppError::VideoError(
@@ -514,11 +515,12 @@ impl Encoder for H265Encoder {
             ));
         }
 
-        let frame = &frames[0];
+        // Take ownership of the first frame (zero-copy)
+        let frame = frames.remove(0);
         let key_frame = frame.key == 1;
 
         Ok(EncodedFrame {
-            data: Bytes::from(frame.data.clone()),
+            data: Bytes::from(frame.data), // Move Vec into Bytes (zero-copy)
             format: EncodedFormat::H265,
             resolution: self.config.base.resolution,
             key_frame,
diff --git a/src/video/encoder/vp8.rs b/src/video/encoder/vp8.rs
index 1a119ea1..a9af912c 100644
--- a/src/video/encoder/vp8.rs
+++ b/src/video/encoder/vp8.rs
@@ -362,10 +362,11 @@ impl VP8Encoder {
 
         match self.inner.encode(data, pts_ms) {
             Ok(frames) => {
+                // Zero-copy: drain frames from hwcodec buffer instead of cloning
                 let owned_frames: Vec<HwEncodeFrame> = frames
-                    .iter()
+                    .drain(..)
                     .map(|f| HwEncodeFrame {
-                        data: f.data.clone(),
+                        data: f.data, // Move, not clone
                         pts: f.pts,
                         key: f.key,
                     })
@@ -416,7 +417,7 @@ impl Encoder for VP8Encoder {
     fn encode(&mut self, data: &[u8], sequence: u64) -> Result<EncodedFrame> {
         let pts_ms = (sequence * 1000 / self.config.fps as u64) as i64;
 
-        let frames = self.encode_raw(data, pts_ms)?;
+        let mut frames = self.encode_raw(data, pts_ms)?;
 
         if frames.is_empty() {
             warn!("VP8 encoder returned no frames");
             return Err(AppError::VideoError(
@@ -425,11 +426,12 @@ impl Encoder for VP8Encoder {
             ));
         }
 
-        let frame = &frames[0];
+        // Take ownership of the first frame (zero-copy)
+        let frame = frames.remove(0);
         let key_frame = frame.key == 1;
 
         Ok(EncodedFrame {
-            data: Bytes::from(frame.data.clone()),
+            data: Bytes::from(frame.data), // Move Vec into Bytes (zero-copy)
             format: EncodedFormat::Vp8,
             resolution: self.config.base.resolution,
             key_frame,
diff --git a/src/video/encoder/vp9.rs b/src/video/encoder/vp9.rs
index 4c57321d..3725388b 100644
--- a/src/video/encoder/vp9.rs
+++ b/src/video/encoder/vp9.rs
@@ -362,10 +362,11 @@ impl VP9Encoder {
 
         match self.inner.encode(data, pts_ms) {
             Ok(frames) => {
+                // Zero-copy: drain frames from hwcodec buffer instead of cloning
                 let owned_frames: Vec<HwEncodeFrame> = frames
-                    .iter()
+                    .drain(..)
                     .map(|f| HwEncodeFrame {
-                        data: f.data.clone(),
+                        data: f.data, // Move, not clone
                         pts: f.pts,
                         key: f.key,
                     })
@@ -416,7 +417,7 @@ impl Encoder for VP9Encoder {
     fn encode(&mut self, data: &[u8], sequence: u64) -> Result<EncodedFrame> {
         let pts_ms = (sequence * 1000 / self.config.fps as u64) as i64;
 
-        let frames = self.encode_raw(data, pts_ms)?;
+        let mut frames = self.encode_raw(data, pts_ms)?;
 
         if frames.is_empty() {
             warn!("VP9 encoder returned no frames");
             return Err(AppError::VideoError(
@@ -425,11 +426,12 @@ impl Encoder for VP9Encoder {
             ));
         }
 
-        let frame = &frames[0];
+        // Take ownership of the first frame (zero-copy)
+        let frame = frames.remove(0);
         let key_frame = frame.key == 1;
 
         Ok(EncodedFrame {
-            data: Bytes::from(frame.data.clone()),
+            data: Bytes::from(frame.data), // Move Vec into Bytes (zero-copy)
             format: EncodedFormat::Vp9,
             resolution: self.config.base.resolution,
             key_frame,
diff --git a/src/video/shared_video_pipeline.rs b/src/video/shared_video_pipeline.rs
index edaf3b79..37d3695a 100644
--- a/src/video/shared_video_pipeline.rs
+++ b/src/video/shared_video_pipeline.rs
@@ -17,7 +17,7 @@
 //! ```
 
 use bytes::Bytes;
-use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
+use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering};
 use std::sync::Arc;
 use std::time::{Duration, Instant};
 use tokio::sync::{broadcast, watch, Mutex, RwLock};
@@ -292,6 +292,8 @@ pub struct SharedVideoPipeline {
     yuv420p_buffer: Mutex<Vec<u8>>,
     /// Whether the encoder needs YUV420P (true) or NV12 (false)
     encoder_needs_yuv420p: AtomicBool,
+    /// Whether YUYV direct input is enabled (RKMPP optimization)
+    yuyv_direct_input: AtomicBool,
     frame_tx: broadcast::Sender<EncodedVideoFrame>,
     stats: Mutex<SharedVideoPipelineStats>,
     running: watch::Sender<bool>,
@@ -300,6 +302,9 @@ pub struct SharedVideoPipeline {
     sequence: AtomicU64,
     /// Atomic flag for keyframe request (avoids lock contention)
     keyframe_requested: AtomicBool,
+    /// Pipeline start time for PTS calculation (monotonic millis, 0 = not set)
+    /// Uses AtomicI64 instead of a Mutex for lock-free access
+    pipeline_start_time_ms: AtomicI64,
 }
 
 impl SharedVideoPipeline {
@@ -329,12 +334,14 @@ impl SharedVideoPipeline {
             nv12_buffer: Mutex::new(vec![0u8; nv12_size]),
             yuv420p_buffer: Mutex::new(vec![0u8; yuv420p_size]),
             encoder_needs_yuv420p: AtomicBool::new(false),
+            yuyv_direct_input: AtomicBool::new(false),
             frame_tx,
             stats: Mutex::new(SharedVideoPipelineStats::default()),
             running: running_tx,
             running_rx,
             sequence: AtomicU64::new(0),
             keyframe_requested: AtomicBool::new(false),
+            pipeline_start_time_ms: AtomicI64::new(0),
         });
 
         Ok(pipeline)
@@ -353,18 +360,41 @@ impl SharedVideoPipeline {
             }
         };
 
+        // Check if RKMPP backend is available for YUYV direct input optimization
+        let is_rkmpp_available = registry.encoder_with_backend(VideoEncoderType::H264, EncoderBackend::Rkmpp).is_some();
+        let use_yuyv_direct = is_rkmpp_available && config.input_format == PixelFormat::Yuyv;
+
+        if use_yuyv_direct {
+            info!("RKMPP backend detected with YUYV input, enabling YUYV direct input optimization");
+        }
+
         // Create encoder based on codec type
         let encoder: Box<dyn Encoder> = match config.output_codec {
             VideoEncoderType::H264 => {
+                // Determine H264 input format based on backend and input format
+                let h264_input_format = if use_yuyv_direct {
+                    crate::video::encoder::h264::H264InputFormat::Yuyv422
+                } else {
+                    crate::video::encoder::h264::H264InputFormat::Nv12
+                };
+
                 let encoder_config = H264Config {
                     base: EncoderConfig::h264(config.resolution, config.bitrate_kbps),
                     bitrate_kbps: config.bitrate_kbps,
                     gop_size: config.gop_size,
                     fps: config.fps,
-                    input_format: crate::video::encoder::h264::H264InputFormat::Nv12,
+                    input_format: h264_input_format,
                 };
 
-                let encoder = if let Some(ref backend) = config.encoder_backend {
+                let encoder = if use_yuyv_direct {
+                    // Force RKMPP backend for YUYV direct input
+                    let codec_name = get_codec_name(VideoEncoderType::H264, Some(EncoderBackend::Rkmpp))
+                        .ok_or_else(|| AppError::VideoError(
+                            "RKMPP backend not available for H.264".to_string()
+                        ))?;
+                    info!("Creating H264 encoder with RKMPP backend for YUYV direct input (codec: {})", codec_name);
+                    H264Encoder::with_codec(encoder_config, &codec_name)?
+                } else if let Some(ref backend) = config.encoder_backend {
                     // Specific backend requested
                     let codec_name = get_codec_name(VideoEncoderType::H264, Some(*backend))
                         .ok_or_else(|| AppError::VideoError(format!(
@@ -440,15 +470,19 @@ impl SharedVideoPipeline {
 
         info!(
             "Encoder {} needs {} format",
             codec_name,
-            if needs_yuv420p { "YUV420P" } else { "NV12" }
+            if use_yuyv_direct { "YUYV422 (direct)" } else if needs_yuv420p { "YUV420P" } else { "NV12" }
         );
 
         // Create converter or decoder based on input format and encoder needs
         info!("Initializing input format handler for: {} -> {}",
             config.input_format,
-            if needs_yuv420p { "YUV420P" } else { "NV12" });
+            if use_yuyv_direct { "YUYV422 (direct)" } else if needs_yuv420p { "YUV420P" } else { "NV12" });
 
-        let (nv12_converter, yuv420p_converter, mjpeg_decoder, mjpeg_turbo_decoder) = if needs_yuv420p {
+        let (nv12_converter, yuv420p_converter, mjpeg_decoder, mjpeg_turbo_decoder) = if use_yuyv_direct {
+            // RKMPP with YUYV direct input - skip all conversion
+            info!("YUYV direct input enabled for RKMPP, skipping format conversion");
+            (None, None, None, None)
+        } else if needs_yuv420p {
             // Software encoder needs YUV420P
             match config.input_format {
                 PixelFormat::Yuv420 => {
@@ -527,6 +561,7 @@ impl SharedVideoPipeline {
         *self.mjpeg_decoder.lock().await = mjpeg_decoder;
         *self.mjpeg_turbo_decoder.lock().await = mjpeg_turbo_decoder;
         self.encoder_needs_yuv420p.store(needs_yuv420p, Ordering::Release);
+        self.yuyv_direct_input.store(use_yuyv_direct, Ordering::Release);
 
         Ok(())
     }
@@ -749,7 +784,28 @@ impl SharedVideoPipeline {
         let codec = config.output_codec;
         drop(config);
 
-        let pts_ms = (frame_count * 1000 / fps as u64) as i64;
+        // Calculate PTS from real capture timestamp (lock-free using AtomicI64)
+        // This ensures smooth playback even when capture timing varies
+        let frame_ts_ms = frame.capture_ts.elapsed().as_millis() as i64;
+        // Negate elapsed so newer frames map to larger values (monotonic timeline)
+        let current_ts_ms = -(frame_ts_ms);
+
+        // Try to set start time if not yet set (first frame wins)
+        let start_ts = self.pipeline_start_time_ms.load(Ordering::Acquire);
+        let pts_ms = if start_ts == 0 {
+            // First frame - try to set the start time
+            // Use compare_exchange to ensure only one thread sets it
+            let _ = self.pipeline_start_time_ms.compare_exchange(
+                0,
+                current_ts_ms,
+                Ordering::AcqRel,
+                Ordering::Acquire,
+            );
+            0 // First frame has PTS 0
+        } else {
+            // Subsequent frames: PTS = current - start
+            current_ts_ms - start_ts
+        };
 
         // Debug log for H265
         if codec == VideoEncoderType::H265 && frame_count % 30 == 1 {
@@ -857,13 +913,12 @@ impl SharedVideoPipeline {
             }
         }
 
-        let config = self.config.read().await;
         Ok(Some(EncodedVideoFrame {
             data: Bytes::from(encoded.data),
             pts_ms,
             is_keyframe,
             sequence,
-            duration: Duration::from_millis(1000 / config.fps as u64),
+            duration: Duration::from_millis(1000 / fps as u64),
             codec,
         }))
     } else {
diff --git a/src/webrtc/unified_video_track.rs b/src/webrtc/unified_video_track.rs
index 1367b5f7..3cf6e6df 100644
--- a/src/webrtc/unified_video_track.rs
+++ b/src/webrtc/unified_video_track.rs
@@ -490,10 +490,13 @@ impl UnifiedVideoTrack {
 
     /// Write VP8 frame (raw encoded)
     async fn write_vp8_frame(&self, data: &[u8], is_keyframe: bool) -> Result<()> {
+        // Calculate frame duration based on configured FPS
+        let frame_duration = Duration::from_micros(1_000_000 / self.config.fps.max(1) as u64);
+
         // VP8 frames are sent directly
         let sample = Sample {
             data: Bytes::copy_from_slice(data),
-            duration: Duration::from_secs(1),
+            duration: frame_duration,
             ..Default::default()
         };
 
@@ -514,10 +517,13 @@ impl UnifiedVideoTrack {
 
     /// Write VP9 frame (raw encoded)
     async fn write_vp9_frame(&self, data: &[u8], is_keyframe: bool) -> Result<()> {
+        // Calculate frame duration based on configured FPS
+        let frame_duration = Duration::from_micros(1_000_000 / self.config.fps.max(1) as u64);
+
         // VP9 frames are sent directly
         let sample = Sample {
             data: Bytes::copy_from_slice(data),
-            duration: Duration::from_secs(1),
+            duration: frame_duration,
             ..Default::default()
         };
 
@@ -537,25 +543,33 @@ impl UnifiedVideoTrack {
     }
 
     /// Send NAL units via track (for H264/H265)
+    ///
+    /// Important: Only the last NAL unit should have the frame duration set.
+    /// All NAL units in a frame share the same RTP timestamp, so only the last
+    /// one should increment the timestamp by the frame duration.
     async fn send_nal_units(&self, nals: Vec<Bytes>, is_keyframe: bool) -> Result<()> {
         let mut total_bytes = 0u64;
-        let mut nal_count = 0;
+        let nal_count = nals.len();
+        // Calculate frame duration based on configured FPS
+        let frame_duration = Duration::from_micros(1_000_000 / self.config.fps.max(1) as u64);
 
-        for nal_data in nals {
+        for (i, nal_data) in nals.into_iter().enumerate() {
+            let is_last = i == nal_count - 1;
+            // Only the last NAL should have duration set
+            // This ensures all NALs in a frame share the same RTP timestamp
             let sample = Sample {
                 data: nal_data.clone(),
-                duration: Duration::from_secs(1),
+                duration: if is_last { frame_duration } else { Duration::ZERO },
                 ..Default::default()
             };
 
             if let Err(e) = self.track.write_sample(&sample).await {
-                if nal_count % 100 == 0 {
+                if i % 100 == 0 {
                     debug!("write_sample failed (no peer?): {}", e);
                 }
             }
 
             total_bytes += nal_data.len() as u64;
-            nal_count += 1;
         }
 
         if nal_count > 0 {
diff --git a/src/webrtc/video_track.rs b/src/webrtc/video_track.rs
index 510ba2e3..a6b6a264 100644
--- a/src/webrtc/video_track.rs
+++ b/src/webrtc/video_track.rs
@@ -484,17 +484,25 @@ impl UniversalVideoTrack {
     }
 
     /// Send NAL units as samples (H264 only)
+    ///
+    /// Important: Only the last NAL unit should have the frame duration set.
+    /// All NAL units in a frame share the same RTP timestamp, so only the last
+    /// one should increment the timestamp by the frame duration.
     async fn send_nals(&self, nals: Vec<Bytes>, is_keyframe: bool) -> Result<()> {
         let mut total_bytes = 0u64;
         // Calculate frame duration based on configured FPS
         let frame_duration = Duration::from_micros(1_000_000 / self.config.fps.max(1) as u64);
+        let nal_count = nals.len();
 
         match &self.track {
             TrackType::Sample(track) => {
-                for nal_data in nals {
+                for (i, nal_data) in nals.into_iter().enumerate() {
+                    let is_last = i == nal_count - 1;
+                    // Only the last NAL should have duration set
+                    // This ensures all NALs in a frame share the same RTP timestamp
                     let sample = Sample {
                         data: nal_data.clone(),
-                        duration: frame_duration,
+                        duration: if is_last { frame_duration } else { Duration::ZERO },
                         ..Default::default()
                     };