fix: 优化视频切换流畅性;修复 OTG HID 功能无法一次保存成功和页面未即刻生效问题

This commit is contained in:
mofeng-git
2026-04-11 21:20:54 +08:00
parent eecbc0fc13
commit 099f0b1ca2
8 changed files with 219 additions and 73 deletions

View File

@@ -4,6 +4,7 @@ use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::broadcast;
use tokio::sync::Mutex;
use super::AppConfig;
use crate::error::{AppError, Result};
@@ -18,6 +19,8 @@ pub struct ConfigStore {
/// Lock-free cache using ArcSwap for zero-cost reads
cache: Arc<ArcSwap<AppConfig>>,
change_tx: broadcast::Sender<ConfigChange>,
/// Serializes `set` / `update` so concurrent PATCH handlers cannot clobber each other
write_lock: Arc<Mutex<()>>,
}
/// Configuration change event
@@ -59,6 +62,7 @@ impl ConfigStore {
pool,
cache,
change_tx,
write_lock: Arc::new(Mutex::new(())),
})
}
@@ -191,6 +195,7 @@ impl ConfigStore {
/// Set entire configuration
pub async fn set(&self, config: AppConfig) -> Result<()> {
let _guard = self.write_lock.lock().await;
Self::save_config_to_db(&self.pool, &config).await?;
self.cache.store(Arc::new(config));
@@ -204,13 +209,13 @@ impl ConfigStore {
/// Update configuration with a closure
///
/// Note: This uses a read-modify-write pattern. For concurrent updates,
/// the last write wins. This is acceptable for configuration changes
/// which are infrequent and typically user-initiated.
/// Uses read-modify-write under a mutex so concurrent `update` / `set` calls are serialized
/// and merged correctly (each closure sees the latest stored config).
pub async fn update<F>(&self, f: F) -> Result<()>
where
F: FnOnce(&mut AppConfig),
{
let _guard = self.write_lock.lock().await;
// Load current config, clone it for modification
let current = self.cache.load();
let mut config = (**current).clone();

View File

@@ -844,7 +844,7 @@ impl SharedVideoPipeline {
}
}
/// Stop the pipeline
/// Stop the pipeline (non-blocking, does not wait for capture thread to exit)
pub fn stop(&self) {
if *self.running_rx.borrow() {
let _ = self.running.send(false);
@@ -854,6 +854,39 @@ impl SharedVideoPipeline {
}
}
/// Stop the pipeline and wait for the capture thread to fully exit.
///
/// This ensures the V4L2 device is released before returning, which is
/// necessary when another consumer (e.g. MJPEG streamer) needs to open
/// the same device immediately after.
pub async fn stop_and_wait(&self, timeout: std::time::Duration) {
self.stop();
let mut rx = self.running_watch();
if !*rx.borrow() {
// Capture thread may still be running from a previous `stop()` call.
// Wait for the "Video pipeline stopped" log (thread sets running=false
// at exit), unless it already happened.
}
let deadline = tokio::time::Instant::now() + timeout;
loop {
if !self.running_flag.load(Ordering::Acquire) {
// Flag is cleared, but the capture thread may still be unwinding
// (dropping the V4L2 stream). Give it a brief moment.
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
break;
}
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
if remaining.is_zero() {
warn!(
"Timed out waiting for video pipeline to stop after {:?}",
timeout
);
break;
}
let _ = tokio::time::timeout(remaining, rx.changed()).await;
}
}
/// Set bitrate using preset
pub async fn set_bitrate_preset(
&self,

View File

@@ -404,8 +404,11 @@ impl VideoStreamManager {
}
}
StreamMode::WebRTC => {
info!("Closing all WebRTC sessions");
let closed = self.webrtc_streamer.close_all_sessions().await;
info!("Closing all WebRTC sessions and releasing capture device");
let closed = self
.webrtc_streamer
.close_all_sessions_and_release_device()
.await;
if closed > 0 {
info!("Closed {} WebRTC sessions", closed);
}
@@ -781,6 +784,61 @@ impl VideoStreamManager {
self.webrtc_streamer.request_keyframe().await
}
/// Notify frontend about a codec-only switch (WebRTC mode unchanged, codec changed).
///
/// `set_video_codec` already rebuilt the pipeline synchronously, so we just
/// emit the events the frontend waits on: `StreamModeChanged`, `WebRTCReady`,
/// and `StreamModeReady`.
///
/// Events are spawned asynchronously so the HTTP response (carrying the
/// `transition_id`) reaches the client before the WebSocket events, giving
/// the frontend time to call `registerTransition()` first.
pub async fn notify_codec_switch(
self: &Arc<Self>,
transition_id: &str,
new_codec_str: &str,
previous_codec_str: &str,
) {
let manager = Arc::clone(self);
let transition_id = transition_id.to_string();
let new_codec = new_codec_str.to_string();
let prev_codec = previous_codec_str.to_string();
tokio::spawn(async move {
// Small yield to ensure the HTTP response is flushed first.
tokio::task::yield_now().await;
manager
.publish_event(SystemEvent::StreamModeChanged {
transition_id: Some(transition_id.clone()),
mode: new_codec.clone(),
previous_mode: prev_codec.clone(),
})
.await;
let is_hardware = manager.webrtc_streamer.is_hardware_encoding().await;
manager
.publish_event(SystemEvent::WebRTCReady {
transition_id: Some(transition_id.clone()),
codec: new_codec.clone(),
hardware: is_hardware,
})
.await;
manager
.publish_event(SystemEvent::StreamModeReady {
transition_id: transition_id.clone(),
mode: new_codec.clone(),
})
.await;
info!(
"Codec switch notified: {} -> {} (transition: {})",
prev_codec, new_codec, transition_id
);
});
}
/// Publish event to event bus
async fn publish_event(&self, event: SystemEvent) {
if let Some(ref events) = *self.events.read().await {

View File

@@ -1282,7 +1282,26 @@ pub async fn stream_mode_set(
}
}
// Set video codec if switching to WebRTC mode with specific codec
let requested_mode_str = match (&new_mode, &video_codec) {
(StreamMode::Mjpeg, _) => "mjpeg",
(StreamMode::WebRTC, Some(VideoCodecType::H264)) => "h264",
(StreamMode::WebRTC, Some(VideoCodecType::H265)) => "h265",
(StreamMode::WebRTC, Some(VideoCodecType::VP8)) => "vp8",
(StreamMode::WebRTC, Some(VideoCodecType::VP9)) => "vp9",
(StreamMode::WebRTC, None) => "webrtc",
};
// Detect codec-only switch: already in WebRTC mode, just changing codec.
// switch_mode_transaction treats this as "no switch needed" since StreamMode
// is still WebRTC, so we handle codec change + event emission here.
let current_mode = state.stream_manager.current_mode().await;
let prev_codec = state.stream_manager.webrtc_streamer().current_video_codec().await;
let codec_changed = video_codec.is_some_and(|c| c != prev_codec);
let is_codec_only_switch = current_mode == StreamMode::WebRTC
&& new_mode == StreamMode::WebRTC
&& codec_changed;
if let Some(codec) = video_codec {
info!("Setting WebRTC video codec to {:?}", codec);
if let Err(e) = state
@@ -1295,22 +1314,34 @@ pub async fn stream_mode_set(
}
}
// For codec-only switch, emit events directly instead of going through
// switch_mode_transaction (which short-circuits when mode is unchanged).
if is_codec_only_switch {
let transition_id = uuid::Uuid::new_v4().to_string();
state
.stream_manager
.notify_codec_switch(
&transition_id,
requested_mode_str,
&codec_to_id(prev_codec),
)
.await;
return Ok(Json(StreamModeResponse {
success: true,
mode: requested_mode_str.to_string(),
transition_id: Some(transition_id),
switching: false,
message: Some(format!("Codec switched to {}", requested_mode_str)),
}));
}
let tx = state
.stream_manager
.switch_mode_transaction(new_mode.clone())
.await?;
// Return the requested codec identifier (for UI display). The actual active mode
// may differ if the request was rejected due to an in-progress switch.
let requested_mode_str = match (&new_mode, &video_codec) {
(StreamMode::Mjpeg, _) => "mjpeg",
(StreamMode::WebRTC, Some(VideoCodecType::H264)) => "h264",
(StreamMode::WebRTC, Some(VideoCodecType::H265)) => "h265",
(StreamMode::WebRTC, Some(VideoCodecType::VP8)) => "vp8",
(StreamMode::WebRTC, Some(VideoCodecType::VP9)) => "vp9",
(StreamMode::WebRTC, None) => "webrtc",
};
let active_mode_str = match state.stream_manager.current_mode().await {
StreamMode::Mjpeg => "mjpeg".to_string(),
StreamMode::WebRTC => {

View File

@@ -787,6 +787,22 @@ impl WebRtcStreamer {
count
}
/// Close all sessions and wait for the video pipeline to fully release the
/// capture device. Use this when the caller needs the V4L2 device immediately
/// afterwards (e.g. switching to MJPEG mode).
pub async fn close_all_sessions_and_release_device(&self) -> usize {
let count = self.close_all_sessions().await;
if let Some(ref pipeline) = *self.video_pipeline.read().await {
pipeline
.stop_and_wait(std::time::Duration::from_secs(3))
.await;
}
*self.video_pipeline.write().await = None;
count
}
/// Get session count
pub async fn session_count(&self) -> usize {
self.sessions.read().await.len()