feat(webrtc): 添加公共ICE服务器支持和优化HID延迟

- 重构ICE配置:将TURN配置改为统一的ICE配置,支持STUN和多TURN URL
- 添加公共ICE服务器:类似RustDesk,用户留空时使用编译时配置的公共服务器
- 优化DataChannel HID消息:使用tokio::spawn立即处理,避免依赖webrtc-rs轮询
- 添加WebRTCReady事件:客户端等待此事件后再建立连接
- 初始化时启动音频流,确保WebRTC可订阅
- 移除多余的trace/debug日志减少开销
- 更新前端配置界面支持公共ICE服务器显示
This commit is contained in:
mofeng-git
2026-01-04 15:06:08 +08:00
parent 0c82d1a840
commit 9ab3d052f9
24 changed files with 766 additions and 258 deletions

View File

@@ -2,6 +2,63 @@
use serde::{Deserialize, Serialize};
use crate::secrets;
/// Public ICE server utilities
pub mod public_ice {
use super::*;
/// Check if public ICE servers are configured (at compile time)
pub fn is_configured() -> bool {
secrets::ice::is_configured()
}
/// Check if public TURN servers are configured (requires credentials)
pub fn has_turn() -> bool {
secrets::ice::has_turn()
}
/// Get the public STUN server URL
pub fn stun_server() -> Option<String> {
let server = secrets::ice::STUN_SERVER;
if server.is_empty() {
None
} else {
Some(server.to_string())
}
}
/// Get public TURN servers as TurnServer structs
pub fn turn_servers() -> Vec<TurnServer> {
if !secrets::ice::has_turn() {
return vec![];
}
let urls: Vec<String> = secrets::ice::TURN_URLS
.split(',')
.map(|s| s.trim().to_string())
.filter(|s| !s.is_empty())
.collect();
if urls.is_empty() {
return vec![];
}
vec![TurnServer {
urls,
username: secrets::ice::TURN_USERNAME.to_string(),
credential: secrets::ice::TURN_PASSWORD.to_string(),
}]
}
/// Get all public ICE servers (STUN + TURN) for WebRTC configuration
pub fn get_all_servers() -> (Vec<String>, Vec<TurnServer>) {
let stun_servers = stun_server().into_iter().collect();
let turn_servers = turn_servers();
(stun_servers, turn_servers)
}
}
/// WebRTC configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WebRtcConfig {
@@ -46,14 +103,26 @@ impl Default for WebRtcConfig {
/// TURN server configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TurnServer {
/// TURN server URL (e.g., "turn:turn.example.com:3478")
pub url: String,
/// TURN server URLs (e.g., ["turn:turn.example.com:3478?transport=udp", "turn:turn.example.com:3478?transport=tcp"])
/// Multiple URLs allow fallback between UDP and TCP transports
pub urls: Vec<String>,
/// Username for TURN authentication
pub username: String,
/// Credential for TURN authentication
pub credential: String,
}
impl TurnServer {
/// Create a TurnServer with a single URL (for backwards compatibility)
pub fn new(url: String, username: String, credential: String) -> Self {
Self {
urls: vec![url],
username,
credential,
}
}
}
/// Video codec preference
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]

View File

@@ -78,7 +78,7 @@ impl PeerConnection {
for turn in &config.turn_servers {
ice_servers.push(RTCIceServer {
urls: vec![turn.url.clone()],
urls: turn.urls.clone(),
username: turn.username.clone(),
credential: turn.credential.clone(),
..Default::default()
@@ -207,10 +207,11 @@ impl PeerConnection {
*data_channel.write().await = Some(dc.clone());
// Set up message handler with HID processing
// Immediately spawn task in tokio runtime for low latency
dc.on_message(Box::new(move |msg: DataChannelMessage| {
let hid = hid.clone();
Box::pin(async move {
tokio::spawn(async move {
debug!("DataChannel HID message: {} bytes", msg.data.len());
// Parse and process HID message
@@ -233,7 +234,10 @@ impl PeerConnection {
}
}
}
})
});
// Return empty future (actual work is spawned above)
Box::pin(async {})
}));
})
}));
@@ -432,11 +436,10 @@ impl PeerConnectionManager {
// Add video track
peer.add_video_track(VideoTrackConfig::default()).await?;
// Create data channel and set HID controller
// Set HID controller if available
// Note: We DON'T create a data channel here - the frontend creates it.
// The server receives it via on_data_channel callback set in set_hid_controller().
if self.config.enable_datachannel {
peer.create_data_channel("hid").await?;
// Set HID controller if available
if let Some(ref hid) = self.hid_controller {
peer.set_hid_controller(hid.clone());
}

View File

@@ -250,13 +250,13 @@ impl UniversalSession {
// Skip TURN servers without credentials (webrtc-rs requires them)
if turn.username.is_empty() || turn.credential.is_empty() {
warn!(
"Skipping TURN server {} - credentials required but missing",
turn.url
"Skipping TURN server {:?} - credentials required but missing",
turn.urls
);
continue;
}
ice_servers.push(RTCIceServer {
urls: vec![turn.url.clone()],
urls: turn.urls.clone(),
username: turn.username.clone(),
credential: turn.credential.clone(),
..Default::default()
@@ -424,7 +424,9 @@ impl UniversalSession {
dc.on_message(Box::new(move |msg: DataChannelMessage| {
let hid = hid.clone();
Box::pin(async move {
// Immediately spawn task in tokio runtime for low latency
// Don't rely on webrtc-rs to poll the returned Future
tokio::spawn(async move {
if let Some(event) = parse_hid_message(&msg.data) {
match event {
HidChannelEvent::Keyboard(kb_event) => {
@@ -444,7 +446,10 @@ impl UniversalSession {
}
}
}
})
});
// Return empty future (actual work is spawned above)
Box::pin(async {})
}));
})
}));
@@ -654,12 +659,6 @@ impl UniversalSession {
}
} else {
packets_sent += 1;
trace!(
"Session {} sent audio packet {}: {} bytes",
session_id,
packets_sent,
opus_frame.data.len()
);
}
}
Err(broadcast::error::RecvError::Lagged(n)) => {

View File

@@ -35,7 +35,7 @@
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{broadcast, RwLock};
use tracing::{debug, error, info, warn};
use tracing::{debug, error, info, trace, warn};
use crate::audio::shared_pipeline::{SharedAudioPipeline, SharedAudioPipelineConfig};
use crate::audio::{AudioController, OpusFrame};
@@ -315,9 +315,11 @@ impl WebRtcStreamer {
}
drop(pipeline_guard);
// Clear video frame source to signal upstream to stop
*streamer.video_frame_tx.write().await = None;
info!("Cleared video frame source");
// NOTE: Don't clear video_frame_tx here!
// The frame source is managed by stream_manager and should
// remain available for new sessions. Only stream_manager
// should clear it during mode switches.
info!("Video pipeline stopped, but keeping frame source for new sessions");
}
break;
}
@@ -512,13 +514,37 @@ impl WebRtcStreamer {
/// Update video configuration
///
/// This will restart the encoding pipeline and close all sessions.
/// Only restarts the encoding pipeline if configuration actually changed.
/// This allows multiple consumers (WebRTC, RustDesk) to share the same pipeline
/// without interrupting each other when they call this method with the same config.
pub async fn update_video_config(
&self,
resolution: Resolution,
format: PixelFormat,
fps: u32,
) {
// Check if configuration actually changed
let config = self.config.read().await;
let config_changed = config.resolution != resolution
|| config.input_format != format
|| config.fps != fps;
drop(config);
if !config_changed {
// Configuration unchanged, no need to restart pipeline
trace!(
"Video config unchanged: {}x{} {:?} @ {} fps",
resolution.width, resolution.height, format, fps
);
return;
}
// Configuration changed, restart pipeline
info!(
"Video config changed, restarting pipeline: {}x{} {:?} @ {} fps",
resolution.width, resolution.height, format, fps
);
// Stop existing pipeline
if let Some(ref pipeline) = *self.video_pipeline.read().await {
pipeline.stop();
@@ -598,6 +624,8 @@ impl WebRtcStreamer {
///
/// Note: Changes take effect for new sessions only.
/// Existing sessions need to be reconnected to use the new ICE config.
///
/// If both stun_server and turn_server are empty/None, uses public ICE servers.
pub async fn update_ice_config(
&self,
stun_server: Option<String>,
@@ -607,32 +635,49 @@ impl WebRtcStreamer {
) {
let mut config = self.config.write().await;
// Update STUN servers
// Clear existing servers
config.webrtc.stun_servers.clear();
if let Some(ref stun) = stun_server {
if !stun.is_empty() {
config.webrtc.stun_servers.push(stun.clone());
info!("WebRTC STUN server updated: {}", stun);
}
}
// Update TURN servers
config.webrtc.turn_servers.clear();
if let Some(ref turn) = turn_server {
if !turn.is_empty() {
let username = turn_username.unwrap_or_default();
let credential = turn_password.unwrap_or_default();
config.webrtc.turn_servers.push(TurnServer {
url: turn.clone(),
username: username.clone(),
credential,
});
info!("WebRTC TURN server updated: {} (user: {})", turn, username);
}
}
if config.webrtc.stun_servers.is_empty() && config.webrtc.turn_servers.is_empty() {
info!("WebRTC ICE config cleared - only host candidates will be used");
// Check if user configured custom servers
let has_custom_stun = stun_server.as_ref().map(|s| !s.is_empty()).unwrap_or(false);
let has_custom_turn = turn_server.as_ref().map(|s| !s.is_empty()).unwrap_or(false);
// If no custom servers, use public ICE servers (like RustDesk)
if !has_custom_stun && !has_custom_turn {
use crate::webrtc::config::public_ice;
if public_ice::is_configured() {
if let Some(stun) = public_ice::stun_server() {
config.webrtc.stun_servers.push(stun.clone());
info!("Using public STUN server: {}", stun);
}
for turn in public_ice::turn_servers() {
info!("Using public TURN server: {:?}", turn.urls);
config.webrtc.turn_servers.push(turn);
}
} else {
info!("No public ICE servers configured, using host candidates only");
}
} else {
// Use custom servers
if let Some(ref stun) = stun_server {
if !stun.is_empty() {
config.webrtc.stun_servers.push(stun.clone());
info!("Using custom STUN server: {}", stun);
}
}
if let Some(ref turn) = turn_server {
if !turn.is_empty() {
let username = turn_username.unwrap_or_default();
let credential = turn_password.unwrap_or_default();
config.webrtc.turn_servers.push(TurnServer::new(
turn.clone(),
username.clone(),
credential,
));
info!("Using custom TURN server: {} (user: {})", turn, username);
}
}
}
}
@@ -670,15 +715,14 @@ impl WebRtcStreamer {
let mut session = UniversalSession::new(session_config.clone(), session_id.clone()).await?;
// Set HID controller if available
// Note: We DON'T create a data channel here - the frontend creates it.
// The server only receives it via on_data_channel callback set in set_hid_controller().
// If server also created a channel, frontend's ondatachannel would overwrite its
// own channel with server's, but server's channel has no message handler!
if let Some(ref hid) = *self.hid_controller.read().await {
session.set_hid_controller(hid.clone());
}
// Create data channel
if self.config.read().await.webrtc.enable_datachannel {
session.create_data_channel("hid").await?;
}
let session = Arc::new(session);
// Subscribe to video pipeline frames
@@ -901,9 +945,16 @@ impl WebRtcStreamer {
/// Set bitrate using preset
///
/// Note: Hardware encoders (VAAPI, NVENC, etc.) don't support dynamic bitrate changes.
/// This method restarts the pipeline to apply the new bitrate.
/// This method restarts the pipeline to apply the new bitrate only if the preset actually changed.
pub async fn set_bitrate_preset(self: &Arc<Self>, preset: BitratePreset) -> Result<()> {
// Update config first
// Check if preset actually changed
let current_preset = self.config.read().await.bitrate_preset;
if current_preset == preset {
trace!("Bitrate preset unchanged: {}", preset);
return Ok(());
}
// Update config
self.config.write().await.bitrate_preset = preset;
// Check if pipeline exists and is running