refactor: 删除部分多余的代码和注释

This commit is contained in:
mofeng-git
2026-05-01 17:31:04 +08:00
parent 74035f8e12
commit d8e7de74a6
165 changed files with 2960 additions and 9917 deletions

73
src/rtsp/auth.rs Normal file
View File

@@ -0,0 +1,73 @@
use base64::Engine;
use crate::config::RtspConfig;
use super::types::RtspRequest;
pub(crate) fn extract_basic_auth(req: &RtspRequest) -> Option<(String, String)> {
let value = req.headers.get("authorization")?;
let mut parts = value.split_whitespace();
let scheme = parts.next()?;
if !scheme.eq_ignore_ascii_case("basic") {
return None;
}
let b64 = parts.next()?;
let decoded = base64::engine::general_purpose::STANDARD.decode(b64).ok()?;
let raw = String::from_utf8(decoded).ok()?;
let (user, pass) = raw.split_once(':')?;
Some((user.to_string(), pass.to_string()))
}
pub(crate) fn rtsp_auth_credentials(config: &RtspConfig) -> Option<(String, String)> {
let username = config.username.as_ref()?.trim();
if username.is_empty() {
return None;
}
Some((
username.to_string(),
config.password.clone().unwrap_or_default(),
))
}
#[cfg(test)]
mod tests {
use super::*;
use rtsp_types as rtsp;
use std::collections::HashMap;
#[test]
fn rtsp_auth_requires_non_empty_username() {
let mut config = RtspConfig::default();
config.password = Some("secret".to_string());
assert!(rtsp_auth_credentials(&config).is_none());
config.username = Some("".to_string());
assert!(rtsp_auth_credentials(&config).is_none());
config.username = Some("user".to_string());
let credentials = rtsp_auth_credentials(&config).expect("expected credentials");
assert_eq!(credentials, ("user".to_string(), "secret".to_string()));
config.password = None;
let credentials = rtsp_auth_credentials(&config).expect("expected credentials");
assert_eq!(credentials, ("user".to_string(), "".to_string()));
}
#[test]
fn extract_basic_auth_roundtrip() {
let encoded = base64::engine::general_purpose::STANDARD.encode(b"alice:pwd");
let mut headers = HashMap::new();
headers.insert("authorization".to_string(), format!("Basic {}", encoded));
let req = RtspRequest {
method: rtsp::Method::Options,
uri: "*".to_string(),
version: rtsp::Version::V1_0,
headers,
};
assert_eq!(
extract_basic_auth(&req),
Some(("alice".to_string(), "pwd".to_string()))
);
}
}

96
src/rtsp/bitstream.rs Normal file
View File

@@ -0,0 +1,96 @@
use bytes::Bytes;
use crate::video::encoder::registry::VideoEncoderType;
use crate::video::shared_video_pipeline::EncodedVideoFrame;
use super::state::ParameterSets;
pub(crate) fn update_parameter_sets(params: &mut ParameterSets, frame: &EncodedVideoFrame) {
let nal_units = split_annexb_nal_units(frame.data.as_ref());
match frame.codec {
VideoEncoderType::H264 => {
for nal in nal_units {
match h264_nal_type(nal) {
Some(7) => params.h264_sps = Some(Bytes::copy_from_slice(nal)),
Some(8) => params.h264_pps = Some(Bytes::copy_from_slice(nal)),
_ => {}
}
}
}
VideoEncoderType::H265 => {
for nal in nal_units {
match h265_nal_type(nal) {
Some(32) => params.h265_vps = Some(Bytes::copy_from_slice(nal)),
Some(33) => params.h265_sps = Some(Bytes::copy_from_slice(nal)),
Some(34) => params.h265_pps = Some(Bytes::copy_from_slice(nal)),
_ => {}
}
}
}
_ => {}
}
}
fn split_annexb_nal_units(data: &[u8]) -> Vec<&[u8]> {
let mut nal_units = Vec::new();
let mut cursor = 0usize;
while let Some((start, start_code_len)) = find_annexb_start_code(data, cursor) {
let nal_start = start + start_code_len;
if nal_start >= data.len() {
break;
}
let next_start = find_annexb_start_code(data, nal_start)
.map(|(idx, _)| idx)
.unwrap_or(data.len());
let mut nal_end = next_start;
while nal_end > nal_start && data[nal_end - 1] == 0 {
nal_end -= 1;
}
if nal_end > nal_start {
nal_units.push(&data[nal_start..nal_end]);
}
cursor = next_start;
}
nal_units
}
fn find_annexb_start_code(data: &[u8], from: usize) -> Option<(usize, usize)> {
if from >= data.len() {
return None;
}
let mut i = from;
while i + 3 <= data.len() {
if i + 4 <= data.len()
&& data[i] == 0
&& data[i + 1] == 0
&& data[i + 2] == 0
&& data[i + 3] == 1
{
return Some((i, 4));
}
if data[i] == 0 && data[i + 1] == 0 && data[i + 2] == 1 {
return Some((i, 3));
}
i += 1;
}
None
}
fn h264_nal_type(nal: &[u8]) -> Option<u8> {
nal.first().map(|value| value & 0x1f)
}
fn h265_nal_type(nal: &[u8]) -> Option<u8> {
nal.first().map(|value| (value >> 1) & 0x3f)
}

9
src/rtsp/codec.rs Normal file
View File

@@ -0,0 +1,9 @@
use crate::config::RtspCodec;
use crate::video::encoder::VideoCodecType;
pub(crate) fn rtsp_codec_to_video(codec: RtspCodec) -> VideoCodecType {
match codec {
RtspCodec::H264 => VideoCodecType::H264,
RtspCodec::H265 => VideoCodecType::H265,
}
}

View File

@@ -1,3 +1,14 @@
pub mod service;
//! RTSP TCP server exposing H.264/H.265 video from [`VideoStreamManager`](crate::video::VideoStreamManager).
mod auth;
mod bitstream;
mod codec;
mod protocol;
mod response;
mod sdp;
mod service;
mod state;
mod streaming;
mod types;
pub use service::{RtspService, RtspServiceStatus};

193
src/rtsp/protocol.rs Normal file
View File

@@ -0,0 +1,193 @@
use rtsp_types as rtsp;
use std::collections::HashMap;
use super::types::RtspRequest;
pub(crate) const OPTIONS_PUBLIC_CAPABILITIES: &str =
"OPTIONS, DESCRIBE, SETUP, PLAY, GET_PARAMETER, SET_PARAMETER, TEARDOWN";
pub(crate) fn strip_interleaved_frames_prefix(buffer: &mut Vec<u8>) -> bool {
if buffer.len() < 4 || buffer[0] != b'$' {
return false;
}
let payload_len = u16::from_be_bytes([buffer[2], buffer[3]]) as usize;
let frame_len = 4 + payload_len;
if buffer.len() < frame_len {
return false;
}
buffer.drain(0..frame_len);
true
}
pub(crate) fn take_rtsp_request_from_buffer(buffer: &mut Vec<u8>) -> Option<String> {
let delimiter = b"\r\n\r\n";
let pos = find_bytes(buffer, delimiter)?;
let req_end = pos + delimiter.len();
let req_bytes: Vec<u8> = buffer.drain(0..req_end).collect();
Some(String::from_utf8_lossy(&req_bytes).to_string())
}
fn find_bytes(haystack: &[u8], needle: &[u8]) -> Option<usize> {
haystack
.windows(needle.len())
.position(|window| window == needle)
}
pub(crate) fn parse_rtsp_request(raw: &str) -> Option<RtspRequest> {
let (message, consumed): (rtsp::Message<Vec<u8>>, usize) =
rtsp::Message::parse(raw.as_bytes()).ok()?;
if consumed != raw.len() {
return None;
}
let request = match message {
rtsp::Message::Request(req) => req,
_ => return None,
};
let uri = request
.request_uri()
.map(|value| value.as_str().to_string())
.unwrap_or_default();
let mut headers = HashMap::new();
for (name, value) in request.headers() {
headers.insert(name.to_string().to_ascii_lowercase(), value.to_string());
}
Some(RtspRequest {
method: request.method().clone(),
uri,
version: request.version(),
headers,
})
}
pub(crate) fn parse_interleaved_channel(transport: &str) -> Option<u8> {
let lower = transport.to_ascii_lowercase();
if let Some((_, v)) = lower.split_once("interleaved=") {
let head = v.split(';').next().unwrap_or(v);
let first = head.split('-').next().unwrap_or(head).trim();
return first.parse::<u8>().ok();
}
None
}
pub(crate) fn is_tcp_transport_request(transport: &str) -> bool {
transport
.split(',')
.map(str::trim)
.map(str::to_ascii_lowercase)
.any(|item| item.contains("rtp/avp/tcp") || item.contains("interleaved="))
}
pub(crate) fn is_valid_rtsp_path(method: &rtsp::Method, uri: &str, configured_path: &str) -> bool {
if matches!(method, rtsp::Method::Options) && uri.trim() == "*" {
return true;
}
let normalized_cfg = configured_path.trim_matches('/');
if normalized_cfg.is_empty() {
return false;
}
let request_path = extract_rtsp_path(uri);
if request_path == normalized_cfg {
return true;
}
if !matches!(method, rtsp::Method::Setup | rtsp::Method::Teardown) {
return false;
}
let control_track_path = format!("{}/trackID=0", normalized_cfg);
request_path == "trackID=0" || request_path == control_track_path
}
fn extract_rtsp_path(uri: &str) -> String {
let raw_path = if let Some((_, remainder)) = uri.split_once("://") {
match remainder.find('/') {
Some(idx) => &remainder[idx..],
None => "/",
}
} else {
uri
};
raw_path
.split('?')
.next()
.unwrap_or(raw_path)
.split('#')
.next()
.unwrap_or(raw_path)
.trim_matches('/')
.to_string()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn rtsp_path_matching_follows_sdp_control_rules() {
assert!(is_valid_rtsp_path(
&rtsp::Method::Describe,
"rtsp://127.0.0.1/live",
"live"
));
assert!(is_valid_rtsp_path(
&rtsp::Method::Describe,
"rtsp://127.0.0.1/live/?token=1",
"/live/"
));
assert!(!is_valid_rtsp_path(
&rtsp::Method::Describe,
"rtsp://127.0.0.1/live2",
"live"
));
assert!(!is_valid_rtsp_path(
&rtsp::Method::Describe,
"rtsp://127.0.0.1/",
"/"
));
assert!(is_valid_rtsp_path(
&rtsp::Method::Setup,
"rtsp://127.0.0.1/live/trackID=0",
"live"
));
assert!(is_valid_rtsp_path(
&rtsp::Method::Setup,
"rtsp://127.0.0.1/trackID=0",
"live"
));
assert!(!is_valid_rtsp_path(
&rtsp::Method::Describe,
"rtsp://127.0.0.1/live/trackID=0",
"live"
));
assert!(is_valid_rtsp_path(&rtsp::Method::Options, "*", "live"));
}
#[test]
fn transport_parsing_detects_tcp_interleaved_requests() {
assert!(is_tcp_transport_request(
"RTP/AVP/TCP;unicast;interleaved=0-1"
));
assert!(is_tcp_transport_request("RTP/AVP;unicast;interleaved=2-3"));
assert!(!is_tcp_transport_request(
"RTP/AVP;unicast;client_port=8000-8001"
));
}
#[test]
fn options_public_includes_standard_methods() {
assert!(OPTIONS_PUBLIC_CAPABILITIES.contains("GET_PARAMETER"));
assert!(OPTIONS_PUBLIC_CAPABILITIES.contains("TEARDOWN"));
}
}

81
src/rtsp/response.rs Normal file
View File

@@ -0,0 +1,81 @@
use rtsp_types as rtsp;
use tokio::io::{AsyncWrite, AsyncWriteExt};
use crate::error::{AppError, Result};
use super::types::RtspRequest;
async fn serialize_and_write<W: AsyncWrite + Unpin>(
stream: &mut W,
response: rtsp::Response<Vec<u8>>,
) -> Result<()> {
let mut data = Vec::new();
response
.write(&mut data)
.map_err(|e| AppError::BadRequest(format!("failed to serialize RTSP response: {}", e)))?;
stream.write_all(&data).await?;
Ok(())
}
pub(crate) async fn send_simple_response<W: AsyncWrite + Unpin>(
stream: &mut W,
code: u16,
_reason: &str,
cseq: Option<&str>,
body: &str,
) -> Result<()> {
let mut builder = rtsp::Response::builder(rtsp::Version::V1_0, status_code_from_u16(code));
if let Some(cseq) = cseq {
builder = builder.header(rtsp::headers::CSEQ, cseq);
}
let response = builder.build(body.as_bytes().to_vec());
serialize_and_write(stream, response).await
}
pub(crate) async fn send_response<W: AsyncWrite + Unpin>(
stream: &mut W,
req: &RtspRequest,
code: u16,
_reason: &str,
extra_headers: Vec<(String, String)>,
body: &str,
session_id: &str,
) -> Result<()> {
let cseq = req
.headers
.get("cseq")
.cloned()
.unwrap_or_else(|| "1".to_string());
let mut builder = rtsp::Response::builder(req.version, status_code_from_u16(code))
.header(rtsp::headers::CSEQ, cseq.as_str());
if !session_id.is_empty() {
builder = builder.header(rtsp::headers::SESSION, session_id);
}
for (name, value) in extra_headers {
let header_name = rtsp::HeaderName::try_from(name.as_str()).map_err(|e| {
AppError::BadRequest(format!("invalid RTSP header name {}: {}", name, e))
})?;
builder = builder.header(header_name, value);
}
let response = builder.build(body.as_bytes().to_vec());
serialize_and_write(stream, response).await
}
pub(crate) fn status_code_from_u16(code: u16) -> rtsp::StatusCode {
match code {
200 => rtsp::StatusCode::Ok,
400 => rtsp::StatusCode::BadRequest,
401 => rtsp::StatusCode::Unauthorized,
404 => rtsp::StatusCode::NotFound,
405 => rtsp::StatusCode::MethodNotAllowed,
453 => rtsp::StatusCode::NotEnoughBandwidth,
455 => rtsp::StatusCode::MethodNotValidInThisState,
461 => rtsp::StatusCode::UnsupportedTransport,
_ => rtsp::StatusCode::InternalServerError,
}
}

224
src/rtsp/sdp.rs Normal file
View File

@@ -0,0 +1,224 @@
use base64::Engine;
use sdp_types as sdp;
use crate::config::RtspConfig;
use crate::video::encoder::VideoCodecType;
use crate::webrtc::rtp::parse_profile_level_id_from_sps;
use super::state::ParameterSets;
pub(crate) fn build_h264_fmtp(payload_type: u8, params: &ParameterSets) -> String {
let mut attrs = vec!["packetization-mode=1".to_string()];
if let Some(sps) = params.h264_sps.as_ref() {
if let Some(profile_level_id) = parse_profile_level_id_from_sps(sps) {
attrs.push(format!("profile-level-id={}", profile_level_id));
}
} else {
attrs.push("profile-level-id=42e01f".to_string());
}
if let (Some(sps), Some(pps)) = (params.h264_sps.as_ref(), params.h264_pps.as_ref()) {
let sps_b64 = base64::engine::general_purpose::STANDARD.encode(sps.as_ref());
let pps_b64 = base64::engine::general_purpose::STANDARD.encode(pps.as_ref());
attrs.push(format!("sprop-parameter-sets={},{}", sps_b64, pps_b64));
}
format!("{} {}", payload_type, attrs.join(";"))
}
pub(crate) fn build_h265_fmtp(payload_type: u8, params: &ParameterSets) -> String {
let mut attrs = Vec::new();
if let Some(vps) = params.h265_vps.as_ref() {
attrs.push(format!(
"sprop-vps={}",
base64::engine::general_purpose::STANDARD.encode(vps.as_ref())
));
}
if let Some(sps) = params.h265_sps.as_ref() {
attrs.push(format!(
"sprop-sps={}",
base64::engine::general_purpose::STANDARD.encode(sps.as_ref())
));
}
if let Some(pps) = params.h265_pps.as_ref() {
attrs.push(format!(
"sprop-pps={}",
base64::engine::general_purpose::STANDARD.encode(pps.as_ref())
));
}
if attrs.is_empty() {
format!("{} profile-id=1", payload_type)
} else {
format!("{} {}", payload_type, attrs.join(";"))
}
}
pub(crate) fn build_sdp(
config: &RtspConfig,
codec: VideoCodecType,
params: &ParameterSets,
) -> String {
let (payload_type, codec_name, fmtp_value) = match codec {
VideoCodecType::H264 => (96u8, "H264", build_h264_fmtp(96, params)),
VideoCodecType::H265 => (99u8, "H265", build_h265_fmtp(99, params)),
_ => {
tracing::warn!("RTSP SDP: unexpected VideoCodecType, falling back to H264");
(96u8, "H264", build_h264_fmtp(96, params))
}
};
let session = sdp::Session {
origin: sdp::Origin {
username: Some("-".to_string()),
sess_id: "0".to_string(),
sess_version: 0,
nettype: "IN".to_string(),
addrtype: "IP4".to_string(),
unicast_address: config.bind.clone(),
},
session_name: "One-KVM RTSP Stream".to_string(),
session_description: None,
uri: None,
emails: Vec::new(),
phones: Vec::new(),
connection: Some(sdp::Connection {
nettype: "IN".to_string(),
addrtype: "IP4".to_string(),
connection_address: "0.0.0.0".to_string(),
}),
bandwidths: Vec::new(),
times: vec![sdp::Time {
start_time: 0,
stop_time: 0,
repeats: Vec::new(),
}],
time_zones: Vec::new(),
key: None,
attributes: vec![sdp::Attribute {
attribute: "control".to_string(),
value: Some("*".to_string()),
}],
medias: vec![sdp::Media {
media: "video".to_string(),
port: 0,
num_ports: None,
proto: "RTP/AVP".to_string(),
fmt: payload_type.to_string(),
media_title: None,
connections: Vec::new(),
bandwidths: Vec::new(),
key: None,
attributes: vec![
sdp::Attribute {
attribute: "rtpmap".to_string(),
value: Some(format!("{} {}/90000", payload_type, codec_name)),
},
sdp::Attribute {
attribute: "fmtp".to_string(),
value: Some(fmtp_value),
},
sdp::Attribute {
attribute: "control".to_string(),
value: Some("trackID=0".to_string()),
},
],
}],
};
let mut output = Vec::new();
if let Err(err) = session.write(&mut output) {
tracing::warn!("Failed to serialize SDP with sdp-types: {}", err);
return String::new();
}
match String::from_utf8(output) {
Ok(sdp_text) => sdp_text,
Err(err) => {
tracing::warn!("Failed to convert SDP bytes to UTF-8: {}", err);
String::new()
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::config::RtspConfig;
use bytes::Bytes;
#[test]
fn build_sdp_h264_is_parseable_with_expected_video_attributes() {
let config = RtspConfig::default();
let mut params = ParameterSets::default();
params.h264_sps = Some(Bytes::from_static(&[0x67, 0x42, 0xe0, 0x1f, 0x96, 0x54]));
params.h264_pps = Some(Bytes::from_static(&[0x68, 0xce, 0x06, 0xe2]));
let sdp_text = build_sdp(&config, VideoCodecType::H264, &params);
assert!(!sdp_text.is_empty());
let session = sdp::Session::parse(sdp_text.as_bytes()).expect("sdp parse failed");
assert_eq!(session.session_name, "One-KVM RTSP Stream");
assert_eq!(session.medias.len(), 1);
let media = &session.medias[0];
assert_eq!(media.media, "video");
assert_eq!(media.proto, "RTP/AVP");
assert_eq!(media.fmt, "96");
let has_rtpmap = media.attributes.iter().any(|attr| {
attr.attribute == "rtpmap" && attr.value.as_deref() == Some("96 H264/90000")
});
assert!(has_rtpmap);
let fmtp_value = media
.attributes
.iter()
.find(|attr| attr.attribute == "fmtp")
.and_then(|attr| attr.value.as_deref())
.expect("missing fmtp value");
assert!(fmtp_value.starts_with("96 "));
assert!(fmtp_value.contains("packetization-mode=1"));
assert!(fmtp_value.contains("sprop-parameter-sets="));
}
#[test]
fn build_sdp_h265_is_parseable_with_expected_video_attributes() {
let config = RtspConfig::default();
let mut params = ParameterSets::default();
params.h265_vps = Some(Bytes::from_static(&[0x40, 0x01, 0x0c, 0x01]));
params.h265_sps = Some(Bytes::from_static(&[0x42, 0x01, 0x01, 0x60]));
params.h265_pps = Some(Bytes::from_static(&[0x44, 0x01, 0xc0, 0x73]));
let sdp_text = build_sdp(&config, VideoCodecType::H265, &params);
assert!(!sdp_text.is_empty());
let session = sdp::Session::parse(sdp_text.as_bytes()).expect("sdp parse failed");
assert_eq!(session.medias.len(), 1);
let media = &session.medias[0];
assert_eq!(media.media, "video");
assert_eq!(media.proto, "RTP/AVP");
assert_eq!(media.fmt, "99");
let has_rtpmap = media.attributes.iter().any(|attr| {
attr.attribute == "rtpmap" && attr.value.as_deref() == Some("99 H265/90000")
});
assert!(has_rtpmap);
let fmtp_value = media
.attributes
.iter()
.find(|attr| attr.attribute == "fmtp")
.and_then(|attr| attr.value.as_deref())
.expect("missing fmtp value");
assert!(fmtp_value.starts_with("99 "));
assert!(fmtp_value.contains("sprop-vps="));
assert!(fmtp_value.contains("sprop-sps="));
assert!(fmtp_value.contains("sprop-pps="));
}
}

File diff suppressed because it is too large Load Diff

28
src/rtsp/state.rs Normal file
View File

@@ -0,0 +1,28 @@
use bytes::Bytes;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::sync::{Mutex, RwLock};
#[derive(Default, Clone)]
pub(crate) struct ParameterSets {
pub h264_sps: Option<Bytes>,
pub h264_pps: Option<Bytes>,
pub h265_vps: Option<Bytes>,
pub h265_sps: Option<Bytes>,
pub h265_pps: Option<Bytes>,
}
#[derive(Clone)]
pub(crate) struct SharedRtspState {
pub active_client: Arc<Mutex<Option<SocketAddr>>>,
pub parameter_sets: Arc<RwLock<ParameterSets>>,
}
impl SharedRtspState {
pub fn new() -> Self {
Self {
active_client: Arc::new(Mutex::new(None)),
parameter_sets: Arc::new(RwLock::new(ParameterSets::default())),
}
}
}

367
src/rtsp/streaming.rs Normal file
View File

@@ -0,0 +1,367 @@
use bytes::Bytes;
use rand::Rng;
use rtp::packet::Packet;
use rtp::packetizer::Payloader;
use rtsp_types as rtsp;
use std::sync::Arc;
use tokio::io::{AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio::time::{sleep, Duration};
use webrtc::util::{Marshal, MarshalSize};
use crate::config::RtspCodec;
use crate::error::{AppError, Result};
use crate::video::encoder::registry::VideoEncoderType;
use crate::video::shared_video_pipeline::EncodedVideoFrame;
use crate::video::VideoStreamManager;
use crate::webrtc::h265_payloader::H265Payloader;
use super::bitstream::update_parameter_sets;
use super::protocol::{
parse_rtsp_request, strip_interleaved_frames_prefix, take_rtsp_request_from_buffer,
};
use super::response::send_response;
use super::state::SharedRtspState;
use super::types::RtspRequest;
pub(crate) const RTP_CLOCK_RATE: u32 = 90_000;
pub(crate) const RTP_MTU: usize = 1200;
pub(crate) const RTSP_BUF_SIZE: usize = 8192;
const RTSP_RESUBSCRIBE_DELAY_MS: u64 = 300;
pub(crate) async fn stream_video_interleaved(
stream: TcpStream,
video_manager: &Arc<VideoStreamManager>,
rtsp_codec: RtspCodec,
channel: u8,
shared: SharedRtspState,
session_id: String,
) -> Result<()> {
let (mut reader, mut writer) = stream.into_split();
let mut rx = video_manager
.subscribe_encoded_frames()
.await
.ok_or_else(|| {
AppError::VideoError("RTSP failed to subscribe encoded frames".to_string())
})?;
video_manager.request_keyframe().await.ok();
let payload_type = match rtsp_codec {
RtspCodec::H264 => 96,
RtspCodec::H265 => 99,
};
let mut sequence_number: u16 = rand::rng().random();
let ssrc: u32 = rand::rng().random();
let mut h264_payloader = rtp::codecs::h264::H264Payloader::default();
let mut h265_payloader = H265Payloader::new();
let mut ctrl_read_buf = [0u8; RTSP_BUF_SIZE];
let mut ctrl_buffer = Vec::with_capacity(RTSP_BUF_SIZE);
// 4-byte interleaved prefix + RTP header + payload shard (≤ RTP_MTU)
let mut interleaved_rtp_buf = Vec::with_capacity(4 + RTP_MTU + 96);
let mut last_rtp_timestamp: u32 = 0;
loop {
tokio::select! {
maybe_frame = rx.recv() => {
let Some(frame) = maybe_frame else {
tracing::warn!("RTSP encoded frame subscription ended, attempting to restart pipeline");
if let Some(new_rx) = video_manager.subscribe_encoded_frames().await {
rx = new_rx;
let _ = video_manager.request_keyframe().await;
tracing::info!("RTSP frame subscription recovered");
} else {
tracing::warn!(
"RTSP failed to resubscribe encoded frames, retrying in {}ms",
RTSP_RESUBSCRIBE_DELAY_MS
);
sleep(Duration::from_millis(RTSP_RESUBSCRIBE_DELAY_MS)).await;
}
continue;
};
if !is_frame_codec_match(&frame, &rtsp_codec) {
continue;
}
{
let mut params = shared.parameter_sets.write().await;
update_parameter_sets(&mut params, &frame);
}
let rtp_timestamp = monotonic_rtp_timestamp(
frame.pts_ms,
&mut last_rtp_timestamp,
frame.duration,
);
let payloads: Vec<Bytes> = match rtsp_codec {
RtspCodec::H264 => h264_payloader
.payload(RTP_MTU, &frame.data)
.map_err(|e| AppError::VideoError(format!("H264 payload failed: {}", e)))?,
RtspCodec::H265 => h265_payloader.payload(RTP_MTU, &frame.data),
};
if payloads.is_empty() {
continue;
}
let total_payloads = payloads.len();
for (idx, payload) in payloads.into_iter().enumerate() {
let marker = idx == total_payloads.saturating_sub(1);
let packet = Packet {
header: rtp::header::Header {
version: 2,
padding: false,
extension: false,
marker,
payload_type,
sequence_number,
timestamp: rtp_timestamp,
ssrc,
..Default::default()
},
payload,
};
sequence_number = sequence_number.wrapping_add(1);
send_interleaved_rtp(&mut writer, channel, &packet, &mut interleaved_rtp_buf)
.await?;
}
if frame.is_keyframe {
tracing::debug!("RTSP keyframe sent");
}
}
read_res = reader.read(&mut ctrl_read_buf) => {
let n = read_res?;
if n == 0 {
break;
}
ctrl_buffer.extend_from_slice(&ctrl_read_buf[..n]);
while strip_interleaved_frames_prefix(&mut ctrl_buffer) {}
while let Some(raw_req) = take_rtsp_request_from_buffer(&mut ctrl_buffer) {
let Some(req) = parse_rtsp_request(&raw_req) else {
continue;
};
if handle_play_control_request(&mut writer, &req, &session_id).await? {
return Ok(());
}
while strip_interleaved_frames_prefix(&mut ctrl_buffer) {}
}
}
}
}
Ok(())
}
pub(crate) async fn send_interleaved_rtp<W: AsyncWrite + Unpin>(
stream: &mut W,
channel: u8,
packet: &Packet,
marshal_buf: &mut Vec<u8>,
) -> Result<()> {
let rtp_len = packet.marshal_size();
let rtp_len_u16 = u16::try_from(rtp_len).map_err(|_| {
AppError::VideoError(format!(
"RTP packet too large for interleaved framing: {} bytes",
rtp_len
))
})?;
marshal_buf.clear();
marshal_buf.reserve(4 + rtp_len);
marshal_buf.extend_from_slice(&[b'$', channel, (rtp_len_u16 >> 8) as u8, rtp_len_u16 as u8]);
let body_off = marshal_buf.len();
marshal_buf.resize(body_off + rtp_len, 0);
let written = packet
.marshal_to(&mut marshal_buf[body_off..])
.map_err(|e| AppError::VideoError(format!("RTP marshal failed: {}", e)))?;
if written != rtp_len {
return Err(AppError::VideoError(format!(
"RTP marshal size mismatch: wrote {written}, expected {rtp_len}"
)));
}
stream.write_all(marshal_buf).await?;
Ok(())
}
pub(crate) async fn handle_play_control_request<W: AsyncWrite + Unpin>(
stream: &mut W,
req: &RtspRequest,
session_id: &str,
) -> Result<bool> {
use super::protocol::OPTIONS_PUBLIC_CAPABILITIES;
match &req.method {
rtsp::Method::Teardown => {
send_response(stream, req, 200, "OK", vec![], "", session_id).await?;
Ok(true)
}
rtsp::Method::Options => {
send_response(
stream,
req,
200,
"OK",
vec![(
"Public".to_string(),
OPTIONS_PUBLIC_CAPABILITIES.to_string(),
)],
"",
session_id,
)
.await?;
Ok(false)
}
rtsp::Method::GetParameter | rtsp::Method::SetParameter => {
send_response(stream, req, 200, "OK", vec![], "", session_id).await?;
Ok(false)
}
_ => {
send_response(
stream,
req,
405,
"Method Not Allowed",
vec![],
"",
session_id,
)
.await?;
Ok(false)
}
}
}
fn pts_to_rtp_timestamp(pts_ms: i64) -> u32 {
if pts_ms <= 0 {
return 0;
}
((pts_ms as u64 * RTP_CLOCK_RATE as u64) / 1000) as u32
}
fn rtp_timestamp_increment(frame_duration: Duration) -> u32 {
let inc = (frame_duration.as_secs_f64() * f64::from(RTP_CLOCK_RATE)).round() as u32;
inc.max(1)
}
fn monotonic_rtp_timestamp(pts_ms: i64, last: &mut u32, frame_duration: Duration) -> u32 {
let from_pts = pts_to_rtp_timestamp(pts_ms);
let inc = rtp_timestamp_increment(frame_duration);
let ts = if from_pts > *last {
from_pts
} else {
last.wrapping_add(inc)
};
*last = ts;
ts
}
fn is_frame_codec_match(frame: &EncodedVideoFrame, codec: &RtspCodec) -> bool {
matches!(
(frame.codec, codec),
(VideoEncoderType::H264, RtspCodec::H264) | (VideoEncoderType::H265, RtspCodec::H265)
)
}
#[cfg(test)]
mod tests {
use super::*;
use std::collections::HashMap;
use tokio::io::{duplex, AsyncReadExt};
fn make_test_request(method: rtsp::Method) -> RtspRequest {
let mut headers = HashMap::new();
headers.insert("cseq".to_string(), "7".to_string());
RtspRequest {
method,
uri: "rtsp://127.0.0.1/live".to_string(),
version: rtsp::Version::V1_0,
headers,
}
}
async fn read_response_from_duplex(
mut client: tokio::io::DuplexStream,
) -> rtsp::Response<Vec<u8>> {
let mut buf = vec![0u8; 4096];
let n = client
.read(&mut buf)
.await
.expect("failed to read rtsp response");
assert!(n > 0);
let (message, consumed): (rtsp::Message<Vec<u8>>, usize) =
rtsp::Message::parse(&buf[..n]).expect("failed to parse rtsp response");
assert_eq!(consumed, n);
match message {
rtsp::Message::Response(response) => response,
_ => panic!("expected RTSP response"),
}
}
#[tokio::test]
async fn play_control_teardown_returns_ok_and_stop() {
let req = make_test_request(rtsp::Method::Teardown);
let (client, mut server) = duplex(4096);
let should_stop = handle_play_control_request(&mut server, &req, "session-1")
.await
.expect("control handling failed");
assert!(should_stop);
drop(server);
let response = read_response_from_duplex(client).await;
assert_eq!(response.status(), rtsp::StatusCode::Ok);
}
#[tokio::test]
async fn play_control_pause_returns_method_not_allowed() {
let req = make_test_request(rtsp::Method::Pause);
let (client, mut server) = duplex(4096);
let should_stop = handle_play_control_request(&mut server, &req, "session-1")
.await
.expect("control handling failed");
assert!(!should_stop);
drop(server);
let response = read_response_from_duplex(client).await;
assert_eq!(response.status(), rtsp::StatusCode::MethodNotAllowed);
}
#[test]
fn monotonic_rtp_timestamp_steps_when_pts_stays_zero() {
let d = Duration::from_millis(33);
let mut last = 0u32;
let a = monotonic_rtp_timestamp(0, &mut last, d);
let b = monotonic_rtp_timestamp(0, &mut last, d);
let c = monotonic_rtp_timestamp(0, &mut last, d);
assert!(a > 0);
assert!(b > a);
assert!(c > b);
}
#[test]
fn monotonic_rtp_timestamp_uses_pts_when_it_advances() {
let d = Duration::from_millis(33);
let mut last = 0u32;
let a = monotonic_rtp_timestamp(1000, &mut last, d);
assert_eq!(a, 90_000);
let b = monotonic_rtp_timestamp(2000, &mut last, d);
assert_eq!(b, 180_000);
}
}

53
src/rtsp/types.rs Normal file
View File

@@ -0,0 +1,53 @@
use rand::Rng;
use rtsp_types as rtsp;
use std::collections::HashMap;
use std::fmt;
#[derive(Debug, Clone, PartialEq)]
pub enum RtspServiceStatus {
Stopped,
Starting,
Running,
Error(String),
}
impl fmt::Display for RtspServiceStatus {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Stopped => write!(f, "stopped"),
Self::Starting => write!(f, "starting"),
Self::Running => write!(f, "running"),
Self::Error(err) => write!(f, "error: {}", err),
}
}
}
#[derive(Debug, Clone)]
pub(crate) struct RtspRequest {
pub method: rtsp::Method,
pub uri: String,
pub version: rtsp::Version,
pub headers: HashMap<String, String>,
}
pub(crate) struct RtspConnectionState {
pub session_id: String,
pub setup_done: bool,
pub interleaved_channel: u8,
}
impl RtspConnectionState {
pub fn new() -> Self {
Self {
session_id: generate_session_id(),
setup_done: false,
interleaved_channel: 0,
}
}
}
pub(crate) fn generate_session_id() -> String {
let mut rng = rand::rng();
let value: u64 = rng.random();
format!("{:016x}", value)
}