feat: 完善架构优化性能

- 调整音视频架构,提升 RKMPP 编码 MJPEG-->H264 性能,同时解决丢帧马赛克问题;
- 删除多用户逻辑,只保留单用户,支持设置 web 单会话;
- 修复删除体验不好的的回退逻辑,前端页面菜单位置微调;
- 增加 OTG USB 设备动态调整功能;
- 修复 mdns 问题,webrtc 视频切换更顺畅。
This commit is contained in:
mofeng
2026-01-25 16:04:29 +08:00
parent 01e01430da
commit 1786b7689d
66 changed files with 4225 additions and 2936 deletions

View File

@@ -2,13 +2,13 @@
//!
//! Provides async video capture using memory-mapped buffers.
use bytes::Bytes;
use std::io;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use bytes::Bytes;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{broadcast, watch, Mutex};
use tokio::sync::{watch, Mutex};
use tracing::{debug, error, info, warn};
use v4l::buffer::Type as BufferType;
use v4l::io::traits::CaptureStream;
@@ -92,20 +92,8 @@ impl CaptureConfig {
/// Capture statistics
#[derive(Debug, Clone, Default)]
pub struct CaptureStats {
/// Total frames captured
pub frames_captured: u64,
/// Frames dropped (invalid/too small)
pub frames_dropped: u64,
/// Current FPS (calculated)
pub current_fps: f32,
/// Average frame size in bytes
pub avg_frame_size: usize,
/// Capture errors
pub errors: u64,
/// Last frame timestamp
pub last_frame_ts: Option<Instant>,
/// Whether signal is present
pub signal_present: bool,
}
/// Video capturer state
@@ -131,9 +119,7 @@ pub struct VideoCapturer {
state: Arc<watch::Sender<CaptureState>>,
state_rx: watch::Receiver<CaptureState>,
stats: Arc<Mutex<CaptureStats>>,
frame_tx: broadcast::Sender<VideoFrame>,
stop_flag: Arc<AtomicBool>,
sequence: Arc<AtomicU64>,
capture_handle: Mutex<Option<tokio::task::JoinHandle<()>>>,
/// Last error that occurred (device path, reason)
last_error: Arc<parking_lot::RwLock<Option<(String, String)>>>,
@@ -143,16 +129,13 @@ impl VideoCapturer {
/// Create a new video capturer
pub fn new(config: CaptureConfig) -> Self {
let (state_tx, state_rx) = watch::channel(CaptureState::Stopped);
let (frame_tx, _) = broadcast::channel(16); // Buffer size 16 for low latency
Self {
config,
state: Arc::new(state_tx),
state_rx,
stats: Arc::new(Mutex::new(CaptureStats::default())),
frame_tx,
stop_flag: Arc::new(AtomicBool::new(false)),
sequence: Arc::new(AtomicU64::new(0)),
capture_handle: Mutex::new(None),
last_error: Arc::new(parking_lot::RwLock::new(None)),
}
@@ -178,16 +161,6 @@ impl VideoCapturer {
*self.last_error.write() = None;
}
/// Subscribe to frames
pub fn subscribe(&self) -> broadcast::Receiver<VideoFrame> {
self.frame_tx.subscribe()
}
/// Get frame sender (for sharing with other components like WebRTC)
pub fn frame_sender(&self) -> broadcast::Sender<VideoFrame> {
self.frame_tx.clone()
}
/// Get capture statistics
pub async fn stats(&self) -> CaptureStats {
self.stats.lock().await.clone()
@@ -225,15 +198,11 @@ impl VideoCapturer {
let config = self.config.clone();
let state = self.state.clone();
let stats = self.stats.clone();
let frame_tx = self.frame_tx.clone();
let stop_flag = self.stop_flag.clone();
let sequence = self.sequence.clone();
let last_error = self.last_error.clone();
let handle = tokio::task::spawn_blocking(move || {
capture_loop(
config, state, stats, frame_tx, stop_flag, sequence, last_error,
);
capture_loop(config, state, stats, stop_flag, last_error);
});
*self.capture_handle.lock().await = Some(handle);
@@ -272,12 +241,10 @@ fn capture_loop(
config: CaptureConfig,
state: Arc<watch::Sender<CaptureState>>,
stats: Arc<Mutex<CaptureStats>>,
frame_tx: broadcast::Sender<VideoFrame>,
stop_flag: Arc<AtomicBool>,
sequence: Arc<AtomicU64>,
error_holder: Arc<parking_lot::RwLock<Option<(String, String)>>>,
) {
let result = run_capture(&config, &state, &stats, &frame_tx, &stop_flag, &sequence);
let result = run_capture(&config, &state, &stats, &stop_flag);
match result {
Ok(_) => {
@@ -300,9 +267,7 @@ fn run_capture(
config: &CaptureConfig,
state: &watch::Sender<CaptureState>,
stats: &Arc<Mutex<CaptureStats>>,
frame_tx: &broadcast::Sender<VideoFrame>,
stop_flag: &AtomicBool,
sequence: &AtomicU64,
) -> Result<()> {
// Retry logic for device busy errors
const MAX_RETRIES: u32 = 5;
@@ -368,16 +333,7 @@ fn run_capture(
};
// Device opened and format set successfully - proceed with capture
return run_capture_inner(
config,
state,
stats,
frame_tx,
stop_flag,
sequence,
device,
actual_format,
);
return run_capture_inner(config, state, stats, stop_flag, device, actual_format);
}
// All retries exhausted
@@ -391,9 +347,7 @@ fn run_capture_inner(
config: &CaptureConfig,
state: &watch::Sender<CaptureState>,
stats: &Arc<Mutex<CaptureStats>>,
frame_tx: &broadcast::Sender<VideoFrame>,
stop_flag: &AtomicBool,
sequence: &AtomicU64,
device: Device,
actual_format: Format,
) -> Result<()> {
@@ -402,8 +356,6 @@ fn run_capture_inner(
actual_format.width, actual_format.height, actual_format.fourcc, actual_format.stride
);
let resolution = Resolution::new(actual_format.width, actual_format.height);
let pixel_format = PixelFormat::from_fourcc(actual_format.fourcc).unwrap_or(config.format);
// Try to set hardware FPS (V4L2 VIDIOC_S_PARM)
if config.fps > 0 {
@@ -449,18 +401,13 @@ fn run_capture_inner(
// Main capture loop
while !stop_flag.load(Ordering::Relaxed) {
// Try to capture a frame
let (buf, meta) = match stream.next() {
let (_buf, meta) = match stream.next() {
Ok(frame_data) => frame_data,
Err(e) => {
if e.kind() == io::ErrorKind::TimedOut {
warn!("Capture timeout - no signal?");
let _ = state.send(CaptureState::NoSignal);
// Update stats
if let Ok(mut s) = stats.try_lock() {
s.signal_present = false;
}
// Wait a bit before retrying
std::thread::sleep(Duration::from_millis(100));
continue;
@@ -486,9 +433,6 @@ fn run_capture_inner(
}
error!("Capture error: {}", e);
if let Ok(mut s) = stats.try_lock() {
s.errors += 1;
}
continue;
}
};
@@ -502,54 +446,16 @@ fn run_capture_inner(
"Dropping small frame: {} bytes (bytesused={})",
frame_size, meta.bytesused
);
if let Ok(mut s) = stats.try_lock() {
s.frames_dropped += 1;
}
continue;
}
// For JPEG formats, validate header
if pixel_format.is_compressed() && !is_valid_jpeg(&buf[..frame_size]) {
debug!("Dropping invalid JPEG frame (size={})", frame_size);
if let Ok(mut s) = stats.try_lock() {
s.frames_dropped += 1;
}
continue;
}
// Create frame with actual data size
let seq = sequence.fetch_add(1, Ordering::Relaxed);
let frame = VideoFrame::new(
Bytes::copy_from_slice(&buf[..frame_size]),
resolution,
pixel_format,
actual_format.stride,
seq,
);
// Update state if was no signal
if *state.borrow() == CaptureState::NoSignal {
let _ = state.send(CaptureState::Running);
}
// Send frame to subscribers
let receiver_count = frame_tx.receiver_count();
if receiver_count > 0 {
if let Err(e) = frame_tx.send(frame) {
debug!("No active receivers for frame: {}", e);
}
} else if seq % 60 == 0 {
// Log every 60 frames (about 1 second at 60fps) when no receivers
debug!("No receivers for video frames (receiver_count=0)");
}
// Update stats
// Update FPS calculation
if let Ok(mut s) = stats.try_lock() {
s.frames_captured += 1;
s.signal_present = true;
s.last_frame_ts = Some(Instant::now());
// Update FPS calculation
fps_frame_count += 1;
let elapsed = fps_window_start.elapsed();
@@ -571,6 +477,7 @@ fn run_capture_inner(
}
/// Validate JPEG frame data
#[cfg(test)]
fn is_valid_jpeg(data: &[u8]) -> bool {
if data.len() < 125 {
return false;

View File

@@ -511,21 +511,6 @@ impl Encoder for H264Encoder {
}
}
/// Encoder statistics
#[derive(Debug, Clone, Default)]
pub struct EncoderStats {
/// Total frames encoded
pub frames_encoded: u64,
/// Total bytes output
pub bytes_output: u64,
/// Current encoding FPS
pub fps: f32,
/// Average encoding time per frame (ms)
pub avg_encode_time_ms: f32,
/// Keyframes encoded
pub keyframes: u64,
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -1,17 +1,110 @@
//! Video frame data structures
use bytes::Bytes;
use parking_lot::Mutex;
use std::sync::Arc;
use std::sync::OnceLock;
use std::time::Instant;
use super::format::{PixelFormat, Resolution};
#[derive(Clone)]
enum FrameData {
Bytes(Bytes),
Pooled(Arc<FrameBuffer>),
}
impl std::fmt::Debug for FrameData {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
FrameData::Bytes(bytes) => f
.debug_struct("FrameData::Bytes")
.field("len", &bytes.len())
.finish(),
FrameData::Pooled(buf) => f
.debug_struct("FrameData::Pooled")
.field("len", &buf.len())
.finish(),
}
}
}
#[derive(Debug)]
pub struct FrameBufferPool {
pool: Mutex<Vec<Vec<u8>>>,
max_buffers: usize,
}
impl FrameBufferPool {
pub fn new(max_buffers: usize) -> Self {
Self {
pool: Mutex::new(Vec::new()),
max_buffers: max_buffers.max(1),
}
}
pub fn take(&self, min_capacity: usize) -> Vec<u8> {
let mut pool = self.pool.lock();
if let Some(mut buf) = pool.pop() {
if buf.capacity() < min_capacity {
buf.reserve(min_capacity - buf.capacity());
}
buf
} else {
Vec::with_capacity(min_capacity)
}
}
pub fn put(&self, mut buf: Vec<u8>) {
buf.clear();
let mut pool = self.pool.lock();
if pool.len() < self.max_buffers {
pool.push(buf);
}
}
}
pub struct FrameBuffer {
data: Vec<u8>,
pool: Option<Arc<FrameBufferPool>>,
}
impl FrameBuffer {
pub fn new(data: Vec<u8>, pool: Option<Arc<FrameBufferPool>>) -> Self {
Self { data, pool }
}
pub fn as_slice(&self) -> &[u8] {
&self.data
}
pub fn len(&self) -> usize {
self.data.len()
}
}
impl std::fmt::Debug for FrameBuffer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("FrameBuffer")
.field("len", &self.data.len())
.finish()
}
}
impl Drop for FrameBuffer {
fn drop(&mut self) {
if let Some(pool) = self.pool.take() {
let data = std::mem::take(&mut self.data);
pool.put(data);
}
}
}
/// A video frame with metadata
#[derive(Debug, Clone)]
pub struct VideoFrame {
/// Raw frame data
data: Arc<Bytes>,
data: FrameData,
/// Cached xxHash64 of frame data (lazy computed for deduplication)
hash: Arc<OnceLock<u64>>,
/// Frame resolution
@@ -40,7 +133,7 @@ impl VideoFrame {
sequence: u64,
) -> Self {
Self {
data: Arc::new(data),
data: FrameData::Bytes(data),
hash: Arc::new(OnceLock::new()),
resolution,
format,
@@ -63,24 +156,51 @@ impl VideoFrame {
Self::new(Bytes::from(data), resolution, format, stride, sequence)
}
/// Create a frame from pooled buffer
pub fn from_pooled(
data: Arc<FrameBuffer>,
resolution: Resolution,
format: PixelFormat,
stride: u32,
sequence: u64,
) -> Self {
Self {
data: FrameData::Pooled(data),
hash: Arc::new(OnceLock::new()),
resolution,
format,
stride,
key_frame: true,
sequence,
capture_ts: Instant::now(),
online: true,
}
}
/// Get frame data as bytes slice
pub fn data(&self) -> &[u8] {
&self.data
match &self.data {
FrameData::Bytes(bytes) => bytes,
FrameData::Pooled(buf) => buf.as_slice(),
}
}
/// Get frame data as Bytes (cheap clone)
pub fn data_bytes(&self) -> Bytes {
(*self.data).clone()
match &self.data {
FrameData::Bytes(bytes) => bytes.clone(),
FrameData::Pooled(buf) => Bytes::copy_from_slice(buf.as_slice()),
}
}
/// Get data length
pub fn len(&self) -> usize {
self.data.len()
self.data().len()
}
/// Check if frame is empty
pub fn is_empty(&self) -> bool {
self.data.is_empty()
self.data().is_empty()
}
/// Get width
@@ -108,7 +228,7 @@ impl VideoFrame {
pub fn get_hash(&self) -> u64 {
*self
.hash
.get_or_init(|| xxhash_rust::xxh64::xxh64(self.data.as_ref(), 0))
.get_or_init(|| xxhash_rust::xxh64::xxh64(self.data(), 0))
}
/// Check if format is JPEG/MJPEG
@@ -121,25 +241,27 @@ impl VideoFrame {
if !self.is_jpeg() {
return false;
}
if self.data.len() < 125 {
Self::is_valid_jpeg_bytes(self.data())
}
/// Validate JPEG bytes without constructing a frame
pub fn is_valid_jpeg_bytes(data: &[u8]) -> bool {
if data.len() < 125 {
return false;
}
// Check JPEG header
let start_marker = ((self.data[0] as u16) << 8) | self.data[1] as u16;
let start_marker = ((data[0] as u16) << 8) | data[1] as u16;
if start_marker != 0xFFD8 {
return false;
}
// Check JPEG end marker
let end = self.data.len();
let end_marker = ((self.data[end - 2] as u16) << 8) | self.data[end - 1] as u16;
// Valid end markers: 0xFFD9, 0xD900, 0x0000 (padded)
let end = data.len();
let end_marker = ((data[end - 2] as u16) << 8) | data[end - 1] as u16;
matches!(end_marker, 0xFFD9 | 0xD900 | 0x0000)
}
/// Create an offline placeholder frame
pub fn offline(resolution: Resolution, format: PixelFormat) -> Self {
Self {
data: Arc::new(Bytes::new()),
data: FrameData::Bytes(Bytes::new()),
hash: Arc::new(OnceLock::new()),
resolution,
format,
@@ -175,65 +297,3 @@ impl From<&VideoFrame> for FrameMeta {
}
}
}
/// Ring buffer for storing recent frames
pub struct FrameRing {
frames: Vec<Option<VideoFrame>>,
capacity: usize,
write_pos: usize,
count: usize,
}
impl FrameRing {
/// Create a new frame ring with specified capacity
pub fn new(capacity: usize) -> Self {
assert!(capacity > 0, "Ring capacity must be > 0");
Self {
frames: (0..capacity).map(|_| None).collect(),
capacity,
write_pos: 0,
count: 0,
}
}
/// Push a frame into the ring
pub fn push(&mut self, frame: VideoFrame) {
self.frames[self.write_pos] = Some(frame);
self.write_pos = (self.write_pos + 1) % self.capacity;
if self.count < self.capacity {
self.count += 1;
}
}
/// Get the latest frame
pub fn latest(&self) -> Option<&VideoFrame> {
if self.count == 0 {
return None;
}
let pos = if self.write_pos == 0 {
self.capacity - 1
} else {
self.write_pos - 1
};
self.frames[pos].as_ref()
}
/// Get number of frames in ring
pub fn len(&self) -> usize {
self.count
}
/// Check if ring is empty
pub fn is_empty(&self) -> bool {
self.count == 0
}
/// Clear all frames
pub fn clear(&mut self) {
for frame in &mut self.frames {
*frame = None;
}
self.write_pos = 0;
self.count = 0;
}
}

View File

@@ -53,22 +53,8 @@ impl Default for H264PipelineConfig {
/// H264 pipeline statistics
#[derive(Debug, Clone, Default)]
pub struct H264PipelineStats {
/// Total frames captured
pub frames_captured: u64,
/// Total frames encoded
pub frames_encoded: u64,
/// Frames dropped (encoding too slow)
pub frames_dropped: u64,
/// Total bytes encoded
pub bytes_encoded: u64,
/// Keyframes encoded
pub keyframes_encoded: u64,
/// Average encoding time per frame (ms)
pub avg_encode_time_ms: f32,
/// Current encoding FPS
pub current_fps: f32,
/// Errors encountered
pub errors: u64,
}
/// H264 video encoding pipeline
@@ -84,8 +70,6 @@ pub struct H264Pipeline {
stats: Arc<Mutex<H264PipelineStats>>,
/// Running state
running: watch::Sender<bool>,
/// Encode time accumulator for averaging
encode_times: Arc<Mutex<Vec<f32>>>,
}
impl H264Pipeline {
@@ -183,7 +167,6 @@ impl H264Pipeline {
video_track,
stats: Arc::new(Mutex::new(H264PipelineStats::default())),
running: running_tx,
encode_times: Arc::new(Mutex::new(Vec::with_capacity(100))),
})
}
@@ -222,7 +205,6 @@ impl H264Pipeline {
let nv12_converter = self.nv12_converter.lock().await.take();
let video_track = self.video_track.clone();
let stats = self.stats.clone();
let encode_times = self.encode_times.clone();
let config = self.config.clone();
let mut running_rx = self.running.subscribe();
@@ -275,12 +257,6 @@ impl H264Pipeline {
}
}
// Update captured count
{
let mut s = stats.lock().await;
s.frames_captured += 1;
}
// Convert to NV12 for VAAPI encoder
// BGR24/RGB24/YUYV -> NV12 (via NV12 converter)
// NV12 -> pass through
@@ -297,8 +273,6 @@ impl H264Pipeline {
Ok(nv12_data) => encoder.encode_raw(nv12_data, pts_ms),
Err(e) => {
error!("NV12 conversion failed: {}", e);
let mut s = stats.lock().await;
s.errors += 1;
continue;
}
}
@@ -323,35 +297,13 @@ impl H264Pipeline {
.await
{
error!("Failed to write frame to track: {}", e);
let mut s = stats.lock().await;
s.errors += 1;
} else {
// Update stats
let encode_time = start.elapsed().as_secs_f32() * 1000.0;
let mut s = stats.lock().await;
s.frames_encoded += 1;
s.bytes_encoded += frame.data.len() as u64;
if is_keyframe {
s.keyframes_encoded += 1;
}
// Update encode time average
let mut times = encode_times.lock().await;
times.push(encode_time);
if times.len() > 100 {
times.remove(0);
}
if !times.is_empty() {
s.avg_encode_time_ms =
times.iter().sum::<f32>() / times.len() as f32;
}
let _ = start;
}
}
}
Err(e) => {
error!("Encoding failed: {}", e);
let mut s = stats.lock().await;
s.errors += 1;
}
}
@@ -365,8 +317,7 @@ impl H264Pipeline {
}
}
Err(broadcast::error::RecvError::Lagged(n)) => {
let mut s = stats.lock().await;
s.frames_dropped += n;
let _ = n;
}
Err(broadcast::error::RecvError::Closed) => {
info!("Frame channel closed, stopping H264 pipeline");

View File

@@ -17,20 +17,33 @@
//! ```
use bytes::Bytes;
use parking_lot::RwLock as ParkingRwLock;
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{broadcast, watch, Mutex, RwLock};
use tokio::sync::{broadcast, mpsc, watch, Mutex, RwLock};
use tracing::{debug, error, info, trace, warn};
/// Grace period before auto-stopping pipeline when no subscribers (in seconds)
const AUTO_STOP_GRACE_PERIOD_SECS: u64 = 3;
/// Minimum valid frame size for capture
const MIN_CAPTURE_FRAME_SIZE: usize = 128;
/// Validate JPEG header every N frames to reduce overhead
const JPEG_VALIDATE_INTERVAL: u64 = 30;
use crate::error::{AppError, Result};
use crate::video::convert::{Nv12Converter, PixelConverter};
#[cfg(any(target_arch = "aarch64", target_arch = "arm"))]
use crate::video::decoder::MjpegRkmppDecoder;
use crate::video::decoder::MjpegTurboDecoder;
#[cfg(any(target_arch = "aarch64", target_arch = "arm"))]
use hwcodec::ffmpeg_hw::{last_error_message as ffmpeg_hw_last_error, HwMjpegH264Config, HwMjpegH264Pipeline};
use v4l::buffer::Type as BufferType;
use v4l::io::traits::CaptureStream;
use v4l::prelude::*;
use v4l::video::Capture;
use v4l::video::capture::Parameters;
use v4l::Format;
use crate::video::encoder::h264::{detect_best_encoder, H264Config, H264Encoder, H264InputFormat};
use crate::video::encoder::h265::{
detect_best_h265_encoder, H265Config, H265Encoder, H265InputFormat,
@@ -40,7 +53,7 @@ use crate::video::encoder::traits::EncoderConfig;
use crate::video::encoder::vp8::{detect_best_vp8_encoder, VP8Config, VP8Encoder};
use crate::video::encoder::vp9::{detect_best_vp9_encoder, VP9Config, VP9Encoder};
use crate::video::format::{PixelFormat, Resolution};
use crate::video::frame::VideoFrame;
use crate::video::frame::{FrameBuffer, FrameBufferPool, VideoFrame};
/// Encoded video frame for distribution
#[derive(Debug, Clone)]
@@ -59,6 +72,10 @@ pub struct EncodedVideoFrame {
pub codec: VideoEncoderType,
}
enum PipelineCmd {
SetBitrate { bitrate_kbps: u32, gop: u32 },
}
/// Shared video pipeline configuration
#[derive(Debug, Clone)]
pub struct SharedVideoPipelineConfig {
@@ -150,16 +167,22 @@ impl SharedVideoPipelineConfig {
/// Pipeline statistics
#[derive(Debug, Clone, Default)]
pub struct SharedVideoPipelineStats {
pub frames_captured: u64,
pub frames_encoded: u64,
pub frames_dropped: u64,
pub frames_skipped: u64,
pub bytes_encoded: u64,
pub keyframes_encoded: u64,
pub avg_encode_time_ms: f32,
pub current_fps: f32,
pub errors: u64,
pub subscribers: u64,
}
struct EncoderThreadState {
encoder: Option<Box<dyn VideoEncoderTrait + Send>>,
mjpeg_decoder: Option<MjpegDecoderKind>,
nv12_converter: Option<Nv12Converter>,
yuv420p_converter: Option<PixelConverter>,
encoder_needs_yuv420p: bool,
#[cfg(any(target_arch = "aarch64", target_arch = "arm"))]
ffmpeg_hw_pipeline: Option<HwMjpegH264Pipeline>,
#[cfg(any(target_arch = "aarch64", target_arch = "arm"))]
ffmpeg_hw_enabled: bool,
fps: u32,
codec: VideoEncoderType,
input_format: PixelFormat,
}
/// Universal video encoder trait object
@@ -314,18 +337,13 @@ impl MjpegDecoderKind {
/// Universal shared video pipeline
pub struct SharedVideoPipeline {
config: RwLock<SharedVideoPipelineConfig>,
encoder: Mutex<Option<Box<dyn VideoEncoderTrait + Send>>>,
mjpeg_decoder: Mutex<Option<MjpegDecoderKind>>,
nv12_converter: Mutex<Option<Nv12Converter>>,
yuv420p_converter: Mutex<Option<PixelConverter>>,
/// Whether the encoder needs YUV420P (true) or NV12 (false)
encoder_needs_yuv420p: AtomicBool,
/// Whether YUYV direct input is enabled (RKMPP optimization)
direct_input: AtomicBool,
frame_tx: broadcast::Sender<EncodedVideoFrame>,
subscribers: ParkingRwLock<Vec<mpsc::Sender<Arc<EncodedVideoFrame>>>>,
stats: Mutex<SharedVideoPipelineStats>,
running: watch::Sender<bool>,
running_rx: watch::Receiver<bool>,
cmd_tx: ParkingRwLock<Option<tokio::sync::mpsc::UnboundedSender<PipelineCmd>>>,
/// Fast running flag for blocking capture loop
running_flag: AtomicBool,
/// Frame sequence counter (atomic for lock-free access)
sequence: AtomicU64,
/// Atomic flag for keyframe request (avoids lock contention)
@@ -347,21 +365,16 @@ impl SharedVideoPipeline {
config.input_format
);
let (frame_tx, _) = broadcast::channel(16); // Reduced from 64 for lower latency
let (running_tx, running_rx) = watch::channel(false);
let pipeline = Arc::new(Self {
config: RwLock::new(config),
encoder: Mutex::new(None),
mjpeg_decoder: Mutex::new(None),
nv12_converter: Mutex::new(None),
yuv420p_converter: Mutex::new(None),
encoder_needs_yuv420p: AtomicBool::new(false),
direct_input: AtomicBool::new(false),
frame_tx,
subscribers: ParkingRwLock::new(Vec::new()),
stats: Mutex::new(SharedVideoPipelineStats::default()),
running: running_tx,
running_rx,
cmd_tx: ParkingRwLock::new(None),
running_flag: AtomicBool::new(false),
sequence: AtomicU64::new(0),
keyframe_requested: AtomicBool::new(false),
pipeline_start_time_ms: AtomicI64::new(0),
@@ -370,9 +383,7 @@ impl SharedVideoPipeline {
Ok(pipeline)
}
/// Initialize encoder based on config
async fn init_encoder(&self) -> Result<()> {
let config = self.config.read().await.clone();
fn build_encoder_state(config: &SharedVideoPipelineConfig) -> Result<EncoderThreadState> {
let registry = EncoderRegistry::global();
// Helper to get codec name for specific backend
@@ -506,6 +517,43 @@ impl SharedVideoPipeline {
|| selected_codec_name.contains("libx265")
|| selected_codec_name.contains("libvpx");
#[cfg(any(target_arch = "aarch64", target_arch = "arm"))]
if needs_mjpeg_decode && is_rkmpp_encoder && config.output_codec == VideoEncoderType::H264 {
info!("Initializing FFmpeg HW MJPEG->H264 pipeline (no fallback)");
let hw_config = HwMjpegH264Config {
decoder: "mjpeg_rkmpp".to_string(),
encoder: selected_codec_name.clone(),
width: config.resolution.width as i32,
height: config.resolution.height as i32,
fps: config.fps as i32,
bitrate_kbps: config.bitrate_kbps() as i32,
gop: config.gop_size() as i32,
thread_count: 1,
};
let pipeline = HwMjpegH264Pipeline::new(hw_config).map_err(|e| {
let detail = if e.is_empty() { ffmpeg_hw_last_error() } else { e };
AppError::VideoError(format!(
"FFmpeg HW MJPEG->H264 init failed: {}",
detail
))
})?;
info!("Using FFmpeg HW MJPEG->H264 pipeline");
return Ok(EncoderThreadState {
encoder: None,
mjpeg_decoder: None,
nv12_converter: None,
yuv420p_converter: None,
encoder_needs_yuv420p: false,
#[cfg(any(target_arch = "aarch64", target_arch = "arm"))]
ffmpeg_hw_pipeline: Some(pipeline),
#[cfg(any(target_arch = "aarch64", target_arch = "arm"))]
ffmpeg_hw_enabled: true,
fps: config.fps,
codec: config.output_codec,
input_format: config.input_format,
});
}
let pipeline_input_format = if needs_mjpeg_decode {
if is_rkmpp_encoder {
info!(
@@ -515,8 +563,8 @@ impl SharedVideoPipeline {
#[cfg(any(target_arch = "aarch64", target_arch = "arm"))]
{
let decoder = MjpegRkmppDecoder::new(config.resolution)?;
*self.mjpeg_decoder.lock().await = Some(MjpegDecoderKind::Rkmpp(decoder));
PixelFormat::Nv12
let pipeline_format = PixelFormat::Nv12;
(Some(MjpegDecoderKind::Rkmpp(decoder)), pipeline_format)
}
#[cfg(not(any(target_arch = "aarch64", target_arch = "arm")))]
{
@@ -530,17 +578,16 @@ impl SharedVideoPipeline {
config.input_format
);
let decoder = MjpegTurboDecoder::new(config.resolution)?;
*self.mjpeg_decoder.lock().await = Some(MjpegDecoderKind::Turbo(decoder));
PixelFormat::Rgb24
(Some(MjpegDecoderKind::Turbo(decoder)), PixelFormat::Rgb24)
} else {
return Err(AppError::VideoError(
"MJPEG input requires RKMPP or software encoder".to_string(),
));
}
} else {
*self.mjpeg_decoder.lock().await = None;
config.input_format
(None, config.input_format)
};
let (mjpeg_decoder, pipeline_input_format) = pipeline_input_format;
// Create encoder based on codec type
let encoder: Box<dyn VideoEncoderTrait + Send> = match config.output_codec {
@@ -856,24 +903,32 @@ impl SharedVideoPipeline {
}
};
*self.encoder.lock().await = Some(encoder);
*self.nv12_converter.lock().await = nv12_converter;
*self.yuv420p_converter.lock().await = yuv420p_converter;
self.encoder_needs_yuv420p
.store(needs_yuv420p, Ordering::Release);
self.direct_input.store(use_direct_input, Ordering::Release);
Ok(())
Ok(EncoderThreadState {
encoder: Some(encoder),
mjpeg_decoder,
nv12_converter,
yuv420p_converter,
encoder_needs_yuv420p: needs_yuv420p,
#[cfg(any(target_arch = "aarch64", target_arch = "arm"))]
ffmpeg_hw_pipeline: None,
#[cfg(any(target_arch = "aarch64", target_arch = "arm"))]
ffmpeg_hw_enabled: false,
fps: config.fps,
codec: config.output_codec,
input_format: config.input_format,
})
}
/// Subscribe to encoded frames
pub fn subscribe(&self) -> broadcast::Receiver<EncodedVideoFrame> {
self.frame_tx.subscribe()
pub fn subscribe(&self) -> mpsc::Receiver<Arc<EncodedVideoFrame>> {
let (tx, rx) = mpsc::channel(4);
self.subscribers.write().push(tx);
rx
}
/// Get subscriber count
pub fn subscriber_count(&self) -> usize {
self.frame_tx.receiver_count()
self.subscribers.read().iter().filter(|tx| !tx.is_closed()).count()
}
/// Report that a receiver has lagged behind
@@ -899,11 +954,50 @@ impl SharedVideoPipeline {
info!("[Pipeline] Keyframe requested for new client");
}
fn send_cmd(&self, cmd: PipelineCmd) {
let tx = self.cmd_tx.read().clone();
if let Some(tx) = tx {
let _ = tx.send(cmd);
}
}
fn clear_cmd_tx(&self) {
let mut guard = self.cmd_tx.write();
*guard = None;
}
fn apply_cmd(&self, state: &mut EncoderThreadState, cmd: PipelineCmd) -> Result<()> {
match cmd {
PipelineCmd::SetBitrate { bitrate_kbps, gop } => {
#[cfg(not(any(target_arch = "aarch64", target_arch = "arm")))]
let _ = gop;
#[cfg(any(target_arch = "aarch64", target_arch = "arm"))]
if state.ffmpeg_hw_enabled {
if let Some(ref mut pipeline) = state.ffmpeg_hw_pipeline {
pipeline
.reconfigure(bitrate_kbps as i32, gop as i32)
.map_err(|e| {
let detail = if e.is_empty() { ffmpeg_hw_last_error() } else { e };
AppError::VideoError(format!(
"FFmpeg HW reconfigure failed: {}",
detail
))
})?;
return Ok(());
}
}
if let Some(ref mut encoder) = state.encoder {
encoder.set_bitrate(bitrate_kbps)?;
}
}
}
Ok(())
}
/// Get current stats
pub async fn stats(&self) -> SharedVideoPipelineStats {
let mut stats = self.stats.lock().await.clone();
stats.subscribers = self.frame_tx.receiver_count() as u64;
stats
self.stats.lock().await.clone()
}
/// Check if running
@@ -919,6 +1013,27 @@ impl SharedVideoPipeline {
self.running_rx.clone()
}
async fn broadcast_encoded(&self, frame: Arc<EncodedVideoFrame>) {
let subscribers = {
let guard = self.subscribers.read();
if guard.is_empty() {
return;
}
guard.iter().cloned().collect::<Vec<_>>()
};
for tx in &subscribers {
if tx.send(frame.clone()).await.is_err() {
// Receiver dropped; cleanup happens below.
}
}
if subscribers.iter().any(|tx| tx.is_closed()) {
let mut guard = self.subscribers.write();
guard.retain(|tx| !tx.is_closed());
}
}
/// Get current codec
pub async fn current_codec(&self) -> VideoEncoderType {
self.config.read().await.output_codec
@@ -938,12 +1053,7 @@ impl SharedVideoPipeline {
config.output_codec = codec;
}
// Clear encoder state
*self.encoder.lock().await = None;
*self.mjpeg_decoder.lock().await = None;
*self.nv12_converter.lock().await = None;
*self.yuv420p_converter.lock().await = None;
self.encoder_needs_yuv420p.store(false, Ordering::Release);
self.clear_cmd_tx();
info!("Switched to {} codec", codec);
Ok(())
@@ -959,10 +1069,10 @@ impl SharedVideoPipeline {
return Ok(());
}
self.init_encoder().await?;
let _ = self.running.send(true);
let config = self.config.read().await.clone();
let mut encoder_state = Self::build_encoder_state(&config)?;
let _ = self.running.send(true);
self.running_flag.store(true, Ordering::Release);
let gop_size = config.gop_size();
info!(
"Starting {} pipeline (GOP={})",
@@ -970,6 +1080,11 @@ impl SharedVideoPipeline {
);
let pipeline = self.clone();
let (cmd_tx, mut cmd_rx) = tokio::sync::mpsc::unbounded_channel();
{
let mut guard = self.cmd_tx.write();
*guard = Some(cmd_tx);
}
tokio::spawn(async move {
let mut frame_count: u64 = 0;
@@ -977,13 +1092,6 @@ impl SharedVideoPipeline {
let mut fps_frame_count: u64 = 0;
let mut running_rx = pipeline.running_rx.clone();
// Local counters for batch stats update (reduce lock contention)
let mut local_frames_encoded: u64 = 0;
let mut local_bytes_encoded: u64 = 0;
let mut local_keyframes: u64 = 0;
let mut local_errors: u64 = 0;
let mut local_dropped: u64 = 0;
let mut local_skipped: u64 = 0;
// Track when we last had subscribers for auto-stop feature
let mut no_subscribers_since: Option<Instant> = None;
let grace_period = Duration::from_secs(AUTO_STOP_GRACE_PERIOD_SECS);
@@ -1001,7 +1109,12 @@ impl SharedVideoPipeline {
result = frame_rx.recv() => {
match result {
Ok(video_frame) => {
let subscriber_count = pipeline.frame_tx.receiver_count();
while let Ok(cmd) = cmd_rx.try_recv() {
if let Err(e) = pipeline.apply_cmd(&mut encoder_state, cmd) {
error!("Failed to apply pipeline command: {}", e);
}
}
let subscriber_count = pipeline.subscriber_count();
if subscriber_count == 0 {
// Track when we started having no subscribers
@@ -1019,6 +1132,9 @@ impl SharedVideoPipeline {
);
// Signal stop and break out of loop
let _ = pipeline.running.send(false);
pipeline
.running_flag
.store(false, Ordering::Release);
break;
}
}
@@ -1033,18 +1149,10 @@ impl SharedVideoPipeline {
}
}
match pipeline.encode_frame(&video_frame, frame_count).await {
match pipeline.encode_frame_sync(&mut encoder_state, &video_frame, frame_count) {
Ok(Some(encoded_frame)) => {
// Send frame to all subscribers
// Note: broadcast::send is non-blocking
let _ = pipeline.frame_tx.send(encoded_frame.clone());
// Update local counters (no lock)
local_frames_encoded += 1;
local_bytes_encoded += encoded_frame.data.len() as u64;
if encoded_frame.is_keyframe {
local_keyframes += 1;
}
let encoded_arc = Arc::new(encoded_frame);
pipeline.broadcast_encoded(encoded_arc).await;
frame_count += 1;
fps_frame_count += 1;
@@ -1052,11 +1160,10 @@ impl SharedVideoPipeline {
Ok(None) => {}
Err(e) => {
error!("Encoding failed: {}", e);
local_errors += 1;
}
}
// Batch update stats every second (reduces lock contention)
// Update FPS every second (reduces lock contention)
let fps_elapsed = last_fps_time.elapsed();
if fps_elapsed >= Duration::from_secs(1) {
let current_fps =
@@ -1064,27 +1171,13 @@ impl SharedVideoPipeline {
fps_frame_count = 0;
last_fps_time = Instant::now();
// Single lock acquisition for all stats
// Single lock acquisition for FPS
let mut s = pipeline.stats.lock().await;
s.frames_encoded += local_frames_encoded;
s.bytes_encoded += local_bytes_encoded;
s.keyframes_encoded += local_keyframes;
s.errors += local_errors;
s.frames_dropped += local_dropped;
s.frames_skipped += local_skipped;
s.current_fps = current_fps;
// Reset local counters
local_frames_encoded = 0;
local_bytes_encoded = 0;
local_keyframes = 0;
local_errors = 0;
local_dropped = 0;
local_skipped = 0;
}
}
Err(broadcast::error::RecvError::Lagged(n)) => {
local_dropped += n;
let _ = n;
}
Err(broadcast::error::RecvError::Closed) => {
break;
@@ -1094,37 +1187,277 @@ impl SharedVideoPipeline {
}
}
pipeline.clear_cmd_tx();
pipeline.running_flag.store(false, Ordering::Release);
info!("Video pipeline stopped");
});
Ok(())
}
/// Encode a single frame
async fn encode_frame(
/// Start the pipeline by owning capture + encode in a single loop.
///
/// This avoids the raw-frame broadcast path and keeps capture and encode
/// in the same thread for lower overhead.
pub async fn start_with_device(
self: &Arc<Self>,
device_path: std::path::PathBuf,
buffer_count: u32,
_jpeg_quality: u8,
) -> Result<()> {
if *self.running_rx.borrow() {
warn!("Pipeline already running");
return Ok(());
}
let config = self.config.read().await.clone();
let mut encoder_state = Self::build_encoder_state(&config)?;
let _ = self.running.send(true);
self.running_flag.store(true, Ordering::Release);
let pipeline = self.clone();
let latest_frame: Arc<ParkingRwLock<Option<Arc<VideoFrame>>>> =
Arc::new(ParkingRwLock::new(None));
let (frame_seq_tx, mut frame_seq_rx) = watch::channel(0u64);
let buffer_pool = Arc::new(FrameBufferPool::new(buffer_count.max(4) as usize));
let (cmd_tx, mut cmd_rx) = tokio::sync::mpsc::unbounded_channel();
{
let mut guard = self.cmd_tx.write();
*guard = Some(cmd_tx);
}
// Encoder loop (runs on tokio, consumes latest frame)
{
let pipeline = pipeline.clone();
let latest_frame = latest_frame.clone();
tokio::spawn(async move {
let mut frame_count: u64 = 0;
let mut last_fps_time = Instant::now();
let mut fps_frame_count: u64 = 0;
let mut last_seq = *frame_seq_rx.borrow();
while pipeline.running_flag.load(Ordering::Acquire) {
if frame_seq_rx.changed().await.is_err() {
break;
}
if !pipeline.running_flag.load(Ordering::Acquire) {
break;
}
let seq = *frame_seq_rx.borrow();
if seq == last_seq {
continue;
}
last_seq = seq;
if pipeline.subscriber_count() == 0 {
continue;
}
while let Ok(cmd) = cmd_rx.try_recv() {
if let Err(e) = pipeline.apply_cmd(&mut encoder_state, cmd) {
error!("Failed to apply pipeline command: {}", e);
}
}
let frame = {
let guard = latest_frame.read();
guard.clone()
};
let frame = match frame {
Some(f) => f,
None => continue,
};
match pipeline.encode_frame_sync(&mut encoder_state, &frame, frame_count) {
Ok(Some(encoded_frame)) => {
let encoded_arc = Arc::new(encoded_frame);
pipeline.broadcast_encoded(encoded_arc).await;
frame_count += 1;
fps_frame_count += 1;
}
Ok(None) => {}
Err(e) => {
error!("Encoding failed: {}", e);
}
}
let fps_elapsed = last_fps_time.elapsed();
if fps_elapsed >= Duration::from_secs(1) {
let current_fps = fps_frame_count as f32 / fps_elapsed.as_secs_f32();
fps_frame_count = 0;
last_fps_time = Instant::now();
let mut s = pipeline.stats.lock().await;
s.current_fps = current_fps;
}
}
pipeline.clear_cmd_tx();
});
}
// Capture loop (runs on thread, updates latest frame)
{
let pipeline = pipeline.clone();
let latest_frame = latest_frame.clone();
let frame_seq_tx = frame_seq_tx.clone();
let buffer_pool = buffer_pool.clone();
std::thread::spawn(move || {
let device = match Device::with_path(&device_path) {
Ok(d) => d,
Err(e) => {
error!("Failed to open device {:?}: {}", device_path, e);
let _ = pipeline.running.send(false);
pipeline.running_flag.store(false, Ordering::Release);
let _ = frame_seq_tx.send(1);
return;
}
};
let requested_format = Format::new(
config.resolution.width,
config.resolution.height,
config.input_format.to_fourcc(),
);
let actual_format = match device.set_format(&requested_format) {
Ok(f) => f,
Err(e) => {
error!("Failed to set capture format: {}", e);
let _ = pipeline.running.send(false);
pipeline.running_flag.store(false, Ordering::Release);
let _ = frame_seq_tx.send(1);
return;
}
};
let resolution = Resolution::new(actual_format.width, actual_format.height);
let pixel_format =
PixelFormat::from_fourcc(actual_format.fourcc).unwrap_or(config.input_format);
let stride = actual_format.stride;
if config.fps > 0 {
if let Err(e) = device.set_params(&Parameters::with_fps(config.fps)) {
warn!("Failed to set hardware FPS: {}", e);
}
}
let mut stream = match MmapStream::with_buffers(
&device,
BufferType::VideoCapture,
buffer_count.max(1),
) {
Ok(s) => s,
Err(e) => {
error!("Failed to create capture stream: {}", e);
let _ = pipeline.running.send(false);
pipeline.running_flag.store(false, Ordering::Release);
let _ = frame_seq_tx.send(1);
return;
}
};
let mut no_subscribers_since: Option<Instant> = None;
let grace_period = Duration::from_secs(AUTO_STOP_GRACE_PERIOD_SECS);
let mut sequence: u64 = 0;
let mut validate_counter: u64 = 0;
while pipeline.running_flag.load(Ordering::Acquire) {
let subscriber_count = pipeline.subscriber_count();
if subscriber_count == 0 {
if no_subscribers_since.is_none() {
no_subscribers_since = Some(Instant::now());
trace!("No subscribers, starting grace period timer");
}
if let Some(since) = no_subscribers_since {
if since.elapsed() >= grace_period {
info!(
"No subscribers for {}s, auto-stopping video pipeline",
grace_period.as_secs()
);
let _ = pipeline.running.send(false);
pipeline.running_flag.store(false, Ordering::Release);
let _ = frame_seq_tx.send(sequence.wrapping_add(1));
break;
}
}
std::thread::sleep(Duration::from_millis(5));
continue;
} else if no_subscribers_since.is_some() {
trace!("Subscriber connected, resetting grace period timer");
no_subscribers_since = None;
}
let (buf, meta) = match stream.next() {
Ok(frame_data) => frame_data,
Err(e) => {
if e.kind() == std::io::ErrorKind::TimedOut {
warn!("Capture timeout - no signal?");
} else {
error!("Capture error: {}", e);
}
continue;
}
};
let frame_size = meta.bytesused as usize;
if frame_size < MIN_CAPTURE_FRAME_SIZE {
continue;
}
validate_counter = validate_counter.wrapping_add(1);
if pixel_format.is_compressed()
&& validate_counter % JPEG_VALIDATE_INTERVAL == 0
&& !VideoFrame::is_valid_jpeg_bytes(&buf[..frame_size])
{
continue;
}
let mut owned = buffer_pool.take(frame_size);
owned.resize(frame_size, 0);
owned[..frame_size].copy_from_slice(&buf[..frame_size]);
let frame = Arc::new(VideoFrame::from_pooled(
Arc::new(FrameBuffer::new(owned, Some(buffer_pool.clone()))),
resolution,
pixel_format,
stride,
sequence,
));
sequence = sequence.wrapping_add(1);
{
let mut guard = latest_frame.write();
*guard = Some(frame);
}
let _ = frame_seq_tx.send(sequence);
}
pipeline.running_flag.store(false, Ordering::Release);
let _ = pipeline.running.send(false);
let _ = frame_seq_tx.send(sequence.wrapping_add(1));
info!("Video pipeline stopped");
});
}
Ok(())
}
/// Encode a single frame (synchronous, no async locks)
fn encode_frame_sync(
&self,
state: &mut EncoderThreadState,
frame: &VideoFrame,
frame_count: u64,
) -> Result<Option<EncodedVideoFrame>> {
let (fps, codec, input_format) = {
let config = self.config.read().await;
(config.fps, config.output_codec, config.input_format)
};
let fps = state.fps;
let codec = state.codec;
let input_format = state.input_format;
let raw_frame = frame.data();
let decoded_buf = if input_format.is_compressed() {
let decoded = {
let mut decoder_guard = self.mjpeg_decoder.lock().await;
let decoder = decoder_guard.as_mut().ok_or_else(|| {
AppError::VideoError("MJPEG decoder not initialized".to_string())
})?;
decoder.decode(raw_frame)?
};
Some(decoded)
} else {
None
};
let raw_frame = decoded_buf.as_deref().unwrap_or(raw_frame);
// Calculate PTS from real capture timestamp (lock-free using AtomicI64)
// This ensures smooth playback even when capture timing varies
@@ -1149,6 +1482,53 @@ impl SharedVideoPipeline {
current_ts_ms - start_ts
};
#[cfg(any(target_arch = "aarch64", target_arch = "arm"))]
if state.ffmpeg_hw_enabled {
if input_format != PixelFormat::Mjpeg {
return Err(AppError::VideoError(
"FFmpeg HW pipeline requires MJPEG input".to_string(),
));
}
let pipeline = state.ffmpeg_hw_pipeline.as_mut().ok_or_else(|| {
AppError::VideoError("FFmpeg HW pipeline not initialized".to_string())
})?;
if self.keyframe_requested.swap(false, Ordering::AcqRel) {
pipeline.request_keyframe();
debug!("[Pipeline] FFmpeg HW keyframe requested");
}
let packet = pipeline.encode(raw_frame, pts_ms).map_err(|e| {
let detail = if e.is_empty() { ffmpeg_hw_last_error() } else { e };
AppError::VideoError(format!("FFmpeg HW encode failed: {}", detail))
})?;
if let Some((data, is_keyframe)) = packet {
let sequence = self.sequence.fetch_add(1, Ordering::Relaxed) + 1;
return Ok(Some(EncodedVideoFrame {
data: Bytes::from(data),
pts_ms,
is_keyframe,
sequence,
duration: Duration::from_millis(1000 / fps as u64),
codec,
}));
}
return Ok(None);
}
let decoded_buf = if input_format.is_compressed() {
let decoder = state.mjpeg_decoder.as_mut().ok_or_else(|| {
AppError::VideoError("MJPEG decoder not initialized".to_string())
})?;
let decoded = decoder.decode(raw_frame)?;
Some(decoded)
} else {
None
};
let raw_frame = decoded_buf.as_deref().unwrap_or(raw_frame);
// Debug log for H265
if codec == VideoEncoderType::H265 && frame_count % 30 == 1 {
debug!(
@@ -1159,12 +1539,9 @@ impl SharedVideoPipeline {
);
}
let mut nv12_converter = self.nv12_converter.lock().await;
let mut yuv420p_converter = self.yuv420p_converter.lock().await;
let needs_yuv420p = self.encoder_needs_yuv420p.load(Ordering::Acquire);
let mut encoder_guard = self.encoder.lock().await;
let encoder = encoder_guard
let needs_yuv420p = state.encoder_needs_yuv420p;
let encoder = state
.encoder
.as_mut()
.ok_or_else(|| AppError::VideoError("Encoder not initialized".to_string()))?;
@@ -1174,16 +1551,16 @@ impl SharedVideoPipeline {
debug!("[Pipeline] Keyframe will be generated for this frame");
}
let encode_result = if needs_yuv420p && yuv420p_converter.is_some() {
let encode_result = if needs_yuv420p && state.yuv420p_converter.is_some() {
// Software encoder with direct input conversion to YUV420P
let conv = yuv420p_converter.as_mut().unwrap();
let conv = state.yuv420p_converter.as_mut().unwrap();
let yuv420p_data = conv
.convert(raw_frame)
.map_err(|e| AppError::VideoError(format!("YUV420P conversion failed: {}", e)))?;
encoder.encode_raw(yuv420p_data, pts_ms)
} else if nv12_converter.is_some() {
} else if state.nv12_converter.is_some() {
// Hardware encoder with input conversion to NV12
let conv = nv12_converter.as_mut().unwrap();
let conv = state.nv12_converter.as_mut().unwrap();
let nv12_data = conv
.convert(raw_frame)
.map_err(|e| AppError::VideoError(format!("NV12 conversion failed: {}", e)))?;
@@ -1193,10 +1570,6 @@ impl SharedVideoPipeline {
encoder.encode_raw(raw_frame, pts_ms)
};
drop(encoder_guard);
drop(nv12_converter);
drop(yuv420p_converter);
match encode_result {
Ok(frames) => {
if !frames.is_empty() {
@@ -1255,6 +1628,8 @@ impl SharedVideoPipeline {
pub fn stop(&self) {
if *self.running_rx.borrow() {
let _ = self.running.send(false);
self.running_flag.store(false, Ordering::Release);
self.clear_cmd_tx();
info!("Stopping video pipeline");
}
}
@@ -1265,10 +1640,12 @@ impl SharedVideoPipeline {
preset: crate::video::encoder::BitratePreset,
) -> Result<()> {
let bitrate_kbps = preset.bitrate_kbps();
if let Some(ref mut encoder) = *self.encoder.lock().await {
encoder.set_bitrate(bitrate_kbps)?;
self.config.write().await.bitrate_preset = preset;
}
let gop = {
let mut config = self.config.write().await;
config.bitrate_preset = preset;
config.gop_size()
};
self.send_cmd(PipelineCmd::SetBitrate { bitrate_kbps, gop });
Ok(())
}

View File

@@ -135,7 +135,8 @@ impl VideoStreamManager {
/// Set event bus for notifications
pub async fn set_event_bus(&self, events: Arc<EventBus>) {
*self.events.write().await = Some(events);
*self.events.write().await = Some(events.clone());
self.webrtc_streamer.set_event_bus(events).await;
}
/// Set configuration store
@@ -199,19 +200,20 @@ impl VideoStreamManager {
}
}
// Always reconnect frame source after initialization
// This ensures WebRTC has the correct frame_tx from the current capturer
if let Some(frame_tx) = self.streamer.frame_sender().await {
// Synchronize WebRTC config with actual capture format
let (format, resolution, fps) = self.streamer.current_video_config().await;
info!(
"Reconnecting frame source to WebRTC after init: {}x{} {:?} @ {}fps (receiver_count={})",
resolution.width, resolution.height, format, fps, frame_tx.receiver_count()
);
// Configure WebRTC capture source after initialization
let (device_path, resolution, format, fps, jpeg_quality) =
self.streamer.current_capture_config().await;
info!(
"WebRTC capture config after init: {}x{} {:?} @ {}fps",
resolution.width, resolution.height, format, fps
);
self.webrtc_streamer
.update_video_config(resolution, format, fps)
.await;
if let Some(device_path) = device_path {
self.webrtc_streamer
.update_video_config(resolution, format, fps)
.set_capture_device(device_path, jpeg_quality)
.await;
self.webrtc_streamer.set_video_source(frame_tx).await;
}
Ok(())
@@ -329,7 +331,7 @@ impl VideoStreamManager {
/// Ensure video capture is running (for WebRTC mode)
async fn ensure_video_capture_running(self: &Arc<Self>) -> Result<()> {
// Initialize streamer if not already initialized
// Initialize streamer if not already initialized (for config discovery)
if self.streamer.state().await == StreamerState::Uninitialized {
info!("Initializing video capture for WebRTC (ensure)");
if let Err(e) = self.streamer.init_auto().await {
@@ -338,29 +340,19 @@ impl VideoStreamManager {
}
}
// Start video capture if not streaming
if self.streamer.state().await != StreamerState::Streaming {
info!("Starting video capture for WebRTC (ensure)");
if let Err(e) = self.streamer.start().await {
error!("Failed to start video capture: {}", e);
return Err(e);
}
// Wait a bit for capture to stabilize
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
// Reconnect frame source to WebRTC
if let Some(frame_tx) = self.streamer.frame_sender().await {
let (format, resolution, fps) = self.streamer.current_video_config().await;
info!(
"Reconnecting frame source to WebRTC: {}x{} {:?} @ {}fps",
resolution.width, resolution.height, format, fps
);
let (device_path, resolution, format, fps, jpeg_quality) =
self.streamer.current_capture_config().await;
info!(
"Configuring WebRTC capture: {}x{} {:?} @ {}fps",
resolution.width, resolution.height, format, fps
);
self.webrtc_streamer
.update_video_config(resolution, format, fps)
.await;
if let Some(device_path) = device_path {
self.webrtc_streamer
.update_video_config(resolution, format, fps)
.set_capture_device(device_path, jpeg_quality)
.await;
self.webrtc_streamer.set_video_source(frame_tx).await;
}
Ok(())
@@ -403,7 +395,6 @@ impl VideoStreamManager {
match current_mode {
StreamMode::Mjpeg => {
info!("Stopping MJPEG streaming");
// Only stop MJPEG distribution, keep video capture running for WebRTC
self.streamer.mjpeg_handler().set_offline();
if let Err(e) = self.streamer.stop().await {
warn!("Error stopping MJPEG streamer: {}", e);
@@ -458,10 +449,9 @@ impl VideoStreamManager {
}
}
StreamMode::WebRTC => {
// WebRTC mode: ensure video capture is running for H264 encoding
// WebRTC mode: configure direct capture for encoder pipeline
info!("Activating WebRTC mode");
// Initialize streamer if not already initialized
if self.streamer.state().await == StreamerState::Uninitialized {
info!("Initializing video capture for WebRTC");
if let Err(e) = self.streamer.init_auto().await {
@@ -470,77 +460,32 @@ impl VideoStreamManager {
}
}
// Auto-switch to non-compressed format if current format is MJPEG/JPEG
if let Some(device) = self.streamer.current_device().await {
let (current_format, resolution, fps) =
self.streamer.current_video_config().await;
if current_format.is_compressed() {
let available_formats: Vec<PixelFormat> =
device.formats.iter().map(|f| f.format).collect();
// Determine if using hardware encoding
let is_hardware = self.webrtc_streamer.is_hardware_encoding().await;
if let Some(recommended) =
PixelFormat::recommended_for_encoding(&available_formats, is_hardware)
{
info!(
"Auto-switching from {:?} to {:?} for WebRTC encoding (hardware={})",
current_format, recommended, is_hardware
);
let device_path = device.path.to_string_lossy().to_string();
if let Err(e) = self
.streamer
.apply_video_config(&device_path, recommended, resolution, fps)
.await
{
warn!("Failed to auto-switch format for WebRTC: {}, keeping current format", e);
}
}
}
}
// Start video capture if not streaming
if self.streamer.state().await != StreamerState::Streaming {
info!("Starting video capture for WebRTC");
if let Err(e) = self.streamer.start().await {
error!("Failed to start video capture for WebRTC: {}", e);
return Err(e);
}
}
// Wait a bit for capture to stabilize
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
// Connect frame source to WebRTC with correct format
if let Some(frame_tx) = self.streamer.frame_sender().await {
// Synchronize WebRTC config with actual capture format
let (format, resolution, fps) = self.streamer.current_video_config().await;
info!(
"Connecting frame source to WebRTC pipeline: {}x{} {:?} @ {}fps",
resolution.width, resolution.height, format, fps
);
self.webrtc_streamer
.update_video_config(resolution, format, fps)
.await;
self.webrtc_streamer.set_video_source(frame_tx).await;
// Publish WebRTCReady event - frame source is now connected
let codec = self.webrtc_streamer.current_video_codec().await;
let is_hardware = self.webrtc_streamer.is_hardware_encoding().await;
self.publish_event(SystemEvent::WebRTCReady {
transition_id: Some(transition_id.clone()),
codec: codec_to_string(codec),
hardware: is_hardware,
})
let (device_path, resolution, format, fps, jpeg_quality) =
self.streamer.current_capture_config().await;
info!(
"Configuring WebRTC capture pipeline: {}x{} {:?} @ {}fps",
resolution.width, resolution.height, format, fps
);
self.webrtc_streamer
.update_video_config(resolution, format, fps)
.await;
if let Some(device_path) = device_path {
self.webrtc_streamer
.set_capture_device(device_path, jpeg_quality)
.await;
} else {
warn!(
"No frame source available for WebRTC - sessions may fail to receive video"
);
warn!("No capture device configured for WebRTC");
}
let codec = self.webrtc_streamer.current_video_codec().await;
let is_hardware = self.webrtc_streamer.is_hardware_encoding().await;
self.publish_event(SystemEvent::WebRTCReady {
transition_id: Some(transition_id.clone()),
codec: codec_to_string(codec),
hardware: is_hardware,
})
.await;
info!("WebRTC mode activated (sessions created on-demand)");
}
}
@@ -587,36 +532,34 @@ impl VideoStreamManager {
.update_video_config(resolution, format, fps)
.await;
// Restart video capture for WebRTC (it was stopped during config change)
info!("Restarting video capture for WebRTC after config change");
if let Err(e) = self.streamer.start().await {
error!("Failed to restart video capture for WebRTC: {}", e);
return Err(e);
let (device_path, actual_resolution, actual_format, actual_fps, jpeg_quality) =
self.streamer.current_capture_config().await;
if actual_format != format || actual_resolution != resolution || actual_fps != fps {
info!(
"Actual capture config differs from requested, updating WebRTC: {}x{} {:?} @ {}fps",
actual_resolution.width, actual_resolution.height, actual_format, actual_fps
);
self.webrtc_streamer
.update_video_config(actual_resolution, actual_format, actual_fps)
.await;
}
// Wait a bit for capture to stabilize
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
// Reconnect frame source with the new capturer
if let Some(frame_tx) = self.streamer.frame_sender().await {
// Note: update_video_config was already called above with the requested config,
// but verify that actual capture matches
let (actual_format, actual_resolution, actual_fps) =
self.streamer.current_video_config().await;
if actual_format != format || actual_resolution != resolution || actual_fps != fps {
info!(
"Actual capture config differs from requested, updating WebRTC: {}x{} {:?} @ {}fps",
actual_resolution.width, actual_resolution.height, actual_format, actual_fps
);
self.webrtc_streamer
.update_video_config(actual_resolution, actual_format, actual_fps)
.await;
}
info!("Reconnecting frame source to WebRTC after config change");
self.webrtc_streamer.set_video_source(frame_tx).await;
if let Some(device_path) = device_path {
info!("Configuring direct capture for WebRTC after config change");
self.webrtc_streamer
.set_capture_device(device_path, jpeg_quality)
.await;
} else {
warn!("No frame source available after config change");
warn!("No capture device configured for WebRTC after config change");
}
let codec = self.webrtc_streamer.current_video_codec().await;
let is_hardware = self.webrtc_streamer.is_hardware_encoding().await;
self.publish_event(SystemEvent::WebRTCReady {
transition_id: None,
codec: codec_to_string(codec),
hardware: is_hardware,
})
.await;
}
Ok(())
@@ -631,22 +574,23 @@ impl VideoStreamManager {
self.streamer.start().await?;
}
StreamMode::WebRTC => {
// Ensure video capture is running
// Ensure device is initialized for config discovery
if self.streamer.state().await == StreamerState::Uninitialized {
self.streamer.init_auto().await?;
}
if self.streamer.state().await != StreamerState::Streaming {
self.streamer.start().await?;
}
// Connect frame source with correct format
if let Some(frame_tx) = self.streamer.frame_sender().await {
// Synchronize WebRTC config with actual capture format
let (format, resolution, fps) = self.streamer.current_video_config().await;
// Synchronize WebRTC config with current capture config
let (device_path, resolution, format, fps, jpeg_quality) =
self.streamer.current_capture_config().await;
self.webrtc_streamer
.update_video_config(resolution, format, fps)
.await;
if let Some(device_path) = device_path {
self.webrtc_streamer
.update_video_config(resolution, format, fps)
.set_capture_device(device_path, jpeg_quality)
.await;
self.webrtc_streamer.set_video_source(frame_tx).await;
} else {
warn!("No capture device configured for WebRTC");
}
}
}
@@ -764,13 +708,6 @@ impl VideoStreamManager {
self.streamer.is_streaming().await
}
/// Get frame sender for video frames
pub async fn frame_sender(
&self,
) -> Option<tokio::sync::broadcast::Sender<crate::video::frame::VideoFrame>> {
self.streamer.frame_sender().await
}
/// Subscribe to encoded video frames from the shared video pipeline
///
/// This allows RustDesk (and other consumers) to receive H264/H265/VP8/VP9
@@ -781,10 +718,10 @@ impl VideoStreamManager {
/// Returns None if video capture cannot be started or pipeline creation fails.
pub async fn subscribe_encoded_frames(
&self,
) -> Option<
tokio::sync::broadcast::Receiver<crate::video::shared_video_pipeline::EncodedVideoFrame>,
> {
// 1. Ensure video capture is initialized
) -> Option<tokio::sync::mpsc::Receiver<std::sync::Arc<
crate::video::shared_video_pipeline::EncodedVideoFrame,
>>> {
// 1. Ensure video capture is initialized (for config discovery)
if self.streamer.state().await == StreamerState::Uninitialized {
tracing::info!("Initializing video capture for encoded frame subscription");
if let Err(e) = self.streamer.init_auto().await {
@@ -796,28 +733,9 @@ impl VideoStreamManager {
}
}
// 2. Ensure video capture is running (streaming)
if self.streamer.state().await != StreamerState::Streaming {
tracing::info!("Starting video capture for encoded frame subscription");
if let Err(e) = self.streamer.start().await {
tracing::error!("Failed to start video capture for encoded frames: {}", e);
return None;
}
// Wait for capture to stabilize
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
// 3. Get frame sender from running capture
let frame_tx = match self.streamer.frame_sender().await {
Some(tx) => tx,
None => {
tracing::warn!("Cannot subscribe to encoded frames: no frame sender available");
return None;
}
};
// 4. Synchronize WebRTC config with actual capture format
let (format, resolution, fps) = self.streamer.current_video_config().await;
// 2. Synchronize WebRTC config with capture config
let (device_path, resolution, format, fps, jpeg_quality) =
self.streamer.current_capture_config().await;
tracing::info!(
"Connecting encoded frame subscription: {}x{} {:?} @ {}fps",
resolution.width,
@@ -828,14 +746,17 @@ impl VideoStreamManager {
self.webrtc_streamer
.update_video_config(resolution, format, fps)
.await;
if let Some(device_path) = device_path {
self.webrtc_streamer
.set_capture_device(device_path, jpeg_quality)
.await;
} else {
tracing::warn!("No capture device configured for encoded frames");
return None;
}
// 5. Use WebRtcStreamer to ensure the shared video pipeline is running
// This will create the pipeline if needed
match self
.webrtc_streamer
.ensure_video_pipeline_for_external(frame_tx)
.await
{
// 3. Use WebRtcStreamer to ensure the shared video pipeline is running
match self.webrtc_streamer.ensure_video_pipeline_for_external().await {
Ok(pipeline) => Some(pipeline.subscribe()),
Err(e) => {
tracing::error!("Failed to start shared video pipeline: {}", e);

View File

@@ -4,17 +4,28 @@
//! managing the lifecycle of the capture thread and MJPEG/WebRTC distribution.
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Arc;
use tokio::sync::{broadcast, RwLock};
use tokio::sync::RwLock;
use tracing::{debug, error, info, trace, warn};
use super::capture::{CaptureConfig, CaptureState, VideoCapturer};
use super::device::{enumerate_devices, find_best_device, VideoDeviceInfo};
use super::format::{PixelFormat, Resolution};
use super::frame::VideoFrame;
use super::frame::{FrameBuffer, FrameBufferPool, VideoFrame};
use crate::error::{AppError, Result};
use crate::events::{EventBus, SystemEvent};
use crate::stream::MjpegStreamHandler;
use v4l::buffer::Type as BufferType;
use v4l::io::traits::CaptureStream;
use v4l::prelude::*;
use v4l::video::capture::Parameters;
use v4l::video::Capture;
use v4l::Format;
/// Minimum valid frame size for capture
const MIN_CAPTURE_FRAME_SIZE: usize = 128;
/// Validate JPEG header every N frames to reduce overhead
const JPEG_VALIDATE_INTERVAL: u64 = 30;
/// Streamer configuration
#[derive(Debug, Clone)]
@@ -65,11 +76,14 @@ pub enum StreamerState {
/// Video streamer service
pub struct Streamer {
config: RwLock<StreamerConfig>,
capturer: RwLock<Option<Arc<VideoCapturer>>>,
mjpeg_handler: Arc<MjpegStreamHandler>,
current_device: RwLock<Option<VideoDeviceInfo>>,
state: RwLock<StreamerState>,
start_lock: tokio::sync::Mutex<()>,
direct_stop: AtomicBool,
direct_active: AtomicBool,
direct_handle: tokio::sync::Mutex<Option<tokio::task::JoinHandle<()>>>,
current_fps: AtomicU32,
/// Event bus for broadcasting state changes (optional)
events: RwLock<Option<Arc<EventBus>>>,
/// Last published state (for change detection)
@@ -94,11 +108,14 @@ impl Streamer {
pub fn new() -> Arc<Self> {
Arc::new(Self {
config: RwLock::new(StreamerConfig::default()),
capturer: RwLock::new(None),
mjpeg_handler: Arc::new(MjpegStreamHandler::new()),
current_device: RwLock::new(None),
state: RwLock::new(StreamerState::Uninitialized),
start_lock: tokio::sync::Mutex::new(()),
direct_stop: AtomicBool::new(false),
direct_active: AtomicBool::new(false),
direct_handle: tokio::sync::Mutex::new(None),
current_fps: AtomicU32::new(0),
events: RwLock::new(None),
last_published_state: RwLock::new(None),
config_changing: std::sync::atomic::AtomicBool::new(false),
@@ -114,11 +131,14 @@ impl Streamer {
pub fn with_config(config: StreamerConfig) -> Arc<Self> {
Arc::new(Self {
config: RwLock::new(config),
capturer: RwLock::new(None),
mjpeg_handler: Arc::new(MjpegStreamHandler::new()),
current_device: RwLock::new(None),
state: RwLock::new(StreamerState::Uninitialized),
start_lock: tokio::sync::Mutex::new(()),
direct_stop: AtomicBool::new(false),
direct_active: AtomicBool::new(false),
direct_handle: tokio::sync::Mutex::new(None),
current_fps: AtomicU32::new(0),
events: RwLock::new(None),
last_published_state: RwLock::new(None),
config_changing: std::sync::atomic::AtomicBool::new(false),
@@ -176,20 +196,6 @@ impl Streamer {
self.mjpeg_handler.clone()
}
/// Get frame sender for WebRTC integration
/// Returns None if no capturer is initialized
pub async fn frame_sender(&self) -> Option<broadcast::Sender<VideoFrame>> {
let capturer = self.capturer.read().await;
capturer.as_ref().map(|c| c.frame_sender())
}
/// Subscribe to video frames
/// Returns None if no capturer is initialized
pub async fn subscribe_frames(&self) -> Option<broadcast::Receiver<VideoFrame>> {
let capturer = self.capturer.read().await;
capturer.as_ref().map(|c| c.subscribe())
}
/// Get current device info
pub async fn current_device(&self) -> Option<VideoDeviceInfo> {
self.current_device.read().await.clone()
@@ -201,6 +207,20 @@ impl Streamer {
(config.format, config.resolution, config.fps)
}
/// Get current capture configuration for direct pipelines
pub async fn current_capture_config(
&self,
) -> (Option<PathBuf>, Resolution, PixelFormat, u32, u8) {
let config = self.config.read().await;
(
config.device_path.clone(),
config.resolution,
config.format,
config.fps,
config.jpeg_quality,
)
}
/// List available video devices
pub async fn list_devices(&self) -> Result<Vec<VideoDeviceInfo>> {
enumerate_devices()
@@ -278,18 +298,11 @@ impl Streamer {
// Give clients time to receive the disconnect signal and close their connections
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
// Stop existing capturer and wait for device release
{
// Take ownership of the old capturer to ensure it's dropped
let old_capturer = self.capturer.write().await.take();
if let Some(capturer) = old_capturer {
info!("Stopping existing capture before applying new config...");
if let Err(e) = capturer.stop().await {
warn!("Error stopping old capturer: {}", e);
}
// Explicitly drop the capturer to release V4L2 resources
drop(capturer);
}
// Stop active capture and wait for device release
if self.direct_active.load(Ordering::SeqCst) {
info!("Stopping existing capture before applying new config...");
self.stop().await?;
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
// Update config
@@ -301,18 +314,6 @@ impl Streamer {
cfg.fps = fps;
}
// Recreate capturer
let capture_config = CaptureConfig {
device_path: device.path.clone(),
resolution,
format,
fps,
jpeg_quality: self.config.read().await.jpeg_quality,
..Default::default()
};
let capturer = Arc::new(VideoCapturer::new(capture_config));
*self.capturer.write().await = Some(capturer.clone());
*self.current_device.write().await = Some(device.clone());
*self.state.write().await = StreamerState::Ready;
@@ -374,21 +375,6 @@ impl Streamer {
// Store device info
*self.current_device.write().await = Some(device.clone());
// Create capturer
let config = self.config.read().await;
let capture_config = CaptureConfig {
device_path: device.path.clone(),
resolution: config.resolution,
format: config.format,
fps: config.fps,
jpeg_quality: config.jpeg_quality,
..Default::default()
};
drop(config);
let capturer = Arc::new(VideoCapturer::new(capture_config));
*self.capturer.write().await = Some(capturer);
*self.state.write().await = StreamerState::Ready;
info!("Streamer initialized: {} @ {}", format, resolution);
@@ -445,43 +431,30 @@ impl Streamer {
.ok_or_else(|| AppError::VideoError("No resolutions available".to_string()))
}
/// Restart the capturer only (for recovery - doesn't spawn new monitor)
///
/// This is a simpler version of start() used during device recovery.
/// It doesn't spawn a new state monitor since the existing one is still active.
async fn restart_capturer(&self) -> Result<()> {
let capturer = self.capturer.read().await;
let capturer = capturer
.as_ref()
.ok_or_else(|| AppError::VideoError("Capturer not initialized".to_string()))?;
/// Restart capture for recovery (direct capture path)
async fn restart_capture(self: &Arc<Self>) -> Result<()> {
self.direct_stop.store(false, Ordering::SeqCst);
self.start().await?;
// Start capture
capturer.start().await?;
// Set MJPEG handler online
self.mjpeg_handler.set_online();
// Start frame distribution task
let mjpeg_handler = self.mjpeg_handler.clone();
let mut frame_rx = capturer.subscribe();
tokio::spawn(async move {
debug!("Recovery frame distribution task started");
loop {
match frame_rx.recv().await {
Ok(frame) => {
mjpeg_handler.update_frame(frame);
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {}
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
debug!("Frame channel closed");
break;
}
// Wait briefly for the capture thread to initialize the device.
// If it fails immediately, the state will flip to Error/DeviceLost.
for _ in 0..5 {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let state = *self.state.read().await;
match state {
StreamerState::Streaming | StreamerState::NoSignal => return Ok(()),
StreamerState::Error | StreamerState::DeviceLost => {
return Err(AppError::VideoError(
"Failed to restart capture".to_string(),
))
}
_ => {}
}
});
}
Ok(())
Err(AppError::VideoError(
"Capture restart timed out".to_string(),
))
}
/// Start streaming
@@ -498,138 +471,26 @@ impl Streamer {
self.init_auto().await?;
}
let capturer = self.capturer.read().await;
let capturer = capturer
.as_ref()
.ok_or_else(|| AppError::VideoError("Capturer not initialized".to_string()))?;
let device = self
.current_device
.read()
.await
.clone()
.ok_or_else(|| AppError::VideoError("No video device configured".to_string()))?;
// Start capture
capturer.start().await?;
let config = self.config.read().await.clone();
self.direct_stop.store(false, Ordering::SeqCst);
self.direct_active.store(true, Ordering::SeqCst);
// Set MJPEG handler online before starting frame distribution
// This is important after config changes where disconnect_all_clients() set it offline
let streamer = self.clone();
let handle = tokio::task::spawn_blocking(move || {
streamer.run_direct_capture(device.path, config);
});
*self.direct_handle.lock().await = Some(handle);
// Set MJPEG handler online before starting capture
self.mjpeg_handler.set_online();
// Start frame distribution task
let mjpeg_handler = self.mjpeg_handler.clone();
let mut frame_rx = capturer.subscribe();
let state_ref = Arc::downgrade(self);
let frame_tx = capturer.frame_sender();
tokio::spawn(async move {
info!("Frame distribution task started");
// Track when we started having no active consumers
let mut idle_since: Option<std::time::Instant> = None;
const IDLE_STOP_DELAY_SECS: u64 = 5;
loop {
match frame_rx.recv().await {
Ok(frame) => {
mjpeg_handler.update_frame(frame);
// Check if there are any active consumers:
// - MJPEG clients via mjpeg_handler
// - Other subscribers (WebRTC/RustDesk) via frame_tx receiver_count
// Note: receiver_count includes this task, so > 1 means other subscribers
let mjpeg_clients = mjpeg_handler.client_count();
let other_subscribers = frame_tx.receiver_count().saturating_sub(1);
if mjpeg_clients == 0 && other_subscribers == 0 {
if idle_since.is_none() {
idle_since = Some(std::time::Instant::now());
trace!("No active video consumers, starting idle timer");
} else if let Some(since) = idle_since {
if since.elapsed().as_secs() >= IDLE_STOP_DELAY_SECS {
info!(
"No active video consumers for {}s, stopping frame distribution",
IDLE_STOP_DELAY_SECS
);
// Stop the streamer
if let Some(streamer) = state_ref.upgrade() {
if let Err(e) = streamer.stop().await {
warn!(
"Failed to stop streamer during idle cleanup: {}",
e
);
}
}
break;
}
}
} else {
// Reset idle timer when we have consumers
if idle_since.is_some() {
trace!("Video consumers active, resetting idle timer");
idle_since = None;
}
}
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {}
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
debug!("Frame channel closed");
break;
}
}
// Check if streamer still exists
if state_ref.upgrade().is_none() {
break;
}
}
info!("Frame distribution task ended");
});
// Monitor capture state
let mut state_rx = capturer.state_watch();
let state_ref = Arc::downgrade(self);
let mjpeg_handler = self.mjpeg_handler.clone();
tokio::spawn(async move {
while state_rx.changed().await.is_ok() {
let capture_state = *state_rx.borrow();
match capture_state {
CaptureState::Running => {
if let Some(streamer) = state_ref.upgrade() {
*streamer.state.write().await = StreamerState::Streaming;
}
}
CaptureState::NoSignal => {
mjpeg_handler.set_offline();
if let Some(streamer) = state_ref.upgrade() {
*streamer.state.write().await = StreamerState::NoSignal;
}
}
CaptureState::Stopped => {
mjpeg_handler.set_offline();
if let Some(streamer) = state_ref.upgrade() {
*streamer.state.write().await = StreamerState::Ready;
}
}
CaptureState::Error => {
mjpeg_handler.set_offline();
if let Some(streamer) = state_ref.upgrade() {
*streamer.state.write().await = StreamerState::Error;
}
}
CaptureState::DeviceLost => {
mjpeg_handler.set_offline();
if let Some(streamer) = state_ref.upgrade() {
*streamer.state.write().await = StreamerState::DeviceLost;
// Start device recovery task (fire and forget)
let streamer_clone = Arc::clone(&streamer);
tokio::spawn(async move {
streamer_clone.start_device_recovery_internal().await;
});
}
}
CaptureState::Starting => {
// Starting state - device is initializing, no action needed
}
}
}
});
// Start background tasks only once per Streamer instance
// Use compare_exchange to atomically check and set the flag
if self
@@ -735,9 +596,11 @@ impl Streamer {
/// Stop streaming
pub async fn stop(&self) -> Result<()> {
if let Some(capturer) = self.capturer.read().await.as_ref() {
capturer.stop().await?;
self.direct_stop.store(true, Ordering::SeqCst);
if let Some(handle) = self.direct_handle.lock().await.take() {
let _ = handle.await;
}
self.direct_active.store(false, Ordering::SeqCst);
self.mjpeg_handler.set_offline();
*self.state.write().await = StreamerState::Ready;
@@ -749,6 +612,258 @@ impl Streamer {
Ok(())
}
/// Direct capture loop for MJPEG mode (single loop, no broadcast)
fn run_direct_capture(self: Arc<Self>, device_path: PathBuf, config: StreamerConfig) {
const MAX_RETRIES: u32 = 5;
const RETRY_DELAY_MS: u64 = 200;
const IDLE_STOP_DELAY_SECS: u64 = 5;
const BUFFER_COUNT: u32 = 2;
let handle = tokio::runtime::Handle::current();
let mut last_state = StreamerState::Streaming;
let mut set_state = |new_state: StreamerState| {
if new_state != last_state {
handle.block_on(async {
*self.state.write().await = new_state;
self.publish_event(self.current_state_event().await).await;
});
last_state = new_state;
}
};
let mut device_opt: Option<Device> = None;
let mut format_opt: Option<Format> = None;
let mut last_error: Option<String> = None;
for attempt in 0..MAX_RETRIES {
if self.direct_stop.load(Ordering::Relaxed) {
self.direct_active.store(false, Ordering::SeqCst);
return;
}
let device = match Device::with_path(&device_path) {
Ok(d) => d,
Err(e) => {
let err_str = e.to_string();
if err_str.contains("busy") || err_str.contains("resource") {
warn!(
"Device busy on attempt {}/{}, retrying in {}ms...",
attempt + 1,
MAX_RETRIES,
RETRY_DELAY_MS
);
std::thread::sleep(std::time::Duration::from_millis(RETRY_DELAY_MS));
last_error = Some(err_str);
continue;
}
last_error = Some(err_str);
break;
}
};
let requested = Format::new(
config.resolution.width,
config.resolution.height,
config.format.to_fourcc(),
);
match device.set_format(&requested) {
Ok(actual) => {
device_opt = Some(device);
format_opt = Some(actual);
break;
}
Err(e) => {
let err_str = e.to_string();
if err_str.contains("busy") || err_str.contains("resource") {
warn!(
"Device busy on set_format attempt {}/{}, retrying in {}ms...",
attempt + 1,
MAX_RETRIES,
RETRY_DELAY_MS
);
std::thread::sleep(std::time::Duration::from_millis(RETRY_DELAY_MS));
last_error = Some(err_str);
continue;
}
last_error = Some(err_str);
break;
}
}
}
let (device, actual_format) = match (device_opt, format_opt) {
(Some(d), Some(f)) => (d, f),
_ => {
error!(
"Failed to open device {:?}: {}",
device_path,
last_error.unwrap_or_else(|| "unknown error".to_string())
);
self.mjpeg_handler.set_offline();
set_state(StreamerState::Error);
self.direct_active.store(false, Ordering::SeqCst);
self.current_fps.store(0, Ordering::Relaxed);
return;
}
};
info!(
"Capture format: {}x{} {:?} stride={}",
actual_format.width, actual_format.height, actual_format.fourcc, actual_format.stride
);
let resolution = Resolution::new(actual_format.width, actual_format.height);
let pixel_format =
PixelFormat::from_fourcc(actual_format.fourcc).unwrap_or(config.format);
if config.fps > 0 {
if let Err(e) = device.set_params(&Parameters::with_fps(config.fps)) {
warn!("Failed to set hardware FPS: {}", e);
}
}
let mut stream =
match MmapStream::with_buffers(&device, BufferType::VideoCapture, BUFFER_COUNT) {
Ok(s) => s,
Err(e) => {
error!("Failed to create capture stream: {}", e);
self.mjpeg_handler.set_offline();
set_state(StreamerState::Error);
self.direct_active.store(false, Ordering::SeqCst);
self.current_fps.store(0, Ordering::Relaxed);
return;
}
};
let buffer_pool = Arc::new(FrameBufferPool::new(BUFFER_COUNT.max(4) as usize));
let mut signal_present = true;
let mut sequence: u64 = 0;
let mut validate_counter: u64 = 0;
let mut idle_since: Option<std::time::Instant> = None;
let mut fps_frame_count: u64 = 0;
let mut last_fps_time = std::time::Instant::now();
while !self.direct_stop.load(Ordering::Relaxed) {
let mjpeg_clients = self.mjpeg_handler.client_count();
if mjpeg_clients == 0 {
if idle_since.is_none() {
idle_since = Some(std::time::Instant::now());
trace!("No active video consumers, starting idle timer");
} else if let Some(since) = idle_since {
if since.elapsed().as_secs() >= IDLE_STOP_DELAY_SECS {
info!(
"No active video consumers for {}s, stopping capture",
IDLE_STOP_DELAY_SECS
);
self.mjpeg_handler.set_offline();
set_state(StreamerState::Ready);
break;
}
}
} else if idle_since.is_some() {
trace!("Video consumers active, resetting idle timer");
idle_since = None;
}
let (buf, meta) = match stream.next() {
Ok(frame_data) => frame_data,
Err(e) => {
if e.kind() == std::io::ErrorKind::TimedOut {
if signal_present {
signal_present = false;
self.mjpeg_handler.set_offline();
set_state(StreamerState::NoSignal);
self.current_fps.store(0, Ordering::Relaxed);
fps_frame_count = 0;
last_fps_time = std::time::Instant::now();
}
std::thread::sleep(std::time::Duration::from_millis(100));
continue;
}
let is_device_lost = match e.raw_os_error() {
Some(6) => true, // ENXIO
Some(19) => true, // ENODEV
Some(5) => true, // EIO
Some(32) => true, // EPIPE
Some(108) => true, // ESHUTDOWN
_ => false,
};
if is_device_lost {
error!("Video device lost: {} - {}", device_path.display(), e);
self.mjpeg_handler.set_offline();
handle.block_on(async {
*self.last_lost_device.write().await =
Some(device_path.display().to_string());
*self.last_lost_reason.write().await = Some(e.to_string());
});
set_state(StreamerState::DeviceLost);
handle.block_on(async {
let streamer = Arc::clone(&self);
tokio::spawn(async move {
streamer.start_device_recovery_internal().await;
});
});
break;
}
error!("Capture error: {}", e);
continue;
}
};
let frame_size = meta.bytesused as usize;
if frame_size < MIN_CAPTURE_FRAME_SIZE {
continue;
}
validate_counter = validate_counter.wrapping_add(1);
if pixel_format.is_compressed()
&& validate_counter % JPEG_VALIDATE_INTERVAL == 0
&& !VideoFrame::is_valid_jpeg_bytes(&buf[..frame_size])
{
continue;
}
let mut owned = buffer_pool.take(frame_size);
owned.resize(frame_size, 0);
owned[..frame_size].copy_from_slice(&buf[..frame_size]);
let frame = VideoFrame::from_pooled(
Arc::new(FrameBuffer::new(owned, Some(buffer_pool.clone()))),
resolution,
pixel_format,
actual_format.stride,
sequence,
);
sequence = sequence.wrapping_add(1);
if !signal_present {
signal_present = true;
self.mjpeg_handler.set_online();
set_state(StreamerState::Streaming);
}
self.mjpeg_handler.update_frame(frame);
fps_frame_count += 1;
let fps_elapsed = last_fps_time.elapsed();
if fps_elapsed >= std::time::Duration::from_secs(1) {
let current_fps = fps_frame_count as f32 / fps_elapsed.as_secs_f32();
fps_frame_count = 0;
last_fps_time = std::time::Instant::now();
self.current_fps
.store((current_fps * 100.0) as u32, Ordering::Relaxed);
}
}
self.direct_active.store(false, Ordering::SeqCst);
self.current_fps.store(0, Ordering::Relaxed);
}
/// Check if streaming
pub async fn is_streaming(&self) -> bool {
self.state().await == StreamerState::Streaming
@@ -756,14 +871,8 @@ impl Streamer {
/// Get stream statistics
pub async fn stats(&self) -> StreamerStats {
let capturer = self.capturer.read().await;
let capture_stats = if let Some(c) = capturer.as_ref() {
Some(c.stats().await)
} else {
None
};
let config = self.config.read().await;
let fps = self.current_fps.load(Ordering::Relaxed) as f32 / 100.0;
StreamerStats {
state: self.state().await,
@@ -772,15 +881,7 @@ impl Streamer {
resolution: Some((config.resolution.width, config.resolution.height)),
clients: self.mjpeg_handler.client_count(),
target_fps: config.fps,
fps: capture_stats.as_ref().map(|s| s.current_fps).unwrap_or(0.0),
frames_captured: capture_stats
.as_ref()
.map(|s| s.frames_captured)
.unwrap_or(0),
frames_dropped: capture_stats
.as_ref()
.map(|s| s.frames_dropped)
.unwrap_or(0),
fps,
}
}
@@ -829,23 +930,23 @@ impl Streamer {
return;
}
// Get last lost device info from capturer
let (device, reason) = {
let capturer = self.capturer.read().await;
if let Some(cap) = capturer.as_ref() {
cap.last_error().unwrap_or_else(|| {
let device_path = self
.current_device
.blocking_read()
.as_ref()
.map(|d| d.path.display().to_string())
.unwrap_or_else(|| "unknown".to_string());
(device_path, "Device lost".to_string())
})
} else {
("unknown".to_string(), "Device lost".to_string())
}
// Get last lost device info (from direct capture)
let device = if let Some(device) = self.last_lost_device.read().await.clone() {
device
} else {
self.current_device
.read()
.await
.as_ref()
.map(|d| d.path.display().to_string())
.unwrap_or_else(|| "unknown".to_string())
};
let reason = self
.last_lost_reason
.read()
.await
.clone()
.unwrap_or_else(|| "Device lost".to_string());
// Store error info
*self.last_lost_device.write().await = Some(device.clone());
@@ -908,7 +1009,7 @@ impl Streamer {
}
// Try to restart capture
match streamer.restart_capturer().await {
match streamer.restart_capture().await {
Ok(_) => {
info!(
"Video device {} recovered after {} attempts",
@@ -947,11 +1048,14 @@ impl Default for Streamer {
fn default() -> Self {
Self {
config: RwLock::new(StreamerConfig::default()),
capturer: RwLock::new(None),
mjpeg_handler: Arc::new(MjpegStreamHandler::new()),
current_device: RwLock::new(None),
state: RwLock::new(StreamerState::Uninitialized),
start_lock: tokio::sync::Mutex::new(()),
direct_stop: AtomicBool::new(false),
direct_active: AtomicBool::new(false),
direct_handle: tokio::sync::Mutex::new(None),
current_fps: AtomicU32::new(0),
events: RwLock::new(None),
last_published_state: RwLock::new(None),
config_changing: std::sync::atomic::AtomicBool::new(false),
@@ -976,8 +1080,6 @@ pub struct StreamerStats {
pub target_fps: u32,
/// Current actual FPS
pub fps: f32,
pub frames_captured: u64,
pub frames_dropped: u64,
}
impl serde::Serialize for StreamerState {

View File

@@ -83,7 +83,7 @@ struct VideoSession {
/// Last activity time
last_activity: Instant,
/// Frame receiver
frame_rx: Option<broadcast::Receiver<EncodedVideoFrame>>,
frame_rx: Option<tokio::sync::mpsc::Receiver<std::sync::Arc<EncodedVideoFrame>>>,
/// Stats
frames_received: u64,
bytes_received: u64,
@@ -243,7 +243,7 @@ impl VideoSessionManager {
pub async fn start_session(
&self,
session_id: &str,
) -> Result<broadcast::Receiver<EncodedVideoFrame>> {
) -> Result<tokio::sync::mpsc::Receiver<std::sync::Arc<EncodedVideoFrame>>> {
// Ensure pipeline is running with correct codec
self.ensure_pipeline_for_session(session_id).await?;