fix:改进atx usb 继电器适配;修复 webrtc 无法建立连接问题;网页样式优化

This commit is contained in:
mofeng-git
2026-05-05 00:52:16 +08:00
parent 6723f432a3
commit c27d3a6703
27 changed files with 1388 additions and 709 deletions

View File

@@ -7,6 +7,7 @@ use gpio_cdev::{Chip, LineHandle, LineRequestFlags};
use serialport::SerialPort;
use std::fs::{File, OpenOptions};
use std::io::Write;
use std::os::fd::AsRawFd;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
@@ -18,6 +19,10 @@ use crate::error::{AppError, Result};
pub type SharedSerialHandle = Arc<Mutex<Box<dyn SerialPort>>>;
const USB_RELAY_MAX_CHANNEL: u8 = 8;
const USB_RELAY_REPORT_LEN: usize = 9;
const HIDIOCSFEATURE_9: libc::c_ulong = 0xC009_4806; // _IOC(_IOC_READ|_IOC_WRITE, 'H', 0x06, 9)
/// Timing constants for ATX operations
pub mod timing {
use std::time::Duration;
@@ -129,12 +134,23 @@ impl AtxKeyExecutor {
}
}
AtxDriverType::UsbRelay => {
if self.config.pin == 0 {
return Err(AppError::Config(
"USB relay channel must be 1-based (>= 1)".to_string(),
));
}
if self.config.pin > u8::MAX as u32 {
return Err(AppError::Config(format!(
"USB relay channel must be <= {}",
u8::MAX
)));
}
if self.config.pin > USB_RELAY_MAX_CHANNEL as u32 {
return Err(AppError::Config(format!(
"USB HID relay channel must be <= {}",
USB_RELAY_MAX_CHANNEL
)));
}
}
AtxDriverType::Gpio | AtxDriverType::None => {}
}
@@ -292,26 +308,64 @@ impl AtxKeyExecutor {
u8::MAX
))
})?;
if channel == 0 {
return Err(AppError::Config(
"USB relay channel must be 1-based (>= 1)".to_string(),
));
}
if channel > USB_RELAY_MAX_CHANNEL {
return Err(AppError::Config(format!(
"USB HID relay channel must be <= {}",
USB_RELAY_MAX_CHANNEL
)));
}
// Standard HID relay command format
let cmd = if on {
[0x00, channel + 1, 0xFF, 0x00, 0x00, 0x00, 0x00, 0x00]
} else {
[0x00, channel + 1, 0xFD, 0x00, 0x00, 0x00, 0x00, 0x00]
};
let cmd = Self::build_usb_relay_command(channel, on);
let mut guard = self.usb_relay_handle.lock().unwrap();
let device = guard
.as_mut()
.ok_or_else(|| AppError::Internal("USB relay not initialized".to_string()))?;
device
.write_all(&cmd)
.map_err(|e| AppError::Internal(format!("USB relay write failed: {}", e)))?;
if let Err(feature_err) = Self::send_usb_relay_feature_report(device, &cmd) {
debug!(
"USB relay feature report failed ({}), falling back to hidraw write",
feature_err
);
device.write_all(&cmd).map_err(|write_err| {
AppError::Internal(format!(
"USB relay feature report failed: {}; raw write failed: {}",
feature_err, write_err
))
})?;
device
.flush()
.map_err(|e| AppError::Internal(format!("USB relay flush failed: {}", e)))?;
}
Ok(())
}
fn build_usb_relay_command(channel: u8, on: bool) -> [u8; USB_RELAY_REPORT_LEN] {
let mut cmd = [0x00; USB_RELAY_REPORT_LEN];
cmd[1] = if on { 0xFF } else { 0xFD };
cmd[2] = channel;
cmd
}
fn send_usb_relay_feature_report(
device: &File,
report: &[u8; USB_RELAY_REPORT_LEN],
) -> std::io::Result<()> {
// Linux hidraw feature reports include the report ID as the first byte.
let rc = unsafe { libc::ioctl(device.as_raw_fd(), HIDIOCSFEATURE_9, report.as_ptr()) };
if rc < 0 {
Err(std::io::Error::last_os_error())
} else {
Ok(())
}
}
/// Pulse Serial relay
async fn pulse_serial(&self, duration: Duration) -> Result<()> {
info!(
@@ -367,6 +421,8 @@ impl AtxKeyExecutor {
port.write_all(&cmd)
.map_err(|e| AppError::Internal(format!("Serial relay write failed: {}", e)))?;
port.flush()
.map_err(|e| AppError::Internal(format!("Serial relay flush failed: {}", e)))?;
Ok(())
}
@@ -453,7 +509,7 @@ mod tests {
let config = AtxKeyConfig {
driver: AtxDriverType::UsbRelay,
device: "/dev/hidraw0".to_string(),
pin: 0,
pin: 1,
active_level: ActiveLevel::High, // Ignored for USB relay
baud_rate: 9600,
};
@@ -481,6 +537,18 @@ mod tests {
assert_eq!(timing::RESET_PRESS.as_millis(), 500);
}
#[test]
fn test_usb_relay_command_format() {
assert_eq!(
AtxKeyExecutor::build_usb_relay_command(1, true),
[0x00, 0xFF, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]
);
assert_eq!(
AtxKeyExecutor::build_usb_relay_command(1, false),
[0x00, 0xFD, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]
);
}
#[tokio::test]
async fn test_executor_init_rejects_serial_channel_zero() {
let config = AtxKeyConfig {
@@ -495,6 +563,34 @@ mod tests {
assert!(matches!(err, AppError::Config(_)));
}
#[tokio::test]
async fn test_executor_init_rejects_usb_relay_channel_zero() {
let config = AtxKeyConfig {
driver: AtxDriverType::UsbRelay,
device: "/dev/hidraw0".to_string(),
pin: 0,
active_level: ActiveLevel::High,
baud_rate: 9600,
};
let mut executor = AtxKeyExecutor::new(config);
let err = executor.init().await.unwrap_err();
assert!(matches!(err, AppError::Config(_)));
}
#[tokio::test]
async fn test_executor_init_rejects_usb_relay_channel_overflow() {
let config = AtxKeyConfig {
driver: AtxDriverType::UsbRelay,
device: "/dev/hidraw0".to_string(),
pin: USB_RELAY_MAX_CHANNEL as u32 + 1,
active_level: ActiveLevel::High,
baud_rate: 9600,
};
let mut executor = AtxKeyExecutor::new(config);
let err = executor.init().await.unwrap_err();
assert!(matches!(err, AppError::Config(_)));
}
#[tokio::test]
async fn test_executor_init_rejects_serial_channel_overflow() {
let config = AtxKeyConfig {

View File

@@ -15,6 +15,7 @@
//!
//! - **GPIO**: Uses Linux GPIO character device (/dev/gpiochipX) for direct hardware control
//! - **USB Relay**: Uses HID USB relay modules for isolated switching
//! - **Serial Relay**: Uses LCUS-style serial relay modules
//!
//! # Example
//!
@@ -59,9 +60,25 @@ pub use types::{
};
pub use wol::send_wol;
fn hidraw_uevent_is_usb_relay(uevent: &str) -> bool {
let upper = uevent.to_ascii_uppercase();
upper.contains("000016C0:000005DF")
|| upper.contains("16C0:05DF")
|| upper.contains("PRODUCT=16C0/5DF")
|| upper.contains("USBRELAY")
|| upper.contains("USB RELAY")
}
fn is_usb_relay_hidraw(name: &str) -> bool {
let uevent_path = format!("/sys/class/hidraw/{}/device/uevent", name);
std::fs::read_to_string(uevent_path)
.map(|uevent| hidraw_uevent_is_usb_relay(&uevent))
.unwrap_or(false)
}
/// Discover available ATX devices on the system
///
/// Scans for GPIO chips and USB HID relay devices in a single pass.
/// Scans for GPIO chips, LCUS USB HID relay devices, and serial relay ports.
pub fn discover_devices() -> AtxDevices {
let mut devices = AtxDevices::default();
@@ -72,7 +89,7 @@ pub fn discover_devices() -> AtxDevices {
let name_str = name.to_string_lossy();
if name_str.starts_with("gpiochip") {
devices.gpio_chips.push(format!("/dev/{}", name_str));
} else if name_str.starts_with("hidraw") {
} else if name_str.starts_with("hidraw") && is_usb_relay_hidraw(&name_str) {
devices.usb_relays.push(format!("/dev/{}", name_str));
} else if name_str.starts_with("ttyUSB") || name_str.starts_with("ttyACM") {
devices.serial_ports.push(format!("/dev/{}", name_str));
@@ -96,6 +113,20 @@ mod tests {
let _devices = discover_devices();
}
#[test]
fn test_hidraw_uevent_detects_usb_relay_id() {
assert!(hidraw_uevent_is_usb_relay(
"HID_ID=0003:000016C0:000005DF\nHID_NAME=www.dcttech.com USBRelay2\n"
));
}
#[test]
fn test_hidraw_uevent_rejects_unrelated_hid() {
assert!(!hidraw_uevent_is_usb_relay(
"HID_ID=0003:0000046D:0000C534\nHID_NAME=Logitech USB Receiver\n"
));
}
#[test]
fn test_module_exports() {
// Verify all public exports are accessible

View File

@@ -61,7 +61,7 @@ pub struct AtxKeyConfig {
pub device: String,
/// Pin or channel number:
/// - For GPIO: GPIO pin number
/// - For USB Relay: relay channel (0-based)
/// - For USB Relay: relay channel (1-based)
/// - For Serial Relay (LCUS): relay channel (1-based)
pub pin: u32,
/// Active level (only applicable to GPIO, ignored for USB Relay)

View File

@@ -17,7 +17,7 @@ use super::encoder::{OpusConfig, OpusFrame};
use super::monitor::AudioHealthMonitor;
use super::streamer::{AudioStreamState, AudioStreamer, AudioStreamerConfig};
use crate::error::{AppError, Result};
use crate::events::{EventBus, SystemEvent};
use crate::events::{EventBus, StreamDeviceLostKind, SystemEvent};
const AUDIO_RECOVERY_RETRY_DELAY: Duration = Duration::from_secs(1);
@@ -165,6 +165,7 @@ impl AudioController {
) {
if let Some(ref bus) = *event_bus.read().await {
bus.publish(SystemEvent::StreamDeviceLost {
kind: StreamDeviceLostKind::Audio,
device: device.to_string(),
reason: reason.to_string(),
});

View File

@@ -6,7 +6,7 @@ use self::types::EXACT_EVENT_TOPICS;
pub use types::{
AtxDeviceInfo, AudioDeviceInfo, ClientStats, HidDeviceInfo, LedState, MsdDeviceInfo,
SystemEvent, TtydDeviceInfo, VideoDeviceInfo,
StreamDeviceLostKind, SystemEvent, TtydDeviceInfo, VideoDeviceInfo,
};
use tokio::sync::broadcast;

View File

@@ -79,6 +79,14 @@ pub struct ClientStats {
pub connected_secs: u64,
}
/// Video vs audio source for [`SystemEvent::StreamDeviceLost`] (WebSocket `stream.device_lost`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum StreamDeviceLostKind {
Video,
Audio,
}
/// JSON: `{"event": "<name>", "data": { ... }}`.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(tag = "event", content = "data")]
@@ -119,7 +127,11 @@ pub enum SystemEvent {
},
#[serde(rename = "stream.device_lost")]
StreamDeviceLost { device: String, reason: String },
StreamDeviceLost {
kind: StreamDeviceLostKind,
device: String,
reason: String,
},
#[serde(rename = "stream.reconnecting")]
StreamReconnecting { device: String, attempt: u32 },
@@ -255,6 +267,19 @@ mod tests {
assert_eq!(event.event_name(), "stream.state_changed");
}
#[test]
fn stream_device_lost_json_snake_case_kind() {
let event = SystemEvent::StreamDeviceLost {
kind: StreamDeviceLostKind::Audio,
device: "hw:0,0".to_string(),
reason: "test".to_string(),
};
let v = serde_json::to_value(&event).unwrap();
let data = v.get("data").unwrap();
assert_eq!(data.get("kind").and_then(|x| x.as_str()), Some("audio"));
assert_eq!(data.get("device").and_then(|x| x.as_str()), Some("hw:0,0"));
}
#[test]
fn exact_topics_covers_all_variants() {
use std::collections::HashSet;
@@ -283,6 +308,7 @@ mod tests {
fps: 0,
},
SystemEvent::StreamDeviceLost {
kind: StreamDeviceLostKind::Video,
device: String::new(),
reason: String::new(),
},

View File

@@ -8,6 +8,9 @@ use std::time::{Duration, Instant};
use tokio::sync::broadcast;
use tracing::{debug, info, warn};
/// Generation token paired with `client_id` so [`unregister_client`] ignores stale drops.
pub type ClientGeneration = u64;
use crate::video::encoder::traits::{Encoder, EncoderConfig};
use crate::video::encoder::JpegEncoder;
use crate::video::format::PixelFormat;
@@ -18,6 +21,7 @@ pub type ClientId = String;
#[derive(Debug, Clone)]
pub struct ClientSession {
pub id: ClientId,
pub generation: ClientGeneration,
pub connected_at: Instant,
pub last_activity: Instant,
pub frames_sent: u64,
@@ -25,10 +29,11 @@ pub struct ClientSession {
}
impl ClientSession {
pub fn new(id: ClientId) -> Self {
pub fn new(id: ClientId, generation: ClientGeneration) -> Self {
let now = Instant::now();
Self {
id,
generation,
connected_at: now,
last_activity: now,
frames_sent: 0,
@@ -45,7 +50,6 @@ impl ClientSession {
pub struct FpsCalculator {
frame_times: VecDeque<Instant>,
window: Duration,
count_in_window: usize,
}
impl FpsCalculator {
@@ -53,28 +57,26 @@ impl FpsCalculator {
Self {
frame_times: VecDeque::with_capacity(120),
window: Duration::from_secs(1),
count_in_window: 0,
}
}
pub fn record_frame(&mut self) {
let now = Instant::now();
self.frame_times.push_back(now);
let cutoff = now - self.window;
while let Some(&oldest) = self.frame_times.front() {
if oldest < cutoff {
self.frame_times.pop_front();
} else {
break;
}
}
self.count_in_window = self.frame_times.len();
self.prune(now);
}
pub fn current_fps(&self) -> u32 {
self.count_in_window as u32
/// Rolling-window FPS sample count (~1s).
pub fn current_fps(&mut self) -> u32 {
self.prune(Instant::now());
self.frame_times.len() as u32
}
fn prune(&mut self, now: Instant) {
let cutoff = now - self.window;
while matches!(self.frame_times.front(), Some(&t) if t < cutoff) {
self.frame_times.pop_front();
}
}
}
@@ -101,6 +103,7 @@ pub struct MjpegStreamHandler {
online: AtomicBool,
sequence: AtomicU64,
clients: ParkingRwLock<HashMap<ClientId, ClientSession>>,
next_generation: AtomicU64,
auto_pause_config: ParkingRwLock<AutoPauseConfig>,
last_frame_ts: ParkingRwLock<Option<Instant>>,
dropped_same_frames: AtomicU64,
@@ -122,6 +125,7 @@ impl MjpegStreamHandler {
online: AtomicBool::new(false),
sequence: AtomicU64::new(0),
clients: ParkingRwLock::new(HashMap::new()),
next_generation: AtomicU64::new(1),
jpeg_encoder: ParkingMutex::new(None),
auto_pause_config: ParkingRwLock::new(AutoPauseConfig::default()),
last_frame_ts: ParkingRwLock::new(None),
@@ -292,18 +296,26 @@ impl MjpegStreamHandler {
self.clients.read().len() as u64
}
pub fn register_client(&self, client_id: ClientId) {
let session = ClientSession::new(client_id.clone());
/// Connects `client_id`; return value must be passed to [`unregister_client`].
pub fn register_client(&self, client_id: ClientId) -> ClientGeneration {
let generation = self.next_generation.fetch_add(1, Ordering::Relaxed);
let session = ClientSession::new(client_id.clone(), generation);
self.clients.write().insert(client_id.clone(), session);
info!(
"Client {} connected (total: {})",
client_id,
self.client_count()
);
generation
}
pub fn unregister_client(&self, client_id: &str) {
if let Some(session) = self.clients.write().remove(client_id) {
pub fn unregister_client(&self, client_id: &str, expected_generation: ClientGeneration) {
let mut clients = self.clients.write();
match clients.get(client_id) {
Some(session) if session.generation == expected_generation => {}
_ => return,
}
if let Some(session) = clients.remove(client_id) {
let duration = session.connected_elapsed();
let duration_secs = duration.as_secs_f32();
let avg_fps = if duration_secs > 0.1 {
@@ -327,9 +339,12 @@ impl MjpegStreamHandler {
}
pub fn get_clients_stat(&self) -> HashMap<String, crate::events::types::ClientStats> {
// write() because `current_fps()` mutates the underlying VecDeque
// to prune stale samples. Held for ~microseconds, called once per
// second by the stats broadcaster.
self.clients
.read()
.iter()
.write()
.iter_mut()
.map(|(id, session)| {
(
id.clone(),
@@ -379,13 +394,18 @@ impl MjpegStreamHandler {
pub struct ClientGuard {
client_id: ClientId,
generation: ClientGeneration,
handler: Arc<MjpegStreamHandler>,
}
impl ClientGuard {
pub fn new(client_id: ClientId, handler: Arc<MjpegStreamHandler>) -> Self {
handler.register_client(client_id.clone());
Self { client_id, handler }
let generation = handler.register_client(client_id.clone());
Self {
client_id,
generation,
handler,
}
}
pub fn id(&self) -> &ClientId {
@@ -395,7 +415,8 @@ impl ClientGuard {
impl Drop for ClientGuard {
fn drop(&mut self) {
self.handler.unregister_client(&self.client_id);
self.handler
.unregister_client(&self.client_id, self.generation);
}
}
@@ -525,6 +546,41 @@ mod tests {
calc.record_frame();
calc.record_frame();
assert!(calc.frame_times.len() == 3);
assert_eq!(calc.current_fps(), 3);
assert_eq!(calc.frame_times.len(), 3);
}
#[test]
fn test_fps_calculator_decays_without_new_frames() {
let mut calc = FpsCalculator::new();
calc.window = Duration::from_millis(50);
calc.record_frame();
calc.record_frame();
assert_eq!(calc.current_fps(), 2);
std::thread::sleep(Duration::from_millis(80));
assert_eq!(calc.current_fps(), 0);
assert!(calc.frame_times.is_empty());
}
#[test]
fn test_client_guard_generation_isolation() {
let handler = Arc::new(MjpegStreamHandler::new());
let id = "shared-id".to_string();
let stale = ClientGuard::new(id.clone(), handler.clone());
let stale_gen = stale.generation;
let fresh = ClientGuard::new(id.clone(), handler.clone());
assert_ne!(stale_gen, fresh.generation);
assert_eq!(handler.client_count(), 1);
drop(stale);
assert_eq!(handler.client_count(), 1);
drop(fresh);
assert_eq!(handler.client_count(), 0);
}
}

View File

@@ -20,7 +20,7 @@ use super::format::{PixelFormat, Resolution};
use super::frame::{FrameBuffer, FrameBufferPool, VideoFrame};
use super::is_csi_hdmi_bridge;
use crate::error::{AppError, Result};
use crate::events::{EventBus, SystemEvent};
use crate::events::{EventBus, StreamDeviceLostKind, SystemEvent};
use crate::stream::MjpegStreamHandler;
use crate::utils::LogThrottler;
use crate::video::capture_limits::{should_validate_jpeg_frame, MIN_CAPTURE_FRAME_SIZE};
@@ -1417,6 +1417,7 @@ impl Streamer {
// Publish device lost event
self.publish_event(SystemEvent::StreamDeviceLost {
kind: StreamDeviceLostKind::Video,
device: device.clone(),
reason: reason.clone(),
})

View File

@@ -34,7 +34,7 @@ pub async fn update_stream_config(
&state,
&old_stream_config,
&new_stream_config,
ConfigApplyOptions::forced(),
ConfigApplyOptions::default(),
)
.await?;

View File

@@ -509,6 +509,12 @@ impl AtxConfigUpdate {
}
crate::atx::AtxDriverType::UsbRelay => {
if let Some(pin) = key.pin {
if pin == 0 {
return Err(AppError::BadRequest(format!(
"{} USB relay channel must be 1-based (>= 1)",
name
)));
}
if pin > u8::MAX as u32 {
return Err(AppError::BadRequest(format!(
"{} USB relay channel must be <= {}",
@@ -516,6 +522,12 @@ impl AtxConfigUpdate {
u8::MAX
)));
}
if pin > 8 {
return Err(AppError::BadRequest(format!(
"{} USB HID relay channel must be <= 8",
name
)));
}
}
}
crate::atx::AtxDriverType::Gpio | crate::atx::AtxDriverType::None => {}
@@ -551,6 +563,12 @@ impl AtxConfigUpdate {
}
}
crate::atx::AtxDriverType::UsbRelay => {
if key.pin == 0 {
return Err(AppError::BadRequest(format!(
"{} USB relay channel must be 1-based (>= 1)",
name
)));
}
if key.pin > u8::MAX as u32 {
return Err(AppError::BadRequest(format!(
"{} USB relay channel must be <= {}",
@@ -558,6 +576,12 @@ impl AtxConfigUpdate {
u8::MAX
)));
}
if key.pin > 8 {
return Err(AppError::BadRequest(format!(
"{} USB HID relay channel must be <= 8",
name
)));
}
}
crate::atx::AtxDriverType::Gpio | crate::atx::AtxDriverType::None => {}
}

View File

@@ -1493,12 +1493,8 @@ pub async fn mjpeg_stream(
handler.clone(),
));
// Use bounded channel (capacity=1) to implement backpressure
// This ensures record_frame_sent() is only called when the previous frame
// has been successfully consumed by the HTTP client
let (tx, mut rx) = tokio::sync::mpsc::channel::<bytes::Bytes>(1);
// Spawn background task to send frames to channel
let guard_clone = guard.clone();
let handler_clone = handler.clone();
tokio::spawn(async move {
@@ -1593,20 +1589,19 @@ pub async fn mjpeg_stream(
}
}
// Guard is automatically dropped here
});
// Create stream that receives from channel
// Record FPS after yield - this is closer to actual TCP send than tx.send()
// Create stream that receives from channel and forwards to the HTTP
// body. Record FPS *before* yield so the final frame of a session
// still gets counted (after-yield code in async_stream! only runs
// when the consumer polls again, which never happens for the last
// frame of a closing connection).
let handler_for_stream = handler.clone();
let guard_for_stream = guard.clone();
let body_stream = async_stream::stream! {
// Consume from channel - this drives the backpressure
while let Some(data) = rx.recv().await {
yield Ok::<bytes::Bytes, std::io::Error>(data);
// Record FPS after yield - data has been handed to Axum/hyper
// This is closer to actual TCP send than recording at tx.send()
handler_for_stream.record_frame_sent(guard_for_stream.id());
yield Ok::<bytes::Bytes, std::io::Error>(data);
}
};

View File

@@ -12,6 +12,7 @@ use webrtc::data_channel::data_channel_message::DataChannelMessage;
use webrtc::data_channel::RTCDataChannel;
use webrtc::ice::mdns::MulticastDnsMode;
use webrtc::ice_transport::ice_candidate::RTCIceCandidate;
use webrtc::ice_transport::ice_connection_state::RTCIceConnectionState;
use webrtc::ice_transport::ice_server::RTCIceServer;
use webrtc::interceptor::registry::Registry;
use webrtc::peer_connection::configuration::RTCConfiguration;
@@ -30,14 +31,12 @@ use super::signaling::{ConnectionState, IceCandidate, SdpAnswer, SdpOffer};
use super::video_track::{UniversalVideoTrack, UniversalVideoTrackConfig, VideoCodec};
use crate::audio::OpusFrame;
use crate::error::{AppError, Result};
use crate::events::{EventBus, SystemEvent};
use crate::hid::datachannel::{parse_hid_message, HidChannelEvent};
use crate::hid::HidController;
use crate::video::types::{
BitratePreset, EncodedVideoFrame, PixelFormat, Resolution, VideoEncoderType,
};
use std::sync::atomic::AtomicBool;
use webrtc::ice_transport::ice_gatherer_state::RTCIceGathererState;
const MIME_TYPE_H265: &str = "video/H265";
@@ -140,7 +139,6 @@ pub struct UniversalSession {
state_rx: watch::Receiver<ConnectionState>,
ice_candidates: Arc<Mutex<Vec<IceCandidate>>>,
hid_controller: Option<Arc<HidController>>,
event_bus: Option<Arc<EventBus>>,
video_receiver_handle: Mutex<Option<tokio::task::JoinHandle<()>>>,
audio_receiver_handle: Mutex<Option<tokio::task::JoinHandle<()>>>,
fps: u32,
@@ -150,7 +148,7 @@ impl UniversalSession {
pub async fn new(
config: UniversalSessionConfig,
session_id: String,
event_bus: Option<Arc<EventBus>>,
_event_bus: Option<Arc<crate::events::EventBus>>,
) -> Result<Self> {
info!(
"Creating {} session: {} @ {}x{} (audio={})",
@@ -338,7 +336,6 @@ impl UniversalSession {
state_rx,
ice_candidates: Arc::new(Mutex::new(vec![])),
hid_controller: None,
event_bus,
video_receiver_handle: Mutex::new(None),
audio_receiver_handle: Mutex::new(None),
fps: config.fps,
@@ -353,8 +350,6 @@ impl UniversalSession {
let state = self.state.clone();
let session_id = self.session_id.clone();
let codec = self.codec;
let event_bus = self.event_bus.clone();
self.pc
.on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| {
let state = state.clone();
@@ -372,42 +367,49 @@ impl UniversalSession {
};
info!("{} session {} state: {}", codec, session_id, new_state);
if matches!(
(*state.borrow(), new_state),
(
ConnectionState::Connected,
ConnectionState::New | ConnectionState::Connecting
)
) {
return;
}
let _ = state.send(new_state);
})
}));
let state_for_ice = self.state.clone();
let session_id_ice = self.session_id.clone();
self.pc
.on_ice_connection_state_change(Box::new(move |state| {
.on_ice_connection_state_change(Box::new(move |ice_state| {
let state = state_for_ice.clone();
let session_id = session_id_ice.clone();
Box::pin(async move {
info!("[ICE] Session {} connection state: {:?}", session_id, state);
})
}));
info!(
"[ICE] Session {} connection state: {:?}",
session_id, ice_state
);
let session_id_gather = self.session_id.clone();
let event_bus_gather = event_bus.clone();
self.pc
.on_ice_gathering_state_change(Box::new(move |state| {
let session_id = session_id_gather.clone();
let event_bus = event_bus_gather.clone();
Box::pin(async move {
if matches!(state, RTCIceGathererState::Complete) {
if let Some(bus) = event_bus.as_ref() {
bus.publish(SystemEvent::WebRTCIceComplete { session_id });
let new_state = match ice_state {
RTCIceConnectionState::Connected | RTCIceConnectionState::Completed => {
ConnectionState::Connected
}
}
RTCIceConnectionState::Disconnected => ConnectionState::Disconnected,
RTCIceConnectionState::Failed => ConnectionState::Failed,
RTCIceConnectionState::Closed => ConnectionState::Closed,
_ => return,
};
let _ = state.send(new_state);
})
}));
let ice_candidates = self.ice_candidates.clone();
let session_id_candidate = self.session_id.clone();
let event_bus_candidate = event_bus.clone();
self.pc
.on_ice_candidate(Box::new(move |candidate: Option<RTCIceCandidate>| {
let ice_candidates = ice_candidates.clone();
let session_id = session_id_candidate.clone();
let event_bus = event_bus_candidate.clone();
Box::pin(async move {
if let Some(c) = candidate {
@@ -430,14 +432,6 @@ impl UniversalSession {
let mut candidates = ice_candidates.lock().await;
candidates.push(candidate.clone());
drop(candidates);
if let Some(bus) = event_bus.as_ref() {
bus.publish(SystemEvent::WebRTCIceCandidate {
session_id,
candidate: serde_json::to_value(&candidate)
.unwrap_or(serde_json::Value::Null),
});
}
}
})
}));
@@ -660,9 +654,21 @@ impl UniversalSession {
.await;
let _ = send_in_flight;
if send_result.is_ok() {
frames_sent += 1;
last_sequence = Some(encoded_frame.sequence);
match send_result {
Ok(()) => {
frames_sent += 1;
last_sequence = Some(encoded_frame.sequence);
}
Err(e) => {
warn!(
"Session {} failed to write video frame: sequence={}, keyframe={}, bytes={}, error={}",
session_id,
encoded_frame.sequence,
encoded_frame.is_keyframe,
encoded_frame.data.len(),
e
);
}
}
}
}
@@ -810,25 +816,14 @@ impl UniversalSession {
}
}
let mut gather_complete = self.pc.gathering_complete_promise().await;
self.pc
.set_local_description(answer.clone())
.await
.map_err(|e| AppError::VideoError(format!("Failed to set local description: {}", e)))?;
const ICE_GATHER_TIMEOUT: Duration = Duration::from_millis(2500);
if tokio::time::timeout(ICE_GATHER_TIMEOUT, gather_complete.recv())
.await
.is_err()
{
debug!(
"ICE gathering timeout after {:?} for session {}",
ICE_GATHER_TIMEOUT, self.session_id
);
}
tokio::time::sleep(Duration::from_millis(500)).await;
let candidates = self.ice_candidates.lock().await.clone();
Ok(SdpAnswer::with_candidates(answer.sdp, candidates))
}
@@ -842,10 +837,16 @@ impl UniversalSession {
username_fragment: candidate.username_fragment,
};
self.pc
.add_ice_candidate(init)
.await
.map_err(|e| AppError::VideoError(format!("Failed to add ICE candidate: {}", e)))?;
if let Err(e) = self.pc.add_ice_candidate(init).await {
warn!(
"[ICE] Session {} failed to add remote candidate: {}",
self.session_id, e
);
return Err(AppError::VideoError(format!(
"Failed to add ICE candidate: {}",
e
)));
}
Ok(())
}

View File

@@ -14,7 +14,7 @@ use webrtc::track::track_local::{TrackLocal, TrackLocalWriter};
// rtp `HevcPayloader` mishandles AP+IDR and NAL 20 (`IDR_N_LP`).
use super::h265_payloader::H265Payloader;
use crate::error::Result;
use crate::error::{AppError, Result};
use crate::video::types::Resolution;
const RTP_MTU: usize = 1200;
@@ -250,6 +250,10 @@ impl UniversalVideoTrack {
TrackType::Sample(track) => {
if let Err(e) = track.write_sample(&sample).await {
debug!("H264 write_sample failed: {}", e);
return Err(AppError::WebRtcError(format!(
"H264 write_sample failed: {}",
e
)));
}
}
TrackType::Rtp(_) => {
@@ -276,6 +280,10 @@ impl UniversalVideoTrack {
TrackType::Sample(track) => {
if let Err(e) = track.write_sample(&sample).await {
debug!("VP8 write_sample failed: {}", e);
return Err(AppError::WebRtcError(format!(
"VP8 write_sample failed: {}",
e
)));
}
}
TrackType::Rtp(_) => {
@@ -298,6 +306,10 @@ impl UniversalVideoTrack {
TrackType::Sample(track) => {
if let Err(e) = track.write_sample(&sample).await {
debug!("VP9 write_sample failed: {}", e);
return Err(AppError::WebRtcError(format!(
"VP9 write_sample failed: {}",
e
)));
}
}
TrackType::Rtp(_) => {
@@ -366,6 +378,10 @@ impl UniversalVideoTrack {
if let Err(e) = rtp_track.write_rtp(&packet).await {
trace!("H265 write_rtp failed: {}", e);
return Err(AppError::WebRtcError(format!(
"H265 write_rtp failed: {}",
e
)));
}
}

View File

@@ -9,7 +9,7 @@ use tracing::{debug, info, trace, warn};
use crate::audio::{AudioController, OpusFrame};
use crate::error::{AppError, Result};
use crate::events::{EventBus, SystemEvent};
use crate::events::{EventBus, StreamDeviceLostKind, SystemEvent};
use crate::hid::HidController;
use crate::video::device::{
enumerate_devices, select_recovery_device, VideoDevice, VideoDeviceRecoveryHint,
@@ -352,6 +352,7 @@ impl WebRtcStreamer {
);
streamer
.publish_stream_event(SystemEvent::StreamDeviceLost {
kind: StreamDeviceLostKind::Video,
device: original_device.clone(),
reason: reason.clone(),
})