hls/mp4录制放置在后台线程

This commit is contained in:
ziyue
2021-12-30 20:52:38 +08:00
committed by xia-chu
parent 2bbd177d5a
commit df41d4d410
6 changed files with 99 additions and 17 deletions

View File

@@ -245,6 +245,10 @@ extern std::vector<size_t> getBlockTypeSize();
extern uint64_t getTotalMemBlockByType(int type);
extern uint64_t getThisThreadMemBlockByType(int type) ;
namespace mediakit {
extern ThreadPool &getMP4Thread();
extern ThreadPool &getHlsThread();
}
static void *web_api_tag = nullptr;
static inline void addHttpListener(){
@@ -334,7 +338,7 @@ public:
return _map.erase(key);
}
size_t size() {
size_t size() {
std::lock_guard<std::recursive_mutex> lck(_mtx);
return _map.size();
}
@@ -594,7 +598,7 @@ void getStatisticJson(const function<void(Value &val)> &cb) {
val["totalMemBlockTypeCount"] = str;
}
auto thread_size = EventPollerPool::Instance().getExecutorSize() + WorkThreadPool::Instance().getExecutorSize();
auto thread_size = 2 + EventPollerPool::Instance().getExecutorSize() + WorkThreadPool::Instance().getExecutorSize();
std::shared_ptr<vector<Value> > thread_mem_info = std::make_shared<vector<Value> >(thread_size);
shared_ptr<void> finished(nullptr, [thread_mem_info, cb, obj](void *) {
@@ -633,6 +637,8 @@ void getStatisticJson(const function<void(Value &val)> &cb) {
};
EventPollerPool::Instance().for_each(lam1);
WorkThreadPool::Instance().for_each(lam1);
lam0(getMP4Thread());
lam0(getHlsThread());
#else
cb(*obj);
#endif
@@ -756,7 +762,7 @@ void addStreamPusherProxy(const string &schema,
* Install api interface
* All apis support GET and POST methods
* POST method parameters support application/json and application/x-www-form-urlencoded methods
* [AUTO-TRANSLATED:62e68c43]
*/
void installWebApi() {

View File

@@ -0,0 +1,26 @@
/*
* Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
*
* This file is part of ZLMediaKit(https://github.com/xia-chu/ZLMediaKit).
*
* Use of this source code is governed by MIT license that can be found in the
* LICENSE file in the root of the source tree. All contributing project authors
* may be found in the AUTHORS file in the root of the source tree.
*/
#include "HlsRecorder.h"
using namespace toolkit;
namespace mediakit {
ThreadPool &getHlsThread() {
static ThreadPool ret(1, ThreadPool::PRIORITY_LOWEST, true);
static onceToken s_token([]() {
ret.async([]() {
setThreadName("hls thread");
});
});
return ret;
}
}//namespace mediakit

View File

@@ -15,9 +15,12 @@
#include "MPEG.h"
#include "MP4Muxer.h"
#include "Common/config.h"
#include "Thread/ThreadPool.h"
namespace mediakit {
toolkit::ThreadPool& getHlsThread();
template <typename Muxer>
class HlsRecorderBase : public MediaSourceEventInterceptor, public Muxer, public std::enable_shared_from_this<HlsRecorderBase<Muxer> > {
public:
@@ -58,6 +61,15 @@ public:
}
bool inputFrame(const Frame::Ptr &frame) override {
auto ptr = this->shared_from_this();
auto cached_frame = Frame::getCacheAbleFrame(frame);
getHlsThread().async([ptr, cached_frame]() {
ptr->inputFrame_l(cached_frame);
});
return true;
}
bool inputFrame_l(const Frame::Ptr &frame) {
if (_clear_cache && _option.hls_demand) {
_clear_cache = false;
// 清空旧的m3u8索引文件于ts切片 [AUTO-TRANSLATED:a4ce0664]
@@ -87,8 +99,12 @@ protected:
class HlsRecorder final : public HlsRecorderBase<MpegMuxer> {
public:
using Ptr = std::shared_ptr<HlsRecorder>;
template <typename ...ARGS>
HlsRecorder(ARGS && ...args) : HlsRecorderBase<MpegMuxer>(false, std::forward<ARGS>(args)...) {}
template <typename... ARGS>
static Ptr create(ARGS &&...args) {
return Ptr(new HlsRecorder(std::forward<ARGS>(args)...), [](HlsRecorder *ptr) { getHlsThread().async([ptr]() { delete ptr; }); });
}
~HlsRecorder() override {
try {
this->flush();
@@ -98,6 +114,9 @@ public:
}
private:
template <typename ...ARGS>
HlsRecorder(ARGS && ...args) : HlsRecorderBase<MpegMuxer>(false, std::forward<ARGS>(args)...) {}
void onWrite(std::shared_ptr<toolkit::Buffer> buffer, uint64_t timestamp, bool key_pos) override {
if (!buffer) {
// reset tracks
@@ -111,8 +130,12 @@ private:
class HlsFMP4Recorder final : public HlsRecorderBase<MP4MuxerMemory> {
public:
using Ptr = std::shared_ptr<HlsFMP4Recorder>;
template <typename ...ARGS>
HlsFMP4Recorder(ARGS && ...args) : HlsRecorderBase<MP4MuxerMemory>(true, std::forward<ARGS>(args)...) {}
template <typename... ARGS>
static Ptr create(ARGS &&...args) {
return Ptr(new HlsFMP4Recorder(std::forward<ARGS>(args)...), [](HlsFMP4Recorder *ptr) { getHlsThread().async([ptr]() { delete ptr; }); });
}
~HlsFMP4Recorder() override {
try {
this->flush();
@@ -122,12 +145,18 @@ public:
}
void addTrackCompleted() override {
HlsRecorderBase<MP4MuxerMemory>::addTrackCompleted();
auto data = getInitSegment();
_hls->inputInitSegment(data.data(), data.size());
auto ptr = std::static_pointer_cast<HlsFMP4Recorder>(this->shared_from_this());
getHlsThread().async([ptr]() {
ptr->HlsRecorderBase<MP4MuxerMemory>::addTrackCompleted();
auto data = ptr->getInitSegment();
ptr->_hls->inputInitSegment(data.data(), data.size());
});
}
private:
template <typename ...ARGS>
HlsFMP4Recorder(ARGS && ...args) : HlsRecorderBase<MP4MuxerMemory>(true, std::forward<ARGS>(args)...) {}
void onSegmentData(std::string buffer, uint64_t timestamp, bool key_pos) override {
if (buffer.empty()) {
// reset tracks

View File

@@ -22,6 +22,20 @@ using namespace toolkit;
namespace mediakit {
ThreadPool &getMP4Thread() {
static ThreadPool ret(1, ThreadPool::PRIORITY_LOWEST, true);
static onceToken s_token([]() {
ret.async([]() {
setThreadName("mp4 thread");
});
});
return ret;
}
MP4Recorder::Ptr MP4Recorder::create(const MediaTuple &tuple, const std::string &path, size_t max_second) {
return Ptr(new MP4Recorder(tuple, path, max_second), [](MP4Recorder *ptr) { getMP4Thread().async([ptr]() { delete ptr; }); });
}
MP4Recorder::MP4Recorder(const MediaTuple &tuple, const string &path, size_t max_second) {
// ///record 业务逻辑////// [AUTO-TRANSLATED:2e78931a]
// ///record Business Logic//////
@@ -75,7 +89,7 @@ void MP4Recorder::asyncClose() {
auto full_path_tmp = _full_path_tmp;
auto info = _info;
TraceL << "Start close tmp mp4 file: " << full_path_tmp;
WorkThreadPool::Instance().getExecutor()->async([muxer, full_path_tmp, info]() mutable {
getMP4Thread().async([muxer, full_path_tmp, info]() mutable {
info.time_len = muxer->getDuration() / 1000.0f;
// 关闭mp4可能非常耗时所以要放在后台线程执行 [AUTO-TRANSLATED:a7378a11]
// Closing mp4 can be very time-consuming, so it should be executed in the background thread
@@ -128,7 +142,7 @@ bool MP4Recorder::inputFrame(const Frame::Ptr &frame) {
// In the case of b-frames, the dts timestamp may regress
_last_dts = MIN(frame->dts(), _last_dts);
}
auto duration = 5u; // 默认至少一帧5ms
if (frame->dts() > 0 && frame->dts() > _last_dts) {
duration = MAX(duration, frame->dts() - _last_dts);
@@ -150,7 +164,12 @@ bool MP4Recorder::inputFrame(const Frame::Ptr &frame) {
if (_muxer) {
// 生成mp4文件 [AUTO-TRANSLATED:76a8d77c]
// Generate mp4 file
return _muxer->inputFrame(frame);
auto muxer = _muxer;
auto cached_frame = Frame::getCacheAbleFrame(frame);
getMP4Thread().async([muxer, cached_frame]() {
return muxer->inputFrame(cached_frame);
});
return true;
}
return false;
}

View File

@@ -26,7 +26,7 @@ class MP4Recorder final : public MediaSinkInterface {
public:
using Ptr = std::shared_ptr<MP4Recorder>;
MP4Recorder(const MediaTuple &tuple, const std::string &path, size_t max_second);
static Ptr create(const MediaTuple &tuple, const std::string &path, size_t max_second);
~MP4Recorder() override;
/**
@@ -63,6 +63,8 @@ public:
bool addTrack(const Track::Ptr & track) override;
private:
MP4Recorder(const MediaTuple &tuple, const std::string &path, size_t max_second);
void createFile();
void closeFile();
void asyncClose();

View File

@@ -78,7 +78,7 @@ std::shared_ptr<MediaSinkInterface> Recorder::createRecorder(type type, const Me
#if defined(ENABLE_HLS)
auto path = Recorder::getRecordPath(type, tuple, option.hls_save_path);
GET_CONFIG(bool, enable_vhost, General::kEnableVhost);
auto ret = std::make_shared<HlsRecorder>(path, enable_vhost ? string(VHOST_KEY) + "=" + tuple.vhost : "", option);
auto ret = HlsRecorder::create(path, enable_vhost ? string(VHOST_KEY) + "=" + tuple.vhost : "", option);
ret->setMediaSource(tuple);
return ret;
#else
@@ -89,7 +89,7 @@ std::shared_ptr<MediaSinkInterface> Recorder::createRecorder(type type, const Me
case Recorder::type_mp4: {
#if defined(ENABLE_MP4)
auto path = Recorder::getRecordPath(type, tuple, option.mp4_save_path);
return std::make_shared<MP4Recorder>(tuple, path, option.mp4_max_second);
return MP4Recorder::create(tuple, path, option.mp4_max_second);
#else
throw std::invalid_argument("mp4相关功能未打开请开启ENABLE_MP4宏后编译再测试");
#endif
@@ -99,7 +99,7 @@ std::shared_ptr<MediaSinkInterface> Recorder::createRecorder(type type, const Me
#if defined(ENABLE_MP4)
auto path = Recorder::getRecordPath(type, tuple, option.hls_save_path);
GET_CONFIG(bool, enable_vhost, General::kEnableVhost);
auto ret = std::make_shared<HlsFMP4Recorder>(path, enable_vhost ? string(VHOST_KEY) + "=" + tuple.vhost : "", option);
auto ret = HlsFMP4Recorder::create(path, enable_vhost ? string(VHOST_KEY) + "=" + tuple.vhost : "", option);
ret->setMediaSource(tuple);
return ret;
#else