diff --git a/server/WebApi.cpp b/server/WebApi.cpp index 5fadf3bb..7776958f 100755 --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -1104,7 +1104,7 @@ void installWebApi() { throw runtime_error(StrPrinter << "播放鉴权失败:" << err); } auto rtc = WebRtcTransportImp::create(EventPollerPool::Instance().getPoller()); - rtc->attach(src, true); + rtc->attach(src, info, true); val["sdp"] = rtc->getAnswerSdp(offer_sdp); val["type"] = "answer"; invoker(200, headerOut, val.toStyledString()); @@ -1138,7 +1138,7 @@ void installWebApi() { push_src->setProtocolTranslation(enableHls, enableMP4); auto rtc = WebRtcTransportImp::create(EventPollerPool::Instance().getPoller()); push_src->setListener(rtc); - rtc->attach(push_src, false); + rtc->attach(push_src, info, false); val["sdp"] = rtc->getAnswerSdp(offer_sdp); val["type"] = "answer"; invoker(200, headerOut, val.toStyledString()); diff --git a/webrtc/WebRtcTransport.cpp b/webrtc/WebRtcTransport.cpp index 9930d8f4..a229171d 100644 --- a/webrtc/WebRtcTransport.cpp +++ b/webrtc/WebRtcTransport.cpp @@ -285,10 +285,37 @@ WebRtcTransportImp::~WebRtcTransportImp() { void WebRtcTransportImp::onDestory() { WebRtcTransport::onDestory(); + uint64_t duration = _alive_ticker.createdTime() / 1000; + + //流量统计事件广播 + GET_CONFIG(uint32_t, iFlowThreshold, General::kFlowThreshold); + + if (_play_src) { + WarnP(_socket) << "RTC播放器(" + << _media_info._vhost << "/" + << _media_info._app << "/" + << _media_info._streamid + << ")结束播放,耗时(s):" << duration; + if (_bytes_usage >= iFlowThreshold * 1024) { + NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, _bytes_usage, duration, true, static_cast(*_socket)); + } + } + + if (_push_src) { + WarnP(_socket) << "RTC推流器(" + << _media_info._vhost << "/" + << _media_info._app << "/" + << _media_info._streamid + << ")结束推流,耗时(s):" << duration; + if (_bytes_usage >= iFlowThreshold * 1024) { + NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, _bytes_usage, duration, false, static_cast(*_socket)); + } + } } -void WebRtcTransportImp::attach(const RtspMediaSource::Ptr &src, bool is_play) { +void WebRtcTransportImp::attach(const RtspMediaSource::Ptr &src, const MediaInfo &info, bool is_play) { assert(src); + _media_info = info; if (is_play) { _play_src = src; } else { @@ -455,6 +482,7 @@ private: }; void WebRtcTransportImp::onRtcp(const char *buf, size_t len) { + _bytes_usage += len; auto rtcps = RtcpHeader::loadFromBytes((char *) buf, len); for (auto rtcp : rtcps) { switch ((RtcpType) rtcp->pt) { @@ -504,6 +532,7 @@ void WebRtcTransportImp::onRtcp(const char *buf, size_t len) { } void WebRtcTransportImp::onRtp(const char *buf, size_t len) { + _bytes_usage += len; _alive_ticker.resetTime(); RtpHeader *rtp = (RtpHeader *) buf; //根据接收到的rtp的pt信息,找到该流的信息 @@ -549,6 +578,7 @@ void WebRtcTransportImp::onSendRtp(const RtpPacket::Ptr &rtp, bool flush){ //忽略,对方不支持该编码类型 return; } + _bytes_usage += rtp->size() - RtpPacket::kRtpTcpHeaderSize; sendRtpPacket(rtp->data() + RtpPacket::kRtpTcpHeaderSize, rtp->size() - RtpPacket::kRtpTcpHeaderSize, flush, pt); //统计rtp发送情况,好做sr汇报 _rtp_info_pt[pt].rtcp_context_send->onRtp(rtp->getSeq(), rtp->getStampMS(), rtp->size() - RtpPacket::kRtpTcpHeaderSize); diff --git a/webrtc/WebRtcTransport.h b/webrtc/WebRtcTransport.h index 1cd63583..6c2ea591 100644 --- a/webrtc/WebRtcTransport.h +++ b/webrtc/WebRtcTransport.h @@ -127,7 +127,7 @@ public: * @param src 媒体源 * @param is_play 是播放还是推流 */ - void attach(const RtspMediaSource::Ptr &src, bool is_play = true); + void attach(const RtspMediaSource::Ptr &src, const MediaInfo &info, bool is_play = true); protected: void onStartWebRTC() override; @@ -176,6 +176,10 @@ private: void onBeforeSortedRtp(const RtpPayloadInfo &info,const RtpPacket::Ptr &rtp); private: + //用掉的总流量 + uint64_t _bytes_usage = 0; + //媒体相关元数据 + MediaInfo _media_info; //保持自我强引用 Ptr _self; //检测超时的定时器