diff --git a/src/main.rs b/src/main.rs index 484ee213..392f37ac 100644 --- a/src/main.rs +++ b/src/main.rs @@ -193,7 +193,6 @@ async fn main() -> anyhow::Result<()> { config.web.http_port }; - for ip in &bind_ips { let addr = SocketAddr::new(*ip, bind_port); tracing::info!("Server will listen on: {}://{}", scheme, addr); @@ -327,10 +326,7 @@ async fn main() -> anyhow::Result<()> { }, config::HidBackend::None => HidBackendType::None, }; - let hid = Arc::new(HidController::new( - hid_backend, - Some(otg_service.clone()), - )); + let hid = Arc::new(HidController::new(hid_backend, Some(otg_service.clone()))); hid.set_event_bus(events.clone()).await; if let Err(e) = hid.init().await { tracing::warn!("Failed to initialize HID backend: {}", e); @@ -774,10 +770,9 @@ async fn run_user_action( } async fn set_user_password(users: &UserStore, sessions: &SessionStore) -> anyhow::Result<()> { - let user = users - .single_user() - .await? - .ok_or_else(|| anyhow::anyhow!("No local user exists yet; complete setup in the web UI first."))?; + let user = users.single_user().await?.ok_or_else(|| { + anyhow::anyhow!("No local user exists yet; complete setup in the web UI first.") + })?; let new_password = read_new_password_interactive()?; if new_password.len() < 4 { diff --git a/src/video/capture_status.rs b/src/video/capture_status.rs new file mode 100644 index 00000000..1b9f7ea8 --- /dev/null +++ b/src/video/capture_status.rs @@ -0,0 +1,66 @@ +//! Shared capture status and error classification helpers. + +use std::io; + +use crate::video::SignalStatus; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum CaptureIoErrorKind { + DeviceLost, + TransientSignal { status: Option }, + Other, +} + +pub fn signal_status_from_capture_kind(kind: &str) -> SignalStatus { + SignalStatus::from_str(kind).unwrap_or(SignalStatus::NoSignal) +} + +pub fn classify_capture_io_error(err: &io::Error) -> CaptureIoErrorKind { + match err.raw_os_error() { + // ENXIO / ENODEV / ESHUTDOWN: the device node or endpoint is gone. + Some(6) | Some(19) | Some(108) => CaptureIoErrorKind::DeviceLost, + // EIO / EPIPE: source or transport glitched; EPROTO is common for UVC USB. + Some(5) | Some(32) => CaptureIoErrorKind::TransientSignal { status: None }, + Some(71) => CaptureIoErrorKind::TransientSignal { + status: Some(SignalStatus::UvcUsbError), + }, + _ => CaptureIoErrorKind::Other, + } +} + +pub fn capture_error_log_key(err: &io::Error) -> String { + let message = err.to_string(); + if message.contains("dqbuf failed") && message.contains("EINVAL") { + "capture_dqbuf_einval".to_string() + } else if message.contains("dqbuf failed") { + "capture_dqbuf".to_string() + } else { + format!("capture_{:?}", err.kind()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn maps_known_signal_status_strings() { + assert_eq!( + signal_status_from_capture_kind("out_of_range"), + SignalStatus::OutOfRange + ); + assert_eq!( + signal_status_from_capture_kind("unknown"), + SignalStatus::NoSignal + ); + } + + #[test] + fn classifies_source_change_log_keys() { + let err = io::Error::other("dqbuf failed: EINVAL"); + assert_eq!(capture_error_log_key(&err), "capture_dqbuf_einval"); + + let err = io::Error::new(io::ErrorKind::TimedOut, "capture timeout"); + assert_eq!(capture_error_log_key(&err), "capture_TimedOut"); + } +} diff --git a/src/video/mod.rs b/src/video/mod.rs index 3d76bc7f..fa64949c 100644 --- a/src/video/mod.rs +++ b/src/video/mod.rs @@ -3,6 +3,7 @@ //! This module provides V4L2 video capture, encoding, and streaming functionality. pub(crate) mod capture_limits; +pub(crate) mod capture_status; pub mod codec_constraints; pub mod convert; pub mod csi_bridge; diff --git a/src/video/shared_video_pipeline.rs b/src/video/shared_video_pipeline.rs index de5417dd..a5603a75 100644 --- a/src/video/shared_video_pipeline.rs +++ b/src/video/shared_video_pipeline.rs @@ -44,6 +44,10 @@ const ENCODE_ERROR_THROTTLE_SECS: u64 = 5; use crate::error::{AppError, Result}; use crate::utils::LogThrottler; use crate::video::capture_limits::{should_validate_jpeg_frame, MIN_CAPTURE_FRAME_SIZE}; +use crate::video::capture_status::{ + capture_error_log_key, classify_capture_io_error, signal_status_from_capture_kind, + CaptureIoErrorKind, +}; use crate::video::csi_bridge::{self, ProbeResult}; use crate::video::device::parse_bridge_kind; use crate::video::encoder::registry::{EncoderBackend, VideoEncoderType}; @@ -586,7 +590,7 @@ impl SharedVideoPipeline { debug!( "Pre-probe: no signal — encoder uses configured geometry until capture opens" ); - let status = SignalStatus::from_str(&kind).unwrap_or(SignalStatus::NoSignal); + let status = signal_status_from_capture_kind(&kind); self.notify_state(PipelineStateNotification::no_signal( status, Some(Duration::from_secs(2).as_millis() as u64), @@ -757,7 +761,7 @@ impl SharedVideoPipeline { kind ); pipeline.notify_state(PipelineStateNotification::no_signal( - SignalStatus::from_str(&kind).unwrap_or(SignalStatus::NoSignal), + signal_status_from_capture_kind(&kind), Some(CSI_BRIDGE_NOSIGNAL_INTERVAL_MS), )); } @@ -800,9 +804,7 @@ impl SharedVideoPipeline { Ok(s) => OpenResult::Opened(s), Err(AppError::CaptureNoSignal { kind }) => { debug!("Capture soft-restart: still no signal ({})", kind); - OpenResult::NoSignal( - SignalStatus::from_str(&kind).unwrap_or(SignalStatus::NoSignal), - ) + OpenResult::NoSignal(signal_status_from_capture_kind(&kind)) } Err(e) => { error!("Capture soft-restart failed: {}", e); @@ -819,17 +821,6 @@ impl SharedVideoPipeline { let capture_error_throttler = LogThrottler::with_secs(5); let mut suppressed_capture_errors: HashMap = HashMap::new(); - let classify_capture_error = |err: &std::io::Error| -> String { - let message = err.to_string(); - if message.contains("dqbuf failed") && message.contains("EINVAL") { - "capture_dqbuf_einval".to_string() - } else if message.contains("dqbuf failed") { - "capture_dqbuf".to_string() - } else { - format!("capture_{:?}", err.kind()) - } - }; - while pipeline.running_flag.load(Ordering::Acquire) { let subscriber_count = pipeline.subscriber_count(); if subscriber_count == 0 { @@ -1121,30 +1112,39 @@ impl SharedVideoPipeline { // EIO (5) / EPIPE (32) / EPROTO (71) in next_into generally // mean the source or UVC USB transport glitched mid-stream. // Tear down the stream and let the open loop re-probe. - let os = e.raw_os_error(); - if matches!(os, Some(5) | Some(32) | Some(71)) { - if os == Some(71) { - warn!( + match classify_capture_io_error(&e) { + CaptureIoErrorKind::TransientSignal { status } => { + if status == Some(SignalStatus::UvcUsbError) { + warn!( "Capture transient error (EPROTO/-71, often UVC USB): {} — soft-restart", e ); - pipeline.notify_state( - PipelineStateNotification::no_signal( - SignalStatus::UvcUsbError, - Some(Duration::from_secs(2).as_millis() as u64), - ), - ); - } else { - warn!( - "Capture transient error ({}), closing stream for \ + pipeline.notify_state( + PipelineStateNotification::no_signal( + SignalStatus::UvcUsbError, + Some(Duration::from_secs(2).as_millis() as u64), + ), + ); + } else { + warn!( + "Capture transient error ({}), closing stream for \ soft-restart", - e - ); + e + ); + } + stream = None; + continue; } - stream = None; - continue; + CaptureIoErrorKind::DeviceLost => { + error!("Capture device lost: {}", e); + let _ = pipeline.running.send(false); + pipeline.running_flag.store(false, Ordering::Release); + let _ = frame_seq_tx.send(sequence.wrapping_add(1)); + break; + } + CaptureIoErrorKind::Other => {} } - let key = classify_capture_error(&e); + let key = capture_error_log_key(&e); if capture_error_throttler.should_log(&key) { let suppressed = suppressed_capture_errors.remove(&key).unwrap_or(0); diff --git a/src/video/streamer.rs b/src/video/streamer.rs index 01d83779..2c1ca194 100644 --- a/src/video/streamer.rs +++ b/src/video/streamer.rs @@ -23,6 +23,10 @@ use crate::events::{EventBus, SystemEvent}; use crate::stream::MjpegStreamHandler; use crate::utils::LogThrottler; use crate::video::capture_limits::{should_validate_jpeg_frame, MIN_CAPTURE_FRAME_SIZE}; +use crate::video::capture_status::{ + capture_error_log_key, classify_capture_io_error, signal_status_from_capture_kind, + CaptureIoErrorKind, +}; use crate::video::v4l2r_capture::{is_source_changed_error, BridgeContext, V4l2rCaptureStream}; /// Streamer configuration @@ -907,8 +911,7 @@ impl Streamer { // "no signal" overlay, update the state with the // fine-grained reason, and let the outer 'session // loop back off before the next retry. - let status = crate::video::SignalStatus::from_str(&kind) - .unwrap_or(crate::video::SignalStatus::NoSignal); + let status = signal_status_from_capture_kind(&kind); debug!( "CSI open probe reports no signal ({:?}), will soft-restart", status @@ -989,17 +992,6 @@ impl Streamer { let capture_error_throttler = LogThrottler::with_secs(5); let mut suppressed_capture_errors: HashMap = HashMap::new(); - let classify_capture_error = |err: &std::io::Error| -> String { - let message = err.to_string(); - if message.contains("dqbuf failed") && message.contains("EINVAL") { - "capture_dqbuf_einval".to_string() - } else if message.contains("dqbuf failed") { - "capture_dqbuf".to_string() - } else { - format!("capture_{:?}", err.kind()) - } - }; - // None = signal is present; Some(Instant) = when signal was first lost. let mut no_signal_since: Option = None; // Whether the inner 'capture loop should trigger a soft restart. @@ -1078,59 +1070,60 @@ impl Streamer { // when the source glitches or re-locks; those are // treated as NoSignal + soft-restart so we recover // in ~1 s instead of the 1 s recovery-poll loop. - let os_err = e.raw_os_error(); - let is_device_lost = matches!(os_err, Some(6) | Some(19) | Some(108)); - let is_transient_signal_error = - matches!(os_err, Some(5) | Some(32) | Some(71)); - - if is_device_lost { - error!("Video device lost: {} - {}", device_path.display(), e); - go_offline(); - set_retry(0); - handle.block_on(async { - *self.last_lost_device.write().await = - Some(device_path.display().to_string()); - *self.last_lost_reason.write().await = Some(e.to_string()); - }); - set_state(StreamerState::DeviceLost); - handle.block_on(async { - let streamer = Arc::clone(&self); - tokio::spawn(async move { - streamer.start_device_recovery_internal().await; + match classify_capture_io_error(&e) { + CaptureIoErrorKind::DeviceLost => { + error!("Video device lost: {} - {}", device_path.display(), e); + go_offline(); + set_retry(0); + handle.block_on(async { + *self.last_lost_device.write().await = + Some(device_path.display().to_string()); + *self.last_lost_reason.write().await = Some(e.to_string()); }); - }); - break 'capture; - } - - if is_transient_signal_error { - if os_err == Some(71) { - warn!("Capture transient error (EPROTO/-71, often UVC USB): {}", e); - let is_uvc = - handle.block_on(async { + set_state(StreamerState::DeviceLost); + handle.block_on(async { + let streamer = Arc::clone(&self); + tokio::spawn(async move { + streamer.start_device_recovery_internal().await; + }); + }); + break 'capture; + } + CaptureIoErrorKind::TransientSignal { status } => { + if status == Some(crate::video::SignalStatus::UvcUsbError) { + warn!( + "Capture transient error (EPROTO/-71, often UVC USB): {}", + e + ); + let is_uvc = handle.block_on(async { self.current_device.read().await.as_ref().is_some_and(|d| { d.driver.eq_ignore_ascii_case("uvcvideo") }) }); - if is_uvc { - go_offline(); - set_state(StreamerState::UvcUsbError); - need_soft_restart = true; - break 'capture; - } - } else { - warn!( + if is_uvc { + go_offline(); + set_state(StreamerState::UvcUsbError); + need_soft_restart = true; + break 'capture; + } + } else { + warn!( "Capture transient error ({}): treating as NoSignal + soft-restart", e ); + } + set_retry( + backoff_secs(no_signal_restart_count).saturating_mul(1000), + ); + go_offline(); + set_state(StreamerState::NoSignal); + need_soft_restart = true; + break 'capture; } - set_retry(backoff_secs(no_signal_restart_count).saturating_mul(1000)); - go_offline(); - set_state(StreamerState::NoSignal); - need_soft_restart = true; - break 'capture; + CaptureIoErrorKind::Other => {} } - let key = classify_capture_error(&e); + let key = capture_error_log_key(&e); if capture_error_throttler.should_log(&key) { let suppressed = suppressed_capture_errors.remove(&key).unwrap_or(0); if suppressed > 0 { diff --git a/src/webrtc/rtp.rs b/src/webrtc/rtp.rs index d72228cd..1e847873 100644 --- a/src/webrtc/rtp.rs +++ b/src/webrtc/rtp.rs @@ -301,9 +301,8 @@ mod tests { assert!(!is_h264_keyframe(&sps)); let multi_nal = vec![ - 0x00, 0x00, 0x00, 0x01, 0x67, 0x42, 0x00, 0x1f, - 0x00, 0x00, 0x00, 0x01, 0x68, 0xce, 0x38, 0x80, - 0x00, 0x00, 0x00, 0x01, 0x65, 0x88, 0x84, + 0x00, 0x00, 0x00, 0x01, 0x67, 0x42, 0x00, 0x1f, 0x00, 0x00, 0x00, 0x01, 0x68, 0xce, + 0x38, 0x80, 0x00, 0x00, 0x00, 0x01, 0x65, 0x88, 0x84, ]; assert!(is_h264_keyframe(&multi_nal)); } diff --git a/src/webrtc/video_track.rs b/src/webrtc/video_track.rs index a2d5967a..30a7abb0 100644 --- a/src/webrtc/video_track.rs +++ b/src/webrtc/video_track.rs @@ -55,9 +55,7 @@ impl VideoCodec { VideoCodec::H264 => { "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42e01f".to_string() } - VideoCodec::H265 => { - "level-id=180;profile-id=1;tier-flag=0;tx-mode=SRST".to_string() - } + VideoCodec::H265 => "level-id=180;profile-id=1;tier-flag=0;tx-mode=SRST".to_string(), VideoCodec::VP8 => String::new(), VideoCodec::VP9 => "profile-id=0".to_string(), } diff --git a/src/webrtc/webrtc_streamer.rs b/src/webrtc/webrtc_streamer.rs index a09bca97..a67fefa4 100644 --- a/src/webrtc/webrtc_streamer.rs +++ b/src/webrtc/webrtc_streamer.rs @@ -11,9 +11,9 @@ use crate::error::{AppError, Result}; use crate::events::{EventBus, SystemEvent}; use crate::hid::HidController; use crate::video::types::{ - BitratePreset, EncoderBackend, PixelFormat, Resolution, VideoCodecType, VideoEncoderType, - PipelineStateNotification, SharedVideoPipeline, SharedVideoPipelineConfig, - SharedVideoPipelineStats, + BitratePreset, EncoderBackend, PipelineStateNotification, PixelFormat, Resolution, + SharedVideoPipeline, SharedVideoPipelineConfig, SharedVideoPipelineStats, VideoCodecType, + VideoEncoderType, }; use super::config::{TurnServer, WebRtcConfig}; @@ -249,10 +249,7 @@ impl WebRtcStreamer { }) } - async fn reconnect_sessions_to_current_pipeline( - &self, - reason: &str, - ) -> Result { + async fn reconnect_sessions_to_current_pipeline(&self, reason: &str) -> Result { if self.capture_device.read().await.is_none() { return Ok(0); } @@ -352,7 +349,8 @@ impl WebRtcStreamer { if let Some(ref current) = *pipeline_guard { if let Some(stopped_pipeline) = pipeline_weak.upgrade() { if Arc::ptr_eq(current, &stopped_pipeline) { - pending_geometry = stopped_pipeline.take_pending_sync_geometry(); + pending_geometry = + stopped_pipeline.take_pending_sync_geometry(); *pipeline_guard = None; info!("Cleared stopped video pipeline reference"); } @@ -414,9 +412,7 @@ impl WebRtcStreamer { /// /// This is a public wrapper around ensure_video_pipeline for external /// components (like RustDesk) that need to share the encoded video stream. - pub async fn ensure_video_pipeline_for_external( - &self, - ) -> Result> { + pub async fn ensure_video_pipeline_for_external(&self) -> Result> { self.ensure_video_pipeline().await } @@ -608,11 +604,7 @@ impl WebRtcStreamer { info!( "WebRTC config updated: {}x{} {:?} @ {} fps, {}", - resolution.width, - resolution.height, - format, - fps, - config.bitrate_preset + resolution.width, resolution.height, format, fps, config.bitrate_preset ); } @@ -1053,8 +1045,7 @@ impl WebRtcStreamer { if reconnected > 0 { info!( "Video pipeline restarted with {}, reconnected {} sessions", - preset, - reconnected + preset, reconnected ); } } else { @@ -1086,8 +1077,14 @@ impl crate::video::traits::VideoOutput for WebRtcStreamer { bridge_kind: Option, v4l2_driver: Option, ) { - self.set_capture_device(device_path, jpeg_quality, subdev_path, bridge_kind, v4l2_driver) - .await; + self.set_capture_device( + device_path, + jpeg_quality, + subdev_path, + bridge_kind, + v4l2_driver, + ) + .await; } async fn current_video_codec(&self) -> VideoCodecType {