From eecbc0fc13eb31298f470d495b27f34743b50825 Mon Sep 17 00:00:00 2001 From: mofeng-git Date: Sat, 11 Apr 2026 20:26:33 +0800 Subject: [PATCH] =?UTF-8?q?CSI=20=E9=87=87=E9=9B=86=E9=80=82=E9=85=8D?= =?UTF-8?q?=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/stream/mjpeg.rs | 76 +++- src/video/device.rs | 10 + src/video/shared_video_pipeline.rs | 17 +- src/video/streamer.rs | 558 +++++++++++++++++++---------- web/src/views/ConsoleView.vue | 86 +++++ 5 files changed, 549 insertions(+), 198 deletions(-) diff --git a/src/stream/mjpeg.rs b/src/stream/mjpeg.rs index 12b360ab..26c2ec61 100644 --- a/src/stream/mjpeg.rs +++ b/src/stream/mjpeg.rs @@ -3,20 +3,64 @@ //! Manages video frame distribution and per-client statistics. use arc_swap::ArcSwap; +use bytes::Bytes; use parking_lot::Mutex as ParkingMutex; use parking_lot::RwLock as ParkingRwLock; use std::collections::{HashMap, VecDeque}; use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; -use std::sync::Arc; +use std::sync::{Arc, OnceLock}; use std::time::{Duration, Instant}; use tokio::sync::broadcast; use tracing::{debug, info, warn}; use crate::video::encoder::traits::{Encoder, EncoderConfig}; use crate::video::encoder::JpegEncoder; -use crate::video::format::PixelFormat; +use crate::video::format::{PixelFormat, Resolution}; use crate::video::VideoFrame; +/// Cached "no signal" placeholder JPEG (640×360 dark-gray image). +/// Generated once on first use and reused for all NoSignal frames. +static NO_SIGNAL_JPEG: OnceLock = OnceLock::new(); + +/// Generate a minimal "no signal" JPEG (640×360, dark gray background). +/// Uses turbojpeg directly to produce a valid JPEG without additional deps. +fn generate_no_signal_jpeg() -> Bytes { + const W: usize = 640; + const H: usize = 360; + + let y_size = W * H; + let uv_size = y_size / 4; + let mut i420 = vec![0u8; y_size + uv_size * 2]; + + // Y = 32 (dark gray, above the 16 black floor so it is clearly visible) + i420[..y_size].fill(32); + // U and V = 128 (neutral chroma → no colour tint) + i420[y_size..].fill(128); + + match turbojpeg::Compressor::new() { + Ok(mut compressor) => { + let _ = compressor.set_quality(70); + let yuv = turbojpeg::YuvImage { + pixels: i420.as_slice(), + width: W, + height: H, + align: 1, + subsamp: turbojpeg::Subsamp::Sub2x2, + }; + match compressor.compress_yuv_to_vec(yuv) { + Ok(jpeg) => Bytes::from(jpeg), + Err(_) => Bytes::new(), + } + } + Err(_) => Bytes::new(), + } +} + +/// Return a reference to the cached no-signal JPEG bytes. +fn no_signal_jpeg() -> &'static Bytes { + NO_SIGNAL_JPEG.get_or_init(generate_no_signal_jpeg) +} + /// Client ID type (UUID string) pub type ClientId = String; @@ -354,6 +398,34 @@ impl MjpegStreamHandler { let _ = self.frame_notify.send(()); } + /// Push a "no signal" placeholder JPEG to all connected MJPEG clients. + /// + /// Unlike `set_offline()`, this keeps the stream marked as **online** so + /// that HTTP clients remain connected and see the placeholder image instead + /// of a black/empty screen. Call this whenever the capture thread enters + /// the `NoSignal` state. + pub fn push_no_signal_placeholder(&self) { + let jpeg = no_signal_jpeg(); + if jpeg.is_empty() { + return; + } + + let frame = VideoFrame::new( + jpeg.clone(), + Resolution::new(640, 360), + PixelFormat::Mjpeg, + 0, + self.sequence.fetch_add(1, Ordering::Relaxed), + ); + + // Store as current frame so late-joining clients get it immediately. + self.current_frame.store(Arc::new(Some(frame))); + // Ensure stream is marked online so the HTTP handler keeps iterating. + self.online.store(true, Ordering::SeqCst); + // Wake up waiting HTTP clients. + let _ = self.frame_notify.send(()); + } + /// Set stream online (called when streaming starts) pub fn set_online(&self) { self.online.store(true, Ordering::SeqCst); diff --git a/src/video/device.rs b/src/video/device.rs index baca3f2f..2c517cf3 100644 --- a/src/video/device.rs +++ b/src/video/device.rs @@ -563,6 +563,16 @@ impl VideoDevice { Some((bt.width, bt.height, dv_timings_fps(&bt))) } + /// Query current DV timings resolution for runtime change detection. + /// + /// Returns the active resolution reported by DV timings (used by CSI/HDMI bridges + /// such as TC358743, rk_hdmirx, etc.). Returns `None` when the device does not + /// support DV timings or no signal is detected. + pub fn query_dv_timings_resolution(&self) -> Option { + let (w, h, _fps) = self.current_dv_timings_mode()?; + Some(Resolution::new(w, h)) + } + fn current_format_resolution(&self) -> Option<(u32, u32)> { let format = self.get_format().ok()?; if format.width == 0 || format.height == 0 { diff --git a/src/video/shared_video_pipeline.rs b/src/video/shared_video_pipeline.rs index 664da85c..cf84d1a2 100644 --- a/src/video/shared_video_pipeline.rs +++ b/src/video/shared_video_pipeline.rs @@ -31,8 +31,12 @@ use self::encoder_state::{build_encoder_state, EncoderThreadState}; /// Grace period before auto-stopping pipeline when no subscribers (in seconds) const AUTO_STOP_GRACE_PERIOD_SECS: u64 = 3; -/// Restart capture stream after this many consecutive timeouts. +/// After this many consecutive timeouts, log a prominent warning. const CAPTURE_TIMEOUT_RESTART_THRESHOLD: u32 = 5; +/// After this many consecutive timeouts, actually stop the pipeline. +/// Setting this high (60 × 2 s poll = ~120 s) keeps WebRTC sessions alive +/// while the source is temporarily unavailable (e.g. resolution change/reboot). +const CAPTURE_TIMEOUT_STOP_THRESHOLD: u32 = 60; /// Minimum valid frame size for capture const MIN_CAPTURE_FRAME_SIZE: usize = 128; /// Validate every JPEG frame during startup to avoid poisoning HW decoders @@ -576,9 +580,16 @@ impl SharedVideoPipeline { consecutive_timeouts = consecutive_timeouts.saturating_add(1); warn!("Capture timeout - no signal?"); - if consecutive_timeouts >= CAPTURE_TIMEOUT_RESTART_THRESHOLD { + if consecutive_timeouts == CAPTURE_TIMEOUT_RESTART_THRESHOLD { warn!( - "Capture timed out {} consecutive times, restarting video pipeline", + "Capture timed out {} consecutive times – no signal?", + consecutive_timeouts + ); + } + + if consecutive_timeouts >= CAPTURE_TIMEOUT_STOP_THRESHOLD { + warn!( + "Capture timed out {} consecutive times, stopping video pipeline", consecutive_timeouts ); let _ = pipeline.running.send(false); diff --git a/src/video/streamer.rs b/src/video/streamer.rs index a322766d..666e6144 100644 --- a/src/video/streamer.rs +++ b/src/video/streamer.rs @@ -11,7 +11,7 @@ use std::time::Duration; use tokio::sync::RwLock; use tracing::{debug, error, info, trace, warn}; -use super::device::{enumerate_devices, find_best_device, VideoDeviceInfo}; +use super::device::{enumerate_devices, find_best_device, VideoDevice, VideoDeviceInfo}; use super::format::{PixelFormat, Resolution}; use super::frame::{FrameBuffer, FrameBufferPool, VideoFrame}; use super::is_rk_hdmirx_device; @@ -620,12 +620,22 @@ impl Streamer { Ok(()) } - /// Direct capture loop for MJPEG mode (single loop, no broadcast) - fn run_direct_capture(self: Arc, device_path: PathBuf, config: StreamerConfig) { + /// Direct capture loop for MJPEG mode. + /// + /// The outer `'session` loop allows "soft restarts": when no signal has been + /// detected for `NOSIGNAL_SOFT_RESTART_SECS` the capture stream is closed and + /// re-opened (re-probing format/resolution) without going through the full + /// DeviceLost recovery path. This handles the common CSI/HDMI-bridge case where + /// the source switches resolution and the driver requires a new `s_fmt` call. + fn run_direct_capture(self: Arc, device_path: PathBuf, _initial_config: StreamerConfig) { const MAX_RETRIES: u32 = 5; const RETRY_DELAY_MS: u64 = 200; const IDLE_STOP_DELAY_SECS: u64 = 5; const BUFFER_COUNT: u32 = 2; + /// After this many seconds without signal, close+re-open the device. + const NOSIGNAL_SOFT_RESTART_SECS: u64 = 8; + /// Placeholder frame re-send interval while in NoSignal state (iterations of 100 ms). + const NOSIGNAL_PLACEHOLDER_INTERVAL: u32 = 10; // every ~1 s let handle = tokio::runtime::Handle::current(); let mut last_state = StreamerState::Streaming; @@ -640,222 +650,375 @@ impl Streamer { } }; - let mut stream_opt: Option = None; - let mut last_error: Option = None; + // How many soft-restart cycles have been attempted (for exponential back-off). + let mut no_signal_restart_count: u32 = 0; - for attempt in 0..MAX_RETRIES { + 'session: loop { if self.direct_stop.load(Ordering::Relaxed) { - self.direct_active.store(false, Ordering::SeqCst); - return; + break 'session; } - match V4l2rCaptureStream::open( - &device_path, - config.resolution, - config.format, - config.fps, - BUFFER_COUNT, - Duration::from_secs(2), - ) { - Ok(stream) => { - stream_opt = Some(stream); - break; + // Re-read config at the start of each session so that a re_init_device() + // call (from a previous soft-restart or recovery) is reflected here. + let config = handle.block_on(async { self.config.read().await.clone() }); + + // ── Open the capture stream ───────────────────────────────────────── + let mut stream_opt: Option = None; + let mut last_error: Option = None; + + for attempt in 0..MAX_RETRIES { + if self.direct_stop.load(Ordering::Relaxed) { + self.direct_active.store(false, Ordering::SeqCst); + return; } - Err(e) => { - let err_str = e.to_string(); - if err_str.contains("busy") || err_str.contains("resource") { - warn!( - "Device busy on attempt {}/{}, retrying in {}ms...", - attempt + 1, - MAX_RETRIES, - RETRY_DELAY_MS - ); - std::thread::sleep(std::time::Duration::from_millis(RETRY_DELAY_MS)); + + match V4l2rCaptureStream::open( + &device_path, + config.resolution, + config.format, + config.fps, + BUFFER_COUNT, + Duration::from_secs(2), + ) { + Ok(stream) => { + stream_opt = Some(stream); + break; + } + Err(e) => { + let err_str = e.to_string(); + if err_str.contains("busy") || err_str.contains("resource") { + warn!( + "Device busy on attempt {}/{}, retrying in {}ms...", + attempt + 1, + MAX_RETRIES, + RETRY_DELAY_MS + ); + std::thread::sleep(std::time::Duration::from_millis(RETRY_DELAY_MS)); + last_error = Some(err_str); + continue; + } last_error = Some(err_str); - continue; - } - last_error = Some(err_str); - break; - } - } - } - - let mut stream = match stream_opt { - Some(stream) => stream, - None => { - error!( - "Failed to open device {:?}: {}", - device_path, - last_error.unwrap_or_else(|| "unknown error".to_string()) - ); - self.mjpeg_handler.set_offline(); - set_state(StreamerState::Error); - self.direct_active.store(false, Ordering::SeqCst); - self.current_fps.store(0, Ordering::Relaxed); - return; - } - }; - - let resolution = stream.resolution(); - let pixel_format = stream.format(); - let stride = stream.stride(); - - info!( - "Capture format: {}x{} {:?} stride={}", - resolution.width, resolution.height, pixel_format, stride - ); - - let buffer_pool = Arc::new(FrameBufferPool::new(BUFFER_COUNT.max(4) as usize)); - let mut signal_present = true; - let mut validate_counter: u64 = 0; - let mut idle_since: Option = None; - - let mut fps_frame_count: u64 = 0; - let mut last_fps_time = std::time::Instant::now(); - let capture_error_throttler = LogThrottler::with_secs(5); - let mut suppressed_capture_errors: HashMap = HashMap::new(); - - let classify_capture_error = |err: &std::io::Error| -> String { - let message = err.to_string(); - if message.contains("dqbuf failed") && message.contains("EINVAL") { - "capture_dqbuf_einval".to_string() - } else if message.contains("dqbuf failed") { - "capture_dqbuf".to_string() - } else { - format!("capture_{:?}", err.kind()) - } - }; - - while !self.direct_stop.load(Ordering::Relaxed) { - let mjpeg_clients = self.mjpeg_handler.client_count(); - if mjpeg_clients == 0 { - if idle_since.is_none() { - idle_since = Some(std::time::Instant::now()); - trace!("No active video consumers, starting idle timer"); - } else if let Some(since) = idle_since { - if since.elapsed().as_secs() >= IDLE_STOP_DELAY_SECS { - info!( - "No active video consumers for {}s, stopping capture", - IDLE_STOP_DELAY_SECS - ); - self.mjpeg_handler.set_offline(); - set_state(StreamerState::Ready); break; } } - } else if idle_since.is_some() { - trace!("Video consumers active, resetting idle timer"); - idle_since = None; } - let mut owned = buffer_pool.take(MIN_CAPTURE_FRAME_SIZE); - let meta = match stream.next_into(&mut owned) { - Ok(meta) => meta, - Err(e) => { - if e.kind() == std::io::ErrorKind::TimedOut { - if signal_present { - signal_present = false; - self.mjpeg_handler.set_offline(); - set_state(StreamerState::NoSignal); - self.current_fps.store(0, Ordering::Relaxed); - fps_frame_count = 0; - last_fps_time = std::time::Instant::now(); - } - std::thread::sleep(std::time::Duration::from_millis(100)); - continue; - } - - let is_device_lost = match e.raw_os_error() { - Some(6) => true, // ENXIO - Some(19) => true, // ENODEV - Some(5) => true, // EIO - Some(32) => true, // EPIPE - Some(108) => true, // ESHUTDOWN - _ => false, - }; - - if is_device_lost { - error!("Video device lost: {} - {}", device_path.display(), e); - self.mjpeg_handler.set_offline(); - handle.block_on(async { - *self.last_lost_device.write().await = - Some(device_path.display().to_string()); - *self.last_lost_reason.write().await = Some(e.to_string()); - }); - set_state(StreamerState::DeviceLost); - handle.block_on(async { - let streamer = Arc::clone(&self); - tokio::spawn(async move { - streamer.start_device_recovery_internal().await; - }); - }); - break; - } - - let key = classify_capture_error(&e); - if capture_error_throttler.should_log(&key) { - let suppressed = suppressed_capture_errors.remove(&key).unwrap_or(0); - if suppressed > 0 { - error!("Capture error: {} (suppressed {} repeats)", e, suppressed); - } else { - error!("Capture error: {}", e); - } - } else { - let counter = suppressed_capture_errors.entry(key).or_insert(0); - *counter = counter.saturating_add(1); - } - continue; + let mut stream = match stream_opt { + Some(stream) => stream, + None => { + error!( + "Failed to open device {:?}: {}", + device_path, + last_error.unwrap_or_else(|| "unknown error".to_string()) + ); + self.mjpeg_handler.set_offline(); + set_state(StreamerState::Error); + break 'session; } }; - let frame_size = meta.bytes_used; - if frame_size < MIN_CAPTURE_FRAME_SIZE { - continue; - } + let resolution = stream.resolution(); + let pixel_format = stream.format(); + let stride = stream.stride(); - validate_counter = validate_counter.wrapping_add(1); - if pixel_format.is_compressed() - && validate_counter.is_multiple_of(JPEG_VALIDATE_INTERVAL) - && !VideoFrame::is_valid_jpeg_bytes(&owned[..frame_size]) - { - continue; - } - - owned.truncate(frame_size); - let frame = VideoFrame::from_pooled( - Arc::new(FrameBuffer::new(owned, Some(buffer_pool.clone()))), - resolution, - pixel_format, - stride, - meta.sequence, + info!( + "Capture format: {}x{} {:?} stride={}", + resolution.width, resolution.height, pixel_format, stride ); - if !signal_present { - signal_present = true; - self.mjpeg_handler.set_online(); - set_state(StreamerState::Streaming); + let buffer_pool = Arc::new(FrameBufferPool::new(BUFFER_COUNT.max(4) as usize)); + let mut signal_present = true; + let mut validate_counter: u64 = 0; + let mut idle_since: Option = None; + + let mut fps_frame_count: u64 = 0; + let mut last_fps_time = std::time::Instant::now(); + let capture_error_throttler = LogThrottler::with_secs(5); + let mut suppressed_capture_errors: HashMap = HashMap::new(); + + let classify_capture_error = |err: &std::io::Error| -> String { + let message = err.to_string(); + if message.contains("dqbuf failed") && message.contains("EINVAL") { + "capture_dqbuf_einval".to_string() + } else if message.contains("dqbuf failed") { + "capture_dqbuf".to_string() + } else { + format!("capture_{:?}", err.kind()) + } + }; + + // None = signal is present; Some(Instant) = when signal was first lost. + let mut no_signal_since: Option = None; + // Counter for periodic placeholder pushes during NoSignal. + let mut nosignal_placeholder_counter: u32 = 0; + // Whether the inner 'capture loop should trigger a soft restart. + let mut need_soft_restart = false; + + // ── Inner capture loop ────────────────────────────────────────────── + 'capture: while !self.direct_stop.load(Ordering::Relaxed) { + let mjpeg_clients = self.mjpeg_handler.client_count(); + if mjpeg_clients == 0 { + if idle_since.is_none() { + idle_since = Some(std::time::Instant::now()); + trace!("No active video consumers, starting idle timer"); + } else if let Some(since) = idle_since { + if since.elapsed().as_secs() >= IDLE_STOP_DELAY_SECS { + info!( + "No active video consumers for {}s, stopping capture", + IDLE_STOP_DELAY_SECS + ); + self.mjpeg_handler.set_offline(); + set_state(StreamerState::Ready); + break 'capture; + } + } + } else if idle_since.is_some() { + trace!("Video consumers active, resetting idle timer"); + idle_since = None; + } + + let mut owned = buffer_pool.take(MIN_CAPTURE_FRAME_SIZE); + let meta = match stream.next_into(&mut owned) { + Ok(meta) => meta, + Err(e) => { + if e.kind() == std::io::ErrorKind::TimedOut { + if signal_present { + signal_present = false; + // Don't call set_offline() – instead keep the MJPEG stream + // alive by pushing a placeholder frame so clients stay + // connected and see the "no signal" image. + self.mjpeg_handler.push_no_signal_placeholder(); + set_state(StreamerState::NoSignal); + no_signal_since = Some(std::time::Instant::now()); + self.current_fps.store(0, Ordering::Relaxed); + fps_frame_count = 0; + last_fps_time = std::time::Instant::now(); + nosignal_placeholder_counter = 0; + } else { + // Already in NoSignal – re-send placeholder periodically so + // the HTTP keepalive timer does not expire. + nosignal_placeholder_counter = + nosignal_placeholder_counter.wrapping_add(1); + if nosignal_placeholder_counter >= NOSIGNAL_PLACEHOLDER_INTERVAL { + nosignal_placeholder_counter = 0; + self.mjpeg_handler.push_no_signal_placeholder(); + } + + // Soft-restart after exponential back-off. + if let Some(since) = no_signal_since { + let backoff_secs = NOSIGNAL_SOFT_RESTART_SECS + .saturating_mul( + 2u64.pow(no_signal_restart_count.min(2)), + ) + .min(30); + if since.elapsed().as_secs() >= backoff_secs { + info!( + "NoSignal for {}s, attempting soft restart (attempt {})", + backoff_secs, + no_signal_restart_count + 1 + ); + need_soft_restart = true; + break 'capture; + } + } + } + + std::thread::sleep(std::time::Duration::from_millis(100)); + continue 'capture; + } + + let is_device_lost = match e.raw_os_error() { + Some(6) => true, // ENXIO + Some(19) => true, // ENODEV + Some(5) => true, // EIO + Some(32) => true, // EPIPE + Some(108) => true, // ESHUTDOWN + _ => false, + }; + + if is_device_lost { + error!("Video device lost: {} - {}", device_path.display(), e); + self.mjpeg_handler.set_offline(); + handle.block_on(async { + *self.last_lost_device.write().await = + Some(device_path.display().to_string()); + *self.last_lost_reason.write().await = Some(e.to_string()); + }); + set_state(StreamerState::DeviceLost); + handle.block_on(async { + let streamer = Arc::clone(&self); + tokio::spawn(async move { + streamer.start_device_recovery_internal().await; + }); + }); + break 'capture; + } + + let key = classify_capture_error(&e); + if capture_error_throttler.should_log(&key) { + let suppressed = suppressed_capture_errors.remove(&key).unwrap_or(0); + if suppressed > 0 { + error!( + "Capture error: {} (suppressed {} repeats)", + e, suppressed + ); + } else { + error!("Capture error: {}", e); + } + } else { + let counter = suppressed_capture_errors.entry(key).or_insert(0); + *counter = counter.saturating_add(1); + } + continue 'capture; + } + }; + + let frame_size = meta.bytes_used; + if frame_size < MIN_CAPTURE_FRAME_SIZE { + continue 'capture; + } + + validate_counter = validate_counter.wrapping_add(1); + if pixel_format.is_compressed() + && validate_counter.is_multiple_of(JPEG_VALIDATE_INTERVAL) + && !VideoFrame::is_valid_jpeg_bytes(&owned[..frame_size]) + { + continue 'capture; + } + + owned.truncate(frame_size); + let frame = VideoFrame::from_pooled( + Arc::new(FrameBuffer::new(owned, Some(buffer_pool.clone()))), + resolution, + pixel_format, + stride, + meta.sequence, + ); + + if !signal_present { + signal_present = true; + no_signal_since = None; + no_signal_restart_count = 0; + // Stream was kept online (placeholder pushes), just update state. + set_state(StreamerState::Streaming); + } + + self.mjpeg_handler.update_frame(frame); + + fps_frame_count += 1; + let fps_elapsed = last_fps_time.elapsed(); + if fps_elapsed >= std::time::Duration::from_secs(1) { + let current_fps = fps_frame_count as f32 / fps_elapsed.as_secs_f32(); + fps_frame_count = 0; + last_fps_time = std::time::Instant::now(); + self.current_fps + .store((current_fps * 100.0) as u32, Ordering::Relaxed); + } + } // 'capture + + // ── After inner loop ──────────────────────────────────────────────── + // The stream is dropped here, releasing the device FD. + drop(stream); + + if self.direct_stop.load(Ordering::Relaxed) { + break 'session; } - self.mjpeg_handler.update_frame(frame); - - fps_frame_count += 1; - let fps_elapsed = last_fps_time.elapsed(); - if fps_elapsed >= std::time::Duration::from_secs(1) { - let current_fps = fps_frame_count as f32 / fps_elapsed.as_secs_f32(); - fps_frame_count = 0; - last_fps_time = std::time::Instant::now(); - self.current_fps - .store((current_fps * 100.0) as u32, Ordering::Relaxed); + if !need_soft_restart { + // Normal exit (idle / device-lost / stop). + break 'session; } - } + + // ── Soft restart path ─────────────────────────────────────────────── + no_signal_restart_count = no_signal_restart_count.saturating_add(1); + + // Re-probe the device to pick up a changed resolution/format. + match VideoDevice::open_readonly(&device_path).and_then(|d| d.info()) { + Ok(device_info) => { + handle.block_on(async { + let fmt; + let res; + { + let cfg = self.config.read().await; + fmt = self + .select_format(&device_info, cfg.format) + .unwrap_or(cfg.format); + res = self + .select_resolution(&device_info, &fmt, cfg.resolution) + .unwrap_or(cfg.resolution); + } + { + let mut cfg = self.config.write().await; + cfg.format = fmt; + cfg.resolution = res; + } + *self.current_device.write().await = Some(device_info); + info!( + "Soft restart: re-probed device → {}x{} {:?}", + res.width, res.height, fmt + ); + }); + } + Err(e) => { + warn!("Soft restart: failed to re-probe device: {}", e); + // Brief wait before retrying to avoid spinning. + let wait = 2u64.pow(no_signal_restart_count.min(3)); + std::thread::sleep(Duration::from_secs(wait)); + } + } + + // Reset no_signal_since so the back-off timer is fresh for the new session. + // no_signal_since will be re-set if the new session immediately times out. + + // Continue 'session → re-open V4l2rCaptureStream with updated config. + } // 'session self.direct_active.store(false, Ordering::SeqCst); self.current_fps.store(0, Ordering::Relaxed); } - /// Check if streaming + /// Check if streaming (or in NoSignal state — capture thread is still running) pub async fn is_streaming(&self) -> bool { - self.state().await == StreamerState::Streaming + matches!( + self.state().await, + StreamerState::Streaming | StreamerState::NoSignal + ) + } + + /// Re-probe a device and update the stored config/device info. + /// + /// Called during recovery or after a NoSignal soft restart so that a + /// resolution / format change on the source side is picked up before + /// the capture stream is re-opened. + pub async fn re_init_device(self: &Arc, device_path: &str) -> Result<()> { + let device = VideoDevice::open_readonly(device_path).map_err(|e| { + AppError::VideoError(format!("Cannot open device for re-init: {}", e)) + })?; + let device_info = device.info()?; + + let (format, resolution) = { + let config = self.config.read().await; + let fmt = self + .select_format(&device_info, config.format) + .unwrap_or(config.format); + let res = self + .select_resolution(&device_info, &fmt, config.resolution) + .unwrap_or(config.resolution); + (fmt, res) + }; + + { + let mut cfg = self.config.write().await; + cfg.format = format; + cfg.resolution = resolution; + } + *self.current_device.write().await = Some(device_info); + + info!( + "Device re-initialized: {}x{} {:?}", + resolution.width, resolution.height, format + ); + Ok(()) } /// Get stream statistics @@ -997,6 +1160,15 @@ impl Streamer { continue; } + // Re-probe device to pick up resolution/format changes + if let Err(e) = streamer.re_init_device(&device_path).await { + debug!( + "Failed to re-probe device format (attempt {}): {}", + attempt, e + ); + // Don't skip – device exists, try restart anyway + } + // Try to restart capture match streamer.restart_capture().await { Ok(_) => { diff --git a/web/src/views/ConsoleView.vue b/web/src/views/ConsoleView.vue index 7ce6bf49..0325c1f9 100644 --- a/web/src/views/ConsoleView.vue +++ b/web/src/views/ConsoleView.vue @@ -567,6 +567,12 @@ const MAX_CONSECUTIVE_ERRORS = 2 // If 2+ errors in grace period, it's a real pr let pendingWebRTCReadyGate = false let webrtcConnectTask: Promise | null = null +// WebRTC auto-reconnect on device-lost/recovery +let webrtcRecoveryTimerId: number | null = null +let webrtcRecoveryAttempts = 0 +const MAX_WEBRTC_RECOVERY_ATTEMPTS = 8 +const WEBRTC_RECOVERY_BASE_DELAY = 2000 + // Last-frame overlay (prevents black flash during mode switches) const frameOverlayUrl = ref(null) @@ -781,9 +787,78 @@ function handleVideoError() { function handleStreamDeviceLost(data: { device: string; reason: string }) { videoError.value = true videoErrorMessage.value = t('console.deviceLostDesc', { device: data.device, reason: data.reason }) + + // In WebRTC mode, the pipeline will attempt to restart itself. + // Start an exponential-backoff reconnect loop so the session is + // re-established automatically once the backend is ready again. + if (videoMode.value !== 'mjpeg') { + scheduleWebRTCRecovery() + } +} + +function scheduleWebRTCRecovery() { + // Clear any previous timer + if (webrtcRecoveryTimerId !== null) { + clearTimeout(webrtcRecoveryTimerId) + webrtcRecoveryTimerId = null + } + + if (webrtcRecoveryAttempts >= MAX_WEBRTC_RECOVERY_ATTEMPTS) { + console.warn('[Recovery] Max WebRTC recovery attempts reached, giving up') + webrtcRecoveryAttempts = 0 + return + } + + const delay = Math.min( + WEBRTC_RECOVERY_BASE_DELAY * Math.pow(2, webrtcRecoveryAttempts), + 30000, + ) + + console.log( + `[Recovery] Scheduling WebRTC reconnect attempt ${webrtcRecoveryAttempts + 1}/${MAX_WEBRTC_RECOVERY_ATTEMPTS} in ${delay}ms`, + ) + + webrtcRecoveryTimerId = window.setTimeout(async () => { + webrtcRecoveryTimerId = null + webrtcRecoveryAttempts++ + + // Only reconnect if we are still in a WebRTC mode and error state + if (videoMode.value === 'mjpeg' || !videoError.value) { + webrtcRecoveryAttempts = 0 + return + } + + console.log(`[Recovery] Attempting WebRTC reconnect (attempt ${webrtcRecoveryAttempts})`) + try { + await webrtc.disconnect() + const ok = await connectWebRTCSerial('device-recovery') + if (ok) { + console.log('[Recovery] WebRTC reconnected successfully') + videoError.value = false + videoErrorMessage.value = '' + webrtcRecoveryAttempts = 0 + } else { + // Retry + scheduleWebRTCRecovery() + } + } catch { + scheduleWebRTCRecovery() + } + }, delay) +} + +function cancelWebRTCRecovery() { + if (webrtcRecoveryTimerId !== null) { + clearTimeout(webrtcRecoveryTimerId) + webrtcRecoveryTimerId = null + } + webrtcRecoveryAttempts = 0 } function handleStreamRecovered(_data: { device: string }) { + // Cancel any pending recovery timer – backend is back + cancelWebRTCRecovery() + // Reset video error state videoError.value = false videoErrorMessage.value = '' @@ -918,6 +993,16 @@ function handleStreamStateChanged(data: any) { if (data.state === 'error') { videoError.value = true videoErrorMessage.value = t('console.streamError') + } else if (data.state === 'recovering' && videoMode.value !== 'mjpeg') { + // Backend is in the DeviceLost recovery loop; start WebRTC reconnect if not already scheduled. + if (webrtcRecoveryTimerId === null && webrtcRecoveryAttempts === 0) { + scheduleWebRTCRecovery() + } + } else if (data.state === 'streaming' || data.state === 'no_signal') { + // Backend stream is alive; cancel any pending recovery timers. + if (data.state === 'streaming') { + cancelWebRTCRecovery() + } } } @@ -2224,6 +2309,7 @@ onUnmounted(() => { clearTimeout(gracePeriodTimeoutId) gracePeriodTimeoutId = null } + cancelWebRTCRecovery() videoSession.clearWaiters() // Reset counters