From 72eb2c450dc3535e3f7a8cd7508ddee72bf9001c Mon Sep 17 00:00:00 2001 From: mofeng-git Date: Tue, 10 Feb 2026 13:52:52 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E8=BF=81=E7=A7=BB=E8=A7=86=E9=A2=91?= =?UTF-8?q?=E9=87=87=E9=9B=86=E5=88=B0=20v4l2r=EF=BC=8C=E6=94=AF=E6=8C=81?= =?UTF-8?q?=E5=A4=9A=E5=B9=B3=E9=9D=A2=E8=AE=BE=E5=A4=87=E5=B9=B6=E5=AE=8C?= =?UTF-8?q?=E5=96=84=E6=9E=84=E5=BB=BA=E5=A4=B4=E6=96=87=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 将 V4L2 采集依赖从 v4l 切换到 v4l2r - 新增基于 v4l2r 的 mmap 采集实现,优先使用 VIDEO_CAPTURE_MPLANE - 更新像素格式转换与设备枚举逻辑,探测阶段改为只读打开 - 增加采集错误日志节流,避免 dqbuf EINVAL 日志风暴 - 交叉编译镜像安装更新的 Linux 内核头文件供 bindgen 使用 --- Cargo.toml | 2 +- build/cross/Dockerfile.arm64 | 21 ++- build/cross/Dockerfile.armv7 | 21 ++- build/cross/Dockerfile.x86_64 | 23 ++- src/stream/mjpeg_streamer.rs | 135 ++++++-------- src/video/capture.rs | 175 ++++++++---------- src/video/device.rs | 266 ++++++++++++++++----------- src/video/format.rs | 50 +++-- src/video/mod.rs | 1 + src/video/shared_video_pipeline.rs | 113 ++++++------ src/video/streamer.rs | 137 ++++++-------- src/video/v4l2r_capture.rs | 284 +++++++++++++++++++++++++++++ 12 files changed, 779 insertions(+), 449 deletions(-) create mode 100644 src/video/v4l2r_capture.rs diff --git a/Cargo.toml b/Cargo.toml index 23687974..1ac350c5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -66,7 +66,7 @@ clap = { version = "4", features = ["derive"] } time = "0.3" # Video capture (V4L2) -v4l = "0.14" +v4l2r = "0.0.7" # JPEG encoding (libjpeg-turbo, SIMD accelerated) turbojpeg = "1.3" diff --git a/build/cross/Dockerfile.arm64 b/build/cross/Dockerfile.arm64 index d0542c1b..9b6e849a 100644 --- a/build/cross/Dockerfile.arm64 +++ b/build/cross/Dockerfile.arm64 @@ -3,9 +3,13 @@ FROM debian:11 +# Linux headers used by v4l2r bindgen +ARG LINUX_HEADERS_VERSION=6.6 +ARG LINUX_HEADERS_SHA256= + # Set Rustup mirrors (Aliyun) -ENV RUSTUP_UPDATE_ROOT=https://mirrors.aliyun.com/rustup/rustup \ - RUSTUP_DIST_SERVER=https://mirrors.aliyun.com/rustup +#ENV RUSTUP_UPDATE_ROOT=https://mirrors.aliyun.com/rustup/rustup \ +# RUSTUP_DIST_SERVER=https://mirrors.aliyun.com/rustup # Install Rust toolchain RUN apt-get update && apt-get install -y --no-install-recommends \ @@ -31,6 +35,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ meson \ ninja-build \ wget \ + xz-utils \ file \ gcc-aarch64-linux-gnu \ g++-aarch64-linux-gnu \ @@ -47,10 +52,22 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ libv4l-dev:arm64 \ libudev-dev:arm64 \ zlib1g-dev:arm64 \ + linux-libc-dev:arm64 \ # Note: libjpeg-turbo, libyuv, libvpx, libx264, libx265, libopus are built from source below for static linking libdrm-dev:arm64 \ && rm -rf /var/lib/apt/lists/* +# Install newer V4L2 headers for v4l2r bindgen +RUN mkdir -p /opt/v4l2-headers \ + && wget -q https://cdn.kernel.org/pub/linux/kernel/v6.x/linux-${LINUX_HEADERS_VERSION}.tar.xz -O /tmp/linux-headers.tar.xz \ + && if [ -n "$LINUX_HEADERS_SHA256" ]; then echo "$LINUX_HEADERS_SHA256 /tmp/linux-headers.tar.xz" | sha256sum -c -; fi \ + && tar -xf /tmp/linux-headers.tar.xz -C /tmp \ + && cd /tmp/linux-${LINUX_HEADERS_VERSION} \ + && make ARCH=arm64 headers_install INSTALL_HDR_PATH=/opt/v4l2-headers \ + && rm -rf /tmp/linux-${LINUX_HEADERS_VERSION} /tmp/linux-headers.tar.xz + +ENV V4L2R_VIDEODEV2_H_PATH=/opt/v4l2-headers/include + # Build static libjpeg-turbo from source (cross-compile for ARM64) RUN git clone --depth 1 https://github.com/libjpeg-turbo/libjpeg-turbo /tmp/libjpeg-turbo \ && cd /tmp/libjpeg-turbo \ diff --git a/build/cross/Dockerfile.armv7 b/build/cross/Dockerfile.armv7 index 3bebfc6f..fc908f82 100644 --- a/build/cross/Dockerfile.armv7 +++ b/build/cross/Dockerfile.armv7 @@ -3,9 +3,13 @@ FROM debian:11 +# Linux headers used by v4l2r bindgen +ARG LINUX_HEADERS_VERSION=6.6 +ARG LINUX_HEADERS_SHA256= + # Set Rustup mirrors (Aliyun) -ENV RUSTUP_UPDATE_ROOT=https://mirrors.aliyun.com/rustup/rustup \ - RUSTUP_DIST_SERVER=https://mirrors.aliyun.com/rustup +#ENV RUSTUP_UPDATE_ROOT=https://mirrors.aliyun.com/rustup/rustup \ +# RUSTUP_DIST_SERVER=https://mirrors.aliyun.com/rustup # Install Rust toolchain RUN apt-get update && apt-get install -y --no-install-recommends \ @@ -31,6 +35,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ meson \ ninja-build \ wget \ + xz-utils \ file \ gcc-arm-linux-gnueabihf \ g++-arm-linux-gnueabihf \ @@ -46,10 +51,22 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ libasound2-dev:armhf \ libv4l-dev:armhf \ libudev-dev:armhf \ + linux-libc-dev:armhf \ zlib1g-dev:armhf \ libdrm-dev:armhf \ && rm -rf /var/lib/apt/lists/* +# Install newer V4L2 headers for v4l2r bindgen +RUN mkdir -p /opt/v4l2-headers \ + && wget -q https://cdn.kernel.org/pub/linux/kernel/v6.x/linux-${LINUX_HEADERS_VERSION}.tar.xz -O /tmp/linux-headers.tar.xz \ + && if [ -n "$LINUX_HEADERS_SHA256" ]; then echo "$LINUX_HEADERS_SHA256 /tmp/linux-headers.tar.xz" | sha256sum -c -; fi \ + && tar -xf /tmp/linux-headers.tar.xz -C /tmp \ + && cd /tmp/linux-${LINUX_HEADERS_VERSION} \ + && make ARCH=arm headers_install INSTALL_HDR_PATH=/opt/v4l2-headers \ + && rm -rf /tmp/linux-${LINUX_HEADERS_VERSION} /tmp/linux-headers.tar.xz + +ENV V4L2R_VIDEODEV2_H_PATH=/opt/v4l2-headers/include + # Build static libjpeg-turbo from source (cross-compile for ARMv7) RUN git clone --depth 1 https://github.com/libjpeg-turbo/libjpeg-turbo /tmp/libjpeg-turbo \ && cd /tmp/libjpeg-turbo \ diff --git a/build/cross/Dockerfile.x86_64 b/build/cross/Dockerfile.x86_64 index 779b1b02..ebaf9a82 100644 --- a/build/cross/Dockerfile.x86_64 +++ b/build/cross/Dockerfile.x86_64 @@ -3,9 +3,13 @@ FROM debian:11 +# Linux headers used by v4l2r bindgen +ARG LINUX_HEADERS_VERSION=6.6 +ARG LINUX_HEADERS_SHA256= + # Set Rustup mirrors (Aliyun) -ENV RUSTUP_UPDATE_ROOT=https://mirrors.aliyun.com/rustup/rustup \ - RUSTUP_DIST_SERVER=https://mirrors.aliyun.com/rustup +#ENV RUSTUP_UPDATE_ROOT=https://mirrors.aliyun.com/rustup/rustup \ +# RUSTUP_DIST_SERVER=https://mirrors.aliyun.com/rustup # Install Rust toolchain RUN apt-get update && apt-get install -y --no-install-recommends \ @@ -29,6 +33,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ libclang-dev \ llvm \ wget \ + xz-utils \ # Autotools for libopus (requires autoreconf) autoconf \ automake \ @@ -37,6 +42,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ libasound2-dev \ libv4l-dev \ libudev-dev \ + linux-libc-dev \ zlib1g-dev \ # Note: libjpeg-turbo, libx264, libx265, libopus are built from source below for static linking libva-dev \ @@ -49,6 +55,17 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ libxdmcp-dev \ && rm -rf /var/lib/apt/lists/* +# Install newer V4L2 headers for v4l2r bindgen +RUN mkdir -p /opt/v4l2-headers \ + && wget -q https://cdn.kernel.org/pub/linux/kernel/v6.x/linux-${LINUX_HEADERS_VERSION}.tar.xz -O /tmp/linux-headers.tar.xz \ + && if [ -n "$LINUX_HEADERS_SHA256" ]; then echo "$LINUX_HEADERS_SHA256 /tmp/linux-headers.tar.xz" | sha256sum -c -; fi \ + && tar -xf /tmp/linux-headers.tar.xz -C /tmp \ + && cd /tmp/linux-${LINUX_HEADERS_VERSION} \ + && make ARCH=x86 headers_install INSTALL_HDR_PATH=/opt/v4l2-headers \ + && rm -rf /tmp/linux-${LINUX_HEADERS_VERSION} /tmp/linux-headers.tar.xz + +ENV V4L2R_VIDEODEV2_H_PATH=/opt/v4l2-headers/include + # Build static libjpeg-turbo from source (needed by libyuv) RUN git clone --depth 1 https://github.com/libjpeg-turbo/libjpeg-turbo /tmp/libjpeg-turbo \ && cd /tmp/libjpeg-turbo \ @@ -208,4 +225,4 @@ RUN rustup target add x86_64-unknown-linux-gnu # Configure environment for static linking ENV PKG_CONFIG_ALLOW_CROSS=1\ FFMPEG_STATIC=1 \ - LIBYUV_STATIC=1 \ No newline at end of file + LIBYUV_STATIC=1 diff --git a/src/stream/mjpeg_streamer.rs b/src/stream/mjpeg_streamer.rs index 79e0fb38..d9219123 100644 --- a/src/stream/mjpeg_streamer.rs +++ b/src/stream/mjpeg_streamer.rs @@ -16,17 +16,15 @@ //! Note: Audio WebSocket is handled separately by audio_ws.rs (/api/ws/audio) use std::io; +use std::collections::HashMap; use std::path::PathBuf; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; +use std::time::Duration; use tokio::sync::{Mutex, RwLock}; use tracing::{error, info, warn}; -use v4l::buffer::Type as BufferType; -use v4l::io::traits::CaptureStream; -use v4l::prelude::*; -use v4l::video::Capture; -use v4l::video::capture::Parameters; -use v4l::Format; +use crate::video::v4l2r_capture::V4l2rCaptureStream; +use crate::utils::LogThrottler; use crate::audio::AudioController; use crate::error::{AppError, Result}; @@ -491,8 +489,7 @@ impl MjpegStreamer { } }; - let mut device_opt: Option = None; - let mut format_opt: Option = None; + let mut stream_opt: Option = None; let mut last_error: Option = None; for attempt in 0..MAX_RETRIES { @@ -501,8 +498,18 @@ impl MjpegStreamer { return; } - let device = match Device::with_path(&device_path) { - Ok(d) => d, + match V4l2rCaptureStream::open( + &device_path, + config.resolution, + config.format, + config.fps, + 4, + Duration::from_secs(2), + ) { + Ok(stream) => { + stream_opt = Some(stream); + break; + } Err(e) => { let err_str = e.to_string(); if err_str.contains("busy") || err_str.contains("resource") { @@ -519,42 +526,12 @@ impl MjpegStreamer { last_error = Some(err_str); break; } - }; - - let requested = Format::new( - config.resolution.width, - config.resolution.height, - config.format.to_fourcc(), - ); - - match device.set_format(&requested) { - Ok(actual) => { - device_opt = Some(device); - format_opt = Some(actual); - break; - } - Err(e) => { - let err_str = e.to_string(); - if err_str.contains("busy") || err_str.contains("resource") { - warn!( - "Device busy on set_format attempt {}/{}, retrying in {}ms...", - attempt + 1, - MAX_RETRIES, - RETRY_DELAY_MS - ); - std::thread::sleep(std::time::Duration::from_millis(RETRY_DELAY_MS)); - last_error = Some(err_str); - continue; - } - last_error = Some(err_str); - break; - } } } - let (device, actual_format) = match (device_opt, format_opt) { - (Some(d), Some(f)) => (d, f), - _ => { + let mut stream = match stream_opt { + Some(stream) => stream, + None => { error!( "Failed to open device {:?}: {}", device_path, @@ -567,40 +544,36 @@ impl MjpegStreamer { } }; + let resolution = stream.resolution(); + let pixel_format = stream.format(); + let stride = stream.stride(); + info!( "Capture format: {}x{} {:?} stride={}", - actual_format.width, actual_format.height, actual_format.fourcc, actual_format.stride + resolution.width, resolution.height, pixel_format, stride ); - let resolution = Resolution::new(actual_format.width, actual_format.height); - let pixel_format = - PixelFormat::from_fourcc(actual_format.fourcc).unwrap_or(config.format); - - if config.fps > 0 { - if let Err(e) = device.set_params(&Parameters::with_fps(config.fps)) { - warn!("Failed to set hardware FPS: {}", e); - } - } - - let mut stream = match MmapStream::with_buffers(&device, BufferType::VideoCapture, 4) { - Ok(s) => s, - Err(e) => { - error!("Failed to create capture stream: {}", e); - set_state(MjpegStreamerState::Error); - self.mjpeg_handler.set_offline(); - self.direct_active.store(false, Ordering::SeqCst); - return; - } - }; - let buffer_pool = Arc::new(FrameBufferPool::new(8)); let mut signal_present = true; - let mut sequence: u64 = 0; let mut validate_counter: u64 = 0; + let capture_error_throttler = LogThrottler::with_secs(5); + let mut suppressed_capture_errors: HashMap = HashMap::new(); + + let classify_capture_error = |err: &std::io::Error| -> String { + let message = err.to_string(); + if message.contains("dqbuf failed") && message.contains("EINVAL") { + "capture_dqbuf_einval".to_string() + } else if message.contains("dqbuf failed") { + "capture_dqbuf".to_string() + } else { + format!("capture_{:?}", err.kind()) + } + }; while !self.direct_stop.load(Ordering::Relaxed) { - let (buf, meta) = match stream.next() { - Ok(frame_data) => frame_data, + let mut owned = buffer_pool.take(MIN_CAPTURE_FRAME_SIZE); + let meta = match stream.next_into(&mut owned) { + Ok(meta) => meta, Err(e) => { if e.kind() == io::ErrorKind::TimedOut { if signal_present { @@ -628,12 +601,23 @@ impl MjpegStreamer { return; } - error!("Capture error: {}", e); + let key = classify_capture_error(&e); + if capture_error_throttler.should_log(&key) { + let suppressed = suppressed_capture_errors.remove(&key).unwrap_or(0); + if suppressed > 0 { + error!("Capture error: {} (suppressed {} repeats)", e, suppressed); + } else { + error!("Capture error: {}", e); + } + } else { + let counter = suppressed_capture_errors.entry(key).or_insert(0); + *counter = counter.saturating_add(1); + } continue; } }; - let frame_size = meta.bytesused as usize; + let frame_size = meta.bytes_used; if frame_size < MIN_CAPTURE_FRAME_SIZE { continue; } @@ -641,22 +625,19 @@ impl MjpegStreamer { validate_counter = validate_counter.wrapping_add(1); if pixel_format.is_compressed() && validate_counter % JPEG_VALIDATE_INTERVAL == 0 - && !VideoFrame::is_valid_jpeg_bytes(&buf[..frame_size]) + && !VideoFrame::is_valid_jpeg_bytes(&owned[..frame_size]) { continue; } - let mut owned = buffer_pool.take(frame_size); - owned.resize(frame_size, 0); - owned[..frame_size].copy_from_slice(&buf[..frame_size]); + owned.truncate(frame_size); let frame = VideoFrame::from_pooled( Arc::new(FrameBuffer::new(owned, Some(buffer_pool.clone()))), resolution, pixel_format, - actual_format.stride, - sequence, + stride, + meta.sequence, ); - sequence = sequence.wrapping_add(1); if !signal_present { signal_present = true; diff --git a/src/video/capture.rs b/src/video/capture.rs index 8701521f..e2218ac9 100644 --- a/src/video/capture.rs +++ b/src/video/capture.rs @@ -2,6 +2,7 @@ //! //! Provides async video capture using memory-mapped buffers. +use std::collections::HashMap; use std::io; use std::path::{Path, PathBuf}; use bytes::Bytes; @@ -10,16 +11,12 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::sync::{watch, Mutex}; use tracing::{debug, error, info, warn}; -use v4l::buffer::Type as BufferType; -use v4l::io::traits::CaptureStream; -use v4l::prelude::*; -use v4l::video::capture::Parameters; -use v4l::video::Capture; -use v4l::Format; use super::format::{PixelFormat, Resolution}; use super::frame::VideoFrame; use crate::error::{AppError, Result}; +use crate::utils::LogThrottler; +use crate::video::v4l2r_capture::V4l2rCaptureStream; /// Default number of capture buffers (reduced from 4 to 2 for lower latency) const DEFAULT_BUFFER_COUNT: u32 = 2; @@ -280,9 +277,15 @@ fn run_capture( return Ok(()); } - // Open device - let device = match Device::with_path(&config.device_path) { - Ok(d) => d, + let stream = match V4l2rCaptureStream::open( + &config.device_path, + config.resolution, + config.format, + config.fps, + config.buffer_count, + config.timeout, + ) { + Ok(stream) => stream, Err(e) => { let err_str = e.to_string(); if err_str.contains("busy") || err_str.contains("resource") { @@ -306,34 +309,7 @@ fn run_capture( } }; - // Set format - let format = Format::new( - config.resolution.width, - config.resolution.height, - config.format.to_fourcc(), - ); - - let actual_format = match device.set_format(&format) { - Ok(f) => f, - Err(e) => { - let err_str = e.to_string(); - if err_str.contains("busy") || err_str.contains("resource") { - warn!( - "Device busy on set_format attempt {}/{}, retrying in {}ms...", - attempt + 1, - MAX_RETRIES, - RETRY_DELAY_MS - ); - std::thread::sleep(Duration::from_millis(RETRY_DELAY_MS)); - last_error = Some(AppError::VideoError(format!("Failed to set format: {}", e))); - continue; - } - return Err(AppError::VideoError(format!("Failed to set format: {}", e))); - } - }; - - // Device opened and format set successfully - proceed with capture - return run_capture_inner(config, state, stats, stop_flag, device, actual_format); + return run_capture_inner(config, state, stats, stop_flag, stream); } // All retries exhausted @@ -348,48 +324,16 @@ fn run_capture_inner( state: &watch::Sender, stats: &Arc>, stop_flag: &AtomicBool, - device: Device, - actual_format: Format, + mut stream: V4l2rCaptureStream, ) -> Result<()> { + let resolution = stream.resolution(); + let pixel_format = stream.format(); + let stride = stream.stride(); info!( "Capture format: {}x{} {:?} stride={}", - actual_format.width, actual_format.height, actual_format.fourcc, actual_format.stride + resolution.width, resolution.height, pixel_format, stride ); - - // Try to set hardware FPS (V4L2 VIDIOC_S_PARM) - if config.fps > 0 { - match device.set_params(&Parameters::with_fps(config.fps)) { - Ok(actual_params) => { - // Extract actual FPS from returned interval (numerator/denominator) - let actual_hw_fps = if actual_params.interval.numerator > 0 { - actual_params.interval.denominator / actual_params.interval.numerator - } else { - 0 - }; - - if actual_hw_fps == config.fps { - info!("Hardware FPS set successfully: {} fps", actual_hw_fps); - } else if actual_hw_fps > 0 { - info!( - "Hardware FPS coerced: requested {} fps, got {} fps", - config.fps, actual_hw_fps - ); - } else { - warn!("Hardware FPS setting returned invalid interval"); - } - } - Err(e) => { - warn!("Failed to set hardware FPS: {}", e); - } - } - } - - // Create stream with mmap buffers - let mut stream = - MmapStream::with_buffers(&device, BufferType::VideoCapture, config.buffer_count) - .map_err(|e| AppError::VideoError(format!("Failed to create stream: {}", e)))?; - let _ = state.send(CaptureState::Running); info!("Capture started"); @@ -397,12 +341,25 @@ fn run_capture_inner( let mut fps_frame_count = 0u64; let mut fps_window_start = Instant::now(); let fps_window_duration = Duration::from_secs(1); + let mut scratch = Vec::new(); + let capture_error_throttler = LogThrottler::with_secs(5); + let mut suppressed_capture_errors: HashMap = HashMap::new(); + + let classify_capture_error = |err: &std::io::Error| -> String { + let message = err.to_string(); + if message.contains("dqbuf failed") && message.contains("EINVAL") { + "capture_dqbuf_einval".to_string() + } else if message.contains("dqbuf failed") { + "capture_dqbuf".to_string() + } else { + format!("capture_{:?}", err.kind()) + } + }; // Main capture loop while !stop_flag.load(Ordering::Relaxed) { - // Try to capture a frame - let (_buf, meta) = match stream.next() { - Ok(frame_data) => frame_data, + let meta = match stream.next_into(&mut scratch) { + Ok(meta) => meta, Err(e) => { if e.kind() == io::ErrorKind::TimedOut { warn!("Capture timeout - no signal?"); @@ -432,19 +389,30 @@ fn run_capture_inner( }); } - error!("Capture error: {}", e); + let key = classify_capture_error(&e); + if capture_error_throttler.should_log(&key) { + let suppressed = suppressed_capture_errors.remove(&key).unwrap_or(0); + if suppressed > 0 { + error!("Capture error: {} (suppressed {} repeats)", e, suppressed); + } else { + error!("Capture error: {}", e); + } + } else { + let counter = suppressed_capture_errors.entry(key).or_insert(0); + *counter = counter.saturating_add(1); + } continue; } }; // Use actual bytes used, not buffer size - let frame_size = meta.bytesused as usize; + let frame_size = meta.bytes_used; // Validate frame if frame_size < MIN_FRAME_SIZE { debug!( "Dropping small frame: {} bytes (bytesused={})", - frame_size, meta.bytesused + frame_size, meta.bytes_used ); continue; } @@ -470,6 +438,10 @@ fn run_capture_inner( s.current_fps = (fps_frame_count as f32 / elapsed.as_secs_f32()).max(0.0); } } + + if *state.borrow() == CaptureState::NoSignal { + let _ = state.send(CaptureState::Running); + } } info!("Capture stopped"); @@ -525,38 +497,37 @@ fn grab_single_frame( resolution: Resolution, format: PixelFormat, ) -> Result { - let device = Device::with_path(device_path) - .map_err(|e| AppError::VideoError(format!("Failed to open device: {}", e)))?; - - let fmt = Format::new(resolution.width, resolution.height, format.to_fourcc()); - let actual = device - .set_format(&fmt) - .map_err(|e| AppError::VideoError(format!("Failed to set format: {}", e)))?; - - let mut stream = MmapStream::with_buffers(&device, BufferType::VideoCapture, 2) - .map_err(|e| AppError::VideoError(format!("Failed to create stream: {}", e)))?; + let mut stream = V4l2rCaptureStream::open( + device_path, + resolution, + format, + 0, + 2, + Duration::from_secs(DEFAULT_TIMEOUT), + )?; + let actual_resolution = stream.resolution(); + let actual_format = stream.format(); + let actual_stride = stream.stride(); + let mut scratch = Vec::new(); // Try to get a valid frame (skip first few which might be bad) for attempt in 0..5 { - match stream.next() { - Ok((buf, _meta)) => { - if buf.len() >= MIN_FRAME_SIZE { - let actual_format = PixelFormat::from_fourcc(actual.fourcc).unwrap_or(format); - + match stream.next_into(&mut scratch) { + Ok(meta) => { + if meta.bytes_used >= MIN_FRAME_SIZE { return Ok(VideoFrame::new( - Bytes::copy_from_slice(buf), - Resolution::new(actual.width, actual.height), + Bytes::copy_from_slice(&scratch[..meta.bytes_used]), + actual_resolution, actual_format, - actual.stride, + actual_stride, 0, )); } } - Err(e) => { - if attempt == 4 { - return Err(AppError::VideoError(format!("Failed to grab frame: {}", e))); - } + Err(e) if attempt == 4 => { + return Err(AppError::VideoError(format!("Failed to grab frame: {}", e))); } + Err(_) => {} } } diff --git a/src/video/device.rs b/src/video/device.rs index c99b4786..543340f6 100644 --- a/src/video/device.rs +++ b/src/video/device.rs @@ -1,15 +1,17 @@ //! V4L2 device enumeration and capability query use serde::{Deserialize, Serialize}; +use std::fs::File; use std::path::{Path, PathBuf}; use std::sync::mpsc; use std::time::Duration; use tracing::{debug, info, warn}; -use v4l::capability::Flags; -use v4l::prelude::*; -use v4l::video::Capture; -use v4l::Format; -use v4l::FourCC; +use v4l2r::nix::errno::Errno; +use v4l2r::bindings::{v4l2_frmivalenum, v4l2_frmsizeenum}; +use v4l2r::ioctl::{ + self, Capabilities, Capability as V4l2rCapability, FormatIterator, FrmIvalTypes, FrmSizeTypes, +}; +use v4l2r::{Format as V4l2rFormat, QueueType}; use super::format::{PixelFormat, Resolution}; use crate::error::{AppError, Result}; @@ -81,7 +83,7 @@ pub struct DeviceCapabilities { /// Wrapper around a V4L2 video device pub struct VideoDevice { pub path: PathBuf, - device: Device, + fd: File, } impl VideoDevice { @@ -90,42 +92,54 @@ impl VideoDevice { let path = path.as_ref().to_path_buf(); debug!("Opening video device: {:?}", path); - let device = Device::with_path(&path).map_err(|e| { - AppError::VideoError(format!("Failed to open device {:?}: {}", path, e)) - })?; + let fd = File::options() + .read(true) + .write(true) + .open(&path) + .map_err(|e| AppError::VideoError(format!("Failed to open device {:?}: {}", path, e)))?; - Ok(Self { path, device }) + Ok(Self { path, fd }) + } + + /// Open a video device read-only (for probing/enumeration) + pub fn open_readonly(path: impl AsRef) -> Result { + let path = path.as_ref().to_path_buf(); + debug!("Opening video device (read-only): {:?}", path); + + let fd = File::options() + .read(true) + .open(&path) + .map_err(|e| AppError::VideoError(format!("Failed to open device {:?}: {}", path, e)))?; + + Ok(Self { path, fd }) } /// Get device capabilities pub fn capabilities(&self) -> Result { - let caps = self - .device - .query_caps() + let caps: V4l2rCapability = ioctl::querycap(&self.fd) .map_err(|e| AppError::VideoError(format!("Failed to query capabilities: {}", e)))?; + let flags = caps.device_caps(); Ok(DeviceCapabilities { - video_capture: caps.capabilities.contains(Flags::VIDEO_CAPTURE), - video_capture_mplane: caps.capabilities.contains(Flags::VIDEO_CAPTURE_MPLANE), - video_output: caps.capabilities.contains(Flags::VIDEO_OUTPUT), - streaming: caps.capabilities.contains(Flags::STREAMING), - read_write: caps.capabilities.contains(Flags::READ_WRITE), + video_capture: flags.contains(Capabilities::VIDEO_CAPTURE), + video_capture_mplane: flags.contains(Capabilities::VIDEO_CAPTURE_MPLANE), + video_output: flags.contains(Capabilities::VIDEO_OUTPUT), + streaming: flags.contains(Capabilities::STREAMING), + read_write: flags.contains(Capabilities::READWRITE), }) } /// Get detailed device information pub fn info(&self) -> Result { - let caps = self - .device - .query_caps() + let caps: V4l2rCapability = ioctl::querycap(&self.fd) .map_err(|e| AppError::VideoError(format!("Failed to query capabilities: {}", e)))?; - + let flags = caps.device_caps(); let capabilities = DeviceCapabilities { - video_capture: caps.capabilities.contains(Flags::VIDEO_CAPTURE), - video_capture_mplane: caps.capabilities.contains(Flags::VIDEO_CAPTURE_MPLANE), - video_output: caps.capabilities.contains(Flags::VIDEO_OUTPUT), - streaming: caps.capabilities.contains(Flags::STREAMING), - read_write: caps.capabilities.contains(Flags::READ_WRITE), + video_capture: flags.contains(Capabilities::VIDEO_CAPTURE), + video_capture_mplane: flags.contains(Capabilities::VIDEO_CAPTURE_MPLANE), + video_output: flags.contains(Capabilities::VIDEO_OUTPUT), + streaming: flags.contains(Capabilities::STREAMING), + read_write: flags.contains(Capabilities::READWRITE), }; let formats = self.enumerate_formats()?; @@ -141,7 +155,7 @@ impl VideoDevice { path: self.path.clone(), name: caps.card.clone(), driver: caps.driver.clone(), - bus_info: caps.bus.clone(), + bus_info: caps.bus_info.clone(), card: caps.card, formats, capabilities, @@ -154,16 +168,13 @@ impl VideoDevice { pub fn enumerate_formats(&self) -> Result> { let mut formats = Vec::new(); - // Get supported formats - let format_descs = self - .device - .enum_formats() - .map_err(|e| AppError::VideoError(format!("Failed to enumerate formats: {}", e)))?; + let queue = self.capture_queue_type()?; + let format_descs = FormatIterator::new(&self.fd, queue); for desc in format_descs { // Try to convert FourCC to our PixelFormat - if let Some(format) = PixelFormat::from_fourcc(desc.fourcc) { - let resolutions = self.enumerate_resolutions(desc.fourcc)?; + if let Some(format) = PixelFormat::from_v4l2r(desc.pixelformat) { + let resolutions = self.enumerate_resolutions(desc.pixelformat)?; formats.push(FormatInfo { format, @@ -173,7 +184,7 @@ impl VideoDevice { } else { debug!( "Skipping unsupported format: {:?} ({})", - desc.fourcc, desc.description + desc.pixelformat, desc.description ); } } @@ -185,46 +196,53 @@ impl VideoDevice { } /// Enumerate resolutions for a specific format - fn enumerate_resolutions(&self, fourcc: FourCC) -> Result> { + fn enumerate_resolutions(&self, fourcc: v4l2r::PixelFormat) -> Result> { let mut resolutions = Vec::new(); - // Try to enumerate frame sizes - match self.device.enum_framesizes(fourcc) { - Ok(sizes) => { - for size in sizes { - match size.size { - v4l::framesize::FrameSizeEnum::Discrete(d) => { - let fps = self - .enumerate_fps(fourcc, d.width, d.height) - .unwrap_or_default(); - resolutions.push(ResolutionInfo::new(d.width, d.height, fps)); - } - v4l::framesize::FrameSizeEnum::Stepwise(s) => { - // For stepwise, add some common resolutions - for res in [ - Resolution::VGA, - Resolution::HD720, - Resolution::HD1080, - Resolution::UHD4K, - ] { - if res.width >= s.min_width - && res.width <= s.max_width - && res.height >= s.min_height - && res.height <= s.max_height - { - let fps = self - .enumerate_fps(fourcc, res.width, res.height) - .unwrap_or_default(); - resolutions - .push(ResolutionInfo::new(res.width, res.height, fps)); + let mut index = 0u32; + loop { + match ioctl::enum_frame_sizes::(&self.fd, index, fourcc) { + Ok(size) => { + if let Some(size) = size.size() { + match size { + FrmSizeTypes::Discrete(d) => { + let fps = + self.enumerate_fps(fourcc, d.width, d.height).unwrap_or_default(); + resolutions.push(ResolutionInfo::new(d.width, d.height, fps)); + } + FrmSizeTypes::StepWise(s) => { + for res in [ + Resolution::VGA, + Resolution::HD720, + Resolution::HD1080, + Resolution::UHD4K, + ] { + if res.width >= s.min_width + && res.width <= s.max_width + && res.height >= s.min_height + && res.height <= s.max_height + { + let fps = self + .enumerate_fps(fourcc, res.width, res.height) + .unwrap_or_default(); + resolutions.push(ResolutionInfo::new(res.width, res.height, fps)); + } } } } } + index += 1; + } + Err(e) => { + let is_einval = matches!( + e, + v4l2r::ioctl::FrameSizeError::IoctlError(err) if err == Errno::EINVAL + ); + if !is_einval { + debug!("Failed to enumerate frame sizes for {:?}: {}", fourcc, e); + } + break; } - } - Err(e) => { - debug!("Failed to enumerate frame sizes for {:?}: {}", fourcc, e); } } @@ -236,36 +254,59 @@ impl VideoDevice { } /// Enumerate FPS for a specific resolution - fn enumerate_fps(&self, fourcc: FourCC, width: u32, height: u32) -> Result> { + fn enumerate_fps( + &self, + fourcc: v4l2r::PixelFormat, + width: u32, + height: u32, + ) -> Result> { let mut fps_list = Vec::new(); - match self.device.enum_frameintervals(fourcc, width, height) { - Ok(intervals) => { - for interval in intervals { - match interval.interval { - v4l::frameinterval::FrameIntervalEnum::Discrete(fraction) => { - if fraction.numerator > 0 { - let fps = fraction.denominator / fraction.numerator; - fps_list.push(fps); + let mut index = 0u32; + loop { + match ioctl::enum_frame_intervals::( + &self.fd, + index, + fourcc, + width, + height, + ) { + Ok(interval) => { + if let Some(interval) = interval.intervals() { + match interval { + FrmIvalTypes::Discrete(fraction) => { + if fraction.numerator > 0 { + let fps = fraction.denominator / fraction.numerator; + fps_list.push(fps); + } } - } - v4l::frameinterval::FrameIntervalEnum::Stepwise(step) => { - // Just pick max/min/step - if step.max.numerator > 0 { - let min_fps = step.max.denominator / step.max.numerator; - let max_fps = step.min.denominator / step.min.numerator; - fps_list.push(min_fps); - if max_fps != min_fps { - fps_list.push(max_fps); + FrmIvalTypes::StepWise(step) => { + if step.max.numerator > 0 { + let min_fps = step.max.denominator / step.max.numerator; + let max_fps = step.min.denominator / step.min.numerator; + fps_list.push(min_fps); + if max_fps != min_fps { + fps_list.push(max_fps); + } } } } } + index += 1; + } + Err(e) => { + let is_einval = matches!( + e, + v4l2r::ioctl::FrameIntervalsError::IoctlError(err) if err == Errno::EINVAL + ); + if !is_einval { + debug!( + "Failed to enumerate frame intervals for {:?} {}x{}: {}", + fourcc, width, height, e + ); + } + break; } - } - Err(_) => { - // If enumeration fails, assume 30fps - fps_list.push(30); } } @@ -275,20 +316,26 @@ impl VideoDevice { } /// Get current format - pub fn get_format(&self) -> Result { - self.device - .format() + pub fn get_format(&self) -> Result { + let queue = self.capture_queue_type()?; + ioctl::g_fmt(&self.fd, queue) .map_err(|e| AppError::VideoError(format!("Failed to get format: {}", e))) } /// Set capture format - pub fn set_format(&self, width: u32, height: u32, format: PixelFormat) -> Result { - let fmt = Format::new(width, height, format.to_fourcc()); + pub fn set_format(&self, width: u32, height: u32, format: PixelFormat) -> Result { + let queue = self.capture_queue_type()?; + let mut fmt: V4l2rFormat = ioctl::g_fmt(&self.fd, queue) + .map_err(|e| AppError::VideoError(format!("Failed to get format: {}", e)))?; + fmt.width = width; + fmt.height = height; + fmt.pixelformat = format.to_v4l2r(); - // Request the format - let actual = self - .device - .set_format(&fmt) + let mut fd = self + .fd + .try_clone() + .map_err(|e| AppError::VideoError(format!("Failed to clone device fd: {}", e)))?; + let actual: V4l2rFormat = ioctl::s_fmt(&mut fd, (queue, &fmt)) .map_err(|e| AppError::VideoError(format!("Failed to set format: {}", e)))?; if actual.width != width || actual.height != height { @@ -376,8 +423,21 @@ impl VideoDevice { } /// Get the inner device reference (for advanced usage) - pub fn inner(&self) -> &Device { - &self.device + pub fn inner(&self) -> &File { + &self.fd + } + + fn capture_queue_type(&self) -> Result { + let caps = self.capabilities()?; + if caps.video_capture { + Ok(QueueType::VideoCapture) + } else if caps.video_capture_mplane { + Ok(QueueType::VideoCaptureMplane) + } else { + Err(AppError::VideoError( + "Device does not expose a capture queue".to_string(), + )) + } } } @@ -446,7 +506,7 @@ fn probe_device_with_timeout(path: &Path, timeout: Duration) -> Option Result { - let device = VideoDevice::open(&path_for_thread)?; + let device = VideoDevice::open_readonly(&path_for_thread)?; device.info() })(); let _ = tx.send(result); diff --git a/src/video/format.rs b/src/video/format.rs index 4097ae6f..f794dcfe 100644 --- a/src/video/format.rs +++ b/src/video/format.rs @@ -2,7 +2,7 @@ use serde::{Deserialize, Serialize}; use std::fmt; -use v4l::format::fourcc; +use v4l2r::PixelFormat as V4l2rPixelFormat; /// Supported pixel formats #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] @@ -41,30 +41,29 @@ pub enum PixelFormat { } impl PixelFormat { - /// Convert to V4L2 FourCC - pub fn to_fourcc(&self) -> fourcc::FourCC { + /// Convert to V4L2 FourCC bytes + pub fn to_fourcc(&self) -> [u8; 4] { match self { - PixelFormat::Mjpeg => fourcc::FourCC::new(b"MJPG"), - PixelFormat::Jpeg => fourcc::FourCC::new(b"JPEG"), - PixelFormat::Yuyv => fourcc::FourCC::new(b"YUYV"), - PixelFormat::Yvyu => fourcc::FourCC::new(b"YVYU"), - PixelFormat::Uyvy => fourcc::FourCC::new(b"UYVY"), - PixelFormat::Nv12 => fourcc::FourCC::new(b"NV12"), - PixelFormat::Nv21 => fourcc::FourCC::new(b"NV21"), - PixelFormat::Nv16 => fourcc::FourCC::new(b"NV16"), - PixelFormat::Nv24 => fourcc::FourCC::new(b"NV24"), - PixelFormat::Yuv420 => fourcc::FourCC::new(b"YU12"), - PixelFormat::Yvu420 => fourcc::FourCC::new(b"YV12"), - PixelFormat::Rgb565 => fourcc::FourCC::new(b"RGBP"), - PixelFormat::Rgb24 => fourcc::FourCC::new(b"RGB3"), - PixelFormat::Bgr24 => fourcc::FourCC::new(b"BGR3"), - PixelFormat::Grey => fourcc::FourCC::new(b"GREY"), + PixelFormat::Mjpeg => *b"MJPG", + PixelFormat::Jpeg => *b"JPEG", + PixelFormat::Yuyv => *b"YUYV", + PixelFormat::Yvyu => *b"YVYU", + PixelFormat::Uyvy => *b"UYVY", + PixelFormat::Nv12 => *b"NV12", + PixelFormat::Nv21 => *b"NV21", + PixelFormat::Nv16 => *b"NV16", + PixelFormat::Nv24 => *b"NV24", + PixelFormat::Yuv420 => *b"YU12", + PixelFormat::Yvu420 => *b"YV12", + PixelFormat::Rgb565 => *b"RGBP", + PixelFormat::Rgb24 => *b"RGB3", + PixelFormat::Bgr24 => *b"BGR3", + PixelFormat::Grey => *b"GREY", } } /// Try to convert from V4L2 FourCC - pub fn from_fourcc(fourcc: fourcc::FourCC) -> Option { - let repr = fourcc.repr; + pub fn from_fourcc(repr: [u8; 4]) -> Option { match &repr { b"MJPG" => Some(PixelFormat::Mjpeg), b"JPEG" => Some(PixelFormat::Jpeg), @@ -85,6 +84,17 @@ impl PixelFormat { } } + /// Convert to v4l2r PixelFormat + pub fn to_v4l2r(&self) -> V4l2rPixelFormat { + V4l2rPixelFormat::from(&self.to_fourcc()) + } + + /// Convert from v4l2r PixelFormat + pub fn from_v4l2r(format: V4l2rPixelFormat) -> Option { + let repr: [u8; 4] = format.into(); + Self::from_fourcc(repr) + } + /// Check if format is compressed (JPEG/MJPEG) pub fn is_compressed(&self) -> bool { matches!(self, PixelFormat::Mjpeg | PixelFormat::Jpeg) diff --git a/src/video/mod.rs b/src/video/mod.rs index b5664f48..f13385a9 100644 --- a/src/video/mod.rs +++ b/src/video/mod.rs @@ -13,6 +13,7 @@ pub mod h264_pipeline; pub mod shared_video_pipeline; pub mod stream_manager; pub mod streamer; +pub mod v4l2r_capture; pub mod video_session; pub use capture::VideoCapturer; diff --git a/src/video/shared_video_pipeline.rs b/src/video/shared_video_pipeline.rs index 6ab721ff..55fd660f 100644 --- a/src/video/shared_video_pipeline.rs +++ b/src/video/shared_video_pipeline.rs @@ -18,6 +18,7 @@ use bytes::Bytes; use parking_lot::RwLock as ParkingRwLock; +use std::collections::HashMap; use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering}; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -32,16 +33,12 @@ const MIN_CAPTURE_FRAME_SIZE: usize = 128; const JPEG_VALIDATE_INTERVAL: u64 = 30; use crate::error::{AppError, Result}; +use crate::utils::LogThrottler; use crate::video::convert::{Nv12Converter, PixelConverter}; use crate::video::decoder::MjpegTurboDecoder; #[cfg(any(target_arch = "aarch64", target_arch = "arm"))] use hwcodec::ffmpeg_hw::{last_error_message as ffmpeg_hw_last_error, HwMjpegH26xConfig, HwMjpegH26xPipeline}; -use v4l::buffer::Type as BufferType; -use v4l::io::traits::CaptureStream; -use v4l::prelude::*; -use v4l::video::Capture; -use v4l::video::capture::Parameters; -use v4l::Format; +use crate::video::v4l2r_capture::V4l2rCaptureStream; use crate::video::encoder::h264::{detect_best_encoder, H264Config, H264Encoder, H264InputFormat}; use crate::video::encoder::h265::{ detect_best_h265_encoder, H265Config, H265Encoder, H265InputFormat, @@ -1279,53 +1276,17 @@ impl SharedVideoPipeline { let frame_seq_tx = frame_seq_tx.clone(); let buffer_pool = buffer_pool.clone(); std::thread::spawn(move || { - let device = match Device::with_path(&device_path) { - Ok(d) => d, - Err(e) => { - error!("Failed to open device {:?}: {}", device_path, e); - let _ = pipeline.running.send(false); - pipeline.running_flag.store(false, Ordering::Release); - let _ = frame_seq_tx.send(1); - return; - } - }; - - let requested_format = Format::new( - config.resolution.width, - config.resolution.height, - config.input_format.to_fourcc(), - ); - - let actual_format = match device.set_format(&requested_format) { - Ok(f) => f, - Err(e) => { - error!("Failed to set capture format: {}", e); - let _ = pipeline.running.send(false); - pipeline.running_flag.store(false, Ordering::Release); - let _ = frame_seq_tx.send(1); - return; - } - }; - - let resolution = Resolution::new(actual_format.width, actual_format.height); - let pixel_format = - PixelFormat::from_fourcc(actual_format.fourcc).unwrap_or(config.input_format); - let stride = actual_format.stride; - - if config.fps > 0 { - if let Err(e) = device.set_params(&Parameters::with_fps(config.fps)) { - warn!("Failed to set hardware FPS: {}", e); - } - } - - let mut stream = match MmapStream::with_buffers( - &device, - BufferType::VideoCapture, + let mut stream = match V4l2rCaptureStream::open( + &device_path, + config.resolution, + config.input_format, + config.fps, buffer_count.max(1), + Duration::from_secs(2), ) { - Ok(s) => s, + Ok(stream) => stream, Err(e) => { - error!("Failed to create capture stream: {}", e); + error!("Failed to open capture stream: {}", e); let _ = pipeline.running.send(false); pipeline.running_flag.store(false, Ordering::Release); let _ = frame_seq_tx.send(1); @@ -1333,10 +1294,27 @@ impl SharedVideoPipeline { } }; + let resolution = stream.resolution(); + let pixel_format = stream.format(); + let stride = stream.stride(); + let mut no_subscribers_since: Option = None; let grace_period = Duration::from_secs(AUTO_STOP_GRACE_PERIOD_SECS); let mut sequence: u64 = 0; let mut validate_counter: u64 = 0; + let capture_error_throttler = LogThrottler::with_secs(5); + let mut suppressed_capture_errors: HashMap = HashMap::new(); + + let classify_capture_error = |err: &std::io::Error| -> String { + let message = err.to_string(); + if message.contains("dqbuf failed") && message.contains("EINVAL") { + "capture_dqbuf_einval".to_string() + } else if message.contains("dqbuf failed") { + "capture_dqbuf".to_string() + } else { + format!("capture_{:?}", err.kind()) + } + }; while pipeline.running_flag.load(Ordering::Acquire) { let subscriber_count = pipeline.subscriber_count(); @@ -1366,19 +1344,36 @@ impl SharedVideoPipeline { no_subscribers_since = None; } - let (buf, meta) = match stream.next() { - Ok(frame_data) => frame_data, + let mut owned = buffer_pool.take(MIN_CAPTURE_FRAME_SIZE); + let meta = match stream.next_into(&mut owned) { + Ok(meta) => meta, Err(e) => { if e.kind() == std::io::ErrorKind::TimedOut { warn!("Capture timeout - no signal?"); } else { - error!("Capture error: {}", e); + let key = classify_capture_error(&e); + if capture_error_throttler.should_log(&key) { + let suppressed = + suppressed_capture_errors.remove(&key).unwrap_or(0); + if suppressed > 0 { + error!( + "Capture error: {} (suppressed {} repeats)", + e, suppressed + ); + } else { + error!("Capture error: {}", e); + } + } else { + let counter = + suppressed_capture_errors.entry(key).or_insert(0); + *counter = counter.saturating_add(1); + } } continue; } }; - let frame_size = meta.bytesused as usize; + let frame_size = meta.bytes_used; if frame_size < MIN_CAPTURE_FRAME_SIZE { continue; } @@ -1386,22 +1381,20 @@ impl SharedVideoPipeline { validate_counter = validate_counter.wrapping_add(1); if pixel_format.is_compressed() && validate_counter % JPEG_VALIDATE_INTERVAL == 0 - && !VideoFrame::is_valid_jpeg_bytes(&buf[..frame_size]) + && !VideoFrame::is_valid_jpeg_bytes(&owned[..frame_size]) { continue; } - let mut owned = buffer_pool.take(frame_size); - owned.resize(frame_size, 0); - owned[..frame_size].copy_from_slice(&buf[..frame_size]); + owned.truncate(frame_size); let frame = Arc::new(VideoFrame::from_pooled( Arc::new(FrameBuffer::new(owned, Some(buffer_pool.clone()))), resolution, pixel_format, stride, - sequence, + meta.sequence, )); - sequence = sequence.wrapping_add(1); + sequence = meta.sequence.wrapping_add(1); { let mut guard = latest_frame.write(); diff --git a/src/video/streamer.rs b/src/video/streamer.rs index 2b11e744..af2e073f 100644 --- a/src/video/streamer.rs +++ b/src/video/streamer.rs @@ -3,9 +3,11 @@ //! This module provides a high-level interface for video capture and streaming, //! managing the lifecycle of the capture thread and MJPEG/WebRTC distribution. +use std::collections::HashMap; use std::path::PathBuf; use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; use std::sync::Arc; +use std::time::Duration; use tokio::sync::RwLock; use tracing::{debug, error, info, trace, warn}; @@ -15,12 +17,8 @@ use super::frame::{FrameBuffer, FrameBufferPool, VideoFrame}; use crate::error::{AppError, Result}; use crate::events::{EventBus, SystemEvent}; use crate::stream::MjpegStreamHandler; -use v4l::buffer::Type as BufferType; -use v4l::io::traits::CaptureStream; -use v4l::prelude::*; -use v4l::video::capture::Parameters; -use v4l::video::Capture; -use v4l::Format; +use crate::utils::LogThrottler; +use crate::video::v4l2r_capture::V4l2rCaptureStream; /// Minimum valid frame size for capture const MIN_CAPTURE_FRAME_SIZE: usize = 128; @@ -632,8 +630,7 @@ impl Streamer { } }; - let mut device_opt: Option = None; - let mut format_opt: Option = None; + let mut stream_opt: Option = None; let mut last_error: Option = None; for attempt in 0..MAX_RETRIES { @@ -642,8 +639,18 @@ impl Streamer { return; } - let device = match Device::with_path(&device_path) { - Ok(d) => d, + match V4l2rCaptureStream::open( + &device_path, + config.resolution, + config.format, + config.fps, + BUFFER_COUNT, + Duration::from_secs(2), + ) { + Ok(stream) => { + stream_opt = Some(stream); + break; + } Err(e) => { let err_str = e.to_string(); if err_str.contains("busy") || err_str.contains("resource") { @@ -660,42 +667,12 @@ impl Streamer { last_error = Some(err_str); break; } - }; - - let requested = Format::new( - config.resolution.width, - config.resolution.height, - config.format.to_fourcc(), - ); - - match device.set_format(&requested) { - Ok(actual) => { - device_opt = Some(device); - format_opt = Some(actual); - break; - } - Err(e) => { - let err_str = e.to_string(); - if err_str.contains("busy") || err_str.contains("resource") { - warn!( - "Device busy on set_format attempt {}/{}, retrying in {}ms...", - attempt + 1, - MAX_RETRIES, - RETRY_DELAY_MS - ); - std::thread::sleep(std::time::Duration::from_millis(RETRY_DELAY_MS)); - last_error = Some(err_str); - continue; - } - last_error = Some(err_str); - break; - } } } - let (device, actual_format) = match (device_opt, format_opt) { - (Some(d), Some(f)) => (d, f), - _ => { + let mut stream = match stream_opt { + Some(stream) => stream, + None => { error!( "Failed to open device {:?}: {}", device_path, @@ -709,42 +686,35 @@ impl Streamer { } }; + let resolution = stream.resolution(); + let pixel_format = stream.format(); + let stride = stream.stride(); + info!( "Capture format: {}x{} {:?} stride={}", - actual_format.width, actual_format.height, actual_format.fourcc, actual_format.stride + resolution.width, resolution.height, pixel_format, stride ); - let resolution = Resolution::new(actual_format.width, actual_format.height); - let pixel_format = - PixelFormat::from_fourcc(actual_format.fourcc).unwrap_or(config.format); - - if config.fps > 0 { - if let Err(e) = device.set_params(&Parameters::with_fps(config.fps)) { - warn!("Failed to set hardware FPS: {}", e); - } - } - - let mut stream = - match MmapStream::with_buffers(&device, BufferType::VideoCapture, BUFFER_COUNT) { - Ok(s) => s, - Err(e) => { - error!("Failed to create capture stream: {}", e); - self.mjpeg_handler.set_offline(); - set_state(StreamerState::Error); - self.direct_active.store(false, Ordering::SeqCst); - self.current_fps.store(0, Ordering::Relaxed); - return; - } - }; - let buffer_pool = Arc::new(FrameBufferPool::new(BUFFER_COUNT.max(4) as usize)); let mut signal_present = true; - let mut sequence: u64 = 0; let mut validate_counter: u64 = 0; let mut idle_since: Option = None; let mut fps_frame_count: u64 = 0; let mut last_fps_time = std::time::Instant::now(); + let capture_error_throttler = LogThrottler::with_secs(5); + let mut suppressed_capture_errors: HashMap = HashMap::new(); + + let classify_capture_error = |err: &std::io::Error| -> String { + let message = err.to_string(); + if message.contains("dqbuf failed") && message.contains("EINVAL") { + "capture_dqbuf_einval".to_string() + } else if message.contains("dqbuf failed") { + "capture_dqbuf".to_string() + } else { + format!("capture_{:?}", err.kind()) + } + }; while !self.direct_stop.load(Ordering::Relaxed) { let mjpeg_clients = self.mjpeg_handler.client_count(); @@ -768,8 +738,9 @@ impl Streamer { idle_since = None; } - let (buf, meta) = match stream.next() { - Ok(frame_data) => frame_data, + let mut owned = buffer_pool.take(MIN_CAPTURE_FRAME_SIZE); + let meta = match stream.next_into(&mut owned) { + Ok(meta) => meta, Err(e) => { if e.kind() == std::io::ErrorKind::TimedOut { if signal_present { @@ -811,12 +782,23 @@ impl Streamer { break; } - error!("Capture error: {}", e); + let key = classify_capture_error(&e); + if capture_error_throttler.should_log(&key) { + let suppressed = suppressed_capture_errors.remove(&key).unwrap_or(0); + if suppressed > 0 { + error!("Capture error: {} (suppressed {} repeats)", e, suppressed); + } else { + error!("Capture error: {}", e); + } + } else { + let counter = suppressed_capture_errors.entry(key).or_insert(0); + *counter = counter.saturating_add(1); + } continue; } }; - let frame_size = meta.bytesused as usize; + let frame_size = meta.bytes_used; if frame_size < MIN_CAPTURE_FRAME_SIZE { continue; } @@ -824,22 +806,19 @@ impl Streamer { validate_counter = validate_counter.wrapping_add(1); if pixel_format.is_compressed() && validate_counter % JPEG_VALIDATE_INTERVAL == 0 - && !VideoFrame::is_valid_jpeg_bytes(&buf[..frame_size]) + && !VideoFrame::is_valid_jpeg_bytes(&owned[..frame_size]) { continue; } - let mut owned = buffer_pool.take(frame_size); - owned.resize(frame_size, 0); - owned[..frame_size].copy_from_slice(&buf[..frame_size]); + owned.truncate(frame_size); let frame = VideoFrame::from_pooled( Arc::new(FrameBuffer::new(owned, Some(buffer_pool.clone()))), resolution, pixel_format, - actual_format.stride, - sequence, + stride, + meta.sequence, ); - sequence = sequence.wrapping_add(1); if !signal_present { signal_present = true; diff --git a/src/video/v4l2r_capture.rs b/src/video/v4l2r_capture.rs new file mode 100644 index 00000000..cd23f263 --- /dev/null +++ b/src/video/v4l2r_capture.rs @@ -0,0 +1,284 @@ +//! V4L2 capture implementation using v4l2r (ioctl layer). + +use std::fs::File; +use std::io; +use std::os::fd::AsFd; +use std::path::Path; +use std::time::Duration; + +use nix::poll::{poll, PollFd, PollFlags, PollTimeout}; +use tracing::{debug, warn}; +use v4l2r::bindings::{v4l2_requestbuffers, v4l2_streamparm, v4l2_streamparm__bindgen_ty_1}; +use v4l2r::ioctl::{ + self, Capabilities, Capability as V4l2rCapability, MemoryConsistency, PlaneMapping, + QBufPlane, QBuffer, QueryBuffer, V4l2Buffer, +}; +use v4l2r::memory::{MemoryType, MmapHandle}; +use v4l2r::{Format as V4l2rFormat, PixelFormat as V4l2rPixelFormat, QueueType}; + +use crate::error::{AppError, Result}; +use crate::video::format::{PixelFormat, Resolution}; + +/// Metadata for a captured frame. +#[derive(Debug, Clone, Copy)] +pub struct CaptureMeta { + pub bytes_used: usize, + pub sequence: u64, +} + +/// V4L2 capture stream backed by v4l2r ioctl. +pub struct V4l2rCaptureStream { + fd: File, + queue: QueueType, + resolution: Resolution, + format: PixelFormat, + stride: u32, + timeout: Duration, + mappings: Vec>, +} + +impl V4l2rCaptureStream { + pub fn open( + device_path: impl AsRef, + resolution: Resolution, + format: PixelFormat, + fps: u32, + buffer_count: u32, + timeout: Duration, + ) -> Result { + let mut fd = File::options() + .read(true) + .write(true) + .open(device_path.as_ref()) + .map_err(|e| AppError::VideoError(format!("Failed to open device: {}", e)))?; + + let caps: V4l2rCapability = ioctl::querycap(&fd) + .map_err(|e| AppError::VideoError(format!("Failed to query capabilities: {}", e)))?; + let caps_flags = caps.device_caps(); + + // Prefer multi-planar capture when available, as it is required for some + // devices/pixel formats (e.g. NV12 via VIDEO_CAPTURE_MPLANE). + let queue = if caps_flags.contains(Capabilities::VIDEO_CAPTURE_MPLANE) { + QueueType::VideoCaptureMplane + } else if caps_flags.contains(Capabilities::VIDEO_CAPTURE) { + QueueType::VideoCapture + } else { + return Err(AppError::VideoError( + "Device does not support capture queues".to_string(), + )); + }; + + let mut fmt: V4l2rFormat = ioctl::g_fmt(&fd, queue).map_err(|e| { + AppError::VideoError(format!("Failed to get device format: {}", e)) + })?; + + fmt.width = resolution.width; + fmt.height = resolution.height; + fmt.pixelformat = V4l2rPixelFormat::from(&format.to_fourcc()); + + let actual_fmt: V4l2rFormat = ioctl::s_fmt(&mut fd, (queue, &fmt)).map_err(|e| { + AppError::VideoError(format!("Failed to set device format: {}", e)) + })?; + + let actual_resolution = Resolution::new(actual_fmt.width, actual_fmt.height); + let actual_format = PixelFormat::from_v4l2r(actual_fmt.pixelformat).unwrap_or(format); + + let stride = actual_fmt + .plane_fmt + .get(0) + .map(|p| p.bytesperline) + .unwrap_or_else(|| match actual_format.bytes_per_pixel() { + Some(bpp) => actual_resolution.width * bpp as u32, + None => actual_resolution.width, + }); + + if fps > 0 { + if let Err(e) = set_fps(&fd, queue, fps) { + warn!("Failed to set hardware FPS: {}", e); + } + } + + let req: v4l2_requestbuffers = ioctl::reqbufs( + &fd, + queue, + MemoryType::Mmap, + buffer_count, + MemoryConsistency::empty(), + ) + .map_err(|e| AppError::VideoError(format!("Failed to request buffers: {}", e)))?; + let allocated = req.count as usize; + if allocated == 0 { + return Err(AppError::VideoError( + "Driver returned zero capture buffers".to_string(), + )); + } + + let mut mappings = Vec::with_capacity(allocated); + for index in 0..allocated as u32 { + let query: QueryBuffer = ioctl::querybuf(&fd, queue, index as usize).map_err(|e| { + AppError::VideoError(format!("Failed to query buffer {}: {}", index, e)) + })?; + + if query.planes.is_empty() { + return Err(AppError::VideoError(format!( + "Driver returned zero planes for buffer {}", + index + ))); + } + + let mut plane_maps = Vec::with_capacity(query.planes.len()); + for plane in &query.planes { + let mapping = ioctl::mmap(&fd, plane.mem_offset, plane.length).map_err(|e| { + AppError::VideoError(format!( + "Failed to mmap buffer {}: {}", + index, e + )) + })?; + plane_maps.push(mapping); + } + mappings.push(plane_maps); + } + + let mut stream = Self { + fd, + queue, + resolution: actual_resolution, + format: actual_format, + stride, + timeout, + mappings, + }; + + stream.queue_all_buffers()?; + ioctl::streamon(&stream.fd, stream.queue).map_err(|e| { + AppError::VideoError(format!("Failed to start capture stream: {}", e)) + })?; + + Ok(stream) + } + + pub fn resolution(&self) -> Resolution { + self.resolution + } + + pub fn format(&self) -> PixelFormat { + self.format + } + + pub fn stride(&self) -> u32 { + self.stride + } + + pub fn next_into(&mut self, dst: &mut Vec) -> io::Result { + self.wait_ready()?; + + let dqbuf: V4l2Buffer = ioctl::dqbuf(&self.fd, self.queue).map_err(|e| { + io::Error::new(io::ErrorKind::Other, format!("dqbuf failed: {}", e)) + })?; + let index = dqbuf.as_v4l2_buffer().index as usize; + let sequence = dqbuf.as_v4l2_buffer().sequence as u64; + + let mut total = 0usize; + for (plane_idx, plane) in dqbuf.planes_iter().enumerate() { + let bytes_used = *plane.bytesused as usize; + let data_offset = plane.data_offset.copied().unwrap_or(0) as usize; + if bytes_used == 0 { + continue; + } + let mapping = &self.mappings[index][plane_idx]; + let start = data_offset.min(mapping.len()); + let end = (data_offset + bytes_used).min(mapping.len()); + total += end.saturating_sub(start); + } + + dst.resize(total, 0); + let mut cursor = 0usize; + for (plane_idx, plane) in dqbuf.planes_iter().enumerate() { + let bytes_used = *plane.bytesused as usize; + let data_offset = plane.data_offset.copied().unwrap_or(0) as usize; + if bytes_used == 0 { + continue; + } + let mapping = &self.mappings[index][plane_idx]; + let start = data_offset.min(mapping.len()); + let end = (data_offset + bytes_used).min(mapping.len()); + let len = end.saturating_sub(start); + if len == 0 { + continue; + } + dst[cursor..cursor + len].copy_from_slice(&mapping[start..end]); + cursor += len; + } + + self.queue_buffer(index as u32) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; + + Ok(CaptureMeta { + bytes_used: total, + sequence, + }) + } + + fn wait_ready(&self) -> io::Result<()> { + if self.timeout.is_zero() { + return Ok(()); + } + let mut fds = [PollFd::new(self.fd.as_fd(), PollFlags::POLLIN)]; + let timeout_ms = self.timeout.as_millis().min(u16::MAX as u128) as u16; + let ready = poll(&mut fds, PollTimeout::from(timeout_ms))?; + if ready == 0 { + return Err(io::Error::new(io::ErrorKind::TimedOut, "capture timeout")); + } + Ok(()) + } + + fn queue_all_buffers(&mut self) -> Result<()> { + for index in 0..self.mappings.len() as u32 { + self.queue_buffer(index)?; + } + Ok(()) + } + + fn queue_buffer(&mut self, index: u32) -> Result<()> { + let handle = MmapHandle::default(); + let planes = self.mappings[index as usize] + .iter() + .map(|mapping| { + let mut plane = QBufPlane::new_from_handle(&handle, 0); + plane.0.length = mapping.len() as u32; + plane + }) + .collect(); + let mut qbuf: QBuffer = QBuffer::new(self.queue, index); + qbuf.planes = planes; + ioctl::qbuf::<_, ()>(&self.fd, qbuf) + .map_err(|e| AppError::VideoError(format!("Failed to queue buffer: {}", e)))?; + Ok(()) + } +} + +impl Drop for V4l2rCaptureStream { + fn drop(&mut self) { + if let Err(e) = ioctl::streamoff(&self.fd, self.queue) { + debug!("Failed to stop capture stream: {}", e); + } + } +} + +fn set_fps(fd: &File, queue: QueueType, fps: u32) -> Result<()> { + let mut params = unsafe { std::mem::zeroed::() }; + params.type_ = queue as u32; + params.parm = v4l2_streamparm__bindgen_ty_1 { + capture: v4l2r::bindings::v4l2_captureparm { + timeperframe: v4l2r::bindings::v4l2_fract { + numerator: 1, + denominator: fps, + }, + ..unsafe { std::mem::zeroed() } + }, + }; + + let _actual: v4l2_streamparm = ioctl::s_parm(fd, params) + .map_err(|e| AppError::VideoError(format!("Failed to set FPS: {}", e)))?; + Ok(()) +}