paced_sender逻辑新增音视频包排序功能

用于矫正音视频包时间戳交织性
This commit is contained in:
xia-chu
2025-06-29 18:00:04 +08:00
parent 0b21ece801
commit 9a7cca1ad9

View File

@@ -62,32 +62,38 @@ public:
setCurrentStamp(frame->dts());
resetTimer(EventPoller::getCurrentPoller());
}
_cache.emplace_back(frame->dts() + _cache_ms, Frame::getCacheAbleFrame(frame));
auto &last_dts = _last_dts[frame->getTrackType()];
if (last_dts > frame->dts()) {
// 时间戳回退了,点播流?
WarnL << "Dts decrease: " << last_dts << "->" << frame->dts() << ", flush all paced sender cache: " << _cache.size();
flushCache(frame->dts());
}
_cache.emplace(frame->dts(), Frame::getCacheAbleFrame(frame));
last_dts = frame->dts();
return true;
}
private:
void onTick() {
std::lock_guard<std::recursive_mutex> lck(_mtx);
auto dst = _cache.empty() ? 0 : _cache.back().first;
auto max_dts = _cache.empty() ? 0 : _cache.rbegin()->first;
while (!_cache.empty()) {
auto &front = _cache.front();
if (getCurrentStamp() < front.first) {
auto front = _cache.begin();
if (getCurrentStamp() < front->first + _cache_ms) {
// 还没到消费时间 [AUTO-TRANSLATED:09fb4c3d]
// Not yet time to consume
break;
}
// 时间到了该消费frame了 [AUTO-TRANSLATED:2f007931]
// Time is up, it's time to consume the frame
_cb(front.second);
_cache.pop_front();
_cb(front->second);
_cache.erase(front);
}
if (_cache.empty() && dst) {
if (_cache.empty() && max_dts) {
// 消费太快,需要增加缓存大小 [AUTO-TRANSLATED:c05bfbcd]
// Consumption is too fast, need to increase cache size
setCurrentStamp(dst);
setCurrentStamp(max_dts);
_cache_ms += kMinCacheMS;
}
@@ -95,15 +101,20 @@ private:
// Consumption is too slow, need to force flush data
if (_cache.size() > 25 * 5) {
WarnL << "Flush frame paced sender cache: " << _cache.size();
while (!_cache.empty()) {
auto &front = _cache.front();
_cb(front.second);
_cache.pop_front();
}
setCurrentStamp(dst);
flushCache(max_dts);
}
}
void flushCache(uint64_t dts) {
while (!_cache.empty()) {
auto front = _cache.begin();
_cb(front->second);
_cache.erase(front);
}
setCurrentStamp(dts);
_cache_ms = kMinCacheMS;
}
uint64_t getCurrentStamp() { return _ticker.elapsedTime() + _stamp_offset; }
void setCurrentStamp(uint64_t stamp) {
@@ -115,11 +126,12 @@ private:
uint32_t _paced_sender_ms;
uint32_t _cache_ms = kMinCacheMS;
uint64_t _stamp_offset = 0;
uint64_t _last_dts[2] = {0, 0};
OnFrame _cb;
Ticker _ticker;
Timer::Ptr _timer;
std::recursive_mutex _mtx;
std::list<std::pair<uint64_t, Frame::Ptr>> _cache;
std::multimap<uint64_t, Frame::Ptr> _cache;
};
std::shared_ptr<MediaSinkInterface> MultiMediaSourceMuxer::makeRecorder(MediaSource &sender, Recorder::type type) {