From c8fd3648adfddfdd8efa5f1aaf6ee3516e3b0d3a Mon Sep 17 00:00:00 2001 From: mofeng-git Date: Fri, 27 Mar 2026 08:21:14 +0800 Subject: [PATCH] =?UTF-8?q?refactor(video):=20=E5=88=A0=E9=99=A4=E5=BA=9F?= =?UTF-8?q?=E5=BC=83=E8=A7=86=E9=A2=91=E6=B5=81=E6=B0=B4=E7=BA=BF=E5=B9=B6?= =?UTF-8?q?=E6=94=B6=E6=95=9B=20MJPEG/WebRTC=20=E7=BC=96=E6=8E=92=E4=B8=8E?= =?UTF-8?q?=E6=AD=BB=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/audio/monitor.rs | 6 +- src/hid/ch9329.rs | 16 - src/msd/monitor.rs | 10 +- src/stream/mjpeg_streamer.rs | 700 -------------- src/stream/mod.rs | 5 - src/video/capture.rs | 560 ----------- src/video/encoder/h264.rs | 4 - src/video/h264_pipeline.rs | 444 --------- src/video/mod.rs | 8 - src/video/shared_video_pipeline.rs | 886 +----------------- .../shared_video_pipeline/encoder_state.rs | 639 +++++++++++++ src/video/stream_manager.rs | 82 +- src/video/video_session.rs | 590 ------------ src/webrtc/h265_payloader.rs | 11 - src/webrtc/mod.rs | 2 +- src/webrtc/track.rs | 3 - 16 files changed, 672 insertions(+), 3294 deletions(-) delete mode 100644 src/stream/mjpeg_streamer.rs delete mode 100644 src/video/capture.rs delete mode 100644 src/video/h264_pipeline.rs create mode 100644 src/video/shared_video_pipeline/encoder_state.rs delete mode 100644 src/video/video_session.rs diff --git a/src/audio/monitor.rs b/src/audio/monitor.rs index 9ee57fbe..c7dfd049 100644 --- a/src/audio/monitor.rs +++ b/src/audio/monitor.rs @@ -6,7 +6,7 @@ //! - Error tracking //! - Log throttling to prevent log flooding -use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; +use std::sync::atomic::{AtomicU32, Ordering}; use std::time::Duration; use tokio::sync::RwLock; use tracing::{info, warn}; @@ -63,9 +63,6 @@ pub struct AudioHealthMonitor { throttler: LogThrottler, /// Configuration config: AudioMonitorConfig, - /// Whether monitoring is active (reserved for future use) - #[allow(dead_code)] - running: AtomicBool, /// Current retry count retry_count: AtomicU32, /// Last error code (for change detection) @@ -80,7 +77,6 @@ impl AudioHealthMonitor { status: RwLock::new(AudioHealthStatus::Healthy), throttler: LogThrottler::with_secs(throttle_secs), config, - running: AtomicBool::new(false), retry_count: AtomicU32::new(0), last_error_code: RwLock::new(None), } diff --git a/src/hid/ch9329.rs b/src/hid/ch9329.rs index 0a9693d6..f430cf4b 100644 --- a/src/hid/ch9329.rs +++ b/src/hid/ch9329.rs @@ -41,10 +41,6 @@ const PACKET_HEADER: [u8; 2] = [0x57, 0xAB]; /// Default address (accepts any address) const DEFAULT_ADDR: u8 = 0x00; -/// Broadcast address (no response required) -#[allow(dead_code)] -const BROADCAST_ADDR: u8 = 0xFF; - /// Default baud rate for CH9329 pub const DEFAULT_BAUD_RATE: u32 = 9600; @@ -67,7 +63,6 @@ const RECONNECT_DELAY_MS: u64 = 2000; const INIT_WAIT_MS: u64 = 3000; /// CH9329 command codes -#[allow(dead_code)] pub mod cmd { /// Get chip version, USB status, and LED status pub const GET_INFO: u8 = 0x01; @@ -81,16 +76,6 @@ pub mod cmd { pub const SEND_MS_REL_DATA: u8 = 0x05; /// Send custom HID data pub const SEND_MY_HID_DATA: u8 = 0x06; - /// Read custom HID data (sent by chip automatically) - pub const READ_MY_HID_DATA: u8 = 0x87; - /// Get parameter configuration - pub const GET_PARA_CFG: u8 = 0x08; - /// Set parameter configuration - pub const SET_PARA_CFG: u8 = 0x09; - /// Get USB string descriptor - pub const GET_USB_STRING: u8 = 0x0A; - /// Set USB string descriptor - pub const SET_USB_STRING: u8 = 0x0B; /// Restore factory default configuration pub const SET_DEFAULT_CFG: u8 = 0x0C; /// Software reset @@ -98,7 +83,6 @@ pub mod cmd { } /// Response command mask (success = cmd | 0x80, error = cmd | 0xC0) -#[allow(dead_code)] const RESPONSE_SUCCESS_MASK: u8 = 0x80; const RESPONSE_ERROR_MASK: u8 = 0xC0; diff --git a/src/msd/monitor.rs b/src/msd/monitor.rs index b667e48e..46030b40 100644 --- a/src/msd/monitor.rs +++ b/src/msd/monitor.rs @@ -6,7 +6,7 @@ //! - Error state tracking //! - Log throttling to prevent log flooding -use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; +use std::sync::atomic::{AtomicU32, Ordering}; use tokio::sync::RwLock; use tracing::{info, warn}; @@ -50,12 +50,6 @@ pub struct MsdHealthMonitor { status: RwLock, /// Log throttler to prevent log flooding throttler: LogThrottler, - /// Configuration - #[allow(dead_code)] - config: MsdMonitorConfig, - /// Whether monitoring is active (reserved for future use) - #[allow(dead_code)] - running: AtomicBool, /// Error count (for tracking) error_count: AtomicU32, /// Last error code (for change detection) @@ -69,8 +63,6 @@ impl MsdHealthMonitor { Self { status: RwLock::new(MsdHealthStatus::Healthy), throttler: LogThrottler::with_secs(throttle_secs), - config, - running: AtomicBool::new(false), error_count: AtomicU32::new(0), last_error_code: RwLock::new(None), } diff --git a/src/stream/mjpeg_streamer.rs b/src/stream/mjpeg_streamer.rs deleted file mode 100644 index 6fbd7379..00000000 --- a/src/stream/mjpeg_streamer.rs +++ /dev/null @@ -1,700 +0,0 @@ -//! MJPEG Streamer - High-level MJPEG/HTTP streaming manager -//! -//! This module provides a unified interface for MJPEG streaming mode, -//! integrating video capture, MJPEG distribution, and WebSocket HID. -//! -//! # Architecture -//! -//! ```text -//! MjpegStreamer -//! | -//! +-- VideoCapturer (V4L2 video capture) -//! +-- MjpegStreamHandler (HTTP multipart video) -//! +-- WsHidHandler (WebSocket HID) -//! ``` -//! -//! Note: Audio WebSocket is handled separately by audio_ws.rs (/api/ws/audio) - -use crate::utils::LogThrottler; -use crate::video::v4l2r_capture::V4l2rCaptureStream; -use std::collections::HashMap; -use std::io; -use std::path::PathBuf; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; -use std::time::Duration; -use tokio::sync::{Mutex, RwLock}; -use tracing::{error, info, warn}; - -use crate::audio::AudioController; -use crate::error::{AppError, Result}; -use crate::events::{EventBus, SystemEvent}; -use crate::hid::HidController; -use crate::video::capture::{CaptureConfig, VideoCapturer}; -use crate::video::device::{enumerate_devices, find_best_device, VideoDeviceInfo}; -use crate::video::format::{PixelFormat, Resolution}; -use crate::video::frame::{FrameBuffer, FrameBufferPool, VideoFrame}; - -use super::mjpeg::MjpegStreamHandler; -use super::ws_hid::WsHidHandler; - -/// Minimum valid frame size for capture -const MIN_CAPTURE_FRAME_SIZE: usize = 128; -/// Validate JPEG header every N frames to reduce overhead -const JPEG_VALIDATE_INTERVAL: u64 = 30; - -/// MJPEG streamer configuration -#[derive(Debug, Clone)] -pub struct MjpegStreamerConfig { - /// Device path (None = auto-detect) - pub device_path: Option, - /// Desired resolution - pub resolution: Resolution, - /// Desired format - pub format: PixelFormat, - /// Desired FPS - pub fps: u32, - /// JPEG quality (1-100) - pub jpeg_quality: u8, -} - -impl Default for MjpegStreamerConfig { - fn default() -> Self { - Self { - device_path: None, - resolution: Resolution::HD1080, - format: PixelFormat::Mjpeg, - fps: 30, - jpeg_quality: 80, - } - } -} - -/// MJPEG streamer state -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum MjpegStreamerState { - /// Not initialized - Uninitialized, - /// Ready but not streaming - Ready, - /// Actively streaming - Streaming, - /// No video signal - NoSignal, - /// Error occurred - Error, -} - -impl std::fmt::Display for MjpegStreamerState { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - MjpegStreamerState::Uninitialized => write!(f, "uninitialized"), - MjpegStreamerState::Ready => write!(f, "ready"), - MjpegStreamerState::Streaming => write!(f, "streaming"), - MjpegStreamerState::NoSignal => write!(f, "no_signal"), - MjpegStreamerState::Error => write!(f, "error"), - } - } -} - -/// MJPEG streamer statistics -#[derive(Debug, Clone, Default)] -pub struct MjpegStreamerStats { - /// Current state - pub state: String, - /// Current device path - pub device: Option, - /// Video resolution - pub resolution: Option<(u32, u32)>, - /// Video format - pub format: Option, - /// Current FPS - pub fps: u32, - /// MJPEG client count - pub mjpeg_clients: u64, - /// WebSocket HID client count - pub ws_hid_clients: usize, -} - -/// MJPEG Streamer -/// -/// High-level manager for MJPEG/HTTP streaming mode. -/// Integrates video capture, MJPEG distribution, and WebSocket HID. -pub struct MjpegStreamer { - // === Video === - config: RwLock, - capturer: RwLock>>, - mjpeg_handler: Arc, - current_device: RwLock>, - state: RwLock, - - // === Audio (controller reference only, WS handled by audio_ws.rs) === - audio_controller: RwLock>>, - audio_enabled: AtomicBool, - - // === HID === - ws_hid_handler: Arc, - hid_controller: RwLock>>, - - // === Control === - start_lock: tokio::sync::Mutex<()>, - direct_stop: AtomicBool, - direct_active: AtomicBool, - direct_handle: Mutex>>, - events: RwLock>>, - config_changing: AtomicBool, -} - -impl MjpegStreamer { - /// Create a new MJPEG streamer - pub fn new() -> Arc { - Arc::new(Self { - config: RwLock::new(MjpegStreamerConfig::default()), - capturer: RwLock::new(None), - mjpeg_handler: Arc::new(MjpegStreamHandler::new()), - current_device: RwLock::new(None), - state: RwLock::new(MjpegStreamerState::Uninitialized), - audio_controller: RwLock::new(None), - audio_enabled: AtomicBool::new(false), - ws_hid_handler: WsHidHandler::new(), - hid_controller: RwLock::new(None), - start_lock: tokio::sync::Mutex::new(()), - direct_stop: AtomicBool::new(false), - direct_active: AtomicBool::new(false), - direct_handle: Mutex::new(None), - events: RwLock::new(None), - config_changing: AtomicBool::new(false), - }) - } - - /// Create with specific config - pub fn with_config(config: MjpegStreamerConfig) -> Arc { - Arc::new(Self { - config: RwLock::new(config), - capturer: RwLock::new(None), - mjpeg_handler: Arc::new(MjpegStreamHandler::new()), - current_device: RwLock::new(None), - state: RwLock::new(MjpegStreamerState::Uninitialized), - audio_controller: RwLock::new(None), - audio_enabled: AtomicBool::new(false), - ws_hid_handler: WsHidHandler::new(), - hid_controller: RwLock::new(None), - start_lock: tokio::sync::Mutex::new(()), - direct_stop: AtomicBool::new(false), - direct_active: AtomicBool::new(false), - direct_handle: Mutex::new(None), - events: RwLock::new(None), - config_changing: AtomicBool::new(false), - }) - } - - // ======================================================================== - // Configuration and Setup - // ======================================================================== - - /// Set event bus for broadcasting state changes - pub async fn set_event_bus(&self, events: Arc) { - *self.events.write().await = Some(events); - } - - /// Set audio controller (for reference, WebSocket handled by audio_ws.rs) - pub async fn set_audio_controller(&self, audio: Arc) { - *self.audio_controller.write().await = Some(audio); - info!("MjpegStreamer: Audio controller set"); - } - - /// Set HID controller - pub async fn set_hid_controller(&self, hid: Arc) { - *self.hid_controller.write().await = Some(hid.clone()); - self.ws_hid_handler.set_hid_controller(hid); - info!("MjpegStreamer: HID controller set"); - } - - /// Enable or disable audio - pub fn set_audio_enabled(&self, enabled: bool) { - self.audio_enabled.store(enabled, Ordering::SeqCst); - } - - /// Check if audio is enabled - pub fn is_audio_enabled(&self) -> bool { - self.audio_enabled.load(Ordering::SeqCst) - } - - // ======================================================================== - // State and Status - // ======================================================================== - - /// Get current state - pub async fn state(&self) -> MjpegStreamerState { - *self.state.read().await - } - - /// Check if config is currently being changed - pub fn is_config_changing(&self) -> bool { - self.config_changing.load(Ordering::SeqCst) - } - - /// Get current device info - pub async fn current_device(&self) -> Option { - self.current_device.read().await.clone() - } - - /// Get statistics - pub async fn stats(&self) -> MjpegStreamerStats { - let state = *self.state.read().await; - let device = self.current_device.read().await; - let config = self.config.read().await; - - let (resolution, format) = { - if self.direct_active.load(Ordering::Relaxed) { - ( - Some((config.resolution.width, config.resolution.height)), - Some(config.format.to_string()), - ) - } else if let Some(ref cap) = *self.capturer.read().await { - let _ = cap; - ( - Some((config.resolution.width, config.resolution.height)), - Some(config.format.to_string()), - ) - } else { - (None, None) - } - }; - - MjpegStreamerStats { - state: state.to_string(), - device: device.as_ref().map(|d| d.path.display().to_string()), - resolution, - format, - fps: config.fps, - mjpeg_clients: self.mjpeg_handler.client_count(), - ws_hid_clients: self.ws_hid_handler.client_count(), - } - } - - // ======================================================================== - // Handler Access - // ======================================================================== - - /// Get MJPEG handler for HTTP streaming - pub fn mjpeg_handler(&self) -> Arc { - self.mjpeg_handler.clone() - } - - /// Get WebSocket HID handler - pub fn ws_hid_handler(&self) -> Arc { - self.ws_hid_handler.clone() - } - - // ======================================================================== - // Initialization - // ======================================================================== - - /// Initialize with auto-detected device - pub async fn init_auto(self: &Arc) -> Result<()> { - let best = find_best_device()?; - self.init_with_device(best).await - } - - /// Initialize with specific device - pub async fn init_with_device(self: &Arc, device: VideoDeviceInfo) -> Result<()> { - info!( - "MjpegStreamer: Initializing with device: {}", - device.path.display() - ); - - let config = self.config.read().await.clone(); - self.mjpeg_handler.set_jpeg_quality(config.jpeg_quality); - - // Create capture config - let capture_config = CaptureConfig { - device_path: device.path.clone(), - resolution: config.resolution, - format: config.format, - fps: config.fps, - buffer_count: 4, - timeout: std::time::Duration::from_secs(5), - jpeg_quality: config.jpeg_quality, - }; - - // Create capturer - let capturer = Arc::new(VideoCapturer::new(capture_config)); - - // Store device and capturer - *self.current_device.write().await = Some(device); - *self.capturer.write().await = Some(capturer); - *self.state.write().await = MjpegStreamerState::Ready; - - self.publish_state_change().await; - Ok(()) - } - - // ======================================================================== - // Streaming Control - // ======================================================================== - - /// Start streaming - pub async fn start(self: &Arc) -> Result<()> { - let _lock = self.start_lock.lock().await; - - if self.config_changing.load(Ordering::SeqCst) { - return Err(AppError::VideoError( - "Config change in progress".to_string(), - )); - } - - let state = *self.state.read().await; - if state == MjpegStreamerState::Streaming { - return Ok(()); - } - - let device = self - .current_device - .read() - .await - .clone() - .ok_or_else(|| AppError::VideoError("Not initialized".to_string()))?; - - let config = self.config.read().await.clone(); - - self.direct_stop.store(false, Ordering::SeqCst); - self.direct_active.store(true, Ordering::SeqCst); - - let streamer = self.clone(); - let handle = tokio::task::spawn_blocking(move || { - streamer.run_direct_capture(device.path, config); - }); - *self.direct_handle.lock().await = Some(handle); - - // Note: Audio WebSocket is handled separately by audio_ws.rs (/api/ws/audio) - - *self.state.write().await = MjpegStreamerState::Streaming; - self.mjpeg_handler.set_online(); - - self.publish_state_change().await; - info!("MjpegStreamer: Streaming started"); - Ok(()) - } - - /// Stop streaming - pub async fn stop(&self) -> Result<()> { - let state = *self.state.read().await; - if state != MjpegStreamerState::Streaming { - return Ok(()); - } - - self.direct_stop.store(true, Ordering::SeqCst); - - if let Some(handle) = self.direct_handle.lock().await.take() { - let _ = handle.await; - } - self.direct_active.store(false, Ordering::SeqCst); - - // Stop capturer (legacy path) - if let Some(ref cap) = *self.capturer.read().await { - let _ = cap.stop().await; - } - - // Set offline - self.mjpeg_handler.set_offline(); - *self.state.write().await = MjpegStreamerState::Ready; - - self.publish_state_change().await; - info!("MjpegStreamer: Streaming stopped"); - Ok(()) - } - - /// Check if streaming - pub async fn is_streaming(&self) -> bool { - *self.state.read().await == MjpegStreamerState::Streaming - } - - // ======================================================================== - // Configuration Updates - // ======================================================================== - - /// Apply video configuration - /// - /// This stops the current stream, reconfigures the capturer, and restarts. - pub async fn apply_config(self: &Arc, config: MjpegStreamerConfig) -> Result<()> { - info!("MjpegStreamer: Applying config: {:?}", config); - - self.config_changing.store(true, Ordering::SeqCst); - - // Stop current stream - self.stop().await?; - - // Disconnect all MJPEG clients - self.mjpeg_handler.disconnect_all_clients(); - - // Release capturer - *self.capturer.write().await = None; - - // Update config - *self.config.write().await = config.clone(); - self.mjpeg_handler.set_jpeg_quality(config.jpeg_quality); - - // Re-initialize if device path is set - if let Some(ref path) = config.device_path { - let devices = enumerate_devices()?; - let device = devices - .into_iter() - .find(|d| d.path == *path) - .ok_or_else(|| { - AppError::VideoError(format!("Device not found: {}", path.display())) - })?; - - self.init_with_device(device).await?; - } - - self.config_changing.store(false, Ordering::SeqCst); - self.publish_state_change().await; - - Ok(()) - } - - // ======================================================================== - // Internal - // ======================================================================== - - /// Publish state change event - async fn publish_state_change(&self) { - if let Some(ref events) = *self.events.read().await { - let state = *self.state.read().await; - let device = self.current_device.read().await; - - events.publish(SystemEvent::StreamStateChanged { - state: state.to_string(), - device: device.as_ref().map(|d| d.path.display().to_string()), - }); - } - } - - /// Direct capture loop for MJPEG mode (single loop, no broadcast) - fn run_direct_capture(self: Arc, device_path: PathBuf, config: MjpegStreamerConfig) { - const MAX_RETRIES: u32 = 5; - const RETRY_DELAY_MS: u64 = 200; - - let handle = tokio::runtime::Handle::current(); - let mut last_state = MjpegStreamerState::Streaming; - - let mut set_state = |new_state: MjpegStreamerState| { - if new_state != last_state { - handle.block_on(async { - *self.state.write().await = new_state; - self.publish_state_change().await; - }); - last_state = new_state; - } - }; - - let mut stream_opt: Option = None; - let mut last_error: Option = None; - - for attempt in 0..MAX_RETRIES { - if self.direct_stop.load(Ordering::Relaxed) { - self.direct_active.store(false, Ordering::SeqCst); - return; - } - - match V4l2rCaptureStream::open( - &device_path, - config.resolution, - config.format, - config.fps, - 4, - Duration::from_secs(2), - ) { - Ok(stream) => { - stream_opt = Some(stream); - break; - } - Err(e) => { - let err_str = e.to_string(); - if err_str.contains("busy") || err_str.contains("resource") { - warn!( - "Device busy on attempt {}/{}, retrying in {}ms...", - attempt + 1, - MAX_RETRIES, - RETRY_DELAY_MS - ); - std::thread::sleep(std::time::Duration::from_millis(RETRY_DELAY_MS)); - last_error = Some(err_str); - continue; - } - last_error = Some(err_str); - break; - } - } - } - - let mut stream = match stream_opt { - Some(stream) => stream, - None => { - error!( - "Failed to open device {:?}: {}", - device_path, - last_error.unwrap_or_else(|| "unknown error".to_string()) - ); - set_state(MjpegStreamerState::Error); - self.mjpeg_handler.set_offline(); - self.direct_active.store(false, Ordering::SeqCst); - return; - } - }; - - let resolution = stream.resolution(); - let pixel_format = stream.format(); - let stride = stream.stride(); - - info!( - "Capture format: {}x{} {:?} stride={}", - resolution.width, resolution.height, pixel_format, stride - ); - - let buffer_pool = Arc::new(FrameBufferPool::new(8)); - let mut signal_present = true; - let mut validate_counter: u64 = 0; - let capture_error_throttler = LogThrottler::with_secs(5); - let mut suppressed_capture_errors: HashMap = HashMap::new(); - - let classify_capture_error = |err: &std::io::Error| -> String { - let message = err.to_string(); - if message.contains("dqbuf failed") && message.contains("EINVAL") { - "capture_dqbuf_einval".to_string() - } else if message.contains("dqbuf failed") { - "capture_dqbuf".to_string() - } else { - format!("capture_{:?}", err.kind()) - } - }; - - while !self.direct_stop.load(Ordering::Relaxed) { - let mut owned = buffer_pool.take(MIN_CAPTURE_FRAME_SIZE); - let meta = match stream.next_into(&mut owned) { - Ok(meta) => meta, - Err(e) => { - if e.kind() == io::ErrorKind::TimedOut { - if signal_present { - signal_present = false; - set_state(MjpegStreamerState::NoSignal); - } - std::thread::sleep(std::time::Duration::from_millis(100)); - continue; - } - - let is_device_lost = match e.raw_os_error() { - Some(6) => true, // ENXIO - Some(19) => true, // ENODEV - Some(5) => true, // EIO - Some(32) => true, // EPIPE - Some(108) => true, // ESHUTDOWN - _ => false, - }; - - if is_device_lost { - error!("Video device lost: {} - {}", device_path.display(), e); - set_state(MjpegStreamerState::Error); - self.mjpeg_handler.set_offline(); - self.direct_active.store(false, Ordering::SeqCst); - return; - } - - let key = classify_capture_error(&e); - if capture_error_throttler.should_log(&key) { - let suppressed = suppressed_capture_errors.remove(&key).unwrap_or(0); - if suppressed > 0 { - error!("Capture error: {} (suppressed {} repeats)", e, suppressed); - } else { - error!("Capture error: {}", e); - } - } else { - let counter = suppressed_capture_errors.entry(key).or_insert(0); - *counter = counter.saturating_add(1); - } - continue; - } - }; - - let frame_size = meta.bytes_used; - if frame_size < MIN_CAPTURE_FRAME_SIZE { - continue; - } - - validate_counter = validate_counter.wrapping_add(1); - if pixel_format.is_compressed() - && validate_counter.is_multiple_of(JPEG_VALIDATE_INTERVAL) - && !VideoFrame::is_valid_jpeg_bytes(&owned[..frame_size]) - { - continue; - } - - owned.truncate(frame_size); - let frame = VideoFrame::from_pooled( - Arc::new(FrameBuffer::new(owned, Some(buffer_pool.clone()))), - resolution, - pixel_format, - stride, - meta.sequence, - ); - - if !signal_present { - signal_present = true; - set_state(MjpegStreamerState::Streaming); - } - - self.mjpeg_handler.update_frame(frame); - } - - self.direct_active.store(false, Ordering::SeqCst); - } -} - -impl Default for MjpegStreamer { - fn default() -> Self { - Self { - config: RwLock::new(MjpegStreamerConfig::default()), - capturer: RwLock::new(None), - mjpeg_handler: Arc::new(MjpegStreamHandler::new()), - current_device: RwLock::new(None), - state: RwLock::new(MjpegStreamerState::Uninitialized), - audio_controller: RwLock::new(None), - audio_enabled: AtomicBool::new(false), - ws_hid_handler: WsHidHandler::new(), - hid_controller: RwLock::new(None), - start_lock: tokio::sync::Mutex::new(()), - direct_stop: AtomicBool::new(false), - direct_active: AtomicBool::new(false), - direct_handle: Mutex::new(None), - events: RwLock::new(None), - config_changing: AtomicBool::new(false), - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_mjpeg_streamer_creation() { - let streamer = MjpegStreamer::new(); - assert!(!streamer.is_config_changing()); - assert!(!streamer.is_audio_enabled()); - } - - #[test] - fn test_mjpeg_streamer_config_default() { - let config = MjpegStreamerConfig::default(); - assert_eq!(config.resolution, Resolution::HD1080); - assert_eq!(config.format, PixelFormat::Mjpeg); - assert_eq!(config.fps, 30); - } - - #[test] - fn test_mjpeg_streamer_state_display() { - assert_eq!(MjpegStreamerState::Streaming.to_string(), "streaming"); - assert_eq!(MjpegStreamerState::Ready.to_string(), "ready"); - } -} diff --git a/src/stream/mod.rs b/src/stream/mod.rs index 110d77b1..b3237b04 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -4,16 +4,11 @@ //! //! # Components //! -//! - `MjpegStreamer` - High-level MJPEG streaming manager //! - `MjpegStreamHandler` - HTTP multipart MJPEG video streaming //! - `WsHidHandler` - WebSocket HID input handler pub mod mjpeg; -pub mod mjpeg_streamer; pub mod ws_hid; pub use mjpeg::{ClientGuard, MjpegStreamHandler}; -pub use mjpeg_streamer::{ - MjpegStreamer, MjpegStreamerConfig, MjpegStreamerState, MjpegStreamerStats, -}; pub use ws_hid::WsHidHandler; diff --git a/src/video/capture.rs b/src/video/capture.rs deleted file mode 100644 index 464227fd..00000000 --- a/src/video/capture.rs +++ /dev/null @@ -1,560 +0,0 @@ -//! V4L2 video capture implementation -//! -//! Provides async video capture using memory-mapped buffers. - -use bytes::Bytes; -use std::collections::HashMap; -use std::io; -use std::path::{Path, PathBuf}; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; -use std::time::{Duration, Instant}; -use tokio::sync::{watch, Mutex}; -use tracing::{debug, error, info, warn}; - -use super::format::{PixelFormat, Resolution}; -use super::frame::VideoFrame; -use crate::error::{AppError, Result}; -use crate::utils::LogThrottler; -use crate::video::v4l2r_capture::V4l2rCaptureStream; - -/// Default number of capture buffers (reduced from 4 to 2 for lower latency) -const DEFAULT_BUFFER_COUNT: u32 = 2; -/// Default capture timeout in seconds -const DEFAULT_TIMEOUT: u64 = 2; -/// Minimum valid frame size (bytes) -const MIN_FRAME_SIZE: usize = 128; - -/// Video capturer configuration -#[derive(Debug, Clone)] -pub struct CaptureConfig { - /// Device path - pub device_path: PathBuf, - /// Desired resolution - pub resolution: Resolution, - /// Desired pixel format - pub format: PixelFormat, - /// Desired frame rate (0 = max available) - pub fps: u32, - /// Number of capture buffers - pub buffer_count: u32, - /// Capture timeout - pub timeout: Duration, - /// JPEG quality (1-100, for MJPEG sources with hardware quality control) - pub jpeg_quality: u8, -} - -impl Default for CaptureConfig { - fn default() -> Self { - Self { - device_path: PathBuf::from("/dev/video0"), - resolution: Resolution::HD1080, - format: PixelFormat::Mjpeg, - fps: 30, - buffer_count: DEFAULT_BUFFER_COUNT, - timeout: Duration::from_secs(DEFAULT_TIMEOUT), - jpeg_quality: 80, - } - } -} - -impl CaptureConfig { - /// Create config for a specific device - pub fn for_device(path: impl AsRef) -> Self { - Self { - device_path: path.as_ref().to_path_buf(), - ..Default::default() - } - } - - /// Set resolution - pub fn with_resolution(mut self, width: u32, height: u32) -> Self { - self.resolution = Resolution::new(width, height); - self - } - - /// Set format - pub fn with_format(mut self, format: PixelFormat) -> Self { - self.format = format; - self - } - - /// Set frame rate - pub fn with_fps(mut self, fps: u32) -> Self { - self.fps = fps; - self - } -} - -/// Capture statistics -#[derive(Debug, Clone, Default)] -pub struct CaptureStats { - /// Current FPS (calculated) - pub current_fps: f32, -} - -/// Video capturer state -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum CaptureState { - /// Not started - Stopped, - /// Starting (initializing device) - Starting, - /// Running and capturing - Running, - /// No signal from source - NoSignal, - /// Error occurred - Error, - /// Device was lost (disconnected) - DeviceLost, -} - -/// Async video capturer -pub struct VideoCapturer { - config: CaptureConfig, - state: Arc>, - state_rx: watch::Receiver, - stats: Arc>, - stop_flag: Arc, - capture_handle: Mutex>>, - /// Last error that occurred (device path, reason) - last_error: Arc>>, -} - -impl VideoCapturer { - /// Create a new video capturer - pub fn new(config: CaptureConfig) -> Self { - let (state_tx, state_rx) = watch::channel(CaptureState::Stopped); - - Self { - config, - state: Arc::new(state_tx), - state_rx, - stats: Arc::new(Mutex::new(CaptureStats::default())), - stop_flag: Arc::new(AtomicBool::new(false)), - capture_handle: Mutex::new(None), - last_error: Arc::new(parking_lot::RwLock::new(None)), - } - } - - /// Get current capture state - pub fn state(&self) -> CaptureState { - *self.state_rx.borrow() - } - - /// Subscribe to state changes - pub fn state_watch(&self) -> watch::Receiver { - self.state_rx.clone() - } - - /// Get last error (device path, reason) - pub fn last_error(&self) -> Option<(String, String)> { - self.last_error.read().clone() - } - - /// Clear last error - pub fn clear_error(&self) { - *self.last_error.write() = None; - } - - /// Get capture statistics - pub async fn stats(&self) -> CaptureStats { - self.stats.lock().await.clone() - } - - /// Get config - pub fn config(&self) -> &CaptureConfig { - &self.config - } - - /// Start capturing in background - pub async fn start(&self) -> Result<()> { - let current_state = self.state(); - // Already running or starting - nothing to do - if current_state == CaptureState::Running || current_state == CaptureState::Starting { - return Ok(()); - } - - info!( - "Starting capture on {:?} at {}x{} {}", - self.config.device_path, - self.config.resolution.width, - self.config.resolution.height, - self.config.format - ); - - // Set Starting state immediately to prevent concurrent start attempts - let _ = self.state.send(CaptureState::Starting); - - // Clear any previous error - *self.last_error.write() = None; - - self.stop_flag.store(false, Ordering::SeqCst); - - let config = self.config.clone(); - let state = self.state.clone(); - let stats = self.stats.clone(); - let stop_flag = self.stop_flag.clone(); - let last_error = self.last_error.clone(); - - let handle = tokio::task::spawn_blocking(move || { - capture_loop(config, state, stats, stop_flag, last_error); - }); - - *self.capture_handle.lock().await = Some(handle); - - Ok(()) - } - - /// Stop capturing - pub async fn stop(&self) -> Result<()> { - info!("Stopping capture"); - self.stop_flag.store(true, Ordering::SeqCst); - - if let Some(handle) = self.capture_handle.lock().await.take() { - let _ = handle.await; - } - - let _ = self.state.send(CaptureState::Stopped); - Ok(()) - } - - /// Check if capturing - pub fn is_running(&self) -> bool { - self.state() == CaptureState::Running - } - - /// Get the latest frame (if any receivers would get it) - pub fn latest_frame(&self) -> Option { - // This is a bit tricky with broadcast - we'd need to track internally - // For now, callers should use subscribe() - None - } -} - -/// Main capture loop (runs in blocking thread) -fn capture_loop( - config: CaptureConfig, - state: Arc>, - stats: Arc>, - stop_flag: Arc, - error_holder: Arc>>, -) { - let result = run_capture(&config, &state, &stats, &stop_flag); - - match result { - Ok(_) => { - let _ = state.send(CaptureState::Stopped); - } - Err(AppError::VideoDeviceLost { device, reason }) => { - error!("Video device lost: {} - {}", device, reason); - // Store the error for recovery handling - *error_holder.write() = Some((device, reason)); - let _ = state.send(CaptureState::DeviceLost); - } - Err(e) => { - error!("Capture error: {}", e); - let _ = state.send(CaptureState::Error); - } - } -} - -fn run_capture( - config: &CaptureConfig, - state: &watch::Sender, - stats: &Arc>, - stop_flag: &AtomicBool, -) -> Result<()> { - // Retry logic for device busy errors - const MAX_RETRIES: u32 = 5; - const RETRY_DELAY_MS: u64 = 200; - - let mut last_error = None; - - for attempt in 0..MAX_RETRIES { - if stop_flag.load(Ordering::Relaxed) { - return Ok(()); - } - - let stream = match V4l2rCaptureStream::open( - &config.device_path, - config.resolution, - config.format, - config.fps, - config.buffer_count, - config.timeout, - ) { - Ok(stream) => stream, - Err(e) => { - let err_str = e.to_string(); - if err_str.contains("busy") || err_str.contains("resource") { - warn!( - "Device busy on attempt {}/{}, retrying in {}ms...", - attempt + 1, - MAX_RETRIES, - RETRY_DELAY_MS - ); - std::thread::sleep(Duration::from_millis(RETRY_DELAY_MS)); - last_error = Some(AppError::VideoError(format!( - "Failed to open device {:?}: {}", - config.device_path, e - ))); - continue; - } - return Err(AppError::VideoError(format!( - "Failed to open device {:?}: {}", - config.device_path, e - ))); - } - }; - - return run_capture_inner(config, state, stats, stop_flag, stream); - } - - // All retries exhausted - Err(last_error.unwrap_or_else(|| { - AppError::VideoError("Failed to open device after all retries".to_string()) - })) -} - -/// Inner capture function after device is successfully opened -fn run_capture_inner( - config: &CaptureConfig, - state: &watch::Sender, - stats: &Arc>, - stop_flag: &AtomicBool, - mut stream: V4l2rCaptureStream, -) -> Result<()> { - let resolution = stream.resolution(); - let pixel_format = stream.format(); - let stride = stream.stride(); - info!( - "Capture format: {}x{} {:?} stride={}", - resolution.width, resolution.height, pixel_format, stride - ); - - let _ = state.send(CaptureState::Running); - info!("Capture started"); - - // FPS calculation variables - let mut fps_frame_count = 0u64; - let mut fps_window_start = Instant::now(); - let fps_window_duration = Duration::from_secs(1); - let mut scratch = Vec::new(); - let capture_error_throttler = LogThrottler::with_secs(5); - let mut suppressed_capture_errors: HashMap = HashMap::new(); - - let classify_capture_error = |err: &std::io::Error| -> String { - let message = err.to_string(); - if message.contains("dqbuf failed") && message.contains("EINVAL") { - "capture_dqbuf_einval".to_string() - } else if message.contains("dqbuf failed") { - "capture_dqbuf".to_string() - } else { - format!("capture_{:?}", err.kind()) - } - }; - - // Main capture loop - while !stop_flag.load(Ordering::Relaxed) { - let meta = match stream.next_into(&mut scratch) { - Ok(meta) => meta, - Err(e) => { - if e.kind() == io::ErrorKind::TimedOut { - warn!("Capture timeout - no signal?"); - let _ = state.send(CaptureState::NoSignal); - - // Wait a bit before retrying - std::thread::sleep(Duration::from_millis(100)); - continue; - } - - // Check for device loss errors - let is_device_lost = match e.raw_os_error() { - Some(6) => true, // ENXIO - No such device or address - Some(19) => true, // ENODEV - No such device - Some(5) => true, // EIO - I/O error (device removed) - Some(32) => true, // EPIPE - Broken pipe - Some(108) => true, // ESHUTDOWN - Transport endpoint shutdown - _ => false, - }; - - if is_device_lost { - let device_path = config.device_path.display().to_string(); - error!("Video device lost: {} - {}", device_path, e); - return Err(AppError::VideoDeviceLost { - device: device_path, - reason: e.to_string(), - }); - } - - let key = classify_capture_error(&e); - if capture_error_throttler.should_log(&key) { - let suppressed = suppressed_capture_errors.remove(&key).unwrap_or(0); - if suppressed > 0 { - error!("Capture error: {} (suppressed {} repeats)", e, suppressed); - } else { - error!("Capture error: {}", e); - } - } else { - let counter = suppressed_capture_errors.entry(key).or_insert(0); - *counter = counter.saturating_add(1); - } - continue; - } - }; - - // Use actual bytes used, not buffer size - let frame_size = meta.bytes_used; - - // Validate frame - if frame_size < MIN_FRAME_SIZE { - debug!( - "Dropping small frame: {} bytes (bytesused={})", - frame_size, meta.bytes_used - ); - continue; - } - - // Update state if was no signal - if *state.borrow() == CaptureState::NoSignal { - let _ = state.send(CaptureState::Running); - } - - // Update FPS calculation - if let Ok(mut s) = stats.try_lock() { - fps_frame_count += 1; - let elapsed = fps_window_start.elapsed(); - - if elapsed >= fps_window_duration { - // Calculate FPS from the completed window - s.current_fps = (fps_frame_count as f32 / elapsed.as_secs_f32()).max(0.0); - // Reset for next window - fps_frame_count = 0; - fps_window_start = Instant::now(); - } else if elapsed.as_millis() > 100 && fps_frame_count > 0 { - // Provide partial estimate if we have at least 100ms of data - s.current_fps = (fps_frame_count as f32 / elapsed.as_secs_f32()).max(0.0); - } - } - - if *state.borrow() == CaptureState::NoSignal { - let _ = state.send(CaptureState::Running); - } - } - - info!("Capture stopped"); - Ok(()) -} - -/// Validate JPEG frame data -#[cfg(test)] -fn is_valid_jpeg(data: &[u8]) -> bool { - if data.len() < 125 { - return false; - } - - // Check start marker (0xFFD8) - let start_marker = ((data[0] as u16) << 8) | data[1] as u16; - if start_marker != 0xFFD8 { - return false; - } - - // Check end marker - let end = data.len(); - let end_marker = ((data[end - 2] as u16) << 8) | data[end - 1] as u16; - - // Valid end markers: 0xFFD9, 0xD900, 0x0000 (padded) - matches!(end_marker, 0xFFD9 | 0xD900 | 0x0000) -} - -/// Frame grabber for one-shot capture -pub struct FrameGrabber { - device_path: PathBuf, -} - -impl FrameGrabber { - /// Create a new frame grabber - pub fn new(device_path: impl AsRef) -> Self { - Self { - device_path: device_path.as_ref().to_path_buf(), - } - } - - /// Capture a single frame - pub async fn grab(&self, resolution: Resolution, format: PixelFormat) -> Result { - let device_path = self.device_path.clone(); - - tokio::task::spawn_blocking(move || grab_single_frame(&device_path, resolution, format)) - .await - .map_err(|e| AppError::VideoError(format!("Grab task failed: {}", e)))? - } -} - -fn grab_single_frame( - device_path: &Path, - resolution: Resolution, - format: PixelFormat, -) -> Result { - let mut stream = V4l2rCaptureStream::open( - device_path, - resolution, - format, - 0, - 2, - Duration::from_secs(DEFAULT_TIMEOUT), - )?; - let actual_resolution = stream.resolution(); - let actual_format = stream.format(); - let actual_stride = stream.stride(); - let mut scratch = Vec::new(); - - // Try to get a valid frame (skip first few which might be bad) - for attempt in 0..5 { - match stream.next_into(&mut scratch) { - Ok(meta) => { - if meta.bytes_used >= MIN_FRAME_SIZE { - return Ok(VideoFrame::new( - Bytes::copy_from_slice(&scratch[..meta.bytes_used]), - actual_resolution, - actual_format, - actual_stride, - 0, - )); - } - } - Err(e) if attempt == 4 => { - return Err(AppError::VideoError(format!("Failed to grab frame: {}", e))); - } - Err(_) => {} - } - } - - Err(AppError::VideoError( - "Failed to capture valid frame".to_string(), - )) -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_valid_jpeg() { - // Valid JPEG header and footer - let mut data = vec![0xFF, 0xD8]; // SOI - data.extend(vec![0u8; 200]); // Content - data.extend([0xFF, 0xD9]); // EOI - - assert!(is_valid_jpeg(&data)); - - // Invalid - too small - assert!(!is_valid_jpeg(&[0xFF, 0xD8, 0xFF, 0xD9])); - - // Invalid - wrong header - let mut bad = vec![0x00, 0x00]; - bad.extend(vec![0u8; 200]); - assert!(!is_valid_jpeg(&bad)); - } -} diff --git a/src/video/encoder/h264.rs b/src/video/encoder/h264.rs index d72ef49a..86627cba 100644 --- a/src/video/encoder/h264.rs +++ b/src/video/encoder/h264.rs @@ -244,9 +244,6 @@ pub struct H264Encoder { codec_name: String, /// Frame counter frame_count: u64, - /// YUV420P buffer for input (reserved for future use) - #[allow(dead_code)] - yuv_buffer: Vec, /// Required YUV buffer length from hwcodec yuv_length: i32, } @@ -326,7 +323,6 @@ impl H264Encoder { encoder_type, codec_name: codec_name.to_string(), frame_count: 0, - yuv_buffer: vec![0u8; yuv_length as usize], yuv_length, }) } diff --git a/src/video/h264_pipeline.rs b/src/video/h264_pipeline.rs deleted file mode 100644 index c20273cf..00000000 --- a/src/video/h264_pipeline.rs +++ /dev/null @@ -1,444 +0,0 @@ -//! H264 video encoding pipeline for WebRTC streaming -//! -//! This module provides a complete H264 encoding pipeline that connects: -//! 1. Video capture (YUYV/MJPEG from V4L2) -//! 2. Pixel conversion (YUYV → YUV420P) or JPEG decode -//! 3. H264 encoding (via hwcodec) -//! 4. RTP packetization and WebRTC track output - -use std::sync::Arc; -use std::time::{Duration, Instant}; -use tokio::sync::{broadcast, watch, Mutex}; -use tracing::{debug, error, info, warn}; - -use crate::error::{AppError, Result}; -use crate::video::convert::Nv12Converter; -use crate::video::encoder::h264::{H264Config, H264Encoder}; -use crate::video::format::{PixelFormat, Resolution}; -use crate::webrtc::rtp::{H264VideoTrack, H264VideoTrackConfig}; - -/// H264 pipeline configuration -#[derive(Debug, Clone)] -pub struct H264PipelineConfig { - /// Input resolution - pub resolution: Resolution, - /// Input pixel format (YUYV, NV12, etc.) - pub input_format: PixelFormat, - /// Target bitrate in kbps - pub bitrate_kbps: u32, - /// Target FPS - pub fps: u32, - /// GOP size (keyframe interval in frames) - pub gop_size: u32, - /// Track ID for WebRTC - pub track_id: String, - /// Stream ID for WebRTC - pub stream_id: String, -} - -impl Default for H264PipelineConfig { - fn default() -> Self { - Self { - resolution: Resolution::HD720, - input_format: PixelFormat::Yuyv, - bitrate_kbps: 8000, - fps: 30, - gop_size: 30, - track_id: "video0".to_string(), - stream_id: "one-kvm-stream".to_string(), - } - } -} - -/// H264 pipeline statistics -#[derive(Debug, Clone, Default)] -pub struct H264PipelineStats { - /// Current encoding FPS - pub current_fps: f32, -} - -/// H264 video encoding pipeline -pub struct H264Pipeline { - config: H264PipelineConfig, - /// H264 encoder instance - encoder: Arc>>, - /// NV12 converter (for BGR24/RGB24/YUYV → NV12) - nv12_converter: Arc>>, - /// WebRTC video track - video_track: Arc, - /// Pipeline statistics - stats: Arc>, - /// Running state - running: watch::Sender, -} - -impl H264Pipeline { - /// Create a new H264 pipeline - pub fn new(config: H264PipelineConfig) -> Result { - info!( - "Creating H264 pipeline: {}x{} @ {} kbps, {} fps", - config.resolution.width, config.resolution.height, config.bitrate_kbps, config.fps - ); - - // Determine encoder input format based on pipeline input - // NV12 is optimal for VAAPI, use it for all formats - // VAAPI encoders typically only support NV12 input - let encoder_input_format = crate::video::encoder::h264::H264InputFormat::Nv12; - - // Create H264 encoder with appropriate input format - let encoder_config = H264Config { - base: crate::video::encoder::traits::EncoderConfig::h264( - config.resolution, - config.bitrate_kbps, - ), - bitrate_kbps: config.bitrate_kbps, - gop_size: config.gop_size, - fps: config.fps, - input_format: encoder_input_format, - }; - - let encoder = H264Encoder::new(encoder_config)?; - info!( - "H264 encoder created: {} ({}) with {:?} input", - encoder.codec_name(), - encoder.encoder_type(), - encoder_input_format - ); - - // Create NV12 converter based on input format - // All formats are converted to NV12 for VAAPI encoder - let nv12_converter = match config.input_format { - // NV12 input - direct passthrough - PixelFormat::Nv12 => { - info!("NV12 input: direct passthrough to encoder"); - None - } - - // YUYV (4:2:2 packed) → NV12 - PixelFormat::Yuyv => { - info!("YUYV input: converting to NV12"); - Some(Nv12Converter::yuyv_to_nv12(config.resolution)) - } - - // RGB24 → NV12 - PixelFormat::Rgb24 => { - info!("RGB24 input: converting to NV12"); - Some(Nv12Converter::rgb24_to_nv12(config.resolution)) - } - - // BGR24 → NV12 - PixelFormat::Bgr24 => { - info!("BGR24 input: converting to NV12"); - Some(Nv12Converter::bgr24_to_nv12(config.resolution)) - } - - // MJPEG/JPEG input - not supported (requires libjpeg for decoding) - PixelFormat::Mjpeg | PixelFormat::Jpeg => { - return Err(AppError::VideoError( - "MJPEG input format not supported in this build".to_string(), - )); - } - - _ => { - return Err(AppError::VideoError(format!( - "Unsupported input format for H264 pipeline: {}", - config.input_format - ))); - } - }; - - // Create WebRTC video track - let track_config = H264VideoTrackConfig { - track_id: config.track_id.clone(), - stream_id: config.stream_id.clone(), - resolution: config.resolution, - bitrate_kbps: config.bitrate_kbps, - fps: config.fps, - profile_level_id: None, // Let browser negotiate the best profile - }; - let video_track = Arc::new(H264VideoTrack::new(track_config)); - - let (running_tx, _) = watch::channel(false); - - Ok(Self { - config, - encoder: Arc::new(Mutex::new(Some(encoder))), - nv12_converter: Arc::new(Mutex::new(nv12_converter)), - video_track, - stats: Arc::new(Mutex::new(H264PipelineStats::default())), - running: running_tx, - }) - } - - /// Get the WebRTC video track - pub fn video_track(&self) -> Arc { - self.video_track.clone() - } - - /// Get current statistics - pub async fn stats(&self) -> H264PipelineStats { - self.stats.lock().await.clone() - } - - /// Check if pipeline is running - pub fn is_running(&self) -> bool { - *self.running.borrow() - } - - /// Start the encoding pipeline - /// - /// This starts a background task that receives raw frames from the receiver, - /// encodes them to H264, and sends them to the WebRTC track. - pub async fn start(&self, mut frame_rx: broadcast::Receiver>) { - if *self.running.borrow() { - warn!("H264 pipeline already running"); - return; - } - - let _ = self.running.send(true); - info!( - "Starting H264 pipeline (input format: {})", - self.config.input_format - ); - - let encoder = self.encoder.lock().await.take(); - let nv12_converter = self.nv12_converter.lock().await.take(); - let video_track = self.video_track.clone(); - let stats = self.stats.clone(); - let config = self.config.clone(); - let mut running_rx = self.running.subscribe(); - - // Spawn encoding task - tokio::spawn(async move { - let mut encoder = match encoder { - Some(e) => e, - None => { - error!("No encoder available"); - return; - } - }; - - let mut nv12_converter = nv12_converter; - let mut frame_count: u64 = 0; - let mut last_fps_time = Instant::now(); - let mut fps_frame_count: u64 = 0; - - // Flag for one-time warnings - let mut size_mismatch_warned = false; - - loop { - tokio::select! { - biased; - - _ = running_rx.changed() => { - if !*running_rx.borrow() { - info!("H264 pipeline stopping"); - break; - } - } - - result = frame_rx.recv() => { - match result { - Ok(raw_frame) => { - let start = Instant::now(); - - // Validate frame size for uncompressed formats - if let Some(expected_size) = config.input_format.frame_size(config.resolution) { - if raw_frame.len() != expected_size && !size_mismatch_warned { - warn!( - "Frame size mismatch: got {} bytes, expected {} for {} {}x{}", - raw_frame.len(), - expected_size, - config.input_format, - config.resolution.width, - config.resolution.height - ); - size_mismatch_warned = true; - } - } - - // Convert to NV12 for VAAPI encoder - // BGR24/RGB24/YUYV -> NV12 (via NV12 converter) - // NV12 -> pass through - // - // Optimized: avoid unnecessary allocations and copies - frame_count += 1; - fps_frame_count += 1; - let pts_ms = (frame_count * 1000 / config.fps as u64) as i64; - - let encode_result = if let Some(ref mut conv) = nv12_converter { - // BGR24/RGB24/YUYV input - convert to NV12 - // Optimized: pass reference directly without copy - match conv.convert(&raw_frame) { - Ok(nv12_data) => encoder.encode_raw(nv12_data, pts_ms), - Err(e) => { - error!("NV12 conversion failed: {}", e); - continue; - } - } - } else { - // NV12 input - pass reference directly - encoder.encode_raw(&raw_frame, pts_ms) - }; - - match encode_result { - Ok(frames) => { - if !frames.is_empty() { - let frame = &frames[0]; - let is_keyframe = frame.key == 1; - - // Send to WebRTC track - let duration = Duration::from_millis( - 1000 / config.fps as u64 - ); - - if let Err(e) = video_track - .write_frame(&frame.data, duration, is_keyframe) - .await - { - error!("Failed to write frame to track: {}", e); - } else { - let _ = start; - } - } - } - Err(e) => { - error!("Encoding failed: {}", e); - } - } - - // Update FPS every second - if last_fps_time.elapsed() >= Duration::from_secs(1) { - let mut s = stats.lock().await; - s.current_fps = fps_frame_count as f32 - / last_fps_time.elapsed().as_secs_f32(); - fps_frame_count = 0; - last_fps_time = Instant::now(); - } - } - Err(broadcast::error::RecvError::Lagged(n)) => { - let _ = n; - } - Err(broadcast::error::RecvError::Closed) => { - info!("Frame channel closed, stopping H264 pipeline"); - break; - } - } - } - } - } - - info!("H264 pipeline task exited"); - }); - } - - /// Stop the encoding pipeline - pub fn stop(&self) { - if *self.running.borrow() { - let _ = self.running.send(false); - info!("Stopping H264 pipeline"); - } - } - - /// Request a keyframe (force IDR) - pub async fn request_keyframe(&self) { - // Note: hwcodec doesn't support on-demand keyframe requests - // The encoder will produce keyframes based on GOP size - debug!("Keyframe requested (will occur at next GOP boundary)"); - } - - /// Update bitrate dynamically - pub async fn set_bitrate(&self, bitrate_kbps: u32) -> Result<()> { - if let Some(ref mut encoder) = *self.encoder.lock().await { - encoder.set_bitrate(bitrate_kbps)?; - info!("H264 pipeline bitrate updated to {} kbps", bitrate_kbps); - } - Ok(()) - } -} - -/// Builder for H264 pipeline configuration -pub struct H264PipelineBuilder { - config: H264PipelineConfig, -} - -impl H264PipelineBuilder { - pub fn new() -> Self { - Self { - config: H264PipelineConfig::default(), - } - } - - pub fn resolution(mut self, resolution: Resolution) -> Self { - self.config.resolution = resolution; - self - } - - pub fn input_format(mut self, format: PixelFormat) -> Self { - self.config.input_format = format; - self - } - - pub fn bitrate_kbps(mut self, bitrate: u32) -> Self { - self.config.bitrate_kbps = bitrate; - self - } - - pub fn fps(mut self, fps: u32) -> Self { - self.config.fps = fps; - self - } - - pub fn gop_size(mut self, gop: u32) -> Self { - self.config.gop_size = gop; - self - } - - pub fn track_id(mut self, id: &str) -> Self { - self.config.track_id = id.to_string(); - self - } - - pub fn stream_id(mut self, id: &str) -> Self { - self.config.stream_id = id.to_string(); - self - } - - pub fn build(self) -> Result { - H264Pipeline::new(self.config) - } -} - -impl Default for H264PipelineBuilder { - fn default() -> Self { - Self::new() - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_pipeline_config_default() { - let config = H264PipelineConfig::default(); - assert_eq!(config.resolution, Resolution::HD720); - assert_eq!(config.bitrate_kbps, 8000); - assert_eq!(config.fps, 30); - assert_eq!(config.gop_size, 30); - } - - #[test] - fn test_pipeline_builder() { - let builder = H264PipelineBuilder::new() - .resolution(Resolution::HD1080) - .bitrate_kbps(4000) - .fps(60) - .input_format(PixelFormat::Yuyv); - - assert_eq!(builder.config.resolution, Resolution::HD1080); - assert_eq!(builder.config.bitrate_kbps, 4000); - assert_eq!(builder.config.fps, 60); - assert_eq!(builder.config.input_format, PixelFormat::Yuyv); - } -} diff --git a/src/video/mod.rs b/src/video/mod.rs index 1ea9600b..84b3c103 100644 --- a/src/video/mod.rs +++ b/src/video/mod.rs @@ -2,7 +2,6 @@ //! //! This module provides V4L2 video capture, encoding, and streaming functionality. -pub mod capture; pub mod codec_constraints; pub mod convert; pub mod decoder; @@ -10,25 +9,18 @@ pub mod device; pub mod encoder; pub mod format; pub mod frame; -pub mod h264_pipeline; pub mod shared_video_pipeline; pub mod stream_manager; pub mod streamer; pub mod v4l2r_capture; -pub mod video_session; -pub use capture::VideoCapturer; pub use convert::{PixelConverter, Yuv420pBuffer}; pub use device::{VideoDevice, VideoDeviceInfo}; pub use encoder::{H264Encoder, H264EncoderType, JpegEncoder}; pub use format::PixelFormat; pub use frame::VideoFrame; -pub use h264_pipeline::{H264Pipeline, H264PipelineBuilder, H264PipelineConfig}; pub use shared_video_pipeline::{ EncodedVideoFrame, SharedVideoPipeline, SharedVideoPipelineConfig, SharedVideoPipelineStats, }; pub use stream_manager::VideoStreamManager; pub use streamer::{Streamer, StreamerState}; -pub use video_session::{ - CodecInfo, VideoSessionInfo, VideoSessionManager, VideoSessionManagerConfig, VideoSessionState, -}; diff --git a/src/video/shared_video_pipeline.rs b/src/video/shared_video_pipeline.rs index 43f926cf..1c3f6f95 100644 --- a/src/video/shared_video_pipeline.rs +++ b/src/video/shared_video_pipeline.rs @@ -5,26 +5,30 @@ //! //! Architecture: //! ```text -//! VideoCapturer (MJPEG/YUYV/NV12) +//! V4L2 capture //! | -//! v (broadcast::Receiver) -//! SharedVideoPipeline (single encoder) +//! v +//! SharedVideoPipeline (capture + encode + broadcast) //! | -//! v (broadcast::Sender) +//! v //! ┌────┴────┬────────┬────────┐ //! v v v v //! Session1 Session2 Session3 ... //! ``` +mod encoder_state; + use bytes::Bytes; use parking_lot::RwLock as ParkingRwLock; use std::collections::HashMap; use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering}; use std::sync::Arc; use std::time::{Duration, Instant}; -use tokio::sync::{broadcast, mpsc, watch, Mutex, RwLock}; +use tokio::sync::{mpsc, watch, Mutex, RwLock}; use tracing::{debug, error, info, trace, warn}; +use self::encoder_state::{build_encoder_state, EncoderThreadState}; + /// Grace period before auto-stopping pipeline when no subscribers (in seconds) const AUTO_STOP_GRACE_PERIOD_SECS: u64 = 3; /// Restart capture stream after this many consecutive timeouts. @@ -41,21 +45,12 @@ const ENCODE_ERROR_THROTTLE_SECS: u64 = 5; use crate::error::{AppError, Result}; use crate::utils::LogThrottler; -use crate::video::convert::{Nv12Converter, PixelConverter}; -use crate::video::decoder::MjpegTurboDecoder; -use crate::video::encoder::h264::{H264Config, H264Encoder, H264InputFormat}; -use crate::video::encoder::h265::{H265Config, H265Encoder, H265InputFormat}; -use crate::video::encoder::registry::{EncoderBackend, EncoderRegistry, VideoEncoderType}; -use crate::video::encoder::traits::EncoderConfig; -use crate::video::encoder::vp8::{VP8Config, VP8Encoder}; -use crate::video::encoder::vp9::{VP9Config, VP9Encoder}; +use crate::video::encoder::registry::{EncoderBackend, VideoEncoderType}; use crate::video::format::{PixelFormat, Resolution}; use crate::video::frame::{FrameBuffer, FrameBufferPool, VideoFrame}; use crate::video::v4l2r_capture::V4l2rCaptureStream; #[cfg(any(target_arch = "aarch64", target_arch = "arm"))] -use hwcodec::ffmpeg_hw::{ - last_error_message as ffmpeg_hw_last_error, HwMjpegH26xConfig, HwMjpegH26xPipeline, -}; +use hwcodec::ffmpeg_hw::last_error_message as ffmpeg_hw_last_error; /// Encoded video frame for distribution #[derive(Debug, Clone)] @@ -222,166 +217,6 @@ pub struct SharedVideoPipelineStats { pub current_fps: f32, } -struct EncoderThreadState { - encoder: Option>, - mjpeg_decoder: Option, - nv12_converter: Option, - yuv420p_converter: Option, - encoder_needs_yuv420p: bool, - #[cfg(any(target_arch = "aarch64", target_arch = "arm"))] - ffmpeg_hw_pipeline: Option, - #[cfg(any(target_arch = "aarch64", target_arch = "arm"))] - ffmpeg_hw_enabled: bool, - fps: u32, - codec: VideoEncoderType, - input_format: PixelFormat, -} - -/// Universal video encoder trait object -#[allow(dead_code)] -trait VideoEncoderTrait: Send { - fn encode_raw(&mut self, data: &[u8], pts_ms: i64) -> Result>; - fn set_bitrate(&mut self, bitrate_kbps: u32) -> Result<()>; - fn codec_name(&self) -> &str; - fn request_keyframe(&mut self); -} - -/// Encoded frame from encoder -#[allow(dead_code)] -struct EncodedFrame { - data: Vec, - pts: i64, - key: i32, -} - -/// H264 encoder wrapper -struct H264EncoderWrapper(H264Encoder); - -impl VideoEncoderTrait for H264EncoderWrapper { - fn encode_raw(&mut self, data: &[u8], pts_ms: i64) -> Result> { - let frames = self.0.encode_raw(data, pts_ms)?; - Ok(frames - .into_iter() - .map(|f| EncodedFrame { - data: f.data, - pts: f.pts, - key: f.key, - }) - .collect()) - } - - fn set_bitrate(&mut self, bitrate_kbps: u32) -> Result<()> { - self.0.set_bitrate(bitrate_kbps) - } - - fn codec_name(&self) -> &str { - self.0.codec_name() - } - - fn request_keyframe(&mut self) { - self.0.request_keyframe() - } -} - -/// H265 encoder wrapper -struct H265EncoderWrapper(H265Encoder); - -impl VideoEncoderTrait for H265EncoderWrapper { - fn encode_raw(&mut self, data: &[u8], pts_ms: i64) -> Result> { - let frames = self.0.encode_raw(data, pts_ms)?; - Ok(frames - .into_iter() - .map(|f| EncodedFrame { - data: f.data, - pts: f.pts, - key: f.key, - }) - .collect()) - } - - fn set_bitrate(&mut self, bitrate_kbps: u32) -> Result<()> { - self.0.set_bitrate(bitrate_kbps) - } - - fn codec_name(&self) -> &str { - self.0.codec_name() - } - - fn request_keyframe(&mut self) { - self.0.request_keyframe() - } -} - -/// VP8 encoder wrapper -struct VP8EncoderWrapper(VP8Encoder); - -impl VideoEncoderTrait for VP8EncoderWrapper { - fn encode_raw(&mut self, data: &[u8], pts_ms: i64) -> Result> { - let frames = self.0.encode_raw(data, pts_ms)?; - Ok(frames - .into_iter() - .map(|f| EncodedFrame { - data: f.data, - pts: f.pts, - key: f.key, - }) - .collect()) - } - - fn set_bitrate(&mut self, bitrate_kbps: u32) -> Result<()> { - self.0.set_bitrate(bitrate_kbps) - } - - fn codec_name(&self) -> &str { - self.0.codec_name() - } - - fn request_keyframe(&mut self) { - // VP8 encoder doesn't support request_keyframe yet - } -} - -/// VP9 encoder wrapper -struct VP9EncoderWrapper(VP9Encoder); - -impl VideoEncoderTrait for VP9EncoderWrapper { - fn encode_raw(&mut self, data: &[u8], pts_ms: i64) -> Result> { - let frames = self.0.encode_raw(data, pts_ms)?; - Ok(frames - .into_iter() - .map(|f| EncodedFrame { - data: f.data, - pts: f.pts, - key: f.key, - }) - .collect()) - } - - fn set_bitrate(&mut self, bitrate_kbps: u32) -> Result<()> { - self.0.set_bitrate(bitrate_kbps) - } - - fn codec_name(&self) -> &str { - self.0.codec_name() - } - - fn request_keyframe(&mut self) { - // VP9 encoder doesn't support request_keyframe yet - } -} - -enum MjpegDecoderKind { - Turbo(MjpegTurboDecoder), -} - -impl MjpegDecoderKind { - fn decode(&mut self, data: &[u8]) -> Result> { - match self { - MjpegDecoderKind::Turbo(decoder) => decoder.decode_to_rgb(data), - } - } -} - /// Universal shared video pipeline pub struct SharedVideoPipeline { config: RwLock, @@ -431,520 +266,6 @@ impl SharedVideoPipeline { Ok(pipeline) } - fn build_encoder_state(config: &SharedVideoPipelineConfig) -> Result { - let registry = EncoderRegistry::global(); - - // Helper to get codec name for specific backend - let get_codec_name = - |format: VideoEncoderType, backend: Option| -> Option { - match backend { - Some(b) => registry - .encoder_with_backend(format, b) - .map(|e| e.codec_name.clone()), - None => registry - .best_available_encoder(format) - .map(|e| e.codec_name.clone()), - } - }; - - let needs_mjpeg_decode = config.input_format.is_compressed(); - - // Check if RKMPP backend is available for direct input optimization - let is_rkmpp_available = registry - .encoder_with_backend(VideoEncoderType::H264, EncoderBackend::Rkmpp) - .is_some(); - let use_yuyv_direct = - is_rkmpp_available && !needs_mjpeg_decode && config.input_format == PixelFormat::Yuyv; - let use_rkmpp_direct = is_rkmpp_available - && !needs_mjpeg_decode - && matches!( - config.input_format, - PixelFormat::Yuyv - | PixelFormat::Yuv420 - | PixelFormat::Rgb24 - | PixelFormat::Bgr24 - | PixelFormat::Nv12 - | PixelFormat::Nv16 - | PixelFormat::Nv21 - | PixelFormat::Nv24 - ); - - if use_yuyv_direct { - info!( - "RKMPP backend detected with YUYV input, enabling YUYV direct input optimization" - ); - } else if use_rkmpp_direct { - info!( - "RKMPP backend detected with {} input, enabling direct input optimization", - config.input_format - ); - } - - let selected_codec_name = match config.output_codec { - VideoEncoderType::H264 => { - if use_rkmpp_direct { - // Force RKMPP backend for direct input - get_codec_name(VideoEncoderType::H264, Some(EncoderBackend::Rkmpp)).ok_or_else( - || { - AppError::VideoError( - "RKMPP backend not available for H.264".to_string(), - ) - }, - )? - } else if let Some(ref backend) = config.encoder_backend { - // Specific backend requested - get_codec_name(VideoEncoderType::H264, Some(*backend)).ok_or_else(|| { - AppError::VideoError(format!( - "Backend {:?} does not support H.264", - backend - )) - })? - } else { - get_codec_name(VideoEncoderType::H264, None).ok_or_else(|| { - AppError::VideoError("No H.264 encoder available".to_string()) - })? - } - } - VideoEncoderType::H265 => { - if use_rkmpp_direct { - get_codec_name(VideoEncoderType::H265, Some(EncoderBackend::Rkmpp)).ok_or_else( - || { - AppError::VideoError( - "RKMPP backend not available for H.265".to_string(), - ) - }, - )? - } else if let Some(ref backend) = config.encoder_backend { - get_codec_name(VideoEncoderType::H265, Some(*backend)).ok_or_else(|| { - AppError::VideoError(format!( - "Backend {:?} does not support H.265", - backend - )) - })? - } else { - get_codec_name(VideoEncoderType::H265, None).ok_or_else(|| { - AppError::VideoError("No H.265 encoder available".to_string()) - })? - } - } - VideoEncoderType::VP8 => { - if let Some(ref backend) = config.encoder_backend { - get_codec_name(VideoEncoderType::VP8, Some(*backend)).ok_or_else(|| { - AppError::VideoError(format!("Backend {:?} does not support VP8", backend)) - })? - } else { - get_codec_name(VideoEncoderType::VP8, None).ok_or_else(|| { - AppError::VideoError("No VP8 encoder available".to_string()) - })? - } - } - VideoEncoderType::VP9 => { - if let Some(ref backend) = config.encoder_backend { - get_codec_name(VideoEncoderType::VP9, Some(*backend)).ok_or_else(|| { - AppError::VideoError(format!("Backend {:?} does not support VP9", backend)) - })? - } else { - get_codec_name(VideoEncoderType::VP9, None).ok_or_else(|| { - AppError::VideoError("No VP9 encoder available".to_string()) - })? - } - } - }; - - #[cfg(any(target_arch = "aarch64", target_arch = "arm"))] - let is_rkmpp_encoder = selected_codec_name.contains("rkmpp"); - #[cfg(any(target_arch = "aarch64", target_arch = "arm"))] - if needs_mjpeg_decode - && is_rkmpp_encoder - && matches!( - config.output_codec, - VideoEncoderType::H264 | VideoEncoderType::H265 - ) - { - info!( - "Initializing FFmpeg HW MJPEG->{} pipeline (no fallback)", - config.output_codec - ); - let hw_config = HwMjpegH26xConfig { - decoder: "mjpeg_rkmpp".to_string(), - encoder: selected_codec_name.clone(), - width: config.resolution.width as i32, - height: config.resolution.height as i32, - fps: config.fps as i32, - bitrate_kbps: config.bitrate_kbps() as i32, - gop: config.gop_size() as i32, - thread_count: 1, - }; - let pipeline = HwMjpegH26xPipeline::new(hw_config).map_err(|e| { - let detail = if e.is_empty() { - ffmpeg_hw_last_error() - } else { - e - }; - AppError::VideoError(format!( - "FFmpeg HW MJPEG->{} init failed: {}", - config.output_codec, detail - )) - })?; - info!("Using FFmpeg HW MJPEG->{} pipeline", config.output_codec); - return Ok(EncoderThreadState { - encoder: None, - mjpeg_decoder: None, - nv12_converter: None, - yuv420p_converter: None, - encoder_needs_yuv420p: false, - #[cfg(any(target_arch = "aarch64", target_arch = "arm"))] - ffmpeg_hw_pipeline: Some(pipeline), - #[cfg(any(target_arch = "aarch64", target_arch = "arm"))] - ffmpeg_hw_enabled: true, - fps: config.fps, - codec: config.output_codec, - input_format: config.input_format, - }); - } - - let pipeline_input_format = if needs_mjpeg_decode { - info!( - "MJPEG input detected, using TurboJPEG decoder ({} -> RGB24)", - config.input_format - ); - let decoder = MjpegTurboDecoder::new(config.resolution)?; - (Some(MjpegDecoderKind::Turbo(decoder)), PixelFormat::Rgb24) - } else { - (None, config.input_format) - }; - let (mjpeg_decoder, pipeline_input_format) = pipeline_input_format; - - // Create encoder based on codec type - let encoder: Box = match config.output_codec { - VideoEncoderType::H264 => { - let codec_name = selected_codec_name.clone(); - - let is_rkmpp = codec_name.contains("rkmpp"); - let direct_input_format = if is_rkmpp { - match pipeline_input_format { - PixelFormat::Yuyv => Some(H264InputFormat::Yuyv422), - PixelFormat::Yuv420 => Some(H264InputFormat::Yuv420p), - PixelFormat::Rgb24 => Some(H264InputFormat::Rgb24), - PixelFormat::Bgr24 => Some(H264InputFormat::Bgr24), - PixelFormat::Nv12 => Some(H264InputFormat::Nv12), - PixelFormat::Nv16 => Some(H264InputFormat::Nv16), - PixelFormat::Nv21 => Some(H264InputFormat::Nv21), - PixelFormat::Nv24 => Some(H264InputFormat::Nv24), - _ => None, - } - } else if codec_name.contains("libx264") { - match pipeline_input_format { - PixelFormat::Nv12 => Some(H264InputFormat::Nv12), - PixelFormat::Nv16 => Some(H264InputFormat::Nv16), - PixelFormat::Nv21 => Some(H264InputFormat::Nv21), - PixelFormat::Yuv420 => Some(H264InputFormat::Yuv420p), - _ => None, - } - } else { - None - }; - - // Choose input format: prefer direct input when supported - let h264_input_format = if let Some(fmt) = direct_input_format { - fmt - } else if codec_name.contains("libx264") { - H264InputFormat::Yuv420p - } else { - H264InputFormat::Nv12 - }; - - let encoder_config = H264Config { - base: EncoderConfig::h264(config.resolution, config.bitrate_kbps()), - bitrate_kbps: config.bitrate_kbps(), - gop_size: config.gop_size(), - fps: config.fps, - input_format: h264_input_format, - }; - - if use_rkmpp_direct { - info!( - "Creating H264 encoder with RKMPP backend for {} direct input (codec: {})", - config.input_format, codec_name - ); - } else if let Some(ref backend) = config.encoder_backend { - info!( - "Creating H264 encoder with backend {:?} (codec: {})", - backend, codec_name - ); - } - - let encoder = H264Encoder::with_codec(encoder_config, &codec_name)?; - - info!("Created H264 encoder: {}", encoder.codec_name()); - Box::new(H264EncoderWrapper(encoder)) - } - VideoEncoderType::H265 => { - let codec_name = selected_codec_name.clone(); - - let is_rkmpp = codec_name.contains("rkmpp"); - let direct_input_format = if is_rkmpp { - match pipeline_input_format { - PixelFormat::Yuyv => Some(H265InputFormat::Yuyv422), - PixelFormat::Yuv420 => Some(H265InputFormat::Yuv420p), - PixelFormat::Rgb24 => Some(H265InputFormat::Rgb24), - PixelFormat::Bgr24 => Some(H265InputFormat::Bgr24), - PixelFormat::Nv12 => Some(H265InputFormat::Nv12), - PixelFormat::Nv16 => Some(H265InputFormat::Nv16), - PixelFormat::Nv21 => Some(H265InputFormat::Nv21), - PixelFormat::Nv24 => Some(H265InputFormat::Nv24), - _ => None, - } - } else if codec_name.contains("libx265") { - match pipeline_input_format { - PixelFormat::Yuv420 => Some(H265InputFormat::Yuv420p), - _ => None, - } - } else { - None - }; - - let h265_input_format = if let Some(fmt) = direct_input_format { - fmt - } else if codec_name.contains("libx265") { - H265InputFormat::Yuv420p - } else { - H265InputFormat::Nv12 - }; - - let encoder_config = H265Config { - base: EncoderConfig { - resolution: config.resolution, - input_format: config.input_format, - quality: config.bitrate_kbps(), - fps: config.fps, - gop_size: config.gop_size(), - }, - bitrate_kbps: config.bitrate_kbps(), - gop_size: config.gop_size(), - fps: config.fps, - input_format: h265_input_format, - }; - - if use_rkmpp_direct { - info!( - "Creating H265 encoder with RKMPP backend for {} direct input (codec: {})", - config.input_format, codec_name - ); - } else if let Some(ref backend) = config.encoder_backend { - info!( - "Creating H265 encoder with backend {:?} (codec: {})", - backend, codec_name - ); - } - - let encoder = H265Encoder::with_codec(encoder_config, &codec_name)?; - - info!("Created H265 encoder: {}", encoder.codec_name()); - Box::new(H265EncoderWrapper(encoder)) - } - VideoEncoderType::VP8 => { - let encoder_config = - VP8Config::low_latency(config.resolution, config.bitrate_kbps()); - let codec_name = selected_codec_name.clone(); - if let Some(ref backend) = config.encoder_backend { - info!( - "Creating VP8 encoder with backend {:?} (codec: {})", - backend, codec_name - ); - } - let encoder = VP8Encoder::with_codec(encoder_config, &codec_name)?; - - info!("Created VP8 encoder: {}", encoder.codec_name()); - Box::new(VP8EncoderWrapper(encoder)) - } - VideoEncoderType::VP9 => { - let encoder_config = - VP9Config::low_latency(config.resolution, config.bitrate_kbps()); - let codec_name = selected_codec_name.clone(); - if let Some(ref backend) = config.encoder_backend { - info!( - "Creating VP9 encoder with backend {:?} (codec: {})", - backend, codec_name - ); - } - let encoder = VP9Encoder::with_codec(encoder_config, &codec_name)?; - - info!("Created VP9 encoder: {}", encoder.codec_name()); - Box::new(VP9EncoderWrapper(encoder)) - } - }; - - // Determine if encoder can take direct input without conversion - let codec_name = encoder.codec_name(); - let use_direct_input = if codec_name.contains("rkmpp") { - matches!( - pipeline_input_format, - PixelFormat::Yuyv - | PixelFormat::Yuv420 - | PixelFormat::Rgb24 - | PixelFormat::Bgr24 - | PixelFormat::Nv12 - | PixelFormat::Nv16 - | PixelFormat::Nv21 - | PixelFormat::Nv24 - ) - } else if codec_name.contains("libx264") { - matches!( - pipeline_input_format, - PixelFormat::Nv12 | PixelFormat::Nv16 | PixelFormat::Nv21 | PixelFormat::Yuv420 - ) - } else { - false - }; - - // Determine if encoder needs YUV420P (software encoders) or NV12 (hardware encoders) - let needs_yuv420p = if codec_name.contains("libx264") { - !matches!( - pipeline_input_format, - PixelFormat::Nv12 | PixelFormat::Nv16 | PixelFormat::Nv21 | PixelFormat::Yuv420 - ) - } else { - codec_name.contains("libvpx") || codec_name.contains("libx265") - }; - - info!( - "Encoder {} needs {} format", - codec_name, - if use_direct_input { - "direct" - } else if needs_yuv420p { - "YUV420P" - } else { - "NV12" - } - ); - - // Create converter or decoder based on input format and encoder needs - info!( - "Initializing input format handler for: {} -> {}", - pipeline_input_format, - if use_direct_input { - "direct" - } else if needs_yuv420p { - "YUV420P" - } else { - "NV12" - } - ); - - let (nv12_converter, yuv420p_converter) = if use_yuyv_direct { - // RKMPP with YUYV direct input - skip all conversion - info!("YUYV direct input enabled for RKMPP, skipping format conversion"); - (None, None) - } else if use_direct_input { - info!("Direct input enabled, skipping format conversion"); - (None, None) - } else if needs_yuv420p { - // Software encoder needs YUV420P - match pipeline_input_format { - PixelFormat::Yuv420 => { - info!("Using direct YUV420P input (no conversion)"); - (None, None) - } - PixelFormat::Yuyv => { - info!("Using YUYV->YUV420P converter"); - ( - None, - Some(PixelConverter::yuyv_to_yuv420p(config.resolution)), - ) - } - PixelFormat::Nv12 => { - info!("Using NV12->YUV420P converter"); - ( - None, - Some(PixelConverter::nv12_to_yuv420p(config.resolution)), - ) - } - PixelFormat::Nv21 => { - info!("Using NV21->YUV420P converter"); - ( - None, - Some(PixelConverter::nv21_to_yuv420p(config.resolution)), - ) - } - PixelFormat::Rgb24 => { - info!("Using RGB24->YUV420P converter"); - ( - None, - Some(PixelConverter::rgb24_to_yuv420p(config.resolution)), - ) - } - PixelFormat::Bgr24 => { - info!("Using BGR24->YUV420P converter"); - ( - None, - Some(PixelConverter::bgr24_to_yuv420p(config.resolution)), - ) - } - _ => { - return Err(AppError::VideoError(format!( - "Unsupported input format for software encoding: {}", - pipeline_input_format - ))); - } - } - } else { - // Hardware encoder needs NV12 - match pipeline_input_format { - PixelFormat::Nv12 => { - info!("Using direct NV12 input (no conversion)"); - (None, None) - } - PixelFormat::Yuyv => { - info!("Using YUYV->NV12 converter"); - (Some(Nv12Converter::yuyv_to_nv12(config.resolution)), None) - } - PixelFormat::Nv21 => { - info!("Using NV21->NV12 converter"); - (Some(Nv12Converter::nv21_to_nv12(config.resolution)), None) - } - PixelFormat::Nv16 => { - info!("Using NV16->NV12 converter"); - (Some(Nv12Converter::nv16_to_nv12(config.resolution)), None) - } - PixelFormat::Yuv420 => { - info!("Using YUV420P->NV12 converter"); - (Some(Nv12Converter::yuv420_to_nv12(config.resolution)), None) - } - PixelFormat::Rgb24 => { - info!("Using RGB24->NV12 converter"); - (Some(Nv12Converter::rgb24_to_nv12(config.resolution)), None) - } - PixelFormat::Bgr24 => { - info!("Using BGR24->NV12 converter"); - (Some(Nv12Converter::bgr24_to_nv12(config.resolution)), None) - } - _ => { - return Err(AppError::VideoError(format!( - "Unsupported input format for hardware encoding: {}", - pipeline_input_format - ))); - } - } - }; - - Ok(EncoderThreadState { - encoder: Some(encoder), - mjpeg_decoder, - nv12_converter, - yuv420p_converter, - encoder_needs_yuv420p: needs_yuv420p, - #[cfg(any(target_arch = "aarch64", target_arch = "arm"))] - ffmpeg_hw_pipeline: None, - #[cfg(any(target_arch = "aarch64", target_arch = "arm"))] - ffmpeg_hw_enabled: false, - fps: config.fps, - codec: config.output_codec, - input_format: config.input_format, - }) - } - /// Subscribe to encoded frames pub fn subscribe(&self) -> mpsc::Receiver> { let (tx, rx) = mpsc::channel(4); @@ -961,18 +282,6 @@ impl SharedVideoPipeline { .count() } - /// Report that a receiver has lagged behind - /// - /// Call this when a broadcast receiver detects it has fallen behind - /// (e.g., when RecvError::Lagged is received). - /// - /// # Arguments - /// - /// * `_frames_lagged` - Number of frames the receiver has lagged (currently unused) - pub async fn report_lag(&self, _frames_lagged: u64) { - // No-op: backpressure control removed as it was not effective - } - /// Request encoder to produce a keyframe on next encode /// /// This is useful when a new client connects and needs an immediate @@ -1068,177 +377,10 @@ impl SharedVideoPipeline { } } - /// Get current codec - pub async fn current_codec(&self) -> VideoEncoderType { - self.config.read().await.output_codec - } - - /// Switch codec (requires restart) - pub async fn switch_codec(&self, codec: VideoEncoderType) -> Result<()> { - let was_running = self.is_running(); - - if was_running { - self.stop(); - tokio::time::sleep(Duration::from_millis(100)).await; - } - - { - let mut config = self.config.write().await; - config.output_codec = codec; - } - - self.clear_cmd_tx(); - - info!("Switched to {} codec", codec); - Ok(()) - } - - /// Start the pipeline - pub async fn start( - self: &Arc, - mut frame_rx: broadcast::Receiver, - ) -> Result<()> { - if *self.running_rx.borrow() { - warn!("Pipeline already running"); - return Ok(()); - } - - let config = self.config.read().await.clone(); - let mut encoder_state = Self::build_encoder_state(&config)?; - let _ = self.running.send(true); - self.running_flag.store(true, Ordering::Release); - let gop_size = config.gop_size(); - info!( - "Starting {} pipeline (GOP={})", - config.output_codec, gop_size - ); - - let pipeline = self.clone(); - let (cmd_tx, mut cmd_rx) = tokio::sync::mpsc::unbounded_channel(); - { - let mut guard = self.cmd_tx.write(); - *guard = Some(cmd_tx); - } - - tokio::spawn(async move { - let mut frame_count: u64 = 0; - let mut last_fps_time = Instant::now(); - let mut fps_frame_count: u64 = 0; - let mut running_rx = pipeline.running_rx.clone(); - let encode_error_throttler = LogThrottler::with_secs(ENCODE_ERROR_THROTTLE_SECS); - let mut suppressed_encode_errors: HashMap = HashMap::new(); - - // Track when we last had subscribers for auto-stop feature - let mut no_subscribers_since: Option = None; - let grace_period = Duration::from_secs(AUTO_STOP_GRACE_PERIOD_SECS); - - loop { - tokio::select! { - biased; - - _ = running_rx.changed() => { - if !*running_rx.borrow() { - break; - } - } - - result = frame_rx.recv() => { - match result { - Ok(video_frame) => { - while let Ok(cmd) = cmd_rx.try_recv() { - if let Err(e) = pipeline.apply_cmd(&mut encoder_state, cmd) { - error!("Failed to apply pipeline command: {}", e); - } - } - let subscriber_count = pipeline.subscriber_count(); - - if subscriber_count == 0 { - // Track when we started having no subscribers - if no_subscribers_since.is_none() { - no_subscribers_since = Some(Instant::now()); - trace!("No subscribers, starting grace period timer"); - } - - // Check if grace period has elapsed - if let Some(since) = no_subscribers_since { - if since.elapsed() >= grace_period { - info!( - "No subscribers for {}s, auto-stopping video pipeline", - grace_period.as_secs() - ); - // Signal stop and break out of loop - let _ = pipeline.running.send(false); - pipeline - .running_flag - .store(false, Ordering::Release); - break; - } - } - - // Skip encoding but continue loop (within grace period) - continue; - } else { - // Reset the no-subscriber timer when we have subscribers again - if no_subscribers_since.is_some() { - trace!("Subscriber connected, resetting grace period timer"); - no_subscribers_since = None; - } - } - - match pipeline.encode_frame_sync(&mut encoder_state, &video_frame, frame_count) { - Ok(Some(encoded_frame)) => { - let encoded_arc = Arc::new(encoded_frame); - pipeline.broadcast_encoded(encoded_arc).await; - - frame_count += 1; - fps_frame_count += 1; - } - Ok(None) => {} - Err(e) => { - log_encoding_error( - &encode_error_throttler, - &mut suppressed_encode_errors, - &e, - ); - } - } - - // Update FPS every second (reduces lock contention) - let fps_elapsed = last_fps_time.elapsed(); - if fps_elapsed >= Duration::from_secs(1) { - let current_fps = - fps_frame_count as f32 / fps_elapsed.as_secs_f32(); - fps_frame_count = 0; - last_fps_time = Instant::now(); - - // Single lock acquisition for FPS - let mut s = pipeline.stats.lock().await; - s.current_fps = current_fps; - } - } - Err(broadcast::error::RecvError::Lagged(n)) => { - let _ = n; - } - Err(broadcast::error::RecvError::Closed) => { - break; - } - } - } - } - } - - pipeline.clear_cmd_tx(); - pipeline.running_flag.store(false, Ordering::Release); - info!("Video pipeline stopped"); - }); - - Ok(()) - } - /// Start the pipeline by owning capture + encode in a single loop. /// - /// This avoids the raw-frame broadcast path and keeps capture and encode - /// in the same thread for lower overhead. + /// Capture and encode stay tightly coupled to avoid maintaining separate + /// raw-frame fan-out and direct-device execution paths. pub async fn start_with_device( self: &Arc, device_path: std::path::PathBuf, @@ -1251,7 +393,7 @@ impl SharedVideoPipeline { } let config = self.config.read().await.clone(); - let mut encoder_state = Self::build_encoder_state(&config)?; + let mut encoder_state = build_encoder_state(&config)?; let _ = self.running.send(true); self.running_flag.store(true, Ordering::Release); diff --git a/src/video/shared_video_pipeline/encoder_state.rs b/src/video/shared_video_pipeline/encoder_state.rs new file mode 100644 index 00000000..978847ee --- /dev/null +++ b/src/video/shared_video_pipeline/encoder_state.rs @@ -0,0 +1,639 @@ +use crate::error::{AppError, Result}; +use crate::video::convert::{Nv12Converter, PixelConverter}; +use crate::video::decoder::MjpegTurboDecoder; +use crate::video::encoder::h264::{H264Config, H264Encoder, H264InputFormat}; +use crate::video::encoder::h265::{H265Config, H265Encoder, H265InputFormat}; +use crate::video::encoder::registry::{EncoderBackend, EncoderRegistry, VideoEncoderType}; +use crate::video::encoder::traits::EncoderConfig; +use crate::video::encoder::vp8::{VP8Config, VP8Encoder}; +use crate::video::encoder::vp9::{VP9Config, VP9Encoder}; +use crate::video::format::{PixelFormat, Resolution}; +#[cfg(any(target_arch = "aarch64", target_arch = "arm"))] +use hwcodec::ffmpeg_hw::{ + last_error_message as ffmpeg_hw_last_error, HwMjpegH26xConfig, HwMjpegH26xPipeline, +}; +use tracing::info; + +use super::SharedVideoPipelineConfig; + +pub(super) struct EncoderThreadState { + pub(super) encoder: Option>, + pub(super) mjpeg_decoder: Option, + pub(super) nv12_converter: Option, + pub(super) yuv420p_converter: Option, + pub(super) encoder_needs_yuv420p: bool, + #[cfg(any(target_arch = "aarch64", target_arch = "arm"))] + pub(super) ffmpeg_hw_pipeline: Option, + #[cfg(any(target_arch = "aarch64", target_arch = "arm"))] + pub(super) ffmpeg_hw_enabled: bool, + pub(super) fps: u32, + pub(super) codec: VideoEncoderType, + pub(super) input_format: PixelFormat, +} + +pub(super) trait VideoEncoderTrait: Send { + fn encode_raw(&mut self, data: &[u8], pts_ms: i64) -> Result>; + fn set_bitrate(&mut self, bitrate_kbps: u32) -> Result<()>; + fn codec_name(&self) -> &str; + fn request_keyframe(&mut self); +} + +pub(super) struct EncodedFrame { + pub(super) data: Vec, + pub(super) key: i32, +} + +struct H264EncoderWrapper(H264Encoder); + +impl VideoEncoderTrait for H264EncoderWrapper { + fn encode_raw(&mut self, data: &[u8], pts_ms: i64) -> Result> { + let frames = self.0.encode_raw(data, pts_ms)?; + Ok(frames + .into_iter() + .map(|f| EncodedFrame { + data: f.data, + key: f.key, + }) + .collect()) + } + + fn set_bitrate(&mut self, bitrate_kbps: u32) -> Result<()> { + self.0.set_bitrate(bitrate_kbps) + } + + fn codec_name(&self) -> &str { + self.0.codec_name() + } + + fn request_keyframe(&mut self) { + self.0.request_keyframe() + } +} + +struct H265EncoderWrapper(H265Encoder); + +impl VideoEncoderTrait for H265EncoderWrapper { + fn encode_raw(&mut self, data: &[u8], pts_ms: i64) -> Result> { + let frames = self.0.encode_raw(data, pts_ms)?; + Ok(frames + .into_iter() + .map(|f| EncodedFrame { + data: f.data, + key: f.key, + }) + .collect()) + } + + fn set_bitrate(&mut self, bitrate_kbps: u32) -> Result<()> { + self.0.set_bitrate(bitrate_kbps) + } + + fn codec_name(&self) -> &str { + self.0.codec_name() + } + + fn request_keyframe(&mut self) { + self.0.request_keyframe() + } +} + +struct VP8EncoderWrapper(VP8Encoder); + +impl VideoEncoderTrait for VP8EncoderWrapper { + fn encode_raw(&mut self, data: &[u8], pts_ms: i64) -> Result> { + let frames = self.0.encode_raw(data, pts_ms)?; + Ok(frames + .into_iter() + .map(|f| EncodedFrame { + data: f.data, + key: f.key, + }) + .collect()) + } + + fn set_bitrate(&mut self, bitrate_kbps: u32) -> Result<()> { + self.0.set_bitrate(bitrate_kbps) + } + + fn codec_name(&self) -> &str { + self.0.codec_name() + } + + fn request_keyframe(&mut self) {} +} + +struct VP9EncoderWrapper(VP9Encoder); + +impl VideoEncoderTrait for VP9EncoderWrapper { + fn encode_raw(&mut self, data: &[u8], pts_ms: i64) -> Result> { + let frames = self.0.encode_raw(data, pts_ms)?; + Ok(frames + .into_iter() + .map(|f| EncodedFrame { + data: f.data, + key: f.key, + }) + .collect()) + } + + fn set_bitrate(&mut self, bitrate_kbps: u32) -> Result<()> { + self.0.set_bitrate(bitrate_kbps) + } + + fn codec_name(&self) -> &str { + self.0.codec_name() + } + + fn request_keyframe(&mut self) {} +} + +pub(super) enum MjpegDecoderKind { + Turbo(MjpegTurboDecoder), +} + +impl MjpegDecoderKind { + pub(super) fn decode(&mut self, data: &[u8]) -> Result> { + match self { + MjpegDecoderKind::Turbo(decoder) => decoder.decode_to_rgb(data), + } + } +} + +pub(super) fn build_encoder_state(config: &SharedVideoPipelineConfig) -> Result { + let registry = EncoderRegistry::global(); + + let get_codec_name = + |format: VideoEncoderType, backend: Option| -> Option { + match backend { + Some(b) => registry + .encoder_with_backend(format, b) + .map(|e| e.codec_name.clone()), + None => registry + .best_available_encoder(format) + .map(|e| e.codec_name.clone()), + } + }; + + let needs_mjpeg_decode = config.input_format.is_compressed(); + let is_rkmpp_available = registry + .encoder_with_backend(VideoEncoderType::H264, EncoderBackend::Rkmpp) + .is_some(); + let use_yuyv_direct = + is_rkmpp_available && !needs_mjpeg_decode && config.input_format == PixelFormat::Yuyv; + let use_rkmpp_direct = is_rkmpp_available + && !needs_mjpeg_decode + && matches!( + config.input_format, + PixelFormat::Yuyv + | PixelFormat::Yuv420 + | PixelFormat::Rgb24 + | PixelFormat::Bgr24 + | PixelFormat::Nv12 + | PixelFormat::Nv16 + | PixelFormat::Nv21 + | PixelFormat::Nv24 + ); + + if use_yuyv_direct { + info!("RKMPP backend detected with YUYV input, enabling YUYV direct input optimization"); + } else if use_rkmpp_direct { + info!( + "RKMPP backend detected with {} input, enabling direct input optimization", + config.input_format + ); + } + + let selected_codec_name = match config.output_codec { + VideoEncoderType::H264 => { + if use_rkmpp_direct { + get_codec_name(VideoEncoderType::H264, Some(EncoderBackend::Rkmpp)).ok_or_else( + || AppError::VideoError("RKMPP backend not available for H.264".to_string()), + )? + } else if let Some(ref backend) = config.encoder_backend { + get_codec_name(VideoEncoderType::H264, Some(*backend)).ok_or_else(|| { + AppError::VideoError(format!("Backend {:?} does not support H.264", backend)) + })? + } else { + get_codec_name(VideoEncoderType::H264, None) + .ok_or_else(|| AppError::VideoError("No H.264 encoder available".to_string()))? + } + } + VideoEncoderType::H265 => { + if use_rkmpp_direct { + get_codec_name(VideoEncoderType::H265, Some(EncoderBackend::Rkmpp)).ok_or_else( + || AppError::VideoError("RKMPP backend not available for H.265".to_string()), + )? + } else if let Some(ref backend) = config.encoder_backend { + get_codec_name(VideoEncoderType::H265, Some(*backend)).ok_or_else(|| { + AppError::VideoError(format!("Backend {:?} does not support H.265", backend)) + })? + } else { + get_codec_name(VideoEncoderType::H265, None) + .ok_or_else(|| AppError::VideoError("No H.265 encoder available".to_string()))? + } + } + VideoEncoderType::VP8 => { + if let Some(ref backend) = config.encoder_backend { + get_codec_name(VideoEncoderType::VP8, Some(*backend)).ok_or_else(|| { + AppError::VideoError(format!("Backend {:?} does not support VP8", backend)) + })? + } else { + get_codec_name(VideoEncoderType::VP8, None) + .ok_or_else(|| AppError::VideoError("No VP8 encoder available".to_string()))? + } + } + VideoEncoderType::VP9 => { + if let Some(ref backend) = config.encoder_backend { + get_codec_name(VideoEncoderType::VP9, Some(*backend)).ok_or_else(|| { + AppError::VideoError(format!("Backend {:?} does not support VP9", backend)) + })? + } else { + get_codec_name(VideoEncoderType::VP9, None) + .ok_or_else(|| AppError::VideoError("No VP9 encoder available".to_string()))? + } + } + }; + + #[cfg(any(target_arch = "aarch64", target_arch = "arm"))] + let is_rkmpp_encoder = selected_codec_name.contains("rkmpp"); + #[cfg(any(target_arch = "aarch64", target_arch = "arm"))] + if needs_mjpeg_decode + && is_rkmpp_encoder + && matches!( + config.output_codec, + VideoEncoderType::H264 | VideoEncoderType::H265 + ) + { + info!( + "Initializing FFmpeg HW MJPEG->{} pipeline (no fallback)", + config.output_codec + ); + let pipeline = HwMjpegH26xPipeline::new(HwMjpegH26xConfig { + decoder: "mjpeg_rkmpp".to_string(), + encoder: selected_codec_name.clone(), + width: config.resolution.width as i32, + height: config.resolution.height as i32, + fps: config.fps as i32, + bitrate_kbps: config.bitrate_kbps() as i32, + gop: config.gop_size() as i32, + thread_count: 1, + }) + .map_err(|e| { + let detail = if e.is_empty() { + ffmpeg_hw_last_error() + } else { + e + }; + AppError::VideoError(format!( + "FFmpeg HW MJPEG->{} init failed: {}", + config.output_codec, detail + )) + })?; + info!("Using FFmpeg HW MJPEG->{} pipeline", config.output_codec); + return Ok(EncoderThreadState { + encoder: None, + mjpeg_decoder: None, + nv12_converter: None, + yuv420p_converter: None, + encoder_needs_yuv420p: false, + #[cfg(any(target_arch = "aarch64", target_arch = "arm"))] + ffmpeg_hw_pipeline: Some(pipeline), + #[cfg(any(target_arch = "aarch64", target_arch = "arm"))] + ffmpeg_hw_enabled: true, + fps: config.fps, + codec: config.output_codec, + input_format: config.input_format, + }); + } + + let (mjpeg_decoder, pipeline_input_format) = if needs_mjpeg_decode { + info!( + "MJPEG input detected, using TurboJPEG decoder ({} -> RGB24)", + config.input_format + ); + ( + Some(MjpegDecoderKind::Turbo(MjpegTurboDecoder::new( + config.resolution, + )?)), + PixelFormat::Rgb24, + ) + } else { + (None, config.input_format) + }; + + let encoder: Box = match config.output_codec { + VideoEncoderType::H264 => { + let codec_name = selected_codec_name.clone(); + let direct_input_format = h264_direct_input_format(&codec_name, pipeline_input_format); + let input_format = direct_input_format.unwrap_or_else(|| { + if codec_name.contains("libx264") { + H264InputFormat::Yuv420p + } else { + H264InputFormat::Nv12 + } + }); + + if use_rkmpp_direct { + info!( + "Creating H264 encoder with RKMPP backend for {} direct input (codec: {})", + config.input_format, codec_name + ); + } else if let Some(ref backend) = config.encoder_backend { + info!( + "Creating H264 encoder with backend {:?} (codec: {})", + backend, codec_name + ); + } + + let encoder = H264Encoder::with_codec( + H264Config { + base: EncoderConfig::h264(config.resolution, config.bitrate_kbps()), + bitrate_kbps: config.bitrate_kbps(), + gop_size: config.gop_size(), + fps: config.fps, + input_format, + }, + &codec_name, + )?; + info!("Created H264 encoder: {}", encoder.codec_name()); + Box::new(H264EncoderWrapper(encoder)) + } + VideoEncoderType::H265 => { + let codec_name = selected_codec_name.clone(); + let direct_input_format = h265_direct_input_format(&codec_name, pipeline_input_format); + let input_format = direct_input_format.unwrap_or_else(|| { + if codec_name.contains("libx265") { + H265InputFormat::Yuv420p + } else { + H265InputFormat::Nv12 + } + }); + + if use_rkmpp_direct { + info!( + "Creating H265 encoder with RKMPP backend for {} direct input (codec: {})", + config.input_format, codec_name + ); + } else if let Some(ref backend) = config.encoder_backend { + info!( + "Creating H265 encoder with backend {:?} (codec: {})", + backend, codec_name + ); + } + + let encoder = H265Encoder::with_codec( + H265Config { + base: EncoderConfig { + resolution: config.resolution, + input_format: config.input_format, + quality: config.bitrate_kbps(), + fps: config.fps, + gop_size: config.gop_size(), + }, + bitrate_kbps: config.bitrate_kbps(), + gop_size: config.gop_size(), + fps: config.fps, + input_format, + }, + &codec_name, + )?; + info!("Created H265 encoder: {}", encoder.codec_name()); + Box::new(H265EncoderWrapper(encoder)) + } + VideoEncoderType::VP8 => { + let codec_name = selected_codec_name.clone(); + if let Some(ref backend) = config.encoder_backend { + info!( + "Creating VP8 encoder with backend {:?} (codec: {})", + backend, codec_name + ); + } + let encoder = + VP8Encoder::with_codec(VP8Config::low_latency(config.resolution, config.bitrate_kbps()), &codec_name)?; + info!("Created VP8 encoder: {}", encoder.codec_name()); + Box::new(VP8EncoderWrapper(encoder)) + } + VideoEncoderType::VP9 => { + let codec_name = selected_codec_name.clone(); + if let Some(ref backend) = config.encoder_backend { + info!( + "Creating VP9 encoder with backend {:?} (codec: {})", + backend, codec_name + ); + } + let encoder = + VP9Encoder::with_codec(VP9Config::low_latency(config.resolution, config.bitrate_kbps()), &codec_name)?; + info!("Created VP9 encoder: {}", encoder.codec_name()); + Box::new(VP9EncoderWrapper(encoder)) + } + }; + + let codec_name = encoder.codec_name(); + let use_direct_input = if codec_name.contains("rkmpp") { + matches!( + pipeline_input_format, + PixelFormat::Yuyv + | PixelFormat::Yuv420 + | PixelFormat::Rgb24 + | PixelFormat::Bgr24 + | PixelFormat::Nv12 + | PixelFormat::Nv16 + | PixelFormat::Nv21 + | PixelFormat::Nv24 + ) + } else if codec_name.contains("libx264") { + matches!( + pipeline_input_format, + PixelFormat::Nv12 | PixelFormat::Nv16 | PixelFormat::Nv21 | PixelFormat::Yuv420 + ) + } else { + false + }; + let needs_yuv420p = if codec_name.contains("libx264") { + !matches!( + pipeline_input_format, + PixelFormat::Nv12 | PixelFormat::Nv16 | PixelFormat::Nv21 | PixelFormat::Yuv420 + ) + } else { + codec_name.contains("libvpx") || codec_name.contains("libx265") + }; + + info!( + "Encoder {} needs {} format", + codec_name, + if use_direct_input { + "direct" + } else if needs_yuv420p { + "YUV420P" + } else { + "NV12" + } + ); + info!( + "Initializing input format handler for: {} -> {}", + pipeline_input_format, + if use_direct_input { + "direct" + } else if needs_yuv420p { + "YUV420P" + } else { + "NV12" + } + ); + + let (nv12_converter, yuv420p_converter) = converters_for_pipeline( + config.resolution, + pipeline_input_format, + use_yuyv_direct, + use_direct_input, + needs_yuv420p, + )?; + + Ok(EncoderThreadState { + encoder: Some(encoder), + mjpeg_decoder, + nv12_converter, + yuv420p_converter, + encoder_needs_yuv420p: needs_yuv420p, + #[cfg(any(target_arch = "aarch64", target_arch = "arm"))] + ffmpeg_hw_pipeline: None, + #[cfg(any(target_arch = "aarch64", target_arch = "arm"))] + ffmpeg_hw_enabled: false, + fps: config.fps, + codec: config.output_codec, + input_format: config.input_format, + }) +} + +fn h264_direct_input_format(codec_name: &str, input_format: PixelFormat) -> Option { + if codec_name.contains("rkmpp") { + match input_format { + PixelFormat::Yuyv => Some(H264InputFormat::Yuyv422), + PixelFormat::Yuv420 => Some(H264InputFormat::Yuv420p), + PixelFormat::Rgb24 => Some(H264InputFormat::Rgb24), + PixelFormat::Bgr24 => Some(H264InputFormat::Bgr24), + PixelFormat::Nv12 => Some(H264InputFormat::Nv12), + PixelFormat::Nv16 => Some(H264InputFormat::Nv16), + PixelFormat::Nv21 => Some(H264InputFormat::Nv21), + PixelFormat::Nv24 => Some(H264InputFormat::Nv24), + _ => None, + } + } else if codec_name.contains("libx264") { + match input_format { + PixelFormat::Nv12 => Some(H264InputFormat::Nv12), + PixelFormat::Nv16 => Some(H264InputFormat::Nv16), + PixelFormat::Nv21 => Some(H264InputFormat::Nv21), + PixelFormat::Yuv420 => Some(H264InputFormat::Yuv420p), + _ => None, + } + } else { + None + } +} + +fn h265_direct_input_format(codec_name: &str, input_format: PixelFormat) -> Option { + if codec_name.contains("rkmpp") { + match input_format { + PixelFormat::Yuyv => Some(H265InputFormat::Yuyv422), + PixelFormat::Yuv420 => Some(H265InputFormat::Yuv420p), + PixelFormat::Rgb24 => Some(H265InputFormat::Rgb24), + PixelFormat::Bgr24 => Some(H265InputFormat::Bgr24), + PixelFormat::Nv12 => Some(H265InputFormat::Nv12), + PixelFormat::Nv16 => Some(H265InputFormat::Nv16), + PixelFormat::Nv21 => Some(H265InputFormat::Nv21), + PixelFormat::Nv24 => Some(H265InputFormat::Nv24), + _ => None, + } + } else if codec_name.contains("libx265") { + match input_format { + PixelFormat::Yuv420 => Some(H265InputFormat::Yuv420p), + _ => None, + } + } else { + None + } +} + +fn converters_for_pipeline( + resolution: Resolution, + input_format: PixelFormat, + use_yuyv_direct: bool, + use_direct_input: bool, + needs_yuv420p: bool, +) -> Result<(Option, Option)> { + if use_yuyv_direct { + info!("YUYV direct input enabled for RKMPP, skipping format conversion"); + return Ok((None, None)); + } + if use_direct_input { + info!("Direct input enabled, skipping format conversion"); + return Ok((None, None)); + } + if needs_yuv420p { + return match input_format { + PixelFormat::Yuv420 => { + info!("Using direct YUV420P input (no conversion)"); + Ok((None, None)) + } + PixelFormat::Yuyv => { + info!("Using YUYV->YUV420P converter"); + Ok((None, Some(PixelConverter::yuyv_to_yuv420p(resolution)))) + } + PixelFormat::Nv12 => { + info!("Using NV12->YUV420P converter"); + Ok((None, Some(PixelConverter::nv12_to_yuv420p(resolution)))) + } + PixelFormat::Nv21 => { + info!("Using NV21->YUV420P converter"); + Ok((None, Some(PixelConverter::nv21_to_yuv420p(resolution)))) + } + PixelFormat::Rgb24 => { + info!("Using RGB24->YUV420P converter"); + Ok((None, Some(PixelConverter::rgb24_to_yuv420p(resolution)))) + } + PixelFormat::Bgr24 => { + info!("Using BGR24->YUV420P converter"); + Ok((None, Some(PixelConverter::bgr24_to_yuv420p(resolution)))) + } + _ => Err(AppError::VideoError(format!( + "Unsupported input format for software encoding: {}", + input_format + ))), + }; + } + + match input_format { + PixelFormat::Nv12 => { + info!("Using direct NV12 input (no conversion)"); + Ok((None, None)) + } + PixelFormat::Yuyv => { + info!("Using YUYV->NV12 converter"); + Ok((Some(Nv12Converter::yuyv_to_nv12(resolution)), None)) + } + PixelFormat::Nv21 => { + info!("Using NV21->NV12 converter"); + Ok((Some(Nv12Converter::nv21_to_nv12(resolution)), None)) + } + PixelFormat::Nv16 => { + info!("Using NV16->NV12 converter"); + Ok((Some(Nv12Converter::nv16_to_nv12(resolution)), None)) + } + PixelFormat::Yuv420 => { + info!("Using YUV420P->NV12 converter"); + Ok((Some(Nv12Converter::yuv420_to_nv12(resolution)), None)) + } + PixelFormat::Rgb24 => { + info!("Using RGB24->NV12 converter"); + Ok((Some(Nv12Converter::rgb24_to_nv12(resolution)), None)) + } + PixelFormat::Bgr24 => { + info!("Using BGR24->NV12 converter"); + Ok((Some(Nv12Converter::bgr24_to_nv12(resolution)), None)) + } + _ => Err(AppError::VideoError(format!( + "Unsupported input format for hardware encoding: {}", + input_format + ))), + } +} diff --git a/src/video/stream_manager.rs b/src/video/stream_manager.rs index 91231074..c49ce1ae 100644 --- a/src/video/stream_manager.rs +++ b/src/video/stream_manager.rs @@ -12,7 +12,6 @@ //! │ //! ├── MJPEG Mode //! │ └── Streamer ──► MjpegStreamHandler -//! │ (Future: MjpegStreamer with WsAudio/WsHid) //! │ //! └── WebRTC Mode //! └── WebRtcStreamer ──► H264SessionManager @@ -211,21 +210,7 @@ impl VideoStreamManager { } } - // Configure WebRTC capture source after initialization - let (device_path, resolution, format, fps, jpeg_quality) = - self.streamer.current_capture_config().await; - info!( - "WebRTC capture config after init: {}x{} {:?} @ {}fps", - resolution.width, resolution.height, format, fps - ); - self.webrtc_streamer - .update_video_config(resolution, format, fps) - .await; - if let Some(device_path) = device_path { - self.webrtc_streamer - .set_capture_device(device_path, jpeg_quality) - .await; - } + self.sync_webrtc_capture_source("after init").await; Ok(()) } @@ -351,11 +336,17 @@ impl VideoStreamManager { } } + self.sync_webrtc_capture_source("for WebRTC ensure").await; + + Ok(()) + } + + async fn sync_webrtc_capture_source(&self, reason: &str) { let (device_path, resolution, format, fps, jpeg_quality) = self.streamer.current_capture_config().await; info!( - "Configuring WebRTC capture: {}x{} {:?} @ {}fps", - resolution.width, resolution.height, format, fps + "Syncing WebRTC capture source {}: {}x{} {:?} @ {}fps", + reason, resolution.width, resolution.height, format, fps ); self.webrtc_streamer .update_video_config(resolution, format, fps) @@ -364,9 +355,9 @@ impl VideoStreamManager { self.webrtc_streamer .set_capture_device(device_path, jpeg_quality) .await; + } else { + warn!("No capture device configured while syncing WebRTC capture source"); } - - Ok(()) } /// Internal implementation of mode switching (called with lock held) @@ -471,22 +462,7 @@ impl VideoStreamManager { } } - let (device_path, resolution, format, fps, jpeg_quality) = - self.streamer.current_capture_config().await; - info!( - "Configuring WebRTC capture pipeline: {}x{} {:?} @ {}fps", - resolution.width, resolution.height, format, fps - ); - self.webrtc_streamer - .update_video_config(resolution, format, fps) - .await; - if let Some(device_path) = device_path { - self.webrtc_streamer - .set_capture_device(device_path, jpeg_quality) - .await; - } else { - warn!("No capture device configured for WebRTC"); - } + self.sync_webrtc_capture_source("for WebRTC mode").await; let codec = self.webrtc_streamer.current_video_codec().await; let is_hardware = self.webrtc_streamer.is_hardware_encoding().await; @@ -603,19 +579,7 @@ impl VideoStreamManager { self.streamer.init_auto().await?; } - // Synchronize WebRTC config with current capture config - let (device_path, resolution, format, fps, jpeg_quality) = - self.streamer.current_capture_config().await; - self.webrtc_streamer - .update_video_config(resolution, format, fps) - .await; - if let Some(device_path) = device_path { - self.webrtc_streamer - .set_capture_device(device_path, jpeg_quality) - .await; - } else { - warn!("No capture device configured for WebRTC"); - } + self.sync_webrtc_capture_source("before start").await; } } @@ -760,24 +724,10 @@ impl VideoStreamManager { } // 2. Synchronize WebRTC config with capture config - let (device_path, resolution, format, fps, jpeg_quality) = - self.streamer.current_capture_config().await; - tracing::info!( - "Connecting encoded frame subscription: {}x{} {:?} @ {}fps", - resolution.width, - resolution.height, - format, - fps - ); - self.webrtc_streamer - .update_video_config(resolution, format, fps) + let (device_path, _, _, _, _) = self.streamer.current_capture_config().await; + self.sync_webrtc_capture_source("for encoded frame subscription") .await; - if let Some(device_path) = device_path { - self.webrtc_streamer - .set_capture_device(device_path, jpeg_quality) - .await; - } else { - tracing::warn!("No capture device configured for encoded frames"); + if device_path.is_none() { return None; } diff --git a/src/video/video_session.rs b/src/video/video_session.rs deleted file mode 100644 index 39512215..00000000 --- a/src/video/video_session.rs +++ /dev/null @@ -1,590 +0,0 @@ -//! Video session management with multi-codec support -//! -//! This module provides session management for video streaming with: -//! - Multi-codec support (H264, H265, VP8, VP9) -//! - Session lifecycle management -//! - Dynamic codec switching -//! - Statistics and monitoring - -use std::collections::HashMap; -use std::sync::Arc; -use std::time::Instant; -use tokio::sync::{broadcast, RwLock}; -use tracing::{debug, info, warn}; - -use super::encoder::registry::{EncoderBackend, EncoderRegistry, VideoEncoderType}; -use super::encoder::BitratePreset; -use super::format::Resolution; -use super::frame::VideoFrame; -use super::shared_video_pipeline::{ - EncodedVideoFrame, SharedVideoPipeline, SharedVideoPipelineConfig, SharedVideoPipelineStats, -}; -use crate::error::{AppError, Result}; - -/// Maximum concurrent video sessions -const MAX_VIDEO_SESSIONS: usize = 8; - -/// Video session state -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum VideoSessionState { - /// Session created but not started - Created, - /// Session is active and streaming - Active, - /// Session is paused - Paused, - /// Session is closing - Closing, - /// Session is closed - Closed, -} - -impl std::fmt::Display for VideoSessionState { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - VideoSessionState::Created => write!(f, "Created"), - VideoSessionState::Active => write!(f, "Active"), - VideoSessionState::Paused => write!(f, "Paused"), - VideoSessionState::Closing => write!(f, "Closing"), - VideoSessionState::Closed => write!(f, "Closed"), - } - } -} - -/// Video session information -#[derive(Debug, Clone)] -pub struct VideoSessionInfo { - /// Session ID - pub session_id: String, - /// Current codec - pub codec: VideoEncoderType, - /// Session state - pub state: VideoSessionState, - /// Creation time - pub created_at: Instant, - /// Last activity time - pub last_activity: Instant, - /// Frames received - pub frames_received: u64, - /// Bytes received - pub bytes_received: u64, -} - -/// Individual video session -struct VideoSession { - /// Session ID - session_id: String, - /// Codec for this session - codec: VideoEncoderType, - /// Session state - state: VideoSessionState, - /// Creation time - created_at: Instant, - /// Last activity time - last_activity: Instant, - /// Frame receiver - frame_rx: Option>>, - /// Stats - frames_received: u64, - bytes_received: u64, -} - -impl VideoSession { - fn new(session_id: String, codec: VideoEncoderType) -> Self { - let now = Instant::now(); - Self { - session_id, - codec, - state: VideoSessionState::Created, - created_at: now, - last_activity: now, - frame_rx: None, - frames_received: 0, - bytes_received: 0, - } - } - - fn info(&self) -> VideoSessionInfo { - VideoSessionInfo { - session_id: self.session_id.clone(), - codec: self.codec, - state: self.state, - created_at: self.created_at, - last_activity: self.last_activity, - frames_received: self.frames_received, - bytes_received: self.bytes_received, - } - } -} - -/// Video session manager configuration -#[derive(Debug, Clone)] -pub struct VideoSessionManagerConfig { - /// Default codec - pub default_codec: VideoEncoderType, - /// Default resolution - pub resolution: Resolution, - /// Bitrate preset - pub bitrate_preset: BitratePreset, - /// Default FPS - pub fps: u32, - /// Session timeout (seconds) - pub session_timeout_secs: u64, - /// Encoder backend (None = auto select best available) - pub encoder_backend: Option, -} - -impl Default for VideoSessionManagerConfig { - fn default() -> Self { - Self { - default_codec: VideoEncoderType::H264, - resolution: Resolution::HD720, - bitrate_preset: BitratePreset::Balanced, - fps: 30, - session_timeout_secs: 300, - encoder_backend: None, - } - } -} - -/// Video session manager -/// -/// Manages video encoding sessions with multi-codec support. -/// A single encoder is shared across all sessions with the same codec. -pub struct VideoSessionManager { - /// Configuration - config: VideoSessionManagerConfig, - /// Active sessions - sessions: RwLock>, - /// Current pipeline (shared across sessions with same codec) - pipeline: RwLock>>, - /// Current codec (active pipeline codec) - current_codec: RwLock>, - /// Video frame source - frame_source: RwLock>>, -} - -impl VideoSessionManager { - /// Create a new video session manager - pub fn new(config: VideoSessionManagerConfig) -> Self { - info!( - "Creating video session manager with default codec: {}", - config.default_codec - ); - - Self { - config, - sessions: RwLock::new(HashMap::new()), - pipeline: RwLock::new(None), - current_codec: RwLock::new(None), - frame_source: RwLock::new(None), - } - } - - /// Create with default configuration - pub fn with_defaults() -> Self { - Self::new(VideoSessionManagerConfig::default()) - } - - /// Set the video frame source - pub async fn set_frame_source(&self, rx: broadcast::Receiver) { - *self.frame_source.write().await = Some(rx); - } - - /// Get available codecs based on encoder availability - pub fn available_codecs(&self) -> Vec { - EncoderRegistry::global().selectable_formats() - } - - /// Check if a codec is available - pub fn is_codec_available(&self, codec: VideoEncoderType) -> bool { - EncoderRegistry::global().is_codec_available(codec) - } - - /// Create a new video session - pub async fn create_session(&self, codec: Option) -> Result { - let sessions = self.sessions.read().await; - if sessions.len() >= MAX_VIDEO_SESSIONS { - return Err(AppError::VideoError(format!( - "Maximum video sessions ({}) reached", - MAX_VIDEO_SESSIONS - ))); - } - drop(sessions); - - // Use specified codec or default - let codec = codec.unwrap_or(self.config.default_codec); - - // Verify codec is available - if !self.is_codec_available(codec) { - return Err(AppError::VideoError(format!( - "Codec {} is not available on this system", - codec - ))); - } - - // Generate session ID - let session_id = uuid::Uuid::new_v4().to_string(); - - // Create session - let session = VideoSession::new(session_id.clone(), codec); - - // Store session - let mut sessions = self.sessions.write().await; - sessions.insert(session_id.clone(), session); - - info!("Video session created: {} (codec: {})", session_id, codec); - - Ok(session_id) - } - - /// Start a video session (subscribe to encoded frames) - pub async fn start_session( - &self, - session_id: &str, - ) -> Result>> { - // Ensure pipeline is running with correct codec - self.ensure_pipeline_for_session(session_id).await?; - - let mut sessions = self.sessions.write().await; - let session = sessions - .get_mut(session_id) - .ok_or_else(|| AppError::NotFound(format!("Session not found: {}", session_id)))?; - - // Get pipeline and subscribe - let pipeline = self.pipeline.read().await; - let pipeline = pipeline - .as_ref() - .ok_or_else(|| AppError::VideoError("Pipeline not initialized".to_string()))?; - - let rx = pipeline.subscribe(); - session.frame_rx = Some(pipeline.subscribe()); - session.state = VideoSessionState::Active; - session.last_activity = Instant::now(); - - info!("Video session started: {}", session_id); - Ok(rx) - } - - /// Ensure pipeline is running with correct codec for session - async fn ensure_pipeline_for_session(&self, session_id: &str) -> Result<()> { - let sessions = self.sessions.read().await; - let session = sessions - .get(session_id) - .ok_or_else(|| AppError::NotFound(format!("Session not found: {}", session_id)))?; - let required_codec = session.codec; - drop(sessions); - - let current_codec = *self.current_codec.read().await; - - // Check if we need to create or switch pipeline - if current_codec != Some(required_codec) { - self.switch_pipeline_codec(required_codec).await?; - } - - // Ensure pipeline is started - let pipeline = self.pipeline.read().await; - if let Some(ref pipe) = *pipeline { - if !pipe.is_running() { - // Need frame source to start - let frame_rx = { - let source = self.frame_source.read().await; - source.as_ref().map(|rx| rx.resubscribe()) - }; - - if let Some(rx) = frame_rx { - drop(pipeline); - let pipeline = self.pipeline.read().await; - if let Some(ref pipe) = *pipeline { - pipe.start(rx).await?; - } - } - } - } - - Ok(()) - } - - /// Switch pipeline to different codec - async fn switch_pipeline_codec(&self, codec: VideoEncoderType) -> Result<()> { - info!("Switching pipeline to codec: {}", codec); - - // Stop existing pipeline - { - let pipeline = self.pipeline.read().await; - if let Some(ref pipe) = *pipeline { - pipe.stop(); - } - } - - // Create new pipeline config - let pipeline_config = SharedVideoPipelineConfig { - resolution: self.config.resolution, - input_format: crate::video::format::PixelFormat::Mjpeg, // Common input - output_codec: codec, - bitrate_preset: self.config.bitrate_preset, - fps: self.config.fps, - encoder_backend: self.config.encoder_backend, - }; - - // Create new pipeline - let new_pipeline = SharedVideoPipeline::new(pipeline_config)?; - - // Update state - *self.pipeline.write().await = Some(new_pipeline); - *self.current_codec.write().await = Some(codec); - - info!("Pipeline switched to codec: {}", codec); - Ok(()) - } - - /// Get session info - pub async fn get_session(&self, session_id: &str) -> Option { - let sessions = self.sessions.read().await; - sessions.get(session_id).map(|s| s.info()) - } - - /// List all sessions - pub async fn list_sessions(&self) -> Vec { - let sessions = self.sessions.read().await; - sessions.values().map(|s| s.info()).collect() - } - - /// Pause a session - pub async fn pause_session(&self, session_id: &str) -> Result<()> { - let mut sessions = self.sessions.write().await; - let session = sessions - .get_mut(session_id) - .ok_or_else(|| AppError::NotFound(format!("Session not found: {}", session_id)))?; - - session.state = VideoSessionState::Paused; - session.last_activity = Instant::now(); - - debug!("Video session paused: {}", session_id); - Ok(()) - } - - /// Resume a session - pub async fn resume_session(&self, session_id: &str) -> Result<()> { - let mut sessions = self.sessions.write().await; - let session = sessions - .get_mut(session_id) - .ok_or_else(|| AppError::NotFound(format!("Session not found: {}", session_id)))?; - - session.state = VideoSessionState::Active; - session.last_activity = Instant::now(); - - debug!("Video session resumed: {}", session_id); - Ok(()) - } - - /// Close a session - pub async fn close_session(&self, session_id: &str) -> Result<()> { - let mut sessions = self.sessions.write().await; - if let Some(mut session) = sessions.remove(session_id) { - session.state = VideoSessionState::Closed; - session.frame_rx = None; - info!("Video session closed: {}", session_id); - } - - // If no more sessions, consider stopping pipeline - if sessions.is_empty() { - drop(sessions); - self.maybe_stop_pipeline().await; - } - - Ok(()) - } - - /// Stop pipeline if no active sessions - async fn maybe_stop_pipeline(&self) { - let sessions = self.sessions.read().await; - let has_active = sessions - .values() - .any(|s| s.state == VideoSessionState::Active); - drop(sessions); - - if !has_active { - let pipeline = self.pipeline.read().await; - if let Some(ref pipe) = *pipeline { - pipe.stop(); - debug!("Pipeline stopped - no active sessions"); - } - } - } - - /// Cleanup stale/timed out sessions - pub async fn cleanup_stale_sessions(&self) { - let timeout = std::time::Duration::from_secs(self.config.session_timeout_secs); - let now = Instant::now(); - - let stale_ids: Vec = { - let sessions = self.sessions.read().await; - sessions - .iter() - .filter(|(_, s)| { - (s.state == VideoSessionState::Paused || s.state == VideoSessionState::Created) - && now.duration_since(s.last_activity) > timeout - }) - .map(|(id, _)| id.clone()) - .collect() - }; - - if !stale_ids.is_empty() { - let mut sessions = self.sessions.write().await; - for id in stale_ids { - info!("Removing stale video session: {}", id); - sessions.remove(&id); - } - } - } - - /// Get session count - pub async fn session_count(&self) -> usize { - self.sessions.read().await.len() - } - - /// Get active session count - pub async fn active_session_count(&self) -> usize { - self.sessions - .read() - .await - .values() - .filter(|s| s.state == VideoSessionState::Active) - .count() - } - - /// Get pipeline statistics - pub async fn pipeline_stats(&self) -> Option { - let pipeline = self.pipeline.read().await; - if let Some(ref pipe) = *pipeline { - Some(pipe.stats().await) - } else { - None - } - } - - /// Get current active codec - pub async fn current_codec(&self) -> Option { - *self.current_codec.read().await - } - - /// Set bitrate for current pipeline - pub async fn set_bitrate(&self, bitrate_kbps: u32) -> Result<()> { - let pipeline = self.pipeline.read().await; - if let Some(ref pipe) = *pipeline { - pipe.set_bitrate(bitrate_kbps).await?; - } - Ok(()) - } - - /// Request keyframe for all sessions - pub async fn request_keyframe(&self) { - // This would be implemented if encoders support forced keyframes - warn!("Keyframe request not yet implemented"); - } - - /// Change codec for a session (requires restart) - pub async fn change_session_codec( - &self, - session_id: &str, - new_codec: VideoEncoderType, - ) -> Result<()> { - if !self.is_codec_available(new_codec) { - return Err(AppError::VideoError(format!( - "Codec {} is not available", - new_codec - ))); - } - - let mut sessions = self.sessions.write().await; - let session = sessions - .get_mut(session_id) - .ok_or_else(|| AppError::NotFound(format!("Session not found: {}", session_id)))?; - - let old_codec = session.codec; - session.codec = new_codec; - session.state = VideoSessionState::Created; // Require restart - session.frame_rx = None; - session.last_activity = Instant::now(); - - info!( - "Session {} codec changed: {} -> {}", - session_id, old_codec, new_codec - ); - - Ok(()) - } - - /// Get codec info - pub fn get_codec_info(&self, codec: VideoEncoderType) -> Option { - let registry = EncoderRegistry::global(); - let encoder = registry.best_available_encoder(codec)?; - - Some(CodecInfo { - codec_type: codec, - codec_name: encoder.codec_name.clone(), - backend: encoder.backend.to_string(), - is_hardware: encoder.is_hardware, - }) - } - - /// List all available codecs with their info - pub fn list_codec_info(&self) -> Vec { - self.available_codecs() - .iter() - .filter_map(|c| self.get_codec_info(*c)) - .collect() - } -} - -/// Codec information -#[derive(Debug, Clone)] -pub struct CodecInfo { - /// Codec type - pub codec_type: VideoEncoderType, - /// FFmpeg codec name - pub codec_name: String, - /// Backend (VAAPI, NVENC, etc.) - pub backend: String, - /// Whether this is hardware accelerated - pub is_hardware: bool, -} - -impl Default for VideoSessionManager { - fn default() -> Self { - Self::with_defaults() - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_session_state_display() { - assert_eq!(VideoSessionState::Active.to_string(), "Active"); - assert_eq!(VideoSessionState::Closed.to_string(), "Closed"); - } - - #[test] - fn test_available_codecs() { - let manager = VideoSessionManager::with_defaults(); - let codecs = manager.available_codecs(); - println!("Available codecs: {:?}", codecs); - // H264 should always be available (software fallback) - assert!(codecs.contains(&VideoEncoderType::H264)); - } - - #[test] - fn test_codec_info() { - let manager = VideoSessionManager::with_defaults(); - let info = manager.get_codec_info(VideoEncoderType::H264); - if let Some(info) = info { - println!( - "H264: {} ({}, hardware={})", - info.codec_name, info.backend, info.is_hardware - ); - } - } -} diff --git a/src/webrtc/h265_payloader.rs b/src/webrtc/h265_payloader.rs index 5263f0eb..8a841256 100644 --- a/src/webrtc/h265_payloader.rs +++ b/src/webrtc/h265_payloader.rs @@ -37,12 +37,6 @@ const H265_NAL_SPS: u8 = 33; const H265_NAL_PPS: u8 = 34; const H265_NAL_AUD: u8 = 35; const H265_NAL_FILLER: u8 = 38; -#[allow(dead_code)] -const H265_NAL_SEI_PREFIX: u8 = 39; // PREFIX_SEI_NUT -#[allow(dead_code)] -const H265_NAL_SEI_SUFFIX: u8 = 40; // SUFFIX_SEI_NUT -#[allow(dead_code)] -const H265_NAL_AP: u8 = 48; // Aggregation Packet const H265_NAL_FU: u8 = 49; // Fragmentation Unit /// H.265 NAL header size @@ -51,11 +45,6 @@ const H265_NAL_HEADER_SIZE: usize = 2; /// FU header size (1 byte after NAL header) const H265_FU_HEADER_SIZE: usize = 1; -/// Fixed PayloadHdr for FU packets: Type=49, LayerID=0, TID=1 -/// This matches the rtp crate's FRAG_PAYLOAD_HDR -#[allow(dead_code)] -const FU_PAYLOAD_HDR: [u8; 2] = [0x62, 0x01]; - /// Fixed PayloadHdr for AP packets: Type=48, LayerID=0, TID=1 /// This matches the rtp crate's AGGR_PAYLOAD_HDR const AP_PAYLOAD_HDR: [u8; 2] = [0x60, 0x01]; diff --git a/src/webrtc/mod.rs b/src/webrtc/mod.rs index 7fdf3c72..8640ab56 100644 --- a/src/webrtc/mod.rs +++ b/src/webrtc/mod.rs @@ -9,7 +9,7 @@ //! //! Architecture: //! ```text -//! VideoCapturer (MJPEG/YUYV) +//! V4L2 capture //! | //! v //! SharedVideoPipeline (decode -> convert -> encode) diff --git a/src/webrtc/track.rs b/src/webrtc/track.rs index d3787c63..3466fdd2 100644 --- a/src/webrtc/track.rs +++ b/src/webrtc/track.rs @@ -262,8 +262,6 @@ impl Default for AudioTrackConfig { /// Audio track for WebRTC streaming pub struct AudioTrack { - #[allow(dead_code)] - config: AudioTrackConfig, /// RTP track track: Arc, /// Running flag @@ -284,7 +282,6 @@ impl AudioTrack { let (running_tx, _) = watch::channel(false); Self { - config, track, running: Arc::new(running_tx), }