diff --git a/server/WebApi.cpp b/server/WebApi.cpp index 1980ee02..bbfc803d 100644 --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -761,11 +761,20 @@ void installWebApi() { RtpServer::Ptr server = std::make_shared(); server->start(allArgs["port"], allArgs["stream_id"], allArgs["enable_tcp"].as()); - val["port"] = server->getPort(); + + auto port = server->getPort(); + server->setOnDetach([port]() { + //设置rtp超时移除事件 + lock_guard lck(s_rtpServerMapMtx); + s_rtpServerMap.erase(port); + }); //保存对象 lock_guard lck(s_rtpServerMapMtx); - s_rtpServerMap.emplace(server->getPort(), server); + s_rtpServerMap.emplace(port, server); + + //回复json + val["port"] = port; }); api_regist1("/index/api/closeRtpServer",[](API_ARGS1){ diff --git a/src/Rtp/RtpProcess.cpp b/src/Rtp/RtpProcess.cpp index b3f32d19..0693ddae 100644 --- a/src/Rtp/RtpProcess.cpp +++ b/src/Rtp/RtpProcess.cpp @@ -177,6 +177,16 @@ bool RtpProcess::alive() { return false; } +void RtpProcess::onDetach(){ + if(_on_detach){ + _on_detach(); + } +} + +void RtpProcess::setOnDetach(const function &cb) { + _on_detach = cb; +} + string RtpProcess::get_peer_ip() { if(_addr){ return SockUtil::inet_ntoa(((struct sockaddr_in *) _addr)->sin_addr); diff --git a/src/Rtp/RtpProcess.h b/src/Rtp/RtpProcess.h index 1f53451b..81dc7f2f 100644 --- a/src/Rtp/RtpProcess.h +++ b/src/Rtp/RtpProcess.h @@ -44,6 +44,16 @@ public: */ bool alive(); + /** + * 超时时被RtpSelector移除时触发 + */ + void onDetach(); + + /** + * 设置onDetach事件回调 + */ + void setOnDetach(const function &cb); + /// SockInfo override string get_local_ip() override; uint16_t get_local_port() override; @@ -79,6 +89,7 @@ private: MediaInfo _media_info; uint64_t _total_bytes = 0; Socket::Ptr _sock; + function _on_detach; }; }//namespace mediakit diff --git a/src/Rtp/RtpSelector.cpp b/src/Rtp/RtpSelector.cpp index 06979972..fb790048 100644 --- a/src/Rtp/RtpSelector.cpp +++ b/src/Rtp/RtpSelector.cpp @@ -91,7 +91,9 @@ void RtpSelector::onManager() { continue; } WarnL << "RtpProcess timeout:" << it->first; + auto process = it->second->getProcess(); it = _map_rtp_process.erase(it); + process->onDetach(); } } diff --git a/src/Rtp/RtpServer.cpp b/src/Rtp/RtpServer.cpp index 365aceb2..f7a4c9eb 100644 --- a/src/Rtp/RtpServer.cpp +++ b/src/Rtp/RtpServer.cpp @@ -69,6 +69,13 @@ void RtpServer::start(uint16_t local_port, const string &stream_id, bool enable _tcp_server = tcp_server; _udp_server = udp_server; + _rtp_process = process; +} + +void RtpServer::setOnDetach(const function &cb){ + if(_rtp_process){ + _rtp_process->setOnDetach(cb); + } } EventPoller::Ptr RtpServer::getPoller() { diff --git a/src/Rtp/RtpServer.h b/src/Rtp/RtpServer.h index 2623ae2b..bcc045bc 100644 --- a/src/Rtp/RtpServer.h +++ b/src/Rtp/RtpServer.h @@ -52,9 +52,15 @@ public: */ EventPoller::Ptr getPoller(); + /** + * 设置RtpProcess onDetach事件回调 + */ + void setOnDetach(const function &cb); + protected: Socket::Ptr _udp_server; TcpServer::Ptr _tcp_server; + RtpProcess::Ptr _rtp_process; function _on_clearup; };