refactor: 重构视频采集状态与错误分类公共逻辑

This commit is contained in:
mofeng-git
2026-05-01 17:56:56 +08:00
parent d82c863f40
commit 0d47d8395d
8 changed files with 173 additions and 124 deletions

View File

@@ -193,7 +193,6 @@ async fn main() -> anyhow::Result<()> {
config.web.http_port config.web.http_port
}; };
for ip in &bind_ips { for ip in &bind_ips {
let addr = SocketAddr::new(*ip, bind_port); let addr = SocketAddr::new(*ip, bind_port);
tracing::info!("Server will listen on: {}://{}", scheme, addr); tracing::info!("Server will listen on: {}://{}", scheme, addr);
@@ -327,10 +326,7 @@ async fn main() -> anyhow::Result<()> {
}, },
config::HidBackend::None => HidBackendType::None, config::HidBackend::None => HidBackendType::None,
}; };
let hid = Arc::new(HidController::new( let hid = Arc::new(HidController::new(hid_backend, Some(otg_service.clone())));
hid_backend,
Some(otg_service.clone()),
));
hid.set_event_bus(events.clone()).await; hid.set_event_bus(events.clone()).await;
if let Err(e) = hid.init().await { if let Err(e) = hid.init().await {
tracing::warn!("Failed to initialize HID backend: {}", e); tracing::warn!("Failed to initialize HID backend: {}", e);
@@ -774,10 +770,9 @@ async fn run_user_action(
} }
async fn set_user_password(users: &UserStore, sessions: &SessionStore) -> anyhow::Result<()> { async fn set_user_password(users: &UserStore, sessions: &SessionStore) -> anyhow::Result<()> {
let user = users let user = users.single_user().await?.ok_or_else(|| {
.single_user() anyhow::anyhow!("No local user exists yet; complete setup in the web UI first.")
.await? })?;
.ok_or_else(|| anyhow::anyhow!("No local user exists yet; complete setup in the web UI first."))?;
let new_password = read_new_password_interactive()?; let new_password = read_new_password_interactive()?;
if new_password.len() < 4 { if new_password.len() < 4 {

View File

@@ -0,0 +1,66 @@
//! Shared capture status and error classification helpers.
use std::io;
use crate::video::SignalStatus;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CaptureIoErrorKind {
DeviceLost,
TransientSignal { status: Option<SignalStatus> },
Other,
}
pub fn signal_status_from_capture_kind(kind: &str) -> SignalStatus {
SignalStatus::from_str(kind).unwrap_or(SignalStatus::NoSignal)
}
pub fn classify_capture_io_error(err: &io::Error) -> CaptureIoErrorKind {
match err.raw_os_error() {
// ENXIO / ENODEV / ESHUTDOWN: the device node or endpoint is gone.
Some(6) | Some(19) | Some(108) => CaptureIoErrorKind::DeviceLost,
// EIO / EPIPE: source or transport glitched; EPROTO is common for UVC USB.
Some(5) | Some(32) => CaptureIoErrorKind::TransientSignal { status: None },
Some(71) => CaptureIoErrorKind::TransientSignal {
status: Some(SignalStatus::UvcUsbError),
},
_ => CaptureIoErrorKind::Other,
}
}
pub fn capture_error_log_key(err: &io::Error) -> String {
let message = err.to_string();
if message.contains("dqbuf failed") && message.contains("EINVAL") {
"capture_dqbuf_einval".to_string()
} else if message.contains("dqbuf failed") {
"capture_dqbuf".to_string()
} else {
format!("capture_{:?}", err.kind())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn maps_known_signal_status_strings() {
assert_eq!(
signal_status_from_capture_kind("out_of_range"),
SignalStatus::OutOfRange
);
assert_eq!(
signal_status_from_capture_kind("unknown"),
SignalStatus::NoSignal
);
}
#[test]
fn classifies_source_change_log_keys() {
let err = io::Error::other("dqbuf failed: EINVAL");
assert_eq!(capture_error_log_key(&err), "capture_dqbuf_einval");
let err = io::Error::new(io::ErrorKind::TimedOut, "capture timeout");
assert_eq!(capture_error_log_key(&err), "capture_TimedOut");
}
}

View File

@@ -3,6 +3,7 @@
//! This module provides V4L2 video capture, encoding, and streaming functionality. //! This module provides V4L2 video capture, encoding, and streaming functionality.
pub(crate) mod capture_limits; pub(crate) mod capture_limits;
pub(crate) mod capture_status;
pub mod codec_constraints; pub mod codec_constraints;
pub mod convert; pub mod convert;
pub mod csi_bridge; pub mod csi_bridge;

View File

@@ -44,6 +44,10 @@ const ENCODE_ERROR_THROTTLE_SECS: u64 = 5;
use crate::error::{AppError, Result}; use crate::error::{AppError, Result};
use crate::utils::LogThrottler; use crate::utils::LogThrottler;
use crate::video::capture_limits::{should_validate_jpeg_frame, MIN_CAPTURE_FRAME_SIZE}; use crate::video::capture_limits::{should_validate_jpeg_frame, MIN_CAPTURE_FRAME_SIZE};
use crate::video::capture_status::{
capture_error_log_key, classify_capture_io_error, signal_status_from_capture_kind,
CaptureIoErrorKind,
};
use crate::video::csi_bridge::{self, ProbeResult}; use crate::video::csi_bridge::{self, ProbeResult};
use crate::video::device::parse_bridge_kind; use crate::video::device::parse_bridge_kind;
use crate::video::encoder::registry::{EncoderBackend, VideoEncoderType}; use crate::video::encoder::registry::{EncoderBackend, VideoEncoderType};
@@ -586,7 +590,7 @@ impl SharedVideoPipeline {
debug!( debug!(
"Pre-probe: no signal — encoder uses configured geometry until capture opens" "Pre-probe: no signal — encoder uses configured geometry until capture opens"
); );
let status = SignalStatus::from_str(&kind).unwrap_or(SignalStatus::NoSignal); let status = signal_status_from_capture_kind(&kind);
self.notify_state(PipelineStateNotification::no_signal( self.notify_state(PipelineStateNotification::no_signal(
status, status,
Some(Duration::from_secs(2).as_millis() as u64), Some(Duration::from_secs(2).as_millis() as u64),
@@ -757,7 +761,7 @@ impl SharedVideoPipeline {
kind kind
); );
pipeline.notify_state(PipelineStateNotification::no_signal( pipeline.notify_state(PipelineStateNotification::no_signal(
SignalStatus::from_str(&kind).unwrap_or(SignalStatus::NoSignal), signal_status_from_capture_kind(&kind),
Some(CSI_BRIDGE_NOSIGNAL_INTERVAL_MS), Some(CSI_BRIDGE_NOSIGNAL_INTERVAL_MS),
)); ));
} }
@@ -800,9 +804,7 @@ impl SharedVideoPipeline {
Ok(s) => OpenResult::Opened(s), Ok(s) => OpenResult::Opened(s),
Err(AppError::CaptureNoSignal { kind }) => { Err(AppError::CaptureNoSignal { kind }) => {
debug!("Capture soft-restart: still no signal ({})", kind); debug!("Capture soft-restart: still no signal ({})", kind);
OpenResult::NoSignal( OpenResult::NoSignal(signal_status_from_capture_kind(&kind))
SignalStatus::from_str(&kind).unwrap_or(SignalStatus::NoSignal),
)
} }
Err(e) => { Err(e) => {
error!("Capture soft-restart failed: {}", e); error!("Capture soft-restart failed: {}", e);
@@ -819,17 +821,6 @@ impl SharedVideoPipeline {
let capture_error_throttler = LogThrottler::with_secs(5); let capture_error_throttler = LogThrottler::with_secs(5);
let mut suppressed_capture_errors: HashMap<String, u64> = HashMap::new(); let mut suppressed_capture_errors: HashMap<String, u64> = HashMap::new();
let classify_capture_error = |err: &std::io::Error| -> String {
let message = err.to_string();
if message.contains("dqbuf failed") && message.contains("EINVAL") {
"capture_dqbuf_einval".to_string()
} else if message.contains("dqbuf failed") {
"capture_dqbuf".to_string()
} else {
format!("capture_{:?}", err.kind())
}
};
while pipeline.running_flag.load(Ordering::Acquire) { while pipeline.running_flag.load(Ordering::Acquire) {
let subscriber_count = pipeline.subscriber_count(); let subscriber_count = pipeline.subscriber_count();
if subscriber_count == 0 { if subscriber_count == 0 {
@@ -1121,30 +1112,39 @@ impl SharedVideoPipeline {
// EIO (5) / EPIPE (32) / EPROTO (71) in next_into generally // EIO (5) / EPIPE (32) / EPROTO (71) in next_into generally
// mean the source or UVC USB transport glitched mid-stream. // mean the source or UVC USB transport glitched mid-stream.
// Tear down the stream and let the open loop re-probe. // Tear down the stream and let the open loop re-probe.
let os = e.raw_os_error(); match classify_capture_io_error(&e) {
if matches!(os, Some(5) | Some(32) | Some(71)) { CaptureIoErrorKind::TransientSignal { status } => {
if os == Some(71) { if status == Some(SignalStatus::UvcUsbError) {
warn!( warn!(
"Capture transient error (EPROTO/-71, often UVC USB): {} — soft-restart", "Capture transient error (EPROTO/-71, often UVC USB): {} — soft-restart",
e e
); );
pipeline.notify_state( pipeline.notify_state(
PipelineStateNotification::no_signal( PipelineStateNotification::no_signal(
SignalStatus::UvcUsbError, SignalStatus::UvcUsbError,
Some(Duration::from_secs(2).as_millis() as u64), Some(Duration::from_secs(2).as_millis() as u64),
), ),
); );
} else { } else {
warn!( warn!(
"Capture transient error ({}), closing stream for \ "Capture transient error ({}), closing stream for \
soft-restart", soft-restart",
e e
); );
}
stream = None;
continue;
} }
stream = None; CaptureIoErrorKind::DeviceLost => {
continue; error!("Capture device lost: {}", e);
let _ = pipeline.running.send(false);
pipeline.running_flag.store(false, Ordering::Release);
let _ = frame_seq_tx.send(sequence.wrapping_add(1));
break;
}
CaptureIoErrorKind::Other => {}
} }
let key = classify_capture_error(&e); let key = capture_error_log_key(&e);
if capture_error_throttler.should_log(&key) { if capture_error_throttler.should_log(&key) {
let suppressed = let suppressed =
suppressed_capture_errors.remove(&key).unwrap_or(0); suppressed_capture_errors.remove(&key).unwrap_or(0);

View File

@@ -23,6 +23,10 @@ use crate::events::{EventBus, SystemEvent};
use crate::stream::MjpegStreamHandler; use crate::stream::MjpegStreamHandler;
use crate::utils::LogThrottler; use crate::utils::LogThrottler;
use crate::video::capture_limits::{should_validate_jpeg_frame, MIN_CAPTURE_FRAME_SIZE}; use crate::video::capture_limits::{should_validate_jpeg_frame, MIN_CAPTURE_FRAME_SIZE};
use crate::video::capture_status::{
capture_error_log_key, classify_capture_io_error, signal_status_from_capture_kind,
CaptureIoErrorKind,
};
use crate::video::v4l2r_capture::{is_source_changed_error, BridgeContext, V4l2rCaptureStream}; use crate::video::v4l2r_capture::{is_source_changed_error, BridgeContext, V4l2rCaptureStream};
/// Streamer configuration /// Streamer configuration
@@ -907,8 +911,7 @@ impl Streamer {
// "no signal" overlay, update the state with the // "no signal" overlay, update the state with the
// fine-grained reason, and let the outer 'session // fine-grained reason, and let the outer 'session
// loop back off before the next retry. // loop back off before the next retry.
let status = crate::video::SignalStatus::from_str(&kind) let status = signal_status_from_capture_kind(&kind);
.unwrap_or(crate::video::SignalStatus::NoSignal);
debug!( debug!(
"CSI open probe reports no signal ({:?}), will soft-restart", "CSI open probe reports no signal ({:?}), will soft-restart",
status status
@@ -989,17 +992,6 @@ impl Streamer {
let capture_error_throttler = LogThrottler::with_secs(5); let capture_error_throttler = LogThrottler::with_secs(5);
let mut suppressed_capture_errors: HashMap<String, u64> = HashMap::new(); let mut suppressed_capture_errors: HashMap<String, u64> = HashMap::new();
let classify_capture_error = |err: &std::io::Error| -> String {
let message = err.to_string();
if message.contains("dqbuf failed") && message.contains("EINVAL") {
"capture_dqbuf_einval".to_string()
} else if message.contains("dqbuf failed") {
"capture_dqbuf".to_string()
} else {
format!("capture_{:?}", err.kind())
}
};
// None = signal is present; Some(Instant) = when signal was first lost. // None = signal is present; Some(Instant) = when signal was first lost.
let mut no_signal_since: Option<std::time::Instant> = None; let mut no_signal_since: Option<std::time::Instant> = None;
// Whether the inner 'capture loop should trigger a soft restart. // Whether the inner 'capture loop should trigger a soft restart.
@@ -1078,59 +1070,60 @@ impl Streamer {
// when the source glitches or re-locks; those are // when the source glitches or re-locks; those are
// treated as NoSignal + soft-restart so we recover // treated as NoSignal + soft-restart so we recover
// in ~1 s instead of the 1 s recovery-poll loop. // in ~1 s instead of the 1 s recovery-poll loop.
let os_err = e.raw_os_error(); match classify_capture_io_error(&e) {
let is_device_lost = matches!(os_err, Some(6) | Some(19) | Some(108)); CaptureIoErrorKind::DeviceLost => {
let is_transient_signal_error = error!("Video device lost: {} - {}", device_path.display(), e);
matches!(os_err, Some(5) | Some(32) | Some(71)); go_offline();
set_retry(0);
if is_device_lost { handle.block_on(async {
error!("Video device lost: {} - {}", device_path.display(), e); *self.last_lost_device.write().await =
go_offline(); Some(device_path.display().to_string());
set_retry(0); *self.last_lost_reason.write().await = Some(e.to_string());
handle.block_on(async {
*self.last_lost_device.write().await =
Some(device_path.display().to_string());
*self.last_lost_reason.write().await = Some(e.to_string());
});
set_state(StreamerState::DeviceLost);
handle.block_on(async {
let streamer = Arc::clone(&self);
tokio::spawn(async move {
streamer.start_device_recovery_internal().await;
}); });
}); set_state(StreamerState::DeviceLost);
break 'capture; handle.block_on(async {
} let streamer = Arc::clone(&self);
tokio::spawn(async move {
if is_transient_signal_error { streamer.start_device_recovery_internal().await;
if os_err == Some(71) { });
warn!("Capture transient error (EPROTO/-71, often UVC USB): {}", e); });
let is_uvc = break 'capture;
handle.block_on(async { }
CaptureIoErrorKind::TransientSignal { status } => {
if status == Some(crate::video::SignalStatus::UvcUsbError) {
warn!(
"Capture transient error (EPROTO/-71, often UVC USB): {}",
e
);
let is_uvc = handle.block_on(async {
self.current_device.read().await.as_ref().is_some_and(|d| { self.current_device.read().await.as_ref().is_some_and(|d| {
d.driver.eq_ignore_ascii_case("uvcvideo") d.driver.eq_ignore_ascii_case("uvcvideo")
}) })
}); });
if is_uvc { if is_uvc {
go_offline(); go_offline();
set_state(StreamerState::UvcUsbError); set_state(StreamerState::UvcUsbError);
need_soft_restart = true; need_soft_restart = true;
break 'capture; break 'capture;
} }
} else { } else {
warn!( warn!(
"Capture transient error ({}): treating as NoSignal + soft-restart", "Capture transient error ({}): treating as NoSignal + soft-restart",
e e
); );
}
set_retry(
backoff_secs(no_signal_restart_count).saturating_mul(1000),
);
go_offline();
set_state(StreamerState::NoSignal);
need_soft_restart = true;
break 'capture;
} }
set_retry(backoff_secs(no_signal_restart_count).saturating_mul(1000)); CaptureIoErrorKind::Other => {}
go_offline();
set_state(StreamerState::NoSignal);
need_soft_restart = true;
break 'capture;
} }
let key = classify_capture_error(&e); let key = capture_error_log_key(&e);
if capture_error_throttler.should_log(&key) { if capture_error_throttler.should_log(&key) {
let suppressed = suppressed_capture_errors.remove(&key).unwrap_or(0); let suppressed = suppressed_capture_errors.remove(&key).unwrap_or(0);
if suppressed > 0 { if suppressed > 0 {

View File

@@ -301,9 +301,8 @@ mod tests {
assert!(!is_h264_keyframe(&sps)); assert!(!is_h264_keyframe(&sps));
let multi_nal = vec![ let multi_nal = vec![
0x00, 0x00, 0x00, 0x01, 0x67, 0x42, 0x00, 0x1f, 0x00, 0x00, 0x00, 0x01, 0x67, 0x42, 0x00, 0x1f, 0x00, 0x00, 0x00, 0x01, 0x68, 0xce,
0x00, 0x00, 0x00, 0x01, 0x68, 0xce, 0x38, 0x80, 0x38, 0x80, 0x00, 0x00, 0x00, 0x01, 0x65, 0x88, 0x84,
0x00, 0x00, 0x00, 0x01, 0x65, 0x88, 0x84,
]; ];
assert!(is_h264_keyframe(&multi_nal)); assert!(is_h264_keyframe(&multi_nal));
} }

View File

@@ -55,9 +55,7 @@ impl VideoCodec {
VideoCodec::H264 => { VideoCodec::H264 => {
"level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42e01f".to_string() "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42e01f".to_string()
} }
VideoCodec::H265 => { VideoCodec::H265 => "level-id=180;profile-id=1;tier-flag=0;tx-mode=SRST".to_string(),
"level-id=180;profile-id=1;tier-flag=0;tx-mode=SRST".to_string()
}
VideoCodec::VP8 => String::new(), VideoCodec::VP8 => String::new(),
VideoCodec::VP9 => "profile-id=0".to_string(), VideoCodec::VP9 => "profile-id=0".to_string(),
} }

View File

@@ -11,9 +11,9 @@ use crate::error::{AppError, Result};
use crate::events::{EventBus, SystemEvent}; use crate::events::{EventBus, SystemEvent};
use crate::hid::HidController; use crate::hid::HidController;
use crate::video::types::{ use crate::video::types::{
BitratePreset, EncoderBackend, PixelFormat, Resolution, VideoCodecType, VideoEncoderType, BitratePreset, EncoderBackend, PipelineStateNotification, PixelFormat, Resolution,
PipelineStateNotification, SharedVideoPipeline, SharedVideoPipelineConfig, SharedVideoPipeline, SharedVideoPipelineConfig, SharedVideoPipelineStats, VideoCodecType,
SharedVideoPipelineStats, VideoEncoderType,
}; };
use super::config::{TurnServer, WebRtcConfig}; use super::config::{TurnServer, WebRtcConfig};
@@ -249,10 +249,7 @@ impl WebRtcStreamer {
}) })
} }
async fn reconnect_sessions_to_current_pipeline( async fn reconnect_sessions_to_current_pipeline(&self, reason: &str) -> Result<usize> {
&self,
reason: &str,
) -> Result<usize> {
if self.capture_device.read().await.is_none() { if self.capture_device.read().await.is_none() {
return Ok(0); return Ok(0);
} }
@@ -352,7 +349,8 @@ impl WebRtcStreamer {
if let Some(ref current) = *pipeline_guard { if let Some(ref current) = *pipeline_guard {
if let Some(stopped_pipeline) = pipeline_weak.upgrade() { if let Some(stopped_pipeline) = pipeline_weak.upgrade() {
if Arc::ptr_eq(current, &stopped_pipeline) { if Arc::ptr_eq(current, &stopped_pipeline) {
pending_geometry = stopped_pipeline.take_pending_sync_geometry(); pending_geometry =
stopped_pipeline.take_pending_sync_geometry();
*pipeline_guard = None; *pipeline_guard = None;
info!("Cleared stopped video pipeline reference"); info!("Cleared stopped video pipeline reference");
} }
@@ -414,9 +412,7 @@ impl WebRtcStreamer {
/// ///
/// This is a public wrapper around ensure_video_pipeline for external /// This is a public wrapper around ensure_video_pipeline for external
/// components (like RustDesk) that need to share the encoded video stream. /// components (like RustDesk) that need to share the encoded video stream.
pub async fn ensure_video_pipeline_for_external( pub async fn ensure_video_pipeline_for_external(&self) -> Result<Arc<SharedVideoPipeline>> {
&self,
) -> Result<Arc<SharedVideoPipeline>> {
self.ensure_video_pipeline().await self.ensure_video_pipeline().await
} }
@@ -608,11 +604,7 @@ impl WebRtcStreamer {
info!( info!(
"WebRTC config updated: {}x{} {:?} @ {} fps, {}", "WebRTC config updated: {}x{} {:?} @ {} fps, {}",
resolution.width, resolution.width, resolution.height, format, fps, config.bitrate_preset
resolution.height,
format,
fps,
config.bitrate_preset
); );
} }
@@ -1053,8 +1045,7 @@ impl WebRtcStreamer {
if reconnected > 0 { if reconnected > 0 {
info!( info!(
"Video pipeline restarted with {}, reconnected {} sessions", "Video pipeline restarted with {}, reconnected {} sessions",
preset, preset, reconnected
reconnected
); );
} }
} else { } else {
@@ -1086,8 +1077,14 @@ impl crate::video::traits::VideoOutput for WebRtcStreamer {
bridge_kind: Option<String>, bridge_kind: Option<String>,
v4l2_driver: Option<String>, v4l2_driver: Option<String>,
) { ) {
self.set_capture_device(device_path, jpeg_quality, subdev_path, bridge_kind, v4l2_driver) self.set_capture_device(
.await; device_path,
jpeg_quality,
subdev_path,
bridge_kind,
v4l2_driver,
)
.await;
} }
async fn current_video_codec(&self) -> VideoCodecType { async fn current_video_codec(&self) -> VideoCodecType {