251 lines
9.3 KiB
Rust
251 lines
9.3 KiB
Rust
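//! RTMP loop-push test: repeatedly accepts an ffmpeg RTMP push and saves the stream to an FLV file.
//!
//! Usage (the test returns early unless `RTMP_LOOP` is set and `/tmp/test_video.mp4` exists):
//!   RTMP_LOOP=1 cargo test --test rtmp_loop -- --nocapture
//!
//! Playback:
//!   ffplay /tmp/rave_test.flv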
|
|
|
|
use std::fs::File;
|
|
use std::io::{Read, Write};
|
|
use std::net::{TcpListener, TcpStream};
|
|
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
|
use std::sync::Arc;
|
|
use std::thread;
|
|
use std::time::Duration;
|
|
|
|
use rave::protocol::rtmp::session::{RtmpSession, SessionEvent, SessionState};
|
|
use rave::remux::rtmp2flv;
|
|
|
|
/// Counters shared between the recorder threads and the test body.
///
/// Every field is an atomic so the struct can be updated through a shared
/// reference (it is handed around as `Arc<Stats>`). `Default` yields all
/// counters at zero.
#[derive(Debug, Default)]
struct Stats {
    /// Incremented once for every `SessionEvent::MediaData` event.
    media_count: AtomicUsize,
    /// Incremented for media events whose RTMP message type id is 8.
    audio_count: AtomicUsize,
    /// Incremented for media events whose RTMP message type id is 9.
    video_count: AtomicUsize,
    /// Incremented once for every `SessionEvent::PublishStart` event.
    publish_count: AtomicUsize,
}
|
|
|
|
/// RTMP → FLV 录制服务器
|
|
fn run_recorder(mut stream: TcpStream, stats: &Arc<Stats>, flv_file: Arc<std::sync::Mutex<File>>) {
|
|
let peer = stream.peer_addr().map(|a| a.to_string()).unwrap_or_default();
|
|
eprintln!("[recorder] client connected from {}", peer);
|
|
|
|
let mut session = RtmpSession::new();
|
|
let mut buf = [0u8; 65536];
|
|
let mut flv_header_sent = false;
|
|
|
|
stream.set_read_timeout(Some(Duration::from_secs(15))).ok();
|
|
stream.set_write_timeout(Some(Duration::from_secs(5))).ok();
|
|
|
|
loop {
|
|
match stream.read(&mut buf) {
|
|
Ok(0) => {
|
|
eprintln!("[recorder] client {} disconnected", peer);
|
|
break;
|
|
}
|
|
Ok(n) => {
|
|
let events = session.on_data(&buf[..n]);
|
|
for event in events {
|
|
match event {
|
|
SessionEvent::SendData(data) => {
|
|
if stream.write_all(&data).is_err() {
|
|
return;
|
|
}
|
|
}
|
|
SessionEvent::PublishStart { ref app, ref stream } => {
|
|
let count = stats.publish_count.fetch_add(1, Ordering::Relaxed) + 1;
|
|
eprintln!("[recorder] #{count} PublishStart: app={app}, stream={stream}");
|
|
}
|
|
SessionEvent::MediaData { msg_type_id, timestamp, data } => {
|
|
let total = stats.media_count.fetch_add(1, Ordering::Relaxed) + 1;
|
|
match msg_type_id {
|
|
8 => { stats.audio_count.fetch_add(1, Ordering::Relaxed); }
|
|
9 => { stats.video_count.fetch_add(1, Ordering::Relaxed); }
|
|
_ => {}
|
|
}
|
|
|
|
// 写 FLV
|
|
if let Ok(mut f) = flv_file.lock() {
|
|
if !flv_header_sent {
|
|
let hdr = rtmp2flv::flv_header_with_audio_video();
|
|
let _ = f.write_all(&hdr);
|
|
flv_header_sent = true;
|
|
}
|
|
if let Some(tag) = rtmp2flv::rtmp_msg_to_flv_tag(msg_type_id, timestamp, &data) {
|
|
let _ = f.write_all(&tag.encode());
|
|
}
|
|
// 每 100 帧刷新一次
|
|
if total % 100 == 0 {
|
|
let _ = f.flush();
|
|
}
|
|
}
|
|
}
|
|
SessionEvent::Error(e) => {
|
|
eprintln!("[recorder] error: {}", e);
|
|
if session.state() == SessionState::Closed {
|
|
return;
|
|
}
|
|
}
|
|
SessionEvent::SessionClosed => return,
|
|
_ => {}
|
|
}
|
|
}
|
|
}
|
|
Err(e) => {
|
|
if e.kind() == std::io::ErrorKind::TimedOut || e.kind() == std::io::ErrorKind::WouldBlock {
|
|
continue;
|
|
}
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/// End-to-end check: pushes a local video to the in-process RTMP recorder with
/// ffmpeg three times and verifies that media arrived and was written as FLV.
///
/// The test returns early without failing when `/tmp/test_video.mp4` is
/// missing or the `RTMP_LOOP` environment variable is not set. It spawns
/// `ffmpeg` (a failure to start panics) and `ffprobe` (its output is only
/// logged, never asserted on).
#[test]
fn test_rtmp_loop_push_and_record() {
    let video_path = "/tmp/test_video.mp4";
    if !std::path::Path::new(video_path).exists() {
        eprintln!("SKIP: {} not found", video_path);
        return;
    }
    if std::env::var("RTMP_LOOP").is_err() {
        eprintln!("NOTE: set env RTMP_LOOP=1 to enable this test");
        eprintln!(" RTMP_LOOP=1 cargo test --test rtmp_loop -- --nocapture");
        return;
    }

    let stats = Arc::new(Stats {
        media_count: AtomicUsize::new(0),
        audio_count: AtomicUsize::new(0),
        video_count: AtomicUsize::new(0),
        publish_count: AtomicUsize::new(0),
    });

    // Create the FLV output file shared by all recorder threads.
    let flv_path = "/tmp/rave_test.flv";
    let flv_file = Arc::new(std::sync::Mutex::new(
        File::create(flv_path).expect("create flv file failed"),
    ));

    // Start the RTMP server on an OS-assigned port.
    let listener = TcpListener::bind("127.0.0.1:0").expect("bind failed");
    let port = listener.local_addr().unwrap().port();
    let running = Arc::new(AtomicBool::new(true));

    let stats_srv = stats.clone();
    let flv_srv = flv_file.clone();
    let running_srv = running.clone();

    // The server thread hands back the recorder handles so the test can wait
    // until every recorder has finished writing before it inspects the results.
    let server_handle = thread::spawn(move || {
        eprintln!("========================================");
        eprintln!(" Rave RTMP Server");
        eprintln!(" Push: ffmpeg -re -i {} -c copy -f flv rtmp://127.0.0.1:{}/live/test", video_path, port);
        eprintln!(" Watch: ffplay {}", flv_path);
        eprintln!("========================================");

        // Poll `accept` so the loop can observe the shutdown flag.
        listener.set_nonblocking(true).ok();

        let mut recorders = Vec::new();
        while running_srv.load(Ordering::Relaxed) {
            match listener.accept() {
                Ok((stream, _)) => {
                    // On some platforms an accepted socket inherits the listener's
                    // non-blocking mode; the recorder expects blocking reads with a timeout.
                    stream.set_nonblocking(false).ok();
                    let s = stats_srv.clone();
                    let f = flv_srv.clone();
                    recorders.push(thread::spawn(move || run_recorder(stream, &s, f)));
                }
                Err(_) => {}
            }
            thread::sleep(Duration::from_millis(50));
        }
        recorders
    });

    // Push the video three times in a row.
    let loop_count = 3;
    let rtmp_url = format!("rtmp://127.0.0.1:{}/live/test", port);
    for i in 0..loop_count {
        eprintln!("\n[loop] ===== round {}/{} =====", i + 1, loop_count);

        let mut child = std::process::Command::new("ffmpeg")
            .args([
                "-y",
                "-re",
                "-i", video_path,
                "-c:v", "copy",
                "-c:a", "copy",
                "-f", "flv",
                &rtmp_url,
            ])
            .stderr(std::process::Stdio::null())
            .stdout(std::process::Stdio::null())
            .spawn()
            .expect("ffmpeg failed to start");

        // Wait for ffmpeg to finish (at most 30 seconds).
        let start = std::time::Instant::now();
        loop {
            match child.try_wait() {
                Ok(Some(status)) => {
                    eprintln!("[loop] ffmpeg round {} exited: {}", i + 1, status);
                    break;
                }
                Ok(None) => {
                    if start.elapsed() > Duration::from_secs(30) {
                        eprintln!("[loop] timeout, killing ffmpeg");
                        let _ = child.kill();
                        let _ = child.wait();
                        break;
                    }
                    thread::sleep(Duration::from_millis(200));
                }
                Err(_) => {
                    // Do not leave a child running (and its connection open)
                    // when its status cannot be queried.
                    let _ = child.kill();
                    let _ = child.wait();
                    break;
                }
            }
        }

        // Pause between rounds.
        if i + 1 < loop_count {
            thread::sleep(Duration::from_millis(500));
        }
    }

    running.store(false, Ordering::Relaxed);
    // Every ffmpeg process is gone by now, so each recorder sees its peer
    // disconnect and returns; joining makes the counters and the file final.
    if let Ok(recorders) = server_handle.join() {
        for recorder in recorders {
            let _ = recorder.join();
        }
    }

    let media = stats.media_count.load(Ordering::Relaxed);
    let audio = stats.audio_count.load(Ordering::Relaxed);
    let video = stats.video_count.load(Ordering::Relaxed);
    let pubs = stats.publish_count.load(Ordering::Relaxed);

    eprintln!("\n[summary] ========================");
    eprintln!("[summary] publish sessions: {}", pubs);
    eprintln!("[summary] total media frames: {}", media);
    eprintln!("[summary] audio: {}", audio);
    eprintln!("[summary] video: {}", video);
    eprintln!("[summary] FLV saved to: {}", flv_path);
    eprintln!("[summary] play: ffplay {}", flv_path);
    eprintln!("[summary] ========================");

    assert!(pubs >= 1, "should have at least 1 publish session");
    assert!(media > 0, "should receive media frames");
    assert!(audio > 0, "should receive audio frames");
    assert!(video > 0, "should receive video frames");

    // Check that the FLV file holds more than just a header.
    let flv_size = std::fs::metadata(flv_path).map(|m| m.len()).unwrap_or(0);
    eprintln!("[summary] FLV file size: {} bytes", flv_size);
    assert!(flv_size > 13, "FLV file should be larger than header");

    // Inspect the FLV file with ffprobe; the result is informational only.
    let probe = std::process::Command::new("ffprobe")
        .args(["-v", "quiet", "-print_format", "json", "-show_streams", flv_path])
        .output();

    match probe {
        Ok(output) => {
            let stdout = String::from_utf8_lossy(&output.stdout);
            eprintln!("[summary] ffprobe result:");
            for line in stdout.lines().filter(|l| l.contains("codec_name") || l.contains("codec_type")) {
                eprintln!(" {}", line.trim());
            }
        }
        Err(e) => eprintln!("[summary] ffprobe failed: {}", e),
    }
}
|