//! RTMP 推流 → HTTP-FLV 拉流 端到端集成测试 //! //! 测试完整的流媒体链路: //! 1. 启动 RTMP 服务器(接收推流) //! 2. 启动 HTTP-FLV 服务器(分发拉流) //! 3. ffmpeg 推流到 RTMP //! 4. ffplay/ffmpeg 从 HTTP-FLV 拉流验证 //! //! 使用方法: //! E2E=1 cargo test --test e2e_rtmp_httpflv -- --nocapture use std::fs; use std::io::{Read, Write}; use std::net::{TcpListener, TcpStream}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::Arc; use std::thread; use std::time::Duration; use rave::core::group::StreamManager; use rave::protocol::rtmp::session::{RtmpSession, SessionEvent, SessionState}; use rave::protocol::httpflv::{self, HttpRequestBuffer}; use rave::remux::rtmp2flv; use rave::sdk::context::EngineContext; use rave::sdk::traits::StreamManagerApi; use rave::sdk::types::StreamPath; /// RTMP 推流处理线程 — 接收 ffmpeg 推流并通过 StreamManager 分发 fn rtmp_publisher_handler( mut stream: TcpStream, sm: Arc, publish_count: Arc, ) { let _peer = stream.peer_addr().map(|a| a.to_string()).unwrap_or_default(); let mut session = RtmpSession::new(); let mut buf = [0u8; 65536]; let mut stream_path: Option = None; stream.set_read_timeout(Some(Duration::from_secs(15))).ok(); loop { match stream.read(&mut buf) { Ok(0) => break, Ok(n) => { let events = session.on_data(&buf[..n]); for event in events { match event { SessionEvent::SendData(data) => { if stream.write_all(&data).is_err() { return; } } SessionEvent::PublishStart { app, stream: name } => { let sp = StreamPath::new(&app, &name); let _ = sm.create_stream(sp.clone()); stream_path = Some(sp); publish_count.fetch_add(1, Ordering::Relaxed); } SessionEvent::MediaData { msg_type_id, timestamp, data } => { if let Some(ref sp) = stream_path { if let Some(frame) = rtmp_media_to_avframe(msg_type_id, timestamp, &data) { sm.dispatch_frame(sp, frame); } } } SessionEvent::Error(_) => { if session.state() == SessionState::Closed { return; } } SessionEvent::SessionClosed => return, _ => {} } } } Err(_) => break, } } if let Some(sp) = stream_path { sm.remove_stream(&sp); } } /// HTTP-FLV 拉流处理线程 — 订阅流并持续发送 FLV 数据 fn httpflv_subscriber_handler( mut stream: TcpStream, sm: Arc, bytes_sent: Arc, frames_sent: Arc, ready_flag: Arc, ) { let _peer = stream.peer_addr().map(|a| a.to_string()).unwrap_or_default(); stream.set_read_timeout(Some(Duration::from_secs(5))).ok(); stream.set_write_timeout(Some(Duration::from_secs(5))).ok(); // 读取 HTTP 请求 let mut req_buf = HttpRequestBuffer::new(); let mut read_buf = [0u8; 4096]; loop { match stream.read(&mut read_buf) { Ok(0) => return, Ok(n) => { if req_buf.feed(&read_buf[..n]) { break; } } Err(_) => return, } } let request = match req_buf.parse() { Ok(r) => r, Err(_) => return, }; if !httpflv::is_http_flv_request(&request) { let _ = stream.write_all(httpflv::build_http_404_response().as_bytes()); return; } let (app, stream_name) = match httpflv::parse_stream_path(&request.path) { Some(pair) => pair, None => return, }; let sp = StreamPath::new(&app, &stream_name); // 等待流可用(最多等 10 秒) let subscriber = { let mut attempts = 0; loop { match sm.subscribe(&sp) { Ok(sub) => break sub, Err(_) => { attempts += 1; if attempts > 200 { return; } thread::sleep(Duration::from_millis(50)); } } } }; // 发送 HTTP 200 + FLV Header let http_resp = httpflv::build_http_flv_response(); if stream.write_all(http_resp.as_bytes()).is_err() { return; } let flv_header = rtmp2flv::flv_header_with_audio_video(); if stream.write_all(&flv_header).is_err() { return; } let _ = stream.flush(); bytes_sent.fetch_add(flv_header.len(), Ordering::Relaxed); ready_flag.store(true, Ordering::Relaxed); // 循环读取帧 → 发送 FLV Tag let mut idle_count: u32 = 0; loop { match subscriber.read_frame() { Some(frame) => { idle_count = 0; let msg_type = match frame.media_type { rave::sdk::types::MediaType::Audio => 8, rave::sdk::types::MediaType::Video => 9, rave::sdk::types::MediaType::Data => 18, }; if let Some(tag) = rtmp2flv::rtmp_msg_to_flv_tag( msg_type, frame.timestamp_ms as u32, &frame.data, ) { let encoded = tag.encode(); if stream.write_all(&encoded).is_err() { break; } let _ = stream.flush(); bytes_sent.fetch_add(encoded.len(), Ordering::Relaxed); frames_sent.fetch_add(1, Ordering::Relaxed); } } None => { idle_count += 1; if idle_count > 600 { if !sm.has_stream(&sp) { break; } idle_count = 0; } thread::sleep(Duration::from_millis(5)); } } if !subscriber.is_active() { break; } } subscriber.close(); } /// 将 RTMP 媒体消息转为 AVFrame(复用 server.rs 的逻辑) fn rtmp_media_to_avframe( msg_type: u8, timestamp: u32, data: &[u8], ) -> Option { use rave::sdk::types::*; if data.is_empty() { return None; } match msg_type { 8 => { // Audio let byte0 = data[0]; let sound_format = (byte0 >> 4) & 0x0F; let codec = match sound_format { 10 => AudioCodec::Aac, _ => AudioCodec::Unknown, }; Some(AVFrame::new_audio(timestamp as u64, Arc::new(data.to_vec()), codec)) } 9 => { // Video let byte0 = data[0]; let frame_type_raw = (byte0 >> 4) & 0x0F; let codec_id = byte0 & 0x0F; let frame_type = match frame_type_raw { 1 => FrameType::KeyFrame, 2 => FrameType::InterFrame, _ => FrameType::InterFrame, }; let codec = match codec_id { 7 => VideoCodec::H264, 12 => VideoCodec::H265, _ => VideoCodec::Unknown, }; Some(AVFrame::new_video(timestamp as u64, Arc::new(data.to_vec()), codec, frame_type)) } _ => None, } } #[test] fn test_e2e_rtmp_push_httpflv_pull() { if std::env::var("E2E").is_err() { eprintln!("SKIP: set E2E=1 to run end-to-end test"); eprintln!(" E2E=1 cargo test --test e2e_rtmp_httpflv -- --nocapture"); return; } let video_path = "/tmp/test_video.mp4"; if !std::path::Path::new(video_path).exists() { eprintln!("SKIP: {} not found", video_path); return; } let sm: Arc = Arc::new(StreamManager::new()); let _context = Arc::new(EngineContext::new(sm.clone())); // 统计 let publish_count = Arc::new(AtomicUsize::new(0)); let httpflv_bytes_sent = Arc::new(AtomicUsize::new(0)); let httpflv_frames_sent = Arc::new(AtomicUsize::new(0)); let httpflv_ready = Arc::new(AtomicBool::new(false)); let running = Arc::new(AtomicBool::new(true)); // 启动 RTMP 服务器 let rtmp_listener = TcpListener::bind("127.0.0.1:0").unwrap(); let rtmp_port = rtmp_listener.local_addr().unwrap().port(); let sm_rtmp = sm.clone(); let pc_rtmp = publish_count.clone(); let running_rtmp = running.clone(); let rtmp_handle = thread::spawn(move || { rtmp_listener.set_nonblocking(true).ok(); while running_rtmp.load(Ordering::Relaxed) { match rtmp_listener.accept() { Ok((stream, _)) => { let sm = sm_rtmp.clone(); let pc = pc_rtmp.clone(); thread::spawn(move || rtmp_publisher_handler(stream, sm, pc)); } Err(_) => {} } thread::sleep(Duration::from_millis(10)); } }); // 启动 HTTP-FLV 服务器 let httpflv_listener = TcpListener::bind("127.0.0.1:0").unwrap(); let httpflv_port = httpflv_listener.local_addr().unwrap().port(); let sm_hflv = sm.clone(); let bs_hflv = httpflv_bytes_sent.clone(); let fs_hflv = httpflv_frames_sent.clone(); let rf_hflv = httpflv_ready.clone(); let running_hflv = running.clone(); let httpflv_handle = thread::spawn(move || { httpflv_listener.set_nonblocking(true).ok(); while running_hflv.load(Ordering::Relaxed) { match httpflv_listener.accept() { Ok((stream, _)) => { let sm = sm_hflv.clone(); let bs = bs_hflv.clone(); let fs = fs_hflv.clone(); let rf = rf_hflv.clone(); thread::spawn(move || { httpflv_subscriber_handler(stream, sm, bs, fs, rf) }); } Err(_) => {} } thread::sleep(Duration::from_millis(10)); } }); // 先启动 HTTP-FLV 拉流(它会等待流出现) let httpflv_url = format!("http://127.0.0.1:{}/live/test.flv", httpflv_port); let flv_output = "/tmp/e2e_test_output.flv"; // ffmpeg 拉流(写入文件) let mut pull_child = std::process::Command::new("ffmpeg") .args([ "-y", "-i", &httpflv_url, "-c", "copy", "-f", "flv", flv_output, ]) .stderr(std::process::Stdio::null()) .stdout(std::process::Stdio::null()) .spawn() .expect("ffmpeg pull failed to start"); // 等待拉流端连接 thread::sleep(Duration::from_millis(200)); // ffmpeg 推流 let rtmp_url = format!("rtmp://127.0.0.1:{}/live/test", rtmp_port); let mut push_child = std::process::Command::new("ffmpeg") .args([ "-re", "-i", video_path, "-c:v", "copy", "-c:a", "copy", "-f", "flv", &rtmp_url, ]) .stderr(std::process::Stdio::null()) .stdout(std::process::Stdio::null()) .spawn() .expect("ffmpeg push failed to start"); // 等待推流完成(最多 30 秒) let start = std::time::Instant::now(); loop { match push_child.try_wait() { Ok(Some(_)) => break, Ok(None) => { if start.elapsed() > Duration::from_secs(30) { let _ = push_child.kill(); let _ = push_child.wait(); break; } thread::sleep(Duration::from_millis(200)); } Err(_) => break, } } // 等拉流端再收一点数据 thread::sleep(Duration::from_secs(3)); // 停止拉流 let _ = pull_child.kill(); let _ = pull_child.wait(); // 停止服务器 running.store(false, Ordering::Relaxed); let _ = rtmp_handle.join(); let _ = httpflv_handle.join(); // 验证结果 let pubs = publish_count.load(Ordering::Relaxed); let flv_bytes = httpflv_bytes_sent.load(Ordering::Relaxed); let flv_frames = httpflv_frames_sent.load(Ordering::Relaxed); eprintln!("\n[e2e] ========================"); eprintln!("[e2e] RTMP publish sessions: {}", pubs); eprintln!("[e2e] HTTP-FLV bytes sent: {}", flv_bytes); eprintln!("[e2e] HTTP-FLV frames sent: {}", flv_frames); eprintln!("[e2e] ========================"); assert!(pubs >= 1, "should have at least 1 publish session"); assert!(flv_bytes > 0, "HTTP-FLV should send data"); assert!(flv_frames > 0, "HTTP-FLV should send frames"); // 验证输出文件 if std::path::Path::new(flv_output).exists() { let file_size = fs::metadata(flv_output).map(|m| m.len()).unwrap_or(0); eprintln!("[e2e] output FLV file: {} bytes", file_size); if file_size > 13 { // 用 ffprobe 检查 let probe = std::process::Command::new("ffprobe") .args(["-v", "quiet", "-print_format", "json", "-show_streams", flv_output]) .output(); if let Ok(output) = probe { let stdout = String::from_utf8_lossy(&output.stdout); let has_h264 = stdout.contains("h264"); let has_aac = stdout.contains("aac"); eprintln!("[e2e] ffprobe: h264={}, aac={}", has_h264, has_aac); assert!(has_h264 || has_aac, "output should contain at least one valid stream"); } } } }