diff --git a/src/Common/MediaSource.cpp b/src/Common/MediaSource.cpp index 9c4b0f40..d28aadbb 100644 --- a/src/Common/MediaSource.cpp +++ b/src/Common/MediaSource.cpp @@ -82,10 +82,10 @@ bool MediaSource::regist() { InfoL << m_strSchema << " " << m_strVhost << " " << m_strApp << " " << m_strId; NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaChanged, true, - m_strSchema.data(), - m_strVhost.data(), - m_strApp.data(), - m_strId.data()); + m_strSchema, + m_strVhost, + m_strApp, + m_strId); } return success; } @@ -111,10 +111,10 @@ void MediaSource::unregisted(){ InfoL << "" << m_strSchema << " " << m_strVhost << " " << m_strApp << " " << m_strId; NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaChanged, false, - m_strSchema.data(), - m_strVhost.data(), - m_strApp.data(), - m_strId.data()); + m_strSchema, + m_strVhost, + m_strApp, + m_strId); } void MediaInfo::parse(const string &url){ diff --git a/src/Common/config.cpp b/src/Common/config.cpp index b43a9e86..edfec1d6 100644 --- a/src/Common/config.cpp +++ b/src/Common/config.cpp @@ -54,6 +54,11 @@ const char kBroadcastOnGetRtspRealm[] = "kBroadcastOnGetRtspRealm"; const char kBroadcastOnRtspAuth[] = "kBroadcastOnRtspAuth"; const char kBroadcastMediaPlayed[] = "kBroadcastMediaPlayed"; const char kBroadcastRtmpPublish[] = "kBroadcastRtmpPublish"; +const char kBroadcastFlowReport[] = "kBroadcastFlowReport"; +const char kFlowThreshold[] = "Broadcast.flowThreshold"; +onceToken token([](){ + mINI::Instance()[kFlowThreshold] = 1024; +},nullptr); } //namespace Broadcast //代理失败最大重试次数 diff --git a/src/Common/config.h b/src/Common/config.h index 74f734dd..2bfe5a8d 100644 --- a/src/Common/config.h +++ b/src/Common/config.h @@ -65,7 +65,7 @@ namespace Broadcast { //注册或反注册MediaSource事件广播 extern const char kBroadcastMediaChanged[]; -#define BroadcastMediaChangedArgs bool bRegist, const char *schema,const char *vhost,const char *app,const char *stream +#define BroadcastMediaChangedArgs const bool &bRegist, const string &schema,const string &vhost,const string &app,const string &stream //录制mp4文件成功后广播 extern const char kBroadcastRecordMP4[]; @@ -73,17 +73,16 @@ extern const char kBroadcastRecordMP4[]; //收到http api请求广播 extern const char kBroadcastHttpRequest[]; -#define BroadcastHttpRequestArgs const Parser &parser,HttpSession::HttpResponseInvoker &invoker,bool &consumed +#define BroadcastHttpRequestArgs const Parser &parser,const HttpSession::HttpResponseInvoker &invoker,bool &consumed //该流是否需要认证?是的话调用invoker并传入realm,否则传入空的realm.如果该事件不监听则不认证 extern const char kBroadcastOnGetRtspRealm[]; -#define BroadcastOnGetRtspRealmArgs const char *app,const char *stream,const RtspSession::onGetRealm &invoker +#define BroadcastOnGetRtspRealmArgs const string &app,const string &stream,const RtspSession::onGetRealm &invoker //请求认证用户密码事件,user_name为用户名,must_no_encrypt如果为true,则必须提供明文密码(因为此时是base64认证方式),否则会导致认证失败 //获取到密码后请调用invoker并输入对应类型的密码和密码类型,invoker执行时会匹配密码 extern const char kBroadcastOnRtspAuth[]; -#define BroadcastOnRtspAuthArgs const char *user_name,bool must_no_encrypt,const RtspSession::onAuth &invoker - +#define BroadcastOnRtspAuthArgs const string &user_name,const bool &must_no_encrypt,const RtspSession::onAuth &invoker //鉴权结果回调对象 typedef std::function AuthInvoker; @@ -92,10 +91,16 @@ typedef std::function AuthInvoker; extern const char kBroadcastRtmpPublish[]; #define BroadcastRtmpPublishArgs const MediaInfo &args,const Broadcast::AuthInvoker &invoker -//播放rtsp或rtmp事件广播,通过该事件控制播放鉴权 +//播放rtsp/rtmp/http-flv事件广播,通过该事件控制播放鉴权 extern const char kBroadcastMediaPlayed[]; #define BroadcastMediaPlayedArgs const MediaInfo &args,const Broadcast::AuthInvoker &invoker +//停止rtsp/rtmp/http-flv会话后流量汇报事件广播 +extern const char kBroadcastFlowReport[]; +#define BroadcastFlowReportArgs const MediaInfo &args,const uint64_t &totalBytes + +//流量汇报事件流量阈值,单位KB,默认1MB +extern const char kFlowThreshold[]; } //namespace Broadcast //代理失败最大重试次数 diff --git a/src/Http/HttpClient.cpp b/src/Http/HttpClient.cpp index 6664186d..115a830a 100644 --- a/src/Http/HttpClient.cpp +++ b/src/Http/HttpClient.cpp @@ -83,7 +83,7 @@ void HttpClient::sendRequest(const string &strUrl){ _isHttps = isHttps; if(!alive() || bChanged){ - InfoL << "reconnet:" << _lastHost; + //InfoL << "reconnet:" << _lastHost; startConnect(host, port); }else{ SockException ex; diff --git a/src/Http/HttpSession.cpp b/src/Http/HttpSession.cpp index 8e9b05e3..de0d7362 100644 --- a/src/Http/HttpSession.cpp +++ b/src/Http/HttpSession.cpp @@ -161,6 +161,10 @@ inline HttpSession::HttpCode HttpSession::parserHttpReq(const string &str) { } void HttpSession::onError(const SockException& err) { //WarnL << err.what(); + static uint64_t iFlowThreshold = mINI::Instance()[Broadcast::kFlowThreshold]; + if(m_previousTagSize > iFlowThreshold * 1024){ + NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport,m_mediaInfo,m_previousTagSize); + } } void HttpSession::onManager() { @@ -185,10 +189,10 @@ inline bool HttpSession::checkLiveFlvStream(){ } //拼接成完整url auto fullUrl = string(HTTP_SCHEMA) + "://" + m_parser["Host"] + m_parser.FullUrl(); - MediaInfo info(fullUrl); - info.m_streamid.erase(info.m_streamid.size() - 4);//去除.flv后缀 + m_mediaInfo.parse(fullUrl); + m_mediaInfo.m_streamid.erase(m_mediaInfo.m_streamid.size() - 4);//去除.flv后缀 - auto mediaSrc = dynamic_pointer_cast(MediaSource::find(RTMP_SCHEMA,info.m_vhost,info.m_app,info.m_streamid)); + auto mediaSrc = dynamic_pointer_cast(MediaSource::find(RTMP_SCHEMA,m_mediaInfo.m_vhost,m_mediaInfo.m_app,m_mediaInfo.m_streamid)); if(!mediaSrc){ //该rtmp源不存在 sendNotFound(true); @@ -288,7 +292,7 @@ inline bool HttpSession::checkLiveFlvStream(){ onRes(authSuccess,err); }); }; - auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed,info,invoker); + auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed,m_mediaInfo,invoker); if(!flag){ //该事件无人监听,默认不鉴权 onRes(true,""); @@ -306,9 +310,9 @@ inline HttpSession::HttpCode HttpSession::Handle_Req_GET() { //事件未被拦截,则认为是http下载请求 auto fullUrl = string(HTTP_SCHEMA) + "://" + m_parser["Host"] + m_parser.FullUrl(); - MediaInfo info(fullUrl); + m_mediaInfo.parse(fullUrl); - string strFile = m_strPath + "/" + info.m_vhost + m_parser.Url(); + string strFile = m_strPath + "/" + m_mediaInfo.m_vhost + m_parser.Url(); /////////////HTTP连接是否需要被关闭//////////////// static uint32_t reqCnt = mINI::Instance()[Config::Http::kMaxReqCount].as(); bool bClose = (strcasecmp(m_parser["Connection"].data(),"close") == 0) && ( ++m_iReqCnt < reqCnt); @@ -317,7 +321,7 @@ inline HttpSession::HttpCode HttpSession::Handle_Req_GET() { if (strFile.back() == '/') { //生成文件夹菜单索引 string strMeun; - if (!makeMeun(strFile,info.m_vhost, strMeun)) { + if (!makeMeun(strFile,m_mediaInfo.m_vhost, strMeun)) { //文件夹不存在 sendNotFound(bClose); return eHttpCode; @@ -600,7 +604,7 @@ inline bool HttpSession::emitHttpEvent(bool doInvoke){ }; ///////////////////广播HTTP事件/////////////////////////// bool consumed = false;//该事件是否被消费 - NoticeCenter::Instance().emitEvent(Config::Broadcast::kBroadcastHttpRequest,m_parser,invoker,(bool &)consumed); + NoticeCenter::Instance().emitEvent(Config::Broadcast::kBroadcastHttpRequest,m_parser,invoker,consumed); if(!consumed && doInvoke){ //该事件无人消费,所以返回404 invoker("404 Not Found",KeyValue(),""); diff --git a/src/Http/HttpSession.h b/src/Http/HttpSession.h index 0b17607e..42e57efe 100644 --- a/src/Http/HttpSession.h +++ b/src/Http/HttpSession.h @@ -76,7 +76,9 @@ private: //flv over http uint32_t m_aui32FirstStamp[2] = {0}; uint32_t m_previousTagSize = 0; + MediaInfo m_mediaInfo; RingBuffer::RingReader::Ptr m_pRingReader; + void onSendMedia(const RtmpPacket::Ptr &pkt); void sendRtmp(const RtmpPacket::Ptr &pkt, uint32_t ui32TimeStamp); void sendRtmp(uint8_t ui8Type, const std::string& strBuf, uint32_t ui32TimeStamp); diff --git a/src/MediaFile/Mp4Maker.cpp b/src/MediaFile/Mp4Maker.cpp index 13b09b15..d7863150 100644 --- a/src/MediaFile/Mp4Maker.cpp +++ b/src/MediaFile/Mp4Maker.cpp @@ -219,7 +219,7 @@ void Mp4Maker::closeFile() { stat(m_strFile.data(), &fileData); m_info.ui64FileSize = fileData.st_size; //----record 业务逻辑----// - NoticeCenter::Instance().emitEvent(Config::Broadcast::kBroadcastRecordMP4,(const Mp4Info &)m_info); + NoticeCenter::Instance().emitEvent(Config::Broadcast::kBroadcastRecordMP4,m_info); } } diff --git a/src/Rtmp/RtmpSession.cpp b/src/Rtmp/RtmpSession.cpp index 230836a0..5e1ed8c4 100644 --- a/src/Rtmp/RtmpSession.cpp +++ b/src/Rtmp/RtmpSession.cpp @@ -52,9 +52,12 @@ RtmpSession::~RtmpSession() { void RtmpSession::onError(const SockException& err) { DebugL << err.what(); - if (m_pPublisherSrc) { - m_pPublisherSrc.reset(); - } + + //流量统计事件广播 + static uint64_t iFlowThreshold = mINI::Instance()[Broadcast::kFlowThreshold]; + if(m_ui64TotalBytes > iFlowThreshold * 1024){ + NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport,m_mediaInfo,m_ui64TotalBytes); + } } void RtmpSession::onManager() { @@ -76,6 +79,7 @@ void RtmpSession::onManager() { void RtmpSession::onRecv(const Socket::Buffer::Ptr &pBuf) { m_ticker.resetTime(); try { + m_ui64TotalBytes += pBuf->size(); onParseRtmp(pBuf->data(), pBuf->size()); } catch (exception &e) { WarnL << e.what(); diff --git a/src/Rtmp/RtmpSession.h b/src/Rtmp/RtmpSession.h index 48980b74..4122e3cb 100644 --- a/src/Rtmp/RtmpSession.h +++ b/src/Rtmp/RtmpSession.h @@ -65,7 +65,8 @@ private: bool m_bPublisherSrcRegisted = false; std::weak_ptr m_pPlayerSrc; uint32_t m_aui32FirstStamp[2] = {0}; - + //消耗的总流量 + uint64_t m_ui64TotalBytes = 0; void onProcessCmd(AMFDecoder &dec); void onCmd_connect(AMFDecoder &dec); void onCmd_createStream(AMFDecoder &dec); @@ -82,10 +83,12 @@ private: void onSendMedia(const RtmpPacket::Ptr &pkt); void onSendRawData(const char *pcRawData,int iSize) override{ + m_ui64TotalBytes += iSize; send(pcRawData, iSize); } void onSendRawData(const Socket::Buffer::Ptr &buffer,int flags) override{ - sock->send(buffer,flags); + m_ui64TotalBytes += buffer->size(); + sock->send(buffer,flags); } void onRtmpChunk(RtmpPacket &chunkData) override; diff --git a/src/Rtsp/RtspSession.cpp b/src/Rtsp/RtspSession.cpp index a04bb8c7..28853e78 100644 --- a/src/Rtsp/RtspSession.cpp +++ b/src/Rtsp/RtspSession.cpp @@ -125,6 +125,12 @@ void RtspSession::onError(const SockException& err) { g_mapPostter.emplace(this, dynamic_pointer_cast(shared_from_this())); TraceL << "quickTime will not send request any more!"; } + + //流量统计事件广播 + static uint64_t iFlowThreshold = mINI::Instance()[Broadcast::kFlowThreshold]; + if(m_ui64TotalBytes > iFlowThreshold * 1024){ + NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport,m_mediaInfo,m_ui64TotalBytes); + } } void RtspSession::onManager() { @@ -144,9 +150,11 @@ void RtspSession::onManager() { void RtspSession::onRecv(const Socket::Buffer::Ptr &pBuf) { m_ticker.resetTime(); - char tmp[2 * 1024]; - m_pcBuf = tmp; - if (m_bBase64need) { + char tmp[2 * 1024]; + m_pcBuf = tmp; + + m_ui64TotalBytes += pBuf->size(); + if (m_bBase64need) { //quicktime 加密后的rtsp请求,需要解密 av_base64_decode((uint8_t *) m_pcBuf, pBuf->data(), sizeof(tmp)); m_parser.Parse(m_pcBuf); //rtsp请求解析 @@ -217,8 +225,8 @@ bool RtspSession::handleReq_Describe() { //广播是否需要认证事件 if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnGetRtspRealm, - m_mediaInfo.m_app.data(), - m_mediaInfo.m_streamid.data(), + m_mediaInfo.m_app, + m_mediaInfo.m_streamid, invoker)){ //无人监听此事件,说明无需认证 invoker(""); @@ -322,7 +330,7 @@ void RtspSession::onAuthBasic(const weak_ptr &weakSelf,const string }; //此时必须提供明文密码 bool must_no_encrypt = true; - if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnRtspAuth,user.data(),must_no_encrypt,invoker)){ + if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnRtspAuth,user,must_no_encrypt,invoker)){ //表明该流需要认证却没监听请求密码事件,这一般是大意的程序所为,警告之 WarnL << "请监听kBroadcastOnRtspAuth事件!"; //但是我们还是忽略认证以便完成播放 @@ -404,7 +412,7 @@ void RtspSession::onAuthDigest(const weak_ptr &weakSelf,const strin //此时可以提供明文或md5加密的密码 bool must_no_encrypt = false; - if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnRtspAuth,username.data(),must_no_encrypt,invoker)){ + if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnRtspAuth,username,must_no_encrypt,invoker)){ //表明该流需要认证却没监听请求密码事件,这一般是大意的程序所为,警告之 WarnL << "请监听kBroadcastOnRtspAuth事件!"; //但是我们还是忽略认证以便完成播放 @@ -908,6 +916,7 @@ inline void RtspSession::sendRtpPacket(const RtpPacket::Ptr & pkt) { return; } BufferRtp::Ptr buffer(new BufferRtp(pkt,4)); + m_ui64TotalBytes += buffer->size(); pSock->send(buffer,SOCKET_DEFAULE_FLAGS, peerAddr.get()); } break; diff --git a/src/Rtsp/RtspSession.h b/src/Rtsp/RtspSession.h index 88722f78..4f545c4f 100644 --- a/src/Rtsp/RtspSession.h +++ b/src/Rtsp/RtspSession.h @@ -83,16 +83,20 @@ public: private: typedef bool (RtspSession::*rtspCMDHandle)(); int send(const string &strBuf) override { + m_ui64TotalBytes += strBuf.size(); return m_pSender->send(strBuf); } int send(string &&strBuf) override { - return m_pSender->send(std::move(strBuf)); + m_ui64TotalBytes += strBuf.size(); + return m_pSender->send(std::move(strBuf)); } int send(const char *pcBuf, int iSize) override { - return m_pSender->send(pcBuf, iSize); + m_ui64TotalBytes += iSize; + return m_pSender->send(pcBuf, iSize); } int send(const Socket::Buffer::Ptr &pkt) override{ - return m_pSender->send(pkt,SOCKET_DEFAULE_FLAGS | FLAG_MORE); + m_ui64TotalBytes += pkt->size(); + return m_pSender->send(pkt,SOCKET_DEFAULE_FLAGS | FLAG_MORE); } void shutdown() override; bool handleReq_Options(); //处理options方法 @@ -185,6 +189,10 @@ private: //quicktime 请求rtsp会产生两次tcp连接, //一次发送 get 一次发送post,需要通过sessioncookie关联起来 string m_strSessionCookie; + + //消耗的总流量 + uint64_t m_ui64TotalBytes = 0; + static recursive_mutex g_mtxGetter; //对quicktime上锁保护 static recursive_mutex g_mtxPostter; //对quicktime上锁保护 static unordered_map > g_mapGetter;