From 3824e57fc523d010c4fb835b96449dc04ecf6218 Mon Sep 17 00:00:00 2001 From: mofeng-git Date: Wed, 11 Feb 2026 16:06:06 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=94=AF=E6=8C=81=20rtsp=20=E5=8A=9F?= =?UTF-8?q?=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Cargo.toml | 2 + src/config/schema.rs | 52 + src/lib.rs | 1 + src/main.rs | 53 + src/rtsp/mod.rs | 3 + src/rtsp/service.rs | 1235 +++++++++++++++++++++ src/rustdesk/connection.rs | 70 +- src/state.rs | 5 + src/video/codec_constraints.rs | 193 ++++ src/video/mod.rs | 1 + src/video/stream_manager.rs | 11 + src/web/handlers/config/apply.rs | 109 +- src/web/handlers/config/mod.rs | 5 + src/web/handlers/config/rtsp.rs | 67 ++ src/web/handlers/config/rustdesk.rs | 9 + src/web/handlers/config/stream.rs | 5 + src/web/handlers/config/types.rs | 119 ++ src/web/handlers/mod.rs | 92 ++ src/web/routes.rs | 8 + web/src/api/config.ts | 43 + web/src/api/index.ts | 20 + web/src/components/VideoConfigPopover.vue | 35 +- web/src/types/generated.ts | 53 + 23 files changed, 2154 insertions(+), 37 deletions(-) create mode 100644 src/rtsp/mod.rs create mode 100644 src/rtsp/service.rs create mode 100644 src/video/codec_constraints.rs create mode 100644 src/web/handlers/config/rtsp.rs diff --git a/Cargo.toml b/Cargo.toml index 1ac350c5..e1431c00 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -92,6 +92,8 @@ arc-swap = "1.8" # WebRTC webrtc = "0.14" rtp = "0.14" +rtsp-types = "0.1" +sdp-types = "0.1" # Audio (ALSA capture + Opus encoding) # Note: audiopus links to libopus.so (unavoidable for audio support) diff --git a/src/config/schema.rs b/src/config/schema.rs index bb5d06a6..ece7edd8 100644 --- a/src/config/schema.rs +++ b/src/config/schema.rs @@ -35,6 +35,8 @@ pub struct AppConfig { pub extensions: ExtensionsConfig, /// RustDesk remote access settings pub rustdesk: RustDeskConfig, + /// RTSP streaming settings + pub rtsp: RtspConfig, } @@ -404,6 +406,56 @@ pub enum StreamMode { Mjpeg, } +/// RTSP output codec +#[typeshare] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +#[serde(rename_all = "lowercase")] +#[derive(Default)] +pub enum RtspCodec { + #[default] + H264, + H265, +} + +/// RTSP configuration +#[typeshare] +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(default)] +pub struct RtspConfig { + /// Enable RTSP output + pub enabled: bool, + /// Bind IP address + pub bind: String, + /// RTSP TCP listen port + pub port: u16, + /// Stream path (without leading slash) + pub path: String, + /// Allow only one client connection at a time + pub allow_one_client: bool, + /// Output codec (H264/H265) + pub codec: RtspCodec, + /// Optional username for authentication + pub username: Option, + /// Optional password for authentication + #[typeshare(skip)] + pub password: Option, +} + +impl Default for RtspConfig { + fn default() -> Self { + Self { + enabled: false, + bind: "0.0.0.0".to_string(), + port: 8554, + path: "live".to_string(), + allow_one_client: true, + codec: RtspCodec::H264, + username: None, + password: None, + } + } +} + /// Encoder type #[typeshare] diff --git a/src/lib.rs b/src/lib.rs index a854c712..e5cf69ae 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -14,6 +14,7 @@ pub mod hid; pub mod modules; pub mod msd; pub mod otg; +pub mod rtsp; pub mod rustdesk; pub mod state; pub mod stream; diff --git a/src/main.rs b/src/main.rs index 1b264448..6ffe5734 100644 --- a/src/main.rs +++ b/src/main.rs @@ -19,9 +19,13 @@ use one_kvm::extensions::ExtensionManager; use one_kvm::hid::{HidBackendType, HidController}; use one_kvm::msd::MsdController; use one_kvm::otg::{configfs, OtgService}; +use one_kvm::rtsp::RtspService; use one_kvm::rustdesk::RustDeskService; use one_kvm::state::AppState; use one_kvm::utils::bind_tcp_listener; +use one_kvm::video::codec_constraints::{ + enforce_constraints_with_stream_manager, StreamCodecConstraints, +}; use one_kvm::video::format::{PixelFormat, Resolution}; use one_kvm::video::{Streamer, VideoStreamManager}; use one_kvm::web; @@ -534,6 +538,21 @@ async fn main() -> anyhow::Result<()> { None }; + // Create RTSP service (optional, based on config) + let rtsp = if config.rtsp.enabled { + tracing::info!( + "Initializing RTSP service: rtsp://{}:{}/{}", + config.rtsp.bind, + config.rtsp.port, + config.rtsp.path + ); + let service = RtspService::new(config.rtsp.clone(), stream_manager.clone()); + Some(Arc::new(service)) + } else { + tracing::info!("RTSP disabled in configuration"); + None + }; + // Create application state let state = AppState::new( config_store.clone(), @@ -546,6 +565,7 @@ async fn main() -> anyhow::Result<()> { atx, audio, rustdesk.clone(), + rtsp.clone(), extensions.clone(), events.clone(), shutdown_tx.clone(), @@ -577,6 +597,30 @@ async fn main() -> anyhow::Result<()> { } } + // Start RTSP service if enabled + if let Some(ref service) = rtsp { + if let Err(e) = service.start().await { + tracing::error!("Failed to start RTSP service: {}", e); + } else { + tracing::info!("RTSP service started"); + } + } + + // Enforce startup codec constraints (e.g. RTSP/RustDesk locks) + { + let runtime_config = state.config.get(); + let constraints = StreamCodecConstraints::from_config(&runtime_config); + match enforce_constraints_with_stream_manager(&state.stream_manager, &constraints).await { + Ok(result) if result.changed => { + if let Some(message) = result.message { + tracing::info!("{}", message); + } + } + Ok(_) => {} + Err(e) => tracing::warn!("Failed to enforce startup codec constraints: {}", e), + } + } + // Start enabled extensions { let ext_config = config_store.get(); @@ -886,6 +930,15 @@ async fn cleanup(state: &Arc) { } } + // Stop RTSP service + if let Some(ref service) = *state.rtsp.read().await { + if let Err(e) = service.stop().await { + tracing::warn!("Failed to stop RTSP service: {}", e); + } else { + tracing::info!("RTSP service stopped"); + } + } + // Stop video if let Err(e) = state.stream_manager.stop().await { tracing::warn!("Failed to stop streamer: {}", e); diff --git a/src/rtsp/mod.rs b/src/rtsp/mod.rs new file mode 100644 index 00000000..b8cfd8b5 --- /dev/null +++ b/src/rtsp/mod.rs @@ -0,0 +1,3 @@ +pub mod service; + +pub use service::{RtspService, RtspServiceStatus}; diff --git a/src/rtsp/service.rs b/src/rtsp/service.rs new file mode 100644 index 00000000..1737ec5f --- /dev/null +++ b/src/rtsp/service.rs @@ -0,0 +1,1235 @@ +use bytes::Bytes; +use base64::Engine; +use rand::Rng; +use rtp::packet::Packet; +use rtp::packetizer::Payloader; +use std::collections::HashMap; +use std::io; +use std::net::SocketAddr; +use std::sync::Arc; +use tokio::io::{AsyncReadExt, AsyncWrite, AsyncWriteExt}; +use tokio::net::{TcpListener, TcpStream}; +use tokio::sync::{broadcast, Mutex, RwLock}; +use webrtc::util::Marshal; +use rtsp_types as rtsp; +use sdp_types as sdp; + +use crate::config::{RtspCodec, RtspConfig}; +use crate::error::{AppError, Result}; +use crate::video::encoder::registry::VideoEncoderType; +use crate::video::encoder::VideoCodecType; +use crate::video::shared_video_pipeline::EncodedVideoFrame; +use crate::video::VideoStreamManager; +use crate::webrtc::h265_payloader::H265Payloader; +use crate::webrtc::rtp::parse_profile_level_id_from_sps; + +const RTP_CLOCK_RATE: u32 = 90_000; +const RTP_MTU: usize = 1200; +const RTSP_BUF_SIZE: usize = 8192; + +#[derive(Debug, Clone, PartialEq)] +pub enum RtspServiceStatus { + Stopped, + Starting, + Running, + Error(String), +} + +impl std::fmt::Display for RtspServiceStatus { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Stopped => write!(f, "stopped"), + Self::Starting => write!(f, "starting"), + Self::Running => write!(f, "running"), + Self::Error(err) => write!(f, "error: {}", err), + } + } +} + +#[derive(Debug, Clone)] +struct RtspRequest { + method: rtsp::Method, + uri: String, + version: rtsp::Version, + headers: HashMap, +} + +struct RtspConnectionState { + session_id: String, + setup_done: bool, + interleaved_channel: u8, +} + +impl RtspConnectionState { + fn new() -> Self { + Self { + session_id: generate_session_id(), + setup_done: false, + interleaved_channel: 0, + } + } +} + +#[derive(Default, Clone)] +struct ParameterSets { + h264_sps: Option, + h264_pps: Option, + h265_vps: Option, + h265_sps: Option, + h265_pps: Option, +} + +#[derive(Clone)] +struct SharedRtspState { + active_client: Arc>>, + parameter_sets: Arc>, +} + +impl SharedRtspState { + fn new() -> Self { + Self { + active_client: Arc::new(Mutex::new(None)), + parameter_sets: Arc::new(RwLock::new(ParameterSets::default())), + } + } +} + +pub struct RtspService { + config: Arc>, + status: Arc>, + video_manager: Arc, + shutdown_tx: broadcast::Sender<()>, + server_handle: Arc>>>, + client_handles: Arc>>>, + shared_state: SharedRtspState, +} + +impl RtspService { + pub fn new(config: RtspConfig, video_manager: Arc) -> Self { + let (shutdown_tx, _) = broadcast::channel(1); + Self { + config: Arc::new(RwLock::new(config)), + status: Arc::new(RwLock::new(RtspServiceStatus::Stopped)), + video_manager, + shutdown_tx, + server_handle: Arc::new(Mutex::new(None)), + client_handles: Arc::new(Mutex::new(Vec::new())), + shared_state: SharedRtspState::new(), + } + } + + pub async fn start(&self) -> Result<()> { + let config = self.config.read().await.clone(); + if !config.enabled { + *self.status.write().await = RtspServiceStatus::Stopped; + return Ok(()); + } + + if matches!(*self.status.read().await, RtspServiceStatus::Running) { + return Ok(()); + } + + *self.status.write().await = RtspServiceStatus::Starting; + + let codec = match config.codec { + RtspCodec::H264 => VideoCodecType::H264, + RtspCodec::H265 => VideoCodecType::H265, + }; + + if let Err(err) = self.video_manager.set_video_codec(codec).await { + let message = format!("failed to set codec before RTSP start: {}", err); + *self.status.write().await = RtspServiceStatus::Error(message.clone()); + return Err(AppError::VideoError(message)); + } + + if let Err(err) = self.video_manager.request_keyframe().await { + tracing::debug!("Failed to request keyframe on RTSP start: {}", err); + } + + let bind_addr: SocketAddr = format!("{}:{}", config.bind, config.port) + .parse() + .map_err(|e| AppError::BadRequest(format!("Invalid RTSP bind address: {}", e)))?; + + let listener = TcpListener::bind(bind_addr) + .await + .map_err(|e| AppError::Io(io::Error::new(e.kind(), format!("RTSP bind failed: {}", e))))?; + + let service_config = self.config.clone(); + let video_manager = self.video_manager.clone(); + let shared_state = self.shared_state.clone(); + let mut shutdown_rx = self.shutdown_tx.subscribe(); + let status = self.status.clone(); + let client_handles = self.client_handles.clone(); + + let handle = tokio::spawn(async move { + tracing::info!("RTSP service listening on {}", bind_addr); + *status.write().await = RtspServiceStatus::Running; + + loop { + tokio::select! { + _ = shutdown_rx.recv() => { + tracing::info!("RTSP service shutdown signal received"); + break; + } + result = listener.accept() => { + match result { + Ok((stream, addr)) => { + let cfg = service_config.clone(); + let vm = video_manager.clone(); + let shared = shared_state.clone(); + let handle = tokio::spawn(async move { + if let Err(e) = handle_client(stream, addr, cfg, vm, shared).await { + tracing::warn!("RTSP client {} ended with error: {}", addr, e); + } + }); + let mut handles = client_handles.lock().await; + handles.retain(|task| !task.is_finished()); + handles.push(handle); + } + Err(e) => { + tracing::warn!("RTSP accept failed: {}", e); + } + } + } + } + } + + *status.write().await = RtspServiceStatus::Stopped; + }); + + *self.server_handle.lock().await = Some(handle); + Ok(()) + } + + pub async fn stop(&self) -> Result<()> { + let _ = self.shutdown_tx.send(()); + if let Some(handle) = self.server_handle.lock().await.take() { + handle.abort(); + } + + let mut client_handles = self.client_handles.lock().await; + for handle in client_handles.drain(..) { + handle.abort(); + } + + *self.shared_state.active_client.lock().await = None; + *self.status.write().await = RtspServiceStatus::Stopped; + Ok(()) + } + + pub async fn restart(&self, config: RtspConfig) -> Result<()> { + self.update_config(config).await; + self.stop().await?; + self.start().await + } + + pub async fn update_config(&self, config: RtspConfig) { + *self.config.write().await = config; + } + + pub async fn config(&self) -> RtspConfig { + self.config.read().await.clone() + } + + pub async fn status(&self) -> RtspServiceStatus { + self.status.read().await.clone() + } +} + +async fn handle_client( + mut stream: TcpStream, + peer: SocketAddr, + config: Arc>, + video_manager: Arc, + shared: SharedRtspState, +) -> Result<()> { + let cfg_snapshot = config.read().await.clone(); + + let auth_enabled = cfg_snapshot.username.as_ref().is_some_and(|u| !u.is_empty()) + || cfg_snapshot.password.as_ref().is_some_and(|p| !p.is_empty()); + + if cfg_snapshot.allow_one_client { + let mut active_guard = shared.active_client.lock().await; + if let Some(active) = *active_guard { + if active != peer { + send_simple_response( + &mut stream, + 453, + "Not Enough Bandwidth", + None, + "another client is active", + ) + .await?; + return Ok(()); + } + } else { + *active_guard = Some(peer); + } + } + + let mut state = RtspConnectionState::new(); + let mut read_buf = [0u8; RTSP_BUF_SIZE]; + let mut request_buffer = Vec::with_capacity(RTSP_BUF_SIZE); + + 'client_loop: loop { + let n = stream.read(&mut read_buf).await?; + if n == 0 { + break; + } + + request_buffer.extend_from_slice(&read_buf[..n]); + + while let Some(req_text) = take_rtsp_request_from_buffer(&mut request_buffer) { + let req = match parse_rtsp_request(&req_text) { + Some(r) => r, + None => { + send_simple_response(&mut stream, 400, "Bad Request", None, "").await?; + continue; + } + }; + + if !is_valid_rtsp_path(&req.uri, &cfg_snapshot.path) { + send_response( + &mut stream, + &req, + 404, + "Not Found", + vec![], + "", + "", + ) + .await?; + continue; + } + + if auth_enabled { + let expected_user = cfg_snapshot.username.clone().unwrap_or_default(); + let expected_pass = cfg_snapshot.password.clone().unwrap_or_default(); + let ok = extract_basic_auth(&req) + .map(|(u, p)| u == expected_user && p == expected_pass) + .unwrap_or(false); + if !ok { + send_response( + &mut stream, + &req, + 401, + "Unauthorized", + vec![( + "WWW-Authenticate".to_string(), + "Basic realm=\"One-KVM RTSP\"".to_string(), + )], + "", + "", + ) + .await?; + continue; + } + } + + match &req.method { + rtsp::Method::Options => { + send_response( + &mut stream, + &req, + 200, + "OK", + vec![( + "Public".to_string(), + "OPTIONS, DESCRIBE, SETUP, PLAY, TEARDOWN".to_string(), + )], + "", + "", + ) + .await?; + } + rtsp::Method::Describe => { + let codec = match cfg_snapshot.codec { + RtspCodec::H264 => VideoCodecType::H264, + RtspCodec::H265 => VideoCodecType::H265, + }; + let params = shared.parameter_sets.read().await.clone(); + let sdp = build_sdp(&cfg_snapshot, codec, ¶ms); + if sdp.is_empty() { + send_response( + &mut stream, + &req, + 500, + "Internal Server Error", + vec![], + "", + &state.session_id, + ) + .await?; + continue; + } + + send_response( + &mut stream, + &req, + 200, + "OK", + vec![( + "Content-Type".to_string(), + "application/sdp".to_string(), + )], + &sdp, + &state.session_id, + ) + .await?; + } + rtsp::Method::Setup => { + let transport = req + .headers + .get("transport") + .cloned() + .unwrap_or_default(); + + let interleaved = parse_interleaved_channel(&transport).unwrap_or(0); + state.setup_done = true; + state.interleaved_channel = interleaved; + + let transport_resp = format!( + "RTP/AVP/TCP;unicast;interleaved={}-{}", + interleaved, + interleaved.saturating_add(1) + ); + + send_response( + &mut stream, + &req, + 200, + "OK", + vec![("Transport".to_string(), transport_resp)], + "", + &state.session_id, + ) + .await?; + } + rtsp::Method::Play => { + if !state.setup_done { + send_response( + &mut stream, + &req, + 455, + "Method Not Valid in This State", + vec![], + "", + &state.session_id, + ) + .await?; + continue; + } + + send_response( + &mut stream, + &req, + 200, + "OK", + vec![], + "", + &state.session_id, + ) + .await?; + + if let Err(e) = stream_video_interleaved( + stream, + &video_manager, + cfg_snapshot.codec.clone(), + state.interleaved_channel, + shared.clone(), + state.session_id.clone(), + ) + .await + { + tracing::warn!("RTSP stream loop ended for {}: {}", peer, e); + } + + break 'client_loop; + } + rtsp::Method::Teardown => { + send_response( + &mut stream, + &req, + 200, + "OK", + vec![], + "", + &state.session_id, + ) + .await?; + break 'client_loop; + } + _ => { + send_response( + &mut stream, + &req, + 405, + "Method Not Allowed", + vec![], + "", + &state.session_id, + ) + .await?; + } + } + } + } + + if cfg_snapshot.allow_one_client { + let mut active_guard = shared.active_client.lock().await; + if active_guard.as_ref().copied() == Some(peer) { + *active_guard = None; + } + } + + Ok(()) +} + +async fn stream_video_interleaved( + stream: TcpStream, + video_manager: &Arc, + rtsp_codec: RtspCodec, + channel: u8, + shared: SharedRtspState, + session_id: String, +) -> Result<()> { + let (mut reader, mut writer) = stream.into_split(); + + let mut rx = video_manager + .subscribe_encoded_frames() + .await + .ok_or_else(|| AppError::VideoError("RTSP failed to subscribe encoded frames".to_string()))?; + + video_manager.request_keyframe().await.ok(); + + let payload_type = match rtsp_codec { + RtspCodec::H264 => 96, + RtspCodec::H265 => 99, + }; + let mut sequence_number: u16 = rand::rng().random(); + let ssrc: u32 = rand::rng().random(); + + let mut h264_payloader = rtp::codecs::h264::H264Payloader::default(); + let mut h265_payloader = H265Payloader::new(); + let mut ctrl_read_buf = [0u8; RTSP_BUF_SIZE]; + let mut ctrl_buffer = Vec::with_capacity(RTSP_BUF_SIZE); + + loop { + tokio::select! { + maybe_frame = rx.recv() => { + let Some(frame) = maybe_frame else { + break; + }; + + if !is_frame_codec_match(&frame, &rtsp_codec) { + continue; + } + + { + let mut params = shared.parameter_sets.write().await; + update_parameter_sets(&mut params, &frame); + } + + let rtp_timestamp = pts_to_rtp_timestamp(frame.pts_ms); + + let payloads: Vec = match rtsp_codec { + RtspCodec::H264 => h264_payloader + .payload(RTP_MTU, &frame.data) + .map_err(|e| AppError::VideoError(format!("H264 payload failed: {}", e)))?, + RtspCodec::H265 => h265_payloader.payload(RTP_MTU, &frame.data), + }; + + if payloads.is_empty() { + continue; + } + + let total_payloads = payloads.len(); + for (idx, payload) in payloads.into_iter().enumerate() { + let marker = idx == total_payloads.saturating_sub(1); + let packet = Packet { + header: rtp::header::Header { + version: 2, + padding: false, + extension: false, + marker, + payload_type, + sequence_number, + timestamp: rtp_timestamp, + ssrc, + ..Default::default() + }, + payload, + }; + + sequence_number = sequence_number.wrapping_add(1); + send_interleaved_rtp(&mut writer, channel, &packet).await?; + } + + if frame.is_keyframe { + tracing::debug!("RTSP keyframe sent"); + } + } + read_res = reader.read(&mut ctrl_read_buf) => { + let n = read_res?; + if n == 0 { + break; + } + + ctrl_buffer.extend_from_slice(&ctrl_read_buf[..n]); + + while strip_interleaved_frames_prefix(&mut ctrl_buffer) {} + + while let Some(raw_req) = take_rtsp_request_from_buffer(&mut ctrl_buffer) { + let Some(req) = parse_rtsp_request(&raw_req) else { + continue; + }; + + if handle_play_control_request(&mut writer, &req, &session_id).await? { + return Ok(()); + } + + while strip_interleaved_frames_prefix(&mut ctrl_buffer) {} + } + } + } + } + + Ok(()) +} + +async fn send_interleaved_rtp( + stream: &mut W, + channel: u8, + packet: &Packet, +) -> Result<()> { + let marshaled = packet + .marshal() + .map_err(|e| AppError::VideoError(format!("RTP marshal failed: {}", e)))?; + let len = marshaled.len() as u16; + + let mut header = [0u8; 4]; + header[0] = b'$'; + header[1] = channel; + header[2] = (len >> 8) as u8; + header[3] = (len & 0xff) as u8; + + stream.write_all(&header).await?; + stream.write_all(&marshaled).await?; + Ok(()) +} + +async fn handle_play_control_request( + stream: &mut W, + req: &RtspRequest, + session_id: &str, +) -> Result { + match &req.method { + rtsp::Method::Teardown => { + send_response(stream, req, 200, "OK", vec![], "", session_id).await?; + Ok(true) + } + rtsp::Method::Options => { + send_response( + stream, + req, + 200, + "OK", + vec![( + "Public".to_string(), + "OPTIONS, DESCRIBE, SETUP, PLAY, GET_PARAMETER, SET_PARAMETER, TEARDOWN" + .to_string(), + )], + "", + session_id, + ) + .await?; + Ok(false) + } + rtsp::Method::GetParameter | rtsp::Method::SetParameter => { + send_response(stream, req, 200, "OK", vec![], "", session_id).await?; + Ok(false) + } + _ => { + send_response( + stream, + req, + 405, + "Method Not Allowed", + vec![], + "", + session_id, + ) + .await?; + Ok(false) + } + } +} + +fn strip_interleaved_frames_prefix(buffer: &mut Vec) -> bool { + if buffer.len() < 4 || buffer[0] != b'$' { + return false; + } + + let payload_len = u16::from_be_bytes([buffer[2], buffer[3]]) as usize; + let frame_len = 4 + payload_len; + if buffer.len() < frame_len { + return false; + } + + buffer.drain(0..frame_len); + true +} + +fn take_rtsp_request_from_buffer(buffer: &mut Vec) -> Option { + let delimiter = b"\r\n\r\n"; + let pos = find_bytes(buffer, delimiter)?; + let req_end = pos + delimiter.len(); + let req_bytes: Vec = buffer.drain(0..req_end).collect(); + Some(String::from_utf8_lossy(&req_bytes).to_string()) +} + +fn find_bytes(haystack: &[u8], needle: &[u8]) -> Option { + haystack.windows(needle.len()).position(|window| window == needle) +} + +fn parse_rtsp_request(raw: &str) -> Option { + let (message, consumed): (rtsp::Message>, usize) = rtsp::Message::parse(raw.as_bytes()).ok()?; + if consumed != raw.len() { + return None; + } + + let request = match message { + rtsp::Message::Request(req) => req, + _ => return None, + }; + + let uri = request + .request_uri() + .map(|value| value.as_str().to_string()) + .unwrap_or_default(); + + let mut headers = HashMap::new(); + for (name, value) in request.headers() { + headers.insert(name.to_string().to_ascii_lowercase(), value.to_string()); + } + + Some(RtspRequest { + method: request.method().clone(), + uri, + version: request.version(), + headers, + }) +} + +fn extract_basic_auth(req: &RtspRequest) -> Option<(String, String)> { + let value = req.headers.get("authorization")?; + let mut parts = value.split_whitespace(); + let scheme = parts.next()?; + if !scheme.eq_ignore_ascii_case("basic") { + return None; + } + let b64 = parts.next()?; + let decoded = base64::engine::general_purpose::STANDARD.decode(b64).ok()?; + let raw = String::from_utf8(decoded).ok()?; + let (user, pass) = raw.split_once(':')?; + Some((user.to_string(), pass.to_string())) +} + +fn parse_interleaved_channel(transport: &str) -> Option { + let lower = transport.to_ascii_lowercase(); + if let Some((_, v)) = lower.split_once("interleaved=") { + let head = v.split(';').next().unwrap_or(v); + let first = head.split('-').next().unwrap_or(head).trim(); + return first.parse::().ok(); + } + None +} + +fn update_parameter_sets(params: &mut ParameterSets, frame: &EncodedVideoFrame) { + let nal_units = split_annexb_nal_units(frame.data.as_ref()); + + match frame.codec { + VideoEncoderType::H264 => { + for nal in nal_units { + match h264_nal_type(nal) { + Some(7) => params.h264_sps = Some(Bytes::copy_from_slice(nal)), + Some(8) => params.h264_pps = Some(Bytes::copy_from_slice(nal)), + _ => {} + } + } + } + VideoEncoderType::H265 => { + for nal in nal_units { + match h265_nal_type(nal) { + Some(32) => params.h265_vps = Some(Bytes::copy_from_slice(nal)), + Some(33) => params.h265_sps = Some(Bytes::copy_from_slice(nal)), + Some(34) => params.h265_pps = Some(Bytes::copy_from_slice(nal)), + _ => {} + } + } + } + _ => {} + } +} + +fn split_annexb_nal_units(data: &[u8]) -> Vec<&[u8]> { + let mut nal_units = Vec::new(); + let mut cursor = 0usize; + + while let Some((start, start_code_len)) = find_annexb_start_code(data, cursor) { + let nal_start = start + start_code_len; + if nal_start >= data.len() { + break; + } + + let next_start = find_annexb_start_code(data, nal_start) + .map(|(idx, _)| idx) + .unwrap_or(data.len()); + + let mut nal_end = next_start; + while nal_end > nal_start && data[nal_end - 1] == 0 { + nal_end -= 1; + } + + if nal_end > nal_start { + nal_units.push(&data[nal_start..nal_end]); + } + + cursor = next_start; + } + + nal_units +} + +fn find_annexb_start_code(data: &[u8], from: usize) -> Option<(usize, usize)> { + if from >= data.len() { + return None; + } + + let mut i = from; + while i + 3 <= data.len() { + if i + 4 <= data.len() + && data[i] == 0 + && data[i + 1] == 0 + && data[i + 2] == 0 + && data[i + 3] == 1 + { + return Some((i, 4)); + } + + if data[i] == 0 && data[i + 1] == 0 && data[i + 2] == 1 { + return Some((i, 3)); + } + + i += 1; + } + + None +} + +fn h264_nal_type(nal: &[u8]) -> Option { + nal.first().map(|value| value & 0x1f) +} + +fn h265_nal_type(nal: &[u8]) -> Option { + nal.first().map(|value| (value >> 1) & 0x3f) +} + +fn build_h264_fmtp(payload_type: u8, params: &ParameterSets) -> String { + let mut attrs = vec!["packetization-mode=1".to_string()]; + + if let Some(sps) = params.h264_sps.as_ref() { + if let Some(profile_level_id) = parse_profile_level_id_from_sps(sps) { + attrs.push(format!("profile-level-id={}", profile_level_id)); + } + } else { + attrs.push("profile-level-id=42e01f".to_string()); + } + + if let (Some(sps), Some(pps)) = (params.h264_sps.as_ref(), params.h264_pps.as_ref()) { + let sps_b64 = base64::engine::general_purpose::STANDARD.encode(sps.as_ref()); + let pps_b64 = base64::engine::general_purpose::STANDARD.encode(pps.as_ref()); + attrs.push(format!("sprop-parameter-sets={},{}", sps_b64, pps_b64)); + } + + format!("{} {}", payload_type, attrs.join(";")) +} + +fn build_h265_fmtp(payload_type: u8, params: &ParameterSets) -> String { + let mut attrs = Vec::new(); + + if let Some(vps) = params.h265_vps.as_ref() { + attrs.push(format!( + "sprop-vps={}", + base64::engine::general_purpose::STANDARD.encode(vps.as_ref()) + )); + } + + if let Some(sps) = params.h265_sps.as_ref() { + attrs.push(format!( + "sprop-sps={}", + base64::engine::general_purpose::STANDARD.encode(sps.as_ref()) + )); + } + + if let Some(pps) = params.h265_pps.as_ref() { + attrs.push(format!( + "sprop-pps={}", + base64::engine::general_purpose::STANDARD.encode(pps.as_ref()) + )); + } + + if attrs.is_empty() { + format!("{} profile-id=1", payload_type) + } else { + format!("{} {}", payload_type, attrs.join(";")) + } +} + +fn build_sdp(config: &RtspConfig, codec: VideoCodecType, params: &ParameterSets) -> String { + let (payload_type, codec_name, fmtp_value) = match codec { + VideoCodecType::H264 => (96u8, "H264", build_h264_fmtp(96, params)), + VideoCodecType::H265 => (99u8, "H265", build_h265_fmtp(99, params)), + _ => (96u8, "H264", build_h264_fmtp(96, params)), + }; + + let session = sdp::Session { + origin: sdp::Origin { + username: Some("-".to_string()), + sess_id: "0".to_string(), + sess_version: 0, + nettype: "IN".to_string(), + addrtype: "IP4".to_string(), + unicast_address: config.bind.clone(), + }, + session_name: "One-KVM RTSP Stream".to_string(), + session_description: None, + uri: None, + emails: Vec::new(), + phones: Vec::new(), + connection: Some(sdp::Connection { + nettype: "IN".to_string(), + addrtype: "IP4".to_string(), + connection_address: "0.0.0.0".to_string(), + }), + bandwidths: Vec::new(), + times: vec![sdp::Time { + start_time: 0, + stop_time: 0, + repeats: Vec::new(), + }], + time_zones: Vec::new(), + key: None, + attributes: vec![sdp::Attribute { + attribute: "control".to_string(), + value: Some("*".to_string()), + }], + medias: vec![sdp::Media { + media: "video".to_string(), + port: 0, + num_ports: None, + proto: "RTP/AVP".to_string(), + fmt: payload_type.to_string(), + media_title: None, + connections: Vec::new(), + bandwidths: Vec::new(), + key: None, + attributes: vec![ + sdp::Attribute { + attribute: "rtpmap".to_string(), + value: Some(format!("{} {}/90000", payload_type, codec_name)), + }, + sdp::Attribute { + attribute: "fmtp".to_string(), + value: Some(fmtp_value), + }, + sdp::Attribute { + attribute: "control".to_string(), + value: Some("trackID=0".to_string()), + }, + ], + }], + }; + + let mut output = Vec::new(); + if let Err(err) = session.write(&mut output) { + tracing::warn!("Failed to serialize SDP with sdp-types: {}", err); + return String::new(); + } + + match String::from_utf8(output) { + Ok(sdp_text) => sdp_text, + Err(err) => { + tracing::warn!("Failed to convert SDP bytes to UTF-8: {}", err); + String::new() + } + } +} + +async fn send_simple_response( + stream: &mut W, + code: u16, + _reason: &str, + cseq: Option<&str>, + body: &str, +) -> Result<()> { + let mut builder = rtsp::Response::builder(rtsp::Version::V1_0, status_code_from_u16(code)); + if let Some(cseq) = cseq { + builder = builder.header(rtsp::headers::CSEQ, cseq); + } + + let response = builder.build(body.as_bytes().to_vec()); + + let mut data = Vec::new(); + response + .write(&mut data) + .map_err(|e| AppError::BadRequest(format!("failed to serialize RTSP response: {}", e)))?; + stream.write_all(&data).await?; + Ok(()) +} + +async fn send_response( + stream: &mut W, + req: &RtspRequest, + code: u16, + _reason: &str, + extra_headers: Vec<(String, String)>, + body: &str, + session_id: &str, +) -> Result<()> { + let cseq = req + .headers + .get("cseq") + .cloned() + .unwrap_or_else(|| "1".to_string()); + + let mut builder = rtsp::Response::builder(req.version, status_code_from_u16(code)) + .header(rtsp::headers::CSEQ, cseq.as_str()); + + if !session_id.is_empty() { + builder = builder.header(rtsp::headers::SESSION, session_id); + } + + for (name, value) in extra_headers { + let header_name = rtsp::HeaderName::try_from(name.as_str()).map_err(|e| { + AppError::BadRequest(format!("invalid RTSP header name {}: {}", name, e)) + })?; + builder = builder.header(header_name, value); + } + + let response = builder.build(body.as_bytes().to_vec()); + + let mut data = Vec::new(); + response + .write(&mut data) + .map_err(|e| AppError::BadRequest(format!("failed to serialize RTSP response: {}", e)))?; + stream.write_all(&data).await?; + Ok(()) +} + +fn status_code_from_u16(code: u16) -> rtsp::StatusCode { + match code { + 200 => rtsp::StatusCode::Ok, + 400 => rtsp::StatusCode::BadRequest, + 401 => rtsp::StatusCode::Unauthorized, + 404 => rtsp::StatusCode::NotFound, + 405 => rtsp::StatusCode::MethodNotAllowed, + 453 => rtsp::StatusCode::NotEnoughBandwidth, + 455 => rtsp::StatusCode::MethodNotValidInThisState, + _ => rtsp::StatusCode::InternalServerError, + } +} + +fn is_valid_rtsp_path(uri: &str, configured_path: &str) -> bool { + let normalized_cfg = configured_path.trim_matches('/'); + if normalized_cfg.is_empty() { + return false; + } + + let request_path = extract_rtsp_path(uri); + request_path == normalized_cfg +} + +fn extract_rtsp_path(uri: &str) -> String { + let raw_path = if let Some((_, remainder)) = uri.split_once("://") { + match remainder.find('/') { + Some(idx) => &remainder[idx..], + None => "/", + } + } else { + uri + }; + + raw_path + .split('?') + .next() + .unwrap_or(raw_path) + .split('#') + .next() + .unwrap_or(raw_path) + .trim_matches('/') + .to_string() +} + +fn is_frame_codec_match(frame: &EncodedVideoFrame, codec: &RtspCodec) -> bool { + matches!( + (frame.codec, codec), + (crate::video::encoder::registry::VideoEncoderType::H264, RtspCodec::H264) + | (crate::video::encoder::registry::VideoEncoderType::H265, RtspCodec::H265) + ) +} + +fn pts_to_rtp_timestamp(pts_ms: i64) -> u32 { + if pts_ms <= 0 { + return 0; + } + ((pts_ms as u64 * RTP_CLOCK_RATE as u64) / 1000) as u32 +} + +fn generate_session_id() -> String { + let mut rng = rand::rng(); + let value: u64 = rng.random(); + format!("{:016x}", value) +} + + +#[cfg(test)] +mod tests { + use super::*; + use tokio::io::{duplex, AsyncReadExt}; + + fn make_test_request(method: rtsp::Method) -> RtspRequest { + let mut headers = HashMap::new(); + headers.insert("cseq".to_string(), "7".to_string()); + RtspRequest { + method, + uri: "rtsp://127.0.0.1/live".to_string(), + version: rtsp::Version::V1_0, + headers, + } + } + + async fn read_response_from_duplex(mut client: tokio::io::DuplexStream) -> rtsp::Response> { + let mut buf = vec![0u8; 4096]; + let n = client.read(&mut buf).await.expect("failed to read rtsp response"); + assert!(n > 0); + let (message, consumed): (rtsp::Message>, usize) = + rtsp::Message::parse(&buf[..n]).expect("failed to parse rtsp response"); + assert_eq!(consumed, n); + + match message { + rtsp::Message::Response(response) => response, + _ => panic!("expected RTSP response"), + } + } + + #[tokio::test] + async fn play_control_teardown_returns_ok_and_stop() { + let req = make_test_request(rtsp::Method::Teardown); + let (client, mut server) = duplex(4096); + + let should_stop = handle_play_control_request(&mut server, &req, "session-1") + .await + .expect("control handling failed"); + assert!(should_stop); + + drop(server); + let response = read_response_from_duplex(client).await; + assert_eq!(response.status(), rtsp::StatusCode::Ok); + } + + #[tokio::test] + async fn play_control_pause_returns_method_not_allowed() { + let req = make_test_request(rtsp::Method::Pause); + let (client, mut server) = duplex(4096); + + let should_stop = handle_play_control_request(&mut server, &req, "session-1") + .await + .expect("control handling failed"); + assert!(!should_stop); + + drop(server); + let response = read_response_from_duplex(client).await; + assert_eq!(response.status(), rtsp::StatusCode::MethodNotAllowed); + } + + #[test] + fn build_sdp_h264_is_parseable_with_expected_video_attributes() { + let config = RtspConfig::default(); + let mut params = ParameterSets::default(); + params.h264_sps = Some(Bytes::from_static(&[0x67, 0x42, 0xe0, 0x1f, 0x96, 0x54])); + params.h264_pps = Some(Bytes::from_static(&[0x68, 0xce, 0x06, 0xe2])); + + let sdp_text = build_sdp(&config, VideoCodecType::H264, ¶ms); + assert!(!sdp_text.is_empty()); + + let session = sdp::Session::parse(sdp_text.as_bytes()).expect("sdp parse failed"); + assert_eq!(session.session_name, "One-KVM RTSP Stream"); + assert_eq!(session.medias.len(), 1); + + let media = &session.medias[0]; + assert_eq!(media.media, "video"); + assert_eq!(media.proto, "RTP/AVP"); + assert_eq!(media.fmt, "96"); + + let has_rtpmap = media.attributes.iter().any(|attr| { + attr.attribute == "rtpmap" && attr.value.as_deref() == Some("96 H264/90000") + }); + assert!(has_rtpmap); + + let fmtp_value = media + .attributes + .iter() + .find(|attr| attr.attribute == "fmtp") + .and_then(|attr| attr.value.as_deref()) + .expect("missing fmtp value"); + assert!(fmtp_value.starts_with("96 ")); + assert!(fmtp_value.contains("packetization-mode=1")); + assert!(fmtp_value.contains("sprop-parameter-sets=")); + } + + + #[test] + fn rtsp_path_matching_is_exact_after_normalization() { + assert!(is_valid_rtsp_path("rtsp://127.0.0.1/live", "live")); + assert!(is_valid_rtsp_path("rtsp://127.0.0.1/live/?token=1", "/live/")); + assert!(!is_valid_rtsp_path("rtsp://127.0.0.1/live2", "live")); + assert!(!is_valid_rtsp_path("rtsp://127.0.0.1/", "/")); + } + + #[test] + fn build_sdp_h265_is_parseable_with_expected_video_attributes() { + let config = RtspConfig::default(); + let mut params = ParameterSets::default(); + params.h265_vps = Some(Bytes::from_static(&[0x40, 0x01, 0x0c, 0x01])); + params.h265_sps = Some(Bytes::from_static(&[0x42, 0x01, 0x01, 0x60])); + params.h265_pps = Some(Bytes::from_static(&[0x44, 0x01, 0xc0, 0x73])); + + let sdp_text = build_sdp(&config, VideoCodecType::H265, ¶ms); + assert!(!sdp_text.is_empty()); + + let session = sdp::Session::parse(sdp_text.as_bytes()).expect("sdp parse failed"); + assert_eq!(session.medias.len(), 1); + + let media = &session.medias[0]; + assert_eq!(media.media, "video"); + assert_eq!(media.proto, "RTP/AVP"); + assert_eq!(media.fmt, "99"); + + let has_rtpmap = media.attributes.iter().any(|attr| { + attr.attribute == "rtpmap" && attr.value.as_deref() == Some("99 H265/90000") + }); + assert!(has_rtpmap); + + let fmtp_value = media + .attributes + .iter() + .find(|attr| attr.attribute == "fmtp") + .and_then(|attr| attr.value.as_deref()) + .expect("missing fmtp value"); + assert!(fmtp_value.starts_with("99 ")); + assert!(fmtp_value.contains("sprop-vps=")); + assert!(fmtp_value.contains("sprop-sps=")); + assert!(fmtp_value.contains("sprop-pps=")); + } +} diff --git a/src/rustdesk/connection.rs b/src/rustdesk/connection.rs index 9581bd91..c94bc3ac 100644 --- a/src/rustdesk/connection.rs +++ b/src/rustdesk/connection.rs @@ -23,6 +23,9 @@ use tracing::{debug, error, info, warn}; use crate::audio::AudioController; use crate::hid::{HidController, KeyEventType, KeyboardEvent, KeyboardModifiers}; +use crate::video::codec_constraints::{ + encoder_codec_to_id, encoder_codec_to_video_codec, video_codec_to_encoder_codec, +}; use crate::video::encoder::registry::{EncoderRegistry, VideoEncoderType}; use crate::video::encoder::BitratePreset; use crate::video::stream_manager::VideoStreamManager; @@ -627,7 +630,7 @@ impl Connection { // Select the best available video codec // Priority: H264 > H265 > VP8 > VP9 (H264/H265 leverage hardware encoding) - let negotiated = self.negotiate_video_codec(); + let negotiated = self.negotiate_video_codec().await; self.negotiated_codec = Some(negotiated); info!("Negotiated video codec: {:?}", negotiated); @@ -641,28 +644,49 @@ impl Connection { /// Negotiate video codec - select the best available encoder /// Priority: H264 > H265 > VP8 > VP9 (H264/H265 leverage hardware encoding on embedded devices) - fn negotiate_video_codec(&self) -> VideoEncoderType { + async fn negotiate_video_codec(&self) -> VideoEncoderType { let registry = EncoderRegistry::global(); + let constraints = self.current_codec_constraints().await; // Check availability in priority order // H264 is preferred because it has the best hardware encoder support (RKMPP, VAAPI, etc.) // and most RustDesk clients support H264 hardware decoding - if registry.is_format_available(VideoEncoderType::H264, false) { + if constraints.is_webrtc_codec_allowed(crate::video::encoder::VideoCodecType::H264) + && registry.is_format_available(VideoEncoderType::H264, false) + { return VideoEncoderType::H264; } - if registry.is_format_available(VideoEncoderType::H265, false) { + if constraints.is_webrtc_codec_allowed(crate::video::encoder::VideoCodecType::H265) + && registry.is_format_available(VideoEncoderType::H265, false) + { return VideoEncoderType::H265; } - if registry.is_format_available(VideoEncoderType::VP8, false) { + if constraints.is_webrtc_codec_allowed(crate::video::encoder::VideoCodecType::VP8) + && registry.is_format_available(VideoEncoderType::VP8, false) + { return VideoEncoderType::VP8; } - if registry.is_format_available(VideoEncoderType::VP9, false) { + if constraints.is_webrtc_codec_allowed(crate::video::encoder::VideoCodecType::VP9) + && registry.is_format_available(VideoEncoderType::VP9, false) + { return VideoEncoderType::VP9; } - // Fallback to H264 (should be available via hardware or software encoder) - warn!("No video encoder available, defaulting to H264"); - VideoEncoderType::H264 + // Fallback to preferred allowed codec + let preferred = constraints.preferred_webrtc_codec(); + warn!( + "No allowed encoder available in priority order, falling back to {}", + encoder_codec_to_id(video_codec_to_encoder_codec(preferred)) + ); + video_codec_to_encoder_codec(preferred) + } + + async fn current_codec_constraints(&self) -> crate::video::codec_constraints::StreamCodecConstraints { + if let Some(ref video_manager) = self.video_manager { + video_manager.codec_constraints().await + } else { + crate::video::codec_constraints::StreamCodecConstraints::unrestricted() + } } /// Handle misc message with Arc writer @@ -747,6 +771,17 @@ impl Connection { if let Some(new_codec) = requested_codec { // Check if this codec is different from current and available if self.negotiated_codec != Some(new_codec) { + let constraints = self.current_codec_constraints().await; + if !constraints + .is_webrtc_codec_allowed(encoder_codec_to_video_codec(new_codec)) + { + warn!( + "Client requested codec {:?} but it's blocked by constraints: {}", + new_codec, constraints.reason + ); + return Ok(()); + } + let registry = EncoderRegistry::global(); if registry.is_format_available(new_codec, false) { info!( @@ -1080,12 +1115,21 @@ impl Connection { if success { // Dynamically detect available encoders let registry = EncoderRegistry::global(); + let constraints = self.current_codec_constraints().await; // Check which encoders are available (include software fallback) - let h264_available = registry.is_format_available(VideoEncoderType::H264, false); - let h265_available = registry.is_format_available(VideoEncoderType::H265, false); - let vp8_available = registry.is_format_available(VideoEncoderType::VP8, false); - let vp9_available = registry.is_format_available(VideoEncoderType::VP9, false); + let h264_available = constraints + .is_webrtc_codec_allowed(crate::video::encoder::VideoCodecType::H264) + && registry.is_format_available(VideoEncoderType::H264, false); + let h265_available = constraints + .is_webrtc_codec_allowed(crate::video::encoder::VideoCodecType::H265) + && registry.is_format_available(VideoEncoderType::H265, false); + let vp8_available = constraints + .is_webrtc_codec_allowed(crate::video::encoder::VideoCodecType::VP8) + && registry.is_format_available(VideoEncoderType::VP8, false); + let vp9_available = constraints + .is_webrtc_codec_allowed(crate::video::encoder::VideoCodecType::VP9) + && registry.is_format_available(VideoEncoderType::VP9, false); info!( "Server encoding capabilities: H264={}, H265={}, VP8={}, VP9={}", diff --git a/src/state.rs b/src/state.rs index 83d8237f..f1309171 100644 --- a/src/state.rs +++ b/src/state.rs @@ -13,6 +13,7 @@ use crate::extensions::ExtensionManager; use crate::hid::HidController; use crate::msd::MsdController; use crate::otg::OtgService; +use crate::rtsp::RtspService; use crate::rustdesk::RustDeskService; use crate::video::VideoStreamManager; @@ -50,6 +51,8 @@ pub struct AppState { pub audio: Arc, /// RustDesk remote access service (optional) pub rustdesk: Arc>>>, + /// RTSP streaming service (optional) + pub rtsp: Arc>>>, /// Extension manager (ttyd, gostc, easytier) pub extensions: Arc, /// Event bus for real-time notifications @@ -76,6 +79,7 @@ impl AppState { atx: Option, audio: Arc, rustdesk: Option>, + rtsp: Option>, extensions: Arc, events: Arc, shutdown_tx: broadcast::Sender<()>, @@ -92,6 +96,7 @@ impl AppState { atx: Arc::new(RwLock::new(atx)), audio, rustdesk: Arc::new(RwLock::new(rustdesk)), + rtsp: Arc::new(RwLock::new(rtsp)), extensions, events, shutdown_tx, diff --git a/src/video/codec_constraints.rs b/src/video/codec_constraints.rs new file mode 100644 index 00000000..cb9ff711 --- /dev/null +++ b/src/video/codec_constraints.rs @@ -0,0 +1,193 @@ +use crate::config::{AppConfig, RtspCodec, StreamMode}; +use crate::error::Result; +use crate::video::encoder::registry::VideoEncoderType; +use crate::video::encoder::VideoCodecType; +use crate::video::VideoStreamManager; +use std::sync::Arc; + +#[derive(Debug, Clone)] +pub struct StreamCodecConstraints { + pub rustdesk_enabled: bool, + pub rtsp_enabled: bool, + pub allowed_webrtc_codecs: Vec, + pub allow_mjpeg: bool, + pub locked_codec: Option, + pub reason: String, +} + +#[derive(Debug, Clone)] +pub struct ConstraintEnforcementResult { + pub changed: bool, + pub message: Option, +} + +impl StreamCodecConstraints { + pub fn unrestricted() -> Self { + Self { + rustdesk_enabled: false, + rtsp_enabled: false, + allowed_webrtc_codecs: vec![ + VideoCodecType::H264, + VideoCodecType::H265, + VideoCodecType::VP8, + VideoCodecType::VP9, + ], + allow_mjpeg: true, + locked_codec: None, + reason: "No codec lock active".to_string(), + } + } + + pub fn from_config(config: &AppConfig) -> Self { + let rustdesk_enabled = config.rustdesk.enabled; + let rtsp_enabled = config.rtsp.enabled; + + if rtsp_enabled { + let locked_codec = match config.rtsp.codec { + RtspCodec::H264 => VideoCodecType::H264, + RtspCodec::H265 => VideoCodecType::H265, + }; + return Self { + rustdesk_enabled, + rtsp_enabled, + allowed_webrtc_codecs: vec![locked_codec], + allow_mjpeg: false, + locked_codec: Some(locked_codec), + reason: if rustdesk_enabled { + format!( + "RTSP enabled with codec lock ({:?}) and RustDesk enabled", + locked_codec + ) + } else { + format!("RTSP enabled with codec lock ({:?})", locked_codec) + }, + }; + } + + if rustdesk_enabled { + return Self { + rustdesk_enabled, + rtsp_enabled, + allowed_webrtc_codecs: vec![ + VideoCodecType::H264, + VideoCodecType::H265, + VideoCodecType::VP8, + VideoCodecType::VP9, + ], + allow_mjpeg: false, + locked_codec: None, + reason: "RustDesk enabled, MJPEG disabled".to_string(), + }; + } + + Self::unrestricted() + } + + pub fn is_mjpeg_allowed(&self) -> bool { + self.allow_mjpeg + } + + pub fn is_webrtc_codec_allowed(&self, codec: VideoCodecType) -> bool { + self.allowed_webrtc_codecs.contains(&codec) + } + + pub fn preferred_webrtc_codec(&self) -> VideoCodecType { + if let Some(codec) = self.locked_codec { + return codec; + } + self.allowed_webrtc_codecs + .first() + .copied() + .unwrap_or(VideoCodecType::H264) + } + + pub fn allowed_codecs_for_api(&self) -> Vec<&'static str> { + let mut codecs = Vec::new(); + if self.allow_mjpeg { + codecs.push("mjpeg"); + } + for codec in &self.allowed_webrtc_codecs { + codecs.push(codec_to_id(*codec)); + } + codecs + } +} + +pub async fn enforce_constraints_with_stream_manager( + stream_manager: &Arc, + constraints: &StreamCodecConstraints, +) -> Result { + let current_mode = stream_manager.current_mode().await; + + if current_mode == StreamMode::Mjpeg && !constraints.allow_mjpeg { + let target_codec = constraints.preferred_webrtc_codec(); + stream_manager.set_video_codec(target_codec).await?; + let _ = stream_manager + .switch_mode_transaction(StreamMode::WebRTC) + .await?; + return Ok(ConstraintEnforcementResult { + changed: true, + message: Some(format!( + "Auto-switched from MJPEG to {} due to codec lock", + codec_to_id(target_codec) + )), + }); + } + + if current_mode == StreamMode::WebRTC { + let current_codec = stream_manager.webrtc_streamer().current_video_codec().await; + if !constraints.is_webrtc_codec_allowed(current_codec) { + let target_codec = constraints.preferred_webrtc_codec(); + stream_manager.set_video_codec(target_codec).await?; + return Ok(ConstraintEnforcementResult { + changed: true, + message: Some(format!( + "Auto-switched codec from {} to {} due to codec lock", + codec_to_id(current_codec), + codec_to_id(target_codec) + )), + }); + } + } + + Ok(ConstraintEnforcementResult { + changed: false, + message: None, + }) +} + +pub fn codec_to_id(codec: VideoCodecType) -> &'static str { + match codec { + VideoCodecType::H264 => "h264", + VideoCodecType::H265 => "h265", + VideoCodecType::VP8 => "vp8", + VideoCodecType::VP9 => "vp9", + } +} + +pub fn encoder_codec_to_id(codec: VideoEncoderType) -> &'static str { + match codec { + VideoEncoderType::H264 => "h264", + VideoEncoderType::H265 => "h265", + VideoEncoderType::VP8 => "vp8", + VideoEncoderType::VP9 => "vp9", + } +} + +pub fn video_codec_to_encoder_codec(codec: VideoCodecType) -> VideoEncoderType { + match codec { + VideoCodecType::H264 => VideoEncoderType::H264, + VideoCodecType::H265 => VideoEncoderType::H265, + VideoCodecType::VP8 => VideoEncoderType::VP8, + VideoCodecType::VP9 => VideoEncoderType::VP9, + } +} + +pub fn encoder_codec_to_video_codec(codec: VideoEncoderType) -> VideoCodecType { + match codec { + VideoEncoderType::H264 => VideoCodecType::H264, + VideoEncoderType::H265 => VideoCodecType::H265, + VideoEncoderType::VP8 => VideoCodecType::VP8, + VideoEncoderType::VP9 => VideoCodecType::VP9, + } +} diff --git a/src/video/mod.rs b/src/video/mod.rs index f13385a9..1ea9600b 100644 --- a/src/video/mod.rs +++ b/src/video/mod.rs @@ -3,6 +3,7 @@ //! This module provides V4L2 video capture, encoding, and streaming functionality. pub mod capture; +pub mod codec_constraints; pub mod convert; pub mod decoder; pub mod device; diff --git a/src/video/stream_manager.rs b/src/video/stream_manager.rs index a2734a5f..b5beb25f 100644 --- a/src/video/stream_manager.rs +++ b/src/video/stream_manager.rs @@ -37,6 +37,7 @@ use crate::error::Result; use crate::events::{EventBus, SystemEvent, VideoDeviceInfo}; use crate::hid::HidController; use crate::stream::MjpegStreamHandler; +use crate::video::codec_constraints::StreamCodecConstraints; use crate::video::format::{PixelFormat, Resolution}; use crate::video::streamer::{Streamer, StreamerState}; use crate::webrtc::WebRtcStreamer; @@ -144,6 +145,16 @@ impl VideoStreamManager { *self.config_store.write().await = Some(config); } + /// Get current stream codec constraints derived from global configuration. + pub async fn codec_constraints(&self) -> StreamCodecConstraints { + if let Some(ref config_store) = *self.config_store.read().await { + let config = config_store.get(); + StreamCodecConstraints::from_config(&config) + } else { + StreamCodecConstraints::unrestricted() + } + } + /// Get current streaming mode pub async fn current_mode(&self) -> StreamMode { self.mode.read().await.clone() diff --git a/src/web/handlers/config/apply.rs b/src/web/handlers/config/apply.rs index 7af5d15a..e4a16036 100644 --- a/src/web/handlers/config/apply.rs +++ b/src/web/handlers/config/apply.rs @@ -7,7 +7,11 @@ use std::sync::Arc; use crate::config::*; use crate::error::{AppError, Result}; use crate::events::SystemEvent; +use crate::rtsp::RtspService; use crate::state::AppState; +use crate::video::codec_constraints::{ + enforce_constraints_with_stream_manager, StreamCodecConstraints, +}; /// 应用 Video 配置变更 pub async fn apply_video_config( @@ -444,6 +448,15 @@ pub async fn apply_audio_config( Ok(()) } +/// Apply stream codec constraints derived from global config. +pub async fn enforce_stream_codec_constraints(state: &Arc) -> Result> { + let config = state.config.get(); + let constraints = StreamCodecConstraints::from_config(&config); + let enforcement = + enforce_constraints_with_stream_manager(&state.stream_manager, &constraints).await?; + Ok(enforcement.message) +} + /// 应用 RustDesk 配置变更 pub async fn apply_rustdesk_config( state: &Arc, @@ -453,6 +466,7 @@ pub async fn apply_rustdesk_config( tracing::info!("Applying RustDesk config changes..."); let mut rustdesk_guard = state.rustdesk.write().await; + let mut credentials_to_save = None; // Check if service needs to be stopped if old_config.enabled && !new_config.enabled { @@ -464,7 +478,6 @@ pub async fn apply_rustdesk_config( tracing::info!("RustDesk service stopped"); } *rustdesk_guard = None; - return Ok(()); } // Check if service needs to be started or restarted @@ -473,8 +486,6 @@ pub async fn apply_rustdesk_config( || old_config.device_id != new_config.device_id || old_config.device_password != new_config.device_password; - let mut credentials_to_save = None; - if rustdesk_guard.is_none() { // Create new service tracing::info!("Initializing RustDesk service..."); @@ -507,28 +518,82 @@ pub async fn apply_rustdesk_config( } } } + } - // Save credentials to persistent config store (outside the lock) - drop(rustdesk_guard); - if let Some(updated_config) = credentials_to_save { - tracing::info!("Saving RustDesk credentials to config store..."); - if let Err(e) = state - .config - .update(|cfg| { - cfg.rustdesk.public_key = updated_config.public_key.clone(); - cfg.rustdesk.private_key = updated_config.private_key.clone(); - cfg.rustdesk.signing_public_key = updated_config.signing_public_key.clone(); - cfg.rustdesk.signing_private_key = updated_config.signing_private_key.clone(); - cfg.rustdesk.uuid = updated_config.uuid.clone(); - }) - .await - { - tracing::warn!("Failed to save RustDesk credentials: {}", e); - } else { - tracing::info!("RustDesk credentials saved successfully"); - } + // Save credentials to persistent config store (outside the lock) + drop(rustdesk_guard); + if let Some(updated_config) = credentials_to_save { + tracing::info!("Saving RustDesk credentials to config store..."); + if let Err(e) = state + .config + .update(|cfg| { + cfg.rustdesk.public_key = updated_config.public_key.clone(); + cfg.rustdesk.private_key = updated_config.private_key.clone(); + cfg.rustdesk.signing_public_key = updated_config.signing_public_key.clone(); + cfg.rustdesk.signing_private_key = updated_config.signing_private_key.clone(); + cfg.rustdesk.uuid = updated_config.uuid.clone(); + }) + .await + { + tracing::warn!("Failed to save RustDesk credentials: {}", e); + } else { + tracing::info!("RustDesk credentials saved successfully"); } } + if let Some(message) = enforce_stream_codec_constraints(state).await? { + tracing::info!("{}", message); + } + + Ok(()) +} + +/// 应用 RTSP 配置变更 +pub async fn apply_rtsp_config( + state: &Arc, + old_config: &RtspConfig, + new_config: &RtspConfig, +) -> Result<()> { + tracing::info!("Applying RTSP config changes..."); + + let mut rtsp_guard = state.rtsp.write().await; + + if old_config.enabled && !new_config.enabled { + if let Some(ref service) = *rtsp_guard { + if let Err(e) = service.stop().await { + tracing::error!("Failed to stop RTSP service: {}", e); + } + } + *rtsp_guard = None; + } + + if new_config.enabled { + let need_restart = old_config.bind != new_config.bind + || old_config.port != new_config.port + || old_config.path != new_config.path + || old_config.codec != new_config.codec + || old_config.username != new_config.username + || old_config.password != new_config.password + || old_config.allow_one_client != new_config.allow_one_client; + + if rtsp_guard.is_none() { + let service = RtspService::new(new_config.clone(), state.stream_manager.clone()); + service.start().await?; + tracing::info!("RTSP service started"); + *rtsp_guard = Some(Arc::new(service)); + } else if need_restart { + if let Some(ref service) = *rtsp_guard { + service.restart(new_config.clone()).await?; + tracing::info!("RTSP service restarted"); + } + } + } + + drop(rtsp_guard); + + if let Some(message) = enforce_stream_codec_constraints(state).await? { + tracing::info!("{}", message); + } + Ok(()) } diff --git a/src/web/handlers/config/mod.rs b/src/web/handlers/config/mod.rs index 3a28bc10..9e133e46 100644 --- a/src/web/handlers/config/mod.rs +++ b/src/web/handlers/config/mod.rs @@ -25,6 +25,7 @@ mod auth; mod hid; mod msd; mod rustdesk; +mod rtsp; mod stream; pub(crate) mod video; mod web; @@ -39,6 +40,7 @@ pub use rustdesk::{ get_device_password, get_rustdesk_config, get_rustdesk_status, regenerate_device_id, regenerate_device_password, update_rustdesk_config, }; +pub use rtsp::{get_rtsp_config, get_rtsp_status, update_rtsp_config}; pub use stream::{get_stream_config, update_stream_config}; pub use video::{get_video_config, update_video_config}; pub use web::{get_web_config, update_web_config}; @@ -64,6 +66,9 @@ fn sanitize_config_for_api(config: &mut AppConfig) { config.rustdesk.private_key = None; config.rustdesk.signing_public_key = None; config.rustdesk.signing_private_key = None; + + // RTSP secrets + config.rtsp.password = None; } /// 获取完整配置 diff --git a/src/web/handlers/config/rtsp.rs b/src/web/handlers/config/rtsp.rs new file mode 100644 index 00000000..16b395fd --- /dev/null +++ b/src/web/handlers/config/rtsp.rs @@ -0,0 +1,67 @@ +use axum::{extract::State, Json}; +use std::sync::Arc; + +use crate::error::{AppError, Result}; +use crate::state::AppState; + +use super::apply::apply_rtsp_config; +use super::types::{RtspConfigResponse, RtspConfigUpdate, RtspStatusResponse}; + +/// Get RTSP config +pub async fn get_rtsp_config(State(state): State>) -> Json { + let config = state.config.get(); + Json(RtspConfigResponse::from(&config.rtsp)) +} + +/// Get RTSP status (config + service status) +pub async fn get_rtsp_status(State(state): State>) -> Json { + let config = state.config.get().rtsp.clone(); + let status = { + let guard = state.rtsp.read().await; + if let Some(ref service) = *guard { + service.status().await + } else { + crate::rtsp::RtspServiceStatus::Stopped + } + }; + + Json(RtspStatusResponse::new(&config, status)) +} + +/// Update RTSP config +pub async fn update_rtsp_config( + State(state): State>, + Json(req): Json, +) -> Result> { + req.validate()?; + + let old_config = state.config.get().rtsp.clone(); + + state + .config + .update(|config| { + req.apply_to(&mut config.rtsp); + }) + .await?; + + let new_config = state.config.get().rtsp.clone(); + if let Err(err) = apply_rtsp_config(&state, &old_config, &new_config).await { + tracing::error!("Failed to apply RTSP config: {}", err); + if let Err(rollback_err) = state + .config + .update(|config| { + config.rtsp = old_config.clone(); + }) + .await + { + tracing::error!("Failed to rollback RTSP config after apply failure: {}", rollback_err); + return Err(AppError::ServiceUnavailable(format!( + "RTSP apply failed: {}; rollback failed: {}", + err, rollback_err + ))); + } + return Err(err); + } + + Ok(Json(RtspConfigResponse::from(&new_config))) +} diff --git a/src/web/handlers/config/rustdesk.rs b/src/web/handlers/config/rustdesk.rs index ae1a9648..9e1e0460 100644 --- a/src/web/handlers/config/rustdesk.rs +++ b/src/web/handlers/config/rustdesk.rs @@ -106,6 +106,15 @@ pub async fn update_rustdesk_config( tracing::error!("Failed to apply RustDesk config: {}", e); } + // Share a non-sensitive summary for frontend UX + let constraints = state.stream_manager.codec_constraints().await; + if constraints.rustdesk_enabled || constraints.rtsp_enabled { + tracing::info!( + "Stream codec constraints active after RustDesk update: {}", + constraints.reason + ); + } + Ok(Json(RustDeskConfigResponse::from(&new_config))) } diff --git a/src/web/handlers/config/stream.rs b/src/web/handlers/config/stream.rs index a5705ac7..5f0c3234 100644 --- a/src/web/handlers/config/stream.rs +++ b/src/web/handlers/config/stream.rs @@ -42,5 +42,10 @@ pub async fn update_stream_config( tracing::error!("Failed to apply stream config: {}", e); } + // 6. Enforce codec constraints after any stream config update + if let Err(e) = super::apply::enforce_stream_codec_constraints(&state).await { + tracing::error!("Failed to enforce stream codec constraints: {}", e); + } + Ok(Json(StreamConfigResponse::from(&new_stream_config))) } diff --git a/src/web/handlers/config/types.rs b/src/web/handlers/config/types.rs index 3e500085..cffef3a1 100644 --- a/src/web/handlers/config/types.rs +++ b/src/web/handlers/config/types.rs @@ -1,5 +1,6 @@ use crate::config::*; use crate::error::AppError; +use crate::rtsp::RtspServiceStatus; use crate::rustdesk::config::RustDeskConfig; use crate::video::encoder::BitratePreset; use serde::Deserialize; @@ -604,6 +605,124 @@ impl RustDeskConfigUpdate { } } +// ===== RTSP Config ===== +#[typeshare] +#[derive(Debug, serde::Serialize)] +pub struct RtspConfigResponse { + pub enabled: bool, + pub bind: String, + pub port: u16, + pub path: String, + pub allow_one_client: bool, + pub codec: RtspCodec, + pub username: Option, + pub has_password: bool, +} + +impl From<&RtspConfig> for RtspConfigResponse { + fn from(config: &RtspConfig) -> Self { + Self { + enabled: config.enabled, + bind: config.bind.clone(), + port: config.port, + path: config.path.clone(), + allow_one_client: config.allow_one_client, + codec: config.codec.clone(), + username: config.username.clone(), + has_password: config.password.is_some(), + } + } +} + +#[typeshare] +#[derive(Debug, serde::Serialize)] +pub struct RtspStatusResponse { + pub config: RtspConfigResponse, + pub service_status: String, +} + +impl RtspStatusResponse { + pub fn new(config: &RtspConfig, status: RtspServiceStatus) -> Self { + Self { + config: RtspConfigResponse::from(config), + service_status: status.to_string(), + } + } +} + +#[typeshare] +#[derive(Debug, Deserialize)] +pub struct RtspConfigUpdate { + pub enabled: Option, + pub bind: Option, + pub port: Option, + pub path: Option, + pub allow_one_client: Option, + pub codec: Option, + pub username: Option, + pub password: Option, +} + +impl RtspConfigUpdate { + pub fn validate(&self) -> crate::error::Result<()> { + if let Some(port) = self.port { + if port == 0 { + return Err(AppError::BadRequest("RTSP port cannot be 0".into())); + } + } + + if let Some(ref bind) = self.bind { + if bind.parse::().is_err() { + return Err(AppError::BadRequest("RTSP bind must be a valid IP".into())); + } + } + + if let Some(ref path) = self.path { + let normalized = path.trim_matches('/'); + if normalized.is_empty() { + return Err(AppError::BadRequest("RTSP path cannot be empty".into())); + } + } + + Ok(()) + } + + pub fn apply_to(&self, config: &mut RtspConfig) { + if let Some(enabled) = self.enabled { + config.enabled = enabled; + } + if let Some(ref bind) = self.bind { + config.bind = bind.clone(); + } + if let Some(port) = self.port { + config.port = port; + } + if let Some(ref path) = self.path { + config.path = path.trim_matches('/').to_string(); + } + if let Some(allow_one_client) = self.allow_one_client { + config.allow_one_client = allow_one_client; + } + if let Some(codec) = self.codec.clone() { + config.codec = codec; + } + if let Some(ref username) = self.username { + config.username = if username.is_empty() { + None + } else { + Some(username.clone()) + }; + } + if let Some(ref password) = self.password { + config.password = if password.is_empty() { + None + } else { + Some(password.clone()) + }; + } + } +} + // ===== Web Config ===== #[typeshare] #[derive(Debug, Deserialize)] diff --git a/src/web/handlers/mod.rs b/src/web/handlers/mod.rs index 6c3d9f6c..47e19fce 100644 --- a/src/web/handlers/mod.rs +++ b/src/web/handlers/mod.rs @@ -14,6 +14,7 @@ use crate::config::{AppConfig, StreamMode}; use crate::error::{AppError, Result}; use crate::events::SystemEvent; use crate::state::AppState; +use crate::video::codec_constraints::codec_to_id; use crate::video::encoder::BitratePreset; // ============================================================================ @@ -747,6 +748,17 @@ pub async fn setup_init( } } + // Start RTSP if enabled + if new_config.rtsp.enabled { + let empty_config = crate::config::RtspConfig::default(); + if let Err(e) = config::apply::apply_rtsp_config(&state, &empty_config, &new_config.rtsp).await + { + tracing::warn!("Failed to start RTSP during setup: {}", e); + } else { + tracing::info!("RTSP started during setup"); + } + } + // Start audio streaming if audio device was selected during setup if new_config.audio.enabled { let audio_config = crate::audio::AudioControllerConfig { @@ -1439,6 +1451,8 @@ pub async fn stream_mode_set( ) -> Result> { use crate::video::encoder::VideoCodecType; + let constraints = state.stream_manager.codec_constraints().await; + let mode_lower = req.mode.to_lowercase(); let (new_mode, video_codec) = match mode_lower.as_str() { "mjpeg" => (StreamMode::Mjpeg, None), @@ -1454,6 +1468,23 @@ pub async fn stream_mode_set( } }; + if new_mode == StreamMode::Mjpeg && !constraints.is_mjpeg_allowed() { + return Err(AppError::BadRequest(format!( + "Codec 'mjpeg' is not allowed: {}", + constraints.reason + ))); + } + + if let Some(codec) = video_codec { + if !constraints.is_webrtc_codec_allowed(codec) { + return Err(AppError::BadRequest(format!( + "Codec '{}' is not allowed: {}", + codec_to_id(codec), + constraints.reason + ))); + } + } + // Set video codec if switching to WebRTC mode with specific codec if let Some(codec) = video_codec { info!("Setting WebRTC video codec to {:?}", codec); @@ -1560,6 +1591,67 @@ pub struct AvailableCodecsResponse { pub codecs: Vec, } +/// Stream constraints response +#[derive(Serialize)] +pub struct StreamConstraintsResponse { + pub success: bool, + pub allowed_codecs: Vec, + pub locked_codec: Option, + pub disallow_mjpeg: bool, + pub sources: ConstraintSources, + pub reason: String, + pub current_mode: String, +} + +#[derive(Serialize)] +pub struct ConstraintSources { + pub rustdesk: bool, + pub rtsp: bool, +} + +/// Get stream codec constraints derived from enabled services. +pub async fn stream_constraints_get( + State(state): State>, +) -> Json { + use crate::video::encoder::VideoCodecType; + + let constraints = state.stream_manager.codec_constraints().await; + let current_mode = state.stream_manager.current_mode().await; + let current_mode = match current_mode { + StreamMode::Mjpeg => "mjpeg".to_string(), + StreamMode::WebRTC => { + let codec = state + .stream_manager + .webrtc_streamer() + .current_video_codec() + .await; + match codec { + VideoCodecType::H264 => "h264".to_string(), + VideoCodecType::H265 => "h265".to_string(), + VideoCodecType::VP8 => "vp8".to_string(), + VideoCodecType::VP9 => "vp9".to_string(), + } + } + }; + + Json(StreamConstraintsResponse { + success: true, + allowed_codecs: constraints + .allowed_codecs_for_api() + .into_iter() + .map(str::to_string) + .collect(), + locked_codec: constraints.locked_codec.map(codec_to_id).map(str::to_string), + disallow_mjpeg: !constraints.allow_mjpeg, + sources: ConstraintSources { + rustdesk: constraints.rustdesk_enabled, + rtsp: constraints.rtsp_enabled, + }, + reason: constraints.reason, + current_mode, + }) +} + /// Set bitrate request #[derive(Deserialize)] pub struct SetBitrateRequest { diff --git a/src/web/routes.rs b/src/web/routes.rs index 02e74bf5..02e7bfa9 100644 --- a/src/web/routes.rs +++ b/src/web/routes.rs @@ -50,6 +50,7 @@ pub fn create_router(state: Arc) -> Router { .route("/stream/mode", post(handlers::stream_mode_set)) .route("/stream/bitrate", post(handlers::stream_set_bitrate)) .route("/stream/codecs", get(handlers::stream_codecs_list)) + .route("/stream/constraints", get(handlers::stream_constraints_get)) // WebRTC endpoints .route("/webrtc/session", post(handlers::webrtc_create_session)) .route("/webrtc/offer", post(handlers::webrtc_offer)) @@ -120,6 +121,13 @@ pub fn create_router(state: Arc) -> Router { "/config/rustdesk/regenerate-password", post(handlers::config::regenerate_device_password), ) + // RTSP configuration endpoints + .route("/config/rtsp", get(handlers::config::get_rtsp_config)) + .route("/config/rtsp", patch(handlers::config::update_rtsp_config)) + .route( + "/config/rtsp/status", + get(handlers::config::get_rtsp_status), + ) // Web server configuration .route("/config/web", get(handlers::config::get_web_config)) .route("/config/web", patch(handlers::config::update_web_config)) diff --git a/web/src/api/config.ts b/web/src/api/config.ts index 8edef184..d48a5e8c 100644 --- a/web/src/api/config.ts +++ b/web/src/api/config.ts @@ -330,6 +330,49 @@ export const rustdeskConfigApi = { }), } +// ===== RTSP 配置 API ===== + +export type RtspCodec = 'h264' | 'h265' + +export interface RtspConfigResponse { + enabled: boolean + bind: string + port: number + path: string + allow_one_client: boolean + codec: RtspCodec + username?: string | null + has_password: boolean +} + +export interface RtspConfigUpdate { + enabled?: boolean + bind?: string + port?: number + path?: string + allow_one_client?: boolean + codec?: RtspCodec + username?: string + password?: string +} + +export interface RtspStatusResponse { + config: RtspConfigResponse + service_status: string +} + +export const rtspConfigApi = { + get: () => request('/config/rtsp'), + + update: (config: RtspConfigUpdate) => + request('/config/rtsp', { + method: 'PATCH', + body: JSON.stringify(config), + }), + + getStatus: () => request('/config/rtsp/status'), +} + // ===== Web 服务器配置 API ===== /** Web 服务器配置 */ diff --git a/web/src/api/index.ts b/web/src/api/index.ts index 5cc503ae..1d433ae1 100644 --- a/web/src/api/index.ts +++ b/web/src/api/index.ts @@ -124,6 +124,19 @@ export interface AvailableCodecsResponse { codecs: VideoCodecInfo[] } +export interface StreamConstraintsResponse { + success: boolean + allowed_codecs: string[] + locked_codec: string | null + disallow_mjpeg: boolean + sources: { + rustdesk: boolean + rtsp: boolean + } + reason: string + current_mode: string +} + export const streamApi = { status: () => request<{ @@ -161,6 +174,9 @@ export const streamApi = { getCodecs: () => request('/stream/codecs'), + getConstraints: () => + request('/stream/constraints'), + setBitratePreset: (bitrate_preset: import('@/types/generated').BitratePreset) => request<{ success: boolean; message?: string }>('/stream/bitrate', { method: 'POST', @@ -536,11 +552,15 @@ export { audioConfigApi, extensionsApi, rustdeskConfigApi, + rtspConfigApi, webConfigApi, type RustDeskConfigResponse, type RustDeskStatusResponse, type RustDeskConfigUpdate, type RustDeskPasswordResponse, + type RtspConfigResponse, + type RtspConfigUpdate, + type RtspStatusResponse, type WebConfig, } from './config' diff --git a/web/src/components/VideoConfigPopover.vue b/web/src/components/VideoConfigPopover.vue index 3076f696..96625bb6 100644 --- a/web/src/components/VideoConfigPopover.vue +++ b/web/src/components/VideoConfigPopover.vue @@ -19,7 +19,14 @@ import { } from '@/components/ui/select' import { Monitor, RefreshCw, Loader2, Settings, Zap, Scale, Image } from 'lucide-vue-next' import HelpTooltip from '@/components/HelpTooltip.vue' -import { configApi, streamApi, type VideoCodecInfo, type EncoderBackendInfo, type BitratePreset } from '@/api' +import { + configApi, + streamApi, + type VideoCodecInfo, + type EncoderBackendInfo, + type BitratePreset, + type StreamConstraintsResponse, +} from '@/api' import { useConfigStore } from '@/stores/config' import { useRouter } from 'vue-router' @@ -64,6 +71,7 @@ const loadingCodecs = ref(false) // Backend list const backends = ref([]) +const constraints = ref(null) const currentEncoderBackend = computed(() => configStore.stream?.encoder || 'auto') // Browser supported codecs (WebRTC receive capabilities) @@ -220,7 +228,7 @@ const availableCodecs = computed(() => { const backend = backends.value.find(b => b.id === currentEncoderBackend.value) if (!backend) return allAvailable - return allAvailable + const backendFiltered = allAvailable .filter(codec => { // MJPEG is always available (doesn't require encoder) if (codec.id === 'mjpeg') return true @@ -238,6 +246,13 @@ const availableCodecs = computed(() => { backend: backend.name, } }) + + const allowed = constraints.value?.allowed_codecs + if (!allowed || allowed.length === 0) { + return backendFiltered + } + + return backendFiltered.filter(codec => allowed.includes(codec.id)) }) // Cascading filters @@ -303,6 +318,14 @@ async function loadCodecs() { } } +async function loadConstraints() { + try { + constraints.value = await streamApi.getConstraints() + } catch { + constraints.value = null + } +} + // Navigate to settings page (video tab) function goToSettings() { router.push('/settings?tab=video') @@ -339,6 +362,12 @@ function syncFromCurrentIfChanged() { // Handle video mode change function handleVideoModeChange(mode: unknown) { if (typeof mode !== 'string') return + + if (constraints.value?.allowed_codecs?.length && !constraints.value.allowed_codecs.includes(mode)) { + toast.error(constraints.value.reason || t('actionbar.selectMode')) + return + } + emit('update:videoMode', mode as VideoMode) } @@ -466,6 +495,8 @@ watch(() => props.open, (isOpen) => { loadCodecs() } + loadConstraints() + Promise.all([ configStore.refreshVideo(), configStore.refreshStream(), diff --git a/web/src/types/generated.ts b/web/src/types/generated.ts index b2fd535a..a88f18d6 100644 --- a/web/src/types/generated.ts +++ b/web/src/types/generated.ts @@ -357,6 +357,30 @@ export interface RustDeskConfig { device_id: string; } +/** RTSP output codec */ +export enum RtspCodec { + H264 = "h264", + H265 = "h265", +} + +/** RTSP configuration */ +export interface RtspConfig { + /** Enable RTSP output */ + enabled: boolean; + /** Bind IP address */ + bind: string; + /** RTSP TCP listen port */ + port: number; + /** Stream path (without leading slash) */ + path: string; + /** Allow only one client connection at a time */ + allow_one_client: boolean; + /** Output codec (H264/H265) */ + codec: RtspCodec; + /** Optional username for authentication */ + username?: string; +} + /** Main application configuration */ export interface AppConfig { /** Whether initial setup has been completed */ @@ -381,6 +405,8 @@ export interface AppConfig { extensions: ExtensionsConfig; /** RustDesk remote access settings */ rustdesk: RustDeskConfig; + /** RTSP streaming settings */ + rtsp: RtspConfig; } /** Update for a single ATX key configuration */ @@ -557,6 +583,33 @@ export interface MsdConfigUpdate { msd_dir?: string; } +export interface RtspConfigResponse { + enabled: boolean; + bind: string; + port: number; + path: string; + allow_one_client: boolean; + codec: RtspCodec; + username?: string; + has_password: boolean; +} + +export interface RtspConfigUpdate { + enabled?: boolean; + bind?: string; + port?: number; + path?: string; + allow_one_client?: boolean; + codec?: RtspCodec; + username?: string; + password?: string; +} + +export interface RtspStatusResponse { + config: RtspConfigResponse; + service_status: string; +} + export interface RustDeskConfigUpdate { enabled?: boolean; rendezvous_server?: string;