feat: 支持 rtsp 功能

This commit is contained in:
mofeng-git
2026-02-11 16:06:06 +08:00
parent 261deb1303
commit 3824e57fc5
23 changed files with 2154 additions and 37 deletions

View File

@@ -7,7 +7,11 @@ use std::sync::Arc;
use crate::config::*;
use crate::error::{AppError, Result};
use crate::events::SystemEvent;
use crate::rtsp::RtspService;
use crate::state::AppState;
use crate::video::codec_constraints::{
enforce_constraints_with_stream_manager, StreamCodecConstraints,
};
/// 应用 Video 配置变更
pub async fn apply_video_config(
@@ -444,6 +448,15 @@ pub async fn apply_audio_config(
Ok(())
}
/// Apply stream codec constraints derived from global config.
pub async fn enforce_stream_codec_constraints(state: &Arc<AppState>) -> Result<Option<String>> {
let config = state.config.get();
let constraints = StreamCodecConstraints::from_config(&config);
let enforcement =
enforce_constraints_with_stream_manager(&state.stream_manager, &constraints).await?;
Ok(enforcement.message)
}
/// 应用 RustDesk 配置变更
pub async fn apply_rustdesk_config(
state: &Arc<AppState>,
@@ -453,6 +466,7 @@ pub async fn apply_rustdesk_config(
tracing::info!("Applying RustDesk config changes...");
let mut rustdesk_guard = state.rustdesk.write().await;
let mut credentials_to_save = None;
// Check if service needs to be stopped
if old_config.enabled && !new_config.enabled {
@@ -464,7 +478,6 @@ pub async fn apply_rustdesk_config(
tracing::info!("RustDesk service stopped");
}
*rustdesk_guard = None;
return Ok(());
}
// Check if service needs to be started or restarted
@@ -473,8 +486,6 @@ pub async fn apply_rustdesk_config(
|| old_config.device_id != new_config.device_id
|| old_config.device_password != new_config.device_password;
let mut credentials_to_save = None;
if rustdesk_guard.is_none() {
// Create new service
tracing::info!("Initializing RustDesk service...");
@@ -507,28 +518,82 @@ pub async fn apply_rustdesk_config(
}
}
}
}
// Save credentials to persistent config store (outside the lock)
drop(rustdesk_guard);
if let Some(updated_config) = credentials_to_save {
tracing::info!("Saving RustDesk credentials to config store...");
if let Err(e) = state
.config
.update(|cfg| {
cfg.rustdesk.public_key = updated_config.public_key.clone();
cfg.rustdesk.private_key = updated_config.private_key.clone();
cfg.rustdesk.signing_public_key = updated_config.signing_public_key.clone();
cfg.rustdesk.signing_private_key = updated_config.signing_private_key.clone();
cfg.rustdesk.uuid = updated_config.uuid.clone();
})
.await
{
tracing::warn!("Failed to save RustDesk credentials: {}", e);
} else {
tracing::info!("RustDesk credentials saved successfully");
}
// Save credentials to persistent config store (outside the lock)
drop(rustdesk_guard);
if let Some(updated_config) = credentials_to_save {
tracing::info!("Saving RustDesk credentials to config store...");
if let Err(e) = state
.config
.update(|cfg| {
cfg.rustdesk.public_key = updated_config.public_key.clone();
cfg.rustdesk.private_key = updated_config.private_key.clone();
cfg.rustdesk.signing_public_key = updated_config.signing_public_key.clone();
cfg.rustdesk.signing_private_key = updated_config.signing_private_key.clone();
cfg.rustdesk.uuid = updated_config.uuid.clone();
})
.await
{
tracing::warn!("Failed to save RustDesk credentials: {}", e);
} else {
tracing::info!("RustDesk credentials saved successfully");
}
}
if let Some(message) = enforce_stream_codec_constraints(state).await? {
tracing::info!("{}", message);
}
Ok(())
}
/// 应用 RTSP 配置变更
pub async fn apply_rtsp_config(
state: &Arc<AppState>,
old_config: &RtspConfig,
new_config: &RtspConfig,
) -> Result<()> {
tracing::info!("Applying RTSP config changes...");
let mut rtsp_guard = state.rtsp.write().await;
if old_config.enabled && !new_config.enabled {
if let Some(ref service) = *rtsp_guard {
if let Err(e) = service.stop().await {
tracing::error!("Failed to stop RTSP service: {}", e);
}
}
*rtsp_guard = None;
}
if new_config.enabled {
let need_restart = old_config.bind != new_config.bind
|| old_config.port != new_config.port
|| old_config.path != new_config.path
|| old_config.codec != new_config.codec
|| old_config.username != new_config.username
|| old_config.password != new_config.password
|| old_config.allow_one_client != new_config.allow_one_client;
if rtsp_guard.is_none() {
let service = RtspService::new(new_config.clone(), state.stream_manager.clone());
service.start().await?;
tracing::info!("RTSP service started");
*rtsp_guard = Some(Arc::new(service));
} else if need_restart {
if let Some(ref service) = *rtsp_guard {
service.restart(new_config.clone()).await?;
tracing::info!("RTSP service restarted");
}
}
}
drop(rtsp_guard);
if let Some(message) = enforce_stream_codec_constraints(state).await? {
tracing::info!("{}", message);
}
Ok(())
}

View File

@@ -25,6 +25,7 @@ mod auth;
mod hid;
mod msd;
mod rustdesk;
mod rtsp;
mod stream;
pub(crate) mod video;
mod web;
@@ -39,6 +40,7 @@ pub use rustdesk::{
get_device_password, get_rustdesk_config, get_rustdesk_status, regenerate_device_id,
regenerate_device_password, update_rustdesk_config,
};
pub use rtsp::{get_rtsp_config, get_rtsp_status, update_rtsp_config};
pub use stream::{get_stream_config, update_stream_config};
pub use video::{get_video_config, update_video_config};
pub use web::{get_web_config, update_web_config};
@@ -64,6 +66,9 @@ fn sanitize_config_for_api(config: &mut AppConfig) {
config.rustdesk.private_key = None;
config.rustdesk.signing_public_key = None;
config.rustdesk.signing_private_key = None;
// RTSP secrets
config.rtsp.password = None;
}
/// 获取完整配置

View File

@@ -0,0 +1,67 @@
use axum::{extract::State, Json};
use std::sync::Arc;
use crate::error::{AppError, Result};
use crate::state::AppState;
use super::apply::apply_rtsp_config;
use super::types::{RtspConfigResponse, RtspConfigUpdate, RtspStatusResponse};
/// Get RTSP config
pub async fn get_rtsp_config(State(state): State<Arc<AppState>>) -> Json<RtspConfigResponse> {
let config = state.config.get();
Json(RtspConfigResponse::from(&config.rtsp))
}
/// Get RTSP status (config + service status)
pub async fn get_rtsp_status(State(state): State<Arc<AppState>>) -> Json<RtspStatusResponse> {
let config = state.config.get().rtsp.clone();
let status = {
let guard = state.rtsp.read().await;
if let Some(ref service) = *guard {
service.status().await
} else {
crate::rtsp::RtspServiceStatus::Stopped
}
};
Json(RtspStatusResponse::new(&config, status))
}
/// Update RTSP config
pub async fn update_rtsp_config(
State(state): State<Arc<AppState>>,
Json(req): Json<RtspConfigUpdate>,
) -> Result<Json<RtspConfigResponse>> {
req.validate()?;
let old_config = state.config.get().rtsp.clone();
state
.config
.update(|config| {
req.apply_to(&mut config.rtsp);
})
.await?;
let new_config = state.config.get().rtsp.clone();
if let Err(err) = apply_rtsp_config(&state, &old_config, &new_config).await {
tracing::error!("Failed to apply RTSP config: {}", err);
if let Err(rollback_err) = state
.config
.update(|config| {
config.rtsp = old_config.clone();
})
.await
{
tracing::error!("Failed to rollback RTSP config after apply failure: {}", rollback_err);
return Err(AppError::ServiceUnavailable(format!(
"RTSP apply failed: {}; rollback failed: {}",
err, rollback_err
)));
}
return Err(err);
}
Ok(Json(RtspConfigResponse::from(&new_config)))
}

View File

@@ -106,6 +106,15 @@ pub async fn update_rustdesk_config(
tracing::error!("Failed to apply RustDesk config: {}", e);
}
// Share a non-sensitive summary for frontend UX
let constraints = state.stream_manager.codec_constraints().await;
if constraints.rustdesk_enabled || constraints.rtsp_enabled {
tracing::info!(
"Stream codec constraints active after RustDesk update: {}",
constraints.reason
);
}
Ok(Json(RustDeskConfigResponse::from(&new_config)))
}

View File

@@ -42,5 +42,10 @@ pub async fn update_stream_config(
tracing::error!("Failed to apply stream config: {}", e);
}
// 6. Enforce codec constraints after any stream config update
if let Err(e) = super::apply::enforce_stream_codec_constraints(&state).await {
tracing::error!("Failed to enforce stream codec constraints: {}", e);
}
Ok(Json(StreamConfigResponse::from(&new_stream_config)))
}

View File

@@ -1,5 +1,6 @@
use crate::config::*;
use crate::error::AppError;
use crate::rtsp::RtspServiceStatus;
use crate::rustdesk::config::RustDeskConfig;
use crate::video::encoder::BitratePreset;
use serde::Deserialize;
@@ -604,6 +605,124 @@ impl RustDeskConfigUpdate {
}
}
// ===== RTSP Config =====
#[typeshare]
#[derive(Debug, serde::Serialize)]
pub struct RtspConfigResponse {
pub enabled: bool,
pub bind: String,
pub port: u16,
pub path: String,
pub allow_one_client: bool,
pub codec: RtspCodec,
pub username: Option<String>,
pub has_password: bool,
}
impl From<&RtspConfig> for RtspConfigResponse {
fn from(config: &RtspConfig) -> Self {
Self {
enabled: config.enabled,
bind: config.bind.clone(),
port: config.port,
path: config.path.clone(),
allow_one_client: config.allow_one_client,
codec: config.codec.clone(),
username: config.username.clone(),
has_password: config.password.is_some(),
}
}
}
#[typeshare]
#[derive(Debug, serde::Serialize)]
pub struct RtspStatusResponse {
pub config: RtspConfigResponse,
pub service_status: String,
}
impl RtspStatusResponse {
pub fn new(config: &RtspConfig, status: RtspServiceStatus) -> Self {
Self {
config: RtspConfigResponse::from(config),
service_status: status.to_string(),
}
}
}
#[typeshare]
#[derive(Debug, Deserialize)]
pub struct RtspConfigUpdate {
pub enabled: Option<bool>,
pub bind: Option<String>,
pub port: Option<u16>,
pub path: Option<String>,
pub allow_one_client: Option<bool>,
pub codec: Option<RtspCodec>,
pub username: Option<String>,
pub password: Option<String>,
}
impl RtspConfigUpdate {
pub fn validate(&self) -> crate::error::Result<()> {
if let Some(port) = self.port {
if port == 0 {
return Err(AppError::BadRequest("RTSP port cannot be 0".into()));
}
}
if let Some(ref bind) = self.bind {
if bind.parse::<std::net::IpAddr>().is_err() {
return Err(AppError::BadRequest("RTSP bind must be a valid IP".into()));
}
}
if let Some(ref path) = self.path {
let normalized = path.trim_matches('/');
if normalized.is_empty() {
return Err(AppError::BadRequest("RTSP path cannot be empty".into()));
}
}
Ok(())
}
pub fn apply_to(&self, config: &mut RtspConfig) {
if let Some(enabled) = self.enabled {
config.enabled = enabled;
}
if let Some(ref bind) = self.bind {
config.bind = bind.clone();
}
if let Some(port) = self.port {
config.port = port;
}
if let Some(ref path) = self.path {
config.path = path.trim_matches('/').to_string();
}
if let Some(allow_one_client) = self.allow_one_client {
config.allow_one_client = allow_one_client;
}
if let Some(codec) = self.codec.clone() {
config.codec = codec;
}
if let Some(ref username) = self.username {
config.username = if username.is_empty() {
None
} else {
Some(username.clone())
};
}
if let Some(ref password) = self.password {
config.password = if password.is_empty() {
None
} else {
Some(password.clone())
};
}
}
}
// ===== Web Config =====
#[typeshare]
#[derive(Debug, Deserialize)]

View File

@@ -14,6 +14,7 @@ use crate::config::{AppConfig, StreamMode};
use crate::error::{AppError, Result};
use crate::events::SystemEvent;
use crate::state::AppState;
use crate::video::codec_constraints::codec_to_id;
use crate::video::encoder::BitratePreset;
// ============================================================================
@@ -747,6 +748,17 @@ pub async fn setup_init(
}
}
// Start RTSP if enabled
if new_config.rtsp.enabled {
let empty_config = crate::config::RtspConfig::default();
if let Err(e) = config::apply::apply_rtsp_config(&state, &empty_config, &new_config.rtsp).await
{
tracing::warn!("Failed to start RTSP during setup: {}", e);
} else {
tracing::info!("RTSP started during setup");
}
}
// Start audio streaming if audio device was selected during setup
if new_config.audio.enabled {
let audio_config = crate::audio::AudioControllerConfig {
@@ -1439,6 +1451,8 @@ pub async fn stream_mode_set(
) -> Result<Json<StreamModeResponse>> {
use crate::video::encoder::VideoCodecType;
let constraints = state.stream_manager.codec_constraints().await;
let mode_lower = req.mode.to_lowercase();
let (new_mode, video_codec) = match mode_lower.as_str() {
"mjpeg" => (StreamMode::Mjpeg, None),
@@ -1454,6 +1468,23 @@ pub async fn stream_mode_set(
}
};
if new_mode == StreamMode::Mjpeg && !constraints.is_mjpeg_allowed() {
return Err(AppError::BadRequest(format!(
"Codec 'mjpeg' is not allowed: {}",
constraints.reason
)));
}
if let Some(codec) = video_codec {
if !constraints.is_webrtc_codec_allowed(codec) {
return Err(AppError::BadRequest(format!(
"Codec '{}' is not allowed: {}",
codec_to_id(codec),
constraints.reason
)));
}
}
// Set video codec if switching to WebRTC mode with specific codec
if let Some(codec) = video_codec {
info!("Setting WebRTC video codec to {:?}", codec);
@@ -1560,6 +1591,67 @@ pub struct AvailableCodecsResponse {
pub codecs: Vec<VideoCodecInfo>,
}
/// Stream constraints response
#[derive(Serialize)]
pub struct StreamConstraintsResponse {
pub success: bool,
pub allowed_codecs: Vec<String>,
pub locked_codec: Option<String>,
pub disallow_mjpeg: bool,
pub sources: ConstraintSources,
pub reason: String,
pub current_mode: String,
}
#[derive(Serialize)]
pub struct ConstraintSources {
pub rustdesk: bool,
pub rtsp: bool,
}
/// Get stream codec constraints derived from enabled services.
pub async fn stream_constraints_get(
State(state): State<Arc<AppState>>,
) -> Json<StreamConstraintsResponse> {
use crate::video::encoder::VideoCodecType;
let constraints = state.stream_manager.codec_constraints().await;
let current_mode = state.stream_manager.current_mode().await;
let current_mode = match current_mode {
StreamMode::Mjpeg => "mjpeg".to_string(),
StreamMode::WebRTC => {
let codec = state
.stream_manager
.webrtc_streamer()
.current_video_codec()
.await;
match codec {
VideoCodecType::H264 => "h264".to_string(),
VideoCodecType::H265 => "h265".to_string(),
VideoCodecType::VP8 => "vp8".to_string(),
VideoCodecType::VP9 => "vp9".to_string(),
}
}
};
Json(StreamConstraintsResponse {
success: true,
allowed_codecs: constraints
.allowed_codecs_for_api()
.into_iter()
.map(str::to_string)
.collect(),
locked_codec: constraints.locked_codec.map(codec_to_id).map(str::to_string),
disallow_mjpeg: !constraints.allow_mjpeg,
sources: ConstraintSources {
rustdesk: constraints.rustdesk_enabled,
rtsp: constraints.rtsp_enabled,
},
reason: constraints.reason,
current_mode,
})
}
/// Set bitrate request
#[derive(Deserialize)]
pub struct SetBitrateRequest {