rave/tests/e2e_rtmp_httpflv.rs

428 lines
14 KiB
Rust
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

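//! End-to-end integration test: RTMP publish -> HTTP-FLV playback.
//!
//! Exercises the complete streaming path:
//! 1. Start an RTMP server (receives the published stream)
//! 2. Start an HTTP-FLV server (serves the stream to players)
//! 3. ffmpeg publishes to the RTMP server
//! 4. ffplay/ffmpeg pulls from the HTTP-FLV server to verify the result
//!
//! Prerequisites: `ffmpeg` on PATH and a sample file at `/tmp/test_video.mp4`
//! (the test skips itself when the file is missing or `E2E` is unset).
//!
//! Usage:
//! E2E=1 cargo test --test e2e_rtmp_httpflv -- --nocapture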
use std::fs;
use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use rave::core::group::StreamManager;
use rave::protocol::rtmp::session::{RtmpSession, SessionEvent, SessionState};
use rave::protocol::httpflv::{self, HttpRequestBuffer};
use rave::remux::rtmp2flv;
use rave::sdk::context::EngineContext;
use rave::sdk::traits::StreamManagerApi;
use rave::sdk::types::StreamPath;
/// RTMP publish handler (one per accepted connection): receives an ffmpeg push
/// and distributes the media through the stream manager.
///
/// * `stream` - accepted TCP connection from the publisher.
/// * `sm` - stream manager that owns the published stream and fans out frames.
/// * `publish_count` - incremented once for every `PublishStart` event seen.
///
/// The handler runs until the peer closes the connection, a read fails (the
/// read timeout is 15 s), a write fails, or the session reports that it is
/// closed. Whatever the exit reason, a stream registered by this connection is
/// removed from the stream manager before returning.
fn rtmp_publisher_handler(
    mut stream: TcpStream,
    sm: Arc<dyn StreamManagerApi>,
    publish_count: Arc<AtomicUsize>,
) {
    let mut session = RtmpSession::new();
    let mut buf = [0u8; 65536];
    let mut stream_path: Option<StreamPath> = None;
    stream.set_read_timeout(Some(Duration::from_secs(15))).ok();

    // Every exit path leaves through `break 'session` (never `return`) so the
    // stream registered on PublishStart is always removed below.
    'session: loop {
        let n = match stream.read(&mut buf) {
            // EOF, timeout or any other read error ends the session.
            Ok(0) | Err(_) => break 'session,
            Ok(n) => n,
        };

        let events = session.on_data(&buf[..n]);
        for event in events {
            match event {
                // Protocol replies produced by the session go back to the peer.
                SessionEvent::SendData(data) => {
                    if stream.write_all(&data).is_err() {
                        break 'session;
                    }
                }
                SessionEvent::PublishStart { app, stream: name } => {
                    let sp = StreamPath::new(&app, &name);
                    // Best effort: the result of create_stream is ignored.
                    let _ = sm.create_stream(sp.clone());
                    stream_path = Some(sp);
                    publish_count.fetch_add(1, Ordering::Relaxed);
                }
                SessionEvent::MediaData { msg_type_id, timestamp, data } => {
                    // Media that arrives before PublishStart has no target and is dropped.
                    if let Some(ref sp) = stream_path {
                        if let Some(frame) =
                            rtmp_media_to_avframe(msg_type_id, timestamp, &data)
                        {
                            sm.dispatch_frame(sp, frame);
                        }
                    }
                }
                // Errors only end the handler once the session has closed itself.
                SessionEvent::Error(_) => {
                    if session.state() == SessionState::Closed {
                        break 'session;
                    }
                }
                SessionEvent::SessionClosed => break 'session,
                _ => {}
            }
        }
    }

    if let Some(sp) = stream_path {
        sm.remove_stream(&sp);
    }
}
/// HTTP-FLV playback handler (one per accepted connection): subscribes to the
/// requested stream and keeps writing FLV data to the client.
///
/// * `stream` - accepted TCP connection from the player.
/// * `sm` - stream manager used to subscribe to the stream.
/// * `bytes_sent` - incremented by the size of the FLV header and of every tag written.
/// * `frames_sent` - incremented once per FLV tag written.
/// * `ready_flag` - set to `true` once the HTTP response and FLV header are out.
///
/// Steps: read and parse the HTTP request, reject non-FLV requests with a 404,
/// wait up to about 10 s for the stream to appear, send the HTTP 200 response
/// plus the FLV header, then forward frames until the client goes away, the
/// subscriber becomes inactive, or the stream has been removed. The subscriber
/// is closed on every exit path once it has been obtained.
fn httpflv_subscriber_handler(
    mut stream: TcpStream,
    sm: Arc<dyn StreamManagerApi>,
    bytes_sent: Arc<AtomicUsize>,
    frames_sent: Arc<AtomicUsize>,
    ready_flag: Arc<AtomicBool>,
) {
    stream.set_read_timeout(Some(Duration::from_secs(5))).ok();
    stream.set_write_timeout(Some(Duration::from_secs(5))).ok();

    // Read the HTTP request; `feed` returning true ends the read loop.
    let mut req_buf = HttpRequestBuffer::new();
    let mut read_buf = [0u8; 4096];
    loop {
        match stream.read(&mut read_buf) {
            Ok(0) | Err(_) => return,
            Ok(n) => {
                if req_buf.feed(&read_buf[..n]) {
                    break;
                }
            }
        }
    }
    let request = match req_buf.parse() {
        Ok(r) => r,
        Err(_) => return,
    };

    if !httpflv::is_http_flv_request(&request) {
        let _ = stream.write_all(httpflv::build_http_404_response().as_bytes());
        return;
    }
    let (app, stream_name) = match httpflv::parse_stream_path(&request.path) {
        Some(pair) => pair,
        None => {
            // Answer with a 404 instead of silently dropping the connection.
            let _ = stream.write_all(httpflv::build_http_404_response().as_bytes());
            return;
        }
    };
    let sp = StreamPath::new(&app, &stream_name);

    // Wait for the stream to become available: 200 retries x 50 ms, about 10 s.
    let subscriber = {
        let mut attempts = 0;
        loop {
            match sm.subscribe(&sp) {
                Ok(sub) => break sub,
                Err(_) => {
                    attempts += 1;
                    if attempts > 200 {
                        // The stream never showed up; tell the client rather than hang up.
                        let _ = stream
                            .write_all(httpflv::build_http_404_response().as_bytes());
                        return;
                    }
                    thread::sleep(Duration::from_millis(50));
                }
            }
        }
    };

    // Send HTTP 200 followed by the FLV header. From here on every exit must
    // close the subscriber.
    let http_resp = httpflv::build_http_flv_response();
    if stream.write_all(http_resp.as_bytes()).is_err() {
        subscriber.close();
        return;
    }
    let flv_header = rtmp2flv::flv_header_with_audio_video();
    if stream.write_all(&flv_header).is_err() {
        subscriber.close();
        return;
    }
    let _ = stream.flush();
    bytes_sent.fetch_add(flv_header.len(), Ordering::Relaxed);
    ready_flag.store(true, Ordering::Relaxed);

    // Frame loop: read a frame, wrap it in an FLV tag, write it out.
    let mut idle_count: u32 = 0;
    loop {
        match subscriber.read_frame() {
            Some(frame) => {
                idle_count = 0;
                // Type ids passed to the FLV muxer: 8 = audio, 9 = video, 18 = data.
                let msg_type = match frame.media_type {
                    rave::sdk::types::MediaType::Audio => 8,
                    rave::sdk::types::MediaType::Video => 9,
                    rave::sdk::types::MediaType::Data => 18,
                };
                // NOTE(review): `as u32` truncates the timestamp to 32 bits;
                // presumably fine for the tag encoder, confirm for very long streams.
                if let Some(tag) = rtmp2flv::rtmp_msg_to_flv_tag(
                    msg_type,
                    frame.timestamp_ms as u32,
                    &frame.data,
                ) {
                    let encoded = tag.encode();
                    if stream.write_all(&encoded).is_err() {
                        break;
                    }
                    let _ = stream.flush();
                    bytes_sent.fetch_add(encoded.len(), Ordering::Relaxed);
                    frames_sent.fetch_add(1, Ordering::Relaxed);
                }
            }
            None => {
                // No frame available: poll every 5 ms. After more than 600 idle
                // polls (about 3 s) check whether the stream still exists.
                idle_count += 1;
                if idle_count > 600 {
                    if !sm.has_stream(&sp) {
                        break;
                    }
                    idle_count = 0;
                }
                thread::sleep(Duration::from_millis(5));
            }
        }
        if !subscriber.is_active() {
            break;
        }
    }
    subscriber.close();
}
/// Converts an RTMP media message into an `AVFrame` (per the original note,
/// this mirrors the logic in server.rs).
///
/// * `msg_type` - RTMP message type id: 8 is handled as audio, 9 as video.
/// * `timestamp` - message timestamp, widened to `u64` for the frame.
/// * `data` - message payload; its first byte carries the format nibbles.
///
/// Returns `None` for an empty payload and for any other message type. The
/// whole payload, including the leading header byte, is copied into the frame.
fn rtmp_media_to_avframe(
    msg_type: u8,
    timestamp: u32,
    data: &[u8],
) -> Option<rave::sdk::types::AVFrame> {
    use rave::sdk::types::*;

    // An empty payload has no header byte to inspect.
    let header = *data.first()?;
    // A u8 shifted right by four is already limited to 0..=15.
    let high_nibble = header >> 4;
    let low_nibble = header & 0x0F;
    let pts = u64::from(timestamp);

    match msg_type {
        // Audio: the high nibble is the sound format; only 10 (AAC) is recognised.
        8 => {
            let codec = if high_nibble == 10 {
                AudioCodec::Aac
            } else {
                AudioCodec::Unknown
            };
            Some(AVFrame::new_audio(pts, Arc::new(data.to_vec()), codec))
        }
        // Video: the high nibble is the frame type, the low nibble the codec id.
        9 => {
            // Only 1 maps to a key frame; every other value is an inter frame.
            let frame_type = if high_nibble == 1 {
                FrameType::KeyFrame
            } else {
                FrameType::InterFrame
            };
            let codec = match low_nibble {
                7 => VideoCodec::H264,
                12 => VideoCodec::H265,
                _ => VideoCodec::Unknown,
            };
            Some(AVFrame::new_video(pts, Arc::new(data.to_vec()), codec, frame_type))
        }
        _ => None,
    }
}
/// End-to-end check of the RTMP publish -> HTTP-FLV playback path.
///
/// Skipped unless the `E2E` environment variable is set and
/// `/tmp/test_video.mp4` exists. The test starts an in-process RTMP server and
/// HTTP-FLV server on ephemeral loopback ports, launches an ffmpeg puller
/// against the HTTP-FLV URL, pushes the sample file over RTMP with a second
/// ffmpeg, and then asserts that at least one publish session was seen and
/// that FLV bytes and frames were sent. If the pulled file has content beyond
/// a bare header it is additionally inspected with ffprobe.
#[test]
fn test_e2e_rtmp_push_httpflv_pull() {
    if std::env::var("E2E").is_err() {
        eprintln!("SKIP: set E2E=1 to run end-to-end test");
        eprintln!(" E2E=1 cargo test --test e2e_rtmp_httpflv -- --nocapture");
        return;
    }
    let video_path = "/tmp/test_video.mp4";
    if !std::path::Path::new(video_path).exists() {
        eprintln!("SKIP: {} not found", video_path);
        return;
    }

    let sm: Arc<dyn StreamManagerApi> = Arc::new(StreamManager::new());
    let _context = Arc::new(EngineContext::new(sm.clone()));

    // Statistics and flags shared with the server and handler threads.
    let publish_count = Arc::new(AtomicUsize::new(0));
    let httpflv_bytes_sent = Arc::new(AtomicUsize::new(0));
    let httpflv_frames_sent = Arc::new(AtomicUsize::new(0));
    let httpflv_ready = Arc::new(AtomicBool::new(false));
    let running = Arc::new(AtomicBool::new(true));

    // Start the RTMP server on an ephemeral port.
    let rtmp_listener = TcpListener::bind("127.0.0.1:0").unwrap();
    let rtmp_port = rtmp_listener.local_addr().unwrap().port();
    let sm_rtmp = sm.clone();
    let pc_rtmp = publish_count.clone();
    let running_rtmp = running.clone();
    let rtmp_handle = thread::spawn(move || {
        // Non-blocking accept so the loop can observe the `running` flag.
        rtmp_listener.set_nonblocking(true).ok();
        while running_rtmp.load(Ordering::Relaxed) {
            if let Ok((stream, _)) = rtmp_listener.accept() {
                // On some platforms (BSD/macOS) an accepted socket inherits the
                // listener's non-blocking mode; the handler needs blocking reads
                // with timeouts, so reset it explicitly.
                let _ = stream.set_nonblocking(false);
                let sm = sm_rtmp.clone();
                let pc = pc_rtmp.clone();
                thread::spawn(move || rtmp_publisher_handler(stream, sm, pc));
            }
            thread::sleep(Duration::from_millis(10));
        }
    });

    // Start the HTTP-FLV server on an ephemeral port.
    let httpflv_listener = TcpListener::bind("127.0.0.1:0").unwrap();
    let httpflv_port = httpflv_listener.local_addr().unwrap().port();
    let sm_hflv = sm.clone();
    let bs_hflv = httpflv_bytes_sent.clone();
    let fs_hflv = httpflv_frames_sent.clone();
    let rf_hflv = httpflv_ready.clone();
    let running_hflv = running.clone();
    let httpflv_handle = thread::spawn(move || {
        httpflv_listener.set_nonblocking(true).ok();
        while running_hflv.load(Ordering::Relaxed) {
            if let Ok((stream, _)) = httpflv_listener.accept() {
                // Same reasoning as for the RTMP listener: force blocking mode.
                let _ = stream.set_nonblocking(false);
                let sm = sm_hflv.clone();
                let bs = bs_hflv.clone();
                let fs = fs_hflv.clone();
                let rf = rf_hflv.clone();
                thread::spawn(move || {
                    httpflv_subscriber_handler(stream, sm, bs, fs, rf)
                });
            }
            thread::sleep(Duration::from_millis(10));
        }
    });

    // Start the HTTP-FLV pull first; the handler waits for the stream to appear.
    let httpflv_url = format!("http://127.0.0.1:{}/live/test.flv", httpflv_port);
    let flv_output = "/tmp/e2e_test_output.flv";
    // Remove any output left by a previous run so the ffprobe check below can
    // only ever inspect data produced by this run.
    let _ = fs::remove_file(flv_output);

    // ffmpeg pull, written to a file.
    let mut pull_child = std::process::Command::new("ffmpeg")
        .args([
            "-y",
            "-i", &httpflv_url,
            "-c", "copy",
            "-f", "flv",
            flv_output,
        ])
        .stderr(std::process::Stdio::null())
        .stdout(std::process::Stdio::null())
        .spawn()
        .expect("ffmpeg pull failed to start");

    // Give the puller a moment to connect.
    thread::sleep(Duration::from_millis(200));

    // ffmpeg push.
    let rtmp_url = format!("rtmp://127.0.0.1:{}/live/test", rtmp_port);
    let mut push_child = std::process::Command::new("ffmpeg")
        .args([
            "-re",
            "-i", video_path,
            "-c:v", "copy",
            "-c:a", "copy",
            "-f", "flv",
            &rtmp_url,
        ])
        .stderr(std::process::Stdio::null())
        .stdout(std::process::Stdio::null())
        .spawn()
        .expect("ffmpeg push failed to start");

    // Wait for the push to finish, for at most 30 seconds.
    let start = std::time::Instant::now();
    loop {
        match push_child.try_wait() {
            Ok(Some(_)) => break,
            Ok(None) => {
                if start.elapsed() > Duration::from_secs(30) {
                    let _ = push_child.kill();
                    let _ = push_child.wait();
                    break;
                }
                thread::sleep(Duration::from_millis(200));
            }
            Err(_) => {
                // The child's state is unknown; make sure it is not left running.
                let _ = push_child.kill();
                let _ = push_child.wait();
                break;
            }
        }
    }

    // Let the puller receive a little more data.
    thread::sleep(Duration::from_secs(3));

    // Stop the puller.
    let _ = pull_child.kill();
    let _ = pull_child.wait();

    // Stop the servers.
    running.store(false, Ordering::Relaxed);
    let _ = rtmp_handle.join();
    let _ = httpflv_handle.join();

    // Verify the counters.
    let pubs = publish_count.load(Ordering::Relaxed);
    let flv_bytes = httpflv_bytes_sent.load(Ordering::Relaxed);
    let flv_frames = httpflv_frames_sent.load(Ordering::Relaxed);
    eprintln!("\n[e2e] ========================");
    eprintln!("[e2e] RTMP publish sessions: {}", pubs);
    eprintln!("[e2e] HTTP-FLV bytes sent: {}", flv_bytes);
    eprintln!("[e2e] HTTP-FLV frames sent: {}", flv_frames);
    eprintln!("[e2e] ========================");
    assert!(pubs >= 1, "should have at least 1 publish session");
    assert!(flv_bytes > 0, "HTTP-FLV should send data");
    assert!(flv_frames > 0, "HTTP-FLV should send frames");

    // Verify the output file when the puller produced one.
    if std::path::Path::new(flv_output).exists() {
        let file_size = fs::metadata(flv_output).map(|m| m.len()).unwrap_or(0);
        eprintln!("[e2e] output FLV file: {} bytes", file_size);
        // Skip files of 13 bytes or fewer (presumably nothing beyond the FLV
        // file header; confirm against the FLV layout).
        if file_size > 13 {
            // Inspect the file with ffprobe; skipped silently if ffprobe cannot run.
            let probe = std::process::Command::new("ffprobe")
                .args(["-v", "quiet", "-print_format", "json", "-show_streams", flv_output])
                .output();
            if let Ok(output) = probe {
                let stdout = String::from_utf8_lossy(&output.stdout);
                let has_h264 = stdout.contains("h264");
                let has_aac = stdout.contains("aac");
                eprintln!("[e2e] ffprobe: h264={}, aac={}", has_h264, has_aac);
                assert!(has_h264 || has_aac, "output should contain at least one valid stream");
            }
        }
    }
}