diff --git a/python/mk_plugin.py b/python/mk_plugin.py index 0dc62592..59aaaee9 100644 --- a/python/mk_plugin.py +++ b/python/mk_plugin.py @@ -148,4 +148,21 @@ def on_rtp_server_timeout(local_port: int, tuple: mk_loader.MediaTuple, tcp_mode return True def on_reload_config(): - mk_logger.log_info(f"on_reload_config") \ No newline at end of file + mk_logger.log_info(f"on_reload_config") + +class PyMultiMediaSourceMuxer: + def __init__(self, sender: mk_loader.MultiMediaSourceMuxer): + mk_logger.log_info(f"PyMultiMediaSourceMuxer: {sender.getMediaTuple().shortUrl()}") + def destroy(self): + mk_logger.log_info(f"~PyMultiMediaSourceMuxer") + + def addTrack(self, track: mk_loader.Track): + mk_logger.log_info(f"addTrack: {track.getCodecName()}") + return True + def addTrackCompleted(self): + mk_logger.log_info(f"addTrackCompleted") + def inputFrame(self, frame: mk_loader.Frame): + mk_logger.log_info(f"inputFrame: {frame.getCodecName()} {frame.dts()}") + return True +def on_create_muxer(sender: mk_loader.MultiMediaSourceMuxer): + return PyMultiMediaSourceMuxer(sender) \ No newline at end of file diff --git a/server/pyinvoker.cpp b/server/pyinvoker.cpp index 4047fb19..ff43cf9b 100644 --- a/server/pyinvoker.cpp +++ b/server/pyinvoker.cpp @@ -167,6 +167,53 @@ void handle_http_request(const py::object &check_route, const py::object &submit submit_coro(scope, py::bytes(parser.content()), send); } +class MuxerDelegatePython : public MediaSinkInterface { +public: + MuxerDelegatePython(py::object object) { + _py_muxer = std::move(object); + _input_frame = _py_muxer.attr("inputFrame"); + _add_track = _py_muxer.attr("addTrack"); + _add_track_completed = _py_muxer.attr("addTrackCompleted"); + } + + ~MuxerDelegatePython() override { + py::gil_scoped_acquire guard; + try { + auto destroy = _py_muxer.attr("destroy"); + destroy(); + destroy = py::function(); + } catch (std::exception &ex) { + ErrorL << "destroy python muxer failed: " << ex.what(); + } + _input_frame = py::function(); + _add_track = py::function(); + _add_track_completed = py::function(); + _py_muxer = py::function(); + } + + bool addTrack(const Track::Ptr &track) override { + py::gil_scoped_acquire guard; + return _add_track ? _add_track(track).cast() : false; + } + + void addTrackCompleted() override { + py::gil_scoped_acquire guard; + if (_add_track_completed) { + _add_track_completed(); + } + } + + bool inputFrame(const Frame::Ptr &frame) override { + py::gil_scoped_acquire guard; + return _input_frame ? _input_frame(frame).cast() : false; + } + +private: + py::object _py_muxer; + py::function _input_frame; + py::function _add_track; + py::function _add_track_completed; +}; PYBIND11_EMBEDDED_MODULE(mk_loader, m) { m.def("log", [](int lev, const char *file, int line, const char *func, const char *content) { @@ -333,6 +380,70 @@ PYBIND11_EMBEDDED_MODULE(mk_loader, m) { .def("stopSendRtp", &MultiMediaSourceMuxer::stopSendRtp) .def("getOption", &MultiMediaSourceMuxer::getOption) .def("getMediaTuple", &MultiMediaSourceMuxer::getMediaTuple); + + py::class_(m, "Track") + .def("getCodecId", &Track::getCodecId) + .def("getCodecName", &Track::getCodecName) + .def("getTrackType", &Track::getTrackType) + .def("getTrackTypeStr", &Track::getTrackTypeStr) + .def("setIndex", &Track::setIndex) + .def("getIndex", &Track::getIndex) + .def("getVideoKeyFrames", &Track::getVideoKeyFrames) + .def("getFrames", &Track::getFrames) + .def("getVideoGopSize", &Track::getVideoGopSize) + .def("getVideoGopInterval", &Track::getVideoGopInterval) + .def("getDuration", &Track::getDuration) + .def("ready", &Track::ready) + .def("update", &Track::update) + .def("getSdp", &Track::getSdp) + .def("getExtraData", &Track::getExtraData) + .def("setExtraData", &Track::setExtraData) + .def("getBitRate", &Track::getBitRate) + .def("setBitRate", &Track::setBitRate) + .def("getVideoHeight",[](Track *thiz) { + auto ptr = dynamic_cast(thiz); + return ptr ? ptr->getVideoHeight() : 0; + }) + .def("getVideoWidth", [](Track *thiz) { + auto ptr = dynamic_cast(thiz); + return ptr ? ptr->getVideoWidth() : 0; + }) + .def("getVideoFps", [](Track *thiz) { + auto ptr = dynamic_cast(thiz); + return ptr ? ptr->getVideoFps() : 0; + }) + .def("getAudioSampleRate",[](Track *thiz) { + auto ptr = dynamic_cast(thiz); + return ptr ? ptr->getAudioSampleRate() : 0; + }) + .def("getAudioSampleBit", [](Track *thiz) { + auto ptr = dynamic_cast(thiz); + return ptr ? ptr->getAudioSampleBit() : 0; + }) + .def("getAudioChannel", [](Track *thiz) { + auto ptr = dynamic_cast(thiz); + return ptr ? ptr->getAudioChannel() : 0; + }); + + py::class_(m, "Frame") + .def("data", &Frame::data) + .def("size", &Frame::size) + .def("toString", &Frame::toString) + .def("getCapacity", &Frame::getCapacity) + .def("getCodecId", &Frame::getCodecId) + .def("getCodecName", &Frame::getCodecName) + .def("getTrackType", &Frame::getTrackType) + .def("getTrackTypeStr", &Frame::getTrackTypeStr) + .def("setIndex", &Frame::setIndex) + .def("getIndex", &Frame::getIndex) + .def("dts", &Frame::dts) + .def("pts", &Frame::pts) + .def("prefixSize", &Frame::prefixSize) + .def("keyFrame", &Frame::keyFrame) + .def("configFrame", &Frame::configFrame) + .def("cacheAble", &Frame::cacheAble) + .def("dropAble", &Frame::dropAble) + .def("decodeAble", &Frame::decodeAble); } namespace mediakit { @@ -389,6 +500,16 @@ PythonInvoker::PythonInvoker() { _on_reload_config(); } }); + + NoticeCenter::Instance().addListener(this, Broadcast::kBroadcastCreateMuxer, [this](BroadcastCreateMuxerArgs) { + py::gil_scoped_acquire guard; + if (_on_create_muxer) { + auto py_muxer = _on_create_muxer(sender); + if (py_muxer && !py_muxer.is_none()) { + delegate = std::make_shared(std::move(py_muxer)); + } + } + }); } PythonInvoker::~PythonInvoker() { @@ -414,6 +535,7 @@ PythonInvoker::~PythonInvoker() { _on_send_rtp_stopped = py::function(); _on_http_access = py::function(); _on_rtp_server_timeout = py::function(); + _on_create_muxer = py::function(); _module = py::module(); } delete _rel; @@ -445,6 +567,7 @@ void PythonInvoker::load(const std::string &module_name) { GET_FUNC(_module, on_send_rtp_stopped); GET_FUNC(_module, on_http_access); GET_FUNC(_module, on_rtp_server_timeout); + GET_FUNC(_module, on_create_muxer); if (hasattr(_module, "on_start")) { py::object on_start = _module.attr("on_start"); diff --git a/server/pyinvoker.h b/server/pyinvoker.h index b9b8c721..3d9a55e6 100644 --- a/server/pyinvoker.h +++ b/server/pyinvoker.h @@ -83,6 +83,8 @@ private: py::function _on_http_access; // rtp服务收流超时事件 py::function _on_rtp_server_timeout; + // 创建Python muxer对象 + py::function _on_create_muxer; }; diff --git a/src/Common/MultiMediaSourceMuxer.cpp b/src/Common/MultiMediaSourceMuxer.cpp index c05f33ac..52a6705d 100644 --- a/src/Common/MultiMediaSourceMuxer.cpp +++ b/src/Common/MultiMediaSourceMuxer.cpp @@ -243,6 +243,8 @@ MultiMediaSourceMuxer::MultiMediaSourceMuxer(const MediaTuple& tuple, float dur_ // Audio related settings enableAudio(option.enable_audio); enableMuteAudio(option.add_mute_audio); + + NOTICE_EMIT(BroadcastCreateMuxerArgs, Broadcast::kBroadcastCreateMuxer, _delegate, *this); } void MultiMediaSourceMuxer::setMediaListener(const std::weak_ptr &listener) { @@ -705,6 +707,9 @@ bool MultiMediaSourceMuxer::onTrackReady(const Track::Ptr &track) { if (_mp4) { ret = _mp4->addTrack(track) ? true : ret; } + if (_delegate) { + _delegate->addTrack(track); + } return ret; } @@ -764,6 +769,9 @@ void MultiMediaSourceMuxer::onAllTrackReady() { pr.second.syncTo(*first); } } + if (_delegate) { + _delegate->addTrackCompleted(); + } InfoL << "stream: " << shortUrl() << " , codec info: " << getTrackInfoStr(this); } @@ -847,6 +855,9 @@ bool MultiMediaSourceMuxer::onTrackFrame_l(const Frame::Ptr &frame_in) { if (_fmp4) { ret = _fmp4->inputFrame(frame) ? true : ret; } + if (_delegate) { + _delegate->inputFrame(frame); + } if (_ring) { // 此场景由于直接转发,可能存在切换线程引起的数据被缓存在管道,所以需要CacheAbleFrame [AUTO-TRANSLATED:528afbb7] // In this scenario, due to direct forwarding, there may be data cached in the pipeline due to thread switching, so CacheAbleFrame is needed diff --git a/src/Common/MultiMediaSourceMuxer.h b/src/Common/MultiMediaSourceMuxer.h index bcd73697..c65b9329 100644 --- a/src/Common/MultiMediaSourceMuxer.h +++ b/src/Common/MultiMediaSourceMuxer.h @@ -29,6 +29,7 @@ class MultiMediaSourceMuxer : public MediaSourceEventInterceptor, public MediaSi public: using Ptr = std::shared_ptr; using RingType = toolkit::RingBuffer; + using onCreateMuxer = std::function; class Listener { public: @@ -249,6 +250,8 @@ private: toolkit::EventPoller::Ptr _poller; RingType::Ptr _ring; + MediaSinkInterface::Ptr _delegate; + // 对象个数统计 [AUTO-TRANSLATED:3b43e8c2] // Object count statistics toolkit::ObjectStatistic _statistic; diff --git a/src/Common/config.cpp b/src/Common/config.cpp index a3714e7b..410d4ec2 100644 --- a/src/Common/config.cpp +++ b/src/Common/config.cpp @@ -82,6 +82,7 @@ const string kBroadcastRtcSctpSend = "kBroadcastRtcSctpSend"; const string kBroadcastRtcSctpReceived = "kBroadcastRtcSctpReceived"; const string kBroadcastPlayerCountChanged = "kBroadcastPlayerCountChanged"; const string kBroadcastPlayerProxyFailed = "kBroadcastPlayerProxyFailed"; +const string kBroadcastCreateMuxer = "kBroadcastCreateMuxer"; } // namespace Broadcast diff --git a/src/Common/config.h b/src/Common/config.h index 1ee89484..646628fa 100644 --- a/src/Common/config.h +++ b/src/Common/config.h @@ -164,6 +164,9 @@ extern const std::string kBroadcastPlayerCountChanged; extern const std::string kBroadcastPlayerProxyFailed; #define BroadcastPlayerProxyFailedArgs const PlayerProxy& sender, const toolkit::SockException &ex +extern const std::string kBroadcastCreateMuxer; +#define BroadcastCreateMuxerArgs MediaSinkInterface::Ptr &delegate, const MultiMediaSourceMuxer &sender + #define ReloadConfigTag ((void *)(0xFF)) #define RELOAD_KEY(arg, key) \ do { \