新增on_stream_none_reader,on_send_rtp_stopped,on_http_access,on_rtp_server_timeout python事件

This commit is contained in:
xia-chu
2026-02-09 20:02:36 +08:00
parent def8bb53f2
commit 1666bb973a
5 changed files with 153 additions and 6 deletions

View File

@@ -85,17 +85,17 @@ def on_flow_report(args: dict, totalBytes: int, totalDuration: int, isPlayer: bo
# 返回True代表此事件被python拦截
return True
def on_media_changed(is_register: bool, sender) -> bool:
def on_media_changed(is_register: bool, sender: mk_loader.MediaSource) -> bool:
mk_logger.log_info(f"is_register: {is_register}, sender: {sender.getUrl()}")
# 该事件在c++中也处理下
return False
def on_player_proxy_failed(url, media_tuple, ex) -> bool:
def on_player_proxy_failed(url: str, media_tuple: mk_loader.MediaTuple , ex: mk_loader.SockException) -> bool:
mk_logger.log_info(f"on_player_proxy_failed: {url}, {media_tuple.shortUrl()}, {ex.what()}")
# 该事件在c++中也处理下
return False
def on_get_rtsp_realm(args: dict, invoker, sender) -> bool:
def on_get_rtsp_realm(args: dict, invoker, sender: dict) -> bool:
mk_logger.log_info(f"on_get_rtsp_realm, args: {args}, sender: {sender}")
mk_loader.rtsp_get_realm_invoker_do(invoker, "zlmediakit")
# 返回True代表此事件被python拦截
@@ -123,5 +123,29 @@ def on_record_ts(info: dict) -> bool:
# 返回True代表此事件被python拦截
return True
def on_stream_none_reader(sender: mk_loader.MediaSource) -> bool:
mk_logger.log_info(f"on_stream_none_reader: {sender.getUrl()}")
# 无人观看自动关闭
sender.close(False)
# 返回True代表此事件被python拦截
return True
def on_send_rtp_stopped(sender: mk_loader.MultiMediaSourceMuxer, ssrc: str, ex: mk_loader.SockException) -> bool:
mk_logger.log_info(f"on_send_rtp_stopped, ssrc: {ssrc}, ex: {ex.what()}, url: {sender.getMediaTuple().getUrl()}")
# 返回True代表此事件被python拦截
return True
def on_http_access(parser: mk_loader.Parser, path: str, is_dir: bool, invoker, sender: dict) -> bool:
mk_logger.log_info(f"on_http_access, path: {path}, is_dir: {is_dir}, sender: {sender}, http header: {parser.getHeader()}")
# 允许访问该文件/目录1小时, cookie有效期内访问该目录下的文件或路径不再触发该事件
mk_loader.http_access_invoker_do(invoker, "", path, 60 * 60)
# 返回True代表此事件被python拦截
return True
def on_rtp_server_timeout(local_port: int, tuple: mk_loader.MediaTuple, tcp_mode: int, re_use_port: bool, ssrc: int) -> bool:
mk_logger.log_info(f"on_rtp_server_timeout, local_port: {local_port}, tuple: {tuple.shortUrl()}, tcp_mode: {tcp_mode}, re_use_port: {re_use_port}, ssrc: {ssrc}")
# 返回True代表此事件被python拦截
return True
def on_reload_config():
mk_logger.log_info(f"on_reload_config")

View File

@@ -695,6 +695,12 @@ void installWebHook() {
return;
}
#if defined(ENABLE_PYTHON)
if (PythonInvoker::Instance().on_stream_none_reader(sender)) {
return;
}
#endif
GET_CONFIG(string, hook_stream_none_reader, Hook::kOnStreamNoneReader);
if (!hook_enable || hook_stream_none_reader.empty()) {
return;
@@ -722,6 +728,11 @@ void installWebHook() {
});
NoticeCenter::Instance().addListener(&web_hook_tag, Broadcast::kBroadcastSendRtpStopped, [](BroadcastSendRtpStoppedArgs) {
#if defined(ENABLE_PYTHON)
if (PythonInvoker::Instance().on_send_rtp_stopped(sender, ssrc, ex)) {
return;
}
#endif
GET_CONFIG(string, hook_send_rtp_stopped, Hook::kOnSendRtpStopped);
if (!hook_enable || hook_send_rtp_stopped.empty()) {
return;
@@ -771,6 +782,11 @@ void installWebHook() {
// 追踪用户的目的是为了缓存上次鉴权结果,减少鉴权次数,提高性能 [AUTO-TRANSLATED:22827145]
// The purpose of tracking users is to cache the last authentication result, reduce the number of authentication times, and improve performance
NoticeCenter::Instance().addListener(&web_hook_tag, Broadcast::kBroadcastHttpAccess, [](BroadcastHttpAccessArgs) {
#if defined(ENABLE_PYTHON)
if (PythonInvoker::Instance().on_http_access(parser, path, is_dir, invoker, sender)) {
return;
}
#endif
GET_CONFIG(string, hook_http_access, Hook::kOnHttpAccess);
if (!hook_enable || hook_http_access.empty()) {
// 未开启http文件访问鉴权那么允许访问但是每次访问都要鉴权 [AUTO-TRANSLATED:deb3a0ae]
@@ -815,6 +831,11 @@ void installWebHook() {
});
NoticeCenter::Instance().addListener(&web_hook_tag, Broadcast::kBroadcastRtpServerTimeout, [](BroadcastRtpServerTimeoutArgs) {
#if defined(ENABLE_PYTHON)
if (PythonInvoker::Instance().on_rtp_server_timeout(local_port, tuple, tcp_mode, re_use_port, ssrc)) {
return;
}
#endif
GET_CONFIG(string, rtp_server_timeout, Hook::kOnRtpServerTimeout);
if (!hook_enable || rtp_server_timeout.empty()) {
return;

View File

@@ -128,7 +128,8 @@ void handle_http_request(const py::object &check_route, const py::object &submit
auto args = getAllArgs(parser);
auto allArgs = ArgsMap(parser, args);
GET_CONFIG(std::string, api_secret, API::kSecret);
CHECK_SECRET(); // 检测secret
// TODO python http api暂不开启secret鉴权
// CHECK_SECRET(); // 检测secret
} catch (std::exception &ex) {
Json::Value val;
val["code"] = API::Exception;
@@ -236,6 +237,13 @@ PYBIND11_EMBEDDED_MODULE(mk_loader, m) {
invoker();
});
m.def("http_access_invoker_do", [](const py::capsule &cap, const std::string &errMsg,const std::string &accessPath, int cookieLifeSecond) {
// 执行c++代码时释放gil锁
py::gil_scoped_release release;
auto &invoker = to_native<HttpSession::HttpAccessPathInvoker>(cap);
invoker(errMsg, accessPath, cookieLifeSecond);
});
m.def("set_fastapi", [](const py::object &check_route, const py::object &submit_coro) {
static void *fastapi_tag = nullptr;
NoticeCenter::Instance().delListener(&fastapi_tag, Broadcast::kBroadcastHttpRequest);
@@ -274,7 +282,8 @@ PYBIND11_EMBEDDED_MODULE(mk_loader, m) {
.def("setupRecord", &MediaSource::setupRecord)
.def("isRecording", &MediaSource::isRecording)
.def("stopSendRtp", &MediaSource::stopSendRtp)
.def("getLossRate", &MediaSource::getLossRate);
.def("getLossRate", &MediaSource::getLossRate)
.def("getMuxer", &MediaSource::getMuxer);
py::class_<MediaTuple, std::shared_ptr<MediaTuple>>(m, "MediaTuple")
.def_readwrite("vhost", &MediaTuple::vhost)
@@ -284,6 +293,46 @@ PYBIND11_EMBEDDED_MODULE(mk_loader, m) {
.def("shortUrl", &MediaTuple::shortUrl);
py::class_<SockException, std::shared_ptr<SockException>>(m, "SockException").def("what", &SockException::what).def("code", &SockException::getErrCode);
py::class_<Parser, std::shared_ptr<Parser>>(m, "Parser")
.def("method", &Parser::method)
.def("url", &Parser::url)
.def("status", &Parser::status)
.def("fullUrl", &Parser::fullUrl)
.def("protocol", &Parser::protocol)
.def("statusStr", &Parser::statusStr)
.def("content", &Parser::content)
.def("params", &Parser::params)
.def("getHeader", [](Parser *thiz) {
py::dict ret;
for (auto &pr : thiz->getHeader()) {
ret[pr.first.data()] = pr.second;
}
return ret;
});
py::enum_<Recorder::type>(m, "RecordType")
.value("hls", Recorder::type_hls)
.value("mp4", Recorder::type_mp4)
.value("hls_fmp4", Recorder::type_hls_fmp4)
.value("fmp4", Recorder::type_fmp4)
.value("ts", Recorder::type_ts)
.export_values();
#define OPT(key) .def_readwrite(#key, &ProtocolOption::key)
py::class_<ProtocolOption, std::shared_ptr<ProtocolOption>>(m, "ProtocolOption") OPT_VALUE(OPT);
#undef OPT
py::class_<MultiMediaSourceMuxer, std::shared_ptr<MultiMediaSourceMuxer>>(m, "MultiMediaSourceMuxer")
.def("totalReaderCount", static_cast<int (MultiMediaSourceMuxer::*)() const>(&MultiMediaSourceMuxer::totalReaderCount))
.def("isEnabled", &MultiMediaSourceMuxer::isEnabled)
.def("setupRecord", &MultiMediaSourceMuxer::setupRecord)
.def("startRecord", &MultiMediaSourceMuxer::startRecord)
.def("isRecording", &MultiMediaSourceMuxer::isRecording)
.def("startSendRtp", &MultiMediaSourceMuxer::startSendRtp)
.def("stopSendRtp", &MultiMediaSourceMuxer::stopSendRtp)
.def("getOption", &MultiMediaSourceMuxer::getOption)
.def("getMediaTuple", &MultiMediaSourceMuxer::getMediaTuple);
}
namespace mediakit {
@@ -361,6 +410,10 @@ PythonInvoker::~PythonInvoker() {
_on_stream_not_found = py::function();
_on_record_mp4 = py::function();
_on_record_ts = py::function();
_on_stream_none_reader = py::function();
_on_send_rtp_stopped = py::function();
_on_http_access = py::function();
_on_rtp_server_timeout = py::function();
_module = py::module();
}
delete _rel;
@@ -388,6 +441,10 @@ void PythonInvoker::load(const std::string &module_name) {
GET_FUNC(_module, on_stream_not_found);
GET_FUNC(_module, on_record_mp4);
GET_FUNC(_module, on_record_ts);
GET_FUNC(_module, on_stream_none_reader);
GET_FUNC(_module, on_send_rtp_stopped);
GET_FUNC(_module, on_http_access);
GET_FUNC(_module, on_rtp_server_timeout);
if (hasattr(_module, "on_start")) {
py::object on_start = _module.attr("on_start");
@@ -480,6 +537,38 @@ bool PythonInvoker::on_record_ts(BroadcastRecordTsArgs) const {
return _on_record_ts(to_python(info)).cast<bool>();
}
bool PythonInvoker::on_stream_none_reader(BroadcastStreamNoneReaderArgs) const {
py::gil_scoped_acquire gil; // 确保在 Python 调用期间持有 GIL
if (!_on_stream_none_reader) {
return false;
}
return _on_stream_none_reader(to_python_ref(sender)).cast<bool>();
}
bool PythonInvoker::on_send_rtp_stopped(BroadcastSendRtpStoppedArgs) const {
py::gil_scoped_acquire gil; // 确保在 Python 调用期间持有 GIL
if (!_on_send_rtp_stopped) {
return false;
}
return _on_send_rtp_stopped(to_python_ref(sender), ssrc, to_python_ref(ex)).cast<bool>();
}
bool PythonInvoker::on_http_access(BroadcastHttpAccessArgs) const {
py::gil_scoped_acquire gil; // 确保在 Python 调用期间持有 GIL
if (!_on_http_access) {
return false;
}
return _on_http_access(to_python_ref(parser), path, is_dir, to_python(invoker), to_python(sender)).cast<bool>();
}
bool PythonInvoker::on_rtp_server_timeout(BroadcastRtpServerTimeoutArgs) const {
py::gil_scoped_acquire gil; // 确保在 Python 调用期间持有 GIL
if (!_on_rtp_server_timeout) {
return false;
}
return _on_rtp_server_timeout(local_port, to_python_ref(tuple), tcp_mode, re_use_port, ssrc).cast<bool>();
}
} // namespace mediakit
#endif

View File

@@ -13,6 +13,7 @@
#include "Common/MediaSource.h"
#include "Player/PlayerProxy.h"
#include "Rtsp/RtspSession.h"
#include "Http/HttpSession.h"
namespace py = pybind11;
@@ -36,6 +37,10 @@ public:
bool on_stream_not_found(BroadcastNotFoundStreamArgs) const;
bool on_record_mp4(BroadcastRecordMP4Args) const;
bool on_record_ts(BroadcastRecordTsArgs) const;
bool on_stream_none_reader(BroadcastStreamNoneReaderArgs) const;
bool on_send_rtp_stopped(BroadcastSendRtpStoppedArgs) const;
bool on_http_access(BroadcastHttpAccessArgs) const;
bool on_rtp_server_timeout(BroadcastRtpServerTimeoutArgs) const;
private:
PythonInvoker();
@@ -70,6 +75,14 @@ private:
py::function _on_record_mp4;
// 生成hls ts/fmp4切片文件回调
py::function _on_record_ts;
// 流无人观看事件
py::function _on_stream_none_reader;
// rtp转发失败事件
py::function _on_send_rtp_stopped;
// http访问鉴权事件
py::function _on_http_access;
// rtp服务收流超时事件
py::function _on_rtp_server_timeout;
};

View File

@@ -126,7 +126,7 @@ extern const std::string kBroadcastStreamNoneReader;
// rtp推流被动停止时触发 [AUTO-TRANSLATED:43881965]
// Triggered when rtp push stream is passively stopped.
extern const std::string kBroadcastSendRtpStopped;
#define BroadcastSendRtpStoppedArgs MultiMediaSourceMuxer &sender, const std::string &ssrc, const SockException &ex
#define BroadcastSendRtpStoppedArgs MultiMediaSourceMuxer &sender, const std::string &ssrc, const toolkit::SockException &ex
// 更新配置文件事件广播,执行loadIniConfig函数加载配置文件成功后会触发该广播 [AUTO-TRANSLATED:ad4e167d]
// Update configuration file event broadcast. This broadcast will be triggered after the loadIniConfig function loads the configuration file successfully.