CSI 采集适配优化

This commit is contained in:
mofeng-git
2026-04-11 20:26:33 +08:00
parent 2d81a071e5
commit eecbc0fc13
5 changed files with 549 additions and 198 deletions

View File

@@ -3,20 +3,64 @@
//! Manages video frame distribution and per-client statistics.
use arc_swap::ArcSwap;
use bytes::Bytes;
use parking_lot::Mutex as ParkingMutex;
use parking_lot::RwLock as ParkingRwLock;
use std::collections::{HashMap, VecDeque};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::sync::{Arc, OnceLock};
use std::time::{Duration, Instant};
use tokio::sync::broadcast;
use tracing::{debug, info, warn};
use crate::video::encoder::traits::{Encoder, EncoderConfig};
use crate::video::encoder::JpegEncoder;
use crate::video::format::PixelFormat;
use crate::video::format::{PixelFormat, Resolution};
use crate::video::VideoFrame;
/// Cached "no signal" placeholder JPEG (640×360 dark-gray image).
/// Generated once on first use and reused for all NoSignal frames.
static NO_SIGNAL_JPEG: OnceLock<Bytes> = OnceLock::new();
/// Generate a minimal "no signal" JPEG (640×360, dark gray background).
/// Uses turbojpeg directly to produce a valid JPEG without additional deps.
fn generate_no_signal_jpeg() -> Bytes {
const W: usize = 640;
const H: usize = 360;
let y_size = W * H;
let uv_size = y_size / 4;
let mut i420 = vec![0u8; y_size + uv_size * 2];
// Y = 32 (dark gray, above the 16 black floor so it is clearly visible)
i420[..y_size].fill(32);
// U and V = 128 (neutral chroma → no colour tint)
i420[y_size..].fill(128);
match turbojpeg::Compressor::new() {
Ok(mut compressor) => {
let _ = compressor.set_quality(70);
let yuv = turbojpeg::YuvImage {
pixels: i420.as_slice(),
width: W,
height: H,
align: 1,
subsamp: turbojpeg::Subsamp::Sub2x2,
};
match compressor.compress_yuv_to_vec(yuv) {
Ok(jpeg) => Bytes::from(jpeg),
Err(_) => Bytes::new(),
}
}
Err(_) => Bytes::new(),
}
}
/// Return a reference to the cached no-signal JPEG bytes.
fn no_signal_jpeg() -> &'static Bytes {
NO_SIGNAL_JPEG.get_or_init(generate_no_signal_jpeg)
}
/// Client ID type (UUID string)
pub type ClientId = String;
@@ -354,6 +398,34 @@ impl MjpegStreamHandler {
let _ = self.frame_notify.send(());
}
/// Push a "no signal" placeholder JPEG to all connected MJPEG clients.
///
/// Unlike `set_offline()`, this keeps the stream marked as **online** so
/// that HTTP clients remain connected and see the placeholder image instead
/// of a black/empty screen. Call this whenever the capture thread enters
/// the `NoSignal` state.
pub fn push_no_signal_placeholder(&self) {
let jpeg = no_signal_jpeg();
if jpeg.is_empty() {
return;
}
let frame = VideoFrame::new(
jpeg.clone(),
Resolution::new(640, 360),
PixelFormat::Mjpeg,
0,
self.sequence.fetch_add(1, Ordering::Relaxed),
);
// Store as current frame so late-joining clients get it immediately.
self.current_frame.store(Arc::new(Some(frame)));
// Ensure stream is marked online so the HTTP handler keeps iterating.
self.online.store(true, Ordering::SeqCst);
// Wake up waiting HTTP clients.
let _ = self.frame_notify.send(());
}
/// Set stream online (called when streaming starts)
pub fn set_online(&self) {
self.online.store(true, Ordering::SeqCst);

View File

@@ -563,6 +563,16 @@ impl VideoDevice {
Some((bt.width, bt.height, dv_timings_fps(&bt)))
}
/// Query current DV timings resolution for runtime change detection.
///
/// Returns the active resolution reported by DV timings (used by CSI/HDMI bridges
/// such as TC358743, rk_hdmirx, etc.). Returns `None` when the device does not
/// support DV timings or no signal is detected.
pub fn query_dv_timings_resolution(&self) -> Option<Resolution> {
let (w, h, _fps) = self.current_dv_timings_mode()?;
Some(Resolution::new(w, h))
}
fn current_format_resolution(&self) -> Option<(u32, u32)> {
let format = self.get_format().ok()?;
if format.width == 0 || format.height == 0 {

View File

@@ -31,8 +31,12 @@ use self::encoder_state::{build_encoder_state, EncoderThreadState};
/// Grace period before auto-stopping pipeline when no subscribers (in seconds)
const AUTO_STOP_GRACE_PERIOD_SECS: u64 = 3;
/// Restart capture stream after this many consecutive timeouts.
/// After this many consecutive timeouts, log a prominent warning.
const CAPTURE_TIMEOUT_RESTART_THRESHOLD: u32 = 5;
/// After this many consecutive timeouts, actually stop the pipeline.
/// Setting this high (60 × 2 s poll = ~120 s) keeps WebRTC sessions alive
/// while the source is temporarily unavailable (e.g. resolution change/reboot).
const CAPTURE_TIMEOUT_STOP_THRESHOLD: u32 = 60;
/// Minimum valid frame size for capture
const MIN_CAPTURE_FRAME_SIZE: usize = 128;
/// Validate every JPEG frame during startup to avoid poisoning HW decoders
@@ -576,9 +580,16 @@ impl SharedVideoPipeline {
consecutive_timeouts = consecutive_timeouts.saturating_add(1);
warn!("Capture timeout - no signal?");
if consecutive_timeouts >= CAPTURE_TIMEOUT_RESTART_THRESHOLD {
if consecutive_timeouts == CAPTURE_TIMEOUT_RESTART_THRESHOLD {
warn!(
"Capture timed out {} consecutive times, restarting video pipeline",
"Capture timed out {} consecutive times no signal?",
consecutive_timeouts
);
}
if consecutive_timeouts >= CAPTURE_TIMEOUT_STOP_THRESHOLD {
warn!(
"Capture timed out {} consecutive times, stopping video pipeline",
consecutive_timeouts
);
let _ = pipeline.running.send(false);

View File

@@ -11,7 +11,7 @@ use std::time::Duration;
use tokio::sync::RwLock;
use tracing::{debug, error, info, trace, warn};
use super::device::{enumerate_devices, find_best_device, VideoDeviceInfo};
use super::device::{enumerate_devices, find_best_device, VideoDevice, VideoDeviceInfo};
use super::format::{PixelFormat, Resolution};
use super::frame::{FrameBuffer, FrameBufferPool, VideoFrame};
use super::is_rk_hdmirx_device;
@@ -620,12 +620,22 @@ impl Streamer {
Ok(())
}
/// Direct capture loop for MJPEG mode (single loop, no broadcast)
fn run_direct_capture(self: Arc<Self>, device_path: PathBuf, config: StreamerConfig) {
/// Direct capture loop for MJPEG mode.
///
/// The outer `'session` loop allows "soft restarts": when no signal has been
/// detected for `NOSIGNAL_SOFT_RESTART_SECS` the capture stream is closed and
/// re-opened (re-probing format/resolution) without going through the full
/// DeviceLost recovery path. This handles the common CSI/HDMI-bridge case where
/// the source switches resolution and the driver requires a new `s_fmt` call.
fn run_direct_capture(self: Arc<Self>, device_path: PathBuf, _initial_config: StreamerConfig) {
const MAX_RETRIES: u32 = 5;
const RETRY_DELAY_MS: u64 = 200;
const IDLE_STOP_DELAY_SECS: u64 = 5;
const BUFFER_COUNT: u32 = 2;
/// After this many seconds without signal, close+re-open the device.
const NOSIGNAL_SOFT_RESTART_SECS: u64 = 8;
/// Placeholder frame re-send interval while in NoSignal state (iterations of 100 ms).
const NOSIGNAL_PLACEHOLDER_INTERVAL: u32 = 10; // every ~1 s
let handle = tokio::runtime::Handle::current();
let mut last_state = StreamerState::Streaming;
@@ -640,6 +650,19 @@ impl Streamer {
}
};
// How many soft-restart cycles have been attempted (for exponential back-off).
let mut no_signal_restart_count: u32 = 0;
'session: loop {
if self.direct_stop.load(Ordering::Relaxed) {
break 'session;
}
// Re-read config at the start of each session so that a re_init_device()
// call (from a previous soft-restart or recovery) is reflected here.
let config = handle.block_on(async { self.config.read().await.clone() });
// ── Open the capture stream ─────────────────────────────────────────
let mut stream_opt: Option<V4l2rCaptureStream> = None;
let mut last_error: Option<String> = None;
@@ -690,9 +713,7 @@ impl Streamer {
);
self.mjpeg_handler.set_offline();
set_state(StreamerState::Error);
self.direct_active.store(false, Ordering::SeqCst);
self.current_fps.store(0, Ordering::Relaxed);
return;
break 'session;
}
};
@@ -726,7 +747,15 @@ impl Streamer {
}
};
while !self.direct_stop.load(Ordering::Relaxed) {
// None = signal is present; Some(Instant) = when signal was first lost.
let mut no_signal_since: Option<std::time::Instant> = None;
// Counter for periodic placeholder pushes during NoSignal.
let mut nosignal_placeholder_counter: u32 = 0;
// Whether the inner 'capture loop should trigger a soft restart.
let mut need_soft_restart = false;
// ── Inner capture loop ──────────────────────────────────────────────
'capture: while !self.direct_stop.load(Ordering::Relaxed) {
let mjpeg_clients = self.mjpeg_handler.client_count();
if mjpeg_clients == 0 {
if idle_since.is_none() {
@@ -740,7 +769,7 @@ impl Streamer {
);
self.mjpeg_handler.set_offline();
set_state(StreamerState::Ready);
break;
break 'capture;
}
}
} else if idle_since.is_some() {
@@ -755,14 +784,47 @@ impl Streamer {
if e.kind() == std::io::ErrorKind::TimedOut {
if signal_present {
signal_present = false;
self.mjpeg_handler.set_offline();
// Don't call set_offline() instead keep the MJPEG stream
// alive by pushing a placeholder frame so clients stay
// connected and see the "no signal" image.
self.mjpeg_handler.push_no_signal_placeholder();
set_state(StreamerState::NoSignal);
no_signal_since = Some(std::time::Instant::now());
self.current_fps.store(0, Ordering::Relaxed);
fps_frame_count = 0;
last_fps_time = std::time::Instant::now();
nosignal_placeholder_counter = 0;
} else {
// Already in NoSignal re-send placeholder periodically so
// the HTTP keepalive timer does not expire.
nosignal_placeholder_counter =
nosignal_placeholder_counter.wrapping_add(1);
if nosignal_placeholder_counter >= NOSIGNAL_PLACEHOLDER_INTERVAL {
nosignal_placeholder_counter = 0;
self.mjpeg_handler.push_no_signal_placeholder();
}
// Soft-restart after exponential back-off.
if let Some(since) = no_signal_since {
let backoff_secs = NOSIGNAL_SOFT_RESTART_SECS
.saturating_mul(
2u64.pow(no_signal_restart_count.min(2)),
)
.min(30);
if since.elapsed().as_secs() >= backoff_secs {
info!(
"NoSignal for {}s, attempting soft restart (attempt {})",
backoff_secs,
no_signal_restart_count + 1
);
need_soft_restart = true;
break 'capture;
}
}
}
std::thread::sleep(std::time::Duration::from_millis(100));
continue;
continue 'capture;
}
let is_device_lost = match e.raw_os_error() {
@@ -789,14 +851,17 @@ impl Streamer {
streamer.start_device_recovery_internal().await;
});
});
break;
break 'capture;
}
let key = classify_capture_error(&e);
if capture_error_throttler.should_log(&key) {
let suppressed = suppressed_capture_errors.remove(&key).unwrap_or(0);
if suppressed > 0 {
error!("Capture error: {} (suppressed {} repeats)", e, suppressed);
error!(
"Capture error: {} (suppressed {} repeats)",
e, suppressed
);
} else {
error!("Capture error: {}", e);
}
@@ -804,13 +869,13 @@ impl Streamer {
let counter = suppressed_capture_errors.entry(key).or_insert(0);
*counter = counter.saturating_add(1);
}
continue;
continue 'capture;
}
};
let frame_size = meta.bytes_used;
if frame_size < MIN_CAPTURE_FRAME_SIZE {
continue;
continue 'capture;
}
validate_counter = validate_counter.wrapping_add(1);
@@ -818,7 +883,7 @@ impl Streamer {
&& validate_counter.is_multiple_of(JPEG_VALIDATE_INTERVAL)
&& !VideoFrame::is_valid_jpeg_bytes(&owned[..frame_size])
{
continue;
continue 'capture;
}
owned.truncate(frame_size);
@@ -832,7 +897,9 @@ impl Streamer {
if !signal_present {
signal_present = true;
self.mjpeg_handler.set_online();
no_signal_since = None;
no_signal_restart_count = 0;
// Stream was kept online (placeholder pushes), just update state.
set_state(StreamerState::Streaming);
}
@@ -847,15 +914,111 @@ impl Streamer {
self.current_fps
.store((current_fps * 100.0) as u32, Ordering::Relaxed);
}
} // 'capture
// ── After inner loop ────────────────────────────────────────────────
// The stream is dropped here, releasing the device FD.
drop(stream);
if self.direct_stop.load(Ordering::Relaxed) {
break 'session;
}
if !need_soft_restart {
// Normal exit (idle / device-lost / stop).
break 'session;
}
// ── Soft restart path ───────────────────────────────────────────────
no_signal_restart_count = no_signal_restart_count.saturating_add(1);
// Re-probe the device to pick up a changed resolution/format.
match VideoDevice::open_readonly(&device_path).and_then(|d| d.info()) {
Ok(device_info) => {
handle.block_on(async {
let fmt;
let res;
{
let cfg = self.config.read().await;
fmt = self
.select_format(&device_info, cfg.format)
.unwrap_or(cfg.format);
res = self
.select_resolution(&device_info, &fmt, cfg.resolution)
.unwrap_or(cfg.resolution);
}
{
let mut cfg = self.config.write().await;
cfg.format = fmt;
cfg.resolution = res;
}
*self.current_device.write().await = Some(device_info);
info!(
"Soft restart: re-probed device → {}x{} {:?}",
res.width, res.height, fmt
);
});
}
Err(e) => {
warn!("Soft restart: failed to re-probe device: {}", e);
// Brief wait before retrying to avoid spinning.
let wait = 2u64.pow(no_signal_restart_count.min(3));
std::thread::sleep(Duration::from_secs(wait));
}
}
// Reset no_signal_since so the back-off timer is fresh for the new session.
// no_signal_since will be re-set if the new session immediately times out.
// Continue 'session → re-open V4l2rCaptureStream with updated config.
} // 'session
self.direct_active.store(false, Ordering::SeqCst);
self.current_fps.store(0, Ordering::Relaxed);
}
/// Check if streaming
/// Check if streaming (or in NoSignal state — capture thread is still running)
pub async fn is_streaming(&self) -> bool {
self.state().await == StreamerState::Streaming
matches!(
self.state().await,
StreamerState::Streaming | StreamerState::NoSignal
)
}
/// Re-probe a device and update the stored config/device info.
///
/// Called during recovery or after a NoSignal soft restart so that a
/// resolution / format change on the source side is picked up before
/// the capture stream is re-opened.
pub async fn re_init_device(self: &Arc<Self>, device_path: &str) -> Result<()> {
let device = VideoDevice::open_readonly(device_path).map_err(|e| {
AppError::VideoError(format!("Cannot open device for re-init: {}", e))
})?;
let device_info = device.info()?;
let (format, resolution) = {
let config = self.config.read().await;
let fmt = self
.select_format(&device_info, config.format)
.unwrap_or(config.format);
let res = self
.select_resolution(&device_info, &fmt, config.resolution)
.unwrap_or(config.resolution);
(fmt, res)
};
{
let mut cfg = self.config.write().await;
cfg.format = format;
cfg.resolution = resolution;
}
*self.current_device.write().await = Some(device_info);
info!(
"Device re-initialized: {}x{} {:?}",
resolution.width, resolution.height, format
);
Ok(())
}
/// Get stream statistics
@@ -997,6 +1160,15 @@ impl Streamer {
continue;
}
// Re-probe device to pick up resolution/format changes
if let Err(e) = streamer.re_init_device(&device_path).await {
debug!(
"Failed to re-probe device format (attempt {}): {}",
attempt, e
);
// Don't skip device exists, try restart anyway
}
// Try to restart capture
match streamer.restart_capture().await {
Ok(_) => {

View File

@@ -567,6 +567,12 @@ const MAX_CONSECUTIVE_ERRORS = 2 // If 2+ errors in grace period, it's a real pr
let pendingWebRTCReadyGate = false
let webrtcConnectTask: Promise<boolean> | null = null
// WebRTC auto-reconnect on device-lost/recovery
let webrtcRecoveryTimerId: number | null = null
let webrtcRecoveryAttempts = 0
const MAX_WEBRTC_RECOVERY_ATTEMPTS = 8
const WEBRTC_RECOVERY_BASE_DELAY = 2000
// Last-frame overlay (prevents black flash during mode switches)
const frameOverlayUrl = ref<string | null>(null)
@@ -781,9 +787,78 @@ function handleVideoError() {
function handleStreamDeviceLost(data: { device: string; reason: string }) {
videoError.value = true
videoErrorMessage.value = t('console.deviceLostDesc', { device: data.device, reason: data.reason })
// In WebRTC mode, the pipeline will attempt to restart itself.
// Start an exponential-backoff reconnect loop so the session is
// re-established automatically once the backend is ready again.
if (videoMode.value !== 'mjpeg') {
scheduleWebRTCRecovery()
}
}
function scheduleWebRTCRecovery() {
// Clear any previous timer
if (webrtcRecoveryTimerId !== null) {
clearTimeout(webrtcRecoveryTimerId)
webrtcRecoveryTimerId = null
}
if (webrtcRecoveryAttempts >= MAX_WEBRTC_RECOVERY_ATTEMPTS) {
console.warn('[Recovery] Max WebRTC recovery attempts reached, giving up')
webrtcRecoveryAttempts = 0
return
}
const delay = Math.min(
WEBRTC_RECOVERY_BASE_DELAY * Math.pow(2, webrtcRecoveryAttempts),
30000,
)
console.log(
`[Recovery] Scheduling WebRTC reconnect attempt ${webrtcRecoveryAttempts + 1}/${MAX_WEBRTC_RECOVERY_ATTEMPTS} in ${delay}ms`,
)
webrtcRecoveryTimerId = window.setTimeout(async () => {
webrtcRecoveryTimerId = null
webrtcRecoveryAttempts++
// Only reconnect if we are still in a WebRTC mode and error state
if (videoMode.value === 'mjpeg' || !videoError.value) {
webrtcRecoveryAttempts = 0
return
}
console.log(`[Recovery] Attempting WebRTC reconnect (attempt ${webrtcRecoveryAttempts})`)
try {
await webrtc.disconnect()
const ok = await connectWebRTCSerial('device-recovery')
if (ok) {
console.log('[Recovery] WebRTC reconnected successfully')
videoError.value = false
videoErrorMessage.value = ''
webrtcRecoveryAttempts = 0
} else {
// Retry
scheduleWebRTCRecovery()
}
} catch {
scheduleWebRTCRecovery()
}
}, delay)
}
function cancelWebRTCRecovery() {
if (webrtcRecoveryTimerId !== null) {
clearTimeout(webrtcRecoveryTimerId)
webrtcRecoveryTimerId = null
}
webrtcRecoveryAttempts = 0
}
function handleStreamRecovered(_data: { device: string }) {
// Cancel any pending recovery timer backend is back
cancelWebRTCRecovery()
// Reset video error state
videoError.value = false
videoErrorMessage.value = ''
@@ -918,6 +993,16 @@ function handleStreamStateChanged(data: any) {
if (data.state === 'error') {
videoError.value = true
videoErrorMessage.value = t('console.streamError')
} else if (data.state === 'recovering' && videoMode.value !== 'mjpeg') {
// Backend is in the DeviceLost recovery loop; start WebRTC reconnect if not already scheduled.
if (webrtcRecoveryTimerId === null && webrtcRecoveryAttempts === 0) {
scheduleWebRTCRecovery()
}
} else if (data.state === 'streaming' || data.state === 'no_signal') {
// Backend stream is alive; cancel any pending recovery timers.
if (data.state === 'streaming') {
cancelWebRTCRecovery()
}
}
}
@@ -2224,6 +2309,7 @@ onUnmounted(() => {
clearTimeout(gracePeriodTimeoutId)
gracePeriodTimeoutId = null
}
cancelWebRTCRecovery()
videoSession.clearWaiters()
// Reset counters