diff --git a/src/Common/MultiMediaSourceMuxer.cpp b/src/Common/MultiMediaSourceMuxer.cpp index 4c9ad177..4493a8fe 100644 --- a/src/Common/MultiMediaSourceMuxer.cpp +++ b/src/Common/MultiMediaSourceMuxer.cpp @@ -62,32 +62,38 @@ public: setCurrentStamp(frame->dts()); resetTimer(EventPoller::getCurrentPoller()); } - - _cache.emplace_back(frame->dts() + _cache_ms, Frame::getCacheAbleFrame(frame)); + auto &last_dts = _last_dts[frame->getTrackType()]; + if (last_dts > frame->dts()) { + // 时间戳回退了,点播流? + WarnL << "Dts decrease: " << last_dts << "->" << frame->dts() << ", flush all paced sender cache: " << _cache.size(); + flushCache(frame->dts()); + } + _cache.emplace(frame->dts(), Frame::getCacheAbleFrame(frame)); + last_dts = frame->dts(); return true; } private: void onTick() { std::lock_guard lck(_mtx); - auto dst = _cache.empty() ? 0 : _cache.back().first; + auto max_dts = _cache.empty() ? 0 : _cache.rbegin()->first; while (!_cache.empty()) { - auto &front = _cache.front(); - if (getCurrentStamp() < front.first) { + auto front = _cache.begin(); + if (getCurrentStamp() < front->first + _cache_ms) { // 还没到消费时间 [AUTO-TRANSLATED:09fb4c3d] // Not yet time to consume break; } // 时间到了,该消费frame了 [AUTO-TRANSLATED:2f007931] // Time is up, it's time to consume the frame - _cb(front.second); - _cache.pop_front(); + _cb(front->second); + _cache.erase(front); } - if (_cache.empty() && dst) { + if (_cache.empty() && max_dts) { // 消费太快,需要增加缓存大小 [AUTO-TRANSLATED:c05bfbcd] // Consumption is too fast, need to increase cache size - setCurrentStamp(dst); + setCurrentStamp(max_dts); _cache_ms += kMinCacheMS; } @@ -95,15 +101,20 @@ private: // Consumption is too slow, need to force flush data if (_cache.size() > 25 * 5) { WarnL << "Flush frame paced sender cache: " << _cache.size(); - while (!_cache.empty()) { - auto &front = _cache.front(); - _cb(front.second); - _cache.pop_front(); - } - setCurrentStamp(dst); + flushCache(max_dts); } } + void flushCache(uint64_t dts) { + while (!_cache.empty()) { + auto front = _cache.begin(); + _cb(front->second); + _cache.erase(front); + } + setCurrentStamp(dts); + _cache_ms = kMinCacheMS; + } + uint64_t getCurrentStamp() { return _ticker.elapsedTime() + _stamp_offset; } void setCurrentStamp(uint64_t stamp) { @@ -115,11 +126,12 @@ private: uint32_t _paced_sender_ms; uint32_t _cache_ms = kMinCacheMS; uint64_t _stamp_offset = 0; + uint64_t _last_dts[2] = {0, 0}; OnFrame _cb; Ticker _ticker; Timer::Ptr _timer; std::recursive_mutex _mtx; - std::list> _cache; + std::multimap _cache; }; std::shared_ptr MultiMediaSourceMuxer::makeRecorder(MediaSource &sender, Recorder::type type) {