feat: 迁移视频采集到 v4l2r,支持多平面设备并完善构建头文件

- 将 V4L2 采集依赖从 v4l 切换到 v4l2r

- 新增基于 v4l2r 的 mmap 采集实现,优先使用 VIDEO_CAPTURE_MPLANE

- 更新像素格式转换与设备枚举逻辑,探测阶段改为只读打开

- 增加采集错误日志节流,避免 dqbuf EINVAL 日志风暴

- 交叉编译镜像安装更新的 Linux 内核头文件供 bindgen 使用
This commit is contained in:
mofeng-git
2026-02-10 13:52:52 +08:00
parent f8a031c90c
commit 72eb2c450d
12 changed files with 779 additions and 449 deletions

View File

@@ -18,6 +18,7 @@
use bytes::Bytes;
use parking_lot::RwLock as ParkingRwLock;
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
@@ -32,16 +33,12 @@ const MIN_CAPTURE_FRAME_SIZE: usize = 128;
const JPEG_VALIDATE_INTERVAL: u64 = 30;
use crate::error::{AppError, Result};
use crate::utils::LogThrottler;
use crate::video::convert::{Nv12Converter, PixelConverter};
use crate::video::decoder::MjpegTurboDecoder;
#[cfg(any(target_arch = "aarch64", target_arch = "arm"))]
use hwcodec::ffmpeg_hw::{last_error_message as ffmpeg_hw_last_error, HwMjpegH26xConfig, HwMjpegH26xPipeline};
use v4l::buffer::Type as BufferType;
use v4l::io::traits::CaptureStream;
use v4l::prelude::*;
use v4l::video::Capture;
use v4l::video::capture::Parameters;
use v4l::Format;
use crate::video::v4l2r_capture::V4l2rCaptureStream;
use crate::video::encoder::h264::{detect_best_encoder, H264Config, H264Encoder, H264InputFormat};
use crate::video::encoder::h265::{
detect_best_h265_encoder, H265Config, H265Encoder, H265InputFormat,
@@ -1279,53 +1276,17 @@ impl SharedVideoPipeline {
let frame_seq_tx = frame_seq_tx.clone();
let buffer_pool = buffer_pool.clone();
std::thread::spawn(move || {
let device = match Device::with_path(&device_path) {
Ok(d) => d,
Err(e) => {
error!("Failed to open device {:?}: {}", device_path, e);
let _ = pipeline.running.send(false);
pipeline.running_flag.store(false, Ordering::Release);
let _ = frame_seq_tx.send(1);
return;
}
};
let requested_format = Format::new(
config.resolution.width,
config.resolution.height,
config.input_format.to_fourcc(),
);
let actual_format = match device.set_format(&requested_format) {
Ok(f) => f,
Err(e) => {
error!("Failed to set capture format: {}", e);
let _ = pipeline.running.send(false);
pipeline.running_flag.store(false, Ordering::Release);
let _ = frame_seq_tx.send(1);
return;
}
};
let resolution = Resolution::new(actual_format.width, actual_format.height);
let pixel_format =
PixelFormat::from_fourcc(actual_format.fourcc).unwrap_or(config.input_format);
let stride = actual_format.stride;
if config.fps > 0 {
if let Err(e) = device.set_params(&Parameters::with_fps(config.fps)) {
warn!("Failed to set hardware FPS: {}", e);
}
}
let mut stream = match MmapStream::with_buffers(
&device,
BufferType::VideoCapture,
let mut stream = match V4l2rCaptureStream::open(
&device_path,
config.resolution,
config.input_format,
config.fps,
buffer_count.max(1),
Duration::from_secs(2),
) {
Ok(s) => s,
Ok(stream) => stream,
Err(e) => {
error!("Failed to create capture stream: {}", e);
error!("Failed to open capture stream: {}", e);
let _ = pipeline.running.send(false);
pipeline.running_flag.store(false, Ordering::Release);
let _ = frame_seq_tx.send(1);
@@ -1333,10 +1294,27 @@ impl SharedVideoPipeline {
}
};
let resolution = stream.resolution();
let pixel_format = stream.format();
let stride = stream.stride();
let mut no_subscribers_since: Option<Instant> = None;
let grace_period = Duration::from_secs(AUTO_STOP_GRACE_PERIOD_SECS);
let mut sequence: u64 = 0;
let mut validate_counter: u64 = 0;
let capture_error_throttler = LogThrottler::with_secs(5);
let mut suppressed_capture_errors: HashMap<String, u64> = HashMap::new();
let classify_capture_error = |err: &std::io::Error| -> String {
let message = err.to_string();
if message.contains("dqbuf failed") && message.contains("EINVAL") {
"capture_dqbuf_einval".to_string()
} else if message.contains("dqbuf failed") {
"capture_dqbuf".to_string()
} else {
format!("capture_{:?}", err.kind())
}
};
while pipeline.running_flag.load(Ordering::Acquire) {
let subscriber_count = pipeline.subscriber_count();
@@ -1366,19 +1344,36 @@ impl SharedVideoPipeline {
no_subscribers_since = None;
}
let (buf, meta) = match stream.next() {
Ok(frame_data) => frame_data,
let mut owned = buffer_pool.take(MIN_CAPTURE_FRAME_SIZE);
let meta = match stream.next_into(&mut owned) {
Ok(meta) => meta,
Err(e) => {
if e.kind() == std::io::ErrorKind::TimedOut {
warn!("Capture timeout - no signal?");
} else {
error!("Capture error: {}", e);
let key = classify_capture_error(&e);
if capture_error_throttler.should_log(&key) {
let suppressed =
suppressed_capture_errors.remove(&key).unwrap_or(0);
if suppressed > 0 {
error!(
"Capture error: {} (suppressed {} repeats)",
e, suppressed
);
} else {
error!("Capture error: {}", e);
}
} else {
let counter =
suppressed_capture_errors.entry(key).or_insert(0);
*counter = counter.saturating_add(1);
}
}
continue;
}
};
let frame_size = meta.bytesused as usize;
let frame_size = meta.bytes_used;
if frame_size < MIN_CAPTURE_FRAME_SIZE {
continue;
}
@@ -1386,22 +1381,20 @@ impl SharedVideoPipeline {
validate_counter = validate_counter.wrapping_add(1);
if pixel_format.is_compressed()
&& validate_counter % JPEG_VALIDATE_INTERVAL == 0
&& !VideoFrame::is_valid_jpeg_bytes(&buf[..frame_size])
&& !VideoFrame::is_valid_jpeg_bytes(&owned[..frame_size])
{
continue;
}
let mut owned = buffer_pool.take(frame_size);
owned.resize(frame_size, 0);
owned[..frame_size].copy_from_slice(&buf[..frame_size]);
owned.truncate(frame_size);
let frame = Arc::new(VideoFrame::from_pooled(
Arc::new(FrameBuffer::new(owned, Some(buffer_pool.clone()))),
resolution,
pixel_format,
stride,
sequence,
meta.sequence,
));
sequence = sequence.wrapping_add(1);
sequence = meta.sequence.wrapping_add(1);
{
let mut guard = latest_frame.write();