fix(video): 修复 FFmpeg 硬编码 EAGAIN 刷屏并为编码错误增加日志节流

This commit is contained in:
mofeng-git
2026-03-26 22:30:53 +08:00
parent 46ae0c81e2
commit 200f947b5d
2 changed files with 93 additions and 26 deletions

View File

@@ -271,6 +271,7 @@ extern "C" int ffmpeg_hw_mjpeg_h26x_encode(FfmpegHwMjpegH26x* handle,
*out_data = nullptr; *out_data = nullptr;
*out_len = 0; *out_len = 0;
*out_keyframe = 0; *out_keyframe = 0;
bool encoded = false;
av_packet_unref(ctx->dec_pkt); av_packet_unref(ctx->dec_pkt);
int ret = av_new_packet(ctx->dec_pkt, len); int ret = av_new_packet(ctx->dec_pkt, len);
@@ -290,7 +291,7 @@ extern "C" int ffmpeg_hw_mjpeg_h26x_encode(FfmpegHwMjpegH26x* handle,
while (true) { while (true) {
ret = avcodec_receive_frame(ctx->dec_ctx, ctx->dec_frame); ret = avcodec_receive_frame(ctx->dec_ctx, ctx->dec_frame);
if (ret == AVERROR(EAGAIN) || ret == AVERROR_EOF) { if (ret == AVERROR(EAGAIN) || ret == AVERROR_EOF) {
return 0; return encoded ? 1 : 0;
} }
if (ret < 0) { if (ret < 0) {
set_last_error(make_err("avcodec_receive_frame failed", ret)); set_last_error(make_err("avcodec_receive_frame failed", ret));
@@ -370,33 +371,40 @@ extern "C" int ffmpeg_hw_mjpeg_h26x_encode(FfmpegHwMjpegH26x* handle,
return -1; return -1;
} }
av_packet_unref(ctx->enc_pkt); while (true) {
ret = avcodec_receive_packet(ctx->enc_ctx, ctx->enc_pkt); av_packet_unref(ctx->enc_pkt);
if (ret == AVERROR(EAGAIN)) { ret = avcodec_receive_packet(ctx->enc_ctx, ctx->enc_pkt);
av_frame_unref(ctx->dec_frame); if (ret == AVERROR(EAGAIN) || ret == AVERROR_EOF) {
return 0; break;
} }
if (ret < 0) { if (ret < 0) {
set_last_error(make_err("avcodec_receive_packet failed", ret)); set_last_error(make_err("avcodec_receive_packet failed", ret));
av_frame_unref(ctx->dec_frame);
return -1;
}
if (ctx->enc_pkt->size > 0) {
uint8_t *buf = (uint8_t*)malloc(ctx->enc_pkt->size);
if (!buf) {
set_last_error("malloc for output packet failed");
av_packet_unref(ctx->enc_pkt); av_packet_unref(ctx->enc_pkt);
av_frame_unref(ctx->dec_frame); av_frame_unref(ctx->dec_frame);
return -1; return -1;
} }
memcpy(buf, ctx->enc_pkt->data, ctx->enc_pkt->size);
*out_data = buf; if (ctx->enc_pkt->size <= 0) {
*out_len = ctx->enc_pkt->size; set_last_error("avcodec_receive_packet failed, pkt size is 0");
*out_keyframe = (ctx->enc_pkt->flags & AV_PKT_FLAG_KEY) ? 1 : 0; av_packet_unref(ctx->enc_pkt);
av_packet_unref(ctx->enc_pkt); av_frame_unref(ctx->dec_frame);
av_frame_unref(ctx->dec_frame); return -1;
return 1; }
if (!encoded) {
uint8_t *buf = (uint8_t*)malloc(ctx->enc_pkt->size);
if (!buf) {
set_last_error("malloc for output packet failed");
av_packet_unref(ctx->enc_pkt);
av_frame_unref(ctx->dec_frame);
return -1;
}
memcpy(buf, ctx->enc_pkt->data, ctx->enc_pkt->size);
*out_data = buf;
*out_len = ctx->enc_pkt->size;
*out_keyframe = (ctx->enc_pkt->flags & AV_PKT_FLAG_KEY) ? 1 : 0;
encoded = true;
}
} }
av_frame_unref(ctx->dec_frame); av_frame_unref(ctx->dec_frame);

View File

@@ -33,6 +33,8 @@ const CAPTURE_TIMEOUT_RESTART_THRESHOLD: u32 = 5;
const MIN_CAPTURE_FRAME_SIZE: usize = 128; const MIN_CAPTURE_FRAME_SIZE: usize = 128;
/// Validate JPEG header every N frames to reduce overhead /// Validate JPEG header every N frames to reduce overhead
const JPEG_VALIDATE_INTERVAL: u64 = 30; const JPEG_VALIDATE_INTERVAL: u64 = 30;
/// Throttle repeated encoding errors to avoid log flooding
const ENCODE_ERROR_THROTTLE_SECS: u64 = 5;
use crate::error::{AppError, Result}; use crate::error::{AppError, Result};
use crate::utils::LogThrottler; use crate::utils::LogThrottler;
@@ -161,6 +163,51 @@ impl SharedVideoPipelineConfig {
} }
} }
fn classify_encode_error(err: &AppError) -> String {
let message = err.to_string();
if message.contains("FFmpeg HW encode failed") {
if message.contains("avcodec_send_packet failed") && message.contains("ret=-11") {
"encode_ffmpeg_hw_send_packet_eagain".to_string()
} else if message.contains("avcodec_send_frame failed") && message.contains("ret=-11") {
"encode_ffmpeg_hw_send_frame_eagain".to_string()
} else if message.contains("avcodec_receive_packet failed") && message.contains("ret=-11") {
"encode_ffmpeg_hw_receive_packet_eagain".to_string()
} else if message.contains("Resource temporarily unavailable") {
"encode_ffmpeg_hw_eagain".to_string()
} else if message.contains("avcodec_send_packet failed") {
"encode_ffmpeg_hw_send_packet".to_string()
} else if message.contains("avcodec_send_frame failed") {
"encode_ffmpeg_hw_send_frame".to_string()
} else if message.contains("avcodec_receive_packet failed") {
"encode_ffmpeg_hw_receive_packet".to_string()
} else {
"encode_ffmpeg_hw".to_string()
}
} else {
format!("encode_{}", message)
}
}
fn log_encoding_error(
throttler: &LogThrottler,
suppressed_errors: &mut HashMap<String, u64>,
err: &AppError,
) {
let key = classify_encode_error(err);
if throttler.should_log(&key) {
let suppressed = suppressed_errors.remove(&key).unwrap_or(0);
if suppressed > 0 {
error!("Encoding failed: {} (suppressed {} repeats)", err, suppressed);
} else {
error!("Encoding failed: {}", err);
}
} else {
let counter = suppressed_errors.entry(key).or_insert(0);
*counter = counter.saturating_add(1);
}
}
/// Pipeline statistics /// Pipeline statistics
#[derive(Debug, Clone, Default)] #[derive(Debug, Clone, Default)]
pub struct SharedVideoPipelineStats { pub struct SharedVideoPipelineStats {
@@ -1070,6 +1117,8 @@ impl SharedVideoPipeline {
let mut last_fps_time = Instant::now(); let mut last_fps_time = Instant::now();
let mut fps_frame_count: u64 = 0; let mut fps_frame_count: u64 = 0;
let mut running_rx = pipeline.running_rx.clone(); let mut running_rx = pipeline.running_rx.clone();
let encode_error_throttler = LogThrottler::with_secs(ENCODE_ERROR_THROTTLE_SECS);
let mut suppressed_encode_errors: HashMap<String, u64> = HashMap::new();
// Track when we last had subscribers for auto-stop feature // Track when we last had subscribers for auto-stop feature
let mut no_subscribers_since: Option<Instant> = None; let mut no_subscribers_since: Option<Instant> = None;
@@ -1138,7 +1187,11 @@ impl SharedVideoPipeline {
} }
Ok(None) => {} Ok(None) => {}
Err(e) => { Err(e) => {
error!("Encoding failed: {}", e); log_encoding_error(
&encode_error_throttler,
&mut suppressed_encode_errors,
&e,
);
} }
} }
@@ -1214,6 +1267,8 @@ impl SharedVideoPipeline {
let mut last_fps_time = Instant::now(); let mut last_fps_time = Instant::now();
let mut fps_frame_count: u64 = 0; let mut fps_frame_count: u64 = 0;
let mut last_seq = *frame_seq_rx.borrow(); let mut last_seq = *frame_seq_rx.borrow();
let encode_error_throttler = LogThrottler::with_secs(ENCODE_ERROR_THROTTLE_SECS);
let mut suppressed_encode_errors: HashMap<String, u64> = HashMap::new();
while pipeline.running_flag.load(Ordering::Acquire) { while pipeline.running_flag.load(Ordering::Acquire) {
if frame_seq_rx.changed().await.is_err() { if frame_seq_rx.changed().await.is_err() {
@@ -1258,7 +1313,11 @@ impl SharedVideoPipeline {
} }
Ok(None) => {} Ok(None) => {}
Err(e) => { Err(e) => {
error!("Encoding failed: {}", e); log_encoding_error(
&encode_error_throttler,
&mut suppressed_encode_errors,
&e,
);
} }
} }