rave/tests/rtmp_integration.rs

229 lines
8.9 KiB
Rust
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

//! RTMP 集成测试 — 使用 ffmpeg 推流验证完整 RTMP 协议栈
//!
//! 测试流程:
//! 1. 启动一个最简 RTMP 服务器(纯 TCP + RtmpSession)
//! 2. 使用 ffmpeg 推流一个真实 MP4 文件
//! 3. 验证握手、connect、createStream、publish、音视频数据全部正确处理
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use rave::protocol::rtmp::session::{RtmpSession, SessionEvent, SessionState};
/// Statistics collected by the test server while it serves a client.
///
/// Every field is an atomic, so a single instance can be shared between threads
/// (for example behind an `Arc`) and updated without a lock.
/// `Stats::default()` yields all counters at zero and all flags cleared.
#[derive(Debug, Default)]
struct Stats {
    /// Number of media (audio/video) messages received.
    media_count: AtomicUsize,
    /// Number of audio messages received.
    audio_count: AtomicUsize,
    /// Number of video messages received.
    video_count: AtomicUsize,
    /// Whether a `PublishStart` event was received.
    publish_started: AtomicBool,
    /// Whether a `MediaData` event was received.
    media_received: AtomicBool,
    /// Whether the session reached the `Publishing` state.
    reached_publishing: AtomicBool,
}
/// Starts a minimal RTMP test server on a background thread.
///
/// The server binds to an OS-assigned port on `127.0.0.1`, accepts at most one
/// client and hands it to `handle_rtmp_client`.
///
/// Returns `(port, stats, shutdown)`:
/// - `port`: the TCP port the listener is bound to,
/// - `stats`: shared counters updated while the client is being served,
/// - `shutdown`: store `true` into it to let the server thread exit.
///
/// # Panics
///
/// Panics if the listener cannot be bound or its local address cannot be read.
fn start_test_server() -> (u16, Arc<Stats>, Arc<AtomicBool>) {
    /// Upper bound for each waiting phase of the server thread.
    const MAX_WAIT: Duration = Duration::from_secs(30);
    /// Interval between polls of the listener / the shutdown flag.
    const POLL_INTERVAL: Duration = Duration::from_millis(50);

    let stats = Arc::new(Stats {
        media_count: AtomicUsize::new(0),
        audio_count: AtomicUsize::new(0),
        video_count: AtomicUsize::new(0),
        publish_started: AtomicBool::new(false),
        media_received: AtomicBool::new(false),
        reached_publishing: AtomicBool::new(false),
    });
    let shutdown = Arc::new(AtomicBool::new(false));

    // Bind to port 0 so the OS assigns a free port. The socket is listening from
    // this point on, so a client may connect as soon as this function returns.
    let listener = TcpListener::bind("127.0.0.1:0").expect("bind failed");
    let port = listener
        .local_addr()
        .expect("failed to read listener address")
        .port();

    let thread_stats = Arc::clone(&stats);
    let thread_shutdown = Arc::clone(&shutdown);
    thread::spawn(move || {
        let shutdown_requested = || thread_shutdown.load(Ordering::Relaxed);

        // Poll accept() instead of blocking in it: a blocking accept would pin this
        // thread forever if no client ever connects, ignoring the shutdown flag.
        if let Err(e) = listener.set_nonblocking(true) {
            eprintln!("[test server] cannot enable non-blocking accept: {}", e);
        }

        // Accept a single connection only.
        let accept_started = std::time::Instant::now();
        while !shutdown_requested() && accept_started.elapsed() < MAX_WAIT {
            match listener.accept() {
                Ok((stream, _)) => {
                    // On some platforms the accepted socket inherits the listener's
                    // non-blocking mode; the client handler expects blocking reads
                    // governed by a read timeout.
                    match stream.set_nonblocking(false) {
                        Ok(()) => handle_rtmp_client(stream, &thread_stats),
                        Err(e) => {
                            eprintln!("[test server] cannot switch client to blocking mode: {}", e)
                        }
                    }
                    break;
                }
                Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                    thread::sleep(POLL_INTERVAL);
                }
                Err(e) => {
                    eprintln!("[test server] accept failed: {}", e);
                    break;
                }
            }
        }

        // Stay alive until the shutdown signal arrives (bounded by MAX_WAIT) so the
        // test has time to collect the statistics.
        let idle_started = std::time::Instant::now();
        while !shutdown_requested() && idle_started.elapsed() < MAX_WAIT {
            thread::sleep(POLL_INTERVAL);
        }
    });

    (port, stats, shutdown)
}
/// Serves a single RTMP client until the connection closes, fails or stalls.
///
/// Bytes read from `stream` are fed into an `RtmpSession`. Each `SessionEvent`
/// it produces is either written back to the peer (`SendData`) or recorded in
/// `stats` (`PublishStart`, `MediaData`). The function returns when:
/// - the peer disconnects or a non-timeout read error occurs,
/// - writing a response fails,
/// - the session reports `SessionClosed`, or an `Error` that left it `Closed`,
/// - the peer stays silent for `MAX_IDLE_TIMEOUTS` consecutive read timeouts.
fn handle_rtmp_client(mut stream: TcpStream, stats: &Arc<Stats>) {
    /// Consecutive read timeouts tolerated before the peer is considered stalled.
    const MAX_IDLE_TIMEOUTS: u32 = 3;

    // Best effort: if a timeout cannot be set the socket simply stays fully blocking.
    stream.set_read_timeout(Some(Duration::from_secs(10))).ok();
    stream.set_write_timeout(Some(Duration::from_secs(5))).ok();
    eprintln!("[test server] client connected from {:?}", stream.peer_addr());

    let mut session = RtmpSession::new();
    let mut buf = [0u8; 65536];
    let mut read_count = 0usize;
    let mut idle_timeouts = 0u32;

    loop {
        let n = match stream.read(&mut buf) {
            Ok(0) => {
                eprintln!("[test server] client disconnected after {} reads", read_count);
                break;
            }
            Ok(n) => n,
            Err(e)
                if matches!(
                    e.kind(),
                    std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut
                ) =>
            {
                // A read timeout is retried, but not forever: a peer that keeps the
                // connection open without sending anything must not pin this thread.
                idle_timeouts += 1;
                if idle_timeouts >= MAX_IDLE_TIMEOUTS {
                    eprintln!(
                        "[test server] client idle for {} consecutive read timeouts, closing",
                        idle_timeouts
                    );
                    break;
                }
                continue;
            }
            Err(e) => {
                eprintln!("[test server] read error: {}", e);
                break;
            }
        };

        idle_timeouts = 0;
        read_count += 1;
        let events = session.on_data(&buf[..n]);
        eprintln!("[test server] read {} bytes (read #{}, state={:?}), got {} events, recv_buf={}", n, read_count, session.state(), events.len(), session.recv_buf_len());

        for event in events {
            match event {
                SessionEvent::SendData(data) => {
                    eprintln!("[test server] sending {} bytes", data.len());
                    if stream.write_all(&data).is_err() {
                        eprintln!("[test server] write error");
                        return;
                    }
                }
                // The field is bound as `stream_name` so it does not shadow the socket.
                SessionEvent::PublishStart { ref app, stream: ref stream_name } => {
                    eprintln!("[test server] PublishStart: app={}, stream={}", app, stream_name);
                    stats.publish_started.store(true, Ordering::Relaxed);
                    stats.reached_publishing.store(
                        session.state() == SessionState::Publishing,
                        Ordering::Relaxed,
                    );
                }
                SessionEvent::PlayStart { .. } => {}
                SessionEvent::MediaData { msg_type_id, .. } => {
                    stats.media_count.fetch_add(1, Ordering::Relaxed);
                    match msg_type_id {
                        // Message type id 8 is counted as audio.
                        8 => {
                            stats.audio_count.fetch_add(1, Ordering::Relaxed);
                        }
                        // Message type id 9 is counted as video.
                        9 => {
                            stats.video_count.fetch_add(1, Ordering::Relaxed);
                        }
                        // Any other type only contributes to the total.
                        _ => {}
                    }
                    stats.media_received.store(true, Ordering::Relaxed);
                }
                SessionEvent::Error(e) => {
                    eprintln!("[test server] error: {}", e);
                    // Only give up when the error actually closed the session.
                    if session.state() == SessionState::Closed {
                        return;
                    }
                }
                SessionEvent::SessionClosed => return,
            }
        }
    }
}
/// End-to-end check of the RTMP stack: pushes a real MP4 file with `ffmpeg` into
/// the in-process test server and asserts that the publish handshake completed
/// and that both audio and video messages were delivered.
///
/// The test is skipped (returns early without failing) when
/// `/tmp/test_video.mp4` does not exist or `ffmpeg` cannot be started.
#[test]
fn test_rtmp_ffmpeg_push_stream() {
    /// Streaming stops early once more than this many media messages have arrived.
    const ENOUGH_MEDIA_MESSAGES: usize = 20;
    /// Maximum time ffmpeg is allowed to run.
    const FFMPEG_TIMEOUT: Duration = Duration::from_secs(30);

    // Skip when the test video is not available.
    let video_path = "/tmp/test_video.mp4";
    if !std::path::Path::new(video_path).exists() {
        eprintln!("SKIP: test video not found at {}", video_path);
        return;
    }

    // The listener is already bound when `start_test_server` returns, so ffmpeg can
    // connect right away; no extra "server ready" delay is needed.
    let (port, stats, shutdown) = start_test_server();
    eprintln!("[test] RTMP server listening on 127.0.0.1:{}", port);

    // Push the file with ffmpeg.
    let rtmp_url = format!("rtmp://127.0.0.1:{}/live/test", port);
    let spawned = std::process::Command::new("ffmpeg")
        .args([
            "-re",
            "-i", video_path,
            "-c:v", "copy",
            "-c:a", "copy",
            "-f", "flv",
            rtmp_url.as_str(),
        ])
        .stderr(std::process::Stdio::inherit()) // show ffmpeg diagnostics in the test output
        .stdout(std::process::Stdio::null())
        .spawn();
    let mut child = match spawned {
        Ok(c) => c,
        Err(e) => {
            eprintln!("SKIP: cannot start ffmpeg: {}", e);
            shutdown.store(true, Ordering::Relaxed);
            return;
        }
    };

    // Wait for ffmpeg to finish, stopping it early once enough data has arrived or
    // the timeout has elapsed.
    let started = std::time::Instant::now();
    loop {
        match child.try_wait() {
            Ok(Some(status)) => {
                eprintln!("[test] ffmpeg exited with status: {}", status);
                break;
            }
            Ok(None) => {
                // Only stop early once both audio and video have been seen; otherwise
                // the assertions below could fail merely because ffmpeg was killed
                // before the first message of one kind arrived.
                let enough = stats.media_count.load(Ordering::Relaxed) > ENOUGH_MEDIA_MESSAGES
                    && stats.audio_count.load(Ordering::Relaxed) > 0
                    && stats.video_count.load(Ordering::Relaxed) > 0;
                let stop_reason = if enough {
                    Some("received enough media data")
                } else if started.elapsed() > FFMPEG_TIMEOUT {
                    Some("timeout")
                } else {
                    None
                };
                if let Some(reason) = stop_reason {
                    eprintln!("[test] {}, killing ffmpeg", reason);
                    let _ = child.kill();
                    let _ = child.wait();
                    break;
                }
                thread::sleep(Duration::from_millis(200));
            }
            Err(e) => {
                eprintln!("[test] error waiting for ffmpeg: {}", e);
                // Do not leave a stray ffmpeg process behind.
                let _ = child.kill();
                let _ = child.wait();
                break;
            }
        }
    }

    // Give the server a moment to process the last chunk of data.
    thread::sleep(Duration::from_millis(500));

    // Snapshot the statistics.
    let media_total = stats.media_count.load(Ordering::Relaxed);
    let audio_total = stats.audio_count.load(Ordering::Relaxed);
    let video_total = stats.video_count.load(Ordering::Relaxed);
    let publish_started = stats.publish_started.load(Ordering::Relaxed);
    let reached_publishing = stats.reached_publishing.load(Ordering::Relaxed);
    let media_received = stats.media_received.load(Ordering::Relaxed);
    eprintln!("[test] stats: media={}, audio={}, video={}", media_total, audio_total, video_total);

    // Release the server thread before asserting, so it is signalled even when an
    // assertion below panics.
    shutdown.store(true, Ordering::Relaxed);

    assert!(publish_started, "should receive PublishStart event");
    assert!(reached_publishing, "session should reach Publishing state");
    assert!(media_received, "should receive at least one MediaData event");
    assert!(media_total > 0, "should receive more than 0 media messages");
    assert!(audio_total > 0, "should receive audio messages");
    assert!(video_total > 0, "should receive video messages");
}