From 5686027fc2313c85b5f58eba91781b03500699d9 Mon Sep 17 00:00:00 2001 From: xia-chu <771730766@qq.com> Date: Tue, 9 Sep 2025 21:59:43 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8DMediaSource::close=E7=BA=BF?= =?UTF-8?q?=E7=A8=8B=E5=AE=89=E5=85=A8=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 主动或无人观看关闭流可能会由于线程安全问题导致崩溃 --- server/WebApi.cpp | 5 ++--- server/WebHook.cpp | 7 +++++-- src/Common/MediaSource.cpp | 4 ++-- src/Player/PlayerProxy.cpp | 13 +++---------- src/Rtmp/RtmpSession.cpp | 4 +--- src/Rtsp/RtspSession.cpp | 4 +--- src/Shell/ShellCMD.h | 6 +++--- srt/SrtTransportImp.cpp | 12 ++---------- webrtc/WebRtcPusher.cpp | 17 ++++------------- 9 files changed, 23 insertions(+), 49 deletions(-) diff --git a/server/WebApi.cpp b/server/WebApi.cpp index 978bfe6c..d7964e20 100755 --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -1106,9 +1106,8 @@ void installWebApi() { bool force = allArgs["force"].as(); for (auto &media : media_list) { - if (media->close(force)) { - ++count_closed; - } + media->getOwnerPoller()->async([media, force]() { media->close(force); }); + ++count_closed; } val["count_hit"] = count_hit; val["count_closed"] = count_closed; diff --git a/server/WebHook.cpp b/server/WebHook.cpp index ab65571c..ddac1083 100755 --- a/server/WebHook.cpp +++ b/server/WebHook.cpp @@ -634,7 +634,10 @@ void installWebHook() { // 边沿站无人观看时如果是拉流的则立即停止溯源 [AUTO-TRANSLATED:a1429c77] // If no one is watching at the edge station, stop tracing immediately if it is pulling if (!auto_close) { - sender.close(false); + auto ptr = sender.shared_from_this(); + sender.getOwnerPoller()->async([ptr]() { + ptr->close(false); + }); WarnL << "Auto close stream when none reader: " << sender.getOriginUrl(); } return; @@ -661,7 +664,7 @@ void installWebHook() { if (!flag || !err.empty() || !strongSrc) { return; } - strongSrc->close(false); + strongSrc->getOwnerPoller()->async([strongSrc]() { strongSrc->close(false); }); WarnL << "无人观看主动关闭流:" << strongSrc->getOriginUrl(); }); }); diff --git a/src/Common/MediaSource.cpp b/src/Common/MediaSource.cpp index b10db094..33877847 100644 --- a/src/Common/MediaSource.cpp +++ b/src/Common/MediaSource.cpp @@ -698,13 +698,13 @@ void MediaSourceEvent::onReaderChanged(MediaSource &sender, int size){ // 此流被标记为无人观看自动关闭流 [AUTO-TRANSLATED:64a0dac3] // This stream is marked as an automatically closed stream with no viewers. WarnL << "Auto close stream when none reader: " << strong_sender->getUrl(); - strong_sender->close(false); + strong_sender->getOwnerPoller()->async([strong_sender]() { strong_sender->close(false); }); } } else { // 这个是mp4点播,我们自动关闭 [AUTO-TRANSLATED:8a7b9a90] // This is an mp4 on-demand, we automatically close it. WarnL << "MP4点播无人观看,自动关闭:" << strong_sender->getUrl(); - strong_sender->close(false); + strong_sender->getOwnerPoller()->async([strong_sender]() { strong_sender->close(false); }); } return false; }, specified_poller); diff --git a/src/Player/PlayerProxy.cpp b/src/Player/PlayerProxy.cpp index 9ccd701f..a7cea3e7 100644 --- a/src/Player/PlayerProxy.cpp +++ b/src/Player/PlayerProxy.cpp @@ -254,16 +254,9 @@ void PlayerProxy::rePlay(const string &strUrl, int iFailedCnt) { bool PlayerProxy::close(MediaSource &sender) { // 通知其停止推流 [AUTO-TRANSLATED:d69d10d8] // Notify it to stop pushing the stream - weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - getPoller()->async_first([weakSelf]() { - auto strongSelf = weakSelf.lock(); - if (!strongSelf) { - return; - } - strongSelf->_muxer.reset(); - strongSelf->setMediaSource(nullptr); - strongSelf->teardown(); - }); + _muxer = nullptr; + setMediaSource(nullptr); + teardown(); _on_close(SockException(Err_shutdown, "closed by user")); WarnL << "close media: " << sender.getUrl(); return true; diff --git a/src/Rtmp/RtmpSession.cpp b/src/Rtmp/RtmpSession.cpp index c2642b56..89e36f87 100644 --- a/src/Rtmp/RtmpSession.cpp +++ b/src/Rtmp/RtmpSession.cpp @@ -591,9 +591,7 @@ void RtmpSession::onSendMedia(const RtmpPacket::Ptr &pkt) { } bool RtmpSession::close(MediaSource &sender) { - //此回调在其他线程触发 - string err = StrPrinter << "close media: " << sender.getUrl(); - safeShutdown(SockException(Err_shutdown, err)); + shutdown(SockException(Err_shutdown, "close media: " + sender.getUrl())); return true; } diff --git a/src/Rtsp/RtspSession.cpp b/src/Rtsp/RtspSession.cpp index 227862b7..bfbfaf42 100644 --- a/src/Rtsp/RtspSession.cpp +++ b/src/Rtsp/RtspSession.cpp @@ -1175,9 +1175,7 @@ int RtspSession::getTrackIndexByInterleaved(int interleaved) { } bool RtspSession::close(MediaSource &sender) { - //此回调在其他线程触发 - string err = StrPrinter << "close media: " << sender.getUrl(); - safeShutdown(SockException(Err_shutdown,err)); + shutdown(SockException(Err_shutdown,"close media: " + sender.getUrl())); return true; } diff --git a/src/Shell/ShellCMD.h b/src/Shell/ShellCMD.h index 3df4ee50..4b889dc2 100644 --- a/src/Shell/ShellCMD.h +++ b/src/Shell/ShellCMD.h @@ -36,9 +36,9 @@ public: if (!media) { break; } - if (!media->close(true)) { - break; - } + media->getOwnerPoller()->async([media]() { + media->close(true); + }); (*stream) << "\t踢出成功:" << media->getUrl() << "\r\n"; return; } while (0); diff --git a/srt/SrtTransportImp.cpp b/srt/SrtTransportImp.cpp index 841e5d85..8cd927ff 100644 --- a/srt/SrtTransportImp.cpp +++ b/srt/SrtTransportImp.cpp @@ -145,16 +145,8 @@ void SrtTransportImp::onShutdown(const SockException &ex) { } bool SrtTransportImp::close(mediakit::MediaSource &sender) { - std::string err = StrPrinter << "close media: " << sender.getUrl(); - weak_ptr weak_self = static_pointer_cast(shared_from_this()); - getPoller()->async([weak_self, err]() { - auto strong_self = weak_self.lock(); - if (strong_self) { - strong_self->onShutdown(SockException(Err_shutdown, err)); - // 主动关闭推流,那么不延时注销 - strong_self->_muxer = nullptr; - } - }); + onShutdown(SockException(Err_shutdown, "close media: " + sender.getUrl())); + _muxer = nullptr; return true; } diff --git a/webrtc/WebRtcPusher.cpp b/webrtc/WebRtcPusher.cpp index a0ac0bd6..0a9c7c62 100644 --- a/webrtc/WebRtcPusher.cpp +++ b/webrtc/WebRtcPusher.cpp @@ -42,19 +42,10 @@ WebRtcPusher::WebRtcPusher(const EventPoller::Ptr &poller, } bool WebRtcPusher::close(MediaSource &sender) { - // 此回调在其他线程触发 [AUTO-TRANSLATED:c98e7686] - // This callback is triggered in another thread - string err = StrPrinter << "close media: " << sender.getUrl(); - weak_ptr weak_self = static_pointer_cast(shared_from_this()); - getPoller()->async([weak_self, err]() { - auto strong_self = weak_self.lock(); - if (strong_self) { - strong_self->onShutdown(SockException(Err_shutdown, err)); - // 主动关闭推流,那么不延时注销 [AUTO-TRANSLATED:ee7cc580] - // Actively close the stream, then do not delay the logout - strong_self->_push_src = nullptr; - } - }); + onShutdown(SockException(Err_shutdown, "close media: " + sender.getUrl())); + // 主动关闭推流,那么不延时注销 [AUTO-TRANSLATED:ee7cc580] + // Actively close the stream, then do not delay the logout + _push_src = nullptr; return true; }