Files
ZLMediaKit/webrtc/WebRtcSignalingPeer.cpp
baigao-X 3fb43c5fef feat: 增加webrtc代理拉流 (#4389)
- 增加客户端模式,支持主动拉流、推流:
   - addStreamProxy接口新增支持whep主动拉流,拉流地址目前只兼容zlm的whep url。
   - addStreamPusherProxy接口新增支持whip主动推流,推流地址目前只兼容zlm的whip url。
   - 以上推流url格式为webrtc[s]://server_host:server_port/app/stream_id?key=value, 内部会自动转换为http[s]://server_host:server_port/index/api/[whip/whep]?app=app&stream=stream_id&key=value。

- 增加WebRtc p2p 模式:
  - 增加 ICE FULL模式。
  - 增加STUN/TURN 服务器。
  - 增加websocket 信令。
  - 增加P2P代理拉流。

---------

Co-authored-by: xia-chu <771730766@qq.com>
Co-authored-by: mtdxc <mtdxc@126.com>
Co-authored-by: cqm <cqm@97kid.com>
2025-09-20 16:23:30 +08:00

688 lines
26 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/*
* Copyright (c) 2016-present The ZLMediaKit project authors. All Rights Reserved.
*
* This file is part of ZLMediaKit(https://github.com/ZLMediaKit/ZLMediaKit).
*
* Use of this source code is governed by MIT-like license that can be found in the
* LICENSE file in the root of the source tree. All contributing project authors
* may be found in the AUTHORS file in the root of the source tree.
*/
#include "WebRtcSignalingPeer.h"
#include "WebRtcSignalingMsg.h"
#include "Util/util.h"
#include "Common/config.h"
#include "json/value.h"
using namespace std;
using namespace toolkit;
using namespace mediakit::Rtc;
namespace mediakit {
// 注册到的信令服务器列表
// 不允许注册到同一个服务器地址
static ServiceController<WebRtcSignalingPeer> s_room_keepers;
static inline string getRoomKeepersKey(const string &host, uint16_t &port) {
return host + ":" + std::to_string(port);
}
void addWebrtcRoomKeeper(const string &host, uint16_t port, const std::string& room_id, bool ssl,
const function<void(const SockException &ex, const string &key)> &cb) {
DebugL;
auto key = getRoomKeepersKey(host, port);
if (s_room_keepers.find(key)) {
//已经发起注册了
cb(SockException(Err_success), key);
return;
}
auto peer = s_room_keepers.make(key, host, port, ssl, room_id);
peer->setOnShutdown([key] (const SockException &ex) {
InfoL << "webrtc peer shutdown, key: " << key << ", " << ex.what();
s_room_keepers.erase(key);
});
peer->setOnConnect([peer, cb] (const SockException &ex) {
peer->regist(cb);
});
peer->connect();
}
void delWebrtcRoomKeeper(const std::string &key, const std::function<void(const SockException &ex)> &cb) {
auto peer = s_room_keepers.find(key);
if (!peer) {
return cb(SockException(Err_other, "room_key not exist"));
}
peer->unregist(cb);
s_room_keepers.erase(key);
}
void listWebrtcRoomKeepers(const std::function<void(const std::string& key, const WebRtcSignalingPeer::Ptr& p)> &cb) {
s_room_keepers.for_each(cb);
}
Json::Value ToJson(const WebRtcSignalingPeer::Ptr& p) {
return p->makeInfoJson();
}
WebRtcSignalingPeer::Ptr getWebrtcRoomKeeper(const string &host, uint16_t port) {
return s_room_keepers.find(getRoomKeepersKey(host, port));
}
//////////// WebRtcSignalingPeer //////////////////////////
WebRtcSignalingPeer::WebRtcSignalingPeer(const std::string &host, uint16_t port, bool ssl, const std::string &room_id, const EventPoller::Ptr &poller)
: WebSocketClient<TcpClient>(poller)
, _room_id(room_id) {
TraceL;
// TODO: not support wss now
_ws_url = StrPrinter << (ssl ? "wss://" : "ws://") + host << ":" << port << "/signaling";
_room_key = getRoomKeepersKey(host, port);
}
WebRtcSignalingPeer::~WebRtcSignalingPeer() {
DebugL << "room_id: " << _room_id;
}
void WebRtcSignalingPeer::connect() {
DebugL;
startWebSocket(_ws_url);
}
void WebRtcSignalingPeer::regist(const function<void(const SockException &ex, const string &key)> &cb) {
DebugL;
std::weak_ptr<WebRtcSignalingPeer> weak_self = std::static_pointer_cast<WebRtcSignalingPeer>(shared_from_this());
getPoller()->async([weak_self, cb]() mutable {
if (auto strong_self = weak_self.lock()) {
strong_self->sendRegisterRequest(std::move(cb));
}
});
}
void WebRtcSignalingPeer::unregist(const function<void(const SockException &ex)> &cb) {
DebugL;
auto trigger = [cb](const SockException &ex, std::string msg) { cb(ex); };
std::weak_ptr<WebRtcSignalingPeer> weak_self = std::static_pointer_cast<WebRtcSignalingPeer>(shared_from_this());
getPoller()->async([weak_self, trigger]() mutable {
if (auto strong_self = weak_self.lock()) {
strong_self->sendUnregisterRequest(std::move(trigger));
}
});
}
void WebRtcSignalingPeer::checkIn(const std::string& peer_room_id, const MediaTuple &tuple, const std::string& identifier,
const std::string& offer, bool is_play,
const function<void(const SockException &ex, const std::string& answer)> &cb, float timeout_sec) {
DebugL;
std::weak_ptr<WebRtcSignalingPeer> weak_self = std::static_pointer_cast<WebRtcSignalingPeer>(shared_from_this());
getPoller()->async([=] () mutable {
TraceL;
if (auto strong_self = weak_self.lock()) {
auto guest_id = strong_self->_room_id + "_" + makeRandStr(16);
strong_self->_tours.emplace(peer_room_id, std::make_pair(guest_id, identifier));
auto trigger = ([cb, peer_room_id, weak_self](const SockException &ex, const std::string &msg) {
auto strong_self = weak_self.lock();
if (ex && strong_self) {
strong_self->_tours.erase(peer_room_id);
}
return cb(ex, msg);
});
strong_self->sendCallRequest(peer_room_id, guest_id, tuple, offer, is_play, std::move(trigger));
}
});
}
void WebRtcSignalingPeer::checkOut(const std::string& peer_room_id) {
DebugL;
std::weak_ptr<WebRtcSignalingPeer> weak_self = std::static_pointer_cast<WebRtcSignalingPeer>(shared_from_this());
getPoller()->async([=] () {
TraceL;
if (auto strong_self = weak_self.lock()) {
auto it = strong_self->_tours.find(peer_room_id);
if (it != strong_self->_tours.end()) {
auto &guest_id = it->second.first;
strong_self->sendByeIndication(peer_room_id, guest_id);
strong_self->_tours.erase(it);
}
}
});
}
void WebRtcSignalingPeer::candidate(const std::string& transport_identifier, const std::string& candidate, const std::string& ice_ufrag, const std::string& ice_pwd) {
std::weak_ptr<WebRtcSignalingPeer> weak_self = std::static_pointer_cast<WebRtcSignalingPeer>(shared_from_this());
getPoller()->async([=] () {
if (auto strong_self = weak_self.lock()) {
strong_self->sendCandidateIndication(transport_identifier, candidate, ice_ufrag, ice_pwd);
}
});
}
void WebRtcSignalingPeer::processOffer(SIGNALING_MSG_ARGS, WebRtcInterface &transport) {
try {
auto sdp = transport.getAnswerSdp((const std::string )allArgs[SDP_KEY]);
auto tuple = MediaTuple(allArgs[CALL_VHOST_KEY], allArgs[CALL_APP_KEY], allArgs[CALL_STREAM_KEY]);
answer(allArgs[GUEST_ID_KEY], tuple, transport.getIdentifier(), sdp, allArgs[TYPE_KEY] == TYPE_VALUE_PLAY, allArgs[TRANSACTION_ID_KEY]);
std::weak_ptr<WebRtcSignalingPeer> weak_self = std::static_pointer_cast<WebRtcSignalingPeer>(shared_from_this());
transport.gatheringCandidate(_ice_server, [weak_self](const std::string& transport_identifier,
const std::string& candidate, const std::string& ufrag, const std::string& pwd) {
if (auto strong_self = weak_self.lock()) {
strong_self->candidate(transport_identifier, candidate, ufrag, pwd);
}
});
} catch (std::exception &ex) {
Json::Value body;
body[METHOD_KEY] = allArgs[METHOD_KEY];
body[ROOM_ID_KEY] = allArgs[ROOM_ID_KEY];
body[GUEST_ID_KEY] = allArgs[GUEST_ID_KEY];
body[CALL_VHOST_KEY] = allArgs[CALL_VHOST_KEY];
body[CALL_APP_KEY] = allArgs[CALL_APP_KEY];
body[CALL_STREAM_KEY] = allArgs[CALL_STREAM_KEY];
body[TYPE_KEY] = allArgs[TYPE_KEY];
sendRefusesResponse(body, allArgs[TRANSACTION_ID_KEY], ex.what());
}
}
void WebRtcSignalingPeer::answer(const std::string& guest_id, const MediaTuple &tuple, const std::string& identifier, const std::string& sdp, bool is_play, const std::string& transaction_id) {
_peer_guests.emplace(guest_id, identifier);
sendCallAccept(guest_id, tuple, sdp, is_play, transaction_id);
}
void WebRtcSignalingPeer::setOnConnect(function<void(const SockException &ex)> cb) {
_on_connect = cb ? std::move(cb) : [](const SockException &) {};
}
void WebRtcSignalingPeer::onConnect(const SockException &ex) {
TraceL;
if (_on_connect) {
_on_connect(ex);
}
if (!ex) {
createResponseExpiredTimer();
}
}
void WebRtcSignalingPeer::setOnShutdown(function<void(const SockException &ex)> cb) {
_on_shutdown = cb ? std::move(cb) : [](const SockException &) {};
}
void WebRtcSignalingPeer::onShutdown(const SockException &ex) {
TraceL;
if (_on_shutdown) {
_on_shutdown(ex);
}
}
void WebRtcSignalingPeer::onRecv(const Buffer::Ptr &buffer) {
TraceL << "recv msg:\r\n" << buffer->data();
Json::Value args;
Json::Reader reader;
reader.parse(buffer->data(), args);
Parser parser;
HttpAllArgs<decltype(args)> allArgs(parser, args);
CHECK_ARGS(METHOD_KEY, TRANSACTION_ID_KEY);
using MsgHandler = void (WebRtcSignalingPeer::*)(SIGNALING_MSG_ARGS);
static std::unordered_map<std::pair<std::string /*class*/, std::string /*method*/>, MsgHandler, ClassMethodHash> s_msg_handlers;
static onceToken token([]() {
s_msg_handlers.emplace(std::make_pair(CLASS_VALUE_ACCEPT, METHOD_VALUE_REGISTER), &WebRtcSignalingPeer::handleRegisterAccept);
s_msg_handlers.emplace(std::make_pair(CLASS_VALUE_REJECT, METHOD_VALUE_REGISTER), &WebRtcSignalingPeer::handleRegisterReject);
s_msg_handlers.emplace(std::make_pair(CLASS_VALUE_ACCEPT, METHOD_VALUE_UNREGISTER), &WebRtcSignalingPeer::handleUnregisterAccept);
s_msg_handlers.emplace(std::make_pair(CLASS_VALUE_REJECT, METHOD_VALUE_UNREGISTER), &WebRtcSignalingPeer::handleUnregisterReject);
s_msg_handlers.emplace(std::make_pair(CLASS_VALUE_REQUEST, METHOD_VALUE_CALL), &WebRtcSignalingPeer::handleCallRequest);
s_msg_handlers.emplace(std::make_pair(CLASS_VALUE_ACCEPT, METHOD_VALUE_CALL), &WebRtcSignalingPeer::handleCallAccept);
s_msg_handlers.emplace(std::make_pair(CLASS_VALUE_REJECT, METHOD_VALUE_CALL), &WebRtcSignalingPeer::handleCallReject);
s_msg_handlers.emplace(std::make_pair(CLASS_VALUE_INDICATION, METHOD_VALUE_CANDIDATE), &WebRtcSignalingPeer::handleCandidateIndication);
s_msg_handlers.emplace(std::make_pair(CLASS_VALUE_INDICATION, METHOD_VALUE_BYE), &WebRtcSignalingPeer::handleByeIndication);
});
auto it = s_msg_handlers.find(std::make_pair(allArgs[CLASS_KEY], allArgs[METHOD_KEY]));
if (it == s_msg_handlers.end()) {
WarnL << "unsupported class: "<< allArgs[CLASS_KEY] << ", method: " << allArgs[METHOD_KEY] << ", ignore";
return;
}
return (this->*(it->second))(allArgs);
}
void WebRtcSignalingPeer::onError(const SockException &err) {
WarnL << "room_id: " << _room_id;
s_room_keepers.erase(_room_key);
// 除非对端显式的发送了注销执行,否则因为网络异常导致的会话中断不影响已经进行通信的webrtc会话,仅作移除
}
bool WebRtcSignalingPeer::responseFilter(SIGNALING_MSG_ARGS, ResponseTrigger& trigger) {
if (allArgs[CLASS_KEY] != CLASS_VALUE_ACCEPT && allArgs[CLASS_KEY] != CLASS_VALUE_REJECT) {
return false;
}
for (auto &pr : _response_list) {
auto &transaction_id = pr.first;
// mismatch transaction_id
if (transaction_id != allArgs[TRANSACTION_ID_KEY] && !transaction_id.empty()) {
continue;
}
auto &handle = pr.second;
if (allArgs[METHOD_KEY] != handle.method) {
WarnL << "recv response method: " << allArgs[METHOD_KEY] << " mismatch request method: " << handle.method;
return false;
}
trigger = std::move(handle.cb);
_response_list.erase(transaction_id);
return true;
}
return false;
}
void WebRtcSignalingPeer::sendRegisterRequest(ResponseTrigger trigger) {
TraceL;
Json::Value body;
body[CLASS_KEY] = CLASS_VALUE_REQUEST;
body[METHOD_KEY] = METHOD_VALUE_REGISTER;
body[ROOM_ID_KEY] = getRoomId();
sendRequest(body, std::move(trigger));
}
void WebRtcSignalingPeer::handleRegisterAccept(SIGNALING_MSG_ARGS) {
TraceL;
ResponseTrigger trigger;
if (!responseFilter(allArgs, trigger)) {
return;
}
auto jsonArgs = allArgs.getArgs();
auto ice_servers = jsonArgs[ICE_SERVERS_KEY];
if (ice_servers.type() != Json::ValueType::arrayValue) {
_StrPrinter msg;
msg << "illegal \"" << ICE_SERVERS_KEY << "\" point";
WarnL << msg;
trigger(SockException(Err_other, msg), getRoomKey());
return;
}
if (ice_servers.empty()) {
_StrPrinter msg;
msg << "no ice server found in \"" << ICE_SERVERS_KEY << "\" point";
WarnL << msg;
trigger(SockException(Err_other, msg), getRoomKey());
return;
}
for (auto &ice_server : ice_servers) {
// only support 1 ice_server now
auto url = ice_server[URL_KEY].asString();
_ice_server = std::make_shared<RTC::IceServerInfo>(url);
_ice_server->_ufrag = ice_server[UFRAG_KEY].asString();
_ice_server->_pwd = ice_server[PWD_KEY].asString();
}
trigger(SockException(Err_success), getRoomKey());
}
void WebRtcSignalingPeer::handleRegisterReject(SIGNALING_MSG_ARGS) {
TraceL;
ResponseTrigger trigger;
if (!responseFilter(allArgs, trigger)) {
return;
}
auto ex = SockException(Err_other, StrPrinter << "register refuses by server, reason: " << allArgs[REASON_KEY]);
trigger(ex, getRoomKey());
onShutdown(ex);
}
void WebRtcSignalingPeer::sendUnregisterRequest(ResponseTrigger trigger) {
TraceL;
Json::Value body;
body[CLASS_KEY] = CLASS_VALUE_REQUEST;
body[METHOD_KEY] = METHOD_VALUE_UNREGISTER;
body[ROOM_ID_KEY] = _room_id;
sendRequest(body, std::move(trigger));
}
void WebRtcSignalingPeer::handleUnregisterAccept(SIGNALING_MSG_ARGS) {
ResponseTrigger trigger;
if (!responseFilter(allArgs, trigger)) {
return;
}
trigger(SockException(Err_success), getRoomKey());
}
void WebRtcSignalingPeer::handleUnregisterReject(SIGNALING_MSG_ARGS) {
ResponseTrigger trigger;
if (!responseFilter(allArgs, trigger)) {
return;
}
auto ex = SockException(Err_other, StrPrinter << "unregister refuses by server, reason: " << allArgs[REASON_KEY]);
trigger(ex, getRoomKey());
}
void WebRtcSignalingPeer::sendCallRequest(const std::string& peer_room_id, const std::string& guest_id, const MediaTuple &tuple, const std::string& sdp, bool is_play, ResponseTrigger trigger) {
DebugL;
Json::Value body;
body[CLASS_KEY] = CLASS_VALUE_REQUEST;
body[METHOD_KEY] = METHOD_VALUE_CALL;
body[TYPE_KEY] = is_play? TYPE_VALUE_PLAY : TYPE_VALUE_PUSH;
body[GUEST_ID_KEY] = guest_id; //our guest id
body[ROOM_ID_KEY] = peer_room_id;
body[CALL_VHOST_KEY] = tuple.vhost;
body[CALL_APP_KEY] = tuple.app;
body[CALL_STREAM_KEY] = tuple.stream;
body[SDP_KEY] = sdp;
sendRequest(body, std::move(trigger));
}
void WebRtcSignalingPeer::sendCallAccept(const std::string& peer_guest_id, const MediaTuple &tuple, const std::string& sdp, bool is_play, const std::string& transaction_id) {
DebugL;
Json::Value body;
body[CLASS_KEY] = CLASS_VALUE_ACCEPT;
body[METHOD_KEY] = METHOD_VALUE_CALL;
body[TRANSACTION_ID_KEY] = transaction_id;
body[TYPE_KEY] = is_play? TYPE_VALUE_PLAY : TYPE_VALUE_PUSH;
body[GUEST_ID_KEY] = peer_guest_id;
body[ROOM_ID_KEY] = _room_id; //our room id
body[CALL_VHOST_KEY] = tuple.vhost;
body[CALL_APP_KEY] = tuple.app;
body[CALL_STREAM_KEY] = tuple.stream;
body[SDP_KEY] = sdp;
sendPacket(body);
}
void WebRtcSignalingPeer::handleCallRequest(SIGNALING_MSG_ARGS) {
DebugL;
CHECK_ARGS(GUEST_ID_KEY, ROOM_ID_KEY, CALL_VHOST_KEY, CALL_APP_KEY, CALL_STREAM_KEY, TYPE_KEY);
if (allArgs[ROOM_ID_KEY] != getRoomId()) {
WarnL << "target room_id: " << allArgs[ROOM_ID_KEY] << "mismatch our room_id: " << getRoomId();
return;
}
auto args = std::make_shared<WebRtcArgsImp<Json::Value>>(allArgs, allArgs[GUEST_ID_KEY]);
std::weak_ptr<WebRtcSignalingPeer> weak_self = std::static_pointer_cast<WebRtcSignalingPeer>(shared_from_this());
WebRtcPluginManager::Instance().negotiateSdp(*this, allArgs[TYPE_KEY], *args, [allArgs, weak_self](const WebRtcInterface &exchanger) mutable {
if (auto strong_self = weak_self.lock()) {
strong_self->processOffer(allArgs, const_cast<WebRtcInterface &>(exchanger));
}
});
}
void WebRtcSignalingPeer::handleCallAccept(SIGNALING_MSG_ARGS) {
DebugL;
ResponseTrigger trigger;
if (!responseFilter(allArgs, trigger)) {
return;
}
CHECK_ARGS(GUEST_ID_KEY, ROOM_ID_KEY, CALL_VHOST_KEY, CALL_APP_KEY, CALL_STREAM_KEY, TYPE_KEY);
auto room_id = allArgs[ROOM_ID_KEY];
auto it = _tours.find(room_id);
if (it == _tours.end()) {
WarnL << "not found room_id: " << room_id << " in tours";
return;
}
auto &guest_id = it->second.first;
if (allArgs[GUEST_ID_KEY] != guest_id) {
WarnL << "guest_id: " << allArgs[GUEST_ID_KEY] << "mismatch our guest_id: " << guest_id;
return;
}
trigger(SockException(Err_success), allArgs[SDP_KEY]);
}
void WebRtcSignalingPeer::handleCallReject(SIGNALING_MSG_ARGS) {
DebugL;
ResponseTrigger trigger;
if (!responseFilter(allArgs, trigger)) {
return;
}
CHECK_ARGS(GUEST_ID_KEY, ROOM_ID_KEY, CALL_VHOST_KEY, CALL_APP_KEY, CALL_STREAM_KEY, TYPE_KEY);
auto room_id = allArgs[ROOM_ID_KEY];
auto it = _tours.find(room_id);
if (it == _tours.end()) {
WarnL << "not found room_id: " << room_id << " in tours";
return;
}
auto &guest_id = it->second.first;
if (allArgs[GUEST_ID_KEY] != guest_id) {
WarnL << "guest_id: " << allArgs[GUEST_ID_KEY] << "mismatch our guest_id: " << guest_id;
return;
}
_tours.erase(room_id);
trigger(SockException(Err_other, StrPrinter << "call refuses by server, reason: " << allArgs[REASON_KEY]), "");
}
void WebRtcSignalingPeer::handleCandidateIndication(SIGNALING_MSG_ARGS) {
DebugL;
CHECK_ARGS(GUEST_ID_KEY, ROOM_ID_KEY, CANDIDATE_KEY, UFRAG_KEY, PWD_KEY);
std::string identifier;
std::string room_id = allArgs[ROOM_ID_KEY];
std::string guest_id = allArgs[GUEST_ID_KEY];
//作为被叫
if (room_id == getRoomId()) {
auto it = _peer_guests.find(guest_id);
if (it == _peer_guests.end()) {
WarnL << "not found guest_id: " << guest_id;
return;
}
identifier = it->second;
} else {
//作为主叫
for (auto it : _tours) {
if (room_id != it.first) {
continue;
}
auto info = it.second;
if (guest_id != info.first) {
break;
}
identifier = info.second;
}
}
TraceL << "recv remote candidate: " << allArgs[CANDIDATE_KEY];
if (identifier.empty()) {
WarnL << "target room_id: " << room_id << " not match our room_id: " << getRoomId()
<< ", and target guest_id: " << guest_id << " not match";
return;
}
auto transport = WebRtcTransportManager::Instance().getItem(identifier);
if (transport) {
SdpAttrCandidate candidate_attr;
candidate_attr.parse(allArgs[CANDIDATE_KEY]);
transport->connectivityCheck(candidate_attr, allArgs[UFRAG_KEY], allArgs[PWD_KEY]);
}
}
void WebRtcSignalingPeer::handleByeIndication(SIGNALING_MSG_ARGS) {
DebugL;
CHECK_ARGS(GUEST_ID_KEY, ROOM_ID_KEY);
if (allArgs[ROOM_ID_KEY] != getRoomId()) {
WarnL << "target room_id: " << allArgs[ROOM_ID_KEY] << "not match our room_id: " << getRoomId();
return;
}
auto it = _peer_guests.find(allArgs[GUEST_ID_KEY]);
if (it == _peer_guests.end()) {
WarnL << "not found guest_id: " << allArgs[GUEST_ID_KEY];
return;
}
auto identifier = it->second;
_peer_guests.erase(it);
auto obj = WebRtcTransportManager::Instance().getItem(identifier);
if (obj) {
obj->safeShutdown(SockException(Err_shutdown, "deleted by websocket signaling server"));
}
}
void WebRtcSignalingPeer::sendByeIndication(const std::string& peer_room_id, const std::string &guest_id) {
DebugL;
Json::Value body;
body[CLASS_KEY] = CLASS_VALUE_INDICATION;
body[METHOD_KEY] = METHOD_VALUE_BYE;
body[GUEST_ID_KEY] = guest_id; //our guest id
body[ROOM_ID_KEY] = peer_room_id;
body[SENDER_KEY] = guest_id;
sendIndication(body);
}
void WebRtcSignalingPeer::sendCandidateIndication(const std::string& transport_identifier, const std::string& candidate, const std::string& ice_ufrag, const std::string& ice_pwd) {
TraceL;
Json::Value body;
body[CLASS_KEY] = CLASS_VALUE_INDICATION;
body[METHOD_KEY] = METHOD_VALUE_CANDIDATE;
body[CANDIDATE_KEY] = candidate;
body[UFRAG_KEY] = ice_ufrag;
body[PWD_KEY] = ice_pwd;
//作为被叫
for (auto &pr : _peer_guests) {
if (pr.second == transport_identifier) {
body[ROOM_ID_KEY] = _room_id;
body[GUEST_ID_KEY] = pr.first; //peer_guest_id
body[SENDER_KEY] = _room_id;
return sendIndication(body);
}
}
//作为主叫
for (auto &pr : _tours) {
auto &info = pr.second;
if (info.second == transport_identifier) {
body[ROOM_ID_KEY] = pr.first; //peer room id
body[GUEST_ID_KEY] = info.first; //our_guest_id
body[SENDER_KEY] = info.first;
return sendIndication(body);
}
}
}
void WebRtcSignalingPeer::sendAcceptResponse(const std::string& method, const std::string& transaction_id, const std::string& room_id,
const std::string& guest_id, const std::string& reason) {
// TODO
}
void WebRtcSignalingPeer::sendRefusesResponse(Json::Value &body, const std::string& transaction_id, const std::string& reason) {
body[CLASS_KEY] = CLASS_VALUE_REJECT;
body[REASON_KEY] = reason;
sendResponse(body, transaction_id);
}
void WebRtcSignalingPeer::sendRequest(Json::Value& body, ResponseTrigger trigger, float seconds) {
auto transaction_id = makeRandStr(32);
body[TRANSACTION_ID_KEY] = transaction_id;
ResponseTuple tuple;
tuple.ttl_ms = seconds * 1000;
tuple.method = body[METHOD_KEY].asString();
tuple.cb = std::move(trigger);
_response_list.emplace(std::move(transaction_id), std::move(tuple));
sendPacket(body);
}
void WebRtcSignalingPeer::sendIndication(Json::Value &body) {
auto transaction_id = makeRandStr(32);
body[TRANSACTION_ID_KEY] = transaction_id;
sendPacket(body);
}
void WebRtcSignalingPeer::sendResponse(Json::Value &body, const std::string& transaction_id) {
body[TRANSACTION_ID_KEY] = transaction_id;
sendPacket(body);
}
void WebRtcSignalingPeer::sendPacket(Json::Value& body) {
auto msg = body.toStyledString();
DebugL << "send msg: " << msg;
SockSender::send(std::move(msg));
}
Json::Value WebRtcSignalingPeer::makeInfoJson() {
Json::Value item;
item["room_id"] = getRoomId();
item["room_key"] = getRoomKey();
Json::Value peer_guests_obj(Json::arrayValue);
auto peer_guests = _peer_guests;
for(auto &guest : peer_guests) {
Json::Value obj;
obj["guest_id"] = guest.first;
obj["transport_identifier"] = guest.second;
peer_guests_obj.append(std::move(obj));
}
item["guests"] = std::move(peer_guests_obj);
Json::Value tours_obj(Json::arrayValue);
auto tours = _tours;
for(auto &tour : tours){
Json::Value obj;
obj["room_id"] = tour.first;
obj["guest_id"] = tour.second.first;
obj["transport_identifier"] = tour.second.second;
tours_obj.append(std::move(obj));
}
item["tours"] = std::move(tours_obj);
return item;
}
void WebRtcSignalingPeer::createResponseExpiredTimer() {
std::weak_ptr<WebRtcSignalingPeer> weak_self = std::static_pointer_cast<WebRtcSignalingPeer>(shared_from_this());
_expire_timer = std::make_shared<Timer>(0.2, [weak_self]() {
if (auto strong_self = weak_self.lock()) {
strong_self->checkResponseExpired();
return true; // 继续定时器
}
return false;
}, getPoller());
}
void WebRtcSignalingPeer::checkResponseExpired() {
//FIXME: 移动到专门的超时timer中处理
#if 0
// 设置计时器以检测 offer 响应超时
_offer_timeout_timer = std::make_shared<Timer>(
timeout_sec,
[this, cb, peer_room_id]() {
_tours.erase(peer_room_id);
return false; // 停止计时器
},
getPoller()
);
#endif
for (auto it = _response_list.begin(); it != _response_list.end();) {
auto &tuple = it->second;
if (!tuple.expired()) {
++it;
continue;
}
// over time
WarnL << "transaction_id: " << it->first << ", method: " << tuple.method << " recv response over time";
tuple.cb(SockException(Err_timeout, "recv response timeout"), "");
it = _response_list.erase(it);
}
}
}// namespace mediakit