feat: 完善 USB UVC 设备异常处理,添加 USB 设备复位功能

This commit is contained in:
mofeng-git
2026-04-27 16:37:04 +08:00
parent 9065e01225
commit 07b982d1d2
14 changed files with 631 additions and 33 deletions

View File

@@ -470,13 +470,25 @@ async fn main() -> anyhow::Result<()> {
.update_video_config(actual_resolution, actual_format, actual_fps)
.await;
if let Some(device_path) = device_path {
let (subdev_path, bridge_kind) = streamer
let (subdev_path, bridge_kind, v4l2_driver) = streamer
.current_device()
.await
.map(|d| (d.subdev_path.clone(), d.bridge_kind.clone()))
.unwrap_or((None, None));
.map(|d| {
(
d.subdev_path.clone(),
d.bridge_kind.clone(),
Some(d.driver.clone()),
)
})
.unwrap_or((None, None, None));
webrtc_streamer
.set_capture_device(device_path, jpeg_quality, subdev_path, bridge_kind)
.set_capture_device(
device_path,
jpeg_quality,
subdev_path,
bridge_kind,
v4l2_driver,
)
.await;
tracing::info!("WebRTC streamer configured for direct capture");
} else {

View File

@@ -13,6 +13,7 @@ pub mod frame;
pub mod shared_video_pipeline;
pub mod stream_manager;
pub mod streamer;
pub mod usb_reset;
pub mod v4l2r_capture;
pub use convert::{PixelConverter, Yuv420pBuffer};
@@ -41,6 +42,10 @@ pub enum SignalStatus {
OutOfRange,
/// Generic "no usable source" (fallback for EINVAL / EIO / unknown errnos).
NoSignal,
/// UVC/USB isochronous protocol error (common kernel: status -71 / userspace EPROTO).
UvcUsbError,
/// UVC capture stalled (repeated DQBUF timeouts; often cable, hub, or controller load).
UvcCaptureStall,
}
impl SignalStatus {
@@ -50,6 +55,8 @@ impl SignalStatus {
SignalStatus::NoSync => "no_sync",
SignalStatus::OutOfRange => "out_of_range",
SignalStatus::NoSignal => "no_signal",
SignalStatus::UvcUsbError => "uvc_usb_error",
SignalStatus::UvcCaptureStall => "uvc_capture_stall",
}
}
@@ -59,6 +66,8 @@ impl SignalStatus {
"no_sync" => SignalStatus::NoSync,
"out_of_range" => SignalStatus::OutOfRange,
"no_signal" => SignalStatus::NoSignal,
"uvc_usb_error" => SignalStatus::UvcUsbError,
"uvc_capture_stall" => SignalStatus::UvcCaptureStall,
_ => return None,
})
}
@@ -71,6 +80,8 @@ impl From<SignalStatus> for streamer::StreamerState {
SignalStatus::NoSync => streamer::StreamerState::NoSync,
SignalStatus::OutOfRange => streamer::StreamerState::OutOfRange,
SignalStatus::NoSignal => streamer::StreamerState::NoSignal,
SignalStatus::UvcUsbError => streamer::StreamerState::UvcUsbError,
SignalStatus::UvcCaptureStall => streamer::StreamerState::UvcCaptureStall,
}
}
}

View File

@@ -415,6 +415,11 @@ impl SharedVideoPipeline {
if !should_emit {
return;
}
tracing::debug!(
"Pipeline state notification: state={}, reason={:?}",
notification.state,
notification.reason
);
if let Some(notifier) = self.state_notifier.read().clone() {
notifier(notification);
}
@@ -542,6 +547,7 @@ impl SharedVideoPipeline {
_jpeg_quality: u8,
subdev_path: Option<std::path::PathBuf>,
bridge_kind: Option<String>,
_v4l2_driver: Option<String>,
) -> Result<()> {
if *self.running_rx.borrow() {
warn!("Pipeline already running");
@@ -982,7 +988,6 @@ impl SharedVideoPipeline {
let meta = match next_result {
Ok(meta) => {
consecutive_timeouts = 0;
pipeline.notify_state(PipelineStateNotification::streaming());
meta
}
Err(e) => {
@@ -1100,6 +1105,12 @@ impl SharedVideoPipeline {
closing stream for soft-restart",
consecutive_timeouts
);
pipeline.notify_state(
PipelineStateNotification::no_signal(
SignalStatus::UvcCaptureStall,
Some(Duration::from_secs(2).as_millis() as u64),
),
);
stream = None;
continue;
}
@@ -1123,17 +1134,29 @@ impl SharedVideoPipeline {
}
} else {
consecutive_timeouts = 0;
// EIO (5) / EPIPE (32) in next_into generally
// mean the source glitched mid-stream.
// Tear down the stream and let the open loop
// re-probe via DV_TIMINGS — same logic as
// timeouts, just triggered earlier.
if matches!(e.raw_os_error(), Some(5) | Some(32)) {
warn!(
"Capture transient error ({}), closing stream for \
soft-restart",
e
);
// EIO (5) / EPIPE (32) / EPROTO (71) in next_into generally
// mean the source or UVC USB transport glitched mid-stream.
// Tear down the stream and let the open loop re-probe.
let os = e.raw_os_error();
if matches!(os, Some(5) | Some(32) | Some(71)) {
if os == Some(71) {
warn!(
"Capture transient error (EPROTO/-71, often UVC USB): {} — soft-restart",
e
);
pipeline.notify_state(
PipelineStateNotification::no_signal(
SignalStatus::UvcUsbError,
Some(Duration::from_secs(2).as_millis() as u64),
),
);
} else {
warn!(
"Capture transient error ({}), closing stream for \
soft-restart",
e
);
}
stream = None;
continue;
}
@@ -1172,6 +1195,11 @@ impl SharedVideoPipeline {
}
owned.truncate(frame_size);
// Notify streaming only after frame validation passes —
// stale/warm-up frames from V4L2 kernel queues can cause
// DQBUF Ok with invalid data, which would prematurely
// clear the frontend error overlay.
pipeline.notify_state(PipelineStateNotification::streaming());
let frame = Arc::new(VideoFrame::from_pooled(
Arc::new(FrameBuffer::new(owned, Some(buffer_pool.clone()))),
resolution,

View File

@@ -356,14 +356,26 @@ impl VideoStreamManager {
// Resolve the paired subdev so the WebRTC pipeline can run the
// RK628 STREAMON gate + SOURCE_CHANGE polling identically to the
// MJPEG path. See `csi_bridge::discover_subdev_for_video`.
let (subdev_path, bridge_kind) = self
let (subdev_path, bridge_kind, v4l2_driver) = self
.streamer
.current_device()
.await
.map(|d| (d.subdev_path.clone(), d.bridge_kind.clone()))
.unwrap_or((None, None));
.map(|d| {
(
d.subdev_path.clone(),
d.bridge_kind.clone(),
Some(d.driver.clone()),
)
})
.unwrap_or((None, None, None));
self.webrtc_streamer
.set_capture_device(device_path, jpeg_quality, subdev_path, bridge_kind)
.set_capture_device(
device_path,
jpeg_quality,
subdev_path,
bridge_kind,
v4l2_driver,
)
.await;
} else {
warn!("No capture device configured while syncing WebRTC capture source");
@@ -559,14 +571,26 @@ impl VideoStreamManager {
}
if let Some(device_path) = device_path {
info!("Configuring direct capture for WebRTC after config change");
let (subdev_path, bridge_kind) = self
let (subdev_path, bridge_kind, v4l2_driver) = self
.streamer
.current_device()
.await
.map(|d| (d.subdev_path.clone(), d.bridge_kind.clone()))
.unwrap_or((None, None));
.map(|d| {
(
d.subdev_path.clone(),
d.bridge_kind.clone(),
Some(d.driver.clone()),
)
})
.unwrap_or((None, None, None));
self.webrtc_streamer
.set_capture_device(device_path, jpeg_quality, subdev_path, bridge_kind)
.set_capture_device(
device_path,
jpeg_quality,
subdev_path,
bridge_kind,
v4l2_driver,
)
.await;
} else {
warn!("No capture device configured for WebRTC after config change");

View File

@@ -71,6 +71,10 @@ pub enum StreamerState {
NoSync,
/// Source timings are outside of what the capture hardware supports (ERANGE)
OutOfRange,
/// UVC/USB isochronous protocol error (kernel EPROTO/-71)
UvcUsbError,
/// UVC capture stalled (repeated DQBUF timeouts)
UvcCaptureStall,
/// Error occurred
Error,
/// Device was lost (unplugged)
@@ -90,6 +94,8 @@ impl StreamerState {
StreamerState::NoCable => "no_cable",
StreamerState::NoSync => "no_sync",
StreamerState::OutOfRange => "out_of_range",
StreamerState::UvcUsbError => "uvc_usb_error",
StreamerState::UvcCaptureStall => "uvc_capture_stall",
StreamerState::Error => "error",
StreamerState::DeviceLost => "device_lost",
StreamerState::Recovering => "recovering",
@@ -107,6 +113,8 @@ impl StreamerState {
"no_cable" => StreamerState::NoCable,
"no_sync" => StreamerState::NoSync,
"out_of_range" => StreamerState::OutOfRange,
"uvc_usb_error" => StreamerState::UvcUsbError,
"uvc_capture_stall" => StreamerState::UvcCaptureStall,
"error" => StreamerState::Error,
"device_lost" => StreamerState::DeviceLost,
"recovering" => StreamerState::Recovering,
@@ -122,6 +130,8 @@ impl StreamerState {
| StreamerState::NoCable
| StreamerState::NoSync
| StreamerState::OutOfRange
| StreamerState::UvcUsbError
| StreamerState::UvcCaptureStall
)
}
@@ -135,6 +145,8 @@ impl StreamerState {
StreamerState::NoCable => ("no_signal", Some("no_cable")),
StreamerState::NoSync => ("no_signal", Some("no_sync")),
StreamerState::OutOfRange => ("no_signal", Some("out_of_range")),
StreamerState::UvcUsbError => ("no_signal", Some("uvc_usb_error")),
StreamerState::UvcCaptureStall => ("no_signal", Some("uvc_capture_stall")),
StreamerState::DeviceLost => ("device_lost", Some("device_lost")),
StreamerState::Recovering => ("device_lost", Some("recovering")),
StreamerState::Busy => ("device_busy", None),
@@ -1076,7 +1088,8 @@ impl Streamer {
// in ~1 s instead of the 1 s recovery-poll loop.
let os_err = e.raw_os_error();
let is_device_lost = matches!(os_err, Some(6) | Some(19) | Some(108));
let is_transient_signal_error = matches!(os_err, Some(5) | Some(32));
let is_transient_signal_error =
matches!(os_err, Some(5) | Some(32) | Some(71));
if is_device_lost {
error!("Video device lost: {} - {}", device_path.display(), e);
@@ -1098,10 +1111,28 @@ impl Streamer {
}
if is_transient_signal_error {
warn!(
"Capture transient error ({}): treating as NoSignal + soft-restart",
e
);
if os_err == Some(71) {
warn!(
"Capture transient error (EPROTO/-71, often UVC USB): {}",
e
);
let is_uvc = handle.block_on(async {
self.current_device.read().await.as_ref().is_some_and(
|d| d.driver.eq_ignore_ascii_case("uvcvideo"),
)
});
if is_uvc {
go_offline();
set_state(StreamerState::UvcUsbError);
need_soft_restart = true;
break 'capture;
}
} else {
warn!(
"Capture transient error ({}): treating as NoSignal + soft-restart",
e
);
}
set_retry(
backoff_secs(no_signal_restart_count).saturating_mul(1000),
);

205
src/video/usb_reset.rs Normal file
View File

@@ -0,0 +1,205 @@
//! USB device enumeration and reset via sysfs `authorized`.
//!
//! Provides APIs for the settings page to list and reset USB devices.
//! Requires write access to `/sys/bus/usb/devices/.../authorized` (typically root).
use std::io;
use std::path::{Path, PathBuf};
use std::time::{Duration, Instant};
/// Walk up from a V4L sysfs `device` link until we find a USB device node
/// (`busnum` + `devnum` present).
fn usb_device_dir_for_v4l_sysfs(device_link: &Path) -> io::Result<PathBuf> {
let mut p = device_link.canonicalize()?;
loop {
if p.join("busnum").is_file() && p.join("devnum").is_file() {
return Ok(p);
}
p = p
.parent()
.ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "no USB parent in sysfs"))?
.to_path_buf();
if p.as_os_str().is_empty() || p == Path::new("/") {
return Err(io::Error::new(
io::ErrorKind::NotFound,
"reached sysfs root without USB device",
));
}
}
}
// ---------------------------------------------------------------------------
// USB device enumeration & reset-by-bus/dev (for the settings API)
// ---------------------------------------------------------------------------
use serde::Serialize;
/// Information about a single USB device, read from `/sys/bus/usb/devices/`.
#[derive(Debug, Serialize)]
pub struct UsbDeviceInfo {
/// USB bus number (`busnum` sysfs attribute).
pub bus_num: u32,
/// USB device number on the bus (`devnum` sysfs attribute).
pub dev_num: u32,
/// Vendor ID hex string, e.g. `"1d6b"`.
pub id_vendor: String,
/// Product ID hex string, e.g. `"0002"`.
pub id_product: String,
/// Product name from sysfs `product`.
#[serde(skip_serializing_if = "Option::is_none")]
pub product: Option<String>,
/// Manufacturer name from sysfs `manufacturer`.
#[serde(skip_serializing_if = "Option::is_none")]
pub manufacturer: Option<String>,
/// Speed in Mbps from sysfs `speed`, e.g. `"480"`.
#[serde(skip_serializing_if = "Option::is_none")]
pub speed: Option<String>,
/// `true` if authorized=1, `false` if authorized=0, `None` if no file.
#[serde(skip_serializing_if = "Option::is_none")]
pub authorized: Option<bool>,
/// Kernel driver bound to this device (from driver symlink).
#[serde(skip_serializing_if = "Option::is_none")]
pub driver: Option<String>,
/// Associated `/dev/videoN` node, if this USB device has a V4L2 child.
#[serde(skip_serializing_if = "Option::is_none")]
pub video_device: Option<String>,
}
/// Read a sysfs string attribute, trimming trailing newline.
fn read_sysfs_str(dir: &Path, attr: &str) -> Option<String> {
std::fs::read_to_string(dir.join(attr))
.ok()
.map(|s| s.trim_end().to_string())
}
/// Read a sysfs u32 attribute.
fn read_sysfs_u32(dir: &Path, attr: &str) -> Option<u32> {
read_sysfs_str(dir, attr).and_then(|s| s.parse().ok())
}
/// Build a map from USB sysfs dir → video device name by scanning
/// `/sys/class/video4linux/`.
fn build_usb_to_video_map() -> std::collections::HashMap<String, String> {
let mut map = std::collections::HashMap::new();
let v4l_class = Path::new("/sys/class/video4linux");
let entries = match std::fs::read_dir(v4l_class) {
Ok(e) => e,
Err(_) => return map,
};
for entry in entries.flatten() {
let name = entry.file_name();
let name_str = match name.to_str() {
Some(s) if s.starts_with("video") => s,
_ => continue,
};
// Resolve the device symlink and walk up to find the USB parent
let device_link = v4l_class.join(name_str).join("device");
if let Ok(usb_dir) = usb_device_dir_for_v4l_sysfs(&device_link) {
if let Some(key) = usb_dir.file_name().and_then(|k| k.to_str()) {
map.insert(key.to_string(), format!("/dev/{}", name_str));
}
}
}
map
}
/// List all USB devices visible in `/sys/bus/usb/devices/`.
pub fn list_usb_devices() -> Vec<UsbDeviceInfo> {
let usb_bus = Path::new("/sys/bus/usb/devices");
let entries = match std::fs::read_dir(usb_bus) {
Ok(e) => e,
Err(_) => return vec![],
};
let video_map = build_usb_to_video_map();
let mut devices: Vec<UsbDeviceInfo> = entries
.flatten()
.filter_map(|entry| {
let dir = entry.path();
// Only consider entries that have busnum + devnum (actual devices, not interfaces)
let bus_num = read_sysfs_u32(&dir, "busnum")?;
let dev_num = read_sysfs_u32(&dir, "devnum")?;
let id_vendor = read_sysfs_str(&dir, "idVendor").unwrap_or_default();
let id_product = read_sysfs_str(&dir, "idProduct").unwrap_or_default();
let product = read_sysfs_str(&dir, "product");
let manufacturer = read_sysfs_str(&dir, "manufacturer");
let speed = read_sysfs_str(&dir, "speed");
let authorized = if dir.join("authorized").exists() {
read_sysfs_str(&dir, "authorized")
.and_then(|s| s.trim().parse::<u8>().ok())
.map(|v| v != 0)
} else {
None
};
let driver = std::fs::read_link(dir.join("driver"))
.ok()
.and_then(|p| p.file_name().map(|f| f.to_string_lossy().to_string()));
let dir_name = dir.file_name()?.to_str()?.to_string();
let video_device = video_map.get(&dir_name).cloned();
Some(UsbDeviceInfo {
bus_num,
dev_num,
id_vendor,
id_product,
product,
manufacturer,
speed,
authorized,
driver,
video_device,
})
})
.collect();
// Sort by bus, then device number for stable ordering.
devices.sort_by(|a, b| (a.bus_num, a.dev_num).cmp(&(b.bus_num, b.dev_num)));
devices
}
/// Reset a USB device identified by bus/dev numbers via the `authorized` sysfs
/// attribute. After re-authorizing, waits for the device to reappear.
pub fn reset_usb_device(bus_num: u32, dev_num: u32) -> io::Result<()> {
let usb_bus = Path::new("/sys/bus/usb/devices");
let entries = std::fs::read_dir(usb_bus)?;
for entry in entries.flatten() {
let dir = entry.path();
if read_sysfs_u32(&dir, "busnum") != Some(bus_num)
|| read_sysfs_u32(&dir, "devnum") != Some(dev_num)
{
continue;
}
let authorized = dir.join("authorized");
if !authorized.exists() {
return Err(io::Error::new(
io::ErrorKind::NotFound,
format!("device {bus_num}-{dev_num} has no authorized attribute"),
));
}
std::fs::write(&authorized, b"0")?;
std::thread::sleep(Duration::from_millis(300));
std::fs::write(&authorized, b"1")?;
// Wait for device to reappear
let wait_until = Instant::now() + Duration::from_secs(2);
while !dir.join("busnum").exists() {
if Instant::now() >= wait_until {
break;
}
std::thread::sleep(Duration::from_millis(50));
}
return Ok(());
}
Err(io::Error::new(
io::ErrorKind::NotFound,
format!("USB device {bus_num}-{dev_num} not found in sysfs"),
))
}

View File

@@ -3,8 +3,11 @@
//! Provides API endpoints for discovering available hardware devices.
use axum::Json;
use serde::Deserialize;
use crate::atx::{discover_devices, AtxDevices};
use crate::error::{AppError, Result};
use crate::video::usb_reset;
/// GET /api/devices/atx - List available ATX devices
///
@@ -12,3 +15,35 @@ use crate::atx::{discover_devices, AtxDevices};
pub async fn list_atx_devices() -> Json<AtxDevices> {
Json(discover_devices())
}
/// GET /api/devices/usb - List all USB devices
///
/// Enumerates USB devices from `/sys/bus/usb/devices/` with associated
/// video device mappings.
pub async fn list_usb_devices() -> Json<Vec<usb_reset::UsbDeviceInfo>> {
Json(usb_reset::list_usb_devices())
}
#[derive(Deserialize)]
pub struct UsbResetRequest {
pub bus_num: u32,
pub dev_num: u32,
}
/// POST /api/devices/usb/reset - Reset a USB device via authorized cycle
///
/// Writes `0` then `1` to the device's `authorized` sysfs attribute,
/// causing the kernel to deauthorize and re-authorize the device.
/// Requires root or write access to sysfs.
pub async fn reset_usb_device(Json(req): Json<UsbResetRequest>) -> Result<Json<serde_json::Value>> {
usb_reset::reset_usb_device(req.bus_num, req.dev_num).map_err(|e| {
AppError::VideoError(format!(
"USB reset failed for device {}-{}: {}",
req.bus_num, req.dev_num, e
))
})?;
Ok(Json(serde_json::json!({
"success": true,
"message": format!("USB device {}-{} reset successfully", req.bus_num, req.dev_num)
})))
}

View File

@@ -177,6 +177,11 @@ pub fn create_router(state: Arc<AppState>) -> Router {
.route("/atx/wol/history", get(handlers::atx_wol_history))
// Device discovery endpoints
.route("/devices/atx", get(handlers::devices::list_atx_devices))
.route("/devices/usb", get(handlers::devices::list_usb_devices))
.route(
"/devices/usb/reset",
post(handlers::devices::reset_usb_device),
)
// Extension management endpoints
.route("/extensions", get(handlers::extensions::list_extensions))
.route("/extensions/{id}", get(handlers::extensions::get_extension))

View File

@@ -96,6 +96,8 @@ pub struct CaptureDeviceConfig {
pub jpeg_quality: u8,
pub subdev_path: Option<PathBuf>,
pub bridge_kind: Option<String>,
/// V4L2 driver name (e.g. `uvcvideo`) for UVC-specific recovery hints.
pub v4l2_driver: Option<String>,
}
/// WebRTC streamer statistics
@@ -382,6 +384,7 @@ impl WebRtcStreamer {
device.jpeg_quality,
device.subdev_path,
device.bridge_kind,
device.v4l2_driver,
)
.await?;
} else {
@@ -575,10 +578,11 @@ impl WebRtcStreamer {
jpeg_quality: u8,
subdev_path: Option<PathBuf>,
bridge_kind: Option<String>,
v4l2_driver: Option<String>,
) {
info!(
"Setting direct capture device for WebRTC: {:?} (subdev={:?}, kind={:?})",
device_path, subdev_path, bridge_kind
"Setting direct capture device for WebRTC: {:?} (subdev={:?}, kind={:?}, driver={:?})",
device_path, subdev_path, bridge_kind, v4l2_driver
);
*self.capture_device.write().await = Some(CaptureDeviceConfig {
device_path,
@@ -586,6 +590,7 @@ impl WebRtcStreamer {
jpeg_quality,
subdev_path,
bridge_kind,
v4l2_driver,
});
}