rave/tests/rtmp_loop.rs

251 lines
9.3 KiB
Rust

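//! RTMP loop-push test: repeatedly accepts an ffmpeg RTMP push and records it to an FLV file.
//!
//! Usage (the test is skipped unless RTMP_LOOP is set and /tmp/test_video.mp4 exists):
//! RTMP_LOOP=1 cargo test --test rtmp_loop -- --nocapture
//!
//! Playback:
//! ffplay /tmp/rave_test.flv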
use std::fs::File;
use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use rave::protocol::rtmp::session::{RtmpSession, SessionEvent, SessionState};
use rave::remux::rtmp2flv;
/// Counters shared between the recorder threads and the test body.
///
/// Every field is an atomic so the struct can be shared through an `Arc`
/// without additional locking. `Default` yields all counters at zero.
#[derive(Debug, Default)]
struct Stats {
    /// Total number of `MediaData` events seen (any message type).
    media_count: AtomicUsize,
    /// Number of media messages with RTMP message type id 8.
    audio_count: AtomicUsize,
    /// Number of media messages with RTMP message type id 9.
    video_count: AtomicUsize,
    /// Number of `PublishStart` events seen.
    publish_count: AtomicUsize,
}
/// RTMP → FLV recorder for a single client connection.
///
/// Reads from `stream` until the peer disconnects or the session closes,
/// feeds the bytes to an [`RtmpSession`] and handles the events it emits:
///
/// * `SendData` is written back to the client; a write failure ends the session.
/// * `PublishStart` bumps `stats.publish_count` and is logged.
/// * `MediaData` bumps the counters in `stats` and is appended to `flv_file`
///   as an FLV tag (when `rtmp_msg_to_flv_tag` yields one).
/// * `Error` is logged and ends the session only if the session is `Closed`.
/// * `SessionClosed` ends the session.
///
/// `flv_file` may be shared by several connections. The FLV file header is
/// therefore written only while the file is still empty, so later connections
/// append tags instead of inserting another header in the middle of the file.
/// Timestamps are passed through unchanged from the RTMP messages.
fn run_recorder(mut stream: TcpStream, stats: &Arc<Stats>, flv_file: Arc<std::sync::Mutex<File>>) {
    let peer = stream.peer_addr().map(|a| a.to_string()).unwrap_or_default();
    eprintln!("[recorder] client connected from {}", peer);
    let mut session = RtmpSession::new();
    let mut buf = [0u8; 65536];
    // Set once this connection has made sure the shared file starts with a header.
    let mut flv_header_checked = false;
    stream.set_read_timeout(Some(Duration::from_secs(15))).ok();
    stream.set_write_timeout(Some(Duration::from_secs(5))).ok();
    loop {
        let n = match stream.read(&mut buf) {
            Ok(0) => {
                eprintln!("[recorder] client {} disconnected", peer);
                break;
            }
            Ok(n) => n,
            // An idle or interrupted read is not fatal; keep waiting for data.
            Err(e)
                if matches!(
                    e.kind(),
                    std::io::ErrorKind::TimedOut
                        | std::io::ErrorKind::WouldBlock
                        | std::io::ErrorKind::Interrupted
                ) =>
            {
                continue;
            }
            Err(e) => {
                eprintln!("[recorder] read error from {}: {}", peer, e);
                break;
            }
        };
        for event in session.on_data(&buf[..n]) {
            match event {
                SessionEvent::SendData(data) => {
                    if stream.write_all(&data).is_err() {
                        return;
                    }
                }
                SessionEvent::PublishStart { app, stream: stream_name } => {
                    let count = stats.publish_count.fetch_add(1, Ordering::Relaxed) + 1;
                    eprintln!("[recorder] #{count} PublishStart: app={app}, stream={stream_name}");
                }
                SessionEvent::MediaData { msg_type_id, timestamp, data } => {
                    let total = stats.media_count.fetch_add(1, Ordering::Relaxed) + 1;
                    match msg_type_id {
                        8 => {
                            stats.audio_count.fetch_add(1, Ordering::Relaxed);
                        }
                        9 => {
                            stats.video_count.fetch_add(1, Ordering::Relaxed);
                        }
                        _ => {}
                    }
                    // Write to the FLV file.
                    if let Ok(mut f) = flv_file.lock() {
                        if !flv_header_checked {
                            // Checked under the lock: only the first writer sees an
                            // empty file. If the size cannot be read, fall back to
                            // writing the header.
                            let is_empty = f.metadata().map(|m| m.len() == 0).unwrap_or(true);
                            if is_empty {
                                let hdr = rtmp2flv::flv_header_with_audio_video();
                                if let Err(e) = f.write_all(&hdr) {
                                    eprintln!("[recorder] failed to write FLV header: {}", e);
                                }
                            }
                            flv_header_checked = true;
                        }
                        if let Some(tag) = rtmp2flv::rtmp_msg_to_flv_tag(msg_type_id, timestamp, &data) {
                            if let Err(e) = f.write_all(&tag.encode()) {
                                eprintln!("[recorder] failed to write FLV tag: {}", e);
                            }
                        }
                        // Flush once every 100 frames.
                        if total % 100 == 0 {
                            if let Err(e) = f.flush() {
                                eprintln!("[recorder] failed to flush FLV file: {}", e);
                            }
                        }
                    }
                }
                SessionEvent::Error(e) => {
                    eprintln!("[recorder] error: {}", e);
                    if session.state() == SessionState::Closed {
                        return;
                    }
                }
                SessionEvent::SessionClosed => return,
                _ => {}
            }
        }
    }
}
/// End-to-end check: push a local video over RTMP with ffmpeg several times
/// and record everything into one FLV file.
///
/// The test returns early (and therefore passes) when `/tmp/test_video.mp4`
/// is missing or the `RTMP_LOOP` environment variable is not set. Otherwise it
/// starts an accept loop on an ephemeral localhost port, runs ffmpeg three
/// times against it, waits for every recorder thread to finish, and then
/// asserts on the collected counters and the size of `/tmp/rave_test.flv`.
/// The ffprobe output at the end is printed for inspection only.
#[test]
fn test_rtmp_loop_push_and_record() {
    let video_path = "/tmp/test_video.mp4";
    if !std::path::Path::new(video_path).exists() {
        eprintln!("SKIP: {} not found", video_path);
        return;
    }
    if std::env::var("RTMP_LOOP").is_err() {
        eprintln!("NOTE: set env RTMP_LOOP=1 to enable this test");
        eprintln!(" RTMP_LOOP=1 cargo test --test rtmp_loop -- --nocapture");
        return;
    }
    let stats = Arc::new(Stats {
        media_count: AtomicUsize::new(0),
        audio_count: AtomicUsize::new(0),
        video_count: AtomicUsize::new(0),
        publish_count: AtomicUsize::new(0),
    });
    // Create the FLV output file.
    let flv_path = "/tmp/rave_test.flv";
    let flv_file = Arc::new(std::sync::Mutex::new(
        File::create(flv_path).expect("create flv file failed"),
    ));
    // Start the RTMP server.
    let listener = TcpListener::bind("127.0.0.1:0").expect("bind failed");
    let port = listener.local_addr().unwrap().port();
    let running = Arc::new(AtomicBool::new(true));
    let stats_srv = stats.clone();
    let flv_srv = flv_file.clone();
    let running_srv = running.clone();
    let server_handle = thread::spawn(move || {
        // Poll accept() instead of blocking so the loop can observe `running`.
        listener.set_nonblocking(true).ok();
        eprintln!("========================================");
        eprintln!(" Rave RTMP Server");
        eprintln!(" Push: ffmpeg -re -i {} -c copy -f flv rtmp://127.0.0.1:{}/live/test", video_path, port);
        eprintln!(" Watch: ffplay {}", flv_path);
        eprintln!("========================================");
        let mut recorders: Vec<thread::JoinHandle<()>> = Vec::new();
        while running_srv.load(Ordering::Relaxed) {
            match listener.accept() {
                Ok((stream, _)) => {
                    // The recorder relies on blocking reads with a timeout; on some
                    // platforms an accepted socket may inherit the listener's
                    // non-blocking mode, so reset it explicitly.
                    stream.set_nonblocking(false).ok();
                    let s = stats_srv.clone();
                    let f = flv_srv.clone();
                    recorders.push(thread::spawn(move || run_recorder(stream, &s, f)));
                }
                Err(_) => {}
            }
            thread::sleep(Duration::from_millis(50));
        }
        // Wait for every recorder to drain its socket so the counters and the
        // FLV file are final before the test inspects them.
        for recorder in recorders {
            let _ = recorder.join();
        }
    });
    // Push the stream 3 times in a row.
    let loop_count = 3;
    for i in 0..loop_count {
        eprintln!("\n[loop] ===== round {}/{} =====", i + 1, loop_count);
        let rtmp_url = format!("rtmp://127.0.0.1:{}/live/test", port);
        let mut child = std::process::Command::new("ffmpeg")
            .args([
                "-y",
                "-re",
                "-i", video_path,
                "-c:v", "copy",
                "-c:a", "copy",
                "-f", "flv",
                &rtmp_url,
            ])
            .stderr(std::process::Stdio::null())
            .stdout(std::process::Stdio::null())
            .spawn()
            .expect("ffmpeg failed to start");
        // Wait for ffmpeg to finish (at most 30 seconds).
        let start = std::time::Instant::now();
        loop {
            match child.try_wait() {
                Ok(Some(status)) => {
                    eprintln!("[loop] ffmpeg round {} exited: {}", i + 1, status);
                    break;
                }
                Ok(None) if start.elapsed() > Duration::from_secs(30) => {
                    eprintln!("[loop] timeout, killing ffmpeg");
                    let _ = child.kill();
                    let _ = child.wait();
                    break;
                }
                Ok(None) => thread::sleep(Duration::from_millis(200)),
                Err(e) => {
                    // Do not leave a child running that can no longer be polled.
                    eprintln!("[loop] failed to poll ffmpeg: {}", e);
                    let _ = child.kill();
                    let _ = child.wait();
                    break;
                }
            }
        }
        // Pause between rounds.
        if i + 1 < loop_count {
            thread::sleep(Duration::from_millis(500));
        }
    }
    running.store(false, Ordering::Relaxed);
    let _ = server_handle.join();
    let media = stats.media_count.load(Ordering::Relaxed);
    let audio = stats.audio_count.load(Ordering::Relaxed);
    let video = stats.video_count.load(Ordering::Relaxed);
    let pubs = stats.publish_count.load(Ordering::Relaxed);
    eprintln!("\n[summary] ========================");
    eprintln!("[summary] publish sessions: {}", pubs);
    eprintln!("[summary] total media frames: {}", media);
    eprintln!("[summary] audio: {}", audio);
    eprintln!("[summary] video: {}", video);
    eprintln!("[summary] FLV saved to: {}", flv_path);
    eprintln!("[summary] play: ffplay {}", flv_path);
    eprintln!("[summary] ========================");
    assert!(pubs >= 1, "should have at least 1 publish session");
    assert!(media > 0, "should receive media frames");
    assert!(audio > 0, "should receive audio frames");
    assert!(video > 0, "should receive video frames");
    // Check that the FLV file holds more than a bare header.
    let flv_size = std::fs::metadata(flv_path).map(|m| m.len()).unwrap_or(0);
    eprintln!("[summary] FLV file size: {} bytes", flv_size);
    assert!(flv_size > 13, "FLV file should be larger than header");
    // Inspect the FLV file with ffprobe (informational only, nothing is asserted).
    let probe = std::process::Command::new("ffprobe")
        .args(["-v", "quiet", "-print_format", "json", "-show_streams", flv_path])
        .output();
    match probe {
        Ok(output) => {
            let stdout = String::from_utf8_lossy(&output.stdout);
            eprintln!("[summary] ffprobe result:");
            for line in stdout.lines().filter(|l| l.contains("codec_name") || l.contains("codec_type")) {
                eprintln!(" {}", line.trim());
            }
        }
        Err(e) => eprintln!("[summary] ffprobe failed: {}", e),
    }
}