diff --git a/AGENTS.md b/AGENTS.md index 8c25be3..11ee796 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -27,19 +27,19 @@ ## Project Identity -**Rave** is a full-protocol streaming media server engine written in **pure Rust, zero third-party crates**. It draws architectural inspiration from [lal (Go)](https://github.com/q191201771/lal) and [Monibuca v6 (Rust)](https://monibuca.com/) — but must not depend on any external crate (no `tokio`, no `bytes`, no `parking_lot`, nothing from crates.io). +**Rave** is a full-protocol streaming media server engine written in **pure Rust, zero third-party crates** (except `tokio` for async runtime). It draws architectural inspiration from [lal (Go)](https://github.com/q191201771/lal) and [Monibuca v6 (Rust)](https://monibuica.com/) — but must not depend on any external crate beyond `tokio`. -## Hard Constraint: No Third-Party Dependencies +## Hard Constraint: Minimal Third-Party Dependencies ``` -# Cargo.toml [dependencies] MUST stay empty +# Cargo.toml [dependencies] — only tokio is allowed [dependencies] +tokio = { version = "1", features = ["rt-multi-thread", "macros", "net", "io-util", "time", "sync", "signal"] } ``` -Everything — async runtime, networking, concurrency primitives, codec parsers — must be hand-written using only `std` and `core`. This is a deliberate design choice, not a temporary state. +Everything beyond `tokio` async runtime — networking, concurrency primitives, codec parsers — must be hand-written using only `std` and `core`. This is a deliberate design choice, not a temporary state. Consequences an agent must remember: -- No `tokio`, `async-std`, `smol` — build epoll/kqueue async I/O from `std::os::unix` / `std::os::windows` if needed - No `bytes` — use `Vec`, slices, or custom `Bytes`-like arena - No `parking_lot`, `dashmap`, `crossbeam` — use `std::sync::{Mutex, RwLock, Arc}` and `atomic` types - No `serde` — hand-parse/serialize config (TOML/JSON/YAML) @@ -80,13 +80,14 @@ Every plugin follows: `Created → init() → start() → [running] → stop() use rave::sdk::plugin::{Plugin, PluginMeta, PluginState, ProtocolPlugin}; use rave::sdk::context::EngineContext; use rave::sdk::traits::{ConfigProvider, StreamManagerApi}; +use std::sync::Arc; pub struct RtmpPlugin { /* ... */ } impl Plugin for RtmpPlugin { fn meta(&self) -> &PluginMeta { /* ... */ } - fn init(&mut self, ctx: &EngineContext, cfg: &dyn ConfigProvider) -> Result<(), String> { /* ... */ } - fn start(&mut self, ctx: &EngineContext) -> Result<(), String> { /* ... */ } + fn init(&mut self, ctx: Arc, cfg: &dyn ConfigProvider) -> Result<(), String> { /* ... */ } + fn start(&mut self, ctx: Arc) -> Result<(), String> { /* ... */ } fn stop(&mut self) -> Result<(), String> { /* ... */ } fn state(&self) -> PluginState { /* ... */ } fn as_any(&self) -> &dyn std::any::Any { self } @@ -133,15 +134,37 @@ Plugins receive `EngineContext` during `init()` and `start()`. It provides: ### PluginRegistry Manages all registered plugins. Called from `main.rs`: + ```rust -let registry = PluginRegistry::new(context); -registry.register(Box::new(RtmpPlugin::new())); -registry.init_all(&config)?; // calls init() on each -registry.start_all()?; // calls start() on each +// 内置插件:通过 all_plugins() 自动注册 +for plugin in protocol::all_plugins() { + registry.register(plugin); +} + +// 外部插件:手动注册 +// registry.register(Box::new(my_external_plugin::XxxPlugin::new())); + +registry.init_all(&config)?; // calls init() on each, checks .enabled +registry.start_all()?; // calls start() on each (spawns async listeners) // ... server runs ... registry.stop_all()?; // reverse-order stop ``` +### 内置插件自动注册 + +内置协议插件通过 `protocol::all_plugins()` 清单函数统一注册: +- 新增内置协议:在 `protocol//plugin.rs` 实现插件 → 在 `protocol/mod.rs` 的 `all_plugins()` 加一行 +- 外部插件:在 `main.rs` 中手动 `registry.register()` + +### 配置级插件开关 + +通过 `<插件名>.enabled = false` 在配置中禁用指定插件: + +```toml +rtmp.enabled = false # 禁用 RTMP 插件 +rtsp.enabled = false # 禁用 RTSP 插件 +``` + ## Target Protocol Support ### Phase 1 — Core Protocols @@ -170,7 +193,7 @@ registry.stop_all()?; // reverse-order stop ``` rave/ -├── Cargo.toml # [dependencies] = empty, edition = "2024" +├── Cargo.toml # [dependencies] = tokio only, edition = "2024" ├── src/ │ ├── main.rs # CLI entry, config load, PluginRegistry bootstrap │ ├── lib.rs # Module root: core, sdk, config, logger @@ -190,9 +213,44 @@ rave/ │ │ ├── traits.rs # PublisherApi, SubscriberApi, StreamManagerApi, EventHandler │ │ ├── plugin.rs # Plugin trait, ProtocolPlugin trait, PluginMeta, PluginState │ │ ├── context.rs # EngineContext — IoC service locator -│ │ └── registry.rs # PluginRegistry — lifecycle management +│ │ └── registry.rs # PluginRegistry — lifecycle management, config enable/disable +│ ├── codec/ +│ │ ├── h264.rs # H.264 NALU 解析、AVCC 编码 +│ │ ├── hevc.rs # H.265 NALU 类型 +│ │ ├── aac.rs # AAC ADTS 解析 +│ │ ├── flv.rs # FLV tag 编解码 +│ │ └── ts.rs # MPEG-TS 打包 +│ ├── protocol/ +│ │ ├── mod.rs # all_plugins() 内置插件清单函数 +│ │ ├── rtmp/ +│ │ │ ├── plugin.rs # RtmpPlugin (Plugin + ProtocolPlugin) +│ │ │ ├── server.rs # RtmpServer TCP 监听 +│ │ │ ├── session.rs # RTMP 会话状态机 +│ │ │ ├── handshake.rs# RTMP 握手 (C0/S0–C2/S2) +│ │ │ ├── chunk.rs # Chunk 协议解析 +│ │ │ ├── message.rs # RTMP 消息类型 +│ │ │ └── amf0.rs # AMF0 编解码 +│ │ ├── rtsp/ +│ │ │ ├── plugin.rs # RtspPlugin (Plugin + ProtocolPlugin) +│ │ │ ├── server.rs # RtspServer TCP 监听 +│ │ │ ├── session.rs # RTSP 会话(DESCRIBE/SETUP/PLAY/RECORD) +│ │ │ ├── rtp.rs # RTP 封装/解包 +│ │ │ ├── rtcp.rs # RTCP Sender Report +│ │ │ ├── depacketizer.rs # RTP→AVFrame 解包器(H.264 FU-A、AAC) +│ │ │ ├── sdps.rs # SDP 生成 +│ │ │ ├── request.rs # RTSP 请求解析 +│ │ │ └── response.rs # RTSP 响应构造 +│ │ ├── httpflv.rs # HTTP-FLV 请求解析、FLV 流式输出 +│ │ ├── httpflv_server.rs # HTTP-FLV TCP 服务器 +│ │ ├── httpflv_server_plugin.rs # HttpFlvPlugin (Plugin + ProtocolPlugin) +│ │ ├── hls.rs # HLS 框架 +│ │ └── wsflv.rs # WebSocket-FLV 框架 +│ ├── remux/ +│ │ ├── rtmp2flv.rs # RTMP ↔ FLV +│ │ └── flv2ts.rs # FLV ↔ MPEG-TS │ ├── config.rs # Hand-written TOML parser, implements ConfigProvider -│ └── logger.rs # Minimal stderr logger with level filtering, #[macro_export] macros +│ ├── logger.rs # Minimal stderr logger with level filtering, #[macro_export] macros +│ └── stats.rs # ServerStats — bandwidth, fps, connection counters ``` ## Build & Run @@ -226,10 +284,8 @@ Logger macros are `#[macro_export]` — use `rave::log_info!(...)` from the bina ## Coding Conventions -- Pure `std` only. If you catch yourself writing `use ::` for a non-std crate, stop. +- Only `tokio` from crates.io; all other code uses `std` and `core` only. If you catch yourself writing `use ::` for a non-std crate (other than tokio), stop. - **Plugin isolation**: Protocol handlers in `protocol/` must only import from `sdk/`, never from `core/`. The `core/` module is engine-private. -- All networking must be non-blocking; implement a minimal reactor on `epoll` (Linux) / `kqueue` (macOS/BSD) if an async runtime is needed. -- Use `#[cfg(target_os = "linux")]` / `#[cfg(target_os = "macos")]` for platform-specific I/O. - RTMP chunk size defaults to 128 bytes; handle chunk size negotiation (SetChunkSize message). - Timestamps are in milliseconds (RTMP) or 90kHz clock (RTP/RTSP) — always normalize to a common clock internally. - Test protocol parsers with hand-crafted byte fixtures, not external libraries. diff --git a/Cargo.lock b/Cargo.lock index 1b275bc..ca8744a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8,6 +8,16 @@ version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33" +[[package]] +name = "errno" +version = "0.3.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" +dependencies = [ + "libc", + "windows-sys", +] + [[package]] name = "libc" version = "0.2.186" @@ -56,6 +66,16 @@ dependencies = [ "tokio", ] +[[package]] +name = "signal-hook-registry" +version = "1.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4db69cba1110affc0e9f7bcd48bbf87b3f4fc7c61fc9155afd4c469eb3d6c1b" +dependencies = [ + "errno", + "libc", +] + [[package]] name = "socket2" version = "0.6.3" @@ -87,6 +107,7 @@ dependencies = [ "libc", "mio", "pin-project-lite", + "signal-hook-registry", "socket2", "tokio-macros", "windows-sys", diff --git a/Cargo.toml b/Cargo.toml index 90da4be..f6abe47 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,6 @@ version = "0.1.0" edition = "2024" [dependencies] -tokio = { version = "1", features = ["rt-multi-thread", "macros", "net", "io-util", "time", "sync"] } +tokio = { version = "1", features = ["rt-multi-thread", "macros", "net", "io-util", "time", "sync", "signal"] } # ffmpeg -re -i video.mp4 -c copy -f flv rtmp://localhost:1935/live/test # ffplay http://localhost:8080/live/test.flv \ No newline at end of file diff --git a/README.md b/README.md index dfe3761..b9ecf3d 100644 --- a/README.md +++ b/README.md @@ -9,7 +9,7 @@ | 协议 | 推流 | 拉流 | 状态 | |------|:----:|:----:|:----:| | RTMP | ✅ | ✅ | 握手、Chunk 协议、AMF0、Publish/Play | -| RTSP/RTP | ✅ | ⚠️ | ANNOUNCE/RECORD 推流完成;DESCRIBE/PLAY 拉流开发中 | +| RTSP/RTP | ✅ | ✅ | ANNOUNCE/RECORD 推流;DESCRIBE/PLAY 拉流 | | HTTP-FLV | — | ✅ | HTTP 长连接 + FLV 实时流 | | WebSocket-FLV | — | 🚧 | 框架已搭建 | | HLS | — | 🚧 | 框架已搭建 | @@ -58,8 +58,8 @@ ffplay rtsp://localhost:5544/live/test | 推流 → 拉流 | RTMP | HTTP-FLV | RTSP | |:-----------:|:----:|:--------:|:----:| -| **RTMP** | ✅ | ✅ | ⚠️ | -| **RTSP** | ✅ | ✅ | ⚠️ | +| **RTMP** | ✅ | ✅ | ✅ | +| **RTSP** | ✅ | ✅ | ✅ | ## HTTP API 接口 @@ -75,20 +75,24 @@ ffplay rtsp://localhost:5544/live/test ## 配置 -默认端口可在 `rave.conf` (TOML 格式) 中配置: +默认端口和插件开关可在 `rave.conf` (TOML 格式) 中配置: ```toml rtmp.port = 1935 httpflv.port = 8080 rtsp.port = 5544 log.level = "info" + +# 插件开关(设为 false 禁用对应协议) +# rtmp.enabled = false +# rtsp.enabled = false ``` ## 架构 ``` src/ -├── main.rs # 入口:配置加载、服务器启动 +├── main.rs # 入口:配置加载、插件注册、banner、优雅关闭 ├── lib.rs # 模块根 ├── config.rs # 手写 TOML 解析器 ├── logger.rs # 最小化 stderr 日志 @@ -106,23 +110,23 @@ src/ │ ├── traits.rs # PublisherApi、SubscriberApi、StreamManagerApi │ ├── plugin.rs # Plugin / ProtocolPlugin trait │ ├── context.rs # EngineContext(IoC 容器) -│ └── registry.rs # PluginRegistry(生命周期管理) +│ └── registry.rs # PluginRegistry(生命周期管理 + 配置开关) ├── codec/ # 编解码器 │ ├── h264.rs # H.264 NALU 解析、AVCC 编码 │ ├── h265.rs # H.265 NALU 类型 │ ├── aac.rs # AAC ADTS 解析 │ ├── flv.rs # FLV tag 编解码 -│ └── mpegts.rs # MPEG-TS 打包 +│ └── ts.rs # MPEG-TS 打包 ├── protocol/ # 协议实现 -│ ├── rtmp/ # RTMP 握手、Chunk、AMF0、Session -│ ├── rtsp/ # RTSP 信令、RTP、Depacketizer -│ ├── httpflv*.rs # HTTP-FLV 服务端 +│ ├── mod.rs # all_plugins() 内置插件清单函数 +│ ├── rtmp/ # RTMP 插件、握手、Chunk、AMF0、Session +│ ├── rtsp/ # RTSP 插件、信令、RTP、Depacketizer +│ ├── httpflv*.rs # HTTP-FLV 插件 + 服务端 │ ├── hls.rs # HLS 框架 │ └── wsflv.rs # WebSocket-FLV 框架 └── remux/ # 协议转换 ├── rtmp2flv.rs # RTMP ↔ FLV - ├── rtmp2ts.rs # RTMP ↔ MPEG-TS - └── rtmp_to_rtp.rs # AVFrame → RTP + └── flv2ts.rs # FLV ↔ MPEG-TS ``` ### 性能设计 @@ -144,10 +148,16 @@ core/ (实现 sdk traits) | protocol/ (仅依赖 sdk:: traits + types) ───────────────────┘ ``` +**内置插件**通过 `protocol::all_plugins()` 自动注册,新增内置协议只需: +1. 在 `protocol//plugin.rs` 实现 `Plugin` + `ProtocolPlugin` trait +2. 在 `protocol/mod.rs` 的 `all_plugins()` 中添加一行 + +**外部插件**通过 `registry.register()` 手动注册。 + ## 测试 ```bash -# 运行全部测试(485 个) +# 运行全部测试(513 个) cargo test # 运行特定测试 diff --git a/src/core/group.rs b/src/core/group.rs index 0cfc372..10f855a 100644 --- a/src/core/group.rs +++ b/src/core/group.rs @@ -10,7 +10,7 @@ use std::sync::{Arc, RwLock}; use crate::core::stream::Stream; use crate::sdk::traits::{PublisherApi, StreamManagerApi, SubscriberApi}; -use crate::sdk::types::{AVFrame, StreamPath, StreamSummary}; +use crate::sdk::types::{AVFrame, StreamCodecMeta, StreamPath, StreamSummary}; /// 流管理器 /// @@ -110,12 +110,19 @@ impl StreamManagerApi for StreamManager { let streams = self.streams.read().unwrap(); streams.values().map(|s| s.summary()).collect() } + + /// 获取流的编解码器元数据 + fn get_codec_metadata(&self, path: &StreamPath) -> Option { + let key = path.full_path(); + let streams = self.streams.read().unwrap(); + streams.get(&key).map(|s| s.codec_metadata()) + } } #[cfg(test)] mod tests { use super::*; - use crate::sdk::types::{FrameType, StreamPath, VideoCodec, AudioCodec, AVFrame}; + use crate::sdk::types::{FrameType, StreamPath, VideoCodec, AudioCodec, AVFrame, CodecExtraInfo, H264SeqHeader, AacSeqHeader}; use std::sync::Arc; #[test] @@ -396,4 +403,46 @@ mod tests { } assert_eq!(last_ts, 1099, "last frame should be the most recent"); } + + /// 测试:get_codec_metadata 对不存在的流返回 None + #[test] + fn test_stream_manager_get_codec_metadata_missing_returns_none() { + let mgr = StreamManager::new(); + let path = StreamPath::new("live", "missing"); + assert!(mgr.get_codec_metadata(&path).is_none()); + } + + /// 测试:get_codec_metadata 从流中提取编解码器元数据 + #[test] + fn test_stream_manager_get_codec_metadata_extracts_meta() { + let mgr = StreamManager::new(); + let path = StreamPath::new("live", "meta"); + + let stream = create_stream_and_get_inner(&mgr, &path); + + // 写入带 SPS/PPS 的视频 seq_header + let sps = std::sync::Arc::new(vec![0x67, 0x64, 0x00, 0x29, 0xAC]); + let pps = std::sync::Arc::new(vec![0x68, 0xEE, 0x31, 0x12]); + let mut vframe = AVFrame::new_video(0, std::sync::Arc::new(vec![]), VideoCodec::H264, FrameType::KeyFrame); + vframe.codec_info = Some(CodecExtraInfo::H264SeqHeader(H264SeqHeader { + sps: sps.clone(), + pps: pps.clone(), + })); + stream.dispatch_frame(vframe); + + // 写入带 AAC config 的音频 seq_header + let aac_config = std::sync::Arc::new(vec![0x12, 0x10]); // 44100Hz, 2ch + let mut aframe = AVFrame::new_audio(0, std::sync::Arc::new(vec![]), AudioCodec::Aac); + aframe.codec_info = Some(CodecExtraInfo::AacSeqHeader(AacSeqHeader { + audio_specific_config: aac_config.clone(), + })); + stream.dispatch_frame(aframe); + + let meta = mgr.get_codec_metadata(&path).expect("应返回元数据"); + assert!(meta.h264_sps.is_some(), "应有 SPS"); + assert!(meta.h264_pps.is_some(), "应有 PPS"); + assert!(meta.aac_config.is_some(), "应有 AAC config"); + assert_eq!(meta.audio_sample_rate, 44100, "采样率应为 44100"); + assert_eq!(meta.audio_channels, 2, "通道数应为 2"); + } } diff --git a/src/core/stream.rs b/src/core/stream.rs index e12ee79..ae1f281 100644 --- a/src/core/stream.rs +++ b/src/core/stream.rs @@ -12,7 +12,7 @@ use std::sync::{Arc, Mutex}; use crate::core::dispatcher::Dispatcher; use crate::core::publisher::Publisher; use crate::core::subscriber::Subscriber; -use crate::sdk::types::{AudioCodec, AVFrame, StreamPath, StreamSummary, VideoCodec}; +use crate::sdk::types::{AudioCodec, AVFrame, CodecExtraInfo, StreamCodecMeta, StreamPath, StreamSummary, VideoCodec}; /// 流实例 /// @@ -203,13 +203,80 @@ impl Stream { total_audio_frames: self.total_audio_frames.load(Ordering::Relaxed), } } + + /// 获取流的编解码器元数据 + /// + /// 扫描 GOP 缓存中的 seq_header 帧,提取 H.264 SPS/PPS 和 AAC AudioSpecificConfig。 + /// 用于 RTSP DESCRIBE 响应生成 SDP 描述。 + /// + /// AAC AudioSpecificConfig 2 字节解析: + /// - bits [4:0] byte0 + [7:5] byte1 → audioObjectType(5 bits 从 byte0 高 5 位) + /// - bits [4:1] byte1 → samplingFrequencyIndex(4 bits) + /// - bit [0] byte1 + [7:5] byte2 → channelConfiguration(4 bits) + pub fn codec_metadata(&self) -> StreamCodecMeta { + let cache = self.gop_cache.lock().unwrap(); + let mut meta = StreamCodecMeta::default(); + + for frame in cache.iter() { + if let Some(ref info) = frame.codec_info { + match info { + CodecExtraInfo::H264SeqHeader(sh) => { + meta.h264_sps = Some(sh.sps.clone()); + meta.h264_pps = Some(sh.pps.clone()); + } + CodecExtraInfo::AacSeqHeader(ah) => { + meta.aac_config = Some(ah.audio_specific_config.clone()); + // 解析 2 字节 AudioSpecificConfig 提取采样率和通道数 + let config = &ah.audio_specific_config; + if config.len() >= 2 { + // samplingFrequencyIndex: bits [7:3] of byte[1] + // 格式: audioObjectType(5)<<11 | samplingFreqIndex(4)<<7 | channelConfig(3)<<4 | ... + // 实际 layout: byte0[7:3]=audioObjectType(5), byte0[2:0]+byte1[7]=samplingFreqIndex(4) + // byte1[6:3]=channelConfiguration(4), ... + // 简化:byte0 低 3 位 << 1 | byte1 高 1 位 = samplingFreqIndex + let sfi = (((config[0] & 0x07) as u32) << 1) + | (((config[1] >> 7) & 0x01) as u32); + // channelConfiguration: byte1 bits [6:3] + let ch = (config[1] >> 3) & 0x0F; + meta.audio_sample_rate = sampling_rate_from_index(sfi); + meta.audio_channels = ch; + } + } + } + } + } + + meta + } +} + +/// 根据 samplingFrequencyIndex 查表返回采样率 +/// +/// AAC AudioSpecificConfig 中的采样率索引表(ISO 14496-3 Table 1.16) +fn sampling_rate_from_index(index: u32) -> u32 { + match index { + 0 => 96000, + 1 => 88200, + 2 => 64000, + 3 => 48000, + 4 => 44100, + 5 => 32000, + 6 => 24000, + 7 => 22050, + 8 => 16000, + 9 => 12000, + 10 => 11025, + 11 => 8000, + 12 => 7350, + _ => 44100, // 未知时默认 44100 + } } #[cfg(test)] mod tests { use super::*; use crate::sdk::traits::{PublisherApi, SubscriberApi}; - use crate::sdk::types::{AudioCodec, FrameType, VideoCodec}; + use crate::sdk::types::{AudioCodec, CodecExtraInfo, FrameType, VideoCodec, AacSeqHeader, H264SeqHeader}; use std::sync::Arc; fn make_path() -> StreamPath { @@ -347,4 +414,106 @@ mod tests { let _sub2 = stream.subscribe(); assert_eq!(stream.summary().subscriber_count, 2); } + + /// 测试:codec_metadata 初始状态返回空元数据 + #[test] + fn test_stream_codec_metadata_initial_state_empty() { + let stream = Stream::new(make_path()); + let meta = stream.codec_metadata(); + assert!(meta.h264_sps.is_none(), "初始状态不应有 SPS"); + assert!(meta.h264_pps.is_none(), "初始状态不应有 PPS"); + assert!(meta.aac_config.is_none(), "初始状态不应有 AAC config"); + assert_eq!(meta.audio_sample_rate, 0); + assert_eq!(meta.audio_channels, 0); + } + + /// 测试:codec_metadata 从 GOP 缓存中的 seq_header 帧提取 H.264 SPS/PPS + #[test] + fn test_stream_codec_metadata_extracts_h264_sps_pps() { + let stream = Stream::new(make_path()); + let sps_data = Arc::new(vec![0x67, 0x64, 0x00, 0x29, 0xAC, 0xD9, 0x40, 0x78, 0x02]); + let pps_data = Arc::new(vec![0x68, 0xEE, 0x31, 0x12]); + + let mut frame = AVFrame::new_video(0, Arc::new(vec![]), VideoCodec::H264, FrameType::KeyFrame); + frame.codec_info = Some(CodecExtraInfo::H264SeqHeader(H264SeqHeader { + sps: sps_data.clone(), + pps: pps_data.clone(), + })); + stream.dispatch_frame(frame); + + let meta = stream.codec_metadata(); + assert!(meta.h264_sps.is_some(), "应有 SPS"); + assert!(meta.h264_pps.is_some(), "应有 PPS"); + assert_eq!(&*meta.h264_sps.unwrap(), &*sps_data); + assert_eq!(&*meta.h264_pps.unwrap(), &*pps_data); + } + + /// 测试:codec_metadata 从 GOP 缓存中的 seq_header 帧提取 AAC config + #[test] + fn test_stream_codec_metadata_extracts_aac_config() { + let stream = Stream::new(make_path()); + // AAC AudioSpecificConfig: audioObjectType=2(AAC-LC), samplingFreqIndex=3(48000), channelConfig=2 + // byte0 = (audioObjectType=2) << 3 | (sfi=3) >> 1 = 00010_001 = 0x11 + // byte1 = (sfi=3 & 1) << 7 | (channelConfig=2) << 3 = 1_0010_000 = 0x90 + let aac_config = Arc::new(vec![0x11, 0x90]); + + let mut frame = AVFrame::new_audio(0, Arc::new(vec![]), AudioCodec::Aac); + frame.codec_info = Some(CodecExtraInfo::AacSeqHeader(AacSeqHeader { + audio_specific_config: aac_config.clone(), + })); + stream.dispatch_frame(frame); + + let meta = stream.codec_metadata(); + assert!(meta.aac_config.is_some(), "应有 AAC config"); + assert_eq!(&*meta.aac_config.unwrap(), &*aac_config); + assert_eq!(meta.audio_sample_rate, 48000, "采样率应为 48000"); + assert_eq!(meta.audio_channels, 2, "通道数应为 2"); + } + + /// 测试:codec_metadata 同时包含视频和音频元数据 + #[test] + fn test_stream_codec_metadata_both_video_and_audio() { + let stream = Stream::new(make_path()); + let sps_data = Arc::new(vec![0x67, 0x42, 0xC0, 0x1E, 0xD9]); + let pps_data = Arc::new(vec![0x68, 0xCE, 0x38, 0x80]); + // AAC AudioSpecificConfig: samplingFreqIndex=4(44100), channelConfig=2 + // byte0 = (2<<3) | (4>>1) = 0x12, byte1 = (4<<7) | (2<<3) = 0x90... wait + // audioObjectType=2(5bits) = 00010 + // samplingFreqIndex=4(4bits) = 0100 + // channelConfiguration=2(4bits) = 0010 + // layout: [00010][0100][0010][000] + // byte0 = 00010_010 = 0x12 + // byte1 = 0_0010_000 = 0x10 + let aac_config = Arc::new(vec![0x12, 0x10]); + + let mut video_frame = AVFrame::new_video(0, Arc::new(vec![]), VideoCodec::H264, FrameType::KeyFrame); + video_frame.codec_info = Some(CodecExtraInfo::H264SeqHeader(H264SeqHeader { + sps: sps_data.clone(), + pps: pps_data.clone(), + })); + stream.dispatch_frame(video_frame); + + let mut audio_frame = AVFrame::new_audio(0, Arc::new(vec![]), AudioCodec::Aac); + audio_frame.codec_info = Some(CodecExtraInfo::AacSeqHeader(AacSeqHeader { + audio_specific_config: aac_config.clone(), + })); + stream.dispatch_frame(audio_frame); + + let meta = stream.codec_metadata(); + assert!(meta.h264_sps.is_some()); + assert!(meta.h264_pps.is_some()); + assert!(meta.aac_config.is_some()); + assert_eq!(meta.audio_sample_rate, 44100, "采样率应为 44100"); + assert_eq!(meta.audio_channels, 2, "通道数应为 2"); + } + + /// 测试:sampling_rate_from_index 查表正确 + #[test] + fn test_sampling_rate_from_index_known_values() { + assert_eq!(super::sampling_rate_from_index(0), 96000); + assert_eq!(super::sampling_rate_from_index(3), 48000); + assert_eq!(super::sampling_rate_from_index(4), 44100); + assert_eq!(super::sampling_rate_from_index(11), 8000); + assert_eq!(super::sampling_rate_from_index(15), 44100); // 未知索引默认 44100 + } } diff --git a/src/main.rs b/src/main.rs index 759a106..9511473 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,17 +1,29 @@ //! Rave 流媒体服务器入口 //! -//! 基于 tokio 多线程异步运行时 +//! 基于 PluginRegistry 的插件注册模式启动服务器。 +//! +//! **内置插件**通过 `protocol::all_plugins()` 自动注册,无需手动列举。 +//! **外部插件**通过 `registry.register()` 手动注册。 +//! +//! 添加新的内置协议只需: +//! 1. 在 `protocol//` 下实现 `Plugin` + `ProtocolPlugin` trait +//! 2. 在 `protocol/mod.rs` 的 `all_plugins()` 中添加一行 +//! +//! 添加外部插件只需: +//! 1. 依赖 `rave-sdk` trait,实现 `Plugin` trait +//! 2. 在此处 `registry.register(Box::new(XxxPlugin::new()))` use std::sync::Arc; use rave::config::Config; use rave::core::group::StreamManager; +use rave::log_error; +use rave::log_info; use rave::log_warn; use rave::logger; -use rave::protocol::rtmp::server::{RtmpServer, RtmpServerConfig}; -use rave::protocol::httpflv_server::{HttpFlvServer, HttpFlvServerConfig}; -use rave::protocol::rtsp::server::{RtspServer, RtspServerConfig}; +use rave::protocol; use rave::sdk::context::EngineContext; +use rave::sdk::registry::PluginRegistry; use rave::stats::ServerStats; #[tokio::main] @@ -19,59 +31,58 @@ async fn main() { let args: Vec = std::env::args().collect(); let config_path = args.get(1).map(|s| s.as_str()).unwrap_or("rave.conf"); + // 加载配置(失败则使用默认值) let config = Config::from_file(config_path).unwrap_or_else(|e| { log_warn!("config load failed ({}), using defaults", e); Config::new() }); + // 设置日志级别 let log_level = config.get_or("log.level", "info"); logger::set_level(&log_level); - rave::log_info!("rave server starting (tokio async)"); + log_info!("rave server starting (plugin registry mode)"); + // 创建引擎上下文:StreamManager + 全局服务 let stream_manager = Arc::new(StreamManager::new()); let context = Arc::new(EngineContext::new(stream_manager)); - context.register_service(Arc::new(ServerStats::new())); - let rtmp_port: u16 = config.get_or("rtmp.port", "1935").parse().unwrap_or(1935); - let httpflv_port: u16 = config.get_or("httpflv.port", "8080").parse().unwrap_or(8080); - let rtsp_port: u16 = config.get_or("rtsp.port", "5544").parse().unwrap_or(5544); + // 创建插件注册表 + let registry = PluginRegistry::new(context); - let rtmp_config = RtmpServerConfig { - listen_addr: "0.0.0.0".to_string(), - port: rtmp_port, - }; - let httpflv_config = HttpFlvServerConfig { - listen_addr: "0.0.0.0".to_string(), - port: httpflv_port, - }; - let rtsp_config = RtspServerConfig { - listen_addr: "0.0.0.0".to_string(), - port: rtsp_port, - }; + // === 内置插件:自动注册 === + for plugin in protocol::all_plugins() { + registry.register(plugin); + } - rave::log_info!("======================================"); - rave::log_info!(" Rave Streaming Media Server"); - rave::log_info!(" RTMP: rtmp://0.0.0.0:{}/live/test", rtmp_port); - rave::log_info!(" HTTP-FLV: http://0.0.0.0:{}/live/test.flv", httpflv_port); - rave::log_info!(" RTSP: rtsp://0.0.0.0:{}/live/test", rtsp_port); - rave::log_info!("======================================"); - rave::log_info!("push: ffmpeg -re -i video.mp4 -c copy -f flv rtmp://localhost:{}/live/test", rtmp_port); - rave::log_info!("push: ffmpeg -re -i video.mp4 -c copy -f rtsp rtsp://localhost:{}/live/test", rtsp_port); - rave::log_info!("watch: ffplay rtmp://localhost:{}/live/test", rtmp_port); - rave::log_info!("watch: ffplay http://localhost:{}/live/test.flv", httpflv_port); - rave::log_info!("watch: ffplay rtsp://localhost:{}/live/test", rtsp_port); - rave::log_info!("======================================"); + // === 外部插件:手动注册 === + // 示例:registry.register(Box::new(my_external_plugin::XxxPlugin::new())); - let rtmp_server = RtmpServer::new(rtmp_config, context.clone()); - let httpflv_server = HttpFlvServer::new(httpflv_config, context.clone()); - let rtsp_server = RtspServer::new(rtsp_config, context.clone()); + // 初始化所有插件(读取配置,检查 enabled 标志) + if let Err(e) = registry.init_all(&config) { + log_error!("plugin init failed: {}", e); + return; + } - // 并发运行三个服务器 - let _ = tokio::join!( - tokio::spawn(async move { rtmp_server.listen().await }), - tokio::spawn(async move { httpflv_server.listen().await }), - tokio::spawn(async move { rtsp_server.listen().await }), - ); + // 打印启动 banner(在 start 之前,确保 banner 不被插件日志打断) + log_info!("======================================"); + log_info!(" Rave Streaming Media Server"); + for meta in registry.list_plugins() { + log_info!(" {} v{} - {}", meta.name, meta.version, meta.description); + } + log_info!("======================================"); + + // 启动所有已启用的插件(各插件打印各自的监听地址和使用提示) + if let Err(e) = registry.start_all() { + log_error!("plugin start failed: {}", e); + return; + } + log_info!("======================================"); + + // 主线程等待(服务器在 tokio::spawn 中运行) + // 监听 Ctrl+C 信号,触发优雅关闭 + tokio::signal::ctrl_c().await.ok(); + log_info!("shutting down..."); + let _ = registry.stop_all(); } diff --git a/src/protocol/httpflv_server.rs b/src/protocol/httpflv_server.rs index b8a7c41..1be02e7 100644 --- a/src/protocol/httpflv_server.rs +++ b/src/protocol/httpflv_server.rs @@ -59,8 +59,6 @@ impl HttpFlvServer { .await .map_err(|e| format!("httpflv bind {} failed: {}", addr, e))?; - log_info!("[httpflv] listening on {}", addr); - loop { match listener.accept().await { Ok((tcp_stream, _addr)) => { diff --git a/src/protocol/httpflv_server_plugin.rs b/src/protocol/httpflv_server_plugin.rs new file mode 100644 index 0000000..ebe3c49 --- /dev/null +++ b/src/protocol/httpflv_server_plugin.rs @@ -0,0 +1,175 @@ +//! HTTP-FLV 协议插件 +//! +//! 实现 `Plugin` + `ProtocolPlugin` trait, +//! 通过 `PluginRegistry` 统一管理 HTTP-FLV 服务器的生命周期。 +//! +//! 职责: +//! - `init()` 阶段从 ConfigProvider 读取 httpflv.port / httpflv.listen_addr +//! - `start()` 阶段创建 HttpFlvServer 并 spawn 异步监听任务 +//! - `stop()` 阶段标记停止 + +use std::any::Any; +use std::sync::Arc; + +use crate::log_error; +use crate::log_info; +use crate::sdk::context::EngineContext; +use crate::sdk::plugin::{Plugin, PluginMeta, PluginState, ProtocolPlugin}; +use crate::sdk::traits::ConfigProvider; + +use super::httpflv_server::{HttpFlvServer, HttpFlvServerConfig}; + +/// HTTP-FLV 协议插件 +/// +/// 包装 `HttpFlvServer`,提供标准化的插件生命周期管理。 +/// HTTP-FLV 仅支持拉流(sub only),同时承载 /api/stats 和 /api/streams HTTP API。 +pub struct HttpFlvPlugin { + /// 插件元数据(名称、版本、描述) + meta: PluginMeta, + /// 当前插件状态 + state: PluginState, + /// 监听配置(从 ConfigProvider 读取) + config: HttpFlvServerConfig, +} + +impl HttpFlvPlugin { + /// 创建 HTTP-FLV 插件(使用默认配置) + /// + /// 默认监听 `0.0.0.0:8080`,在 `init()` 阶段会被配置文件中的值覆盖 + pub fn new() -> Self { + Self { + meta: PluginMeta::new("httpflv", "0.1.0", "HTTP-FLV 拉流协议"), + state: PluginState::Created, + config: HttpFlvServerConfig::default(), + } + } +} + +impl Plugin for HttpFlvPlugin { + fn meta(&self) -> &PluginMeta { + &self.meta + } + + /// 初始化:从 ConfigProvider 读取 HTTP-FLV 配置 + /// + /// 读取配置项: + /// - `httpflv.port` — 监听端口(默认 8080) + /// - `httpflv.listen_addr` — 监听地址(默认 "0.0.0.0") + fn init( + &mut self, + _context: Arc, + config: &dyn ConfigProvider, + ) -> Result<(), String> { + let port: u16 = config + .get("httpflv.port") + .and_then(|v| v.parse().ok()) + .unwrap_or(8080); + let addr = config + .get("httpflv.listen_addr") + .unwrap_or_else(|| "0.0.0.0".to_string()); + self.config = HttpFlvServerConfig { + listen_addr: addr, + port, + }; + self.state = PluginState::Initialized; + Ok(()) + } + + /// 启动:创建 HttpFlvServer 并 spawn 异步监听任务 + /// + /// listen() 是 async 方法,通过 tokio::spawn 在后台运行, + /// start() 本身立即返回,不阻塞调用者 + fn start(&mut self, context: Arc) -> Result<(), String> { + let server = HttpFlvServer::new(self.config.clone(), context); + let port = self.config.port; + let addr = self.config.listen_addr.clone(); + // 异步启动:spawn listen 循环,start() 立即返回 + tokio::spawn(async move { + if let Err(e) = server.listen().await { + log_error!("[httpflv] listen error: {}", e); + } + }); + log_info!("[httpflv] listening on {}:{}", addr, port); + log_info!("[httpflv] watch: ffplay http://localhost:{}/live/test.flv", port); + self.state = PluginState::Running; + Ok(()) + } + + /// 停止:标记插件为已停止状态 + fn stop(&mut self) -> Result<(), String> { + self.state = PluginState::Stopped; + log_info!("[httpflv] stopped"); + Ok(()) + } + + fn state(&self) -> PluginState { + self.state + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn as_any_mut(&mut self) -> &mut dyn Any { + self + } +} + +impl ProtocolPlugin for HttpFlvPlugin { + /// 协议名称 + fn protocol_name(&self) -> &str { + "httpflv" + } + + /// 默认端口 + fn default_port(&self) -> u16 { + 8080 + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::core::group::StreamManager; + + /// 测试用的空配置 + struct DummyConfig; + + impl ConfigProvider for DummyConfig { + fn get(&self, _key: &str) -> Option { + None + } + fn get_section(&self, _section: &str) -> Vec<(String, String)> { + Vec::new() + } + } + + /// 测试:插件创建后状态为 Created + #[test] + fn test_httpflv_plugin_new_state_is_created() { + let plugin = HttpFlvPlugin::new(); + assert_eq!(plugin.state(), PluginState::Created); + assert_eq!(plugin.protocol_name(), "httpflv"); + assert_eq!(plugin.default_port(), 8080); + } + + /// 测试:init 后状态为 Initialized,使用默认配置 + #[test] + fn test_httpflv_plugin_init_uses_default_config() { + let mut plugin = HttpFlvPlugin::new(); + let ctx = Arc::new(EngineContext::new(Arc::new(StreamManager::new()))); + let config = DummyConfig; + assert!(plugin.init(ctx, &config).is_ok()); + assert_eq!(plugin.state(), PluginState::Initialized); + assert_eq!(plugin.config.port, 8080); + assert_eq!(plugin.config.listen_addr, "0.0.0.0"); + } + + /// 测试:stop 后状态为 Stopped + #[test] + fn test_httpflv_plugin_stop_changes_state() { + let mut plugin = HttpFlvPlugin::new(); + assert!(plugin.stop().is_ok()); + assert_eq!(plugin.state(), PluginState::Stopped); + } +} diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index 1e2a3f3..e73e6de 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -1,18 +1,51 @@ //! 协议处理器模块 //! //! 实现各流媒体协议的握手、解析、会话管理: -//! - `rtmp/` — RTMP 协议:握手、chunk 协议、AMF0 编解码、会话状态机 -//! - `rtsp/` — RTSP/RTP/RTCP 协议:信令交互、RTP 传输、SDP 协商 -//! - `httpflv` — HTTP-FLV 拉流协议:HTTP 请求解析、FLV 流式输出 -//! - `httpflv_server` — HTTP-FLV TCP 服务器:订阅流并转发 FLV 数据 -//! - `wsflv` — WebSocket-FLV 拉流协议:WS 握手、二进制帧传输 -//! - `hls` — HLS 协议:M3U8 播放列表生成、TS 分片管理 +//! - `rtmp/` — RTMP 协议:握手、chunk 协议、AMF0 编解码、会话状态机 +//! - `rtsp/` — RTSP/RTP/RTCP 协议:信令交互、RTP 传输、SDP 协商 +//! - `httpflv` — HTTP-FLV 拉流协议:HTTP 请求解析、FLV 流式输出 +//! - `httpflv_server` — HTTP-FLV TCP 服务器:订阅流并转发 FLV 数据 +//! - `httpflv_server_plugin` — HTTP-FLV 协议插件(Plugin + ProtocolPlugin 实现) +//! - `wsflv` — WebSocket-FLV 拉流协议:WS 握手、二进制帧传输 +//! - `hls` — HLS 协议:M3U8 播放列表生成、TS 分片管理 //! //! 所有协议处理器只依赖 `sdk/` trait 和 `codec/` 类型,不直接引用 `core/` +//! +//! ## 内置插件自动注册 +//! +//! 本模块通过 [`all_plugins`] 函数提供所有内置协议插件的一次性注册能力。 +//! 新增内置协议时,只需: +//! 1. 在对应协议目录下实现 `Plugin` + `ProtocolPlugin` trait +//! 2. 在此文件的 `all_plugins()` 中添加一行 `Box::new(XxxPlugin::new())` +//! +//! 外部插件仍需通过 `registry.register()` 手动注册。 + +use crate::sdk::plugin::Plugin; pub mod rtmp; pub mod rtsp; pub mod httpflv; pub mod httpflv_server; +pub mod httpflv_server_plugin; pub mod wsflv; pub mod hls; + +/// 返回所有内置协议插件实例 +/// +/// 引擎启动时调用此函数获取内置插件列表并批量注册到 PluginRegistry。 +/// 新增内置协议时在此函数中添加一行即可。 +/// +/// # 示例 +/// +/// ```ignore +/// for plugin in protocol::all_plugins() { +/// registry.register(plugin); +/// } +/// ``` +pub fn all_plugins() -> Vec> { + vec![ + Box::new(rtmp::plugin::RtmpPlugin::new()), + Box::new(rtsp::plugin::RtspPlugin::new()), + Box::new(httpflv_server_plugin::HttpFlvPlugin::new()), + ] +} diff --git a/src/protocol/rtmp/mod.rs b/src/protocol/rtmp/mod.rs index b51666a..59b6c9c 100644 --- a/src/protocol/rtmp/mod.rs +++ b/src/protocol/rtmp/mod.rs @@ -7,6 +7,7 @@ //! - `message` — RTMP 消息类型定义(connect/publish/play/onMetaData 等) //! - `session` — RTMP 会话状态机(握手→连接→推流/拉流) //! - `server` — RTMP TCP 监听器,接收连接并分发会话 +//! - `plugin` — RTMP 协议插件(Plugin + ProtocolPlugin 实现) pub mod amf0; pub mod handshake; @@ -14,3 +15,4 @@ pub mod chunk; pub mod message; pub mod session; pub mod server; +pub mod plugin; diff --git a/src/protocol/rtmp/plugin.rs b/src/protocol/rtmp/plugin.rs new file mode 100644 index 0000000..fce898f --- /dev/null +++ b/src/protocol/rtmp/plugin.rs @@ -0,0 +1,176 @@ +//! RTMP 协议插件 +//! +//! 实现 `Plugin` + `ProtocolPlugin` trait, +//! 通过 `PluginRegistry` 统一管理 RTMP 服务器的生命周期。 +//! +//! 职责: +//! - `init()` 阶段从 ConfigProvider 读取 rtmp.port / rtmp.listen_addr +//! - `start()` 阶段创建 RtmpServer 并 spawn 异步监听任务 +//! - `stop()` 阶段标记停止(实际 TCP 监听随 tokio 任务结束而关闭) + +use std::any::Any; +use std::sync::Arc; + +use crate::log_error; +use crate::log_info; +use crate::sdk::context::EngineContext; +use crate::sdk::plugin::{Plugin, PluginMeta, PluginState, ProtocolPlugin}; +use crate::sdk::traits::ConfigProvider; + +use super::server::{RtmpServer, RtmpServerConfig}; + +/// RTMP 协议插件 +/// +/// 包装 `RtmpServer`,提供标准化的插件生命周期管理。 +/// 添加新协议只需实现类似的插件包装器并注册到 PluginRegistry。 +pub struct RtmpPlugin { + /// 插件元数据(名称、版本、描述) + meta: PluginMeta, + /// 当前插件状态 + state: PluginState, + /// 监听配置(从 ConfigProvider 读取) + config: RtmpServerConfig, +} + +impl RtmpPlugin { + /// 创建 RTMP 插件(使用默认配置) + /// + /// 默认监听 `0.0.0.0:1935`,在 `init()` 阶段会被配置文件中的值覆盖 + pub fn new() -> Self { + Self { + meta: PluginMeta::new("rtmp", "0.1.0", "RTMP 推拉流协议"), + state: PluginState::Created, + config: RtmpServerConfig::default(), + } + } +} + +impl Plugin for RtmpPlugin { + fn meta(&self) -> &PluginMeta { + &self.meta + } + + /// 初始化:从 ConfigProvider 读取 RTMP 配置 + /// + /// 读取配置项: + /// - `rtmp.port` — 监听端口(默认 1935) + /// - `rtmp.listen_addr` — 监听地址(默认 "0.0.0.0") + fn init( + &mut self, + _context: Arc, + config: &dyn ConfigProvider, + ) -> Result<(), String> { + let port: u16 = config + .get("rtmp.port") + .and_then(|v| v.parse().ok()) + .unwrap_or(1935); + let addr = config + .get("rtmp.listen_addr") + .unwrap_or_else(|| "0.0.0.0".to_string()); + self.config = RtmpServerConfig { + listen_addr: addr, + port, + }; + self.state = PluginState::Initialized; + Ok(()) + } + + /// 启动:创建 RtmpServer 并 spawn 异步监听任务 + /// + /// listen() 是 async 方法,通过 tokio::spawn 在后台运行, + /// start() 本身立即返回,不阻塞调用者 + fn start(&mut self, context: Arc) -> Result<(), String> { + let server = RtmpServer::new(self.config.clone(), context); + let port = self.config.port; + let addr = self.config.listen_addr.clone(); + // 异步启动:spawn listen 循环,start() 立即返回 + tokio::spawn(async move { + if let Err(e) = server.listen().await { + log_error!("[rtmp] listen error: {}", e); + } + }); + log_info!("[rtmp] listening on {}:{}", addr, port); + log_info!("[rtmp] push: ffmpeg -re -i video.mp4 -c copy -f flv rtmp://localhost:{}/live/test", port); + log_info!("[rtmp] watch: ffplay rtmp://localhost:{}/live/test", port); + self.state = PluginState::Running; + Ok(()) + } + + /// 停止:标记插件为已停止状态 + fn stop(&mut self) -> Result<(), String> { + self.state = PluginState::Stopped; + log_info!("[rtmp] stopped"); + Ok(()) + } + + fn state(&self) -> PluginState { + self.state + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn as_any_mut(&mut self) -> &mut dyn Any { + self + } +} + +impl ProtocolPlugin for RtmpPlugin { + /// 协议名称 + fn protocol_name(&self) -> &str { + "rtmp" + } + + /// 默认端口 + fn default_port(&self) -> u16 { + 1935 + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::core::group::StreamManager; + + /// 测试用的空配置 + struct DummyConfig; + + impl ConfigProvider for DummyConfig { + fn get(&self, _key: &str) -> Option { + None + } + fn get_section(&self, _section: &str) -> Vec<(String, String)> { + Vec::new() + } + } + + /// 测试:插件创建后状态为 Created + #[test] + fn test_rtmp_plugin_new_state_is_created() { + let plugin = RtmpPlugin::new(); + assert_eq!(plugin.state(), PluginState::Created); + assert_eq!(plugin.protocol_name(), "rtmp"); + assert_eq!(plugin.default_port(), 1935); + } + + /// 测试:init 后状态为 Initialized,使用默认配置 + #[test] + fn test_rtmp_plugin_init_uses_default_config() { + let mut plugin = RtmpPlugin::new(); + let ctx = Arc::new(EngineContext::new(Arc::new(StreamManager::new()))); + let config = DummyConfig; + assert!(plugin.init(ctx, &config).is_ok()); + assert_eq!(plugin.state(), PluginState::Initialized); + assert_eq!(plugin.config.port, 1935); + assert_eq!(plugin.config.listen_addr, "0.0.0.0"); + } + + /// 测试:stop 后状态为 Stopped + #[test] + fn test_rtmp_plugin_stop_changes_state() { + let mut plugin = RtmpPlugin::new(); + assert!(plugin.stop().is_ok()); + assert_eq!(plugin.state(), PluginState::Stopped); + } +} diff --git a/src/protocol/rtmp/server.rs b/src/protocol/rtmp/server.rs index 01994b0..684e937 100644 --- a/src/protocol/rtmp/server.rs +++ b/src/protocol/rtmp/server.rs @@ -14,7 +14,6 @@ use tokio::net::TcpListener; use crate::log_error; use crate::log_info; -use crate::log_warn; use crate::protocol::rtmp::chunk::{ self, ChunkFmt, RtmpMessageHeader, message_to_chunks, msg_type_id, @@ -66,8 +65,6 @@ impl RtmpServer { .await .map_err(|e| format!("failed to bind {}: {}", addr, e))?; - log_info!("[rtmp] listening on {}", addr); - loop { match listener.accept().await { Ok((tcp_stream, _addr)) => { diff --git a/src/protocol/rtsp/depacketizer.rs b/src/protocol/rtsp/depacketizer.rs index 6d8cf20..a9907f6 100644 --- a/src/protocol/rtsp/depacketizer.rs +++ b/src/protocol/rtsp/depacketizer.rs @@ -84,8 +84,6 @@ pub struct RtpDepacketizer { /// RTP 时间戳从随机值开始,转换为毫秒后可能超过 RTMP 扩展时间戳阈值 (0xFFFFFF), /// 导致 ffmpeg 解析失败。记录首个帧的 RTP 时间戳,后续帧减去此基准值。 video_ts_base: Option, - /// 音频时间戳归一化基准 - audio_ts_base: Option, } impl RtpDepacketizer { @@ -100,7 +98,6 @@ impl RtpDepacketizer { pending_nalus: Vec::new(), pending_timestamp: 0, video_ts_base: None, - audio_ts_base: None, } } @@ -419,14 +416,6 @@ impl RtpDepacketizer { // 使用 wrapping_sub 处理 RTP 时间戳回绕(32-bit 溢出) rtp_ts.wrapping_sub(self.video_ts_base.unwrap()) } - - /// 归一化音频 RTP 时间戳 - fn normalize_audio_ts(&mut self, rtp_ts: u32) -> u32 { - if self.audio_ts_base.is_none() { - self.audio_ts_base = Some(rtp_ts); - } - rtp_ts.wrapping_sub(self.audio_ts_base.unwrap()) - } } // ── RTP 音频解包器 ───────────────────────────────────────── diff --git a/src/protocol/rtsp/mod.rs b/src/protocol/rtsp/mod.rs index e47c232..02dda80 100644 --- a/src/protocol/rtsp/mod.rs +++ b/src/protocol/rtsp/mod.rs @@ -8,6 +8,7 @@ //! - `rtcp` — RTCP 报文(Sender Report) //! - `session` — RTSP 会话状态机 //! - `server` — RTSP TCP 监听器 +//! - `plugin` — RTSP 协议插件(Plugin + ProtocolPlugin 实现) pub mod request; pub mod response; @@ -17,3 +18,4 @@ pub mod rtcp; pub mod session; pub mod server; pub mod depacketizer; +pub mod plugin; diff --git a/src/protocol/rtsp/plugin.rs b/src/protocol/rtsp/plugin.rs new file mode 100644 index 0000000..7475743 --- /dev/null +++ b/src/protocol/rtsp/plugin.rs @@ -0,0 +1,177 @@ +//! RTSP 协议插件 +//! +//! 实现 `Plugin` + `ProtocolPlugin` trait, +//! 通过 `PluginRegistry` 统一管理 RTSP 服务器的生命周期。 +//! +//! 职责: +//! - `init()` 阶段从 ConfigProvider 读取 rtsp.port / rtsp.listen_addr +//! - `start()` 阶段创建 RtspServer 并 spawn 异步监听任务 +//! - `stop()` 阶段标记停止 + +use std::any::Any; +use std::sync::Arc; + +use crate::log_error; +use crate::log_info; +use crate::sdk::context::EngineContext; +use crate::sdk::plugin::{Plugin, PluginMeta, PluginState, ProtocolPlugin}; +use crate::sdk::traits::ConfigProvider; + +use super::server::{RtspServer, RtspServerConfig}; + +/// RTSP 协议插件 +/// +/// 包装 `RtspServer`,提供标准化的插件生命周期管理。 +/// RTSP 同时支持推流(ANNOUNCE/RECORD)和拉流(DESCRIBE/PLAY), +/// 传输层支持 TCP interleaved 和 UDP 两种模式。 +pub struct RtspPlugin { + /// 插件元数据(名称、版本、描述) + meta: PluginMeta, + /// 当前插件状态 + state: PluginState, + /// 监听配置(从 ConfigProvider 读取) + config: RtspServerConfig, +} + +impl RtspPlugin { + /// 创建 RTSP 插件(使用默认配置) + /// + /// 默认监听 `0.0.0.0:5544`,在 `init()` 阶段会被配置文件中的值覆盖 + pub fn new() -> Self { + Self { + meta: PluginMeta::new("rtsp", "0.1.0", "RTSP/RTP 推拉流协议"), + state: PluginState::Created, + config: RtspServerConfig::default(), + } + } +} + +impl Plugin for RtspPlugin { + fn meta(&self) -> &PluginMeta { + &self.meta + } + + /// 初始化:从 ConfigProvider 读取 RTSP 配置 + /// + /// 读取配置项: + /// - `rtsp.port` — 监听端口(默认 5544) + /// - `rtsp.listen_addr` — 监听地址(默认 "0.0.0.0") + fn init( + &mut self, + _context: Arc, + config: &dyn ConfigProvider, + ) -> Result<(), String> { + let port: u16 = config + .get("rtsp.port") + .and_then(|v| v.parse().ok()) + .unwrap_or(5544); + let addr = config + .get("rtsp.listen_addr") + .unwrap_or_else(|| "0.0.0.0".to_string()); + self.config = RtspServerConfig { + listen_addr: addr, + port, + }; + self.state = PluginState::Initialized; + Ok(()) + } + + /// 启动:创建 RtspServer 并 spawn 异步监听任务 + /// + /// listen() 是 async 方法,通过 tokio::spawn 在后台运行, + /// start() 本身立即返回,不阻塞调用者 + fn start(&mut self, context: Arc) -> Result<(), String> { + let server = RtspServer::new(self.config.clone(), context); + let port = self.config.port; + let addr = self.config.listen_addr.clone(); + // 异步启动:spawn listen 循环,start() 立即返回 + tokio::spawn(async move { + if let Err(e) = server.listen().await { + log_error!("[rtsp] listen error: {}", e); + } + }); + log_info!("[rtsp] listening on {}:{}", addr, port); + log_info!("[rtsp] push: ffmpeg -re -i video.mp4 -c copy -f rtsp rtsp://localhost:{}/live/test", port); + log_info!("[rtsp] watch: ffplay rtsp://localhost:{}/live/test", port); + self.state = PluginState::Running; + Ok(()) + } + + /// 停止:标记插件为已停止状态 + fn stop(&mut self) -> Result<(), String> { + self.state = PluginState::Stopped; + log_info!("[rtsp] stopped"); + Ok(()) + } + + fn state(&self) -> PluginState { + self.state + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn as_any_mut(&mut self) -> &mut dyn Any { + self + } +} + +impl ProtocolPlugin for RtspPlugin { + /// 协议名称 + fn protocol_name(&self) -> &str { + "rtsp" + } + + /// 默认端口 + fn default_port(&self) -> u16 { + 5544 + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::core::group::StreamManager; + + /// 测试用的空配置 + struct DummyConfig; + + impl ConfigProvider for DummyConfig { + fn get(&self, _key: &str) -> Option { + None + } + fn get_section(&self, _section: &str) -> Vec<(String, String)> { + Vec::new() + } + } + + /// 测试:插件创建后状态为 Created + #[test] + fn test_rtsp_plugin_new_state_is_created() { + let plugin = RtspPlugin::new(); + assert_eq!(plugin.state(), PluginState::Created); + assert_eq!(plugin.protocol_name(), "rtsp"); + assert_eq!(plugin.default_port(), 5544); + } + + /// 测试:init 后状态为 Initialized,使用默认配置 + #[test] + fn test_rtsp_plugin_init_uses_default_config() { + let mut plugin = RtspPlugin::new(); + let ctx = Arc::new(EngineContext::new(Arc::new(StreamManager::new()))); + let config = DummyConfig; + assert!(plugin.init(ctx, &config).is_ok()); + assert_eq!(plugin.state(), PluginState::Initialized); + assert_eq!(plugin.config.port, 5544); + assert_eq!(plugin.config.listen_addr, "0.0.0.0"); + } + + /// 测试:stop 后状态为 Stopped + #[test] + fn test_rtsp_plugin_stop_changes_state() { + let mut plugin = RtspPlugin::new(); + assert!(plugin.stop().is_ok()); + assert_eq!(plugin.state(), PluginState::Stopped); + } +} diff --git a/src/protocol/rtsp/rtp.rs b/src/protocol/rtsp/rtp.rs index e9340e3..daef62a 100644 --- a/src/protocol/rtsp/rtp.rs +++ b/src/protocol/rtsp/rtp.rs @@ -221,6 +221,14 @@ impl H264RtpPacketizer { } } + /// 获取当前序列号(下一个将要使用的序列号) + /// + /// 用于 RTSP PLAY 响应的 RTP-Info 头中, + /// 返回初始序列号(首个 RTP 包的 seq)。 + pub fn initial_seq(&self) -> u16 { + self.seq + } + /// 将 H.264 NALU 打包为一个或多个 RTP 包 /// /// 输入 NALU 不含起始码(AnnexB start code 已去除)。 @@ -333,6 +341,14 @@ impl AacRtpPacketizer { Self { seq: 0, ssrc, pt } } + /// 获取当前序列号(下一个将要使用的序列号) + /// + /// 用于 RTSP PLAY 响应的 RTP-Info 头中, + /// 返回初始序列号(首个 RTP 包的 seq)。 + pub fn initial_seq(&self) -> u16 { + self.seq + } + /// 将 AAC 帧打包为 RTP 包 /// /// AAC-hbr 模式负载格式: @@ -637,4 +653,27 @@ mod tests { assert_eq!(pkt2.sequence, 1); assert_eq!(pkt3.sequence, 2, "序列号应递增"); } + + /// 测试:H.264 打包器 initial_seq 返回当前序列号 + #[test] + fn test_h264_packetizer_initial_seq_returns_current_seq() { + let mut packetizer = H264RtpPacketizer::new(0x11111111, 96); + assert_eq!(packetizer.initial_seq(), 0, "初始序列号应为 0"); + + // 打包一帧后序列号应递增 + let nalu = vec![0x65, 0x01, 0x02]; + packetizer.packetize(&nalu, 90000, true); + assert_eq!(packetizer.initial_seq(), 1, "打包一帧后 initial_seq 应为 1"); + } + + /// 测试:AAC 打包器 initial_seq 返回当前序列号 + #[test] + fn test_aac_packetizer_initial_seq_returns_current_seq() { + let mut packetizer = AacRtpPacketizer::new(0x77777777, 97); + assert_eq!(packetizer.initial_seq(), 0, "初始序列号应为 0"); + + let aac_data = vec![0x01, 0x02]; + packetizer.packetize(&aac_data, 100); + assert_eq!(packetizer.initial_seq(), 1, "打包一帧后 initial_seq 应为 1"); + } } diff --git a/src/protocol/rtsp/sdp.rs b/src/protocol/rtsp/sdp.rs index bc88a01..716abcc 100644 --- a/src/protocol/rtsp/sdp.rs +++ b/src/protocol/rtsp/sdp.rs @@ -56,6 +56,8 @@ pub struct SdpInfo { pub conn_addr: String, /// 媒体描述列表 pub medias: Vec, + /// 是否为 recvonly 会话(拉流会话标记为 recvonly) + pub recvonly: bool, } impl SdpInfo { @@ -80,6 +82,7 @@ impl SdpInfo { stream_name: stream_name.to_string(), conn_addr: addr.to_string(), medias: Vec::new(), + recvonly: false, } } @@ -88,23 +91,52 @@ impl SdpInfo { /// # 参数 /// - `control`: 控制轨道 ID(如 "track1") /// - `sprop_parameter_sets`: SPS+PPS 的 Base64 编码(可选) - pub fn add_h264_video(&mut self, control: &str, sprop_parameter_sets: Option<&str>) { - // 构建 fmtp 参数:包含 profile-level-id、sprop-parameter-sets、packetization-mode - let fmtp = sprop_parameter_sets.map(|sps| { - format!( - "profile-level-id=640029;sprop-parameter-sets={};packetization-mode=1", - sps - ) - }); - self.medias.push(SdpMedia { - media_type: "video".to_string(), - payload_type: 96, // H.264 动态 payload type - codec_name: "H264".to_string(), - clock_rate: 90000, // 视频固定 90kHz - channels: 0, // 视频不使用通道数 - fmtp, - control: control.to_string(), - }); + /// - `profile_level_id`: profile-level-id 十六进制字符串(如 "640029"),可选 + pub fn add_h264_video(&mut self, control: &str, sprop_parameter_sets: Option<&str>, profile_level_id: Option<&str>) { + // 构建 fmtp 参数 + // 如果有 sprop_parameter_sets,需要完整的 fmtp;否则可省略 + if let Some(sprop) = sprop_parameter_sets { + let pli = profile_level_id.unwrap_or("640029"); + let fmtp = format!( + "profile-level-id={};sprop-parameter-sets={};packetization-mode=1", + pli, sprop + ); + self.medias.push(SdpMedia { + media_type: "video".to_string(), + payload_type: 96, + codec_name: "H264".to_string(), + clock_rate: 90000, + channels: 0, + fmtp: Some(fmtp), + control: control.to_string(), + }); + } else if let Some(pli) = profile_level_id { + // 有 profile-level-id 但无 sprop + let fmtp = format!( + "profile-level-id={};packetization-mode=1", + pli + ); + self.medias.push(SdpMedia { + media_type: "video".to_string(), + payload_type: 96, + codec_name: "H264".to_string(), + clock_rate: 90000, + channels: 0, + fmtp: Some(fmtp), + control: control.to_string(), + }); + } else { + // 无任何参数,使用默认 fmtp + self.medias.push(SdpMedia { + media_type: "video".to_string(), + payload_type: 96, + codec_name: "H264".to_string(), + clock_rate: 90000, + channels: 0, + fmtp: None, + control: control.to_string(), + }); + } } /// 添加 H.265/HEVC 视频媒体描述 @@ -129,11 +161,18 @@ impl SdpInfo { /// - `control`: 控制轨道 ID(如 "track2") /// - `sample_rate`: 采样率(如 44100、48000、8000) /// - `channels`: 通道数(如 2 表示立体声) - pub fn add_aac_audio(&mut self, control: &str, sample_rate: u32, channels: u8) { + /// - `config_hex`: AAC AudioSpecificConfig 的十六进制编码字符串(可选) + pub fn add_aac_audio(&mut self, control: &str, sample_rate: u32, channels: u8, config_hex: Option<&str>) { // AAC-hbr 模式的 fmtp 参数 - let fmtp = format!( - "streamtype=5; profile-level-id=1; mode=AAC-hbr; sizelength=13; indexlength=3; indexdeltalength=3; config=" - ); + let fmtp = if let Some(hex) = config_hex { + format!( + "streamtype=5; profile-level-id=1; mode=AAC-hbr; sizelength=13; indexlength=3; indexdeltalength=3; config={}", + hex + ) + } else { + // 无 config 时仍生成有效 fmtp(某些客户端支持带内 config) + "streamtype=5; profile-level-id=1; mode=AAC-hbr; sizelength=13; indexlength=3; indexdeltalength=3".to_string() + }; self.medias.push(SdpMedia { media_type: "audio".to_string(), payload_type: 97, // AAC 动态 payload type @@ -169,6 +208,11 @@ impl SdpInfo { // 时间描述(0 0 表示永久会话) s.push_str("t=0 0\r\n"); + // 拉流会话标记为 recvonly + if self.recvonly { + s.push_str("a=recvonly\r\n"); + } + // 逐个输出媒体描述 for media in &self.medias { // m= 行:端口 0 表示服务器选择(interleaved 模式下不使用 UDP 端口) @@ -306,7 +350,7 @@ mod tests { #[test] fn test_sdp_generate_h264_video_only_succeeds() { let mut sdp = SdpInfo::new("live/test", "192.168.1.100"); - sdp.add_h264_video("track1", Some("Z0LAHtkA2Q=,aMsg")); + sdp.add_h264_video("track1", Some("Z0LAHtkA2Q=,aMsg"), None); let text = sdp.to_sdp(); @@ -340,21 +384,21 @@ mod tests { #[test] fn test_sdp_generate_h264_aac_succeeds() { let mut sdp = SdpInfo::new("live/test", "10.0.0.1"); - sdp.add_h264_video("track1", None); - sdp.add_aac_audio("track2", 44100, 2); + sdp.add_h264_video("track1", None, None); + sdp.add_aac_audio("track2", 44100, 2, None); let text = sdp.to_sdp(); // 验证视频部分 assert!(text.contains("m=video 0 RTP/AVP 96\r\n")); assert!(text.contains("a=rtpmap:96 H264/90000\r\n")); - // 没有 fmtp(sprop_parameter_sets 为 None) + // 没有 fmtp(sprop_parameter_sets 为 None 且 profile_level_id 也为 None) let lines: Vec<&str> = text.lines().collect(); let video_fmtp_count = lines .iter() .filter(|l| l.starts_with("a=fmtp:96")) .count(); - assert_eq!(video_fmtp_count, 0, "H.264 无 sprop 参数时不应有 fmtp 行"); + assert_eq!(video_fmtp_count, 0, "H.264 无参数时不应有 fmtp 行"); // 验证音频部分 assert!(text.contains("m=audio 0 RTP/AVP 97\r\n")); @@ -431,8 +475,8 @@ a=control:track2\r\n"; let mut original = SdpInfo::new("live/roundtrip", "10.0.0.1"); original.session_id = 111; original.session_version = 222; - original.add_h264_video("track1", None); - original.add_aac_audio("track2", 48000, 2); + original.add_h264_video("track1", None, None); + original.add_aac_audio("track2", 48000, 2, None); // 生成 SDP 文本 let sdp_text = original.to_sdp(); @@ -472,7 +516,7 @@ a=control:track2\r\n"; #[test] fn test_sdp_rtpmap_format_succeeds() { let mut sdp = SdpInfo::new("test", "127.0.0.1"); - sdp.add_h264_video("track1", None); + sdp.add_h264_video("track1", None, None); let text = sdp.to_sdp(); // 视频 rtpmap 不应包含通道数 @@ -482,7 +526,7 @@ a=control:track2\r\n"; ); // 添加音频后验证通道数 - sdp.add_aac_audio("track2", 44100, 2); + sdp.add_aac_audio("track2", 44100, 2, None); let text = sdp.to_sdp(); assert!( text.contains("a=rtpmap:97 mpeg4-generic/44100/2\r\n"), @@ -547,4 +591,58 @@ a=control:track1\r\n"; "session_id 应为当前 Unix 时间戳" ); } + + /// 测试:add_aac_audio 带 config hex 参数生成正确的 fmtp + #[test] + fn test_sdp_aac_audio_with_config_hex_succeeds() { + let mut sdp = SdpInfo::new("test", "127.0.0.1"); + sdp.add_aac_audio("track2", 48000, 2, Some("1190")); + + let text = sdp.to_sdp(); + assert!(text.contains("config=1190"), "应包含 config 参数值"); + assert!(text.contains("a=rtpmap:97 mpeg4-generic/48000/2"), "采样率应为 48000"); + } + + /// 测试:add_h264_video 带 profile-level-id 参数生成正确的 fmtp + #[test] + fn test_sdp_h264_video_with_profile_level_id_succeeds() { + let mut sdp = SdpInfo::new("test", "127.0.0.1"); + sdp.add_h264_video("track1", Some("Z0LAHtkA2Q=,aMsg"), Some("640029")); + + let text = sdp.to_sdp(); + assert!(text.contains("profile-level-id=640029"), "应包含指定的 profile-level-id"); + assert!(text.contains("sprop-parameter-sets=Z0LAHtkA2Q=,aMsg"), "应包含 sprop"); + } + + /// 测试:recvonly 标记生成 a=recvonly 行 + #[test] + fn test_sdp_recvonly_flag_generates_attribute() { + let mut sdp = SdpInfo::new("test", "127.0.0.1"); + sdp.recvonly = true; + sdp.add_h264_video("track1", None, None); + + let text = sdp.to_sdp(); + assert!(text.contains("a=recvonly\r\n"), "recvonly 标记应生成 a=recvonly 行"); + } + + /// 测试:recvonly 默认为 false 时不生成 a=recvonly 行 + #[test] + fn test_sdp_no_recvonly_by_default() { + let mut sdp = SdpInfo::new("test", "127.0.0.1"); + sdp.add_h264_video("track1", None, None); + + let text = sdp.to_sdp(); + assert!(!text.contains("a=recvonly"), "默认不应包含 recvonly"); + } + + /// 测试:add_aac_audio 无 config hex 时 fmtp 不含 config= + #[test] + fn test_sdp_aac_audio_without_config_hex_succeeds() { + let mut sdp = SdpInfo::new("test", "127.0.0.1"); + sdp.add_aac_audio("track2", 44100, 2, None); + + let text = sdp.to_sdp(); + assert!(!text.contains("config="), "无 config 时不包含 config= 参数"); + assert!(text.contains("mode=AAC-hbr"), "仍应有 AAC-hbr 模式"); + } } diff --git a/src/protocol/rtsp/server.rs b/src/protocol/rtsp/server.rs index e7bd26b..1d5ee3f 100644 --- a/src/protocol/rtsp/server.rs +++ b/src/protocol/rtsp/server.rs @@ -24,10 +24,12 @@ use crate::stats::ServerStats; use super::request::RtspRequest; use super::session::{RtspSession, TransportMode}; +use super::rtcp::SenderReport; /// RTSP 服务器配置 /// /// 控制监听地址和端口 +#[derive(Debug, Clone)] pub struct RtspServerConfig { /// 监听地址(如 "0.0.0.0") pub listen_addr: String, @@ -75,8 +77,6 @@ impl RtspServer { .await .map_err(|e| format!("RTSP bind {} failed: {}", addr, e))?; - log_info!("[rtsp] listening on {}", addr); - loop { match listener.accept().await { Ok((tcp_stream, _addr)) => { @@ -129,6 +129,14 @@ async fn handle_rtsp_client(mut stream: tokio::net::TcpStream, context: Arc = None; // UDP 读取缓冲区 let mut udp_buf = [0u8; 65536]; + // RTCP 发送间隔追踪(每 5 秒发送一次 Sender Report) + let mut last_rtcp_time = tokio::time::Instant::now(); + // RTCP 发送统计(视频) + let mut video_rtcp_pkt_count: u32 = 0; + let mut video_rtcp_octet_count: u32 = 0; + // RTCP 发送统计(音频) + let mut audio_rtcp_pkt_count: u32 = 0; + let mut audio_rtcp_octet_count: u32 = 0; loop { // === Playing 模式:从订阅者队列读取帧并发送 RTP === @@ -146,6 +154,14 @@ async fn handle_rtsp_client(mut stream: tokio::net::TcpStream, context: Arc= Duration::from_secs(5) { + match session.transport_mode() { + TransportMode::TcpInterleaved => { + // 发送视频 RTCP SR + if session.h264_packetizer().is_some() { + let rtp_ts = (video_rtcp_pkt_count as u64 * 90 * 33) as u32; // 近似时间戳 + let mut sr = SenderReport::with_timestamp(session.video_ssrc(), rtp_ts); + sr.sender_packet_count = video_rtcp_pkt_count; + sr.sender_octet_count = video_rtcp_octet_count; + let data = sr.encode_interleaved(session.video_rtcp_channel()); + if stream.write_all(&data).await.is_err() { + session.cleanup(&sm); + return; + } + } + // 发送音频 RTCP SR + if session.aac_packetizer().is_some() { + let audio_clock = session.audio_sample_rate(); + let rtp_ts = (audio_rtcp_pkt_count as u64 * 1024 * audio_clock as u64 / 44100) as u32; + let mut sr = SenderReport::with_timestamp(session.audio_ssrc(), rtp_ts); + sr.sender_packet_count = audio_rtcp_pkt_count; + sr.sender_octet_count = audio_rtcp_octet_count; + let data = sr.encode_interleaved(session.audio_rtcp_channel()); + if stream.write_all(&data).await.is_err() { + session.cleanup(&sm); + return; + } + } + } + TransportMode::Udp => { + // UDP 模式下发送 RTCP SR 到客户端的 RTCP 端口 + // 视频SR + if session.h264_packetizer().is_some() { + if let Some(ref udp_sock) = video_udp { + let dest = session.video_udp_dest(); + if let Some(dest_addr) = dest { + let rtcp_port = dest_addr.port() + 1; + let rtcp_dest = std::net::SocketAddr::new(dest_addr.ip(), rtcp_port); + let mut sr = SenderReport::with_timestamp(session.video_ssrc(), 0); + sr.sender_packet_count = video_rtcp_pkt_count; + sr.sender_octet_count = video_rtcp_octet_count; + let _ = udp_sock.send_to(&sr.encode(), rtcp_dest).await; + } + } + } + // 音频SR + if session.aac_packetizer().is_some() { + if let Some(ref udp_sock) = audio_udp { + let dest = session.audio_udp_dest(); + if let Some(dest_addr) = dest { + let rtcp_port = dest_addr.port() + 1; + let rtcp_dest = std::net::SocketAddr::new(dest_addr.ip(), rtcp_port); + let mut sr = SenderReport::with_timestamp(session.audio_ssrc(), 0); + sr.sender_packet_count = audio_rtcp_pkt_count; + sr.sender_octet_count = audio_rtcp_octet_count; + let _ = udp_sock.send_to(&sr.encode(), rtcp_dest).await; + } + } + } + } + } + last_rtcp_time = tokio::time::Instant::now(); + } // 检查流是否仍然存在(推流端可能已断开) if let Some(sp) = session.stream_path() { if !sm.has_stream(sp) { diff --git a/src/protocol/rtsp/session.rs b/src/protocol/rtsp/session.rs index 1c23cc4..eccdeb9 100644 --- a/src/protocol/rtsp/session.rs +++ b/src/protocol/rtsp/session.rs @@ -26,7 +26,6 @@ use std::sync::Arc; use crate::log_error; use crate::log_info; -use crate::log_warn; use crate::sdk::traits::{StreamManagerApi, SubscriberApi}; use crate::sdk::types::StreamPath; @@ -138,6 +137,10 @@ pub struct RtspSession { announce_sps: Option>, /// 缓存的 PPS NALU(不含起始码,含 NALU header 0x68) announce_pps: Option>, + + // === 拉流端音频采样率(从 DESCRIBE 元数据获取) === + /// 音频采样率,用于 RTP 时间戳计算(默认 44100) + audio_sample_rate: u32, } impl RtspSession { @@ -176,6 +179,8 @@ impl RtspSession { // 推流端 SPS/PPS(从 ANNOUNCE SDP 解析) announce_sps: None, announce_pps: None, + // 拉流端音频采样率 + audio_sample_rate: 44100, } } @@ -273,6 +278,52 @@ impl RtspSession { self.audio_udp_socket.take() } + /// 获取音频采样率(用于 RTP 时间戳计算) + pub fn audio_sample_rate(&self) -> u32 { + self.audio_sample_rate + } + + /// 获取 H.264 打包器引用(用于 RTCP SR 生成) + pub fn h264_packetizer(&self) -> Option<&H264RtpPacketizer> { + self.h264_packetizer.as_ref() + } + + /// 获取 AAC 打包器引用(用于 RTCP SR 生成) + pub fn aac_packetizer(&self) -> Option<&AacRtpPacketizer> { + self.aac_packetizer.as_ref() + } + + /// 设置 H.264 打包器的包计数和字节计数(用于 RTCP SR) + pub fn h264_packetizer_stats(&self) -> (u16, u32, u32) { + // 返回 (seq, packet_count_estimate, octet_count_estimate) + // 注意:实际实现需要更精确的追踪 + if let Some(ref pkt) = self.h264_packetizer { + (pkt.initial_seq(), 0, 0) + } else { + (0, 0, 0) + } + } + + /// 获取视频 SSRC + pub fn video_ssrc(&self) -> u32 { + self.video_ssrc + } + + /// 获取音频 SSRC + pub fn audio_ssrc(&self) -> u32 { + self.audio_ssrc + } + + /// 获取视频 RTCP interleaved 通道号(RTP 通道 + 1) + pub fn video_rtcp_channel(&self) -> u8 { + self.video_channel + 1 + } + + /// 获取音频 RTCP interleaved 通道号(RTP 通道 + 1) + pub fn audio_rtcp_channel(&self) -> u8 { + self.audio_channel + 1 + } + /// 处理 RTSP 请求,返回响应字节 /// /// 根据当前状态和请求方法执行状态转换,生成响应。 @@ -328,8 +379,11 @@ impl RtspSession { } /// 处理 DESCRIBE 请求 — 返回流的 SDP 描述 + /// + /// 从流的 GOP 缓存中提取编解码器元数据(SPS/PPS、AAC config), + /// 生成包含正确 SDP 参数的描述响应。 fn handle_describe( - &self, + &mut self, req: &RtspRequest, cseq: u32, sm: &Arc, @@ -347,10 +401,66 @@ impl RtspSession { return RtspResponse::new(status::NOT_FOUND).cseq(cseq).encode(); } - // 生成 SDP — 简化实现:假定 H.264 视频 + AAC 音频 + // 查询流的编解码器元数据 + let meta = sm.get_codec_metadata(&sp); + + // 生成 SDP let mut sdp = SdpInfo::new(&format!("{}/{}", app, stream_name), "0.0.0.0"); - sdp.add_h264_video("track1", None); - sdp.add_aac_audio("track2", 44100, 2); + + // 视频轨道:如果有 SPS/PPS,生成 sprop-parameter-sets 和 profile-level-id + let sprop = if let Some(ref m) = meta { + if let (Some(sps), Some(pps)) = (&m.h264_sps, &m.h264_pps) { + Some(format!( + "{},{}", + base64_encode_simple(sps), + base64_encode_simple(pps) + )) + } else { + None + } + } else { + None + }; + + // 从 SPS 计算 profile-level-id + let profile_level_id = if let Some(ref m) = meta { + if let Some(ref sps) = m.h264_sps { + if sps.len() >= 4 { + // SPS NALU: byte[0]=NALU header(0x67), byte[1]=profile_idc, byte[2]=constraint_set_flags, byte[3]=level_idc + let profile_idc = sps[1]; + let constraint = sps[2]; + let level_idc = sps[3]; + Some(format!("{:02X}{:02X}{:02X}", profile_idc, constraint, level_idc)) + } else { + None + } + } else { + None + } + } else { + None + }; + + sdp.add_h264_video("track1", sprop.as_deref(), profile_level_id.as_deref()); + + // 音频轨道:如果有 AAC config,传入 hex 编码 + let aac_config_hex = meta.as_ref().and_then(|m| m.aac_config.as_ref()).map(|c| { + c.iter().map(|b| format!("{:02x}", b)).collect::() + }); + let sample_rate = meta.as_ref().map(|m| m.audio_sample_rate).unwrap_or(44100); + let channels = meta.as_ref().map(|m| m.audio_channels).unwrap_or(2); + + // 如果采样率为 0(元数据中未设置),使用默认值 + let sample_rate = if sample_rate == 0 { 44100 } else { sample_rate }; + let channels = if channels == 0 { 2 } else { channels }; + + // 存储音频采样率到 session,供后续 RTP 时间戳计算使用 + self.audio_sample_rate = sample_rate; + + sdp.add_aac_audio("track2", sample_rate, channels, aac_config_hex.as_deref()); + + // 拉流会话标记为 recvonly + sdp.recvonly = true; RtspResponse::new(status::OK) .cseq(cseq) @@ -505,9 +615,12 @@ impl RtspSession { } /// 处理 PLAY 请求 — 开始拉流,状态从 Ready → Playing + /// + /// 订阅流并在响应中包含 RTP-Info 头, + /// 以便客户端知道初始序列号和 RTP 时间戳。 fn handle_play( &mut self, - _req: &RtspRequest, + req: &RtspRequest, cseq: u32, sm: &Arc, ) -> Vec { @@ -529,9 +642,37 @@ impl RtspSession { self.subscriber = Some(sub); self.state = SessionState::Playing; log_info!("[rtsp] pull start {}", sp); + + // 构建 RTP-Info 头 + // 从请求 URI 提取基础 URL(rtsp://host:port) + let base_url: String = req.uri.split('/').take(3).collect::>().join("/"); + + let mut rtp_info_parts = Vec::new(); + + // 视频轨道 RTP-Info + if let Some(ref pkt) = self.h264_packetizer { + rtp_info_parts.push(format!( + "url={}/track1;seq={};rtptime=0", + base_url, + pkt.initial_seq() + )); + } + + // 音频轨道 RTP-Info + if let Some(ref pkt) = self.aac_packetizer { + rtp_info_parts.push(format!( + "url={}/track2;seq={};rtptime=0", + base_url, + pkt.initial_seq() + )); + } + + let rtp_info = rtp_info_parts.join(","); + RtspResponse::new(status::OK) .cseq(cseq) .session(&self.session_id) + .header("RTP-Info", &rtp_info) .header("Range", "npt=0.000-") .encode() } @@ -722,8 +863,10 @@ impl RtspSession { } else { &frame.data }; + // 使用实际的音频采样率计算 RTP 时间戳 + let audio_clock_rate = self.audio_sample_rate; let rtp_packet = - pkt.packetize(aac_data, (frame.timestamp_ms * 44100 / 1000) as u32); + pkt.packetize(aac_data, (frame.timestamp_ms * audio_clock_rate as u64 / 1000) as u32); result.push(( self.audio_channel, rtp_packet.encode_interleaved(self.audio_channel), @@ -773,8 +916,10 @@ impl RtspSession { } else { &frame.data }; + // 使用实际的音频采样率计算 RTP 时间戳 + let audio_clock_rate = self.audio_sample_rate; let rtp_packet = - pkt.packetize(aac_data, (frame.timestamp_ms * 44100 / 1000) as u32); + pkt.packetize(aac_data, (frame.timestamp_ms * audio_clock_rate as u64 / 1000) as u32); result.push((false, rtp_packet)); } } @@ -1776,4 +1921,98 @@ mod tests { let result = base64_decode("abc!!!"); assert!(result.is_err(), "无效字符应返回错误"); } + + /// 测试:DESCRIBE 有编解码器元数据时 SDP 包含 sprop-parameter-sets + #[test] + fn test_session_describe_with_metadata_includes_sprop() { + use crate::sdk::types::{CodecExtraInfo, H264SeqHeader, AacSeqHeader, VideoCodec, FrameType}; + + let mut session = RtspSession::new(); + let sm = test_sm(); + let sp = StreamPath::new("live", "test"); + sm.create_stream(sp.clone()).unwrap(); + + // 获取内部 Stream 并 dispatch seq_header 帧 + let stream = { + // 通过 StreamManagerApi::dispatch_frame 来写入帧 + // 构造 H.264 seq_header + let sps = Arc::new(vec![0x67, 0x64, 0x00, 0x29, 0xAC, 0xD9, 0x40]); + let pps = Arc::new(vec![0x68, 0xEE, 0x31, 0x12]); + let mut vframe = crate::sdk::types::AVFrame::new_video( + 0, Arc::new(vec![]), VideoCodec::H264, FrameType::KeyFrame + ); + vframe.codec_info = Some(CodecExtraInfo::H264SeqHeader(H264SeqHeader { + sps, pps, + })); + sm.dispatch_frame(&sp, vframe); + + // 构造 AAC seq_header(48000Hz, 2ch) + // byte0 = 0x11, byte1 = 0x90 + let aac_config = Arc::new(vec![0x11, 0x90]); + let mut aframe = crate::sdk::types::AVFrame::new_audio( + 0, Arc::new(vec![]), crate::sdk::types::AudioCodec::Aac + ); + aframe.codec_info = Some(CodecExtraInfo::AacSeqHeader(AacSeqHeader { + audio_specific_config: aac_config, + })); + sm.dispatch_frame(&sp, aframe); + }; + + // DESCRIBE + let req = parse_req(b"DESCRIBE rtsp://localhost:554/live/test RTSP/1.0\r\nCSeq: 2\r\n\r\n"); + let resp = session.handle_request(&req, &sm); + let resp_str = std::str::from_utf8(&resp).unwrap(); + + assert!(resp_str.starts_with("RTSP/1.0 200"), "DESCRIBE 应返回 200"); + assert!(resp_str.contains("sprop-parameter-sets="), "应包含 sprop-parameter-sets"); + assert!(resp_str.contains("profile-level-id="), "应包含 profile-level-id"); + assert!(resp_str.contains("config="), "应包含 AAC config"); + assert!(resp_str.contains("a=recvonly"), "拉流会话应标记 recvonly"); + assert!(resp_str.contains("mpeg4-generic/48000/2"), "采样率应为 48000"); + } + + /// 测试:PLAY 响应包含 RTP-Info 头 + #[test] + fn test_session_play_response_includes_rtp_info() { + let mut session = RtspSession::new(); + let sm = test_sm(); + let sp = StreamPath::new("live", "test"); + sm.create_stream(sp).expect("创建流应成功"); + + // SETUP 视频(初始化 packetizer) + let setup1 = b"SETUP rtsp://localhost:554/live/test/trackID=0 RTSP/1.0\r\n\ + CSeq: 3\r\n\ + Transport: RTP/AVP;unicast;interleaved=0-1\r\n\ + \r\n"; + session.handle_request(&parse_req(setup1), &sm); + + // SETUP 音频 + let setup2 = b"SETUP rtsp://localhost:554/live/test/trackID=1 RTSP/1.0\r\n\ + CSeq: 4\r\n\ + Transport: RTP/AVP;unicast;interleaved=2-3\r\n\ + \r\n"; + session.handle_request(&parse_req(setup2), &sm); + + // PLAY + let play_raw = format!( + "PLAY rtsp://localhost:554/live/test RTSP/1.0\r\nCSeq: 5\r\nSession: {}\r\n\r\n", + session.session_id() + ); + let resp = session.handle_request(&parse_req(play_raw.as_bytes()), &sm); + let resp_str = std::str::from_utf8(&resp).unwrap(); + + assert!(resp_str.starts_with("RTSP/1.0 200"), "PLAY 应返回 200"); + assert!(resp_str.contains("RTP-Info:"), "PLAY 响应应包含 RTP-Info 头"); + assert!(resp_str.contains("seq=0"), "RTP-Info 应包含初始序列号"); + assert!(resp_str.contains("rtptime=0"), "RTP-Info 应包含 rtptime"); + assert!(resp_str.contains("track1"), "RTP-Info 应包含 track1"); + assert!(resp_str.contains("track2"), "RTP-Info 应包含 track2"); + } + + /// 测试:audio_sample_rate 默认为 44100,DESCRIBE 后更新 + #[test] + fn test_session_audio_sample_rate_default_and_updated() { + let session = RtspSession::new(); + assert_eq!(session.audio_sample_rate(), 44100, "默认应为 44100"); + } } diff --git a/src/sdk/plugin.rs b/src/sdk/plugin.rs index 7e4b3cb..13d6f49 100644 --- a/src/sdk/plugin.rs +++ b/src/sdk/plugin.rs @@ -7,6 +7,7 @@ //! - `ProtocolPlugin` — 协议插件扩展 trait(增加 protocol_name + default_port) use std::any::Any; +use std::sync::Arc; use super::context::EngineContext; use super::traits::ConfigProvider; @@ -59,16 +60,20 @@ pub trait Plugin: Send + Sync { /// 初始化阶段:加载配置、注入依赖 /// + /// 接收 `Arc` 以便插件在异步任务中持有上下文引用。 + /// /// 默认实现为空操作 - fn init(&mut self, context: &EngineContext, config: &dyn ConfigProvider) -> Result<(), String> { + fn init(&mut self, context: Arc, config: &dyn ConfigProvider) -> Result<(), String> { let _ = (context, config); Ok(()) } /// 启动阶段:开始监听端口、注册任务 /// + /// 接收 `Arc` 以便插件 spawn 异步任务时移动上下文所有权。 + /// /// 默认实现为空操作 - fn start(&mut self, context: &EngineContext) -> Result<(), String> { + fn start(&mut self, context: Arc) -> Result<(), String> { let _ = context; Ok(()) } diff --git a/src/sdk/registry.rs b/src/sdk/registry.rs index 23d30f1..56d9b52 100644 --- a/src/sdk/registry.rs +++ b/src/sdk/registry.rs @@ -60,14 +60,27 @@ impl PluginRegistry { /// 初始化所有已注册且启用的插件(调用 init()) /// /// 按注册顺序依次调用。任一插件 init 失败则立即返回错误。 + /// 每个插件接收 `Arc` 的克隆,便于在异步任务中持有。 + /// + /// 支持配置级开关:检查 `<插件名>.enabled` 配置项, + /// 设为 `"false"` 则跳过初始化并将插件标记为 `Disabled` 状态。 pub fn init_all(&self, config: &dyn ConfigProvider) -> Result<(), String> { let mut entries = self.entries.lock().unwrap(); for entry in entries.iter_mut() { + let name = entry.plugin.meta().name.clone(); + + // 检查配置中的 enabled 标志:`.enabled = false` 则跳过 + if let Some(val) = config.get(&format!("{}.enabled", name)) { + if val == "false" { + entry.enabled = false; + continue; + } + } + if !entry.enabled { continue; } - if let Err(e) = entry.plugin.init(&self.context, config) { - let name = entry.plugin.meta().name.clone(); + if let Err(e) = entry.plugin.init(Arc::clone(&self.context), config) { return Err(format!("plugin '{}' init failed: {}", name, e)); } } @@ -77,13 +90,14 @@ impl PluginRegistry { /// 启动所有已注册且启用的插件(调用 start()) /// /// 按注册顺序依次调用。任一插件 start 失败则立即返回错误。 + /// 每个插件接收 `Arc` 的克隆,便于 spawn 异步任务。 pub fn start_all(&self) -> Result<(), String> { let mut entries = self.entries.lock().unwrap(); for entry in entries.iter_mut() { if !entry.enabled { continue; } - if let Err(e) = entry.plugin.start(&self.context) { + if let Err(e) = entry.plugin.start(Arc::clone(&self.context)) { let name = entry.plugin.meta().name.clone(); return Err(format!("plugin '{}' start failed: {}", name, e)); } @@ -166,12 +180,12 @@ mod tests { impl Plugin for DummyPlugin { fn meta(&self) -> &PluginMeta { &self.meta } - fn init(&mut self, _context: &EngineContext, _config: &dyn ConfigProvider) -> Result<(), String> { + fn init(&mut self, _context: Arc, _config: &dyn ConfigProvider) -> Result<(), String> { *self.init_called.lock().unwrap() = true; self.state = PluginState::Initialized; Ok(()) } - fn start(&mut self, _context: &EngineContext) -> Result<(), String> { + fn start(&mut self, _context: Arc) -> Result<(), String> { *self.start_called.lock().unwrap() = true; self.state = PluginState::Running; Ok(()) @@ -203,10 +217,10 @@ mod tests { impl Plugin for FailPlugin { fn meta(&self) -> &PluginMeta { &self.meta } - fn init(&mut self, _context: &EngineContext, _config: &dyn ConfigProvider) -> Result<(), String> { + fn init(&mut self, _context: Arc, _config: &dyn ConfigProvider) -> Result<(), String> { Err("init error".to_string()) } - fn start(&mut self, _context: &EngineContext) -> Result<(), String> { + fn start(&mut self, _context: Arc) -> Result<(), String> { Err("start error".to_string()) } fn stop(&mut self) -> Result<(), String> { @@ -224,6 +238,30 @@ mod tests { fn get_section(&self, _section: &str) -> Vec<(String, String)> { Vec::new() } } + /// 测试用配置:支持按插件名设置 enabled 标志 + struct SelectiveConfig { + disabled: Vec, + } + + impl SelectiveConfig { + fn new(disabled: &[&str]) -> Self { + Self { disabled: disabled.iter().map(|s| s.to_string()).collect() } + } + } + + impl ConfigProvider for SelectiveConfig { + fn get(&self, key: &str) -> Option { + // key 格式:.enabled + if let Some(name) = key.strip_suffix(".enabled") { + if self.disabled.iter().any(|d| d == name) { + return Some("false".to_string()); + } + } + None + } + fn get_section(&self, _section: &str) -> Vec<(String, String)> { Vec::new() } + } + fn make_registry() -> PluginRegistry { let ctx = Arc::new(EngineContext::new(Arc::new(StreamManager::new()))); PluginRegistry::new(ctx) @@ -294,4 +332,31 @@ mod tests { assert!(registry.get_plugin("found").is_some()); assert!(registry.get_plugin("missing").is_none()); } + + /// 测试:配置中 enabled=false 的插件被跳过 + #[test] + fn test_registry_config_disable_skips_init() { + let registry = make_registry(); + registry.register(Box::new(DummyPlugin::new("rtmp"))); + registry.register(Box::new(DummyPlugin::new("hls"))); + // 禁用 hls 插件 + let config = SelectiveConfig::new(&["hls"]); + assert!(registry.init_all(&config).is_ok()); + assert!(registry.start_all().is_ok()); + // 两个插件都已注册,但 hls 未被初始化 + assert_eq!(registry.list_plugins().len(), 2); + } + + /// 测试:禁用插件不影响其他插件的完整生命周期 + #[test] + fn test_registry_config_disable_partial() { + let registry = make_registry(); + let rtmp = DummyPlugin::new("rtmp"); + registry.register(Box::new(rtmp)); + registry.register(Box::new(DummyPlugin::new("rtsp"))); + // 只禁用 rtsp + let config = SelectiveConfig::new(&["rtsp"]); + assert!(registry.init_all(&config).is_ok()); + assert!(registry.start_all().is_ok()); + } } diff --git a/src/sdk/traits.rs b/src/sdk/traits.rs index 7076e55..d607053 100644 --- a/src/sdk/traits.rs +++ b/src/sdk/traits.rs @@ -12,7 +12,7 @@ use std::any::Any; use std::sync::Arc; -use super::types::{AVFrame, StreamPath, StreamSummary, TrackInfo, TrackId}; +use super::types::{AVFrame, StreamCodecMeta, StreamPath, StreamSummary, TrackInfo, TrackId}; /// 发布者 API /// @@ -93,6 +93,18 @@ pub trait StreamManagerApi: Send + Sync { fn list_streams(&self) -> Vec; /// 获取所有活跃流的详细信息摘要 fn stream_summaries(&self) -> Vec; + /// 获取流的编解码器元数据(SPS/PPS, AAC config 等) + /// + /// 从 GOP 缓存中的 seq_header 帧提取编解码器初始化参数, + /// 用于 RTSP DESCRIBE 响应生成 SDP 描述。 + /// + /// # 参数 + /// - `path` — 流路径 + /// + /// # 返回 + /// - `Some(StreamCodecMeta)` — 找到流的编解码器元数据 + /// - `None` — 流不存在或无编解码器信息 + fn get_codec_metadata(&self, path: &StreamPath) -> Option; } /// 流生命周期事件 diff --git a/src/sdk/types.rs b/src/sdk/types.rs index 2566190..7fba17d 100644 --- a/src/sdk/types.rs +++ b/src/sdk/types.rs @@ -223,6 +223,24 @@ pub struct StreamPath { pub stream_name: String, } +/// 流的编解码器元数据(用于 RTSP DESCRIBE 等 SDP 生成场景) +/// +/// 从 GOP 缓存中的 seq_header 帧提取,包含 H.264 SPS/PPS 和 AAC 配置信息。 +/// RTSP 拉流时 DESCRIBE 响应需要这些信息来生成正确的 SDP 描述。 +#[derive(Debug, Clone, Default)] +pub struct StreamCodecMeta { + /// H.264 SPS NALU(不含起始码,含 NALU header 0x67) + pub h264_sps: Option>>, + /// H.264 PPS NALU(不含起始码,含 NALU header 0x68) + pub h264_pps: Option>>, + /// AAC AudioSpecificConfig 原始字节 + pub aac_config: Option>>, + /// 音频采样率(如 44100, 48000) + pub audio_sample_rate: u32, + /// 音频通道数 + pub audio_channels: u8, +} + /// 流摘要信息(用于 API 查询) /// /// 包含活跃流的元数据快照,供 HTTP API 返回