diff --git a/build.rs b/build.rs
index 7821547a..76374b61 100644
--- a/build.rs
+++ b/build.rs
@@ -36,6 +36,12 @@ fn compile_protos() {
     prost_build::Config::new()
         .out_dir(&out_dir)
+        // Use bytes::Bytes for video/audio frame data to enable zero-copy
+        .bytes([
+            "EncodedVideoFrame.data",
+            "AudioFrame.data",
+            "CursorData.colors",
+        ])
         .compile_protos(
             &["protos/rendezvous.proto", "protos/message.proto"],
             &["protos/"],
diff --git a/src/rustdesk/bytes_codec.rs b/src/rustdesk/bytes_codec.rs
index 3da7a2bb..ec210bc7 100644
--- a/src/rustdesk/bytes_codec.rs
+++ b/src/rustdesk/bytes_codec.rs
@@ -98,6 +98,48 @@ pub async fn write_frame<W: AsyncWrite + Unpin>(writer: &mut W, data: &[u8]) -> io::Result<()> {
     Ok(())
 }
 
+/// Write a framed message using a reusable buffer (reduces allocations)
+///
+/// This version reuses the provided BytesMut buffer to avoid allocation on each call.
+/// The buffer is cleared before use and will grow as needed.
+pub async fn write_frame_buffered<W: AsyncWrite + Unpin>(
+    writer: &mut W,
+    data: &[u8],
+    buf: &mut BytesMut,
+) -> io::Result<()> {
+    buf.clear();
+    encode_frame_into(data, buf)?;
+    writer.write_all(buf).await?;
+    writer.flush().await?;
+    Ok(())
+}
+
+/// Encode a message with RustDesk's variable-length framing into an existing buffer
+pub fn encode_frame_into(data: &[u8], buf: &mut BytesMut) -> io::Result<()> {
+    let len = data.len();
+
+    // Reserve space for header (max 4 bytes) + data
+    buf.reserve(4 + len);
+
+    if len <= 0x3F {
+        buf.put_u8((len << 2) as u8);
+    } else if len <= 0x3FFF {
+        buf.put_u16_le(((len << 2) as u16) | 0x1);
+    } else if len <= 0x3FFFFF {
+        let h = ((len << 2) as u32) | 0x2;
+        buf.put_u8((h & 0xFF) as u8);
+        buf.put_u8(((h >> 8) & 0xFF) as u8);
+        buf.put_u8(((h >> 16) & 0xFF) as u8);
+    } else if len <= MAX_PACKET_LENGTH {
+        buf.put_u32_le(((len << 2) as u32) | 0x3);
+    } else {
+        return Err(io::Error::new(io::ErrorKind::InvalidInput, "Message too large"));
+    }
+
+    buf.extend_from_slice(data);
+    Ok(())
+}
+
 /// BytesCodec for stateful decoding (compatible with tokio-util codec)
 #[derive(Debug, Clone, Copy)]
 pub struct BytesCodec {
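The header written by encode_frame_into is self-describing: the low two bits of the first byte give the number of extra header bytes, and the full header, read little-endian, is the payload length shifted left by two. For example, a 100-byte payload uses the two-byte form, (100 << 2) | 1 = 0x0191, sent little-endian as 91 01. The sketch below is illustrative only and not part of the patch; decode_frame_header is a hypothetical helper, while the in-tree decoding lives in read_frame/BytesCodec.

    // Illustrative sketch, not the crate's decoder: recover (payload_len, header_len)
    // from a buffer that starts with a frame produced by encode_frame_into.
    fn decode_frame_header(buf: &[u8]) -> Option<(usize, usize)> {
        let first = *buf.first()?;
        let header_len = (first & 0x3) as usize + 1; // low 2 bits: 0..=3 extra header bytes
        if buf.len() < header_len {
            return None; // not enough bytes yet to know the length
        }
        let mut header: u32 = 0;
        for (i, b) in buf[..header_len].iter().enumerate() {
            header |= (*b as u32) << (8 * i); // headers are little-endian
        }
        Some(((header >> 2) as usize, header_len))
    }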
diff --git a/src/rustdesk/connection.rs b/src/rustdesk/connection.rs
index cb893cdc..9f316fbb 100644
--- a/src/rustdesk/connection.rs
+++ b/src/rustdesk/connection.rs
@@ -12,7 +12,7 @@ use std::net::SocketAddr;
 use std::sync::Arc;
 use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
 
-use bytes::Bytes;
+use bytes::{Bytes, BytesMut};
 use parking_lot::RwLock;
 use prost::Message as ProstMessage;
 use tokio::net::TcpStream;
@@ -25,7 +25,7 @@ use crate::video::encoder::registry::{EncoderRegistry, VideoEncoderType};
 use crate::video::encoder::BitratePreset;
 use crate::video::stream_manager::VideoStreamManager;
 
-use super::bytes_codec::{read_frame, write_frame};
+use super::bytes_codec::{read_frame, write_frame, write_frame_buffered};
 use super::config::RustDeskConfig;
 use super::crypto::{self, decrypt_symmetric_key_msg, KeyPair, SigningKeyPair};
 use super::frame_adapters::{VideoCodec, VideoFrameAdapter};
@@ -145,7 +145,7 @@ pub struct Connection {
     /// Negotiated video codec (after client capability exchange)
     negotiated_codec: Option<VideoEncoderType>,
     /// Video frame sender for restarting video after codec switch
-    video_frame_tx: Option<mpsc::UnboundedSender<Bytes>>,
+    video_frame_tx: Option<mpsc::Sender<Bytes>>,
     /// Input event throttler to prevent HID device EAGAIN errors
     input_throttler: InputThrottler,
     /// Last measured round-trip delay in milliseconds (for TestDelay responses)
@@ -263,8 +263,8 @@ impl Connection {
         info!("Sending SignedId with device_id={}", self.device_id);
         self.send_framed_arc(&writer, &signed_id_bytes).await?;
 
-        // Channel for receiving video frames to send
-        let (video_tx, mut video_rx) = mpsc::unbounded_channel::<Bytes>();
+        // Channel for receiving video frames to send (bounded to provide backpressure)
+        let (video_tx, mut video_rx) = mpsc::channel::<Bytes>(4);
         let mut video_streaming = false;
 
         // Timer for sending TestDelay to measure round-trip latency
@@ -272,6 +272,10 @@ impl Connection {
         let mut test_delay_interval = tokio::time::interval(Duration::from_secs(1));
         test_delay_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
 
+        // Pre-allocated buffer for framing (reused across sends to reduce allocations)
+        // Typical H264 frame is 10-100KB, pre-allocate 128KB
+        let mut frame_buf = BytesMut::with_capacity(128 * 1024);
+
         loop {
             tokio::select! {
                 // Read framed message from client using RustDesk's variable-length encoding
@@ -295,8 +299,23 @@
                 }
 
                 // Send video frames (encrypted if session key is set)
+                // Optimized path: inline encryption and use pre-allocated buffer
                 Some(frame_data) = video_rx.recv() => {
-                    if let Err(e) = self.send_encrypted_arc(&writer, &frame_data).await {
+                    let send_result = if let Some(ref key) = self.session_key {
+                        // Encrypt the frame
+                        self.enc_seqnum += 1;
+                        let nonce = Self::get_nonce(self.enc_seqnum);
+                        let ciphertext = secretbox::seal(&frame_data, &nonce, key);
+                        // Send using pre-allocated buffer
+                        let mut w = writer.lock().await;
+                        write_frame_buffered(&mut *w, &ciphertext, &mut frame_buf).await
+                    } else {
+                        // No encryption, send plain
+                        let mut w = writer.lock().await;
+                        write_frame_buffered(&mut *w, &frame_data, &mut frame_buf).await
+                    };
+
+                    if let Err(e) = send_result {
                         error!("Error sending video frame: {}", e);
                         break;
                     }
@@ -368,7 +387,7 @@
         &mut self,
         data: &[u8],
         writer: &Arc<Mutex<OwnedWriteHalf>>,
-        video_tx: &mpsc::UnboundedSender<Bytes>,
+        video_tx: &mpsc::Sender<Bytes>,
        video_streaming: &mut bool,
     ) -> anyhow::Result<()> {
         // Try to decrypt if we have a session key
@@ -677,7 +696,7 @@
     }
 
     /// Start video streaming task
-    fn start_video_streaming(&mut self, video_tx: mpsc::UnboundedSender<Bytes>) {
+    fn start_video_streaming(&mut self, video_tx: mpsc::Sender<Bytes>) {
         let video_manager = match &self.video_manager {
             Some(vm) => vm.clone(),
             None => {
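The switch above from an unbounded channel to mpsc::channel::<Bytes>(4) is what provides the backpressure mentioned in the comments: send().await suspends once four frames are queued, so a slow RustDesk client throttles the forwarder instead of letting frames pile up. A standalone sketch of that behavior, illustrative only and not part of the patch:

    use bytes::Bytes;
    use tokio::sync::mpsc;

    #[tokio::main]
    async fn main() {
        let (tx, mut rx) = mpsc::channel::<Bytes>(4);
        tokio::spawn(async move {
            for i in 0u8..16 {
                // Suspends here whenever 4 frames are already queued.
                tx.send(Bytes::from(vec![i; 4])).await.expect("receiver alive");
            }
        });
        // The receiver drains at its own pace; the sender can never race ahead
        // by more than the channel capacity.
        while let Some(frame) = rx.recv().await {
            println!("got {} bytes", frame.len());
        }
    }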
@@ -1284,10 +1303,13 @@ impl ConnectionManager {
 /// This function subscribes to the shared video encoding pipeline (used by WebRTC)
 /// and forwards encoded frames to the RustDesk client. This avoids duplicate encoding
 /// when both WebRTC and RustDesk clients are connected.
+///
+/// When the pipeline is restarted (e.g., due to bitrate/codec change), this function
+/// will automatically re-subscribe to the new pipeline.
 async fn run_video_streaming(
     conn_id: u32,
     video_manager: Arc<VideoStreamManager>,
-    video_tx: mpsc::UnboundedSender<Bytes>,
+    video_tx: mpsc::Sender<Bytes>,
     state: Arc<RwLock<ConnectionState>>,
     shutdown_tx: broadcast::Sender<()>,
     negotiated_codec: VideoEncoderType,
@@ -1309,30 +1331,7 @@ async fn run_video_streaming(
         // Continue anyway, will use whatever codec the pipeline already has
     }
 
-    // Subscribe to the shared video encoding pipeline
-    // This uses the same encoder as WebRTC, avoiding duplicate encoding
-    let mut encoded_frame_rx = match video_manager.subscribe_encoded_frames().await {
-        Some(rx) => rx,
-        None => {
-            warn!("No encoded frame source available for video streaming");
-            return Ok(());
-        }
-    };
-
-    // Get encoding config for logging
-    if let Some(config) = video_manager.get_encoding_config().await {
-        info!(
-            "RustDesk connection {} using shared video pipeline: {:?} {}x{} @ {}",
-            conn_id,
-            config.output_codec,
-            config.resolution.width,
-            config.resolution.height,
-            config.bitrate_preset
-        );
-    }
-
     // Create video frame adapter for RustDesk protocol
-    // Use the negotiated codec for the adapter
     let codec = match negotiated_codec {
         VideoEncoderType::H264 => VideoCodec::H264,
         VideoEncoderType::H265 => VideoCodec::H265,
@@ -1347,54 +1346,91 @@ async fn run_video_streaming(
     info!("Started shared video streaming for connection {} (codec: {:?})", conn_id, codec);
 
-    loop {
-        // Check if connection is still active
+    // Outer loop: handles pipeline restarts by re-subscribing
+    'subscribe_loop: loop {
+        // Check if connection is still active before subscribing
         if *state.read() != ConnectionState::Active {
             debug!("Connection {} no longer active, stopping video", conn_id);
             break;
         }
 
-        tokio::select! {
-            biased;
+        // Subscribe to the shared video encoding pipeline
+        let mut encoded_frame_rx = match video_manager.subscribe_encoded_frames().await {
+            Some(rx) => rx,
+            None => {
+                // Pipeline not ready yet, wait and retry
+                debug!("No encoded frame source available for connection {}, retrying...", conn_id);
+                tokio::time::sleep(Duration::from_millis(100)).await;
+                continue 'subscribe_loop;
+            }
+        };
 
-            _ = shutdown_rx.recv() => {
-                debug!("Shutdown signal received, stopping video for connection {}", conn_id);
-                break;
+        // Log encoding config
+        if let Some(config) = video_manager.get_encoding_config().await {
+            info!(
+                "RustDesk connection {} subscribed to video pipeline: {:?} {}x{} @ {}",
+                conn_id,
+                config.output_codec,
+                config.resolution.width,
+                config.resolution.height,
+                config.bitrate_preset
+            );
+        }
+
+        // Inner loop: receives frames from current subscription
+        loop {
+            // Check if connection is still active
+            if *state.read() != ConnectionState::Active {
+                debug!("Connection {} no longer active, stopping video", conn_id);
+                break 'subscribe_loop;
             }
 
-            result = encoded_frame_rx.recv() => {
-                match result {
-                    Ok(frame) => {
-                        // Convert EncodedVideoFrame to RustDesk VideoFrame message
-                        let msg_bytes = video_adapter.encode_frame_bytes(
-                            &frame.data,
-                            frame.is_keyframe,
-                            frame.pts_ms as u64,
-                        );
+            tokio::select! {
+                biased;
 
-                        // Send to connection
-                        if video_tx.send(msg_bytes).is_err() {
-                            debug!("Video channel closed for connection {}", conn_id);
-                            return Ok(());
-                        }
+                _ = shutdown_rx.recv() => {
+                    debug!("Shutdown signal received, stopping video for connection {}", conn_id);
+                    break 'subscribe_loop;
+                }
 
-                        encoded_count += 1;
-
-                        // Log stats periodically
-                        if last_log_time.elapsed().as_secs() >= 10 {
-                            info!(
-                                "Video streaming stats for connection {}: {} frames forwarded",
-                                conn_id, encoded_count
+                result = encoded_frame_rx.recv() => {
+                    match result {
+                        Ok(frame) => {
+                            // Convert EncodedVideoFrame to RustDesk VideoFrame message
+                            // Use zero-copy version: Bytes.clone() only increments refcount
+                            let msg_bytes = video_adapter.encode_frame_bytes_zero_copy(
+                                frame.data.clone(),
+                                frame.is_keyframe,
+                                frame.pts_ms as u64,
                             );
-                            last_log_time = Instant::now();
+
+                            // Send to connection (blocks if channel is full, providing backpressure)
+                            if video_tx.send(msg_bytes).await.is_err() {
+                                debug!("Video channel closed for connection {}", conn_id);
+                                break 'subscribe_loop;
+                            }
+
+                            encoded_count += 1;
+
+                            // Log stats periodically
+                            if last_log_time.elapsed().as_secs() >= 10 {
+                                info!(
+                                    "Video streaming stats for connection {}: {} frames forwarded",
+                                    conn_id, encoded_count
+                                );
+                                last_log_time = Instant::now();
+                            }
+                        }
+                        Err(broadcast::error::RecvError::Lagged(n)) => {
+                            debug!("Connection {} lagged {} encoded frames", conn_id, n);
+                        }
+                        Err(broadcast::error::RecvError::Closed) => {
+                            // Pipeline was restarted (e.g., bitrate/codec change)
+                            // Re-subscribe to the new pipeline
+                            info!("Video pipeline closed for connection {}, re-subscribing...", conn_id);
+                            tokio::time::sleep(Duration::from_millis(100)).await;
+                            continue 'subscribe_loop;
                         }
-                    }
-                    Err(broadcast::error::RecvError::Lagged(n)) => {
-                        debug!("Connection {} lagged {} encoded frames", conn_id, n);
-                    }
-                    Err(broadcast::error::RecvError::Closed) => {
-                        debug!("Encoded frame channel closed for connection {}", conn_id);
-                        break;
                     }
                 }
             }
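The restructured loop above boils down to a standard tokio broadcast-receiver pattern: treat Lagged as skipped frames and keep receiving, treat Closed as a pipeline restart and fetch a fresh receiver. A minimal sketch of just that pattern, illustrative only; next_source is a hypothetical stand-in for subscribe_encoded_frames and the surrounding state checks:

    use tokio::sync::broadcast::{self, error::RecvError};

    async fn forward<F>(mut next_source: F)
    where
        F: FnMut() -> Option<broadcast::Receiver<u64>>,
    {
        'subscribe: loop {
            // Re-subscribe to the current pipeline; stop when none is available.
            let Some(mut rx) = next_source() else { break };
            loop {
                match rx.recv().await {
                    Ok(frame) => println!("forwarding frame {frame}"),
                    // Fell behind the sender: n frames were skipped, keep receiving.
                    Err(RecvError::Lagged(n)) => eprintln!("lagged by {n} frames"),
                    // Sender dropped (pipeline restarted): get a fresh receiver.
                    Err(RecvError::Closed) => continue 'subscribe,
                }
            }
        }
    }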
diff --git a/src/rustdesk/frame_adapters.rs b/src/rustdesk/frame_adapters.rs
index 0e1044a3..17e5d9f1 100644
--- a/src/rustdesk/frame_adapters.rs
+++ b/src/rustdesk/frame_adapters.rs
@@ -1,8 +1,9 @@
 //! RustDesk Frame Adapters
 //!
 //! Converts One-KVM video/audio frames to RustDesk protocol format.
+//! Optimized for zero-copy where possible and buffer reuse.
 
-use bytes::Bytes;
+use bytes::{Bytes, BytesMut};
 use prost::Message as ProstMessage;
 
 use super::protocol::hbb::{self, message, EncodedVideoFrame, EncodedVideoFrames, AudioFrame, AudioFormat, Misc};
@@ -55,8 +56,10 @@ impl VideoFrameAdapter {
         self.codec = codec;
     }
 
-    /// Convert encoded video data to RustDesk Message
-    pub fn encode_frame(&mut self, data: &[u8], is_keyframe: bool, timestamp_ms: u64) -> hbb::Message {
+    /// Convert encoded video data to RustDesk Message (zero-copy version)
+    ///
+    /// This version takes Bytes directly to avoid copying the frame data.
+    pub fn encode_frame_from_bytes(&mut self, data: Bytes, is_keyframe: bool, timestamp_ms: u64) -> hbb::Message {
         // Calculate relative timestamp
         if self.seq == 0 {
             self.timestamp_base = timestamp_ms;
@@ -64,7 +67,7 @@
         let pts = (timestamp_ms - self.timestamp_base) as i64;
 
         let frame = EncodedVideoFrame {
-            data: data.to_vec(),
+            data, // Zero-copy: Bytes is reference-counted
             key: is_keyframe,
             pts,
             ..Default::default()
@@ -107,10 +110,24 @@
         }
     }
 
+    /// Convert encoded video data to RustDesk Message
+    pub fn encode_frame(&mut self, data: &[u8], is_keyframe: bool, timestamp_ms: u64) -> hbb::Message {
+        self.encode_frame_from_bytes(Bytes::copy_from_slice(data), is_keyframe, timestamp_ms)
+    }
+
+    /// Encode frame to bytes for sending (zero-copy version)
+    ///
+    /// Takes Bytes directly to avoid copying the frame data.
+    pub fn encode_frame_bytes_zero_copy(&mut self, data: Bytes, is_keyframe: bool, timestamp_ms: u64) -> Bytes {
+        let msg = self.encode_frame_from_bytes(data, is_keyframe, timestamp_ms);
+        let mut buf = BytesMut::with_capacity(msg.encoded_len());
+        msg.encode(&mut buf).expect("encode should not fail");
+        buf.freeze()
+    }
+
     /// Encode frame to bytes for sending
     pub fn encode_frame_bytes(&mut self, data: &[u8], is_keyframe: bool, timestamp_ms: u64) -> Bytes {
-        let msg = self.encode_frame(data, is_keyframe, timestamp_ms);
-        Bytes::from(ProstMessage::encode_to_vec(&msg))
+        self.encode_frame_bytes_zero_copy(Bytes::copy_from_slice(data), is_keyframe, timestamp_ms)
     }
 
     /// Get current sequence number
@@ -163,7 +180,7 @@ impl AudioFrameAdapter {
     /// Convert Opus audio data to RustDesk Message
     pub fn encode_opus_frame(&self, data: &[u8]) -> hbb::Message {
         let frame = AudioFrame {
-            data: data.to_vec(),
+            data: Bytes::copy_from_slice(data),
         };
 
         hbb::Message {
@@ -202,7 +219,7 @@
             hoty,
             width,
            height,
-            colors,
+            colors: Bytes::from(colors),
             ..Default::default()
         };
 
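These adapter changes lean on the fact that bytes::Bytes is a reference-counted view: cloning it (as run_video_streaming now does with frame.data.clone()) shares the underlying buffer instead of copying the payload. A quick illustrative check, not part of the patch:

    use bytes::Bytes;

    fn main() {
        let frame = Bytes::from(vec![0u8; 64 * 1024]);
        let shared = frame.clone(); // refcount bump, no memcpy of the 64 KB payload
        assert_eq!(frame.as_ptr(), shared.as_ptr()); // both handles point at the same buffer
        println!("shared {} bytes without copying", shared.len());
    }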
diff --git a/src/rustdesk/rendezvous.rs b/src/rustdesk/rendezvous.rs
index 63d17b9c..6d144c57 100644
--- a/src/rustdesk/rendezvous.rs
+++ b/src/rustdesk/rendezvous.rs
@@ -353,7 +353,6 @@ impl RendezvousMediator {
         let id = self.device_id();
         let serial = *self.serial.read();
 
-        debug!("Sending RegisterPeer: id={}, serial={}", id, serial);
         let msg = make_register_peer(&id, serial);
         let bytes = msg.encode_to_vec();
         socket.send(&bytes).await?;
@@ -445,7 +444,6 @@
         match msg.union {
             Some(rendezvous_message::Union::RegisterPeerResponse(rpr)) => {
-                debug!("Received RegisterPeerResponse, request_pk={}", rpr.request_pk);
                 if rpr.request_pk {
                     // Server wants us to register our public key
                     info!("Server requested public key registration");
diff --git a/src/webrtc/webrtc_streamer.rs b/src/webrtc/webrtc_streamer.rs
index 4f225b30..799b568d 100644
--- a/src/webrtc/webrtc_streamer.rs
+++ b/src/webrtc/webrtc_streamer.rs
@@ -895,6 +895,9 @@ impl WebRtcStreamer {
             preset
         );
 
+        // Save video_frame_tx BEFORE stopping pipeline (monitor task will clear it)
+        let saved_frame_tx = self.video_frame_tx.read().await.clone();
+
         // Stop existing pipeline
         if let Some(ref pipeline) = *self.video_pipeline.read().await {
             pipeline.stop();
@@ -907,13 +910,16 @@
         *self.video_pipeline.write().await = None;
 
         // Recreate pipeline with new config if we have a frame source
-        if let Some(ref tx) = *self.video_frame_tx.read().await {
+        if let Some(tx) = saved_frame_tx {
             // Get existing sessions that need to be reconnected
             let session_ids: Vec<String> = self.sessions.read().await.keys().cloned().collect();
 
             if !session_ids.is_empty() {
+                // Restore video_frame_tx before recreating pipeline
+                *self.video_frame_tx.write().await = Some(tx.clone());
+
                 // Recreate pipeline
-                let pipeline = self.ensure_video_pipeline(tx.clone()).await?;
+                let pipeline = self.ensure_video_pipeline(tx).await?;
 
                 // Reconnect all sessions to new pipeline
                 let sessions = self.sessions.read().await;
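The ordering in this last hunk matters because a separate monitor task clears the shared video_frame_tx slot when the pipeline stops; saving the sender first and restoring it before recreation keeps the frame source from being lost. A condensed sketch of that pattern, illustrative only; the names and frame type are placeholders, not the streamer's actual fields:

    use std::sync::Arc;
    use tokio::sync::{mpsc, RwLock};

    type FrameTx = mpsc::Sender<Vec<u8>>;

    async fn restart_pipeline(slot: Arc<RwLock<Option<FrameTx>>>) {
        // 1. Save the sender while it is still present in the shared slot.
        let saved = slot.read().await.clone();
        // 2. Stop the old pipeline here; a monitor task may set the slot to None.
        // 3. Restore the sender so the recreated pipeline sees a frame source again.
        if let Some(tx) = saved {
            *slot.write().await = Some(tx);
        }
    }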