diff --git a/python/mk_plugin.py b/python/mk_plugin.py index bd0d141d..0dc62592 100644 --- a/python/mk_plugin.py +++ b/python/mk_plugin.py @@ -85,17 +85,17 @@ def on_flow_report(args: dict, totalBytes: int, totalDuration: int, isPlayer: bo # 返回True代表此事件被python拦截 return True -def on_media_changed(is_register: bool, sender) -> bool: +def on_media_changed(is_register: bool, sender: mk_loader.MediaSource) -> bool: mk_logger.log_info(f"is_register: {is_register}, sender: {sender.getUrl()}") # 该事件在c++中也处理下 return False -def on_player_proxy_failed(url, media_tuple, ex) -> bool: +def on_player_proxy_failed(url: str, media_tuple: mk_loader.MediaTuple , ex: mk_loader.SockException) -> bool: mk_logger.log_info(f"on_player_proxy_failed: {url}, {media_tuple.shortUrl()}, {ex.what()}") # 该事件在c++中也处理下 return False -def on_get_rtsp_realm(args: dict, invoker, sender) -> bool: +def on_get_rtsp_realm(args: dict, invoker, sender: dict) -> bool: mk_logger.log_info(f"on_get_rtsp_realm, args: {args}, sender: {sender}") mk_loader.rtsp_get_realm_invoker_do(invoker, "zlmediakit") # 返回True代表此事件被python拦截 @@ -123,5 +123,29 @@ def on_record_ts(info: dict) -> bool: # 返回True代表此事件被python拦截 return True +def on_stream_none_reader(sender: mk_loader.MediaSource) -> bool: + mk_logger.log_info(f"on_stream_none_reader: {sender.getUrl()}") + # 无人观看自动关闭 + sender.close(False) + # 返回True代表此事件被python拦截 + return True + +def on_send_rtp_stopped(sender: mk_loader.MultiMediaSourceMuxer, ssrc: str, ex: mk_loader.SockException) -> bool: + mk_logger.log_info(f"on_send_rtp_stopped, ssrc: {ssrc}, ex: {ex.what()}, url: {sender.getMediaTuple().getUrl()}") + # 返回True代表此事件被python拦截 + return True + +def on_http_access(parser: mk_loader.Parser, path: str, is_dir: bool, invoker, sender: dict) -> bool: + mk_logger.log_info(f"on_http_access, path: {path}, is_dir: {is_dir}, sender: {sender}, http header: {parser.getHeader()}") + # 允许访问该文件/目录1小时, cookie有效期内,访问该目录下的文件或路径不再触发该事件 + mk_loader.http_access_invoker_do(invoker, "", path, 60 * 60) + # 返回True代表此事件被python拦截 + return True + +def on_rtp_server_timeout(local_port: int, tuple: mk_loader.MediaTuple, tcp_mode: int, re_use_port: bool, ssrc: int) -> bool: + mk_logger.log_info(f"on_rtp_server_timeout, local_port: {local_port}, tuple: {tuple.shortUrl()}, tcp_mode: {tcp_mode}, re_use_port: {re_use_port}, ssrc: {ssrc}") + # 返回True代表此事件被python拦截 + return True + def on_reload_config(): mk_logger.log_info(f"on_reload_config") \ No newline at end of file diff --git a/server/WebHook.cpp b/server/WebHook.cpp index 973e0ef3..7e6c19eb 100755 --- a/server/WebHook.cpp +++ b/server/WebHook.cpp @@ -695,6 +695,12 @@ void installWebHook() { return; } +#if defined(ENABLE_PYTHON) + if (PythonInvoker::Instance().on_stream_none_reader(sender)) { + return; + } +#endif + GET_CONFIG(string, hook_stream_none_reader, Hook::kOnStreamNoneReader); if (!hook_enable || hook_stream_none_reader.empty()) { return; @@ -722,6 +728,11 @@ void installWebHook() { }); NoticeCenter::Instance().addListener(&web_hook_tag, Broadcast::kBroadcastSendRtpStopped, [](BroadcastSendRtpStoppedArgs) { +#if defined(ENABLE_PYTHON) + if (PythonInvoker::Instance().on_send_rtp_stopped(sender, ssrc, ex)) { + return; + } +#endif GET_CONFIG(string, hook_send_rtp_stopped, Hook::kOnSendRtpStopped); if (!hook_enable || hook_send_rtp_stopped.empty()) { return; @@ -771,6 +782,11 @@ void installWebHook() { // 追踪用户的目的是为了缓存上次鉴权结果,减少鉴权次数,提高性能 [AUTO-TRANSLATED:22827145] // The purpose of tracking users is to cache the last authentication result, reduce the number of authentication times, and improve performance NoticeCenter::Instance().addListener(&web_hook_tag, Broadcast::kBroadcastHttpAccess, [](BroadcastHttpAccessArgs) { +#if defined(ENABLE_PYTHON) + if (PythonInvoker::Instance().on_http_access(parser, path, is_dir, invoker, sender)) { + return; + } +#endif GET_CONFIG(string, hook_http_access, Hook::kOnHttpAccess); if (!hook_enable || hook_http_access.empty()) { // 未开启http文件访问鉴权,那么允许访问,但是每次访问都要鉴权; [AUTO-TRANSLATED:deb3a0ae] @@ -815,6 +831,11 @@ void installWebHook() { }); NoticeCenter::Instance().addListener(&web_hook_tag, Broadcast::kBroadcastRtpServerTimeout, [](BroadcastRtpServerTimeoutArgs) { +#if defined(ENABLE_PYTHON) + if (PythonInvoker::Instance().on_rtp_server_timeout(local_port, tuple, tcp_mode, re_use_port, ssrc)) { + return; + } +#endif GET_CONFIG(string, rtp_server_timeout, Hook::kOnRtpServerTimeout); if (!hook_enable || rtp_server_timeout.empty()) { return; diff --git a/server/pyinvoker.cpp b/server/pyinvoker.cpp index 047e350a..4047fb19 100644 --- a/server/pyinvoker.cpp +++ b/server/pyinvoker.cpp @@ -128,7 +128,8 @@ void handle_http_request(const py::object &check_route, const py::object &submit auto args = getAllArgs(parser); auto allArgs = ArgsMap(parser, args); GET_CONFIG(std::string, api_secret, API::kSecret); - CHECK_SECRET(); // 检测secret + // TODO python http api暂不开启secret鉴权 + // CHECK_SECRET(); // 检测secret } catch (std::exception &ex) { Json::Value val; val["code"] = API::Exception; @@ -236,6 +237,13 @@ PYBIND11_EMBEDDED_MODULE(mk_loader, m) { invoker(); }); + m.def("http_access_invoker_do", [](const py::capsule &cap, const std::string &errMsg,const std::string &accessPath, int cookieLifeSecond) { + // 执行c++代码时释放gil锁 + py::gil_scoped_release release; + auto &invoker = to_native(cap); + invoker(errMsg, accessPath, cookieLifeSecond); + }); + m.def("set_fastapi", [](const py::object &check_route, const py::object &submit_coro) { static void *fastapi_tag = nullptr; NoticeCenter::Instance().delListener(&fastapi_tag, Broadcast::kBroadcastHttpRequest); @@ -274,7 +282,8 @@ PYBIND11_EMBEDDED_MODULE(mk_loader, m) { .def("setupRecord", &MediaSource::setupRecord) .def("isRecording", &MediaSource::isRecording) .def("stopSendRtp", &MediaSource::stopSendRtp) - .def("getLossRate", &MediaSource::getLossRate); + .def("getLossRate", &MediaSource::getLossRate) + .def("getMuxer", &MediaSource::getMuxer); py::class_>(m, "MediaTuple") .def_readwrite("vhost", &MediaTuple::vhost) @@ -284,6 +293,46 @@ PYBIND11_EMBEDDED_MODULE(mk_loader, m) { .def("shortUrl", &MediaTuple::shortUrl); py::class_>(m, "SockException").def("what", &SockException::what).def("code", &SockException::getErrCode); + + py::class_>(m, "Parser") + .def("method", &Parser::method) + .def("url", &Parser::url) + .def("status", &Parser::status) + .def("fullUrl", &Parser::fullUrl) + .def("protocol", &Parser::protocol) + .def("statusStr", &Parser::statusStr) + .def("content", &Parser::content) + .def("params", &Parser::params) + .def("getHeader", [](Parser *thiz) { + py::dict ret; + for (auto &pr : thiz->getHeader()) { + ret[pr.first.data()] = pr.second; + } + return ret; + }); + + py::enum_(m, "RecordType") + .value("hls", Recorder::type_hls) + .value("mp4", Recorder::type_mp4) + .value("hls_fmp4", Recorder::type_hls_fmp4) + .value("fmp4", Recorder::type_fmp4) + .value("ts", Recorder::type_ts) + .export_values(); + +#define OPT(key) .def_readwrite(#key, &ProtocolOption::key) + py::class_>(m, "ProtocolOption") OPT_VALUE(OPT); +#undef OPT + + py::class_>(m, "MultiMediaSourceMuxer") + .def("totalReaderCount", static_cast(&MultiMediaSourceMuxer::totalReaderCount)) + .def("isEnabled", &MultiMediaSourceMuxer::isEnabled) + .def("setupRecord", &MultiMediaSourceMuxer::setupRecord) + .def("startRecord", &MultiMediaSourceMuxer::startRecord) + .def("isRecording", &MultiMediaSourceMuxer::isRecording) + .def("startSendRtp", &MultiMediaSourceMuxer::startSendRtp) + .def("stopSendRtp", &MultiMediaSourceMuxer::stopSendRtp) + .def("getOption", &MultiMediaSourceMuxer::getOption) + .def("getMediaTuple", &MultiMediaSourceMuxer::getMediaTuple); } namespace mediakit { @@ -361,6 +410,10 @@ PythonInvoker::~PythonInvoker() { _on_stream_not_found = py::function(); _on_record_mp4 = py::function(); _on_record_ts = py::function(); + _on_stream_none_reader = py::function(); + _on_send_rtp_stopped = py::function(); + _on_http_access = py::function(); + _on_rtp_server_timeout = py::function(); _module = py::module(); } delete _rel; @@ -388,6 +441,10 @@ void PythonInvoker::load(const std::string &module_name) { GET_FUNC(_module, on_stream_not_found); GET_FUNC(_module, on_record_mp4); GET_FUNC(_module, on_record_ts); + GET_FUNC(_module, on_stream_none_reader); + GET_FUNC(_module, on_send_rtp_stopped); + GET_FUNC(_module, on_http_access); + GET_FUNC(_module, on_rtp_server_timeout); if (hasattr(_module, "on_start")) { py::object on_start = _module.attr("on_start"); @@ -480,6 +537,38 @@ bool PythonInvoker::on_record_ts(BroadcastRecordTsArgs) const { return _on_record_ts(to_python(info)).cast(); } +bool PythonInvoker::on_stream_none_reader(BroadcastStreamNoneReaderArgs) const { + py::gil_scoped_acquire gil; // 确保在 Python 调用期间持有 GIL + if (!_on_stream_none_reader) { + return false; + } + return _on_stream_none_reader(to_python_ref(sender)).cast(); +} + +bool PythonInvoker::on_send_rtp_stopped(BroadcastSendRtpStoppedArgs) const { + py::gil_scoped_acquire gil; // 确保在 Python 调用期间持有 GIL + if (!_on_send_rtp_stopped) { + return false; + } + return _on_send_rtp_stopped(to_python_ref(sender), ssrc, to_python_ref(ex)).cast(); +} + +bool PythonInvoker::on_http_access(BroadcastHttpAccessArgs) const { + py::gil_scoped_acquire gil; // 确保在 Python 调用期间持有 GIL + if (!_on_http_access) { + return false; + } + return _on_http_access(to_python_ref(parser), path, is_dir, to_python(invoker), to_python(sender)).cast(); +} + +bool PythonInvoker::on_rtp_server_timeout(BroadcastRtpServerTimeoutArgs) const { + py::gil_scoped_acquire gil; // 确保在 Python 调用期间持有 GIL + if (!_on_rtp_server_timeout) { + return false; + } + return _on_rtp_server_timeout(local_port, to_python_ref(tuple), tcp_mode, re_use_port, ssrc).cast(); +} + } // namespace mediakit #endif diff --git a/server/pyinvoker.h b/server/pyinvoker.h index d39a664c..b9b8c721 100644 --- a/server/pyinvoker.h +++ b/server/pyinvoker.h @@ -13,6 +13,7 @@ #include "Common/MediaSource.h" #include "Player/PlayerProxy.h" #include "Rtsp/RtspSession.h" +#include "Http/HttpSession.h" namespace py = pybind11; @@ -36,6 +37,10 @@ public: bool on_stream_not_found(BroadcastNotFoundStreamArgs) const; bool on_record_mp4(BroadcastRecordMP4Args) const; bool on_record_ts(BroadcastRecordTsArgs) const; + bool on_stream_none_reader(BroadcastStreamNoneReaderArgs) const; + bool on_send_rtp_stopped(BroadcastSendRtpStoppedArgs) const; + bool on_http_access(BroadcastHttpAccessArgs) const; + bool on_rtp_server_timeout(BroadcastRtpServerTimeoutArgs) const; private: PythonInvoker(); @@ -70,6 +75,14 @@ private: py::function _on_record_mp4; // 生成hls ts/fmp4切片文件回调 py::function _on_record_ts; + // 流无人观看事件 + py::function _on_stream_none_reader; + // rtp转发失败事件 + py::function _on_send_rtp_stopped; + // http访问鉴权事件 + py::function _on_http_access; + // rtp服务收流超时事件 + py::function _on_rtp_server_timeout; }; diff --git a/src/Common/config.h b/src/Common/config.h index 9f75cfe0..1ee89484 100644 --- a/src/Common/config.h +++ b/src/Common/config.h @@ -126,7 +126,7 @@ extern const std::string kBroadcastStreamNoneReader; // rtp推流被动停止时触发 [AUTO-TRANSLATED:43881965] // Triggered when rtp push stream is passively stopped. extern const std::string kBroadcastSendRtpStopped; -#define BroadcastSendRtpStoppedArgs MultiMediaSourceMuxer &sender, const std::string &ssrc, const SockException &ex +#define BroadcastSendRtpStoppedArgs MultiMediaSourceMuxer &sender, const std::string &ssrc, const toolkit::SockException &ex // 更新配置文件事件广播,执行loadIniConfig函数加载配置文件成功后会触发该广播 [AUTO-TRANSLATED:ad4e167d] // Update configuration file event broadcast. This broadcast will be triggered after the loadIniConfig function loads the configuration file successfully.