From dd2192fd5eb07c6048032c4fc0d8fecf87262c36 Mon Sep 17 00:00:00 2001 From: xiongziliang <771730766@qq.com> Date: Fri, 14 Dec 2018 17:10:24 +0800 Subject: [PATCH] =?UTF-8?q?rtp=20over=20udp=E6=94=B9=E6=88=90=E7=8B=AC?= =?UTF-8?q?=E5=8D=A0=E5=BC=8F=E7=AB=AF=E5=8F=A3=EF=BC=8C=E6=8F=90=E9=AB=98?= =?UTF-8?q?=E6=80=A7=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/Common/config.cpp | 2 +- src/Common/config.h | 10 ++- src/Rtmp/RtmpSession.cpp | 2 +- src/Rtsp/RtspSession.cpp | 163 ++++++++++++++++++++++++++++++--------- src/Rtsp/RtspSession.h | 9 ++- 5 files changed, 141 insertions(+), 45 deletions(-) diff --git a/src/Common/config.cpp b/src/Common/config.cpp index 2d9bb1dc..75d25dcb 100644 --- a/src/Common/config.cpp +++ b/src/Common/config.cpp @@ -58,7 +58,7 @@ const char kBroadcastHttpRequest[] = "kBroadcastHttpRequest"; const char kBroadcastOnGetRtspRealm[] = "kBroadcastOnGetRtspRealm"; const char kBroadcastOnRtspAuth[] = "kBroadcastOnRtspAuth"; const char kBroadcastMediaPlayed[] = "kBroadcastMediaPlayed"; -const char kBroadcastRtmpPublish[] = "kBroadcastRtmpPublish"; +const char kBroadcastMediaPublish[] = "kBroadcastMediaPublish"; const char kBroadcastFlowReport[] = "kBroadcastFlowReport"; const char kBroadcastReloadConfig[] = "kBroadcastReloadConfig"; const char kBroadcastShellLogin[] = "kBroadcastShellLogin"; diff --git a/src/Common/config.h b/src/Common/config.h index 9ae406c0..9c31a14c 100644 --- a/src/Common/config.h +++ b/src/Common/config.h @@ -92,9 +92,13 @@ extern const char kBroadcastOnRtspAuth[]; //如果errMessage为空则代表鉴权成功 typedef std::function AuthInvoker; -//收到rtmp推流事件广播,通过该事件控制推流鉴权 -extern const char kBroadcastRtmpPublish[]; -#define BroadcastRtmpPublishArgs const MediaInfo &args,const Broadcast::AuthInvoker &invoker,TcpSession &sender +//收到rtsp/rtmp推流事件广播,通过该事件控制推流鉴权 +extern const char kBroadcastMediaPublish[]; +#define BroadcastMediaPublishArgs const MediaInfo &args,const Broadcast::AuthInvoker &invoker,TcpSession &sender + +//兼容旧代码的宏 +#define BroadcastRtmpPublishArgs BroadcastMediaPublishArgs +#define kBroadcastRtmpPublish kBroadcastMediaPublish //播放rtsp/rtmp/http-flv事件广播,通过该事件控制播放鉴权 extern const char kBroadcastMediaPlayed[]; diff --git a/src/Rtmp/RtmpSession.cpp b/src/Rtmp/RtmpSession.cpp index a498a4a1..fdb34c7b 100644 --- a/src/Rtmp/RtmpSession.cpp +++ b/src/Rtmp/RtmpSession.cpp @@ -190,7 +190,7 @@ void RtmpSession::onCmd_publish(AMFDecoder &dec) { onRes(err); }); }; - auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastRtmpPublish, + auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish, _mediaInfo, invoker, *this); diff --git a/src/Rtsp/RtspSession.cpp b/src/Rtsp/RtspSession.cpp index 6b24c3f8..c547d1aa 100644 --- a/src/Rtsp/RtspSession.cpp +++ b/src/Rtsp/RtspSession.cpp @@ -89,10 +89,9 @@ void RtspSession::shutdown_l(bool close){ void RtspSession::onError(const SockException& err) { TraceL << err.getErrCode() << " " << err.what(); - if (_bListenPeerUdpData) { + if (_rtpType == PlayerBase::RTP_MULTICAST) { //取消UDP端口监听 UDPServer::Instance().stopListenPeer(get_peer_ip().data(), this); - _bListenPeerUdpData = false; } if (!_bBase64need && _strSessionCookie.size() != 0) { //quickTime http getter @@ -131,7 +130,9 @@ void RtspSession::onManager() { return; } } - if (_rtpType != PlayerBase::RTP_TCP && _ticker.elapsedTime() > 15 * 1000) { + + //组播不检查心跳是否超时 + if (_rtpType != PlayerBase::RTP_MULTICAST && _ticker.elapsedTime() > 15 * 1000) { WarnL << "RTSP会话超时:" << get_peer_ip(); shutdown(); return; @@ -157,6 +158,7 @@ int64_t RtspSession::onRecvHeader(const char *header,uint64_t len) { s_handler_map.emplace("OPTIONS",&RtspSession::handleReq_Options); s_handler_map.emplace("DESCRIBE",&RtspSession::handleReq_Describe); s_handler_map.emplace("ANNOUNCE",&RtspSession::handleReq_ANNOUNCE); + s_handler_map.emplace("RECORD",&RtspSession::handleReq_RECORD); s_handler_map.emplace("SETUP",&RtspSession::handleReq_Setup); s_handler_map.emplace("PLAY",&RtspSession::handleReq_Play); s_handler_map.emplace("PAUSE",&RtspSession::handleReq_Pause); @@ -207,7 +209,7 @@ void RtspSession::inputRtspOrRtcp(const char *data,uint64_t len) { int RtspSession::handleReq_Options() { //支持这些命令 - sendRtspResponse("200 OK",{"Public" , "OPTIONS, DESCRIBE, SETUP, TEARDOWN, PLAY, PAUSE, ANNOUNCE, SET_PARAMETER, GET_PARAMETER"}); + sendRtspResponse("200 OK",{"Public" , "OPTIONS, DESCRIBE, SETUP, TEARDOWN, PLAY, PAUSE, ANNOUNCE, RECORD, SET_PARAMETER, GET_PARAMETER"}); return 0; } @@ -218,14 +220,71 @@ void RtspSession::onRecvContent(const char *data, uint64_t len) { _onContent = nullptr; } } + int RtspSession::handleReq_ANNOUNCE() { sendRtspResponse("200 OK"); _onContent = [this](const char *data, uint64_t len){ - + _strSdp.assign(data,len); + SdpAttr attr(_strSdp); + _aTrackInfo = attr.getAvailableTrack(); }; return atoi(_parser["Content-Length"].data()); } +int RtspSession::handleReq_RECORD(){ + if (_aTrackInfo.empty() || _parser["Session"] != _strSession) { + send_SessionNotFound(); + return -1; + } + auto onRes = [this](const string &err){ + bool authSuccess = err.empty(); + if(!authSuccess){ + //第一次play是播放,否则是恢复播放。只对播放鉴权 + sendRtspResponse("401 Unauthorized", {"Content-Type", "text/plain"}, err); + shutdown(); + return; + } + + _StrPrinter rtp_info; + for(auto &track : _aTrackInfo){ + if (track->_inited == false) { + //还有track没有setup + shutdown(); + return; + } + rtp_info << "url=" << _strUrl << "/" << track->_control_surffix << ","; + } + + rtp_info.pop_back(); + sendRtspResponse("200 OK", {"RTP-Info",rtp_info}); + SockUtil::setNoDelay(_pSender->rawFD(),false); + }; + + weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); + Broadcast::AuthInvoker invoker = [weakSelf,onRes](const string &err){ + auto strongSelf = weakSelf.lock(); + if(!strongSelf){ + return; + } + strongSelf->async([weakSelf,onRes,err](){ + auto strongSelf = weakSelf.lock(); + if(!strongSelf){ + return; + } + onRes(err); + }); + }; + + //rtsp推流需要鉴权 + auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish,_mediaInfo,invoker,*this); + if(!flag){ + //该事件无人监听,默认不鉴权 + onRes(""); + } + return 0; +} + + int RtspSession::handleReq_Describe() { { //解析url获取媒体名称 @@ -510,21 +569,22 @@ int RtspSession::handleReq_Setup() { break; case PlayerBase::RTP_UDP: { //我们用trackIdx区分rtp和rtcp包 - auto pSockRtp = UDPServer::Instance().getSock(get_local_ip().data(),2*trackIdx); - if (!pSockRtp) { + auto pSockRtp = std::make_shared(_sock->getPoller()); + if (!pSockRtp->bindUdpSock(0,get_local_ip().data())) { //分配端口失败 WarnL << "分配rtp端口失败"; send_NotAcceptable(); return -1; } - auto pSockRtcp = UDPServer::Instance().getSock(get_local_ip().data(),2*trackIdx + 1 ,pSockRtp->get_local_port() + 1); - if (!pSockRtcp) { + auto pSockRtcp = std::make_shared(_sock->getPoller()); + if (!pSockRtcp->bindUdpSock(pSockRtp->get_local_port() + 1,get_local_ip().data())) { //分配端口失败 WarnL << "分配rtcp端口失败"; send_NotAcceptable(); return -1; } - _apUdpSock[trackIdx] = pSockRtp; + _apRtpSock[trackIdx] = pSockRtp; + _apRtcpSock[trackIdx] = pSockRtcp; //设置客户端内网端口信息 string strClientPort = FindField(_parser["Transport"].data(), "client_port=", NULL); uint16_t ui16PeerPort = atoi( FindField(strClientPort.data(), NULL, "-").data()); @@ -533,9 +593,9 @@ int RtspSession::handleReq_Setup() { peerAddr.sin_port = htons(ui16PeerPort); peerAddr.sin_addr.s_addr = inet_addr(get_peer_ip().data()); bzero(&(peerAddr.sin_zero), sizeof peerAddr.sin_zero); - _apPeerUdpAddr[trackIdx].reset((struct sockaddr *) (new struct sockaddr_in(peerAddr))); + _apPeerRtpPortAddr[trackIdx].reset((struct sockaddr *) (new struct sockaddr_in(peerAddr))); //尝试获取客户端nat映射地址 - startListenPeerUdpData(); + startListenPeerUdpData(trackIdx); //InfoL << "分配端口:" << srv_port; sendRtspResponse("200 OK", @@ -564,6 +624,7 @@ int RtspSession::handleReq_Setup() { } int iSrvPort = _pBrdcaster->getPort(trackRef->_type); //我们用trackIdx区分rtp和rtcp包 + //由于组播udp端口是共享的,而rtcp端口为组播udp端口+1,所以rtcp端口需要改成共享端口 auto pSockRtcp = UDPServer::Instance().getSock(get_local_ip().data(),2*trackIdx + 1,iSrvPort + 1); if (!pSockRtcp) { //分配端口失败 @@ -571,7 +632,7 @@ int RtspSession::handleReq_Setup() { send_NotAcceptable(); return -1; } - startListenPeerUdpData(); + startListenPeerUdpData(trackIdx); GET_CONFIG_AND_REGISTER(uint32_t,udpTTL,MultiCast::kUdpTTL); sendRtspResponse("200 OK", @@ -914,12 +975,12 @@ inline void RtspSession::sendRtpPacket(const RtpPacket::Ptr & pkt) { break; case PlayerBase::RTP_UDP: { int iTrackIndex = getTrackIndexByTrackType(pkt->type); - auto pSock = _apUdpSock[iTrackIndex].lock(); + auto &pSock = _apRtpSock[iTrackIndex]; if (!pSock) { shutdown(); return; } - auto peerAddr = _apPeerUdpAddr[iTrackIndex]; + auto &peerAddr = _apPeerRtpPortAddr[iTrackIndex]; if (!peerAddr) { return; } @@ -934,7 +995,11 @@ inline void RtspSession::sendRtpPacket(const RtpPacket::Ptr & pkt) { } inline void RtspSession::onRcvPeerUdpData(int iTrackIdx, const Buffer::Ptr &pBuf, const struct sockaddr& addr) { + //这是rtcp心跳包,说明播放器还存活 + _ticker.resetTime(); + if(iTrackIdx % 2 == 0){ +// DebugL << "rtp数据包:" << iTrackIdx / 2; //这是rtp探测包 if(!_bGotAllPeerUdp){ //还没有获取完整的rtp探测包 @@ -944,7 +1009,7 @@ inline void RtspSession::onRcvPeerUdpData(int iTrackIdx, const Buffer::Ptr &pBuf return; } //设置真实的客户端nat映射端口号 - _apPeerUdpAddr[iTrackIdx / 2].reset(new struct sockaddr(addr)); + _apPeerRtpPortAddr[iTrackIdx / 2].reset(new struct sockaddr(addr)); _abGotPeerUdp[iTrackIdx / 2] = true; _bGotAllPeerUdp = true;//先假设获取到完整的rtp探测包 for (unsigned int i = 0; i < _aTrackInfo.size(); i++) { @@ -956,32 +1021,58 @@ inline void RtspSession::onRcvPeerUdpData(int iTrackIdx, const Buffer::Ptr &pBuf } } }else{ - //这是rtcp心跳包,说明播放器还存活 - _ticker.resetTime(); - //TraceL << "rtcp:" << (iTrackIdx-1)/2 ; +// TraceL << "rtcp数据包" << (iTrackIdx-1)/2 ; } } -inline void RtspSession::startListenPeerUdpData() { - _bListenPeerUdpData = true; +inline void RtspSession::startListenPeerUdpData(int trackIdx) { weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - UDPServer::Instance().listenPeer(get_peer_ip().data(), this, - [weakSelf](int iTrackIdx,const Buffer::Ptr &pBuf,struct sockaddr *pPeerAddr)->bool { - auto strongSelf=weakSelf.lock(); - if(!strongSelf) { - return false; - } - struct sockaddr addr=*pPeerAddr; - strongSelf->async_first([weakSelf,pBuf,addr,iTrackIdx]() { - auto strongSelf=weakSelf.lock(); - if(!strongSelf) { - return; - } - strongSelf->onRcvPeerUdpData(iTrackIdx,pBuf,addr); - }); - return true; + + auto onUdpData = [weakSelf](const Buffer::Ptr &pBuf, struct sockaddr *pPeerAddr,int iTrackIdx){ + auto strongSelf=weakSelf.lock(); + if(!strongSelf) { + return false; + } + struct sockaddr addr=*pPeerAddr; + strongSelf->async_first([weakSelf,pBuf,addr,iTrackIdx]() { + auto strongSelf=weakSelf.lock(); + if(!strongSelf) { + return; + } + strongSelf->onRcvPeerUdpData(iTrackIdx,pBuf,addr); + }); + return true; + }; + + switch (_rtpType){ + case PlayerBase::RTP_MULTICAST:{ + //组播使用的共享rtcp端口 + UDPServer::Instance().listenPeer(get_peer_ip().data(), this, [onUdpData]( + int iTrackIdx, const Buffer::Ptr &pBuf, struct sockaddr *pPeerAddr) { + return onUdpData(pBuf,pPeerAddr,iTrackIdx); }); + } + break; + case PlayerBase::RTP_UDP:{ + auto setEvent = [&](Socket::Ptr &sock,int iTrackIdx){ + if(!sock){ + WarnL << "udp端口为空:" << iTrackIdx; + return; + } + sock->setOnRead([onUdpData,iTrackIdx](const Buffer::Ptr &pBuf, struct sockaddr *pPeerAddr){ + onUdpData(pBuf,pPeerAddr,iTrackIdx); + }); + }; + setEvent(_apRtpSock[trackIdx], 2*trackIdx ); + setEvent(_apRtcpSock[trackIdx], 2*trackIdx + 1 ); + } + break; + + default: + break; + } + } inline void RtspSession::initSender(const std::shared_ptr& session) { diff --git a/src/Rtsp/RtspSession.h b/src/Rtsp/RtspSession.h index af3387ab..2d8733bd 100644 --- a/src/Rtsp/RtspSession.h +++ b/src/Rtsp/RtspSession.h @@ -88,6 +88,7 @@ private: int handleReq_Options(); //处理options方法 int handleReq_Describe(); //处理describe方法 int handleReq_ANNOUNCE(); //处理options方法 + int handleReq_RECORD(); //处理options方法 int handleReq_Setup(); //处理setup方法 int handleReq_Play(); //处理play方法 int handleReq_Pause(); //处理pause方法 @@ -108,7 +109,7 @@ private: inline int getTrackIndexByControlSuffix(const string &controlSuffix); inline void onRcvPeerUdpData(int iTrackIdx, const Buffer::Ptr &pBuf, const struct sockaddr &addr); - inline void startListenPeerUdpData(); + inline void startListenPeerUdpData(int iTrackIdx); //认证相关 static void onAuthSuccess(const weak_ptr &weakSelf); @@ -142,9 +143,9 @@ private: //RTP over udp bool _bGotAllPeerUdp = false; bool _abGotPeerUdp[2] = { false, false }; //获取客户端udp端口计数 - weak_ptr _apUdpSock[2]; //发送RTP的UDP端口,trackid idx 为数组下标 - std::shared_ptr _apPeerUdpAddr[2]; //播放器接收RTP的地址,trackid idx 为数组下标 - bool _bListenPeerUdpData = false; + Socket::Ptr _apRtpSock[2]; //RTP端口,trackid idx 为数组下标 + Socket::Ptr _apRtcpSock[2];//RTCP端口,trackid idx 为数组下标 + std::shared_ptr _apPeerRtpPortAddr[2]; //播放器接收RTP的地址,trackid idx 为数组下标 //RTP over udp_multicast RtpBroadCaster::Ptr _pBrdcaster;