fix(video): v4l path + webrtc h264 startup diagnostics

This commit is contained in:
a15355447898a
2026-03-01 01:24:26 +08:00
parent bd17f8d0f8
commit 4f2fb534a4
12 changed files with 856 additions and 797 deletions

View File

@@ -3,11 +3,9 @@
//! This module provides a high-level interface for video capture and streaming,
//! managing the lifecycle of the capture thread and MJPEG/WebRTC distribution.
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
use tracing::{debug, error, info, trace, warn};
@@ -17,8 +15,12 @@ use super::frame::{FrameBuffer, FrameBufferPool, VideoFrame};
use crate::error::{AppError, Result};
use crate::events::{EventBus, SystemEvent};
use crate::stream::MjpegStreamHandler;
use crate::utils::LogThrottler;
use crate::video::v4l2r_capture::V4l2rCaptureStream;
use v4l::buffer::Type as BufferType;
use v4l::io::traits::CaptureStream;
use v4l::prelude::*;
use v4l::video::capture::Parameters;
use v4l::video::Capture;
use v4l::Format;
/// Minimum valid frame size for capture
const MIN_CAPTURE_FRAME_SIZE: usize = 128;
@@ -571,9 +573,11 @@ impl Streamer {
break;
}
}
} else if zero_since.is_some() {
info!("Clients reconnected, canceling auto-pause");
zero_since = None;
} else {
if zero_since.is_some() {
info!("Clients reconnected, canceling auto-pause");
zero_since = None;
}
}
}
});
@@ -628,7 +632,8 @@ impl Streamer {
}
};
let mut stream_opt: Option<V4l2rCaptureStream> = None;
let mut device_opt: Option<Device> = None;
let mut format_opt: Option<Format> = None;
let mut last_error: Option<String> = None;
for attempt in 0..MAX_RETRIES {
@@ -637,18 +642,8 @@ impl Streamer {
return;
}
match V4l2rCaptureStream::open(
&device_path,
config.resolution,
config.format,
config.fps,
BUFFER_COUNT,
Duration::from_secs(2),
) {
Ok(stream) => {
stream_opt = Some(stream);
break;
}
let device = match Device::with_path(&device_path) {
Ok(d) => d,
Err(e) => {
let err_str = e.to_string();
if err_str.contains("busy") || err_str.contains("resource") {
@@ -665,12 +660,42 @@ impl Streamer {
last_error = Some(err_str);
break;
}
};
let requested = Format::new(
config.resolution.width,
config.resolution.height,
config.format.to_fourcc(),
);
match device.set_format(&requested) {
Ok(actual) => {
device_opt = Some(device);
format_opt = Some(actual);
break;
}
Err(e) => {
let err_str = e.to_string();
if err_str.contains("busy") || err_str.contains("resource") {
warn!(
"Device busy on set_format attempt {}/{}, retrying in {}ms...",
attempt + 1,
MAX_RETRIES,
RETRY_DELAY_MS
);
std::thread::sleep(std::time::Duration::from_millis(RETRY_DELAY_MS));
last_error = Some(err_str);
continue;
}
last_error = Some(err_str);
break;
}
}
}
let mut stream = match stream_opt {
Some(stream) => stream,
None => {
let (device, actual_format) = match (device_opt, format_opt) {
(Some(d), Some(f)) => (d, f),
_ => {
error!(
"Failed to open device {:?}: {}",
device_path,
@@ -684,35 +709,42 @@ impl Streamer {
}
};
let resolution = stream.resolution();
let pixel_format = stream.format();
let stride = stream.stride();
info!(
"Capture format: {}x{} {:?} stride={}",
resolution.width, resolution.height, pixel_format, stride
actual_format.width, actual_format.height, actual_format.fourcc, actual_format.stride
);
let resolution = Resolution::new(actual_format.width, actual_format.height);
let pixel_format =
PixelFormat::from_fourcc(actual_format.fourcc).unwrap_or(config.format);
if config.fps > 0 {
if let Err(e) = device.set_params(&Parameters::with_fps(config.fps)) {
warn!("Failed to set hardware FPS: {}", e);
}
}
let mut stream =
match MmapStream::with_buffers(&device, BufferType::VideoCapture, BUFFER_COUNT) {
Ok(s) => s,
Err(e) => {
error!("Failed to create capture stream: {}", e);
self.mjpeg_handler.set_offline();
set_state(StreamerState::Error);
self.direct_active.store(false, Ordering::SeqCst);
self.current_fps.store(0, Ordering::Relaxed);
return;
}
};
let buffer_pool = Arc::new(FrameBufferPool::new(BUFFER_COUNT.max(4) as usize));
let mut signal_present = true;
let mut sequence: u64 = 0;
let mut validate_counter: u64 = 0;
let mut idle_since: Option<std::time::Instant> = None;
let mut fps_frame_count: u64 = 0;
let mut last_fps_time = std::time::Instant::now();
let capture_error_throttler = LogThrottler::with_secs(5);
let mut suppressed_capture_errors: HashMap<String, u64> = HashMap::new();
let classify_capture_error = |err: &std::io::Error| -> String {
let message = err.to_string();
if message.contains("dqbuf failed") && message.contains("EINVAL") {
"capture_dqbuf_einval".to_string()
} else if message.contains("dqbuf failed") {
"capture_dqbuf".to_string()
} else {
format!("capture_{:?}", err.kind())
}
};
while !self.direct_stop.load(Ordering::Relaxed) {
let mjpeg_clients = self.mjpeg_handler.client_count();
@@ -736,9 +768,8 @@ impl Streamer {
idle_since = None;
}
let mut owned = buffer_pool.take(MIN_CAPTURE_FRAME_SIZE);
let meta = match stream.next_into(&mut owned) {
Ok(meta) => meta,
let (buf, meta) = match stream.next() {
Ok(frame_data) => frame_data,
Err(e) => {
if e.kind() == std::io::ErrorKind::TimedOut {
if signal_present {
@@ -780,43 +811,35 @@ impl Streamer {
break;
}
let key = classify_capture_error(&e);
if capture_error_throttler.should_log(&key) {
let suppressed = suppressed_capture_errors.remove(&key).unwrap_or(0);
if suppressed > 0 {
error!("Capture error: {} (suppressed {} repeats)", e, suppressed);
} else {
error!("Capture error: {}", e);
}
} else {
let counter = suppressed_capture_errors.entry(key).or_insert(0);
*counter = counter.saturating_add(1);
}
error!("Capture error: {}", e);
continue;
}
};
let frame_size = meta.bytes_used;
let frame_size = meta.bytesused as usize;
if frame_size < MIN_CAPTURE_FRAME_SIZE {
continue;
}
validate_counter = validate_counter.wrapping_add(1);
if pixel_format.is_compressed()
&& validate_counter.is_multiple_of(JPEG_VALIDATE_INTERVAL)
&& !VideoFrame::is_valid_jpeg_bytes(&owned[..frame_size])
&& validate_counter % JPEG_VALIDATE_INTERVAL == 0
&& !VideoFrame::is_valid_jpeg_bytes(&buf[..frame_size])
{
continue;
}
owned.truncate(frame_size);
let mut owned = buffer_pool.take(frame_size);
owned.resize(frame_size, 0);
owned[..frame_size].copy_from_slice(&buf[..frame_size]);
let frame = VideoFrame::from_pooled(
Arc::new(FrameBuffer::new(owned, Some(buffer_pool.clone()))),
resolution,
pixel_format,
stride,
meta.sequence,
actual_format.stride,
sequence,
);
sequence = sequence.wrapping_add(1);
if !signal_present {
signal_present = true;
@@ -962,7 +985,7 @@ impl Streamer {
*streamer.state.write().await = StreamerState::Recovering;
// Publish reconnecting event (every 5 attempts to avoid spam)
if attempt == 1 || attempt.is_multiple_of(5) {
if attempt == 1 || attempt % 5 == 0 {
streamer
.publish_event(SystemEvent::StreamReconnecting {
device: device_path.clone(),