refactor: 修改为同步请求

This commit is contained in:
mofeng-git
2026-05-01 20:06:22 +08:00
parent 0d47d8395d
commit 89b19ea7dd
16 changed files with 210 additions and 488 deletions

View File

@@ -8,6 +8,24 @@ use crate::stream_encoder::encoder_type_to_backend;
use crate::video::codec_constraints::{
enforce_constraints_with_stream_manager, StreamCodecConstraints,
};
use tokio::sync::{Mutex, OwnedMutexGuard};
#[derive(Debug, Clone, Copy, Default)]
pub struct ConfigApplyOptions {
pub force: bool,
}
impl ConfigApplyOptions {
pub const fn forced() -> Self {
Self { force: true }
}
}
pub fn try_apply_lock(lock: &Arc<Mutex<()>>, domain: &str) -> Result<OwnedMutexGuard<()>> {
lock.clone().try_lock_owned().map_err(|_| {
AppError::ServiceUnavailable(format!("{domain} configuration is already applying"))
})
}
fn hid_backend_type(config: &HidConfig) -> crate::hid::HidBackendType {
match config.backend {
@@ -33,8 +51,9 @@ pub async fn apply_video_config(
state: &Arc<AppState>,
old_config: &VideoConfig,
new_config: &VideoConfig,
options: ConfigApplyOptions,
) -> Result<()> {
if old_config == new_config {
if old_config == new_config && !options.force {
tracing::info!("Video config unchanged, skipping reload");
return Ok(());
}
@@ -73,10 +92,11 @@ pub async fn apply_stream_config(
state: &Arc<AppState>,
old_config: &StreamConfig,
new_config: &StreamConfig,
options: ConfigApplyOptions,
) -> Result<()> {
tracing::info!("Applying stream config changes...");
if old_config.encoder != new_config.encoder {
if options.force || old_config.encoder != new_config.encoder {
let encoder_backend = encoder_type_to_backend(new_config.encoder.clone());
tracing::info!(
"Updating encoder backend to: {:?} (from config: {:?})",
@@ -86,12 +106,11 @@ pub async fn apply_stream_config(
state.webrtc.update_encoder_backend(encoder_backend).await;
}
if old_config.bitrate_preset != new_config.bitrate_preset {
if options.force || old_config.bitrate_preset != new_config.bitrate_preset {
state
.stream_manager
.set_bitrate_preset(new_config.bitrate_preset)
.await
.ok(); // Ignore error if no active stream
.await?;
}
let ice_changed = old_config.stun_server != new_config.stun_server
@@ -99,7 +118,7 @@ pub async fn apply_stream_config(
|| old_config.turn_username != new_config.turn_username
|| old_config.turn_password != new_config.turn_password;
if ice_changed {
if options.force || ice_changed {
tracing::info!(
"Updating ICE config: STUN={:?}, TURN={:?}",
new_config.stun_server,
@@ -128,6 +147,7 @@ pub async fn apply_hid_config(
state: &Arc<AppState>,
old_config: &HidConfig,
new_config: &HidConfig,
options: ConfigApplyOptions,
) -> Result<()> {
let current_msd_enabled = state.config.get().msd.enabled;
new_config.validate_otg_endpoint_budget(current_msd_enabled)?;
@@ -149,6 +169,7 @@ pub async fn apply_hid_config(
&& !hid_functions_changed
&& !keyboard_leds_changed
&& !endpoint_budget_changed
&& !options.force
{
tracing::info!("HID config unchanged, skipping reload");
return Ok(());
@@ -190,6 +211,7 @@ pub async fn apply_msd_config(
state: &Arc<AppState>,
old_config: &MsdConfig,
new_config: &MsdConfig,
options: ConfigApplyOptions,
) -> Result<()> {
state
.config
@@ -222,7 +244,7 @@ pub async fn apply_msd_config(
tracing::warn!("Failed to create MSD ventoy directory: {}", e);
}
let needs_reload = old_msd_enabled != new_msd_enabled || msd_dir_changed;
let needs_reload = options.force || old_msd_enabled != new_msd_enabled || msd_dir_changed;
if !needs_reload {
tracing::info!(
"MSD enabled state unchanged ({}) and directory unchanged, no reload needed",
@@ -272,7 +294,9 @@ pub async fn apply_msd_config(
}
let current_config = state.config.get();
if current_config.hid.backend == HidBackend::Otg && old_msd_enabled != new_msd_enabled {
if current_config.hid.backend == HidBackend::Otg
&& (options.force || old_msd_enabled != new_msd_enabled)
{
state
.hid
.reload(crate::hid::HidBackendType::Otg)
@@ -306,12 +330,11 @@ pub async fn apply_atx_config(
tracing::info!("ATX enabled in config, initializing...");
let atx = crate::atx::AtxController::new(controller_config);
if let Err(e) = atx.init().await {
tracing::warn!("ATX initialization failed: {}", e);
} else {
*state.atx.write().await = Some(atx);
tracing::info!("ATX controller initialized successfully");
}
atx.init()
.await
.map_err(|e| AppError::Config(format!("ATX initialization failed: {}", e)))?;
*state.atx.write().await = Some(atx);
tracing::info!("ATX controller initialized successfully");
}
}
@@ -331,25 +354,18 @@ pub async fn apply_audio_config(
quality: new_config.quality.parse::<crate::audio::AudioQuality>()?,
};
if let Err(e) = state.audio.update_config(audio_config).await {
tracing::error!("Audio config update failed: {}", e);
} else {
tracing::info!(
"Audio config applied: enabled={}, device={}",
new_config.enabled,
new_config.device
);
}
state.audio.update_config(audio_config).await?;
tracing::info!(
"Audio config applied: enabled={}, device={}",
new_config.enabled,
new_config.device
);
if let Err(e) = state
state
.stream_manager
.set_webrtc_audio_enabled(new_config.enabled)
.await
{
tracing::warn!("Failed to update WebRTC audio state: {}", e);
} else {
tracing::info!("WebRTC audio enabled: {}", new_config.enabled);
}
.await?;
tracing::info!("WebRTC audio enabled: {}", new_config.enabled);
if new_config.enabled {
state.stream_manager.reconnect_webrtc_audio_sources().await;
@@ -370,6 +386,7 @@ pub async fn apply_rustdesk_config(
state: &Arc<AppState>,
old_config: &crate::rustdesk::config::RustDeskConfig,
new_config: &crate::rustdesk::config::RustDeskConfig,
options: ConfigApplyOptions,
) -> Result<()> {
tracing::info!("Applying RustDesk config changes...");
@@ -378,16 +395,18 @@ pub async fn apply_rustdesk_config(
if old_config.enabled && !new_config.enabled {
if let Some(ref service) = *rustdesk_guard {
if let Err(e) = service.stop().await {
tracing::error!("Failed to stop RustDesk service: {}", e);
}
service
.stop()
.await
.map_err(|e| AppError::Config(format!("Failed to stop RustDesk service: {}", e)))?;
tracing::info!("RustDesk service stopped");
}
*rustdesk_guard = None;
}
if new_config.enabled {
let need_restart = old_config.rendezvous_server != new_config.rendezvous_server
let need_restart = options.force
|| old_config.rendezvous_server != new_config.rendezvous_server
|| old_config.device_id != new_config.device_id
|| old_config.device_password != new_config.device_password;
@@ -399,24 +418,22 @@ pub async fn apply_rustdesk_config(
state.hid.clone(),
state.audio.clone(),
);
if let Err(e) = service.start().await {
tracing::error!("Failed to start RustDesk service: {}", e);
} else {
tracing::info!("RustDesk service started with ID: {}", new_config.device_id);
credentials_to_save = service.save_credentials();
}
service.start().await.map_err(|e| {
AppError::Config(format!("Failed to start RustDesk service: {}", e))
})?;
tracing::info!("RustDesk service started with ID: {}", new_config.device_id);
credentials_to_save = service.save_credentials();
*rustdesk_guard = Some(std::sync::Arc::new(service));
} else if need_restart {
if let Some(ref service) = *rustdesk_guard {
if let Err(e) = service.restart(new_config.clone()).await {
tracing::error!("Failed to restart RustDesk service: {}", e);
} else {
tracing::info!(
"RustDesk service restarted with ID: {}",
new_config.device_id
);
credentials_to_save = service.save_credentials();
}
service.restart(new_config.clone()).await.map_err(|e| {
AppError::Config(format!("Failed to restart RustDesk service: {}", e))
})?;
tracing::info!(
"RustDesk service restarted with ID: {}",
new_config.device_id
);
credentials_to_save = service.save_credentials();
}
}
}
@@ -424,7 +441,7 @@ pub async fn apply_rustdesk_config(
drop(rustdesk_guard);
if let Some(updated_config) = credentials_to_save {
tracing::info!("Saving RustDesk credentials to config store...");
if let Err(e) = state
state
.config
.update(|cfg| {
cfg.rustdesk.public_key = updated_config.public_key.clone();
@@ -433,12 +450,8 @@ pub async fn apply_rustdesk_config(
cfg.rustdesk.signing_private_key = updated_config.signing_private_key.clone();
cfg.rustdesk.uuid = updated_config.uuid.clone();
})
.await
{
tracing::warn!("Failed to save RustDesk credentials: {}", e);
} else {
tracing::info!("RustDesk credentials saved successfully");
}
.await?;
tracing::info!("RustDesk credentials saved successfully");
}
if let Some(message) = enforce_stream_codec_constraints(state).await? {
@@ -452,6 +465,7 @@ pub async fn apply_rtsp_config(
state: &Arc<AppState>,
old_config: &RtspConfig,
new_config: &RtspConfig,
options: ConfigApplyOptions,
) -> Result<()> {
tracing::info!("Applying RTSP config changes...");
@@ -459,15 +473,17 @@ pub async fn apply_rtsp_config(
if old_config.enabled && !new_config.enabled {
if let Some(ref service) = *rtsp_guard {
if let Err(e) = service.stop().await {
tracing::error!("Failed to stop RTSP service: {}", e);
}
service
.stop()
.await
.map_err(|e| AppError::Config(format!("Failed to stop RTSP service: {}", e)))?;
}
*rtsp_guard = None;
}
if new_config.enabled {
let need_restart = old_config.bind != new_config.bind
let need_restart = options.force
|| old_config.bind != new_config.bind
|| old_config.port != new_config.port
|| old_config.path != new_config.path
|| old_config.codec != new_config.codec

View File

@@ -6,7 +6,7 @@ use crate::config::{AtxConfig, HidBackend, HidConfig};
use crate::error::{AppError, Result};
use crate::state::AppState;
use super::apply::apply_atx_config;
use super::apply::{apply_atx_config, try_apply_lock};
use super::types::AtxConfigUpdate;
pub async fn get_atx_config(State(state): State<Arc<AppState>>) -> Json<AtxConfig> {
@@ -22,6 +22,7 @@ pub async fn update_atx_config(
req.validate_with_current(&old_atx_config)?;
let _apply_guard = try_apply_lock(&state.config_apply_locks.atx, "atx")?;
let mut merged_atx_config = old_atx_config.clone();
req.apply_to(&mut merged_atx_config);
validate_serial_device_conflict(&merged_atx_config, &current_config.hid)?;
@@ -35,9 +36,7 @@ pub async fn update_atx_config(
let new_atx_config = state.config.get().atx.clone();
if let Err(e) = apply_atx_config(&state, &old_atx_config, &new_atx_config).await {
tracing::error!("Failed to apply ATX config: {}", e);
}
apply_atx_config(&state, &old_atx_config, &new_atx_config).await?;
Ok(Json(new_atx_config))
}

View File

@@ -5,7 +5,7 @@ use crate::config::AudioConfig;
use crate::error::Result;
use crate::state::AppState;
use super::apply::apply_audio_config;
use super::apply::{apply_audio_config, try_apply_lock};
use super::types::AudioConfigUpdate;
pub async fn get_audio_config(State(state): State<Arc<AppState>>) -> Json<AudioConfig> {
@@ -18,6 +18,7 @@ pub async fn update_audio_config(
) -> Result<Json<AudioConfig>> {
req.validate()?;
let _apply_guard = try_apply_lock(&state.config_apply_locks.audio, "audio")?;
let old_audio_config = state.config.get().audio.clone();
state
@@ -29,9 +30,7 @@ pub async fn update_audio_config(
let new_audio_config = state.config.get().audio.clone();
if let Err(e) = apply_audio_config(&state, &old_audio_config, &new_audio_config).await {
tracing::error!("Failed to apply audio config: {}", e);
}
apply_audio_config(&state, &old_audio_config, &new_audio_config).await?;
Ok(Json(new_audio_config))
}

View File

@@ -5,7 +5,7 @@ use crate::config::HidConfig;
use crate::error::Result;
use crate::state::AppState;
use super::apply::apply_hid_config;
use super::apply::{apply_hid_config, try_apply_lock, ConfigApplyOptions};
use super::types::HidConfigUpdate;
pub async fn get_hid_config(State(state): State<Arc<AppState>>) -> Json<HidConfig> {
@@ -18,6 +18,7 @@ pub async fn update_hid_config(
) -> Result<Json<HidConfig>> {
req.validate()?;
let _apply_guard = try_apply_lock(&state.config_apply_locks.otg, "otg")?;
let old_hid_config = state.config.get().hid.clone();
state
@@ -29,9 +30,13 @@ pub async fn update_hid_config(
let new_hid_config = state.config.get().hid.clone();
if let Err(e) = apply_hid_config(&state, &old_hid_config, &new_hid_config).await {
tracing::error!("Failed to apply HID config: {}", e);
}
apply_hid_config(
&state,
&old_hid_config,
&new_hid_config,
ConfigApplyOptions::forced(),
)
.await?;
Ok(Json(new_hid_config))
}

View File

@@ -5,7 +5,7 @@ use crate::config::MsdConfig;
use crate::error::Result;
use crate::state::AppState;
use super::apply::apply_msd_config;
use super::apply::{apply_msd_config, try_apply_lock, ConfigApplyOptions};
use super::types::MsdConfigUpdate;
pub async fn get_msd_config(State(state): State<Arc<AppState>>) -> Json<MsdConfig> {
@@ -18,6 +18,7 @@ pub async fn update_msd_config(
) -> Result<Json<MsdConfig>> {
req.validate()?;
let _apply_guard = try_apply_lock(&state.config_apply_locks.otg, "otg")?;
let old_msd_config = state.config.get().msd.clone();
state
@@ -29,9 +30,13 @@ pub async fn update_msd_config(
let new_msd_config = state.config.get().msd.clone();
if let Err(e) = apply_msd_config(&state, &old_msd_config, &new_msd_config).await {
tracing::error!("Failed to apply MSD config: {}", e);
}
apply_msd_config(
&state,
&old_msd_config,
&new_msd_config,
ConfigApplyOptions::forced(),
)
.await?;
Ok(Json(new_msd_config))
}

View File

@@ -1,10 +1,10 @@
use axum::{extract::State, Json};
use std::sync::Arc;
use crate::error::{AppError, Result};
use crate::error::Result;
use crate::state::AppState;
use super::apply::apply_rtsp_config;
use super::apply::{apply_rtsp_config, try_apply_lock, ConfigApplyOptions};
use super::types::{RtspConfigResponse, RtspConfigUpdate, RtspStatusResponse};
pub async fn get_rtsp_config(State(state): State<Arc<AppState>>) -> Json<RtspConfigResponse> {
@@ -32,6 +32,7 @@ pub async fn update_rtsp_config(
) -> Result<Json<RtspConfigResponse>> {
req.validate()?;
let _apply_guard = try_apply_lock(&state.config_apply_locks.rtsp, "rtsp")?;
let old_config = state.config.get().rtsp.clone();
state
@@ -42,26 +43,13 @@ pub async fn update_rtsp_config(
.await?;
let new_config = state.config.get().rtsp.clone();
if let Err(err) = apply_rtsp_config(&state, &old_config, &new_config).await {
tracing::error!("Failed to apply RTSP config: {}", err);
if let Err(rollback_err) = state
.config
.update(|config| {
config.rtsp = old_config.clone();
})
.await
{
tracing::error!(
"Failed to rollback RTSP config after apply failure: {}",
rollback_err
);
return Err(AppError::ServiceUnavailable(format!(
"RTSP apply failed: {}; rollback failed: {}",
err, rollback_err
)));
}
return Err(err);
}
apply_rtsp_config(
&state,
&old_config,
&new_config,
ConfigApplyOptions::forced(),
)
.await?;
Ok(Json(RtspConfigResponse::from(&new_config)))
}

View File

@@ -5,7 +5,7 @@ use crate::error::Result;
use crate::rustdesk::config::RustDeskConfig;
use crate::state::AppState;
use super::apply::apply_rustdesk_config;
use super::apply::{apply_rustdesk_config, try_apply_lock, ConfigApplyOptions};
use super::types::RustDeskConfigUpdate;
#[derive(Debug, serde::Serialize)]
@@ -75,6 +75,7 @@ pub async fn update_rustdesk_config(
) -> Result<Json<RustDeskConfigResponse>> {
req.validate()?;
let _apply_guard = try_apply_lock(&state.config_apply_locks.rustdesk, "rustdesk")?;
let old_config = state.config.get().rustdesk.clone();
state
@@ -86,9 +87,13 @@ pub async fn update_rustdesk_config(
let new_config = state.config.get().rustdesk.clone();
if let Err(e) = apply_rustdesk_config(&state, &old_config, &new_config).await {
tracing::error!("Failed to apply RustDesk config: {}", e);
}
apply_rustdesk_config(
&state,
&old_config,
&new_config,
ConfigApplyOptions::forced(),
)
.await?;
let constraints = state.stream_manager.codec_constraints().await;
if constraints.rustdesk_enabled || constraints.rtsp_enabled {

View File

@@ -4,7 +4,7 @@ use std::sync::Arc;
use crate::error::Result;
use crate::state::AppState;
use super::apply::apply_stream_config;
use super::apply::{apply_stream_config, try_apply_lock, ConfigApplyOptions};
use super::types::{StreamConfigResponse, StreamConfigUpdate};
pub async fn get_stream_config(State(state): State<Arc<AppState>>) -> Json<StreamConfigResponse> {
@@ -18,6 +18,7 @@ pub async fn update_stream_config(
) -> Result<Json<StreamConfigResponse>> {
req.validate()?;
let _apply_guard = try_apply_lock(&state.config_apply_locks.stream, "stream")?;
let old_stream_config = state.config.get().stream.clone();
state
@@ -29,13 +30,15 @@ pub async fn update_stream_config(
let new_stream_config = state.config.get().stream.clone();
if let Err(e) = apply_stream_config(&state, &old_stream_config, &new_stream_config).await {
tracing::error!("Failed to apply stream config: {}", e);
}
apply_stream_config(
&state,
&old_stream_config,
&new_stream_config,
ConfigApplyOptions::forced(),
)
.await?;
if let Err(e) = super::apply::enforce_stream_codec_constraints(&state).await {
tracing::error!("Failed to enforce stream codec constraints: {}", e);
}
super::apply::enforce_stream_codec_constraints(&state).await?;
Ok(Json(StreamConfigResponse::from(&new_stream_config)))
}

View File

@@ -5,7 +5,7 @@ use crate::config::VideoConfig;
use crate::error::Result;
use crate::state::AppState;
use super::apply::apply_video_config;
use super::apply::{apply_video_config, try_apply_lock, ConfigApplyOptions};
use super::types::VideoConfigUpdate;
pub async fn get_video_config(State(state): State<Arc<AppState>>) -> Json<VideoConfig> {
@@ -18,6 +18,7 @@ pub async fn update_video_config(
) -> Result<Json<VideoConfig>> {
req.validate()?;
let _apply_guard = try_apply_lock(&state.config_apply_locks.video, "video")?;
let old_video_config = state.config.get().video.clone();
state
@@ -29,10 +30,13 @@ pub async fn update_video_config(
let new_video_config = state.config.get().video.clone();
if let Err(e) = apply_video_config(&state, &old_video_config, &new_video_config).await {
tracing::error!("Failed to apply video config: {}", e);
// 根据用户选择,仅记录错误,不回滚
}
apply_video_config(
&state,
&old_video_config,
&new_video_config,
ConfigApplyOptions::forced(),
)
.await?;
Ok(Json(new_video_config))
}