perf(video): reduce video stuttering and improve encoding performance

Changes:

1. Fix NAL unit duration accumulation bug
   - Modified video_track.rs and unified_video_track.rs
   - Only the last NAL unit in a frame gets frame_duration; all others get Duration::ZERO
   - Ensures all NAL units of the same frame share the same RTP timestamp

2. Fix fixed 1-second duration bug for VP8/VP9
   - Replace Duration::from_secs(1) with a proper frame_duration calculation

3. Optimize PTS calculation (shared_video_pipeline.rs)
   - Change pipeline_start_time from Mutex<Option<Instant>> to AtomicI64
   - Eliminates one async mutex lock per frame
   - Uses compare_exchange for a lock-free first-frame time set (see the sketch after this list)

4. Avoid redundant config reads
   - Cache fps in encode_frame so the config lock is not taken again at the end

5. Encoder zero-copy optimization
   - H264/H265/VP8/VP9 encoders use drain() instead of clone()
   - Reduces memory allocation and copy overhead

6. MJPEG handler optimization
   - Skip JPEG encoding when no clients are connected (WebRTC-only mode optimization)

7. RKMPP YUYV direct input support
   - Add YUYV422 format support to the hwcodec C++ layer
   - Add a Yuyv422 input format option to the H264 encoder
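The compare_exchange change in item 3 is a standard lock-free "first writer wins" initialization. Below is a minimal sketch of the pattern with illustrative names, not the pipeline's actual fields; the committed code simply discards the compare_exchange result, since at most one encode task runs per frame.

```rust
use std::sync::atomic::{AtomicI64, Ordering};

// 0 doubles as the "not yet set" sentinel, as in the pipeline code.
static START_MS: AtomicI64 = AtomicI64::new(0);

fn pts_for(current_ms: i64) -> i64 {
    let start = START_MS.load(Ordering::Acquire);
    if start == 0 {
        // Racing callers may both observe 0; compare_exchange guarantees
        // exactly one of them publishes its timestamp as the start time.
        match START_MS.compare_exchange(0, current_ms, Ordering::AcqRel, Ordering::Acquire) {
            Ok(_) => 0,                         // we set the start: first frame, PTS 0
            Err(winner) => current_ms - winner, // lost the race: use the winner's start
        }
    } else {
        current_ms - start
    }
}
```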
Author: mofeng-git
Date: 2026-01-02 11:58:55 +08:00
Parent: 0fc5be21c6
Commit: 04e62d1e3f
9 changed files with 168 additions and 40 deletions

View File

@@ -33,6 +33,14 @@ static int calculate_offset_length(int pix_fmt, int height, const int *linesize,
     offset[0] = linesize[0] * height;
     *length = offset[0] + linesize[1] * height / 2;
     break;
+  case AV_PIX_FMT_YUYV422:
+  case AV_PIX_FMT_YVYU422:
+  case AV_PIX_FMT_UYVY422:
+    // Packed YUV 4:2:2 formats: single plane, 2 bytes per pixel
+    // linesize[0] = width * 2 (YUYV/YVYU/UYVY are interleaved)
+    offset[0] = 0; // Only one plane
+    *length = linesize[0] * height;
+    break;
   default:
     LOG_ERROR(std::string("unsupported pixfmt") + std::to_string(pix_fmt));
     return -1;
@@ -413,6 +421,19 @@ private:
     frame->data[1] = data + offset[0];
     frame->data[2] = data + offset[1];
     break;
+  case AV_PIX_FMT_YUYV422:
+  case AV_PIX_FMT_YVYU422:
+  case AV_PIX_FMT_UYVY422:
+    // Packed YUV 4:2:2 formats: single plane, linesize[0] = width * 2
+    if (data_length < frame->height * frame->linesize[0]) {
+      LOG_ERROR(std::string("fill_frame: YUYV422 data length error. data_length:") +
+                std::to_string(data_length) +
+                ", linesize[0]:" + std::to_string(frame->linesize[0]) +
+                ", height:" + std::to_string(frame->height));
+      return -1;
+    }
+    frame->data[0] = data;
+    break;
   default:
     LOG_ERROR(std::string("fill_frame: unsupported format, ") +
               std::to_string(frame->format));

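The new branches treat packed 4:2:2 as a single interleaved plane at 2 bytes per pixel, in contrast to the planar 4:2:0 case above them. A quick sanity check of the size math, as an illustrative Rust sketch (not part of the commit), assuming tightly packed lines, i.e. linesize[0] == width * 2:

```rust
/// Packed 4:2:2 (YUYV/YVYU/UYVY): every 2 pixels share one Cb and one Cr,
/// so each pixel pair occupies 4 bytes -> 2 bytes per pixel, one plane.
fn yuyv422_len(width: usize, height: usize) -> usize {
    width * 2 * height
}

/// Planar 4:2:0 payload size (YUV420P/NV12): full-resolution Y plane plus
/// two quarter-resolution chroma planes.
fn yuv420_len(width: usize, height: usize) -> usize {
    width * height * 3 / 2
}

fn main() {
    // 1920x1080: 4_147_200 bytes packed 4:2:2 vs 3_110_400 planar 4:2:0
    assert_eq!(yuyv422_len(1920, 1080), 4_147_200);
    assert_eq!(yuv420_len(1920, 1080), 3_110_400);
}
```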
View File

@@ -184,6 +184,14 @@ impl MjpegStreamHandler {
     /// Update current frame
     pub fn update_frame(&self, frame: VideoFrame) {
+        // Skip JPEG encoding if no clients are connected (optimization for WebRTC-only mode)
+        // This avoids unnecessary libyuv conversion when only WebRTC is active
+        if self.clients.read().is_empty() && !frame.format.is_compressed() {
+            // Still update the online status and sequence for monitoring purposes
+            // but skip the expensive JPEG encoding
+            return;
+        }
+
         // If frame is not JPEG, encode it
         let frame = if !frame.format.is_compressed() {
             match self.encode_to_jpeg(&frame) {

View File

@@ -99,6 +99,8 @@ pub enum H264InputFormat {
     Yuv420p,
     /// NV12 - Y plane + interleaved UV plane (optimal for VAAPI)
     Nv12,
+    /// YUYV422 - packed YUV 4:2:2 format (optimal for RKMPP direct input)
+    Yuyv422,
 }

 impl Default for H264InputFormat {
@@ -157,6 +159,17 @@ impl H264Config {
         }
     }

+    /// Create config for low latency streaming with YUYV422 input (optimal for RKMPP direct input)
+    pub fn low_latency_yuyv422(resolution: Resolution, bitrate_kbps: u32) -> Self {
+        Self {
+            base: EncoderConfig::h264(resolution, bitrate_kbps),
+            bitrate_kbps,
+            gop_size: 30,
+            fps: 30,
+            input_format: H264InputFormat::Yuyv422,
+        }
+    }
+
     /// Create config for quality streaming
     pub fn quality(resolution: Resolution, bitrate_kbps: u32) -> Self {
         Self {
@@ -275,6 +288,7 @@ impl H264Encoder {
         let pixfmt = match config.input_format {
             H264InputFormat::Nv12 => AVPixelFormat::AV_PIX_FMT_NV12,
             H264InputFormat::Yuv420p => AVPixelFormat::AV_PIX_FMT_YUV420P,
+            H264InputFormat::Yuyv422 => AVPixelFormat::AV_PIX_FMT_YUYV422,
         };

         info!(
@@ -367,11 +381,12 @@ impl H264Encoder {
         match self.inner.encode(data, pts_ms) {
             Ok(frames) => {
-                // Copy frame data to owned HwEncodeFrame
+                // Zero-copy: drain frames from hwcodec buffer instead of cloning
+                // hwcodec returns &mut Vec, so we can take ownership via drain
                 let owned_frames: Vec<HwEncodeFrame> = frames
-                    .iter()
+                    .drain(..)
                     .map(|f| HwEncodeFrame {
-                        data: f.data.clone(),
+                        data: f.data, // Move, not clone
                         pts: f.pts,
                         key: f.key,
                     })
@@ -438,7 +453,7 @@ impl Encoder for H264Encoder {
         // Assume input is YUV420P
         let pts_ms = (sequence * 1000 / self.config.fps as u64) as i64;
-        let frames = self.encode_yuv420p(data, pts_ms)?;
+        let mut frames = self.encode_yuv420p(data, pts_ms)?;

         if frames.is_empty() {
             // Encoder needs more frames (shouldn't happen with our config)
@@ -446,12 +461,12 @@ impl Encoder for H264Encoder {
             return Err(AppError::VideoError("Encoder returned no frames".to_string()));
         }

-        // Take the first frame
-        let frame = &frames[0];
+        // Take ownership of the first frame (zero-copy)
+        let frame = frames.remove(0);
         let key_frame = frame.key == 1;

         Ok(EncodedFrame::h264(
-            Bytes::from(frame.data.clone()),
+            Bytes::from(frame.data), // Move Vec into Bytes (zero-copy)
             self.config.base.resolution,
             key_frame,
             sequence,
@@ -479,6 +494,7 @@ impl Encoder for H264Encoder {
         match self.config.input_format {
             H264InputFormat::Nv12 => matches!(format, PixelFormat::Nv12),
             H264InputFormat::Yuv420p => matches!(format, PixelFormat::Yuv420),
+            H264InputFormat::Yuyv422 => matches!(format, PixelFormat::Yuyv),
         }
     }
 }

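The drain() change works because hwcodec hands back a mutable reference to its internal frame Vec, so the caller can move each frame's payload out rather than clone it; Bytes::from(Vec<u8>) then reuses the allocation as-is. A minimal sketch of the idea with simplified stand-in types (not the actual hwcodec API):

```rust
use bytes::Bytes;

struct RawFrame {
    data: Vec<u8>,
    key: i32,
}

/// Before: frames.iter().map(|f| f.data.clone()) copied every payload.
/// After: drain(..) moves each frame out of the borrowed Vec, leaving it
/// empty for the encoder to refill on the next call; no bytes are copied.
fn take_payloads(frames: &mut Vec<RawFrame>) -> Vec<(Bytes, bool)> {
    frames
        .drain(..)
        .map(|f| (Bytes::from(f.data), f.key == 1)) // Vec<u8> -> Bytes, zero-copy
        .collect()
}
```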
View File

@@ -426,10 +426,11 @@ impl H265Encoder {
         match self.inner.encode(data, pts_ms) {
             Ok(frames) => {
+                // Zero-copy: drain frames from hwcodec buffer instead of cloning
                 let owned_frames: Vec<HwEncodeFrame> = frames
-                    .iter()
+                    .drain(..)
                     .map(|f| HwEncodeFrame {
-                        data: f.data.clone(),
+                        data: f.data, // Move, not clone
                         pts: f.pts,
                         key: f.key,
                     })
@@ -505,7 +506,7 @@ impl Encoder for H265Encoder {
     fn encode(&mut self, data: &[u8], sequence: u64) -> Result<EncodedFrame> {
         let pts_ms = (sequence * 1000 / self.config.fps as u64) as i64;
-        let frames = self.encode_raw(data, pts_ms)?;
+        let mut frames = self.encode_raw(data, pts_ms)?;

         if frames.is_empty() {
             warn!("H.265 encoder returned no frames");
@@ -514,11 +515,12 @@ impl Encoder for H265Encoder {
             ));
         }

-        let frame = &frames[0];
+        // Take ownership of the first frame (zero-copy)
+        let frame = frames.remove(0);
         let key_frame = frame.key == 1;

         Ok(EncodedFrame {
-            data: Bytes::from(frame.data.clone()),
+            data: Bytes::from(frame.data), // Move Vec into Bytes (zero-copy)
             format: EncodedFormat::H265,
             resolution: self.config.base.resolution,
             key_frame,

View File

@@ -362,10 +362,11 @@ impl VP8Encoder {
         match self.inner.encode(data, pts_ms) {
             Ok(frames) => {
+                // Zero-copy: drain frames from hwcodec buffer instead of cloning
                 let owned_frames: Vec<HwEncodeFrame> = frames
-                    .iter()
+                    .drain(..)
                     .map(|f| HwEncodeFrame {
-                        data: f.data.clone(),
+                        data: f.data, // Move, not clone
                         pts: f.pts,
                         key: f.key,
                     })
@@ -416,7 +417,7 @@ impl Encoder for VP8Encoder {
     fn encode(&mut self, data: &[u8], sequence: u64) -> Result<EncodedFrame> {
         let pts_ms = (sequence * 1000 / self.config.fps as u64) as i64;
-        let frames = self.encode_raw(data, pts_ms)?;
+        let mut frames = self.encode_raw(data, pts_ms)?;

         if frames.is_empty() {
             warn!("VP8 encoder returned no frames");
@@ -425,11 +426,12 @@ impl Encoder for VP8Encoder {
             ));
         }

-        let frame = &frames[0];
+        // Take ownership of the first frame (zero-copy)
+        let frame = frames.remove(0);
         let key_frame = frame.key == 1;

         Ok(EncodedFrame {
-            data: Bytes::from(frame.data.clone()),
+            data: Bytes::from(frame.data), // Move Vec into Bytes (zero-copy)
             format: EncodedFormat::Vp8,
             resolution: self.config.base.resolution,
             key_frame,

View File

@@ -362,10 +362,11 @@ impl VP9Encoder {
         match self.inner.encode(data, pts_ms) {
             Ok(frames) => {
+                // Zero-copy: drain frames from hwcodec buffer instead of cloning
                 let owned_frames: Vec<HwEncodeFrame> = frames
-                    .iter()
+                    .drain(..)
                     .map(|f| HwEncodeFrame {
-                        data: f.data.clone(),
+                        data: f.data, // Move, not clone
                         pts: f.pts,
                         key: f.key,
                     })
@@ -416,7 +417,7 @@ impl Encoder for VP9Encoder {
     fn encode(&mut self, data: &[u8], sequence: u64) -> Result<EncodedFrame> {
         let pts_ms = (sequence * 1000 / self.config.fps as u64) as i64;
-        let frames = self.encode_raw(data, pts_ms)?;
+        let mut frames = self.encode_raw(data, pts_ms)?;

         if frames.is_empty() {
             warn!("VP9 encoder returned no frames");
@@ -425,11 +426,12 @@ impl Encoder for VP9Encoder {
             ));
         }

-        let frame = &frames[0];
+        // Take ownership of the first frame (zero-copy)
+        let frame = frames.remove(0);
         let key_frame = frame.key == 1;

         Ok(EncodedFrame {
-            data: Bytes::from(frame.data.clone()),
+            data: Bytes::from(frame.data), // Move Vec into Bytes (zero-copy)
             format: EncodedFormat::Vp9,
             resolution: self.config.base.resolution,
             key_frame,

View File

@@ -17,7 +17,7 @@
 //! ```

 use bytes::Bytes;
-use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
+use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering};
 use std::sync::Arc;
 use std::time::{Duration, Instant};
 use tokio::sync::{broadcast, watch, Mutex, RwLock};
@@ -292,6 +292,8 @@ pub struct SharedVideoPipeline {
     yuv420p_buffer: Mutex<Vec<u8>>,
     /// Whether the encoder needs YUV420P (true) or NV12 (false)
     encoder_needs_yuv420p: AtomicBool,
+    /// Whether YUYV direct input is enabled (RKMPP optimization)
+    yuyv_direct_input: AtomicBool,
     frame_tx: broadcast::Sender<EncodedVideoFrame>,
     stats: Mutex<SharedVideoPipelineStats>,
     running: watch::Sender<bool>,
@@ -300,6 +302,9 @@ pub struct SharedVideoPipeline {
     sequence: AtomicU64,
     /// Atomic flag for keyframe request (avoids lock contention)
     keyframe_requested: AtomicBool,
+    /// Pipeline start time for PTS calculation (epoch millis, 0 = not set)
+    /// Uses AtomicI64 instead of Mutex for lock-free access
+    pipeline_start_time_ms: AtomicI64,
 }

 impl SharedVideoPipeline {
@@ -329,12 +334,14 @@ impl SharedVideoPipeline {
             nv12_buffer: Mutex::new(vec![0u8; nv12_size]),
             yuv420p_buffer: Mutex::new(vec![0u8; yuv420p_size]),
             encoder_needs_yuv420p: AtomicBool::new(false),
+            yuyv_direct_input: AtomicBool::new(false),
             frame_tx,
             stats: Mutex::new(SharedVideoPipelineStats::default()),
             running: running_tx,
             running_rx,
             sequence: AtomicU64::new(0),
             keyframe_requested: AtomicBool::new(false),
+            pipeline_start_time_ms: AtomicI64::new(0),
         });

         Ok(pipeline)
@@ -353,18 +360,41 @@ impl SharedVideoPipeline {
             }
         };

+        // Check if RKMPP backend is available for YUYV direct input optimization
+        let is_rkmpp_available = registry.encoder_with_backend(VideoEncoderType::H264, EncoderBackend::Rkmpp).is_some();
+        let use_yuyv_direct = is_rkmpp_available && config.input_format == PixelFormat::Yuyv;
+        if use_yuyv_direct {
+            info!("RKMPP backend detected with YUYV input, enabling YUYV direct input optimization");
+        }
+
         // Create encoder based on codec type
         let encoder: Box<dyn VideoEncoderTrait + Send> = match config.output_codec {
             VideoEncoderType::H264 => {
+                // Determine H264 input format based on backend and input format
+                let h264_input_format = if use_yuyv_direct {
+                    crate::video::encoder::h264::H264InputFormat::Yuyv422
+                } else {
+                    crate::video::encoder::h264::H264InputFormat::Nv12
+                };
+
                 let encoder_config = H264Config {
                     base: EncoderConfig::h264(config.resolution, config.bitrate_kbps),
                     bitrate_kbps: config.bitrate_kbps,
                     gop_size: config.gop_size,
                     fps: config.fps,
-                    input_format: crate::video::encoder::h264::H264InputFormat::Nv12,
+                    input_format: h264_input_format,
                 };

-                let encoder = if let Some(ref backend) = config.encoder_backend {
+                let encoder = if use_yuyv_direct {
+                    // Force RKMPP backend for YUYV direct input
+                    let codec_name = get_codec_name(VideoEncoderType::H264, Some(EncoderBackend::Rkmpp))
+                        .ok_or_else(|| AppError::VideoError(
+                            "RKMPP backend not available for H.264".to_string()
+                        ))?;
+                    info!("Creating H264 encoder with RKMPP backend for YUYV direct input (codec: {})", codec_name);
+                    H264Encoder::with_codec(encoder_config, &codec_name)?
+                } else if let Some(ref backend) = config.encoder_backend {
                     // Specific backend requested
                     let codec_name = get_codec_name(VideoEncoderType::H264, Some(*backend))
                         .ok_or_else(|| AppError::VideoError(format!(
@@ -440,15 +470,19 @@ impl SharedVideoPipeline {
         info!(
             "Encoder {} needs {} format",
             codec_name,
-            if needs_yuv420p { "YUV420P" } else { "NV12" }
+            if use_yuyv_direct { "YUYV422 (direct)" } else if needs_yuv420p { "YUV420P" } else { "NV12" }
         );

         // Create converter or decoder based on input format and encoder needs
         info!("Initializing input format handler for: {} -> {}",
             config.input_format,
-            if needs_yuv420p { "YUV420P" } else { "NV12" });
+            if use_yuyv_direct { "YUYV422 (direct)" } else if needs_yuv420p { "YUV420P" } else { "NV12" });

-        let (nv12_converter, yuv420p_converter, mjpeg_decoder, mjpeg_turbo_decoder) = if needs_yuv420p {
+        let (nv12_converter, yuv420p_converter, mjpeg_decoder, mjpeg_turbo_decoder) = if use_yuyv_direct {
+            // RKMPP with YUYV direct input - skip all conversion
+            info!("YUYV direct input enabled for RKMPP, skipping format conversion");
+            (None, None, None, None)
+        } else if needs_yuv420p {
             // Software encoder needs YUV420P
             match config.input_format {
                 PixelFormat::Yuv420 => {
@@ -527,6 +561,7 @@ impl SharedVideoPipeline {
         *self.mjpeg_decoder.lock().await = mjpeg_decoder;
         *self.mjpeg_turbo_decoder.lock().await = mjpeg_turbo_decoder;
         self.encoder_needs_yuv420p.store(needs_yuv420p, Ordering::Release);
+        self.yuyv_direct_input.store(use_yuyv_direct, Ordering::Release);

         Ok(())
     }
@@ -749,7 +784,28 @@ impl SharedVideoPipeline {
         let codec = config.output_codec;
         drop(config);

-        let pts_ms = (frame_count * 1000 / fps as u64) as i64;
+        // Calculate PTS from real capture timestamp (lock-free using AtomicI64)
+        // This ensures smooth playback even when capture timing varies
+        let frame_ts_ms = frame.capture_ts.elapsed().as_millis() as i64;
+        // Convert Instant to a comparable value (negate elapsed to get "time since epoch")
+        let current_ts_ms = -(frame_ts_ms);

+        // Try to set start time if not yet set (first frame wins)
+        let start_ts = self.pipeline_start_time_ms.load(Ordering::Acquire);
+        let pts_ms = if start_ts == 0 {
+            // First frame - try to set the start time
+            // Use compare_exchange to ensure only one thread sets it
+            let _ = self.pipeline_start_time_ms.compare_exchange(
+                0,
+                current_ts_ms,
+                Ordering::AcqRel,
+                Ordering::Acquire,
+            );
+            0 // First frame has PTS 0
+        } else {
+            // Subsequent frames: PTS = current - start
+            current_ts_ms - start_ts
+        };

         // Debug log for H265
         if codec == VideoEncoderType::H265 && frame_count % 30 == 1 {
@@ -857,13 +913,12 @@ impl SharedVideoPipeline {
                 }
             }

-            let config = self.config.read().await;
             Ok(Some(EncodedVideoFrame {
                 data: Bytes::from(encoded.data),
                 pts_ms,
                 is_keyframe,
                 sequence,
-                duration: Duration::from_millis(1000 / config.fps as u64),
+                duration: Duration::from_millis(1000 / fps as u64),
                 codec,
             }))
         } else {

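One detail worth noting across this file and the track changes below: the pipeline's EncodedVideoFrame duration uses Duration::from_millis(1000 / fps), while the tracks compute Duration::from_micros(1_000_000 / fps). At 30 fps the millisecond version truncates to 33 ms per frame, while the microsecond version keeps nearly the full 33.333 ms. A small illustration of the difference (illustrative arithmetic only, not code from the commit):

```rust
use std::time::Duration;

fn main() {
    let fps = 30u64;
    let coarse = Duration::from_millis(1000 / fps);    // 33 ms/frame
    let fine = Duration::from_micros(1_000_000 / fps); // 33.333 ms/frame
    // Summed over one second of frames, the coarse version drifts ~1%,
    // the fine version ~0.001%.
    assert_eq!(coarse * 30, Duration::from_millis(990));
    assert_eq!(fine * 30, Duration::from_micros(999_990));
}
```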
View File

@@ -490,10 +490,13 @@ impl UnifiedVideoTrack {
     /// Write VP8 frame (raw encoded)
     async fn write_vp8_frame(&self, data: &[u8], is_keyframe: bool) -> Result<()> {
+        // Calculate frame duration based on configured FPS
+        let frame_duration = Duration::from_micros(1_000_000 / self.config.fps.max(1) as u64);
+
         // VP8 frames are sent directly
         let sample = Sample {
             data: Bytes::copy_from_slice(data),
-            duration: Duration::from_secs(1),
+            duration: frame_duration,
             ..Default::default()
         };
@@ -514,10 +517,13 @@ impl UnifiedVideoTrack {
     /// Write VP9 frame (raw encoded)
     async fn write_vp9_frame(&self, data: &[u8], is_keyframe: bool) -> Result<()> {
+        // Calculate frame duration based on configured FPS
+        let frame_duration = Duration::from_micros(1_000_000 / self.config.fps.max(1) as u64);
+
         // VP9 frames are sent directly
         let sample = Sample {
             data: Bytes::copy_from_slice(data),
-            duration: Duration::from_secs(1),
+            duration: frame_duration,
             ..Default::default()
         };
@@ -537,25 +543,33 @@ impl UnifiedVideoTrack {
     }

     /// Send NAL units via track (for H264/H265)
+    ///
+    /// Important: Only the last NAL unit should have the frame duration set.
+    /// All NAL units in a frame share the same RTP timestamp, so only the last
+    /// one should increment the timestamp by the frame duration.
     async fn send_nal_units(&self, nals: Vec<Bytes>, is_keyframe: bool) -> Result<()> {
         let mut total_bytes = 0u64;
-        let mut nal_count = 0;
+        let nal_count = nals.len();
+
+        // Calculate frame duration based on configured FPS
+        let frame_duration = Duration::from_micros(1_000_000 / self.config.fps.max(1) as u64);

-        for nal_data in nals {
+        for (i, nal_data) in nals.into_iter().enumerate() {
+            let is_last = i == nal_count - 1;
+
+            // Only the last NAL should have duration set
+            // This ensures all NALs in a frame share the same RTP timestamp
             let sample = Sample {
                 data: nal_data.clone(),
-                duration: Duration::from_secs(1),
+                duration: if is_last { frame_duration } else { Duration::ZERO },
                 ..Default::default()
             };

             if let Err(e) = self.track.write_sample(&sample).await {
-                if nal_count % 100 == 0 {
+                if i % 100 == 0 {
                     debug!("write_sample failed (no peer?): {}", e);
                 }
             }

             total_bytes += nal_data.len() as u64;
-            nal_count += 1;
         }

         if nal_count > 0 {

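The duration rule matters because sample-based WebRTC tracks advance the outgoing RTP timestamp by each sample's duration (in webrtc-rs, write_sample converts the duration into clock-rate ticks when packetizing). If every NAL of a frame carried the full frame duration, a multi-NAL frame would be smeared across several RTP timestamps. A minimal sketch of the rule, with simplified names:

```rust
use std::time::Duration;

/// Duration to attach to the i-th of n NAL units of a single frame:
/// Duration::ZERO for all but the last, so the track's RTP timestamp
/// advances exactly once per frame.
fn nal_duration(i: usize, n: usize, frame_duration: Duration) -> Duration {
    if i + 1 == n { frame_duration } else { Duration::ZERO }
}

fn main() {
    let fd = Duration::from_micros(1_000_000 / 30); // one frame at 30 fps
    let durations: Vec<_> = (0..3).map(|i| nal_duration(i, 3, fd)).collect();
    assert_eq!(durations, vec![Duration::ZERO, Duration::ZERO, fd]);
}
```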
View File

@@ -484,17 +484,25 @@ impl UniversalVideoTrack {
     }

     /// Send NAL units as samples (H264 only)
+    ///
+    /// Important: Only the last NAL unit should have the frame duration set.
+    /// All NAL units in a frame share the same RTP timestamp, so only the last
+    /// one should increment the timestamp by the frame duration.
     async fn send_nals(&self, nals: Vec<Bytes>, is_keyframe: bool) -> Result<()> {
         let mut total_bytes = 0u64;

         // Calculate frame duration based on configured FPS
         let frame_duration = Duration::from_micros(1_000_000 / self.config.fps.max(1) as u64);
+        let nal_count = nals.len();

         match &self.track {
             TrackType::Sample(track) => {
-                for nal_data in nals {
+                for (i, nal_data) in nals.into_iter().enumerate() {
+                    let is_last = i == nal_count - 1;
+
+                    // Only the last NAL should have duration set
+                    // This ensures all NALs in a frame share the same RTP timestamp
                     let sample = Sample {
                         data: nal_data.clone(),
-                        duration: frame_duration,
+                        duration: if is_last { frame_duration } else { Duration::ZERO },
                         ..Default::default()
                     };