mirror of
https://github.com/mofeng-git/One-KVM.git
synced 2026-03-15 07:26:44 +08:00
perf(rustdesk): 优化视频流性能和修复管道重启问题
- 使用 bounded channel(4) 替代 unbounded channel 提供背压控制 - 配置 protobuf 使用 bytes::Bytes 类型实现零拷贝 - 添加 encode_frame_bytes_zero_copy 方法避免帧数据拷贝 - 预分配 128KB 发送缓冲区减少内存分配 - 添加 write_frame_buffered 函数复用缓冲区 - 修复视频管道重启后 RustDesk 连接不恢复的问题 - 实现双层循环自动重新订阅新管道 - 修复 WebRTC set_bitrate_preset 中 video_frame_tx 被清除的问题 - 删除冗余的 RegisterPeer 日志
This commit is contained in:
@@ -12,7 +12,7 @@ use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
|
||||
|
||||
use bytes::Bytes;
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use parking_lot::RwLock;
|
||||
use prost::Message as ProstMessage;
|
||||
use tokio::net::TcpStream;
|
||||
@@ -25,7 +25,7 @@ use crate::video::encoder::registry::{EncoderRegistry, VideoEncoderType};
|
||||
use crate::video::encoder::BitratePreset;
|
||||
use crate::video::stream_manager::VideoStreamManager;
|
||||
|
||||
use super::bytes_codec::{read_frame, write_frame};
|
||||
use super::bytes_codec::{read_frame, write_frame, write_frame_buffered};
|
||||
use super::config::RustDeskConfig;
|
||||
use super::crypto::{self, decrypt_symmetric_key_msg, KeyPair, SigningKeyPair};
|
||||
use super::frame_adapters::{VideoCodec, VideoFrameAdapter};
|
||||
@@ -145,7 +145,7 @@ pub struct Connection {
|
||||
/// Negotiated video codec (after client capability exchange)
|
||||
negotiated_codec: Option<VideoEncoderType>,
|
||||
/// Video frame sender for restarting video after codec switch
|
||||
video_frame_tx: Option<mpsc::UnboundedSender<Bytes>>,
|
||||
video_frame_tx: Option<mpsc::Sender<Bytes>>,
|
||||
/// Input event throttler to prevent HID device EAGAIN errors
|
||||
input_throttler: InputThrottler,
|
||||
/// Last measured round-trip delay in milliseconds (for TestDelay responses)
|
||||
@@ -263,8 +263,8 @@ impl Connection {
|
||||
info!("Sending SignedId with device_id={}", self.device_id);
|
||||
self.send_framed_arc(&writer, &signed_id_bytes).await?;
|
||||
|
||||
// Channel for receiving video frames to send
|
||||
let (video_tx, mut video_rx) = mpsc::unbounded_channel::<Bytes>();
|
||||
// Channel for receiving video frames to send (bounded to provide backpressure)
|
||||
let (video_tx, mut video_rx) = mpsc::channel::<Bytes>(4);
|
||||
let mut video_streaming = false;
|
||||
|
||||
// Timer for sending TestDelay to measure round-trip latency
|
||||
@@ -272,6 +272,10 @@ impl Connection {
|
||||
let mut test_delay_interval = tokio::time::interval(Duration::from_secs(1));
|
||||
test_delay_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
|
||||
|
||||
// Pre-allocated buffer for framing (reused across sends to reduce allocations)
|
||||
// Typical H264 frame is 10-100KB, pre-allocate 128KB
|
||||
let mut frame_buf = BytesMut::with_capacity(128 * 1024);
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
// Read framed message from client using RustDesk's variable-length encoding
|
||||
@@ -295,8 +299,23 @@ impl Connection {
|
||||
}
|
||||
|
||||
// Send video frames (encrypted if session key is set)
|
||||
// Optimized path: inline encryption and use pre-allocated buffer
|
||||
Some(frame_data) = video_rx.recv() => {
|
||||
if let Err(e) = self.send_encrypted_arc(&writer, &frame_data).await {
|
||||
let send_result = if let Some(ref key) = self.session_key {
|
||||
// Encrypt the frame
|
||||
self.enc_seqnum += 1;
|
||||
let nonce = Self::get_nonce(self.enc_seqnum);
|
||||
let ciphertext = secretbox::seal(&frame_data, &nonce, key);
|
||||
// Send using pre-allocated buffer
|
||||
let mut w = writer.lock().await;
|
||||
write_frame_buffered(&mut *w, &ciphertext, &mut frame_buf).await
|
||||
} else {
|
||||
// No encryption, send plain
|
||||
let mut w = writer.lock().await;
|
||||
write_frame_buffered(&mut *w, &frame_data, &mut frame_buf).await
|
||||
};
|
||||
|
||||
if let Err(e) = send_result {
|
||||
error!("Error sending video frame: {}", e);
|
||||
break;
|
||||
}
|
||||
@@ -368,7 +387,7 @@ impl Connection {
|
||||
&mut self,
|
||||
data: &[u8],
|
||||
writer: &Arc<Mutex<OwnedWriteHalf>>,
|
||||
video_tx: &mpsc::UnboundedSender<Bytes>,
|
||||
video_tx: &mpsc::Sender<Bytes>,
|
||||
video_streaming: &mut bool,
|
||||
) -> anyhow::Result<()> {
|
||||
// Try to decrypt if we have a session key
|
||||
@@ -677,7 +696,7 @@ impl Connection {
|
||||
}
|
||||
|
||||
/// Start video streaming task
|
||||
fn start_video_streaming(&mut self, video_tx: mpsc::UnboundedSender<Bytes>) {
|
||||
fn start_video_streaming(&mut self, video_tx: mpsc::Sender<Bytes>) {
|
||||
let video_manager = match &self.video_manager {
|
||||
Some(vm) => vm.clone(),
|
||||
None => {
|
||||
@@ -1284,10 +1303,13 @@ impl ConnectionManager {
|
||||
/// This function subscribes to the shared video encoding pipeline (used by WebRTC)
|
||||
/// and forwards encoded frames to the RustDesk client. This avoids duplicate encoding
|
||||
/// when both WebRTC and RustDesk clients are connected.
|
||||
///
|
||||
/// When the pipeline is restarted (e.g., due to bitrate/codec change), this function
|
||||
/// will automatically re-subscribe to the new pipeline.
|
||||
async fn run_video_streaming(
|
||||
conn_id: u32,
|
||||
video_manager: Arc<VideoStreamManager>,
|
||||
video_tx: mpsc::UnboundedSender<Bytes>,
|
||||
video_tx: mpsc::Sender<Bytes>,
|
||||
state: Arc<RwLock<ConnectionState>>,
|
||||
shutdown_tx: broadcast::Sender<()>,
|
||||
negotiated_codec: VideoEncoderType,
|
||||
@@ -1309,30 +1331,7 @@ async fn run_video_streaming(
|
||||
// Continue anyway, will use whatever codec the pipeline already has
|
||||
}
|
||||
|
||||
// Subscribe to the shared video encoding pipeline
|
||||
// This uses the same encoder as WebRTC, avoiding duplicate encoding
|
||||
let mut encoded_frame_rx = match video_manager.subscribe_encoded_frames().await {
|
||||
Some(rx) => rx,
|
||||
None => {
|
||||
warn!("No encoded frame source available for video streaming");
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
|
||||
// Get encoding config for logging
|
||||
if let Some(config) = video_manager.get_encoding_config().await {
|
||||
info!(
|
||||
"RustDesk connection {} using shared video pipeline: {:?} {}x{} @ {}",
|
||||
conn_id,
|
||||
config.output_codec,
|
||||
config.resolution.width,
|
||||
config.resolution.height,
|
||||
config.bitrate_preset
|
||||
);
|
||||
}
|
||||
|
||||
// Create video frame adapter for RustDesk protocol
|
||||
// Use the negotiated codec for the adapter
|
||||
let codec = match negotiated_codec {
|
||||
VideoEncoderType::H264 => VideoCodec::H264,
|
||||
VideoEncoderType::H265 => VideoCodec::H265,
|
||||
@@ -1347,54 +1346,91 @@ async fn run_video_streaming(
|
||||
|
||||
info!("Started shared video streaming for connection {} (codec: {:?})", conn_id, codec);
|
||||
|
||||
loop {
|
||||
// Check if connection is still active
|
||||
// Outer loop: handles pipeline restarts by re-subscribing
|
||||
'subscribe_loop: loop {
|
||||
// Check if connection is still active before subscribing
|
||||
if *state.read() != ConnectionState::Active {
|
||||
debug!("Connection {} no longer active, stopping video", conn_id);
|
||||
break;
|
||||
}
|
||||
|
||||
tokio::select! {
|
||||
biased;
|
||||
// Subscribe to the shared video encoding pipeline
|
||||
let mut encoded_frame_rx = match video_manager.subscribe_encoded_frames().await {
|
||||
Some(rx) => rx,
|
||||
None => {
|
||||
// Pipeline not ready yet, wait and retry
|
||||
debug!("No encoded frame source available for connection {}, retrying...", conn_id);
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
continue 'subscribe_loop;
|
||||
}
|
||||
};
|
||||
|
||||
_ = shutdown_rx.recv() => {
|
||||
debug!("Shutdown signal received, stopping video for connection {}", conn_id);
|
||||
break;
|
||||
// Log encoding config
|
||||
if let Some(config) = video_manager.get_encoding_config().await {
|
||||
info!(
|
||||
"RustDesk connection {} subscribed to video pipeline: {:?} {}x{} @ {}",
|
||||
conn_id,
|
||||
config.output_codec,
|
||||
config.resolution.width,
|
||||
config.resolution.height,
|
||||
config.bitrate_preset
|
||||
);
|
||||
}
|
||||
|
||||
// Inner loop: receives frames from current subscription
|
||||
loop {
|
||||
// Check if connection is still active
|
||||
if *state.read() != ConnectionState::Active {
|
||||
debug!("Connection {} no longer active, stopping video", conn_id);
|
||||
break 'subscribe_loop;
|
||||
}
|
||||
|
||||
result = encoded_frame_rx.recv() => {
|
||||
match result {
|
||||
Ok(frame) => {
|
||||
// Convert EncodedVideoFrame to RustDesk VideoFrame message
|
||||
let msg_bytes = video_adapter.encode_frame_bytes(
|
||||
&frame.data,
|
||||
frame.is_keyframe,
|
||||
frame.pts_ms as u64,
|
||||
);
|
||||
tokio::select! {
|
||||
biased;
|
||||
|
||||
// Send to connection
|
||||
if video_tx.send(msg_bytes).is_err() {
|
||||
debug!("Video channel closed for connection {}", conn_id);
|
||||
return Ok(());
|
||||
}
|
||||
_ = shutdown_rx.recv() => {
|
||||
debug!("Shutdown signal received, stopping video for connection {}", conn_id);
|
||||
break 'subscribe_loop;
|
||||
}
|
||||
|
||||
encoded_count += 1;
|
||||
|
||||
// Log stats periodically
|
||||
if last_log_time.elapsed().as_secs() >= 10 {
|
||||
info!(
|
||||
"Video streaming stats for connection {}: {} frames forwarded",
|
||||
conn_id, encoded_count
|
||||
result = encoded_frame_rx.recv() => {
|
||||
match result {
|
||||
Ok(frame) => {
|
||||
// Convert EncodedVideoFrame to RustDesk VideoFrame message
|
||||
// Use zero-copy version: Bytes.clone() only increments refcount
|
||||
let msg_bytes = video_adapter.encode_frame_bytes_zero_copy(
|
||||
frame.data.clone(),
|
||||
frame.is_keyframe,
|
||||
frame.pts_ms as u64,
|
||||
);
|
||||
last_log_time = Instant::now();
|
||||
|
||||
// Send to connection (blocks if channel is full, providing backpressure)
|
||||
if video_tx.send(msg_bytes).await.is_err() {
|
||||
debug!("Video channel closed for connection {}", conn_id);
|
||||
break 'subscribe_loop;
|
||||
}
|
||||
|
||||
encoded_count += 1;
|
||||
|
||||
// Log stats periodically
|
||||
if last_log_time.elapsed().as_secs() >= 10 {
|
||||
info!(
|
||||
"Video streaming stats for connection {}: {} frames forwarded",
|
||||
conn_id, encoded_count
|
||||
);
|
||||
last_log_time = Instant::now();
|
||||
}
|
||||
}
|
||||
Err(broadcast::error::RecvError::Lagged(n)) => {
|
||||
debug!("Connection {} lagged {} encoded frames", conn_id, n);
|
||||
}
|
||||
Err(broadcast::error::RecvError::Closed) => {
|
||||
// Pipeline was restarted (e.g., bitrate/codec change)
|
||||
// Re-subscribe to the new pipeline
|
||||
info!("Video pipeline closed for connection {}, re-subscribing...", conn_id);
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
continue 'subscribe_loop;
|
||||
}
|
||||
}
|
||||
Err(broadcast::error::RecvError::Lagged(n)) => {
|
||||
debug!("Connection {} lagged {} encoded frames", conn_id, n);
|
||||
}
|
||||
Err(broadcast::error::RecvError::Closed) => {
|
||||
debug!("Encoded frame channel closed for connection {}", conn_id);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user