From 0c5cd62429e1aade0f25a76537c3355452e5a184 Mon Sep 17 00:00:00 2001 From: xiongziliang <771730766@qq.com> Date: Mon, 17 Dec 2018 13:14:49 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96rtsp=20over=20http=EF=BC=8C?= =?UTF-8?q?=E5=88=A0=E9=99=A4=E5=86=97=E4=BD=99=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/Rtsp/RtspSession.cpp | 171 +++++++++++++++++++-------------------- src/Rtsp/RtspSession.h | 19 ++--- 2 files changed, 87 insertions(+), 103 deletions(-) diff --git a/src/Rtsp/RtspSession.cpp b/src/Rtsp/RtspSession.cpp index 72e1ae8d..4ee826e9 100644 --- a/src/Rtsp/RtspSession.cpp +++ b/src/Rtsp/RtspSession.cpp @@ -44,13 +44,35 @@ namespace mediakit { static int kSockFlags = SOCKET_DEFAULE_FLAGS | FLAG_MORE; -unordered_map > RtspSession::g_mapGetter; -unordered_map > RtspSession::g_mapPostter; -recursive_mutex RtspSession::g_mtxGetter; //对quicktime上锁保护 -recursive_mutex RtspSession::g_mtxPostter; //对quicktime上锁保护 +/** + * rtsp协议有多种方式传输rtp数据包,目前已支持包括以下4种 + * 1: rtp over udp ,这种方式是rtp通过单独的udp端口传输 + * 2: rtp over udp_multicast,这种方式是rtp通过共享udp组播端口传输 + * 3: rtp over tcp,这种方式是通过rtsp信令tcp通道完成传输 + * 4: rtp over http,下面着重讲解:rtp over http + * + * rtp over http 是把rtsp协议伪装成http协议以达到穿透防火墙的目的, + * 此时播放器会发送两次http请求至rtsp服务器,第一次是http get请求, + * 第二次是http post请求。 + * + * 这两次请求通过http请求头中的x-sessioncookie键完成绑定 + * + * 第一次http get请求用于接收rtp、rtcp和rtsp回复,后续该链接不再发送其他请求 + * 第二次http post请求用于发送rtsp请求,rtsp握手结束后可能会断开连接,此时我们还要维持rtp发送 + * 需要指出的是http post请求中的content负载就是base64编码后的rtsp请求包, + * 播放器会把rtsp请求伪装成http content负载发送至rtsp服务器,然后rtsp服务器又把回复发送给第一次http get请求的tcp链接 + * 这样,对防火墙而言,本次rtsp会话就是两次http请求,防火墙就会放行数据 + * + * zlmediakit在处理rtsp over http的请求时,会把http poster中的content数据base64解码后转发给http getter处理 + */ -RtspSession::RtspSession(const std::shared_ptr &pTh, const Socket::Ptr &pSock) : - TcpSession(pTh, pSock), _pSender(pSock) { + +//rtsp over http 情况下get请求实例,在请求实例用于接收rtp数据包 +static unordered_map > g_mapGetter; +//对g_mapGetter上锁保护 +static recursive_mutex g_mtxGetter; + +RtspSession::RtspSession(const std::shared_ptr &pTh, const Socket::Ptr &pSock) : TcpSession(pTh, pSock) { //设置10秒发送缓存 pSock->setSendBufSecond(10); //设置15秒发送超时时间 @@ -60,55 +82,20 @@ RtspSession::RtspSession(const std::shared_ptr &pTh, const Socket::P } RtspSession::~RtspSession() { - if (_onDestory) { - _onDestory(); - } DebugL << get_peer_ip(); } -void RtspSession::shutdown(){ - shutdown_l(true); -} -void RtspSession::shutdown_l(bool close){ - if (_sock) { - _sock->emitErr(SockException(Err_other, "self shutdown"),close); - } - if (_bBase64need && !_sock) { - //quickTime http postter,and self is detached from tcpServer - lock_guard lock(g_mtxPostter); - g_mapPostter.erase(this); - } - if (_pBrdcaster) { - _pBrdcaster->setDetachCB(this, nullptr); - _pBrdcaster.reset(); - } - if (_pRtpReader) { - _pRtpReader.reset(); - } -} - void RtspSession::onError(const SockException& err) { TraceL << err.getErrCode() << " " << err.what(); if (_rtpType == PlayerBase::RTP_MULTICAST) { //取消UDP端口监听 UDPServer::Instance().stopListenPeer(get_peer_ip().data(), this); } - if (!_bBase64need && _strSessionCookie.size() != 0) { - //quickTime http getter - lock_guard lock(g_mtxGetter); - g_mapGetter.erase(_strSessionCookie); - } - if (_bBase64need && err.getErrCode() == Err_eof) { - //quickTime http postter,正在发送rtp; QuickTime只是断开了请求连接,请继续发送rtp - _sock = nullptr; - lock_guard lock(g_mtxPostter); - //为了保证脱离TCPServer后还能正常运作,需要保持本对象的强引用 - try { - g_mapPostter.emplace(this, dynamic_pointer_cast(shared_from_this())); - }catch (std::exception &ex){ - } - TraceL << "quickTime will not send request any more!"; + if (_http_x_sessioncookie.size() != 0) { + //移除http getter的弱引用记录 + lock_guard lock(g_mtxGetter); + g_mapGetter.erase(_http_x_sessioncookie); } //流量统计事件广播 @@ -120,6 +107,7 @@ void RtspSession::onError(const SockException& err) { _ticker.createdTime()/1000, *this); } + } void RtspSession::onManager() { @@ -132,7 +120,7 @@ void RtspSession::onManager() { } //组播不检查心跳是否超时 - if (_rtpType != PlayerBase::RTP_MULTICAST && _ticker.elapsedTime() > 15 * 1000) { + if (_rtpType == PlayerBase::RTP_UDP && _ticker.elapsedTime() > 15 * 1000) { WarnL << "RTSP会话超时:" << get_peer_ip(); shutdown(); return; @@ -189,10 +177,9 @@ int64_t RtspSession::onRecvHeader(const char *header,uint64_t len) { void RtspSession::onRecv(const Buffer::Ptr &pBuf) { _ticker.resetTime(); _ui64TotalBytes += pBuf->size(); - if (_bBase64need) { - //quicktime 加密后的rtsp请求,需要解密 - auto str = decodeBase64(string(pBuf->data(),pBuf->size())); - inputRtspOrRtcp(str.data(),str.size()); + if (_onRecv) { + //http poster的请求数据转发给http getter处理 + _onRecv(pBuf); } else { inputRtspOrRtcp(pBuf->data(),pBuf->size()); } @@ -250,6 +237,7 @@ int RtspSession::handleReq_ANNOUNCE() { _strUrl = _parser.Url(); _pushSrc = std::make_shared(_mediaInfo._vhost,_mediaInfo._app,_mediaInfo._streamid); + _pushSrc->setListener(dynamic_pointer_cast(shared_from_this())); _pushSrc->onGetSDP(_strSdp); sendRtspResponse("200 OK"); }; @@ -282,7 +270,8 @@ int RtspSession::handleReq_RECORD(){ rtp_info.pop_back(); sendRtspResponse("200 OK", {"RTP-Info",rtp_info}); - SockUtil::setNoDelay(_pSender->rawFD(),false); + SockUtil::setNoDelay(_sock->rawFD(),false); + (*this) << SocketFlags(kSockFlags); }; weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); @@ -586,7 +575,7 @@ int RtspSession::handleReq_Setup() { sendRtspResponse("200 OK", {"Transport",StrPrinter << "RTP/AVP/TCP;unicast;" << "interleaved=" << trackRef->_type * 2 << "-" << trackRef->_type * 2 + 1 << ";" - << "ssrc=" << printSSRC(trackRef->_ssrc) << ";mode=play", + << "ssrc=" << printSSRC(trackRef->_ssrc), "x-Transport-Options" , "late-tolerance=1.400000", "x-Dynamic-Rate" , "1" }); @@ -627,7 +616,7 @@ int RtspSession::handleReq_Setup() { {"Transport",StrPrinter << "RTP/AVP/UDP;unicast;" << "client_port=" << strClientPort << ";" << "server_port=" << pSockRtp->get_local_port() << "-" << pSockRtcp->get_local_port() << ";" - << "ssrc=" << printSSRC(trackRef->_ssrc) << ";mode=play" + << "ssrc=" << printSSRC(trackRef->_ssrc) }); } break; @@ -743,7 +732,8 @@ int RtspSession::handleReq_Play() { //提高发送性能 (*this) << SocketFlags(kSockFlags); - SockUtil::setNoDelay(_pSender->rawFD(),false); + SockUtil::setNoDelay(_sock->rawFD(),false); + (*this) << SocketFlags(kSockFlags); if (!_pRtpReader && _rtpType != PlayerBase::RTP_MULTICAST) { weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); @@ -822,7 +812,7 @@ int RtspSession::handleReq_Teardown() { } int RtspSession::handleReq_Get() { - _strSessionCookie = _parser["x-sessioncookie"]; + _http_x_sessioncookie = _parser["x-sessioncookie"]; sendRtspResponse("200 OK", {"Connection","Close", "Cache-Control","no-store", @@ -830,10 +820,9 @@ int RtspSession::handleReq_Get() { "Content-Type","application/x-rtsp-tunnelled", },"","HTTP/1.0"); - //注册GET + //注册http getter,以便http poster绑定 lock_guard lock(g_mtxGetter); - g_mapGetter[_strSessionCookie] = dynamic_pointer_cast(shared_from_this()); - //InfoL << _strSessionCookie; + g_mapGetter[_http_x_sessioncookie] = dynamic_pointer_cast(shared_from_this()); return 0; } @@ -841,24 +830,21 @@ int RtspSession::handleReq_Get() { int RtspSession::handleReq_Post() { lock_guard lock(g_mtxGetter); string sessioncookie = _parser["x-sessioncookie"]; -//Poster 找到 Getter + //Poster 找到 Getter auto it = g_mapGetter.find(sessioncookie); if (it == g_mapGetter.end()) { - //WarnL << sessioncookie; + WarnL << "Http Poster未找到Http Getter"; return -1; } - _bBase64need = true; -//Poster 找到Getter的SOCK - auto strongSession = it->second.lock(); + + //Poster 找到Getter的SOCK + auto httpGetterWeak = it->second; + //移除http getter的弱引用记录 g_mapGetter.erase(sessioncookie); - if (!strongSession) { - send_SessionNotFound(); - //WarnL; - return -1; - } - initSender(strongSession); + auto nextPacketSize = remainDataSize(); if(nextPacketSize > 0){ + //防止http poster中的content部分粘包(后续content都是base64编码的rtsp请求包) _onContent = [this](const char *data,uint64_t len){ BufferRaw::Ptr buffer = std::make_shared(); buffer->assign(data,len); @@ -871,6 +857,26 @@ int RtspSession::handleReq_Post() { },false); }; } + + //http poster收到请求后转发给http getter处理 + _onRecv = [this,httpGetterWeak](const Buffer::Ptr &pBuf){ + auto httpGetterStrong = httpGetterWeak.lock(); + if(!httpGetterStrong){ + WarnL << "Http Getter已经释放"; + shutdown(); + return; + } + + //切换到http getter的线程 + httpGetterStrong->async([pBuf,httpGetterWeak](){ + auto httpGetterStrong = httpGetterWeak.lock(); + if(!httpGetterStrong){ + return; + } + httpGetterStrong->onRecv(std::make_shared(decodeBase64(string(pBuf->data(),pBuf->size())))); + }); + }; + return nextPacketSize; } @@ -1106,26 +1112,6 @@ inline void RtspSession::startListenPeerUdpData(int trackIdx) { } -inline void RtspSession::initSender(const std::shared_ptr& session) { - _pSender = session->_sock; - weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - session->_onDestory = [weakSelf]() { - auto strongSelf=weakSelf.lock(); - if(!strongSelf) { - return; - } - //DebugL; - strongSelf->_pSender->setOnErr([weakSelf](const SockException &err) { - auto strongSelf=weakSelf.lock(); - if(!strongSelf) { - return; - } - strongSelf->safeShutdown(); - }); - }; - session->shutdown_l(false); -} - static string dateStr(){ char buf[64]; time_t tt = time(NULL); @@ -1168,7 +1154,7 @@ bool RtspSession::sendRtspResponse(const string &res_code, int RtspSession::send(const Buffer::Ptr &pkt){ _ui64TotalBytes += pkt->size(); - return _pSender->send(pkt,_flags); + return TcpSession::send(pkt); } bool RtspSession::sendRtspResponse(const string &res_code, @@ -1214,6 +1200,11 @@ inline int RtspSession::getTrackIndexByControlSuffix(const string &controlSuffix return -1; } +bool RtspSession::close() { + InfoL << "kick out:" << _mediaInfo._vhost << " " << _mediaInfo._app << " " << _mediaInfo._streamid; + safeShutdown(); + return true; +} #ifdef RTSP_SEND_RTCP inline void RtspSession::sendRTCP() { diff --git a/src/Rtsp/RtspSession.h b/src/Rtsp/RtspSession.h index 032edc0c..eecfcf95 100644 --- a/src/Rtsp/RtspSession.h +++ b/src/Rtsp/RtspSession.h @@ -66,7 +66,7 @@ private: uint32_t _offset; }; -class RtspSession: public TcpSession, public HttpRequestSplitter, public RtpReceiver{ +class RtspSession: public TcpSession, public HttpRequestSplitter, public RtpReceiver , public MediaSourceEvent{ public: typedef std::shared_ptr Ptr; typedef std::function onGetRealm; @@ -85,10 +85,10 @@ protected: void onRecvContent(const char *data,uint64_t len) override; //RtpReceiver override void onRtpSorted(const RtpPacket::Ptr &rtppt, int trackidx) override; + //MediaSourceEvent override + bool close() override ; private: void inputRtspOrRtcp(const char *data,uint64_t len); - void shutdown() override ; - void shutdown_l(bool close); int handleReq_Options(); //处理options方法 int handleReq_Describe(); //处理describe方法 int handleReq_ANNOUNCE(); //处理options方法 @@ -129,7 +129,6 @@ private: bool sendRtspResponse(const string &res_code,const std::initializer_list &header, const string &sdp = "" , const char *protocol = "RTSP/1.0"); bool sendRtspResponse(const string &res_code,const StrCaseMap &header = StrCaseMap(), const string &sdp = "",const char *protocol = "RTSP/1.0"); int send(const Buffer::Ptr &pkt) override; - inline void initSender(const std::shared_ptr &pSession); //处理rtsp over http,quicktime使用的 private: Ticker _ticker; Parser _parser; //rtsp解析类 @@ -160,17 +159,11 @@ private: uint64_t _ui64TotalBytes = 0; //RTSP over HTTP - function _onDestory; - bool _bBase64need = false; //是否需要base64解码 - Socket::Ptr _pSender; //回复rtsp时走的tcp通道,供quicktime用 //quicktime 请求rtsp会产生两次tcp连接, - //一次发送 get 一次发送post,需要通过sessioncookie关联起来 - string _strSessionCookie; - static recursive_mutex g_mtxGetter; //对quicktime上锁保护 - static recursive_mutex g_mtxPostter; //对quicktime上锁保护 - static unordered_map > g_mapGetter; - static unordered_map > g_mapPostter; + //一次发送 get 一次发送post,需要通过x-sessioncookie关联起来 + string _http_x_sessioncookie; function _onContent; + function _onRecv; std::function _delayTask; uint32_t _iTaskTimeLine = 0;