Files
ZLMediaKit/src/Rtp/RtpServer.cpp
xiongziliang 946945ce7b 优化代码
整理代码
2022-11-12 22:06:09 +08:00

279 lines
10 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/*
* Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
*
* This file is part of ZLMediaKit(https://github.com/xia-chu/ZLMediaKit).
*
* Use of this source code is governed by MIT license that can be found in the
* LICENSE file in the root of the source tree. All contributing project authors
* may be found in the AUTHORS file in the root of the source tree.
*/
#if defined(ENABLE_RTPPROXY)
#include "Util/uv_errno.h"
#include "RtpServer.h"
#include "RtpSelector.h"
#include "Rtcp/RtcpContext.h"
using namespace std;
using namespace toolkit;
namespace mediakit{
RtpServer::~RtpServer() {
if (_on_cleanup) {
_on_cleanup();
}
}
class RtcpHelper: public std::enable_shared_from_this<RtcpHelper> {
public:
using Ptr = std::shared_ptr<RtcpHelper>;
RtcpHelper(Socket::Ptr rtcp_sock, std::string stream_id) {
_rtcp_sock = std::move(rtcp_sock);
_stream_id = std::move(stream_id);
}
~RtcpHelper() {
if (_process) {
// 删除rtp处理器
RtpSelector::Instance().delProcess(_stream_id, _process.get());
}
}
void setRtpServerInfo(uint16_t local_port,RtpServer::TcpMode mode,bool re_use_port,uint32_t ssrc){
_local_port = local_port;
_tcp_mode = mode;
_re_use_port = re_use_port;
_ssrc = ssrc;
}
void setOnDetach(function<void()> cb) {
if (_process) {
_process->setOnDetach(std::move(cb));
} else {
_on_detach = std::move(cb);
}
}
void onRecvRtp(const Socket::Ptr &sock, const Buffer::Ptr &buf, struct sockaddr *addr) {
if (!_process) {
_process = RtpSelector::Instance().getProcess(_stream_id, true);
_process->setOnDetach(std::move(_on_detach));
cancelDelayTask();
}
_process->inputRtp(true, sock, buf->data(), buf->size(), addr);
// 统计rtp接受情况用于发送rr包
auto header = (RtpHeader *)buf->data();
sendRtcp(ntohl(header->ssrc), addr);
}
void startRtcp() {
weak_ptr<RtcpHelper> weak_self = shared_from_this();
_rtcp_sock->setOnRead([weak_self](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) {
// 用于接受rtcp打洞包
auto strong_self = weak_self.lock();
if (!strong_self || !strong_self->_process) {
return;
}
if (!strong_self->_rtcp_addr) {
// 只设置一次rtcp对端端口
strong_self->_rtcp_addr = std::make_shared<struct sockaddr_storage>();
memcpy(strong_self->_rtcp_addr.get(), addr, addr_len);
}
auto rtcps = RtcpHeader::loadFromBytes(buf->data(), buf->size());
for (auto &rtcp : rtcps) {
strong_self->_process->onRtcp(rtcp);
}
});
GET_CONFIG(uint64_t, timeoutSec, RtpProxy::kTimeoutSec);
_delay_task = _rtcp_sock->getPoller()->doDelayTask(timeoutSec * 1000, [weak_self]() {
if (auto strong_self = weak_self.lock()) {
auto process = RtpSelector::Instance().getProcess(strong_self->_stream_id, false);
if (!process && strong_self->_on_detach) {
strong_self->_on_detach();
}
if (!process) { // process 未创建触发rtp server 超时事件
NoticeCenter::Instance().emitEvent(Broadcast::KBroadcastRtpServerTimeout,strong_self->_local_port,strong_self->_stream_id,(int)strong_self->_tcp_mode,strong_self->_re_use_port,strong_self->_ssrc);
}
}
return 0;
});
}
void cancelDelayTask() {
if (_delay_task) {
_delay_task->cancel();
_delay_task = nullptr;
}
}
private:
void sendRtcp(uint32_t rtp_ssrc, struct sockaddr *addr) {
// 每5秒发送一次rtcp
if (_ticker.elapsedTime() < 5000 || !_process) {
return;
}
_ticker.resetTime();
auto rtcp_addr = (struct sockaddr *)_rtcp_addr.get();
if (!rtcp_addr) {
// 默认的rtcp端口为rtp端口+1
switch (addr->sa_family) {
case AF_INET: ((sockaddr_in *)addr)->sin_port = htons(ntohs(((sockaddr_in *)addr)->sin_port) + 1); break;
case AF_INET6: ((sockaddr_in6 *)addr)->sin6_port = htons(ntohs(((sockaddr_in6 *)addr)->sin6_port) + 1); break;
}
// 未收到rtcp打洞包时采用默认的rtcp端口
rtcp_addr = addr;
}
_rtcp_sock->send(_process->createRtcpRR(rtp_ssrc + 1, rtp_ssrc), rtcp_addr);
}
private:
bool _re_use_port = false;
uint16_t _local_port = 0;
uint32_t _ssrc = 0;
RtpServer::TcpMode _tcp_mode = RtpServer::NONE;
Ticker _ticker;
Socket::Ptr _rtcp_sock;
RtpProcess::Ptr _process;
std::string _stream_id;
function<void()> _on_detach;
std::shared_ptr<struct sockaddr_storage> _rtcp_addr;
EventPoller::DelayTask::Ptr _delay_task;
};
void RtpServer::start(uint16_t local_port, const string &stream_id, TcpMode tcp_mode, const char *local_ip, bool re_use_port, uint32_t ssrc) {
//创建udp服务器
Socket::Ptr rtp_socket = Socket::createSocket(nullptr, true);
Socket::Ptr rtcp_socket = Socket::createSocket(nullptr, true);
if (local_port == 0) {
//随机端口rtp端口采用偶数
auto pair = std::make_pair(rtp_socket, rtcp_socket);
makeSockPair(pair, local_ip, re_use_port);
} else if (!rtp_socket->bindUdpSock(local_port, local_ip, re_use_port)) {
//用户指定端口
throw std::runtime_error(StrPrinter << "创建rtp端口 " << local_ip << ":" << local_port << " 失败:" << get_uv_errmsg(true));
} else if (!rtcp_socket->bindUdpSock(rtp_socket->get_local_port() + 1, local_ip, re_use_port)) {
// rtcp端口
throw std::runtime_error(StrPrinter << "创建rtcp端口 " << local_ip << ":" << rtp_socket->get_local_port() + 1 << " 失败:" << get_uv_errmsg(true));
}
//设置udp socket读缓存
SockUtil::setRecvBuf(rtp_socket->rawFD(), 4 * 1024 * 1024);
TcpServer::Ptr tcp_server;
_tcp_mode = tcp_mode;
if (tcp_mode == PASSIVE || tcp_mode == ACTIVE) {
//创建tcp服务器
tcp_server = std::make_shared<TcpServer>(rtp_socket->getPoller());
(*tcp_server)[RtpSession::kStreamID] = stream_id;
(*tcp_server)[RtpSession::kIsUDP] = 0;
(*tcp_server)[RtpSession::kSSRC] = ssrc;
if (tcp_mode == PASSIVE) {
tcp_server->start<RtpSession>(rtp_socket->get_local_port(), local_ip);
} else if (stream_id.empty()) {
// tcp主动模式时只能一个端口一个流必须指定流id; 创建TcpServer对象也仅用于传参
throw std::runtime_error(StrPrinter << "tcp主动模式时必需指定流id");
}
}
//创建udp服务器
UdpServer::Ptr udp_server;
RtcpHelper::Ptr helper;
if (!stream_id.empty()) {
//指定了流id那么一个端口一个流(不管是否包含多个ssrc的多个流绑定rtp源后会筛选掉ip端口不匹配的流)
helper = std::make_shared<RtcpHelper>(std::move(rtcp_socket), stream_id);
helper->startRtcp();
helper->setRtpServerInfo(local_port,tcp_mode,re_use_port,ssrc);
rtp_socket->setOnRead([rtp_socket, helper, ssrc](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) {
RtpHeader *header = (RtpHeader *)buf->data();
auto rtp_ssrc = ntohl(header->ssrc);
if (ssrc && rtp_ssrc != ssrc) {
WarnL << "ssrc不匹配,rtp已丢弃:" << rtp_ssrc << " != " << ssrc;
} else {
helper->onRecvRtp(rtp_socket, buf, addr);
}
});
} else {
#if 1
//单端口多线程接收多个流根据ssrc区分流
udp_server = std::make_shared<UdpServer>(rtp_socket->getPoller());
(*udp_server)[RtpSession::kIsUDP] = 1;
udp_server->start<RtpSession>(rtp_socket->get_local_port(), local_ip);
rtp_socket = nullptr;
#else
//单端口单线程接收多个流
auto &ref = RtpSelector::Instance();
rtp_socket->setOnRead([&ref, rtp_socket](const Buffer::Ptr &buf, struct sockaddr *addr, int) {
ref.inputRtp(rtp_socket, buf->data(), buf->size(), addr);
});
#endif
}
_on_cleanup = [rtp_socket, stream_id]() {
if (rtp_socket) {
//去除循环引用
rtp_socket->setOnRead(nullptr);
}
};
_tcp_server = tcp_server;
_udp_server = udp_server;
_rtp_socket = rtp_socket;
_rtcp_helper = helper;
}
void RtpServer::setOnDetach(function<void()> cb) {
if (_rtcp_helper) {
_rtcp_helper->setOnDetach(std::move(cb));
}
}
uint16_t RtpServer::getPort() {
return _udp_server ? _udp_server->getPort() : _rtp_socket->get_local_port();
}
void RtpServer::connectToServer(const std::string &url, uint16_t port, const function<void(const SockException &ex)> &cb) {
if (_tcp_mode != ACTIVE || !_rtp_socket) {
cb(SockException(Err_other, "仅支持tcp主动模式"));
return;
}
weak_ptr<RtpServer> weak_self = shared_from_this();
_rtp_socket->connect(url, port, [url, port, cb, weak_self](const SockException &err) {
auto strong_self = weak_self.lock();
if (!strong_self) {
cb(SockException(Err_other, "服务对象已释放"));
return;
}
if (err) {
WarnL << "连接到服务器 " << url << ":" << port << " 失败 " << err.what();
} else {
InfoL << "连接到服务器 " << url << ":" << port << " 成功";
strong_self->onConnect();
}
cb(err);
},
5.0F, "::", _rtp_socket->get_local_port());
}
void RtpServer::onConnect() {
auto rtp_session = std::make_shared<RtpSession>(_rtp_socket);
rtp_session->attachServer(*_tcp_server);
_rtp_socket->setOnRead([rtp_session](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) {
rtp_session->onRecv(buf);
});
weak_ptr<RtpServer> weak_self = shared_from_this();
_rtp_socket->setOnErr([weak_self](const SockException &err) {
if (auto strong_self = weak_self.lock()) {
strong_self->_rtp_socket->setOnRead(nullptr);
}
});
}
}//namespace mediakit
#endif//defined(ENABLE_RTPPROXY)