diff --git a/server/WebApi.cpp b/server/WebApi.cpp index 291a5d89..bc33732a 100755 --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -245,6 +245,10 @@ extern std::vector getBlockTypeSize(); extern uint64_t getTotalMemBlockByType(int type); extern uint64_t getThisThreadMemBlockByType(int type) ; +namespace mediakit { +extern ThreadPool &getMP4Thread(); +extern ThreadPool &getHlsThread(); +} static void *web_api_tag = nullptr; static inline void addHttpListener(){ @@ -334,7 +338,7 @@ public: return _map.erase(key); } - size_t size() { + size_t size() { std::lock_guard lck(_mtx); return _map.size(); } @@ -594,7 +598,7 @@ void getStatisticJson(const function &cb) { val["totalMemBlockTypeCount"] = str; } - auto thread_size = EventPollerPool::Instance().getExecutorSize() + WorkThreadPool::Instance().getExecutorSize(); + auto thread_size = 2 + EventPollerPool::Instance().getExecutorSize() + WorkThreadPool::Instance().getExecutorSize(); std::shared_ptr > thread_mem_info = std::make_shared >(thread_size); shared_ptr finished(nullptr, [thread_mem_info, cb, obj](void *) { @@ -633,6 +637,8 @@ void getStatisticJson(const function &cb) { }; EventPollerPool::Instance().for_each(lam1); WorkThreadPool::Instance().for_each(lam1); + lam0(getMP4Thread()); + lam0(getHlsThread()); #else cb(*obj); #endif @@ -756,7 +762,7 @@ void addStreamPusherProxy(const string &schema, * Install api interface * All apis support GET and POST methods * POST method parameters support application/json and application/x-www-form-urlencoded methods - + * [AUTO-TRANSLATED:62e68c43] */ void installWebApi() { diff --git a/src/Record/HlsRecorder.cpp b/src/Record/HlsRecorder.cpp new file mode 100644 index 00000000..8aab997f --- /dev/null +++ b/src/Record/HlsRecorder.cpp @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved. + * + * This file is part of ZLMediaKit(https://github.com/xia-chu/ZLMediaKit). + * + * Use of this source code is governed by MIT license that can be found in the + * LICENSE file in the root of the source tree. All contributing project authors + * may be found in the AUTHORS file in the root of the source tree. + */ + +#include "HlsRecorder.h" +using namespace toolkit; + +namespace mediakit { + +ThreadPool &getHlsThread() { + static ThreadPool ret(1, ThreadPool::PRIORITY_LOWEST, true); + static onceToken s_token([]() { + ret.async([]() { + setThreadName("hls thread"); + }); + }); + return ret; +} + +}//namespace mediakit diff --git a/src/Record/HlsRecorder.h b/src/Record/HlsRecorder.h index 1810aec5..852057e3 100644 --- a/src/Record/HlsRecorder.h +++ b/src/Record/HlsRecorder.h @@ -15,9 +15,12 @@ #include "MPEG.h" #include "MP4Muxer.h" #include "Common/config.h" +#include "Thread/ThreadPool.h" namespace mediakit { +toolkit::ThreadPool& getHlsThread(); + template class HlsRecorderBase : public MediaSourceEventInterceptor, public Muxer, public std::enable_shared_from_this > { public: @@ -58,6 +61,15 @@ public: } bool inputFrame(const Frame::Ptr &frame) override { + auto ptr = this->shared_from_this(); + auto cached_frame = Frame::getCacheAbleFrame(frame); + getHlsThread().async([ptr, cached_frame]() { + ptr->inputFrame_l(cached_frame); + }); + return true; + } + + bool inputFrame_l(const Frame::Ptr &frame) { if (_clear_cache && _option.hls_demand) { _clear_cache = false; // 清空旧的m3u8索引文件于ts切片 [AUTO-TRANSLATED:a4ce0664] @@ -87,8 +99,12 @@ protected: class HlsRecorder final : public HlsRecorderBase { public: using Ptr = std::shared_ptr; - template - HlsRecorder(ARGS && ...args) : HlsRecorderBase(false, std::forward(args)...) {} + + template + static Ptr create(ARGS &&...args) { + return Ptr(new HlsRecorder(std::forward(args)...), [](HlsRecorder *ptr) { getHlsThread().async([ptr]() { delete ptr; }); }); + } + ~HlsRecorder() override { try { this->flush(); @@ -98,6 +114,9 @@ public: } private: + template + HlsRecorder(ARGS && ...args) : HlsRecorderBase(false, std::forward(args)...) {} + void onWrite(std::shared_ptr buffer, uint64_t timestamp, bool key_pos) override { if (!buffer) { // reset tracks @@ -111,8 +130,12 @@ private: class HlsFMP4Recorder final : public HlsRecorderBase { public: using Ptr = std::shared_ptr; - template - HlsFMP4Recorder(ARGS && ...args) : HlsRecorderBase(true, std::forward(args)...) {} + + template + static Ptr create(ARGS &&...args) { + return Ptr(new HlsFMP4Recorder(std::forward(args)...), [](HlsFMP4Recorder *ptr) { getHlsThread().async([ptr]() { delete ptr; }); }); + } + ~HlsFMP4Recorder() override { try { this->flush(); @@ -122,12 +145,18 @@ public: } void addTrackCompleted() override { - HlsRecorderBase::addTrackCompleted(); - auto data = getInitSegment(); - _hls->inputInitSegment(data.data(), data.size()); + auto ptr = std::static_pointer_cast(this->shared_from_this()); + getHlsThread().async([ptr]() { + ptr->HlsRecorderBase::addTrackCompleted(); + auto data = ptr->getInitSegment(); + ptr->_hls->inputInitSegment(data.data(), data.size()); + }); } private: + template + HlsFMP4Recorder(ARGS && ...args) : HlsRecorderBase(true, std::forward(args)...) {} + void onSegmentData(std::string buffer, uint64_t timestamp, bool key_pos) override { if (buffer.empty()) { // reset tracks diff --git a/src/Record/MP4Recorder.cpp b/src/Record/MP4Recorder.cpp index 31732043..198af027 100644 --- a/src/Record/MP4Recorder.cpp +++ b/src/Record/MP4Recorder.cpp @@ -22,6 +22,20 @@ using namespace toolkit; namespace mediakit { +ThreadPool &getMP4Thread() { + static ThreadPool ret(1, ThreadPool::PRIORITY_LOWEST, true); + static onceToken s_token([]() { + ret.async([]() { + setThreadName("mp4 thread"); + }); + }); + return ret; +} + +MP4Recorder::Ptr MP4Recorder::create(const MediaTuple &tuple, const std::string &path, size_t max_second) { + return Ptr(new MP4Recorder(tuple, path, max_second), [](MP4Recorder *ptr) { getMP4Thread().async([ptr]() { delete ptr; }); }); +} + MP4Recorder::MP4Recorder(const MediaTuple &tuple, const string &path, size_t max_second) { // ///record 业务逻辑////// [AUTO-TRANSLATED:2e78931a] // ///record Business Logic////// @@ -75,7 +89,7 @@ void MP4Recorder::asyncClose() { auto full_path_tmp = _full_path_tmp; auto info = _info; TraceL << "Start close tmp mp4 file: " << full_path_tmp; - WorkThreadPool::Instance().getExecutor()->async([muxer, full_path_tmp, info]() mutable { + getMP4Thread().async([muxer, full_path_tmp, info]() mutable { info.time_len = muxer->getDuration() / 1000.0f; // 关闭mp4可能非常耗时,所以要放在后台线程执行 [AUTO-TRANSLATED:a7378a11] // Closing mp4 can be very time-consuming, so it should be executed in the background thread @@ -128,7 +142,7 @@ bool MP4Recorder::inputFrame(const Frame::Ptr &frame) { // In the case of b-frames, the dts timestamp may regress _last_dts = MIN(frame->dts(), _last_dts); } - + auto duration = 5u; // 默认至少一帧5ms if (frame->dts() > 0 && frame->dts() > _last_dts) { duration = MAX(duration, frame->dts() - _last_dts); @@ -150,7 +164,12 @@ bool MP4Recorder::inputFrame(const Frame::Ptr &frame) { if (_muxer) { // 生成mp4文件 [AUTO-TRANSLATED:76a8d77c] // Generate mp4 file - return _muxer->inputFrame(frame); + auto muxer = _muxer; + auto cached_frame = Frame::getCacheAbleFrame(frame); + getMP4Thread().async([muxer, cached_frame]() { + return muxer->inputFrame(cached_frame); + }); + return true; } return false; } diff --git a/src/Record/MP4Recorder.h b/src/Record/MP4Recorder.h index 8faec96b..437a7878 100644 --- a/src/Record/MP4Recorder.h +++ b/src/Record/MP4Recorder.h @@ -26,7 +26,7 @@ class MP4Recorder final : public MediaSinkInterface { public: using Ptr = std::shared_ptr; - MP4Recorder(const MediaTuple &tuple, const std::string &path, size_t max_second); + static Ptr create(const MediaTuple &tuple, const std::string &path, size_t max_second); ~MP4Recorder() override; /** @@ -63,6 +63,8 @@ public: bool addTrack(const Track::Ptr & track) override; private: + MP4Recorder(const MediaTuple &tuple, const std::string &path, size_t max_second); + void createFile(); void closeFile(); void asyncClose(); diff --git a/src/Record/Recorder.cpp b/src/Record/Recorder.cpp index 51b9e031..d63a21f7 100644 --- a/src/Record/Recorder.cpp +++ b/src/Record/Recorder.cpp @@ -78,7 +78,7 @@ std::shared_ptr Recorder::createRecorder(type type, const Me #if defined(ENABLE_HLS) auto path = Recorder::getRecordPath(type, tuple, option.hls_save_path); GET_CONFIG(bool, enable_vhost, General::kEnableVhost); - auto ret = std::make_shared(path, enable_vhost ? string(VHOST_KEY) + "=" + tuple.vhost : "", option); + auto ret = HlsRecorder::create(path, enable_vhost ? string(VHOST_KEY) + "=" + tuple.vhost : "", option); ret->setMediaSource(tuple); return ret; #else @@ -89,7 +89,7 @@ std::shared_ptr Recorder::createRecorder(type type, const Me case Recorder::type_mp4: { #if defined(ENABLE_MP4) auto path = Recorder::getRecordPath(type, tuple, option.mp4_save_path); - return std::make_shared(tuple, path, option.mp4_max_second); + return MP4Recorder::create(tuple, path, option.mp4_max_second); #else throw std::invalid_argument("mp4相关功能未打开,请开启ENABLE_MP4宏后编译再测试"); #endif @@ -99,7 +99,7 @@ std::shared_ptr Recorder::createRecorder(type type, const Me #if defined(ENABLE_MP4) auto path = Recorder::getRecordPath(type, tuple, option.hls_save_path); GET_CONFIG(bool, enable_vhost, General::kEnableVhost); - auto ret = std::make_shared(path, enable_vhost ? string(VHOST_KEY) + "=" + tuple.vhost : "", option); + auto ret = HlsFMP4Recorder::create(path, enable_vhost ? string(VHOST_KEY) + "=" + tuple.vhost : "", option); ret->setMediaSource(tuple); return ret; #else