From 8bd7157ca135bdd095cb6505d1ab7124a8a10a8c Mon Sep 17 00:00:00 2001 From: xiongziliang <771730766@qq.com> Date: Mon, 20 Nov 2023 21:59:43 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=AF=E6=8C=81=E6=97=A5=E5=BF=97=E4=B8=8A?= =?UTF-8?q?=E4=B8=8B=E6=96=87?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- 3rdpart/ZLToolKit | 2 +- src/Common/MediaSource.cpp | 1 + src/Http/HlsPlayer.cpp | 2 +- src/Rtp/RtpProcess.cpp | 1 + src/Rtp/RtpServer.cpp | 10 ++++++++-- src/Rtsp/RtspPlayer.cpp | 2 ++ src/Rtsp/RtspPusher.cpp | 1 + src/Rtsp/RtspSession.cpp | 5 +++-- srt/SrtSession.cpp | 10 ++++------ srt/SrtTransportImp.cpp | 3 +++ webrtc/WebRtcPusher.cpp | 1 + webrtc/WebRtcSession.cpp | 2 ++ webrtc/WebRtcTransport.cpp | 3 ++- webrtc/WebRtcTransport.h | 9 ++------- 14 files changed, 32 insertions(+), 20 deletions(-) diff --git a/3rdpart/ZLToolKit b/3rdpart/ZLToolKit index ad44a16c..83fc31c8 160000 --- a/3rdpart/ZLToolKit +++ b/3rdpart/ZLToolKit @@ -1 +1 @@ -Subproject commit ad44a16c99834540b397774ad6c7f3f8ed619d56 +Subproject commit 83fc31c8d80499422378403e49cde35a53fda14b diff --git a/src/Common/MediaSource.cpp b/src/Common/MediaSource.cpp index 1977b616..6e8bc02c 100644 --- a/src/Common/MediaSource.cpp +++ b/src/Common/MediaSource.cpp @@ -448,6 +448,7 @@ static void findAsync_l(const MediaInfo &info, const std::shared_ptr &s } poller->async([weak_session, cancel_all, info, cb_once]() { + Logger::setThreadContext(weak_session); cancel_all(); if (auto strong_session = weak_session.lock()) { //播发器请求的流终于注册上了,切换到自己的线程再回复 diff --git a/src/Http/HlsPlayer.cpp b/src/Http/HlsPlayer.cpp index c6509c0c..c22854a6 100644 --- a/src/Http/HlsPlayer.cpp +++ b/src/Http/HlsPlayer.cpp @@ -196,7 +196,7 @@ bool HlsPlayer::onParsed(bool is_m3u8_inner, int64_t sequence, const map weak_self = static_pointer_cast(shared_from_this()); auto url = ts_map.rbegin()->second.url; - getPoller()->async([weak_self, url]() { + async([weak_self, url]() { auto strong_self = weak_self.lock(); if (strong_self) { strong_self->play(url); diff --git a/src/Rtp/RtpProcess.cpp b/src/Rtp/RtpProcess.cpp index 3855f411..3b9b7318 100644 --- a/src/Rtp/RtpProcess.cpp +++ b/src/Rtp/RtpProcess.cpp @@ -253,6 +253,7 @@ void RtpProcess::emitOnPublish() { if (!strong_self) { return; } + Logger::setThreadContext(weak_self); if (err.empty()) { strong_self->_muxer = std::make_shared(strong_self->_media_info, 0.0f, option); diff --git a/src/Rtp/RtpServer.cpp b/src/Rtp/RtpServer.cpp index 1f71a4a8..ed0c5b31 100644 --- a/src/Rtp/RtpServer.cpp +++ b/src/Rtp/RtpServer.cpp @@ -26,7 +26,7 @@ RtpServer::~RtpServer() { } } -class RtcpHelper: public std::enable_shared_from_this { +class RtcpHelper: public std::enable_shared_from_this, public toolkit::LogThreadContext { public: using Ptr = std::shared_ptr; @@ -35,13 +35,15 @@ public: _stream_id = std::move(stream_id); } - ~RtcpHelper() { + ~RtcpHelper() override { if (_process) { // 删除rtp处理器 RtpSelector::Instance().delProcess(_stream_id, _process.get()); } } + std::string getIdentifier() const override { return _process ? _process->getIdentifier() : ""; } + void setRtpServerInfo(uint16_t local_port,RtpServer::TcpMode mode,bool re_use_port,uint32_t ssrc, bool only_audio) { _local_port = local_port; _tcp_mode = mode; @@ -80,6 +82,7 @@ public: if (!strong_self || !strong_self->_process) { return; } + Logger::setThreadContext(weak_self); if (!strong_self->_rtcp_addr) { // 只设置一次rtcp对端端口 strong_self->_rtcp_addr = std::make_shared(); @@ -96,6 +99,7 @@ public: GET_CONFIG(uint64_t, timeoutSec, RtpProxy::kTimeoutSec); _delay_task = _rtcp_sock->getPoller()->doDelayTask(timeoutSec * 1000, [weak_self]() { if (auto strong_self = weak_self.lock()) { + Logger::setThreadContext(weak_self); auto process = RtpSelector::Instance().getProcess(strong_self->_stream_id, false); if (!process && strong_self->_on_detach) { strong_self->_on_detach(); @@ -205,6 +209,7 @@ void RtpServer::start(uint16_t local_port, const string &stream_id, TcpMode tcp_ auto ssrc_ptr = std::make_shared(ssrc); _ssrc = ssrc_ptr; rtp_socket->setOnRead([rtp_socket, helper, ssrc_ptr, bind_peer_addr](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) mutable { + Logger::setThreadContext(helper); RtpHeader *header = (RtpHeader *)buf->data(); auto rtp_ssrc = ntohl(header->ssrc); auto ssrc = *ssrc_ptr; @@ -277,6 +282,7 @@ void RtpServer::onConnect() { auto rtp_session = std::make_shared(_rtp_socket); rtp_session->attachServer(*_tcp_server); _rtp_socket->setOnRead([rtp_session](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) { + Logger::setThreadContext(rtp_session); rtp_session->onRecv(buf); }); weak_ptr weak_self = shared_from_this(); diff --git a/src/Rtsp/RtspPlayer.cpp b/src/Rtsp/RtspPlayer.cpp index f6a13fda..77f9be3c 100644 --- a/src/Rtsp/RtspPlayer.cpp +++ b/src/Rtsp/RtspPlayer.cpp @@ -352,6 +352,7 @@ void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int track_idx) { if (!strongSelf) { return; } + Logger::setThreadContext(weakSelf); if (SockUtil::inet_ntoa(addr) != peer_ip) { WarnL << "收到其他地址的rtp数据:" << SockUtil::inet_ntoa(addr); return; @@ -367,6 +368,7 @@ void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int track_idx) { if (!strongSelf) { return; } + Logger::setThreadContext(weakSelf); if (SockUtil::inet_ntoa(addr) != peer_ip) { WarnL << "收到其他地址的rtcp数据:" << SockUtil::inet_ntoa(addr); return; diff --git a/src/Rtsp/RtspPusher.cpp b/src/Rtsp/RtspPusher.cpp index 4748fb15..4ffa6d02 100644 --- a/src/Rtsp/RtspPusher.cpp +++ b/src/Rtsp/RtspPusher.cpp @@ -328,6 +328,7 @@ void RtspPusher::handleResSetup(const Parser &parser, unsigned int track_idx) { if (!strongSelf) { return; } + Logger::setThreadContext(weakSelf); if (SockUtil::inet_ntoa(addr) != peer_ip) { WarnL << "收到其他地址的rtcp数据:" << SockUtil::inet_ntoa(addr); return; diff --git a/src/Rtsp/RtspSession.cpp b/src/Rtsp/RtspSession.cpp index d780a66d..f22ea1b1 100644 --- a/src/Rtsp/RtspSession.cpp +++ b/src/Rtsp/RtspSession.cpp @@ -1009,7 +1009,7 @@ void RtspSession::startListenPeerUdpData(int track_idx) { if (!strong_self) { return false; } - + Logger::setThreadContext(weak_self); if (SockUtil::inet_ntoa(peer_addr) != peer_ip) { WarnP(strong_self.get()) << ((interleaved % 2 == 0) ? "收到其他地址的rtp数据:" : "收到其他地址的rtcp数据:") << SockUtil::inet_ntoa(peer_addr); @@ -1023,6 +1023,7 @@ void RtspSession::startListenPeerUdpData(int track_idx) { return; } try { + Logger::setThreadContext(weak_self); strong_self->onRcvPeerUdpData(interleaved, buf, addr); } catch (SockException &ex) { strong_self->shutdown(ex); @@ -1048,7 +1049,7 @@ void RtspSession::startListenPeerUdpData(int track_idx) { WarnP(this) << "udp端口为空:" << interleaved; return; } - sock->setOnRead([onUdpData,interleaved](const Buffer::Ptr &pBuf, struct sockaddr *pPeerAddr , int addr_len){ + sock->setOnRead([onUdpData, interleaved](const Buffer::Ptr &pBuf, struct sockaddr *pPeerAddr, int addr_len) { onUdpData(pBuf, pPeerAddr, interleaved); }); }; diff --git a/srt/SrtSession.cpp b/srt/SrtSession.cpp index 9f83ddde..228c8c34 100644 --- a/srt/SrtSession.cpp +++ b/srt/SrtSession.cpp @@ -125,12 +125,10 @@ void SrtSession::onError(const SockException &err) { // 防止互相引用导致不释放 auto transport = std::move(_transport); - getPoller()->async( - [transport] { - //延时减引用,防止使用transport对象时,销毁对象 - //transport->onShutdown(err); - }, - false); + async([transport] { + // 延时减引用,防止使用transport对象时,销毁对象 + // transport->onShutdown(err); + },false); } void SrtSession::onManager() { diff --git a/srt/SrtTransportImp.cpp b/srt/SrtTransportImp.cpp index 56d6bd8a..ed48d5af 100644 --- a/srt/SrtTransportImp.cpp +++ b/srt/SrtTransportImp.cpp @@ -123,6 +123,7 @@ bool SrtTransportImp::close(mediakit::MediaSource &sender) { getPoller()->async([weak_self, err]() { auto strong_self = weak_self.lock(); if (strong_self) { + Logger::setThreadContext(weak_self); strong_self->onShutdown(SockException(Err_shutdown, err)); // 主动关闭推流,那么不延时注销 strong_self->_muxer = nullptr; @@ -158,6 +159,7 @@ void SrtTransportImp::emitOnPublish() { if (!strong_self) { return; } + Logger::setThreadContext(weak_self); if (err.empty()) { strong_self->_muxer = std::make_shared(strong_self->_media_info,0.0f, option); @@ -187,6 +189,7 @@ void SrtTransportImp::emitOnPlay() { return; } strong_self->getPoller()->async([strong_self, err] { + Logger::setThreadContext(strong_self); if (err != "") { strong_self->onShutdown(SockException(Err_refused, err)); } else { diff --git a/webrtc/WebRtcPusher.cpp b/webrtc/WebRtcPusher.cpp index 92b4ad04..e20a7571 100644 --- a/webrtc/WebRtcPusher.cpp +++ b/webrtc/WebRtcPusher.cpp @@ -50,6 +50,7 @@ bool WebRtcPusher::close(MediaSource &sender) { getPoller()->async([weak_self, err]() { auto strong_self = weak_self.lock(); if (strong_self) { + Logger::setThreadContext(strong_self); strong_self->onShutdown(SockException(Err_shutdown, err)); //主动关闭推流,那么不延时注销 strong_self->_push_src = nullptr; diff --git a/webrtc/WebRtcSession.cpp b/webrtc/WebRtcSession.cpp index ee3051c0..cd4a101c 100644 --- a/webrtc/WebRtcSession.cpp +++ b/webrtc/WebRtcSession.cpp @@ -77,6 +77,7 @@ void WebRtcSession::onRecv_l(const char *data, size_t len) { if (strong_server) { auto session = static_pointer_cast(strong_server->createSession(sock)); //2、创建新的WebRtcSession对象(绑定到WebRtcTransport所在线程),重新处理一遍ice binding request命令 + Logger::setThreadContext(session); session->onRecv_l(str.data(), str.size()); } }); @@ -112,6 +113,7 @@ void WebRtcSession::onError(const SockException &err) { auto transport = std::move(_transport); getPoller()->async([transport, self]() mutable { //延时减引用,防止使用transport对象时,销毁对象 + Logger::setThreadContext(transport); transport->removeTuple(self.get()); //确保transport在Session对象前销毁,防止WebRtcTransport::onDestory()时获取不到Session对象 transport = nullptr; diff --git a/webrtc/WebRtcTransport.cpp b/webrtc/WebRtcTransport.cpp index 3f798358..1cd0b1ab 100644 --- a/webrtc/WebRtcTransport.cpp +++ b/webrtc/WebRtcTransport.cpp @@ -111,7 +111,7 @@ const EventPoller::Ptr &WebRtcTransport::getPoller() const { return _poller; } -const string &WebRtcTransport::getIdentifier() const { +string WebRtcTransport::getIdentifier() const { return _identifier; } @@ -1081,6 +1081,7 @@ void WebRtcTransportImp::safeShutdown(const SockException &ex) { std::weak_ptr weak_self = static_pointer_cast(shared_from_this()); getPoller()->async([ex, weak_self]() { if (auto strong_self = weak_self.lock()) { + Logger::setThreadContext(weak_self); strong_self->onShutdown(ex); } }); diff --git a/webrtc/WebRtcTransport.h b/webrtc/WebRtcTransport.h index 2d1e6bbf..50e06380 100644 --- a/webrtc/WebRtcTransport.h +++ b/webrtc/WebRtcTransport.h @@ -35,12 +35,11 @@ extern const std::string kTcpPort; extern const std::string kTimeOutSec; }//namespace RTC -class WebRtcInterface { +class WebRtcInterface : public LogThreadContext { public: WebRtcInterface() = default; virtual ~WebRtcInterface() = default; virtual std::string getAnswerSdp(const std::string &offer) = 0; - virtual const std::string& getIdentifier() const = 0; virtual const std::string& deleteRandStr() const { static std::string s_null; return s_null; } }; @@ -53,10 +52,6 @@ public: std::string getAnswerSdp(const std::string &offer) override { throw _ex; } - const std::string &getIdentifier() const override { - static std::string s_null; - return s_null; - } private: SockException _ex; @@ -92,7 +87,7 @@ public: /** * 获取对象唯一id */ - const std::string& getIdentifier() const override; + std::string getIdentifier() const override; const std::string& deleteRandStr() const override; /**