fix: mpp 性能优化和修复

- mjpeg-->h265 mpp 编码速度优化
- 修复 mpp 编码后的视频 rustdesk 无法解码问题
- 更新版本号为 v0.1.2
This commit is contained in:
mofeng
2026-01-27 17:06:47 +08:00
parent 1786b7689d
commit 9193c54f86
17 changed files with 300 additions and 123 deletions

View File

@@ -623,7 +623,7 @@ impl Connection {
self.negotiated_codec = Some(negotiated);
info!("Negotiated video codec: {:?}", negotiated);
let response = self.create_login_response(true);
let response = self.create_login_response(true).await;
let response_bytes = response
.write_to_bytes()
.map_err(|e| anyhow::anyhow!("Failed to encode: {}", e))?;
@@ -673,7 +673,11 @@ impl Connection {
Some(misc::Union::RefreshVideo(refresh)) => {
if *refresh {
debug!("Video refresh requested");
// TODO: Request keyframe from encoder
if let Some(ref video_manager) = self.video_manager {
if let Err(e) = video_manager.request_keyframe().await {
warn!("Failed to request keyframe: {}", e);
}
}
}
}
Some(misc::Union::VideoReceived(received)) => {
@@ -1064,7 +1068,7 @@ impl Connection {
}
/// Create login response with dynamically detected encoder capabilities
fn create_login_response(&self, success: bool) -> HbbMessage {
async fn create_login_response(&self, success: bool) -> HbbMessage {
if success {
// Dynamically detect available encoders
let registry = EncoderRegistry::global();
@@ -1080,11 +1084,21 @@ impl Connection {
h264_available, h265_available, vp8_available, vp9_available
);
let mut display_width = self.screen_width;
let mut display_height = self.screen_height;
if let Some(ref video_manager) = self.video_manager {
let video_info = video_manager.get_video_info().await;
if let Some((width, height)) = video_info.resolution {
display_width = width;
display_height = height;
}
}
let mut display_info = DisplayInfo::new();
display_info.x = 0;
display_info.y = 0;
display_info.width = 1920;
display_info.height = 1080;
display_info.width = display_width as i32;
display_info.height = display_height as i32;
display_info.name = "KVM Display".to_string();
display_info.online = true;
display_info.cursor_embedded = false;
@@ -1582,6 +1596,9 @@ async fn run_video_streaming(
config.bitrate_preset
);
}
if let Err(e) = video_manager.request_keyframe().await {
debug!("Failed to request keyframe for connection {}: {}", conn_id, e);
}
// Inner loop: receives frames from current subscription
loop {

View File

@@ -42,6 +42,9 @@ pub struct VideoFrameAdapter {
seq: u32,
/// Timestamp offset
timestamp_base: u64,
/// Cached H264 SPS/PPS (Annex B NAL without start code)
h264_sps: Option<Bytes>,
h264_pps: Option<Bytes>,
}
impl VideoFrameAdapter {
@@ -51,6 +54,8 @@ impl VideoFrameAdapter {
codec,
seq: 0,
timestamp_base: 0,
h264_sps: None,
h264_pps: None,
}
}
@@ -68,6 +73,7 @@ impl VideoFrameAdapter {
is_keyframe: bool,
timestamp_ms: u64,
) -> Message {
let data = self.prepare_h264_frame(data, is_keyframe);
// Calculate relative timestamp
if self.seq == 0 {
self.timestamp_base = timestamp_ms;
@@ -100,6 +106,41 @@ impl VideoFrameAdapter {
msg
}
fn prepare_h264_frame(&mut self, data: Bytes, is_keyframe: bool) -> Bytes {
if self.codec != VideoCodec::H264 {
return data;
}
// Parse SPS/PPS from Annex B data (without start codes)
let (sps, pps) = crate::webrtc::rtp::extract_sps_pps(&data);
let mut has_sps = false;
let mut has_pps = false;
if let Some(sps) = sps {
self.h264_sps = Some(Bytes::from(sps));
has_sps = true;
}
if let Some(pps) = pps {
self.h264_pps = Some(Bytes::from(pps));
has_pps = true;
}
// Inject cached SPS/PPS before IDR when missing
if is_keyframe && (!has_sps || !has_pps) {
if let (Some(ref sps), Some(ref pps)) = (self.h264_sps.as_ref(), self.h264_pps.as_ref()) {
let mut out = Vec::with_capacity(8 + sps.len() + pps.len() + data.len());
out.extend_from_slice(&[0, 0, 0, 1]);
out.extend_from_slice(sps);
out.extend_from_slice(&[0, 0, 0, 1]);
out.extend_from_slice(pps);
out.extend_from_slice(&data);
return Bytes::from(out);
}
}
data
}
/// Convert encoded video data to RustDesk Message
pub fn encode_frame(&mut self, data: &[u8], is_keyframe: bool, timestamp_ms: u64) -> Message {
self.encode_frame_from_bytes(Bytes::copy_from_slice(data), is_keyframe, timestamp_ms)

View File

@@ -2,7 +2,7 @@
use hwcodec::ffmpeg::AVPixelFormat;
use hwcodec::ffmpeg_ram::decode::{DecodeContext, Decoder};
use tracing::warn;
use tracing::{info, warn};
use crate::error::{AppError, Result};
use crate::video::convert::Nv12Converter;
@@ -72,6 +72,9 @@ impl MjpegRkmppDecoder {
);
}
} else {
if frame.pixfmt == AVPixelFormat::AV_PIX_FMT_NV16 {
info!("mjpeg_rkmpp output pixfmt NV16 on first frame; converting to NV12");
}
self.last_pixfmt = Some(frame.pixfmt);
}

View File

@@ -2,10 +2,6 @@
//!
//! This module provides video decoding capabilities.
#[cfg(any(target_arch = "aarch64", target_arch = "arm"))]
pub mod mjpeg_rkmpp;
pub mod mjpeg_turbo;
#[cfg(any(target_arch = "aarch64", target_arch = "arm"))]
pub use mjpeg_rkmpp::MjpegRkmppDecoder;
pub use mjpeg_turbo::MjpegTurboDecoder;

View File

@@ -33,11 +33,9 @@ const JPEG_VALIDATE_INTERVAL: u64 = 30;
use crate::error::{AppError, Result};
use crate::video::convert::{Nv12Converter, PixelConverter};
#[cfg(any(target_arch = "aarch64", target_arch = "arm"))]
use crate::video::decoder::MjpegRkmppDecoder;
use crate::video::decoder::MjpegTurboDecoder;
#[cfg(any(target_arch = "aarch64", target_arch = "arm"))]
use hwcodec::ffmpeg_hw::{last_error_message as ffmpeg_hw_last_error, HwMjpegH264Config, HwMjpegH264Pipeline};
use hwcodec::ffmpeg_hw::{last_error_message as ffmpeg_hw_last_error, HwMjpegH26xConfig, HwMjpegH26xPipeline};
use v4l::buffer::Type as BufferType;
use v4l::io::traits::CaptureStream;
use v4l::prelude::*;
@@ -177,7 +175,7 @@ struct EncoderThreadState {
yuv420p_converter: Option<PixelConverter>,
encoder_needs_yuv420p: bool,
#[cfg(any(target_arch = "aarch64", target_arch = "arm"))]
ffmpeg_hw_pipeline: Option<HwMjpegH264Pipeline>,
ffmpeg_hw_pipeline: Option<HwMjpegH26xPipeline>,
#[cfg(any(target_arch = "aarch64", target_arch = "arm"))]
ffmpeg_hw_enabled: bool,
fps: u32,
@@ -319,16 +317,12 @@ impl VideoEncoderTrait for VP9EncoderWrapper {
}
enum MjpegDecoderKind {
#[cfg(any(target_arch = "aarch64", target_arch = "arm"))]
Rkmpp(MjpegRkmppDecoder),
Turbo(MjpegTurboDecoder),
}
impl MjpegDecoderKind {
fn decode(&mut self, data: &[u8]) -> Result<Vec<u8>> {
match self {
#[cfg(any(target_arch = "aarch64", target_arch = "arm"))]
MjpegDecoderKind::Rkmpp(decoder) => decoder.decode_to_nv12(data),
MjpegDecoderKind::Turbo(decoder) => decoder.decode_to_rgb(data),
}
}
@@ -513,14 +507,16 @@ impl SharedVideoPipeline {
};
let is_rkmpp_encoder = selected_codec_name.contains("rkmpp");
let is_software_encoder = selected_codec_name.contains("libx264")
|| selected_codec_name.contains("libx265")
|| selected_codec_name.contains("libvpx");
#[cfg(any(target_arch = "aarch64", target_arch = "arm"))]
if needs_mjpeg_decode && is_rkmpp_encoder && config.output_codec == VideoEncoderType::H264 {
info!("Initializing FFmpeg HW MJPEG->H264 pipeline (no fallback)");
let hw_config = HwMjpegH264Config {
if needs_mjpeg_decode
&& is_rkmpp_encoder
&& matches!(config.output_codec, VideoEncoderType::H264 | VideoEncoderType::H265)
{
info!(
"Initializing FFmpeg HW MJPEG->{} pipeline (no fallback)",
config.output_codec
);
let hw_config = HwMjpegH26xConfig {
decoder: "mjpeg_rkmpp".to_string(),
encoder: selected_codec_name.clone(),
width: config.resolution.width as i32,
@@ -530,14 +526,14 @@ impl SharedVideoPipeline {
gop: config.gop_size() as i32,
thread_count: 1,
};
let pipeline = HwMjpegH264Pipeline::new(hw_config).map_err(|e| {
let pipeline = HwMjpegH26xPipeline::new(hw_config).map_err(|e| {
let detail = if e.is_empty() { ffmpeg_hw_last_error() } else { e };
AppError::VideoError(format!(
"FFmpeg HW MJPEG->H264 init failed: {}",
detail
"FFmpeg HW MJPEG->{} init failed: {}",
config.output_codec, detail
))
})?;
info!("Using FFmpeg HW MJPEG->H264 pipeline");
info!("Using FFmpeg HW MJPEG->{} pipeline", config.output_codec);
return Ok(EncoderThreadState {
encoder: None,
mjpeg_decoder: None,
@@ -555,35 +551,12 @@ impl SharedVideoPipeline {
}
let pipeline_input_format = if needs_mjpeg_decode {
if is_rkmpp_encoder {
info!(
"MJPEG input detected, using RKMPP decoder ({} -> NV12 with NV16 fallback)",
config.input_format
);
#[cfg(any(target_arch = "aarch64", target_arch = "arm"))]
{
let decoder = MjpegRkmppDecoder::new(config.resolution)?;
let pipeline_format = PixelFormat::Nv12;
(Some(MjpegDecoderKind::Rkmpp(decoder)), pipeline_format)
}
#[cfg(not(any(target_arch = "aarch64", target_arch = "arm")))]
{
return Err(AppError::VideoError(
"RKMPP MJPEG decode is only supported on ARM builds".to_string(),
));
}
} else if is_software_encoder {
info!(
"MJPEG input detected, using TurboJPEG decoder ({} -> RGB24)",
config.input_format
);
let decoder = MjpegTurboDecoder::new(config.resolution)?;
(Some(MjpegDecoderKind::Turbo(decoder)), PixelFormat::Rgb24)
} else {
return Err(AppError::VideoError(
"MJPEG input requires RKMPP or software encoder".to_string(),
));
}
info!(
"MJPEG input detected, using TurboJPEG decoder ({} -> RGB24)",
config.input_format
);
let decoder = MjpegTurboDecoder::new(config.resolution)?;
(Some(MjpegDecoderKind::Turbo(decoder)), PixelFormat::Rgb24)
} else {
(None, config.input_format)
};

View File

@@ -794,6 +794,11 @@ impl VideoStreamManager {
self.webrtc_streamer.set_bitrate_preset(preset).await
}
/// Request a keyframe from the shared video pipeline
///
/// Delegates directly to the underlying WebRTC streamer; this manager
/// performs no work of its own here.
///
/// # Errors
///
/// Propagates any error from the streamer — presumably when no video
/// pipeline is currently running (confirm against the streamer impl).
pub async fn request_keyframe(&self) -> crate::error::Result<()> {
    self.webrtc_streamer.request_keyframe().await
}
/// Publish event to event bus
async fn publish_event(&self, event: SystemEvent) {
if let Some(ref events) = *self.events.read().await {

View File

@@ -342,6 +342,18 @@ impl WebRtcStreamer {
}
}
/// Request the encoder to generate a keyframe on next encode
///
/// Forwards the request to the currently running video pipeline.
///
/// # Errors
///
/// Returns `AppError::VideoError` when no video pipeline is active.
pub async fn request_keyframe(&self) -> Result<()> {
    match *self.video_pipeline.read().await {
        Some(ref pipeline) => {
            pipeline.request_keyframe().await;
            Ok(())
        }
        None => Err(AppError::VideoError(
            "Video pipeline not running".to_string(),
        )),
    }
}
// === Audio Management ===
/// Check if audio is enabled