229 lines
8.9 KiB
Rust
229 lines
8.9 KiB
Rust
//! RTMP 集成测试 — 使用 ffmpeg 推流验证完整 RTMP 协议栈
|
||
//!
|
||
//! 测试流程:
|
||
//! 1. 启动一个最简 RTMP 服务器(纯 TCP + RtmpSession)
|
||
//! 2. 使用 ffmpeg 推流一个真实 MP4 文件
|
||
//! 3. 验证握手、connect、createStream、publish、音视频数据全部正确处理
|
||
|
||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||
use std::io::{Read, Write};
|
||
use std::net::{TcpListener, TcpStream};
|
||
use std::sync::Arc;
|
||
use std::thread;
|
||
use std::time::Duration;
|
||
|
||
use rave::protocol::rtmp::session::{RtmpSession, SessionEvent, SessionState};
|
||
|
||
/// 收集到的统计数据(原子操作,跨线程安全)
|
||
struct Stats {
|
||
/// 收到的音视频消息数
|
||
media_count: AtomicUsize,
|
||
/// 收到的音频消息数
|
||
audio_count: AtomicUsize,
|
||
/// 收到的视频消息数
|
||
video_count: AtomicUsize,
|
||
/// 是否收到了 PublishStart 事件
|
||
publish_started: AtomicBool,
|
||
/// 是否收到了 MediaData 事件
|
||
media_received: AtomicBool,
|
||
/// 会话是否到达 Publishing 状态
|
||
reached_publishing: AtomicBool,
|
||
}
|
||
|
||
/// 运行 RTMP 测试服务器,返回 (端口, 统计句柄, 关闭信号)
|
||
fn start_test_server() -> (u16, Arc<Stats>, Arc<AtomicBool>) {
|
||
let stats = Arc::new(Stats {
|
||
media_count: AtomicUsize::new(0),
|
||
audio_count: AtomicUsize::new(0),
|
||
video_count: AtomicUsize::new(0),
|
||
publish_started: AtomicBool::new(false),
|
||
media_received: AtomicBool::new(false),
|
||
reached_publishing: AtomicBool::new(false),
|
||
});
|
||
let shutdown = Arc::new(AtomicBool::new(false));
|
||
|
||
// 绑定到 0 端口让 OS 分配可用端口
|
||
let listener = TcpListener::bind("127.0.0.1:0").expect("bind failed");
|
||
let port = listener.local_addr().unwrap().port();
|
||
|
||
let stats_clone = stats.clone();
|
||
let shutdown_clone = shutdown.clone();
|
||
|
||
thread::spawn(move || {
|
||
listener.set_nonblocking(false).ok();
|
||
// 只接受一个连接
|
||
if let Ok((stream, _)) = listener.accept() {
|
||
handle_rtmp_client(stream, &stats_clone);
|
||
}
|
||
// 等待关闭信号再退出(给统计收集留时间)
|
||
let start = std::time::Instant::now();
|
||
while !shutdown_clone.load(Ordering::Relaxed) && start.elapsed() < Duration::from_secs(30) {
|
||
thread::sleep(Duration::from_millis(50));
|
||
}
|
||
});
|
||
|
||
(port, stats, shutdown)
|
||
}
|
||
|
||
/// 处理单个 RTMP 客户端
|
||
fn handle_rtmp_client(mut stream: TcpStream, stats: &Arc<Stats>) {
|
||
stream.set_read_timeout(Some(Duration::from_secs(10))).ok();
|
||
stream.set_write_timeout(Some(Duration::from_secs(5))).ok();
|
||
eprintln!("[test server] client connected from {:?}", stream.peer_addr());
|
||
|
||
let mut session = RtmpSession::new();
|
||
let mut buf = [0u8; 65536];
|
||
let mut read_count = 0usize;
|
||
|
||
loop {
|
||
match stream.read(&mut buf) {
|
||
Ok(0) => {
|
||
eprintln!("[test server] client disconnected after {} reads", read_count);
|
||
break;
|
||
}
|
||
Ok(n) => {
|
||
read_count += 1;
|
||
let events = session.on_data(&buf[..n]);
|
||
eprintln!("[test server] read {} bytes (read #{}, state={:?}), got {} events, recv_buf={}", n, read_count, session.state(), events.len(), session.recv_buf_len());
|
||
for event in events {
|
||
match event {
|
||
SessionEvent::SendData(data) => {
|
||
eprintln!("[test server] sending {} bytes", data.len());
|
||
if stream.write_all(&data).is_err() {
|
||
eprintln!("[test server] write error");
|
||
return;
|
||
}
|
||
}
|
||
SessionEvent::PublishStart { ref app, ref stream } => {
|
||
eprintln!("[test server] PublishStart: app={}, stream={}", app, stream);
|
||
stats.publish_started.store(true, Ordering::Relaxed);
|
||
stats.reached_publishing.store(
|
||
session.state() == SessionState::Publishing,
|
||
Ordering::Relaxed,
|
||
);
|
||
}
|
||
SessionEvent::PlayStart { .. } => {}
|
||
SessionEvent::MediaData { msg_type_id, timestamp: _, data: _ } => {
|
||
let _count = stats.media_count.fetch_add(1, Ordering::Relaxed) + 1;
|
||
match msg_type_id {
|
||
8 => {
|
||
stats.audio_count.fetch_add(1, Ordering::Relaxed);
|
||
}
|
||
9 => {
|
||
stats.video_count.fetch_add(1, Ordering::Relaxed);
|
||
}
|
||
_ => {}
|
||
}
|
||
stats.media_received.store(true, Ordering::Relaxed);
|
||
}
|
||
SessionEvent::Error(e) => {
|
||
eprintln!("[test server] error: {}", e);
|
||
if session.state() == SessionState::Closed {
|
||
return;
|
||
}
|
||
}
|
||
SessionEvent::SessionClosed => return,
|
||
}
|
||
}
|
||
}
|
||
Err(e) => {
|
||
if e.kind() == std::io::ErrorKind::WouldBlock || e.kind() == std::io::ErrorKind::TimedOut {
|
||
continue;
|
||
}
|
||
break;
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
#[test]
|
||
fn test_rtmp_ffmpeg_push_stream() {
|
||
// 检查测试视频文件是否存在
|
||
let video_path = "/tmp/test_video.mp4";
|
||
if !std::path::Path::new(video_path).exists() {
|
||
eprintln!("SKIP: test video not found at {}", video_path);
|
||
return;
|
||
}
|
||
|
||
// 启动测试服务器
|
||
let (port, stats, shutdown) = start_test_server();
|
||
eprintln!("[test] RTMP server listening on 127.0.0.1:{}", port);
|
||
|
||
// 等待服务器就绪
|
||
thread::sleep(Duration::from_millis(200));
|
||
|
||
// 使用 ffmpeg 推流
|
||
let rtmp_url = format!("rtmp://127.0.0.1:{}/live/test", port);
|
||
let child = std::process::Command::new("ffmpeg")
|
||
.args([
|
||
"-re",
|
||
"-i", video_path,
|
||
"-c:v", "copy",
|
||
"-c:a", "copy",
|
||
"-f", "flv",
|
||
rtmp_url.as_str(),
|
||
])
|
||
.stderr(std::process::Stdio::inherit()) // 输出 stderr 到测试控制台
|
||
.stdout(std::process::Stdio::null())
|
||
.spawn();
|
||
|
||
let mut child = match child {
|
||
Ok(c) => c,
|
||
Err(e) => {
|
||
eprintln!("SKIP: cannot start ffmpeg: {}", e);
|
||
shutdown.store(true, Ordering::Relaxed);
|
||
return;
|
||
}
|
||
};
|
||
|
||
// 等待 ffmpeg 完成(最多 30 秒)
|
||
let start = std::time::Instant::now();
|
||
loop {
|
||
match child.try_wait() {
|
||
Ok(Some(status)) => {
|
||
eprintln!("[test] ffmpeg exited with status: {}", status);
|
||
break;
|
||
}
|
||
Ok(None) => {
|
||
// 检查是否已收到足够的媒体数据(收到 >10 条就提前结束)
|
||
if stats.media_count.load(Ordering::Relaxed) > 20 {
|
||
eprintln!("[test] received enough media data, killing ffmpeg");
|
||
let _ = child.kill();
|
||
let _ = child.wait();
|
||
break;
|
||
}
|
||
if start.elapsed() > Duration::from_secs(30) {
|
||
eprintln!("[test] timeout, killing ffmpeg");
|
||
let _ = child.kill();
|
||
let _ = child.wait();
|
||
break;
|
||
}
|
||
thread::sleep(Duration::from_millis(200));
|
||
}
|
||
Err(e) => {
|
||
eprintln!("[test] error waiting for ffmpeg: {}", e);
|
||
break;
|
||
}
|
||
}
|
||
}
|
||
|
||
// 给服务器一点时间处理最后的数据
|
||
thread::sleep(Duration::from_millis(500));
|
||
|
||
// 输出统计信息
|
||
let media_total = stats.media_count.load(Ordering::Relaxed);
|
||
let audio_total = stats.audio_count.load(Ordering::Relaxed);
|
||
let video_total = stats.video_count.load(Ordering::Relaxed);
|
||
eprintln!("[test] stats: media={}, audio={}, video={}", media_total, audio_total, video_total);
|
||
|
||
// 断言
|
||
assert!(stats.publish_started.load(Ordering::Relaxed), "should receive PublishStart event");
|
||
assert!(stats.reached_publishing.load(Ordering::Relaxed), "session should reach Publishing state");
|
||
assert!(stats.media_received.load(Ordering::Relaxed), "should receive at least one MediaData event");
|
||
assert!(media_total > 0, "should receive more than 0 media messages");
|
||
assert!(audio_total > 0, "should receive audio messages");
|
||
assert!(video_total > 0, "should receive video messages");
|
||
|
||
shutdown.store(true, Ordering::Relaxed);
|
||
}
|