//! RTMP integration test: pushes a stream with ffmpeg to exercise the full RTMP protocol stack.
//!
//! Test flow:
//! 1. Start a minimal RTMP server (plain TCP + `RtmpSession`).
//! 2. Push a real MP4 file with ffmpeg.
//! 3. Verify that the handshake, connect, createStream, publish, and audio/video data
//!    are all handled correctly.

use std::io::{ErrorKind, Read, Write};
use std::net::{TcpListener, TcpStream};
use std::path::Path;
use std::process::{Command, Stdio};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};

use rave::protocol::rtmp::session::{RtmpSession, SessionEvent, SessionState};

/// Collected statistics (atomics, so they are safe to share across threads).
#[derive(Default)]
struct Stats {
    /// Number of audio + video messages received.
    media_count: AtomicUsize,
    /// Number of audio messages received.
    audio_count: AtomicUsize,
    /// Number of video messages received.
    video_count: AtomicUsize,
    /// Whether a `PublishStart` event was received.
    publish_started: AtomicBool,
    /// Whether a `MediaData` event was received.
    media_received: AtomicBool,
    /// Whether the session reached the `Publishing` state.
    reached_publishing: AtomicBool,
}

/// Starts the RTMP test server and returns `(port, stats handle, shutdown signal)`.
fn start_test_server() -> (u16, Arc<Stats>, Arc<AtomicBool>) {
    let stats = Arc::new(Stats::default());
    let shutdown = Arc::new(AtomicBool::new(false));

    // Bind to port 0 so the OS assigns a free port.
    let listener = TcpListener::bind("127.0.0.1:0").expect("bind failed");
    let port = listener.local_addr().unwrap().port();

    let stats_clone = stats.clone();
    let shutdown_clone = shutdown.clone();
    thread::spawn(move || {
        listener.set_nonblocking(false).ok();

        // Accept exactly one connection.
        if let Ok((stream, _)) = listener.accept() {
            handle_rtmp_client(stream, &stats_clone);
        }

        // Wait for the shutdown signal before exiting (leaves time to collect stats).
        let start = Instant::now();
        while !shutdown_clone.load(Ordering::Relaxed)
            && start.elapsed() < Duration::from_secs(30)
        {
            thread::sleep(Duration::from_millis(50));
        }
    });

    (port, stats, shutdown)
}

/// Handles a single RTMP client.
fn handle_rtmp_client(mut stream: TcpStream, stats: &Arc<Stats>) {
    stream.set_read_timeout(Some(Duration::from_secs(10))).ok();
    stream.set_write_timeout(Some(Duration::from_secs(5))).ok();
    eprintln!("[test server] client connected from {:?}", stream.peer_addr());

    let mut session = RtmpSession::new();
    let mut buf = [0u8; 65536];
    let mut read_count = 0usize;

    loop {
        match stream.read(&mut buf) {
            Ok(0) => {
                eprintln!("[test server] client disconnected after {} reads", read_count);
                break;
            }
            Ok(n) => {
                read_count += 1;
                let events = session.on_data(&buf[..n]);
                eprintln!(
                    "[test server] read {} bytes (read #{}, state={:?}), got {} events, recv_buf={}",
                    n,
                    read_count,
                    session.state(),
                    events.len(),
                    session.recv_buf_len()
                );

                for event in events {
                    match event {
                        SessionEvent::SendData(data) => {
                            eprintln!("[test server] sending {} bytes", data.len());
                            if stream.write_all(&data).is_err() {
                                eprintln!("[test server] write error");
                                return;
                            }
                        }
                        // Bind the `stream` field to `stream_name` so it does not
                        // shadow the TCP stream inside this arm.
                        SessionEvent::PublishStart { app, stream: stream_name } => {
                            eprintln!(
                                "[test server] PublishStart: app={}, stream={}",
                                app, stream_name
                            );
                            stats.publish_started.store(true, Ordering::Relaxed);
                            stats.reached_publishing.store(
                                session.state() == SessionState::Publishing,
                                Ordering::Relaxed,
                            );
                        }
                        SessionEvent::PlayStart { .. } => {}
                        SessionEvent::MediaData { msg_type_id, .. } => {
                            stats.media_count.fetch_add(1, Ordering::Relaxed);
                            // RTMP message type IDs: 8 = audio, 9 = video.
                            match msg_type_id {
                                8 => {
                                    stats.audio_count.fetch_add(1, Ordering::Relaxed);
                                }
                                9 => {
                                    stats.video_count.fetch_add(1, Ordering::Relaxed);
                                }
                                _ => {}
                            }
                            stats.media_received.store(true, Ordering::Relaxed);
                        }
                        SessionEvent::Error(e) => {
                            eprintln!("[test server] error: {}", e);
                            if session.state() == SessionState::Closed {
                                return;
                            }
                        }
                        SessionEvent::SessionClosed => return,
                    }
                }
            }
            Err(e) => {
                // A read timeout is not fatal; keep waiting for data.
                if e.kind() == ErrorKind::WouldBlock || e.kind() == ErrorKind::TimedOut {
                    continue;
                }
                break;
            }
        }
    }
}

#[test]
fn test_rtmp_ffmpeg_push_stream() {
    // Skip the test if the sample video file does not exist.
    let video_path = "/tmp/test_video.mp4";
    if !Path::new(video_path).exists() {
        eprintln!("SKIP: test video not found at {}", video_path);
        return;
    }

    // Start the test server.
    let (port, stats, shutdown) = start_test_server();
    eprintln!("[test] RTMP server listening on 127.0.0.1:{}", port);

    // Give the server thread time to reach accept().
    thread::sleep(Duration::from_millis(200));

    // Push the stream with ffmpeg.
    let rtmp_url = format!("rtmp://127.0.0.1:{}/live/test", port);
    let child = Command::new("ffmpeg")
        .args([
            "-re",
            "-i", video_path,
            "-c:v", "copy",
            "-c:a", "copy",
            "-f", "flv",
            rtmp_url.as_str(),
        ])
        .stderr(Stdio::inherit()) // forward ffmpeg's stderr to the test console
        .stdout(Stdio::null())
        .spawn();

    let mut child = match child {
        Ok(c) => c,
        Err(e) => {
            eprintln!("SKIP: cannot start ffmpeg: {}", e);
            shutdown.store(true, Ordering::Relaxed);
            return;
        }
    };

    // Wait for ffmpeg to finish (at most 30 seconds).
    let start = Instant::now();
    loop {
        match child.try_wait() {
            Ok(Some(status)) => {
                eprintln!("[test] ffmpeg exited with status: {}", status);
                break;
            }
            Ok(None) => {
                // Stop early once enough media data has arrived (more than 20 messages).
                if stats.media_count.load(Ordering::Relaxed) > 20 {
                    eprintln!("[test] received enough media data, killing ffmpeg");
                    let _ = child.kill();
                    let _ = child.wait();
                    break;
                }
                if start.elapsed() > Duration::from_secs(30) {
                    eprintln!("[test] timeout, killing ffmpeg");
                    let _ = child.kill();
                    let _ = child.wait();
                    break;
                }
                thread::sleep(Duration::from_millis(200));
            }
            Err(e) => {
                eprintln!("[test] error waiting for ffmpeg: {}", e);
                break;
            }
        }
    }

    // Give the server a moment to process the last chunk of data.
    thread::sleep(Duration::from_millis(500));

    // Print the statistics.
    let media_total = stats.media_count.load(Ordering::Relaxed);
    let audio_total = stats.audio_count.load(Ordering::Relaxed);
    let video_total = stats.video_count.load(Ordering::Relaxed);
    eprintln!(
        "[test] stats: media={}, audio={}, video={}",
        media_total, audio_total, video_total
    );

    // Assertions.
    assert!(
        stats.publish_started.load(Ordering::Relaxed),
        "should receive PublishStart event"
    );
    assert!(
        stats.reached_publishing.load(Ordering::Relaxed),
        "session should reach Publishing state"
    );
    assert!(
        stats.media_received.load(Ordering::Relaxed),
        "should receive at least one MediaData event"
    );
    assert!(media_total > 0, "should receive more than 0 media messages");
    assert!(audio_total > 0, "should receive audio messages");
    assert!(video_total > 0, "should receive video messages");

    shutdown.store(true, Ordering::Relaxed);
}