提取webrtc推流、播放代码为单独的派生类

This commit is contained in:
ziyue
2021-10-15 16:27:17 +08:00
parent 8531b5e1cb
commit 7f3f47abbb
7 changed files with 379 additions and 229 deletions

View File

@@ -15,6 +15,7 @@
#include "Rtcp/RtcpFCI.h"
#include "Rtsp/RtpReceiver.h"
#define RTP_SSRC_OFFSET 1
#define RTX_SSRC_OFFSET 2
#define RTP_CNAME "zlmediakit-rtp"
#define RTP_LABEL "zlmediakit-label"
@@ -207,16 +208,16 @@ std::string WebRtcTransport::getAnswerSdp(const string &offer){
}
}
bool is_dtls(char *buf) {
static bool is_dtls(char *buf) {
return ((*buf > 19) && (*buf < 64));
}
bool is_rtp(char *buf) {
static bool is_rtp(char *buf) {
RtpHeader *header = (RtpHeader *) buf;
return ((header->pt < 64) || (header->pt >= 96));
}
bool is_rtcp(char *buf) {
static bool is_rtcp(char *buf) {
RtpHeader *header = (RtpHeader *) buf;
return ((header->pt >= 64) && (header->pt < 96));
}
@@ -285,14 +286,6 @@ void WebRtcTransport::sendRtcpPacket(const char *buf, int len, bool flush, void
}
///////////////////////////////////////////////////////////////////////////////////
WebRtcTransportImp::Ptr WebRtcTransportImp::create(const EventPoller::Ptr &poller){
WebRtcTransportImp::Ptr ret(new WebRtcTransportImp(poller), [](WebRtcTransportImp *ptr){
ptr->onDestory();
delete ptr;
});
ret->onCreate();
return ret;
}
void WebRtcTransportImp::onCreate(){
WebRtcTransport::onCreate();
@@ -327,46 +320,6 @@ WebRtcTransportImp::~WebRtcTransportImp() {
void WebRtcTransportImp::onDestory() {
WebRtcTransport::onDestory();
unregisterSelf();
if (!_session) {
return;
}
uint64_t duration = _alive_ticker.createdTime() / 1000;
//流量统计事件广播
GET_CONFIG(uint32_t, iFlowThreshold, General::kFlowThreshold);
if (_reader) {
WarnL << "RTC播放器("
<< _media_info._vhost << "/"
<< _media_info._app << "/"
<< _media_info._streamid
<< ")结束播放,耗时(s):" << duration;
if (_bytes_usage >= iFlowThreshold * 1024) {
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, _bytes_usage, duration, true, static_cast<SockInfo &>(*_session));
}
}
if (_push_src) {
WarnL << "RTC推流器("
<< _media_info._vhost << "/"
<< _media_info._app << "/"
<< _media_info._streamid
<< ")结束推流,耗时(s):" << duration;
if (_bytes_usage >= iFlowThreshold * 1024) {
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, _bytes_usage, duration, false, static_cast<SockInfo &>(*_session));
}
}
}
void WebRtcTransportImp::attach(const RtspMediaSource::Ptr &src, const MediaInfo &info, bool is_play) {
assert(src);
_media_info = info;
if (is_play) {
_play_src = src;
} else {
_push_src = src;
}
}
void WebRtcTransportImp::onSendSockData(const char *buf, size_t len, struct sockaddr_in *dst, bool flush) {
@@ -384,9 +337,6 @@ void WebRtcTransportImp::onSendSockData(const char *buf, size_t len, struct sock
///////////////////////////////////////////////////////////////////
bool WebRtcTransportImp::canSendRtp() const{
if (!_play_src) {
return false;
}
for (auto &m : _answer_sdp->media) {
if (m.direction == RtpDirection::sendrecv || m.direction == RtpDirection::sendonly) {
return true;
@@ -396,9 +346,6 @@ bool WebRtcTransportImp::canSendRtp() const{
}
bool WebRtcTransportImp::canRecvRtp() const{
if (!_push_src) {
return false;
}
for (auto &m : _answer_sdp->media) {
if (m.direction == RtpDirection::sendrecv || m.direction == RtpDirection::recvonly) {
return true;
@@ -422,6 +369,8 @@ void WebRtcTransportImp::onStartWebRTC() {
track->plan_rtx = m_answer.getRelatedRtxPlan(track->plan_rtp->pt);
track->rtcp_context_send = std::make_shared<RtcpContextForSend>();
//rtp track type --> MediaTrack
_type_to_track[m_answer.type] = track;
//send ssrc --> MediaTrack
_ssrc_to_track[track->answer_ssrc_rtp] = track;
_ssrc_to_track[track->answer_ssrc_rtx] = track;
@@ -460,50 +409,6 @@ void WebRtcTransportImp::onStartWebRTC() {
}
}
}
if (canRecvRtp()) {
_push_src->setSdp(_answer_sdp->toRtspSdp());
_simulcast = _answer_sdp->supportSimulcast();
}
if (canSendRtp()) {
RtcSession rtsp_send_sdp;
rtsp_send_sdp.loadFrom(_play_src->getSdp(), false);
for (auto &m : _answer_sdp->media) {
if (m.type == TrackApplication) {
continue;
}
auto rtsp_media = rtsp_send_sdp.getMedia(m.type);
if (rtsp_media && getCodecId(rtsp_media->plan[0].codec) == getCodecId(m.plan[0].codec)) {
auto it = _pt_to_track.find(m.plan[0].pt);
CHECK(it != _pt_to_track.end());
//记录发送rtp时约定的信息届时发送rtp时需要修改pt和ssrc
_type_to_track[m.type] = it->second.second;
}
}
_play_src->pause(false);
_reader = _play_src->getRing()->attach(getPoller(), true);
weak_ptr<WebRtcTransportImp> weak_self = static_pointer_cast<WebRtcTransportImp>(shared_from_this());
_reader->setReadCB([weak_self](const RtspMediaSource::RingDataType &pkt) {
auto strongSelf = weak_self.lock();
if (!strongSelf) {
return;
}
size_t i = 0;
pkt->for_each([&](const RtpPacket::Ptr &rtp) {
strongSelf->onSendRtp(rtp, ++i == pkt->size());
});
});
_reader->setDetachCB([weak_self](){
auto strongSelf = weak_self.lock();
if (!strongSelf) {
return;
}
strongSelf->onShutdown(SockException(Err_shutdown, "rtsp ring buffer detached"));
});
}
//使用完毕后,释放强引用,这样确保推流器断开后能及时注销媒体
_play_src = nullptr;
}
void WebRtcTransportImp::onCheckAnswer(RtcSession &sdp) {
@@ -533,7 +438,8 @@ void WebRtcTransportImp::onCheckAnswer(RtcSession &sdp) {
//添加answer sdp的ssrc信息
m.rtp_rtx_ssrc.emplace_back();
auto &ssrc = m.rtp_rtx_ssrc.back();
ssrc.ssrc = _play_src->getSsrc(m.type);
//发送的ssrc我们随便定义因为在发送rtp时会修改为此值
ssrc.ssrc = m.type + RTP_SSRC_OFFSET;
ssrc.cname = RTP_CNAME;
ssrc.label = RTP_LABEL;
ssrc.mslabel = RTP_MSLABEL;
@@ -557,20 +463,6 @@ void WebRtcTransportImp::onCheckSdp(SdpType type, RtcSession &sdp) {
void WebRtcTransportImp::onRtcConfigure(RtcConfigure &configure) const {
WebRtcTransport::onRtcConfigure(configure);
if (_play_src) {
//这是播放,同时也可能有推流
configure.video.direction = _push_src ? RtpDirection::sendrecv : RtpDirection::sendonly;
configure.audio.direction = configure.video.direction;
configure.setPlayRtspInfo(_play_src->getSdp());
} else if (_push_src) {
//这只是推流
configure.video.direction = RtpDirection::recvonly;
configure.audio.direction = RtpDirection::recvonly;
} else {
throw std::invalid_argument("未设置播放或推流的媒体源");
}
//添加接收端口candidate信息
configure.addCandidate(*getIceCandidate());
}
@@ -872,35 +764,12 @@ void WebRtcTransportImp::onSortedRtp(MediaTrack &track, const string &rid, RtpPa
}
}
if (!_simulcast) {
assert(_push_src);
_push_src->onWrite(rtp, false);
return;
}
if (rtp->type == TrackAudio) {
//音频
for (auto &pr : _push_src_simulcast) {
pr.second->onWrite(rtp, false);
}
} else {
//视频
auto &src = _push_src_simulcast[rid];
if (!src) {
auto stream_id = rid.empty() ? _push_src->getId() : _push_src->getId() + "_" + rid;
auto src_imp = std::make_shared<RtspMediaSourceImp>(_push_src->getVhost(), _push_src->getApp(), stream_id);
src_imp->setSdp(_push_src->getSdp());
src_imp->setProtocolTranslation(_push_src->isRecording(Recorder::type_hls),_push_src->isRecording(Recorder::type_mp4));
src_imp->setListener(static_pointer_cast<WebRtcTransportImp>(shared_from_this()));
src = src_imp;
}
src->onWrite(std::move(rtp), false);
}
onRecvRtp(track, rid, std::move(rtp));
}
///////////////////////////////////////////////////////////////////
void WebRtcTransportImp::onSendRtp(const RtpPacket::Ptr &rtp, bool flush, bool rtx){
void WebRtcTransportImp::onSendRtp(const RtpPacket::Ptr &rtp, bool flush, bool rtx) {
auto &track = _type_to_track[rtp->type];
if (!track) {
//忽略,对方不支持该编码类型
@@ -969,48 +838,24 @@ void WebRtcTransportImp::onShutdown(const SockException &ex){
}
}
/////////////////////////////////////////////////////////////////////////////////////////////
bool WebRtcTransportImp::close(MediaSource &sender, bool force) {
//此回调在其他线程触发
if (!force && totalReaderCount(sender)) {
return false;
}
string err = StrPrinter << "close media:" << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId() << " " << force;
weak_ptr<WebRtcTransportImp> weak_self = static_pointer_cast<WebRtcTransportImp>(shared_from_this());
getPoller()->async([weak_self, err]() {
auto strong_self = weak_self.lock();
if (strong_self) {
strong_self->onShutdown(SockException(Err_shutdown, err));
}
});
return true;
}
int WebRtcTransportImp::totalReaderCount(MediaSource &sender) {
auto total_count = 0;
for (auto &src : _push_src_simulcast) {
total_count += src.second->totalReaderCount();
}
return total_count + _push_src->totalReaderCount();
}
MediaOriginType WebRtcTransportImp::getOriginType(MediaSource &sender) const {
return MediaOriginType::rtc_push;
}
string WebRtcTransportImp::getOriginUrl(MediaSource &sender) const {
return _media_info._full_url;
}
std::shared_ptr<SockInfo> WebRtcTransportImp::getOriginSock(MediaSource &sender) const {
return static_pointer_cast<SockInfo>(_session);
}
void WebRtcTransportImp::setSession(Session::Ptr session) {
_session = std::move(session);
}
const Session::Ptr &WebRtcTransportImp::getSession() const {
return _session;
}
uint64_t WebRtcTransportImp::getBytesUsage() const{
return _bytes_usage;
}
uint64_t WebRtcTransportImp::getDuration() const{
return _alive_ticker.createdTime() / 1000;
}
/////////////////////////////////////////////////////////////////////////////////////////////
class WebRtcTransportManager {
mutable mutex _mtx;
unordered_map<string, weak_ptr<WebRtcTransportImp> > _map;