feat: 支持 MJPEG 解码与 MSD 目录配置

- FFmpeg/hwcodec 增加 RKMPP MJPEG 解码与 RAM FFI,ARM 构建启用对应解码器
  - 共享视频管线新增 MJPEG 解码路径(RKMPP/TurboJPEG),优化 WebRTC 发送与 MJPEG 去重
  - MSD 配置改为 msd_dir 并自动创建子目录,接口与前端设置同步更新
  - 更新包依赖与版本号
This commit is contained in:
mofeng-git
2026-01-11 16:32:37 +08:00
parent 0f52168e75
commit 01e01430da
30 changed files with 1185 additions and 260 deletions

View File

@@ -202,25 +202,37 @@ impl Default for HidConfig {
pub struct MsdConfig {
/// Enable MSD functionality
pub enabled: bool,
/// Storage path for ISO/IMG images
pub images_path: String,
/// Path for Ventoy bootable drive file
pub drive_path: String,
/// Ventoy drive size in MB (minimum 1024 MB / 1 GB)
pub virtual_drive_size_mb: u32,
/// MSD base directory (absolute path)
pub msd_dir: String,
}
impl Default for MsdConfig {
fn default() -> Self {
Self {
enabled: true,
images_path: "./data/msd/images".to_string(),
drive_path: "./data/msd/ventoy.img".to_string(),
virtual_drive_size_mb: 16 * 1024, // 16GB default
msd_dir: String::new(),
}
}
}
impl MsdConfig {
pub fn msd_dir_path(&self) -> std::path::PathBuf {
std::path::PathBuf::from(&self.msd_dir)
}
pub fn images_dir(&self) -> std::path::PathBuf {
self.msd_dir_path().join("images")
}
pub fn ventoy_dir(&self) -> std::path::PathBuf {
self.msd_dir_path().join("ventoy")
}
pub fn drive_path(&self) -> std::path::PathBuf {
self.ventoy_dir().join("ventoy.img")
}
}
// Re-export ATX types from atx module for configuration
pub use crate::atx::{ActiveLevel, AtxDriverType, AtxKeyConfig, AtxLedConfig};

View File

@@ -65,7 +65,7 @@ struct CliArgs {
#[arg(long, value_name = "FILE", requires = "ssl_cert")]
ssl_key: Option<PathBuf>,
/// Data directory path (default: ./data)
/// Data directory path (default: /etc/one-kvm)
#[arg(short = 'd', long, value_name = "DIR")]
data_dir: Option<PathBuf>,
@@ -104,6 +104,34 @@ async fn main() -> anyhow::Result<()> {
let config_store = ConfigStore::new(&db_path).await?;
let mut config = (*config_store.get()).clone();
// Normalize MSD directory (absolute path under data dir if empty/relative)
let mut msd_dir_updated = false;
if config.msd.msd_dir.trim().is_empty() {
let msd_dir = data_dir.join("msd");
config.msd.msd_dir = msd_dir.to_string_lossy().to_string();
msd_dir_updated = true;
} else if !PathBuf::from(&config.msd.msd_dir).is_absolute() {
let msd_dir = data_dir.join(&config.msd.msd_dir);
tracing::warn!(
"MSD directory is relative, rebasing to {}",
msd_dir.display()
);
config.msd.msd_dir = msd_dir.to_string_lossy().to_string();
msd_dir_updated = true;
}
if msd_dir_updated {
config_store.set(config.clone()).await?;
}
// Ensure MSD directories exist (msd/images, msd/ventoy)
let msd_dir = PathBuf::from(&config.msd.msd_dir);
if let Err(e) = tokio::fs::create_dir_all(msd_dir.join("images")).await {
tracing::warn!("Failed to create MSD images directory: {}", e);
}
if let Err(e) = tokio::fs::create_dir_all(msd_dir.join("ventoy")).await {
tracing::warn!("Failed to create MSD ventoy directory: {}", e);
}
// Apply CLI argument overrides to config (only if explicitly specified)
if let Some(addr) = args.address {
config.web.bind_address = addr;
@@ -344,11 +372,7 @@ async fn main() -> anyhow::Result<()> {
);
}
let controller = MsdController::new(
otg_service.clone(),
&config.msd.images_path,
&config.msd.drive_path,
);
let controller = MsdController::new(otg_service.clone(), config.msd.msd_dir_path());
if let Err(e) = controller.init().await {
tracing::warn!("Failed to initialize MSD controller: {}", e);
None

View File

@@ -32,6 +32,8 @@ pub struct MsdController {
state: RwLock<MsdState>,
/// Images storage path
images_path: PathBuf,
/// Ventoy directory path
ventoy_dir: PathBuf,
/// Virtual drive path
drive_path: PathBuf,
/// Event bus for broadcasting state changes (optional)
@@ -49,19 +51,22 @@ impl MsdController {
///
/// # Parameters
/// * `otg_service` - OTG service for gadget management
/// * `images_path` - Directory path for storing ISO/IMG files
/// * `drive_path` - File path for the virtual FAT32 drive
/// * `msd_dir` - Base directory for MSD storage
pub fn new(
otg_service: Arc<OtgService>,
images_path: impl Into<PathBuf>,
drive_path: impl Into<PathBuf>,
msd_dir: impl Into<PathBuf>,
) -> Self {
let msd_dir = msd_dir.into();
let images_path = msd_dir.join("images");
let ventoy_dir = msd_dir.join("ventoy");
let drive_path = ventoy_dir.join("ventoy.img");
Self {
otg_service,
msd_function: RwLock::new(None),
state: RwLock::new(MsdState::default()),
images_path: images_path.into(),
drive_path: drive_path.into(),
images_path,
ventoy_dir,
drive_path,
events: tokio::sync::RwLock::new(None),
downloads: Arc::new(RwLock::new(HashMap::new())),
operation_lock: Arc::new(RwLock::new(())),
@@ -77,6 +82,9 @@ impl MsdController {
if let Err(e) = std::fs::create_dir_all(&self.images_path) {
warn!("Failed to create images directory: {}", e);
}
if let Err(e) = std::fs::create_dir_all(&self.ventoy_dir) {
warn!("Failed to create ventoy directory: {}", e);
}
// 2. Request MSD function from OtgService
info!("Requesting MSD function from OtgService");
@@ -364,6 +372,11 @@ impl MsdController {
&self.images_path
}
/// Get ventoy directory path
pub fn ventoy_dir(&self) -> &PathBuf {
&self.ventoy_dir
}
/// Get virtual drive path
pub fn drive_path(&self) -> &PathBuf {
&self.drive_path
@@ -588,10 +601,9 @@ mod tests {
async fn test_controller_creation() {
let temp_dir = TempDir::new().unwrap();
let otg_service = Arc::new(OtgService::new());
let images_path = temp_dir.path().join("images");
let drive_path = temp_dir.path().join("ventoy.img");
let msd_dir = temp_dir.path().join("msd");
let controller = MsdController::new(otg_service, &images_path, &drive_path);
let controller = MsdController::new(otg_service, &msd_dir);
// Check that MSD is not initialized (msd_function is None)
let state = controller.state().await;
@@ -604,10 +616,9 @@ mod tests {
async fn test_state_default() {
let temp_dir = TempDir::new().unwrap();
let otg_service = Arc::new(OtgService::new());
let images_path = temp_dir.path().join("images");
let drive_path = temp_dir.path().join("ventoy.img");
let msd_dir = temp_dir.path().join("msd");
let controller = MsdController::new(otg_service, &images_path, &drive_path);
let controller = MsdController::new(otg_service, &msd_dir);
let state = controller.state().await;
assert!(!state.available);

View File

@@ -184,11 +184,22 @@ impl MjpegStreamHandler {
/// Update current frame
pub fn update_frame(&self, frame: VideoFrame) {
// Skip JPEG encoding if no clients are connected (optimization for WebRTC-only mode)
// This avoids unnecessary libyuv conversion when only WebRTC is active
if self.clients.read().is_empty() && !frame.format.is_compressed() {
// Still update the online status and sequence for monitoring purposes
// but skip the expensive JPEG encoding
// Fast path: if no MJPEG clients are connected, do minimal bookkeeping and avoid
// expensive work (JPEG encoding and per-frame dedup hashing).
let has_clients = !self.clients.read().is_empty();
if !has_clients {
self.dropped_same_frames.store(0, Ordering::Relaxed);
self.sequence.fetch_add(1, Ordering::Relaxed);
self.online.store(frame.online, Ordering::SeqCst);
*self.last_frame_ts.write() = Some(Instant::now());
// Keep the latest compressed frame for "instant first frame" when a client connects.
// Avoid retaining large raw buffers when there are no MJPEG clients.
if frame.format.is_compressed() {
self.current_frame.store(Arc::new(Some(frame)));
} else {
self.current_frame.store(Arc::new(None));
}
return;
}
@@ -237,7 +248,7 @@ impl MjpegStreamHandler {
self.dropped_same_frames.store(0, Ordering::Relaxed);
self.sequence.fetch_add(1, Ordering::Relaxed);
self.online.store(true, Ordering::SeqCst);
self.online.store(frame.online, Ordering::SeqCst);
*self.last_frame_ts.write() = Some(Instant::now());
self.current_frame.store(Arc::new(Some(frame)));
@@ -535,9 +546,44 @@ fn frames_are_identical(a: &VideoFrame, b: &VideoFrame) -> bool {
return false;
}
// Compare hashes instead of full binary data
// Hash is computed once and cached in OnceLock for efficiency
// This is much faster than binary comparison for large frames (1080p MJPEG)
// Avoid hashing the whole frame for obviously different frames by sampling a few
// fixed-size windows first. If all samples match, fall back to the cached hash.
let a_data = a.data();
let b_data = b.data();
let len = a_data.len();
// Small frames: direct compare is cheap.
if len <= 256 {
return a_data == b_data;
}
const SAMPLE: usize = 16;
debug_assert!(len == b_data.len());
// Head + tail.
if a_data[..SAMPLE] != b_data[..SAMPLE] {
return false;
}
if a_data[len - SAMPLE..] != b_data[len - SAMPLE..] {
return false;
}
// Two interior samples (quarter + middle) to catch common "same header/footer" cases.
let quarter = len / 4;
let quarter_start = quarter.saturating_sub(SAMPLE / 2);
if a_data[quarter_start..quarter_start + SAMPLE]
!= b_data[quarter_start..quarter_start + SAMPLE]
{
return false;
}
let mid = len / 2;
let mid_start = mid.saturating_sub(SAMPLE / 2);
if a_data[mid_start..mid_start + SAMPLE] != b_data[mid_start..mid_start + SAMPLE] {
return false;
}
// Compare hashes instead of full binary data.
// Hash is computed once and cached in OnceLock for efficiency.
a.get_hash() == b.get_hash()
}

View File

@@ -0,0 +1,95 @@
//! MJPEG decoder using RKMPP via hwcodec (FFmpeg RAM).
use hwcodec::ffmpeg::AVPixelFormat;
use hwcodec::ffmpeg_ram::decode::{DecodeContext, Decoder};
use tracing::warn;
use crate::error::{AppError, Result};
use crate::video::convert::Nv12Converter;
use crate::video::format::Resolution;
pub struct MjpegRkmppDecoder {
decoder: Decoder,
resolution: Resolution,
nv16_to_nv12: Option<Nv12Converter>,
last_pixfmt: Option<AVPixelFormat>,
}
impl MjpegRkmppDecoder {
pub fn new(resolution: Resolution) -> Result<Self> {
let ctx = DecodeContext {
name: "mjpeg_rkmpp".to_string(),
width: resolution.width as i32,
height: resolution.height as i32,
sw_pixfmt: AVPixelFormat::AV_PIX_FMT_NV12,
thread_count: 1,
};
let decoder = Decoder::new(ctx).map_err(|_| {
AppError::VideoError("Failed to create mjpeg_rkmpp decoder".to_string())
})?;
Ok(Self {
decoder,
resolution,
nv16_to_nv12: None,
last_pixfmt: None,
})
}
pub fn decode_to_nv12(&mut self, mjpeg: &[u8]) -> Result<Vec<u8>> {
let frames = self
.decoder
.decode(mjpeg)
.map_err(|e| AppError::VideoError(format!("mjpeg_rkmpp decode failed: {}", e)))?;
if frames.is_empty() {
return Err(AppError::VideoError(
"mjpeg_rkmpp decode returned no frames".to_string(),
));
}
if frames.len() > 1 {
warn!(
"mjpeg_rkmpp decode returned {} frames, using last",
frames.len()
);
}
let frame = frames
.pop()
.ok_or_else(|| AppError::VideoError("mjpeg_rkmpp decode returned empty".to_string()))?;
if frame.width as u32 != self.resolution.width
|| frame.height as u32 != self.resolution.height
{
warn!(
"mjpeg_rkmpp output size {}x{} differs from expected {}x{}",
frame.width, frame.height, self.resolution.width, self.resolution.height
);
}
if let Some(last) = self.last_pixfmt {
if frame.pixfmt != last {
warn!(
"mjpeg_rkmpp output pixfmt changed from {:?} to {:?}",
last, frame.pixfmt
);
}
} else {
self.last_pixfmt = Some(frame.pixfmt);
}
let pixfmt = self.last_pixfmt.unwrap_or(frame.pixfmt);
match pixfmt {
AVPixelFormat::AV_PIX_FMT_NV12 => Ok(frame.data),
AVPixelFormat::AV_PIX_FMT_NV16 => {
if self.nv16_to_nv12.is_none() {
self.nv16_to_nv12 = Some(Nv12Converter::nv16_to_nv12(self.resolution));
}
let conv = self.nv16_to_nv12.as_mut().unwrap();
let nv12 = conv.convert(&frame.data)?;
Ok(nv12.to_vec())
}
other => Err(AppError::VideoError(format!(
"mjpeg_rkmpp output pixfmt {:?} (expected NV12/NV16)",
other
))),
}
}
}

View File

@@ -0,0 +1,54 @@
//! MJPEG decoder using TurboJPEG (software) -> RGB24.
use turbojpeg::{Decompressor, Image, PixelFormat as TJPixelFormat};
use crate::error::{AppError, Result};
use crate::video::format::Resolution;
pub struct MjpegTurboDecoder {
decompressor: Decompressor,
resolution: Resolution,
}
impl MjpegTurboDecoder {
pub fn new(resolution: Resolution) -> Result<Self> {
let decompressor = Decompressor::new().map_err(|e| {
AppError::VideoError(format!("Failed to create turbojpeg decoder: {}", e))
})?;
Ok(Self {
decompressor,
resolution,
})
}
pub fn decode_to_rgb(&mut self, mjpeg: &[u8]) -> Result<Vec<u8>> {
let header = self
.decompressor
.read_header(mjpeg)
.map_err(|e| AppError::VideoError(format!("turbojpeg read_header failed: {}", e)))?;
if header.width as u32 != self.resolution.width
|| header.height as u32 != self.resolution.height
{
return Err(AppError::VideoError(format!(
"turbojpeg size mismatch: {}x{} (expected {}x{})",
header.width, header.height, self.resolution.width, self.resolution.height
)));
}
let pitch = header.width * 3;
let mut image = Image {
pixels: vec![0u8; header.height * pitch],
width: header.width,
pitch,
height: header.height,
format: TJPixelFormat::RGB,
};
self.decompressor
.decompress(mjpeg, image.as_deref_mut())
.map_err(|e| AppError::VideoError(format!("turbojpeg decode failed: {}", e)))?;
Ok(image.pixels)
}
}

View File

@@ -1,3 +1,11 @@
//! Video decoder implementations
//!
//! This module provides video decoding capabilities.
#[cfg(any(target_arch = "aarch64", target_arch = "arm"))]
pub mod mjpeg_rkmpp;
pub mod mjpeg_turbo;
#[cfg(any(target_arch = "aarch64", target_arch = "arm"))]
pub use mjpeg_rkmpp::MjpegRkmppDecoder;
pub use mjpeg_turbo::MjpegTurboDecoder;

View File

@@ -28,14 +28,17 @@ const AUTO_STOP_GRACE_PERIOD_SECS: u64 = 3;
use crate::error::{AppError, Result};
use crate::video::convert::{Nv12Converter, PixelConverter};
#[cfg(any(target_arch = "aarch64", target_arch = "arm"))]
use crate::video::decoder::MjpegRkmppDecoder;
use crate::video::decoder::MjpegTurboDecoder;
use crate::video::encoder::h264::{detect_best_encoder, H264Config, H264Encoder, H264InputFormat};
use crate::video::encoder::h265::{
detect_best_h265_encoder, H265Config, H265Encoder, H265InputFormat,
};
use crate::video::encoder::registry::{EncoderBackend, EncoderRegistry, VideoEncoderType};
use crate::video::encoder::traits::EncoderConfig;
use crate::video::encoder::vp8::{VP8Config, VP8Encoder};
use crate::video::encoder::vp9::{VP9Config, VP9Encoder};
use crate::video::encoder::vp8::{detect_best_vp8_encoder, VP8Config, VP8Encoder};
use crate::video::encoder::vp9::{detect_best_vp9_encoder, VP9Config, VP9Encoder};
use crate::video::format::{PixelFormat, Resolution};
use crate::video::frame::VideoFrame;
@@ -292,10 +295,27 @@ impl VideoEncoderTrait for VP9EncoderWrapper {
}
}
enum MjpegDecoderKind {
#[cfg(any(target_arch = "aarch64", target_arch = "arm"))]
Rkmpp(MjpegRkmppDecoder),
Turbo(MjpegTurboDecoder),
}
impl MjpegDecoderKind {
fn decode(&mut self, data: &[u8]) -> Result<Vec<u8>> {
match self {
#[cfg(any(target_arch = "aarch64", target_arch = "arm"))]
MjpegDecoderKind::Rkmpp(decoder) => decoder.decode_to_nv12(data),
MjpegDecoderKind::Turbo(decoder) => decoder.decode_to_rgb(data),
}
}
}
/// Universal shared video pipeline
pub struct SharedVideoPipeline {
config: RwLock<SharedVideoPipelineConfig>,
encoder: Mutex<Option<Box<dyn VideoEncoderTrait + Send>>>,
mjpeg_decoder: Mutex<Option<MjpegDecoderKind>>,
nv12_converter: Mutex<Option<Nv12Converter>>,
yuv420p_converter: Mutex<Option<PixelConverter>>,
/// Whether the encoder needs YUV420P (true) or NV12 (false)
@@ -333,6 +353,7 @@ impl SharedVideoPipeline {
let pipeline = Arc::new(Self {
config: RwLock::new(config),
encoder: Mutex::new(None),
mjpeg_decoder: Mutex::new(None),
nv12_converter: Mutex::new(None),
yuv420p_converter: Mutex::new(None),
encoder_needs_yuv420p: AtomicBool::new(false),
@@ -367,12 +388,16 @@ impl SharedVideoPipeline {
}
};
let needs_mjpeg_decode = config.input_format.is_compressed();
// Check if RKMPP backend is available for direct input optimization
let is_rkmpp_available = registry
.encoder_with_backend(VideoEncoderType::H264, EncoderBackend::Rkmpp)
.is_some();
let use_yuyv_direct = is_rkmpp_available && config.input_format == PixelFormat::Yuyv;
let use_yuyv_direct =
is_rkmpp_available && !needs_mjpeg_decode && config.input_format == PixelFormat::Yuyv;
let use_rkmpp_direct = is_rkmpp_available
&& !needs_mjpeg_decode
&& matches!(
config.input_format,
PixelFormat::Yuyv
@@ -396,10 +421,9 @@ impl SharedVideoPipeline {
);
}
// Create encoder based on codec type
let encoder: Box<dyn VideoEncoderTrait + Send> = match config.output_codec {
let selected_codec_name = match config.output_codec {
VideoEncoderType::H264 => {
let codec_name = if use_rkmpp_direct {
if use_rkmpp_direct {
// Force RKMPP backend for direct input
get_codec_name(VideoEncoderType::H264, Some(EncoderBackend::Rkmpp)).ok_or_else(
|| {
@@ -423,11 +447,109 @@ impl SharedVideoPipeline {
detected.ok_or_else(|| {
AppError::VideoError("No H.264 encoder available".to_string())
})?
};
}
}
VideoEncoderType::H265 => {
if use_rkmpp_direct {
get_codec_name(VideoEncoderType::H265, Some(EncoderBackend::Rkmpp)).ok_or_else(
|| {
AppError::VideoError(
"RKMPP backend not available for H.265".to_string(),
)
},
)?
} else if let Some(ref backend) = config.encoder_backend {
get_codec_name(VideoEncoderType::H265, Some(*backend)).ok_or_else(|| {
AppError::VideoError(format!(
"Backend {:?} does not support H.265",
backend
))
})?
} else {
let (_encoder_type, detected) =
detect_best_h265_encoder(config.resolution.width, config.resolution.height);
detected.ok_or_else(|| {
AppError::VideoError("No H.265 encoder available".to_string())
})?
}
}
VideoEncoderType::VP8 => {
if let Some(ref backend) = config.encoder_backend {
get_codec_name(VideoEncoderType::VP8, Some(*backend)).ok_or_else(|| {
AppError::VideoError(format!("Backend {:?} does not support VP8", backend))
})?
} else {
let (_encoder_type, detected) =
detect_best_vp8_encoder(config.resolution.width, config.resolution.height);
detected.ok_or_else(|| {
AppError::VideoError("No VP8 encoder available".to_string())
})?
}
}
VideoEncoderType::VP9 => {
if let Some(ref backend) = config.encoder_backend {
get_codec_name(VideoEncoderType::VP9, Some(*backend)).ok_or_else(|| {
AppError::VideoError(format!("Backend {:?} does not support VP9", backend))
})?
} else {
let (_encoder_type, detected) =
detect_best_vp9_encoder(config.resolution.width, config.resolution.height);
detected.ok_or_else(|| {
AppError::VideoError("No VP9 encoder available".to_string())
})?
}
}
};
let is_rkmpp_encoder = selected_codec_name.contains("rkmpp");
let is_software_encoder = selected_codec_name.contains("libx264")
|| selected_codec_name.contains("libx265")
|| selected_codec_name.contains("libvpx");
let pipeline_input_format = if needs_mjpeg_decode {
if is_rkmpp_encoder {
info!(
"MJPEG input detected, using RKMPP decoder ({} -> NV12 with NV16 fallback)",
config.input_format
);
#[cfg(any(target_arch = "aarch64", target_arch = "arm"))]
{
let decoder = MjpegRkmppDecoder::new(config.resolution)?;
*self.mjpeg_decoder.lock().await = Some(MjpegDecoderKind::Rkmpp(decoder));
PixelFormat::Nv12
}
#[cfg(not(any(target_arch = "aarch64", target_arch = "arm")))]
{
return Err(AppError::VideoError(
"RKMPP MJPEG decode is only supported on ARM builds".to_string(),
));
}
} else if is_software_encoder {
info!(
"MJPEG input detected, using TurboJPEG decoder ({} -> RGB24)",
config.input_format
);
let decoder = MjpegTurboDecoder::new(config.resolution)?;
*self.mjpeg_decoder.lock().await = Some(MjpegDecoderKind::Turbo(decoder));
PixelFormat::Rgb24
} else {
return Err(AppError::VideoError(
"MJPEG input requires RKMPP or software encoder".to_string(),
));
}
} else {
*self.mjpeg_decoder.lock().await = None;
config.input_format
};
// Create encoder based on codec type
let encoder: Box<dyn VideoEncoderTrait + Send> = match config.output_codec {
VideoEncoderType::H264 => {
let codec_name = selected_codec_name.clone();
let is_rkmpp = codec_name.contains("rkmpp");
let direct_input_format = if is_rkmpp {
match config.input_format {
match pipeline_input_format {
PixelFormat::Yuyv => Some(H264InputFormat::Yuyv422),
PixelFormat::Yuv420 => Some(H264InputFormat::Yuv420p),
PixelFormat::Rgb24 => Some(H264InputFormat::Rgb24),
@@ -439,7 +561,7 @@ impl SharedVideoPipeline {
_ => None,
}
} else if codec_name.contains("libx264") {
match config.input_format {
match pipeline_input_format {
PixelFormat::Nv12 => Some(H264InputFormat::Nv12),
PixelFormat::Nv16 => Some(H264InputFormat::Nv16),
PixelFormat::Nv21 => Some(H264InputFormat::Nv21),
@@ -485,32 +607,11 @@ impl SharedVideoPipeline {
Box::new(H264EncoderWrapper(encoder))
}
VideoEncoderType::H265 => {
let codec_name = if use_rkmpp_direct {
get_codec_name(VideoEncoderType::H265, Some(EncoderBackend::Rkmpp)).ok_or_else(
|| {
AppError::VideoError(
"RKMPP backend not available for H.265".to_string(),
)
},
)?
} else if let Some(ref backend) = config.encoder_backend {
get_codec_name(VideoEncoderType::H265, Some(*backend)).ok_or_else(|| {
AppError::VideoError(format!(
"Backend {:?} does not support H.265",
backend
))
})?
} else {
let (_encoder_type, detected) =
detect_best_h265_encoder(config.resolution.width, config.resolution.height);
detected.ok_or_else(|| {
AppError::VideoError("No H.265 encoder available".to_string())
})?
};
let codec_name = selected_codec_name.clone();
let is_rkmpp = codec_name.contains("rkmpp");
let direct_input_format = if is_rkmpp {
match config.input_format {
match pipeline_input_format {
PixelFormat::Yuyv => Some(H265InputFormat::Yuyv422),
PixelFormat::Yuv420 => Some(H265InputFormat::Yuv420p),
PixelFormat::Rgb24 => Some(H265InputFormat::Rgb24),
@@ -522,7 +623,7 @@ impl SharedVideoPipeline {
_ => None,
}
} else if codec_name.contains("libx265") {
match config.input_format {
match pipeline_input_format {
PixelFormat::Yuv420 => Some(H265InputFormat::Yuv420p),
_ => None,
}
@@ -572,23 +673,14 @@ impl SharedVideoPipeline {
VideoEncoderType::VP8 => {
let encoder_config =
VP8Config::low_latency(config.resolution, config.bitrate_kbps());
let encoder = if let Some(ref backend) = config.encoder_backend {
let codec_name = get_codec_name(VideoEncoderType::VP8, Some(*backend))
.ok_or_else(|| {
AppError::VideoError(format!(
"Backend {:?} does not support VP8",
backend
))
})?;
let codec_name = selected_codec_name.clone();
if let Some(ref backend) = config.encoder_backend {
info!(
"Creating VP8 encoder with backend {:?} (codec: {})",
backend, codec_name
);
VP8Encoder::with_codec(encoder_config, &codec_name)?
} else {
VP8Encoder::new(encoder_config)?
};
}
let encoder = VP8Encoder::with_codec(encoder_config, &codec_name)?;
info!("Created VP8 encoder: {}", encoder.codec_name());
Box::new(VP8EncoderWrapper(encoder))
@@ -596,23 +688,14 @@ impl SharedVideoPipeline {
VideoEncoderType::VP9 => {
let encoder_config =
VP9Config::low_latency(config.resolution, config.bitrate_kbps());
let encoder = if let Some(ref backend) = config.encoder_backend {
let codec_name = get_codec_name(VideoEncoderType::VP9, Some(*backend))
.ok_or_else(|| {
AppError::VideoError(format!(
"Backend {:?} does not support VP9",
backend
))
})?;
let codec_name = selected_codec_name.clone();
if let Some(ref backend) = config.encoder_backend {
info!(
"Creating VP9 encoder with backend {:?} (codec: {})",
backend, codec_name
);
VP9Encoder::with_codec(encoder_config, &codec_name)?
} else {
VP9Encoder::new(encoder_config)?
};
}
let encoder = VP9Encoder::with_codec(encoder_config, &codec_name)?;
info!("Created VP9 encoder: {}", encoder.codec_name());
Box::new(VP9EncoderWrapper(encoder))
@@ -623,7 +706,7 @@ impl SharedVideoPipeline {
let codec_name = encoder.codec_name();
let use_direct_input = if codec_name.contains("rkmpp") {
matches!(
config.input_format,
pipeline_input_format,
PixelFormat::Yuyv
| PixelFormat::Yuv420
| PixelFormat::Rgb24
@@ -635,7 +718,7 @@ impl SharedVideoPipeline {
)
} else if codec_name.contains("libx264") {
matches!(
config.input_format,
pipeline_input_format,
PixelFormat::Nv12 | PixelFormat::Nv16 | PixelFormat::Nv21 | PixelFormat::Yuv420
)
} else {
@@ -645,7 +728,7 @@ impl SharedVideoPipeline {
// Determine if encoder needs YUV420P (software encoders) or NV12 (hardware encoders)
let needs_yuv420p = if codec_name.contains("libx264") {
!matches!(
config.input_format,
pipeline_input_format,
PixelFormat::Nv12 | PixelFormat::Nv16 | PixelFormat::Nv21 | PixelFormat::Yuv420
)
} else {
@@ -667,7 +750,7 @@ impl SharedVideoPipeline {
// Create converter or decoder based on input format and encoder needs
info!(
"Initializing input format handler for: {} -> {}",
config.input_format,
pipeline_input_format,
if use_direct_input {
"direct"
} else if needs_yuv420p {
@@ -686,7 +769,7 @@ impl SharedVideoPipeline {
(None, None)
} else if needs_yuv420p {
// Software encoder needs YUV420P
match config.input_format {
match pipeline_input_format {
PixelFormat::Yuv420 => {
info!("Using direct YUV420P input (no conversion)");
(None, None)
@@ -729,13 +812,13 @@ impl SharedVideoPipeline {
_ => {
return Err(AppError::VideoError(format!(
"Unsupported input format for software encoding: {}",
config.input_format
pipeline_input_format
)));
}
}
} else {
// Hardware encoder needs NV12
match config.input_format {
match pipeline_input_format {
PixelFormat::Nv12 => {
info!("Using direct NV12 input (no conversion)");
(None, None)
@@ -767,7 +850,7 @@ impl SharedVideoPipeline {
_ => {
return Err(AppError::VideoError(format!(
"Unsupported input format for hardware encoding: {}",
config.input_format
pipeline_input_format
)));
}
}
@@ -857,6 +940,7 @@ impl SharedVideoPipeline {
// Clear encoder state
*self.encoder.lock().await = None;
*self.mjpeg_decoder.lock().await = None;
*self.nv12_converter.lock().await = None;
*self.yuv420p_converter.lock().await = None;
self.encoder_needs_yuv420p.store(false, Ordering::Release);
@@ -973,8 +1057,10 @@ impl SharedVideoPipeline {
}
// Batch update stats every second (reduces lock contention)
if last_fps_time.elapsed() >= Duration::from_secs(1) {
let current_fps = fps_frame_count as f32 / last_fps_time.elapsed().as_secs_f32();
let fps_elapsed = last_fps_time.elapsed();
if fps_elapsed >= Duration::from_secs(1) {
let current_fps =
fps_frame_count as f32 / fps_elapsed.as_secs_f32();
fps_frame_count = 0;
last_fps_time = Instant::now();
@@ -1020,11 +1106,25 @@ impl SharedVideoPipeline {
frame: &VideoFrame,
frame_count: u64,
) -> Result<Option<EncodedVideoFrame>> {
let config = self.config.read().await;
let (fps, codec, input_format) = {
let config = self.config.read().await;
(config.fps, config.output_codec, config.input_format)
};
let raw_frame = frame.data();
let fps = config.fps;
let codec = config.output_codec;
drop(config);
let decoded_buf = if input_format.is_compressed() {
let decoded = {
let mut decoder_guard = self.mjpeg_decoder.lock().await;
let decoder = decoder_guard.as_mut().ok_or_else(|| {
AppError::VideoError("MJPEG decoder not initialized".to_string())
})?;
decoder.decode(raw_frame)?
};
Some(decoded)
} else {
None
};
let raw_frame = decoded_buf.as_deref().unwrap_or(raw_frame);
// Calculate PTS from real capture timestamp (lock-free using AtomicI64)
// This ensures smooth playback even when capture timing varies

View File

@@ -220,11 +220,8 @@ pub async fn apply_hid_config(
// Get MSD config from store
let config = state.config.get();
let msd = crate::msd::MsdController::new(
state.otg_service.clone(),
&config.msd.images_path,
&config.msd.drive_path,
);
let msd =
crate::msd::MsdController::new(state.otg_service.clone(), config.msd.msd_dir_path());
if let Err(e) = msd.init().await {
tracing::warn!("Failed to auto-initialize MSD for OTG: {}", e);
@@ -253,51 +250,73 @@ pub async fn apply_msd_config(
// Check if MSD enabled state changed
let old_msd_enabled = old_config.enabled;
let new_msd_enabled = new_config.enabled;
let msd_dir_changed = old_config.msd_dir != new_config.msd_dir;
tracing::info!(
"MSD enabled: old={}, new={}",
old_msd_enabled,
new_msd_enabled
);
if msd_dir_changed {
tracing::info!("MSD directory changed: {}", new_config.msd_dir);
}
if old_msd_enabled != new_msd_enabled {
if new_msd_enabled {
// MSD was disabled, now enabled - need to initialize
tracing::info!("MSD enabled in config, initializing...");
// Ensure MSD directories exist (msd/images, msd/ventoy)
let msd_dir = new_config.msd_dir_path();
if let Err(e) = std::fs::create_dir_all(msd_dir.join("images")) {
tracing::warn!("Failed to create MSD images directory: {}", e);
}
if let Err(e) = std::fs::create_dir_all(msd_dir.join("ventoy")) {
tracing::warn!("Failed to create MSD ventoy directory: {}", e);
}
let msd = crate::msd::MsdController::new(
state.otg_service.clone(),
&new_config.images_path,
&new_config.drive_path,
);
msd.init()
.await
.map_err(|e| AppError::Config(format!("MSD initialization failed: {}", e)))?;
// Set event bus
let events = state.events.clone();
msd.set_event_bus(events).await;
// Store the initialized controller
*state.msd.write().await = Some(msd);
tracing::info!("MSD initialized successfully");
} else {
// MSD was enabled, now disabled - shutdown
tracing::info!("MSD disabled in config, shutting down...");
if let Some(msd) = state.msd.write().await.as_mut() {
if let Err(e) = msd.shutdown().await {
tracing::warn!("MSD shutdown failed: {}", e);
}
}
*state.msd.write().await = None;
tracing::info!("MSD shutdown complete");
}
} else {
let needs_reload = old_msd_enabled != new_msd_enabled || msd_dir_changed;
if !needs_reload {
tracing::info!(
"MSD enabled state unchanged ({}), no reload needed",
"MSD enabled state unchanged ({}) and directory unchanged, no reload needed",
new_msd_enabled
);
return Ok(());
}
if new_msd_enabled {
tracing::info!("(Re)initializing MSD...");
// Shutdown existing controller if present
let mut msd_guard = state.msd.write().await;
if let Some(msd) = msd_guard.as_mut() {
if let Err(e) = msd.shutdown().await {
tracing::warn!("MSD shutdown failed: {}", e);
}
}
*msd_guard = None;
drop(msd_guard);
let msd =
crate::msd::MsdController::new(state.otg_service.clone(), new_config.msd_dir_path());
msd.init()
.await
.map_err(|e| AppError::Config(format!("MSD initialization failed: {}", e)))?;
// Set event bus
let events = state.events.clone();
msd.set_event_bus(events).await;
// Store the initialized controller
*state.msd.write().await = Some(msd);
tracing::info!("MSD initialized successfully");
} else {
// MSD disabled - shutdown
tracing::info!("MSD disabled in config, shutting down...");
let mut msd_guard = state.msd.write().await;
if let Some(msd) = msd_guard.as_mut() {
if let Err(e) = msd.shutdown().await {
tracing::warn!("MSD shutdown failed: {}", e);
}
}
*msd_guard = None;
tracing::info!("MSD shutdown complete");
}
Ok(())

View File

@@ -3,6 +3,7 @@ use crate::error::AppError;
use crate::rustdesk::config::RustDeskConfig;
use crate::video::encoder::BitratePreset;
use serde::Deserialize;
use std::path::Path;
use typeshare::typeshare;
// ===== Video Config =====
@@ -305,16 +306,20 @@ impl HidConfigUpdate {
#[derive(Debug, Deserialize)]
pub struct MsdConfigUpdate {
pub enabled: Option<bool>,
pub images_path: Option<String>,
pub drive_path: Option<String>,
pub virtual_drive_size_mb: Option<u32>,
pub msd_dir: Option<String>,
}
impl MsdConfigUpdate {
pub fn validate(&self) -> crate::error::Result<()> {
if let Some(size) = self.virtual_drive_size_mb {
if !(1..=10240).contains(&size) {
return Err(AppError::BadRequest("Drive size must be 1-10240 MB".into()));
if let Some(ref dir) = self.msd_dir {
let trimmed = dir.trim();
if trimmed.is_empty() {
return Err(AppError::BadRequest("MSD directory cannot be empty".into()));
}
if !Path::new(trimmed).is_absolute() {
return Err(AppError::BadRequest(
"MSD directory must be an absolute path".into(),
));
}
}
Ok(())
@@ -324,14 +329,8 @@ impl MsdConfigUpdate {
if let Some(enabled) = self.enabled {
config.enabled = enabled;
}
if let Some(ref path) = self.images_path {
config.images_path = path.clone();
}
if let Some(ref path) = self.drive_path {
config.drive_path = path.clone();
}
if let Some(size) = self.virtual_drive_size_mb {
config.virtual_drive_size_mb = size;
if let Some(ref dir) = self.msd_dir {
config.msd_dir = dir.trim().to_string();
}
}
}

View File

@@ -90,12 +90,13 @@ pub struct CapabilityInfo {
pub async fn system_info(State(state): State<Arc<AppState>>) -> Json<SystemInfo> {
let config = state.config.get();
// Get disk space information for MSD images directory
// Get disk space information for MSD base directory
let disk_space = {
if let Some(ref msd_controller) = *state.msd.read().await {
get_disk_space(msd_controller.images_path()).ok()
} else {
let msd_dir = config.msd.msd_dir_path();
if msd_dir.as_os_str().is_empty() {
None
} else {
get_disk_space(&msd_dir).ok()
}
};
@@ -933,66 +934,85 @@ pub async fn update_config(
}
}
// MSD config processing - reload if enabled state changed
// MSD config processing - reload if enabled state or directory changed
if has_msd {
tracing::info!("MSD config sent, checking if reload needed...");
tracing::debug!("Old MSD config: {:?}", old_config.msd);
tracing::debug!("New MSD config: {:?}", new_config.msd);
// Check if MSD enabled state changed
let old_msd_enabled = old_config.msd.enabled;
let new_msd_enabled = new_config.msd.enabled;
let msd_dir_changed = old_config.msd.msd_dir != new_config.msd.msd_dir;
tracing::info!(
"MSD enabled: old={}, new={}",
old_msd_enabled,
new_msd_enabled
);
if msd_dir_changed {
tracing::info!("MSD directory changed: {}", new_config.msd.msd_dir);
}
if old_msd_enabled != new_msd_enabled {
if new_msd_enabled {
// MSD was disabled, now enabled - need to initialize
tracing::info!("MSD enabled in config, initializing...");
// Ensure MSD directories exist (msd/images, msd/ventoy)
let msd_dir = new_config.msd.msd_dir_path();
if let Err(e) = std::fs::create_dir_all(msd_dir.join("images")) {
tracing::warn!("Failed to create MSD images directory: {}", e);
}
if let Err(e) = std::fs::create_dir_all(msd_dir.join("ventoy")) {
tracing::warn!("Failed to create MSD ventoy directory: {}", e);
}
let msd = crate::msd::MsdController::new(
state.otg_service.clone(),
&new_config.msd.images_path,
&new_config.msd.drive_path,
);
if let Err(e) = msd.init().await {
tracing::error!("MSD initialization failed: {}", e);
// Rollback config on failure
state.config.set((*old_config).clone()).await?;
return Ok(Json(LoginResponse {
success: false,
message: Some(format!("MSD initialization failed: {}", e)),
}));
}
// Set event bus
let events = state.events.clone();
msd.set_event_bus(events).await;
// Store the initialized controller
*state.msd.write().await = Some(msd);
tracing::info!("MSD initialized successfully");
} else {
// MSD was enabled, now disabled - shutdown
tracing::info!("MSD disabled in config, shutting down...");
if let Some(msd) = state.msd.write().await.as_mut() {
if let Err(e) = msd.shutdown().await {
tracing::warn!("MSD shutdown failed: {}", e);
}
}
*state.msd.write().await = None;
tracing::info!("MSD shutdown complete");
}
} else {
let needs_reload = old_msd_enabled != new_msd_enabled || msd_dir_changed;
if !needs_reload {
tracing::info!(
"MSD enabled state unchanged ({}), no reload needed",
"MSD enabled state unchanged ({}) and directory unchanged, no reload needed",
new_msd_enabled
);
} else if new_msd_enabled {
tracing::info!("(Re)initializing MSD...");
// Shutdown existing controller if present
let mut msd_guard = state.msd.write().await;
if let Some(msd) = msd_guard.as_mut() {
if let Err(e) = msd.shutdown().await {
tracing::warn!("MSD shutdown failed: {}", e);
}
}
*msd_guard = None;
drop(msd_guard);
let msd = crate::msd::MsdController::new(
state.otg_service.clone(),
new_config.msd.msd_dir_path(),
);
if let Err(e) = msd.init().await {
tracing::error!("MSD initialization failed: {}", e);
// Rollback config on failure
state.config.set((*old_config).clone()).await?;
return Ok(Json(LoginResponse {
success: false,
message: Some(format!("MSD initialization failed: {}", e)),
}));
}
// Set event bus
let events = state.events.clone();
msd.set_event_bus(events).await;
// Store the initialized controller
*state.msd.write().await = Some(msd);
tracing::info!("MSD initialized successfully");
} else {
tracing::info!("MSD disabled in config, shutting down...");
let mut msd_guard = state.msd.write().await;
if let Some(msd) = msd_guard.as_mut() {
if let Err(e) = msd.shutdown().await {
tracing::warn!("MSD shutdown failed: {}", e);
}
}
*msd_guard = None;
tracing::info!("MSD shutdown complete");
}
}
@@ -2069,7 +2089,7 @@ pub async fn msd_status(State(state): State<Arc<AppState>>) -> Result<Json<MsdSt
/// List all available images
pub async fn msd_images_list(State(state): State<Arc<AppState>>) -> Result<Json<Vec<ImageInfo>>> {
let config = state.config.get();
let images_path = std::path::PathBuf::from(&config.msd.images_path);
let images_path = config.msd.images_dir();
let manager = ImageManager::new(images_path);
let images = manager.list()?;
@@ -2082,7 +2102,7 @@ pub async fn msd_image_upload(
mut multipart: Multipart,
) -> Result<Json<ImageInfo>> {
let config = state.config.get();
let images_path = std::path::PathBuf::from(&config.msd.images_path);
let images_path = config.msd.images_dir();
let manager = ImageManager::new(images_path);
while let Some(field) = multipart
@@ -2115,7 +2135,7 @@ pub async fn msd_image_get(
AxumPath(id): AxumPath<String>,
) -> Result<Json<ImageInfo>> {
let config = state.config.get();
let images_path = std::path::PathBuf::from(&config.msd.images_path);
let images_path = config.msd.images_dir();
let manager = ImageManager::new(images_path);
let image = manager.get(&id)?;
@@ -2128,7 +2148,7 @@ pub async fn msd_image_delete(
AxumPath(id): AxumPath<String>,
) -> Result<Json<LoginResponse>> {
let config = state.config.get();
let images_path = std::path::PathBuf::from(&config.msd.images_path);
let images_path = config.msd.images_dir();
let manager = ImageManager::new(images_path);
manager.delete(&id)?;
@@ -2194,7 +2214,7 @@ pub async fn msd_connect(
})?;
// Get image info from ImageManager
let images_path = std::path::PathBuf::from(&config.msd.images_path);
let images_path = config.msd.images_dir();
let manager = ImageManager::new(images_path);
let image = manager.get(&image_id)?;
@@ -2240,7 +2260,7 @@ pub async fn msd_disconnect(State(state): State<Arc<AppState>>) -> Result<Json<L
/// Get drive info
pub async fn msd_drive_info(State(state): State<Arc<AppState>>) -> Result<Json<DriveInfo>> {
let config = state.config.get();
let drive_path = std::path::PathBuf::from(&config.msd.drive_path);
let drive_path = config.msd.drive_path();
let drive = VentoyDrive::new(drive_path);
if !drive.exists() {
@@ -2257,7 +2277,7 @@ pub async fn msd_drive_init(
Json(req): Json<DriveInitRequest>,
) -> Result<Json<DriveInfo>> {
let config = state.config.get();
let drive_path = std::path::PathBuf::from(&config.msd.drive_path);
let drive_path = config.msd.drive_path();
let drive = VentoyDrive::new(drive_path);
let info = drive.init(req.size_mb).await?;
@@ -2281,7 +2301,7 @@ pub async fn msd_drive_delete(State(state): State<Arc<AppState>>) -> Result<Json
drop(msd_guard);
// Delete the drive file
let drive_path = std::path::PathBuf::from(&config.msd.drive_path);
let drive_path = config.msd.drive_path();
if drive_path.exists() {
std::fs::remove_file(&drive_path)
.map_err(|e| AppError::Internal(format!("Failed to delete drive file: {}", e)))?;
@@ -2299,7 +2319,7 @@ pub async fn msd_drive_files(
Query(params): Query<HashMap<String, String>>,
) -> Result<Json<Vec<DriveFile>>> {
let config = state.config.get();
let drive_path = std::path::PathBuf::from(&config.msd.drive_path);
let drive_path = config.msd.drive_path();
let drive = VentoyDrive::new(drive_path);
let dir_path = params.get("path").map(|s| s.as_str()).unwrap_or("/");
@@ -2314,7 +2334,7 @@ pub async fn msd_drive_upload(
mut multipart: Multipart,
) -> Result<Json<LoginResponse>> {
let config = state.config.get();
let drive_path = std::path::PathBuf::from(&config.msd.drive_path);
let drive_path = config.msd.drive_path();
let drive = VentoyDrive::new(drive_path);
let target_dir = params.get("path").map(|s| s.as_str()).unwrap_or("/");
@@ -2359,7 +2379,7 @@ pub async fn msd_drive_download(
AxumPath(file_path): AxumPath<String>,
) -> Result<Response> {
let config = state.config.get();
let drive_path = std::path::PathBuf::from(&config.msd.drive_path);
let drive_path = config.msd.drive_path();
let drive = VentoyDrive::new(drive_path);
// Get file stream (returns file size and channel receiver)
@@ -2393,7 +2413,7 @@ pub async fn msd_drive_file_delete(
AxumPath(file_path): AxumPath<String>,
) -> Result<Json<LoginResponse>> {
let config = state.config.get();
let drive_path = std::path::PathBuf::from(&config.msd.drive_path);
let drive_path = config.msd.drive_path();
let drive = VentoyDrive::new(drive_path);
drive.delete(&file_path).await?;
@@ -2410,7 +2430,7 @@ pub async fn msd_drive_mkdir(
AxumPath(dir_path): AxumPath<String>,
) -> Result<Json<LoginResponse>> {
let config = state.config.get();
let drive_path = std::path::PathBuf::from(&config.msd.drive_path);
let drive_path = config.msd.drive_path();
let drive = VentoyDrive::new(drive_path);
drive.mkdir(&dir_path).await?;

View File

@@ -586,7 +586,10 @@ impl UniversalSession {
// Send encoded frame via RTP
if let Err(e) = video_track
.write_frame(&encoded_frame.data, encoded_frame.is_keyframe)
.write_frame_bytes(
encoded_frame.data.clone(),
encoded_frame.is_keyframe,
)
.await
{
if frames_sent % 100 == 0 {

View File

@@ -310,7 +310,7 @@ impl UniversalVideoTrack {
/// Handles codec-specific processing:
/// - H264/H265: NAL unit parsing, parameter caching
/// - VP8/VP9: Direct frame sending
pub async fn write_frame(&self, data: &[u8], is_keyframe: bool) -> Result<()> {
pub async fn write_frame_bytes(&self, data: Bytes, is_keyframe: bool) -> Result<()> {
if data.is_empty() {
return Ok(());
}
@@ -323,11 +323,16 @@ impl UniversalVideoTrack {
}
}
pub async fn write_frame(&self, data: &[u8], is_keyframe: bool) -> Result<()> {
self.write_frame_bytes(Bytes::copy_from_slice(data), is_keyframe)
.await
}
/// Write H264 frame (Annex B format)
///
/// Sends the entire Annex B frame as a single Sample to allow the
/// H264Payloader to aggregate SPS+PPS into STAP-A packets.
async fn write_h264_frame(&self, data: &[u8], is_keyframe: bool) -> Result<()> {
async fn write_h264_frame(&self, data: Bytes, is_keyframe: bool) -> Result<()> {
// Send entire Annex B frame as one Sample
// The H264Payloader in rtp crate will:
// 1. Parse NAL units from Annex B format
@@ -335,8 +340,9 @@ impl UniversalVideoTrack {
// 3. Aggregate SPS+PPS+IDR into STAP-A when possible
// 4. Fragment large NALs using FU-A
let frame_duration = Duration::from_micros(1_000_000 / self.config.fps.max(1) as u64);
let data_len = data.len();
let sample = Sample {
data: Bytes::copy_from_slice(data),
data,
duration: frame_duration,
..Default::default()
};
@@ -355,7 +361,7 @@ impl UniversalVideoTrack {
// Update stats
let mut stats = self.stats.lock().await;
stats.frames_sent += 1;
stats.bytes_sent += data.len() as u64;
stats.bytes_sent += data_len as u64;
if is_keyframe {
stats.keyframes_sent += 1;
}
@@ -367,18 +373,19 @@ impl UniversalVideoTrack {
///
/// Pass raw Annex B data directly to the official HevcPayloader.
/// The payloader handles NAL parsing, VPS/SPS/PPS caching, AP generation, and FU fragmentation.
async fn write_h265_frame(&self, data: &[u8], is_keyframe: bool) -> Result<()> {
async fn write_h265_frame(&self, data: Bytes, is_keyframe: bool) -> Result<()> {
// Pass raw Annex B data directly to the official HevcPayloader
self.send_h265_rtp(data, is_keyframe).await
}
/// Write VP8 frame
async fn write_vp8_frame(&self, data: &[u8], is_keyframe: bool) -> Result<()> {
async fn write_vp8_frame(&self, data: Bytes, is_keyframe: bool) -> Result<()> {
// VP8 frames are sent directly without NAL parsing
// Calculate frame duration based on configured FPS
let frame_duration = Duration::from_micros(1_000_000 / self.config.fps.max(1) as u64);
let data_len = data.len();
let sample = Sample {
data: Bytes::copy_from_slice(data),
data,
duration: frame_duration,
..Default::default()
};
@@ -397,7 +404,7 @@ impl UniversalVideoTrack {
// Update stats
let mut stats = self.stats.lock().await;
stats.frames_sent += 1;
stats.bytes_sent += data.len() as u64;
stats.bytes_sent += data_len as u64;
if is_keyframe {
stats.keyframes_sent += 1;
}
@@ -406,12 +413,13 @@ impl UniversalVideoTrack {
}
/// Write VP9 frame
async fn write_vp9_frame(&self, data: &[u8], is_keyframe: bool) -> Result<()> {
async fn write_vp9_frame(&self, data: Bytes, is_keyframe: bool) -> Result<()> {
// VP9 frames are sent directly without NAL parsing
// Calculate frame duration based on configured FPS
let frame_duration = Duration::from_micros(1_000_000 / self.config.fps.max(1) as u64);
let data_len = data.len();
let sample = Sample {
data: Bytes::copy_from_slice(data),
data,
duration: frame_duration,
..Default::default()
};
@@ -430,7 +438,7 @@ impl UniversalVideoTrack {
// Update stats
let mut stats = self.stats.lock().await;
stats.frames_sent += 1;
stats.bytes_sent += data.len() as u64;
stats.bytes_sent += data_len as u64;
if is_keyframe {
stats.keyframes_sent += 1;
}
@@ -439,7 +447,7 @@ impl UniversalVideoTrack {
}
/// Send H265 NAL units via custom H265Payloader
async fn send_h265_rtp(&self, data: &[u8], is_keyframe: bool) -> Result<()> {
async fn send_h265_rtp(&self, payload: Bytes, is_keyframe: bool) -> Result<()> {
let rtp_track = match &self.track {
TrackType::Rtp(t) => t,
TrackType::Sample(_) => {
@@ -459,7 +467,6 @@ impl UniversalVideoTrack {
// Minimize lock hold time: only hold lock during payload generation and state update
let (payloads, timestamp, seq_start, num_payloads) = {
let mut state = h265_state.lock().await;
let payload = Bytes::copy_from_slice(data);
// Use custom H265Payloader to fragment the data
let payloads = state.payloader.payload(RTP_MTU, &payload);