diff --git a/src/audio/capture.rs b/src/audio/capture.rs
index aef64344..b8479991 100644
--- a/src/audio/capture.rs
+++ b/src/audio/capture.rs
@@ -97,12 +97,14 @@ pub struct AudioFrame {
 }
 
 impl AudioFrame {
-    pub fn new(data: Bytes, config: &AudioConfig, sequence: u64) -> Self {
+    /// One capture block: `sample_rate` must be the **hardware** rate (e.g. ALSA `actual_rate`).
+    pub fn new_interleaved(data: Bytes, channels: u32, sample_rate: u32, sequence: u64) -> Self {
+        let bps = 2 * channels;
         Self {
-            samples: data.len() as u32 / config.bytes_per_sample(),
+            samples: data.len() as u32 / bps,
             data,
-            sample_rate: config.sample_rate,
-            channels: config.channels,
+            sample_rate,
+            channels,
             sequence,
             timestamp: Instant::now(),
         }
@@ -285,10 +287,17 @@ fn run_capture(
         .map(|h| h.get_rate().unwrap_or(config.sample_rate))
         .unwrap_or(config.sample_rate);
 
-    info!(
-        "Audio capture configured: {}Hz {}ch (requested {}Hz)",
-        actual_rate, config.channels, config.sample_rate
-    );
+    if actual_rate != config.sample_rate {
+        info!(
+            "ALSA sample rate differs from requested ({}Hz vs {}Hz); streamer will resample to 48000Hz for Opus",
+            actual_rate, config.sample_rate
+        );
+    } else {
+        info!(
+            "Audio capture configured: {}Hz {}ch (requested {}Hz)",
+            actual_rate, config.channels, config.sample_rate
+        );
+    }
 
     // Prepare for capture
     pcm.prepare()
@@ -296,9 +305,17 @@ fn run_capture(
 
     let _ = state.send(CaptureState::Running);
 
-    // Allocate buffer - use u8 directly for zero-copy
-    let frame_bytes = config.bytes_per_frame();
-    let mut buffer = vec![0u8; frame_bytes];
+    // Sized from actual period — `readi` may return up to ~one period of frames per call.
+    let period_frames = pcm
+        .hw_params_current()
+        .ok()
+        .and_then(|h| h.get_period_size().ok())
+        .map(|f| f as usize)
+        .unwrap_or(1024)
+        .max(256);
+    let buf_frames = period_frames.saturating_mul(4).max(2048);
+    let bytes_per_frame = (config.channels as usize) * 2;
+    let mut buffer = vec![0u8; buf_frames * bytes_per_frame];
 
     // Capture loop
     while !stop_flag.load(Ordering::Relaxed) {
@@ -337,8 +354,12 @@ fn run_capture(
 
             // Directly use the buffer slice (already in correct byte format)
             let seq = sequence.fetch_add(1, Ordering::Relaxed);
-            let frame =
-                AudioFrame::new(Bytes::copy_from_slice(&buffer[..byte_count]), config, seq);
+            let frame = AudioFrame::new_interleaved(
+                Bytes::copy_from_slice(&buffer[..byte_count]),
+                config.channels,
+                actual_rate,
+                seq,
+            );
 
             // Send to subscribers
             if frame_tx.receiver_count() > 0 {
diff --git a/src/audio/controller.rs b/src/audio/controller.rs
index 5861aec1..f6f76570 100644
--- a/src/audio/controller.rs
+++ b/src/audio/controller.rs
@@ -381,7 +381,7 @@ impl AudioController {
     pub async fn update_config(&self, new_config: AudioControllerConfig) -> Result<()> {
         let was_streaming = self.is_streaming().await;
 
-        // Stop streaming if running
+        // Stop streaming if running (device/quality/enabled may all change)
        if was_streaming {
             self.stop_streaming().await?;
         }
@@ -389,8 +389,10 @@ impl AudioController {
         // Update config
         *self.config.write().await = new_config.clone();
 
-        // Restart streaming if it was running and still enabled
-        if was_streaming && new_config.enabled {
+        // Start whenever audio is enabled — not only when we were already streaming.
+        // Otherwise PATCH /config/audio alone leaves enabled=true with no capture until
+        // POST /audio/start, which races WebRTC reconnect and matches "apply twice" reports.
+        if new_config.enabled {
             self.start_streaming().await?;
         }
 
diff --git a/src/audio/mod.rs b/src/audio/mod.rs
index 829bef91..dfd203e1 100644
--- a/src/audio/mod.rs
+++ b/src/audio/mod.rs
@@ -13,6 +13,7 @@ pub mod controller;
 pub mod device;
 pub mod encoder;
 pub mod monitor;
+pub mod resample;
 pub mod streamer;
 
 pub use capture::{AudioCapturer, AudioConfig, AudioFrame};
diff --git a/src/audio/resample.rs b/src/audio/resample.rs
new file mode 100644
index 00000000..02135bdc
--- /dev/null
+++ b/src/audio/resample.rs
@@ -0,0 +1,202 @@
+//! Resample capture PCM to 48 kHz stereo for Opus (fixed 20 ms / 960×2 samples).
+
+const OUT_RATE: f64 = 48000.0;
+const OPUS_STEREO_SAMPLES: usize = 960 * 2;
+
+enum PipelineState {
+    /// Native 48 kHz interleaved stereo: only buffer and slice into 20 ms blocks (no float work).
+    Stereo48kPassthrough,
+    /// Other rates / mono: linear interpolation to 48 kHz stereo.
+    Resample {
+        in_rate: u32,
+        in_channels: u32,
+        next_out_frame: u64,
+        buffer_start_frame: u64,
+    },
+}
+
+/// Converts incoming interleaved PCM to 48 kHz stereo, then exposes fixed 960×2-sample chunks.
+pub struct Opus48kPcmBuffer {
+    state: PipelineState,
+    pending: Vec<i16>,
+}
+
+impl Opus48kPcmBuffer {
+    pub fn new(in_rate: u32, in_channels: u32) -> Self {
+        let ch = in_channels.max(1);
+        let rate = in_rate.max(1);
+        let state = if rate == 48000 && ch == 2 {
+            PipelineState::Stereo48kPassthrough
+        } else {
+            PipelineState::Resample {
+                in_rate: rate,
+                in_channels: ch,
+                next_out_frame: 0,
+                buffer_start_frame: 0,
+            }
+        };
+        Self {
+            state,
+            pending: Vec::new(),
+        }
+    }
+
+    /// True when input is already 48 kHz stereo (no interpolation loop).
+    #[cfg(test)]
+    pub fn is_passthrough(&self) -> bool {
+        matches!(self.state, PipelineState::Stereo48kPassthrough)
+    }
+
+    /// Append one interleaved capture block (at the rate and channel count this buffer was built for).
+    pub fn push_interleaved(&mut self, data: &[i16]) {
+        self.pending.extend_from_slice(data);
+    }
+
+    /// Drain as many 960×2 stereo S16LE samples (20 ms @ 48 kHz) as possible.
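+    ///
+    /// Illustrative usage sketch (the input name is a placeholder, not part of the capture
+    /// API); marked `ignore` so rustdoc does not try to compile it:
+    ///
+    /// ```ignore
+    /// let mut buf = Opus48kPcmBuffer::new(44_100, 2); // capture rate / channel count
+    /// let mut pcm48: Vec<i16> = Vec::new();
+    /// buf.push_interleaved(&captured_s16_block);      // one ALSA period of interleaved S16LE
+    /// buf.pop_opus_frames(&mut pcm48);                // drains whole 960×2-sample blocks only
+    /// for block in pcm48.chunks_exact(960 * 2) {
+    ///     // hand each 20 ms block to the Opus encoder
+    /// }
+    /// ```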
+    pub fn pop_opus_frames(&mut self, out: &mut Vec<i16>) {
+        match &mut self.state {
+            PipelineState::Stereo48kPassthrough => {
+                while self.pending.len() >= OPUS_STEREO_SAMPLES {
+                    out.extend_from_slice(&self.pending[..OPUS_STEREO_SAMPLES]);
+                    self.pending.drain(..OPUS_STEREO_SAMPLES);
+                }
+            }
+            PipelineState::Resample {
+                in_rate,
+                in_channels,
+                next_out_frame,
+                buffer_start_frame,
+            } => {
+                let ch = *in_channels as usize;
+                if ch == 0 {
+                    return;
+                }
+
+                loop {
+                    let batch_start = *next_out_frame;
+                    let mut block = Vec::with_capacity(OPUS_STEREO_SAMPLES);
+                    let mut complete = true;
+
+                    for i in 0u64..960 {
+                        let k = batch_start + i;
+                        let p_abs = (k as f64) * (*in_rate as f64) / OUT_RATE;
+                        let f_abs = p_abs.floor() as u64;
+                        let frac = p_abs - f_abs as f64;
+
+                        let f_rel = f_abs.saturating_sub(*buffer_start_frame) as usize;
+                        if f_rel + 1 >= self.pending.len() / ch {
+                            complete = false;
+                            break;
+                        }
+
+                        let base0 = f_rel * ch;
+                        let base1 = (f_rel + 1) * ch;
+
+                        let (l, r) = if *in_channels >= 2 {
+                            let l0 = self.pending[base0] as f64;
+                            let l1 = self.pending[base1] as f64;
+                            let r0 = self.pending[base0 + 1] as f64;
+                            let r1 = self.pending[base1 + 1] as f64;
+                            (l0 + frac * (l1 - l0), r0 + frac * (r1 - r0))
+                        } else {
+                            let m0 = self.pending[base0] as f64;
+                            let m1 = self.pending[base1] as f64;
+                            let v = m0 + frac * (m1 - m0);
+                            (v, v)
+                        };
+
+                        block.push(clamp_f64_to_i16(l));
+                        block.push(clamp_f64_to_i16(r));
+                    }
+
+                    if !complete || block.len() != OPUS_STEREO_SAMPLES {
+                        break;
+                    }
+
+                    out.extend_from_slice(&block);
+                    *next_out_frame = batch_start + 960;
+                    trim_resample_prefix(
+                        &mut self.pending,
+                        *in_rate,
+                        *next_out_frame,
+                        buffer_start_frame,
+                        ch,
+                    );
+                }
+            }
+        }
+    }
+}
+
+fn trim_resample_prefix(
+    pending: &mut Vec<i16>,
+    in_rate: u32,
+    next_out_frame: u64,
+    buffer_start_frame: &mut u64,
+    ch: usize,
+) {
+    if pending.is_empty() {
+        return;
+    }
+
+    let p_next = (next_out_frame as f64) * (in_rate as f64) / OUT_RATE;
+    let need_abs = p_next.floor() as u64;
+    let keep_from_abs = need_abs.saturating_sub(1);
+    if keep_from_abs <= *buffer_start_frame {
+        return;
+    }
+
+    let drop_frames = (keep_from_abs - *buffer_start_frame) as usize;
+    let drop_samples = drop_frames.saturating_mul(ch).min(pending.len());
+    if drop_samples > 0 {
+        pending.drain(0..drop_samples);
+        *buffer_start_frame += drop_frames as u64;
+    }
+}
+
+#[inline]
+fn clamp_f64_to_i16(v: f64) -> i16 {
+    v.round().clamp(i16::MIN as f64, i16::MAX as f64) as i16
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn passthrough_48k_identity_tone_length() {
+        let mut buf = Opus48kPcmBuffer::new(48000, 2);
+        assert!(buf.is_passthrough());
+        let mut chunk = vec![0i16; 960 * 2];
+        for i in 0..960 {
+            let s = (i as f32 * 0.1).sin() * 3000.0;
+            chunk[2 * i] = s as i16;
+            chunk[2 * i + 1] = s as i16;
+        }
+        buf.push_interleaved(&chunk);
+        let mut out = Vec::new();
+        buf.pop_opus_frames(&mut out);
+        assert_eq!(out.len(), 960 * 2);
+    }
+
+    #[test]
+    fn upsample_44k_to_48k_chunk() {
+        let mut buf = Opus48kPcmBuffer::new(44100, 2);
+        assert!(!buf.is_passthrough());
+        let mut chunk = vec![0i16; 882 * 2];
+        for i in 0..882 {
+            chunk[2 * i] = (i as i16).wrapping_mul(10);
+            chunk[2 * i + 1] = (i as i16).wrapping_mul(-7);
+        }
+        // Interpolating the last output sample needs one look-ahead input frame, so a
+        // single 20 ms block (882 frames) cannot complete a 960-frame output block on its
+        // own; push two blocks and expect exactly one drained chunk.
+        buf.push_interleaved(&chunk);
+        buf.push_interleaved(&chunk);
+        let mut out = Vec::new();
+        buf.pop_opus_frames(&mut out);
+        assert_eq!(out.len(), 960 * 2, "expected one 20ms Opus block");
+    }
+
+    #[test]
+    fn mono_48k_not_passthrough() {
+        let buf = Opus48kPcmBuffer::new(48000, 1);
+        assert!(!buf.is_passthrough());
+    }
+}
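+
+// Hedged usage sketch: exercises the mono → stereo upmix path end to end. Values and names
+// here are illustrative only. Two 20 ms mono blocks are pushed because interpolating the
+// last output sample needs one look-ahead input frame.
+#[cfg(test)]
+mod upmix_example {
+    use super::*;
+
+    #[test]
+    fn mono_48k_upmixes_to_stereo_blocks() {
+        let mut buf = Opus48kPcmBuffer::new(48000, 1);
+        let chunk: Vec<i16> = (0..960).map(|i| (i as i16).wrapping_mul(3)).collect();
+        buf.push_interleaved(&chunk);
+        buf.push_interleaved(&chunk);
+
+        let mut out = Vec::new();
+        buf.pop_opus_frames(&mut out);
+        assert_eq!(out.len(), 960 * 2, "one 20 ms stereo block expected");
+        // Mono input is duplicated into both channels.
+        assert!(out.chunks_exact(2).all(|lr| lr[0] == lr[1]));
+    }
+}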
diff --git a/src/audio/streamer.rs b/src/audio/streamer.rs
index f05773fd..6f6db04e 100644
--- a/src/audio/streamer.rs
+++ b/src/audio/streamer.rs
@@ -9,9 +9,12 @@ use std::time::Instant;
 use tokio::sync::{broadcast, watch, Mutex, RwLock};
 use tracing::{error, info, warn};
 
-use super::capture::{AudioCapturer, AudioConfig, CaptureState};
+use super::capture::{AudioCapturer, AudioConfig, AudioFrame, CaptureState};
 use super::encoder::{OpusConfig, OpusEncoder, OpusFrame};
+use super::resample::Opus48kPcmBuffer;
 use crate::error::{AppError, Result};
+use bytemuck;
+use bytes::Bytes;
 
 /// Audio stream state
 #[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
@@ -254,6 +257,9 @@ impl AudioStreamer {
 
         info!("Audio stream task started");
 
+        let mut to_48k: Option<Opus48kPcmBuffer> = None;
+        let mut queued_48k: Vec<i16> = Vec::new();
+
         loop {
             // Check stop flag (atomic, no async lock needed)
             if stop_flag.load(Ordering::Relaxed) {
@@ -273,27 +279,56 @@ impl AudioStreamer {
 
             match recv_result {
                 Ok(Ok(audio_frame)) => {
-                    // Encode to Opus
-                    let opus_result = {
-                        let mut enc_guard = encoder.lock().await;
-                        (*enc_guard)
-                            .as_mut()
-                            .map(|enc| enc.encode_frame(&audio_frame))
+                    if to_48k.is_none() {
+                        to_48k = Some(Opus48kPcmBuffer::new(
+                            audio_frame.sample_rate,
+                            audio_frame.channels,
+                        ));
+                    }
+                    let pipeline = match to_48k.as_mut() {
+                        Some(p) => p,
+                        None => continue,
                     };
-                    match opus_result {
-                        Some(Ok(opus_frame)) => {
-                            // Publish latest frame to subscribers
-                            if opus_tx.receiver_count() > 0 {
-                                let _ = opus_tx.send(Some(Arc::new(opus_frame)));
+                    let samples: &[i16] = match bytemuck::try_cast_slice(&audio_frame.data) {
+                        Ok(s) => s,
+                        Err(_) => {
+                            warn!("Audio frame bytes not aligned/sized for i16 samples; skipping");
+                            continue;
+                        }
+                    };
+                    if !samples.is_empty() {
+                        pipeline.push_interleaved(samples);
+                    }
+                    pipeline.pop_opus_frames(&mut queued_48k);
+
+                    while queued_48k.len() >= 960 * 2 {
+                        let pcm_20ms =
+                            Bytes::copy_from_slice(bytemuck::cast_slice(&queued_48k[..960 * 2]));
+                        queued_48k.drain(..960 * 2);
+
+                        let frame_48k = AudioFrame::new_interleaved(pcm_20ms, 2, 48000, 0);
+
+                        let opus_result = {
+                            let mut enc_guard = encoder.lock().await;
+                            (*enc_guard)
+                                .as_mut()
+                                .map(|enc| enc.encode_frame(&frame_48k))
+                        };
+
+                        match opus_result {
+                            Some(Ok(opus_frame)) => {
+                                if opus_tx.receiver_count() > 0 {
+                                    let _ = opus_tx.send(Some(Arc::new(opus_frame)));
+                                }
+                            }
+                            Some(Err(e)) => {
+                                error!("Opus encode error: {}", e);
+                            }
+                            None => {
+                                warn!("Encoder not available");
+                                break;
                             }
-                        }
-                        Some(Err(e)) => {
-                            error!("Opus encode error: {}", e);
-                        }
-                        None => {
-                            warn!("Encoder not available");
-                            break;
-                        }
                         }
                     }
                 }
diff --git a/src/rtsp/service.rs b/src/rtsp/service.rs
index 8f3afb08..245a0b7d 100644
--- a/src/rtsp/service.rs
+++ b/src/rtsp/service.rs
@@ -1142,11 +1142,7 @@ fn rtp_timestamp_increment(frame_duration: Duration) -> u32 {
 }
 
 /// Prefer PTS-based RTP time when it advances; otherwise step by `frame_duration` in 90 kHz units.
-fn monotonic_rtp_timestamp(
-    pts_ms: i64,
-    last: &mut u32,
-    frame_duration: Duration,
-) -> u32 {
+fn monotonic_rtp_timestamp(pts_ms: i64, last: &mut u32, frame_duration: Duration) -> u32 {
     let from_pts = pts_to_rtp_timestamp(pts_ms);
     let inc = rtp_timestamp_increment(frame_duration);
     let ts = if from_pts > *last {
diff --git a/src/video/encoder/h265.rs b/src/video/encoder/h265.rs
index 16e37a71..45faafa2 100644
--- a/src/video/encoder/h265.rs
+++ b/src/video/encoder/h265.rs
@@ -318,18 +318,26 @@ impl H265Encoder {
             )
         } else {
             match config.input_format {
-                H265InputFormat::Nv12 => {
-                    ("nv12", AVPixelFormat::AV_PIX_FMT_NV12, H265InputFormat::Nv12)
-                }
-                H265InputFormat::Nv21 => {
-                    ("nv21", AVPixelFormat::AV_PIX_FMT_NV21, H265InputFormat::Nv21)
-                }
-                H265InputFormat::Nv16 => {
-                    ("nv16", AVPixelFormat::AV_PIX_FMT_NV16, H265InputFormat::Nv16)
-                }
-                H265InputFormat::Nv24 => {
-                    ("nv24", AVPixelFormat::AV_PIX_FMT_NV24, H265InputFormat::Nv24)
-                }
+                H265InputFormat::Nv12 => (
+                    "nv12",
+                    AVPixelFormat::AV_PIX_FMT_NV12,
+                    H265InputFormat::Nv12,
+                ),
+                H265InputFormat::Nv21 => (
+                    "nv21",
+                    AVPixelFormat::AV_PIX_FMT_NV21,
+                    H265InputFormat::Nv21,
+                ),
+                H265InputFormat::Nv16 => (
+                    "nv16",
+                    AVPixelFormat::AV_PIX_FMT_NV16,
+                    H265InputFormat::Nv16,
+                ),
+                H265InputFormat::Nv24 => (
+                    "nv24",
+                    AVPixelFormat::AV_PIX_FMT_NV24,
+                    H265InputFormat::Nv24,
+                ),
                 H265InputFormat::Yuv420p => (
                     "yuv420p",
                     AVPixelFormat::AV_PIX_FMT_YUV420P,
@@ -340,12 +348,16 @@ impl H265Encoder {
                     AVPixelFormat::AV_PIX_FMT_YUYV422,
                     H265InputFormat::Yuyv422,
                 ),
-                H265InputFormat::Rgb24 => {
-                    ("rgb24", AVPixelFormat::AV_PIX_FMT_RGB24, H265InputFormat::Rgb24)
-                }
-                H265InputFormat::Bgr24 => {
-                    ("bgr24", AVPixelFormat::AV_PIX_FMT_BGR24, H265InputFormat::Bgr24)
-                }
+                H265InputFormat::Rgb24 => (
+                    "rgb24",
+                    AVPixelFormat::AV_PIX_FMT_RGB24,
+                    H265InputFormat::Rgb24,
+                ),
+                H265InputFormat::Bgr24 => (
+                    "bgr24",
+                    AVPixelFormat::AV_PIX_FMT_BGR24,
+                    H265InputFormat::Bgr24,
+                ),
             }
         };
         let pixfmt = resolve_pixel_format(pixfmt_name, pixfmt_fallback);
diff --git a/src/video/streamer.rs b/src/video/streamer.rs
index 666e6144..572d8907 100644
--- a/src/video/streamer.rs
+++ b/src/video/streamer.rs
@@ -270,8 +270,7 @@ impl Streamer {
             .find(|d| d.path.to_string_lossy() == device_path)
             .ok_or_else(|| AppError::VideoError("Video device not found".to_string()))?;
 
-        let (format, resolution) =
-            self.resolve_capture_config(&device, format, resolution)?;
+        let (format, resolution) = self.resolve_capture_config(&device, format, resolution)?;
 
         // IMPORTANT: Disconnect all MJPEG clients FIRST before stopping capture
         // This prevents race conditions where clients try to reconnect and reopen the device
@@ -807,9 +806,7 @@ impl Streamer {
                     // Soft-restart after exponential back-off.
                     if let Some(since) = no_signal_since {
                         let backoff_secs = NOSIGNAL_SOFT_RESTART_SECS
-                            .saturating_mul(
-                                2u64.pow(no_signal_restart_count.min(2)),
-                            )
+                            .saturating_mul(2u64.pow(no_signal_restart_count.min(2)))
                             .min(30);
                         if since.elapsed().as_secs() >= backoff_secs {
                             info!(
@@ -858,10 +855,7 @@ impl Streamer {
                     if capture_error_throttler.should_log(&key) {
                         let suppressed = suppressed_capture_errors.remove(&key).unwrap_or(0);
                         if suppressed > 0 {
-                            error!(
-                                "Capture error: {} (suppressed {} repeats)",
-                                e, suppressed
-                            );
+                            error!("Capture error: {} (suppressed {} repeats)", e, suppressed);
                         } else {
                             error!("Capture error: {}", e);
                         }
@@ -991,9 +985,8 @@ impl Streamer {
     /// resolution / format change on the source side is picked up before
     /// the capture stream is re-opened.
     pub async fn re_init_device(self: &Arc<Self>, device_path: &str) -> Result<()> {
-        let device = VideoDevice::open_readonly(device_path).map_err(|e| {
-            AppError::VideoError(format!("Cannot open device for re-init: {}", e))
-        })?;
+        let device = VideoDevice::open_readonly(device_path)
+            .map_err(|e| AppError::VideoError(format!("Cannot open device for re-init: {}", e)))?;
 
         let device_info = device.info()?;
         let (format, resolution) = {
diff --git a/src/web/handlers/mod.rs b/src/web/handlers/mod.rs
index aa004d8b..de8567cd 100644
--- a/src/web/handlers/mod.rs
+++ b/src/web/handlers/mod.rs
@@ -1295,12 +1295,15 @@ pub async fn stream_mode_set(
     // switch_mode_transaction treats this as "no switch needed" since StreamMode
     // is still WebRTC, so we handle codec change + event emission here.
     let current_mode = state.stream_manager.current_mode().await;
-    let prev_codec = state.stream_manager.webrtc_streamer().current_video_codec().await;
+    let prev_codec = state
+        .stream_manager
+        .webrtc_streamer()
+        .current_video_codec()
+        .await;
     let codec_changed = video_codec.is_some_and(|c| c != prev_codec);
 
-    let is_codec_only_switch = current_mode == StreamMode::WebRTC
-        && new_mode == StreamMode::WebRTC
-        && codec_changed;
+    let is_codec_only_switch =
+        current_mode == StreamMode::WebRTC && new_mode == StreamMode::WebRTC && codec_changed;
 
     if let Some(codec) = video_codec {
         info!("Setting WebRTC video codec to {:?}", codec);
@@ -1321,11 +1324,7 @@ pub async fn stream_mode_set(
 
         state
             .stream_manager
-            .notify_codec_switch(
-                &transition_id,
-                requested_mode_str,
-                &codec_to_id(prev_codec),
-            )
+            .notify_codec_switch(&transition_id, requested_mode_str, &codec_to_id(prev_codec))
             .await;
 
         return Ok(Json(StreamModeResponse {