[owt] WebrtcNode: the subscribe flow

news/2024/2/29 2:15:07

The subscribe flow

1. AmqpClient - New message received

2023-04-26T21:54:18.415  - DEBUG: AmqpClient - RpcServer New message received {method: 'subscribe',args: ['b149e44bb10d4e91bd162a8c6806ae7b','webrtc',{transportId: 'b149e44bb10d4e91bd162a8c6806ae7b',tracks: [Array],controller: 'conference-aed26ef945c09ddf89b3@192.168.221.62_0',owner: 'xG6DXLHdXwky_E8eAAAD'}],corrID: 28,replyTo: 'amq.gen-WtoELIbC4gJ1GfdYgkvSFA'
}

2. WebrtcNode - subscribe

webrtc_agent/webrtc/index.js

2023-04-26T21:54:18.416  - DEBUG: WebrtcNode - subscribe, operationId: b149e44bb10d4e91bd162a8c6806ae7b connectionType: webrtc options: {transportId: 'b149e44bb10d4e91bd162a8c6806ae7b',tracks: [{from: 'cb8fcfb93f174c24862feaa38915111a',mid: '0',type: 'audio',formatPreference: [Object]},{from: 'cb8fcfb93f174c24862feaa38915111a',mid: '1',parameters: {},type: 'video',formatPreference: [Object]}],controller: 'conference-aed26ef945c09ddf89b3@192.168.221.62_0',owner: 'xG6DXLHdXwky_E8eAAAD'
}

Open question: what does the from parameter mean? Which transportId does it refer to?

/*
 * For operations on type webrtc, publicTrackId is connectionId.
 * For operations on type internal, operationId is connectionId.
 */
// functions: publish, unpublish, subscribe, unsubscribe, linkup, cutoff
// options = {
//   transportId,
//   tracks = [{mid, type, formatPreference, scalabilityMode}],
//   controller, owner, enableBWE
// }
// formatPreference = {preferred: MediaFormat, optional: [MediaFormat]}
that.subscribe = function (operationId, connectionType, options, callback) {
  log.debug('subscribe, operationId:', operationId, 'connectionType:', connectionType, 'options:', options);
  if (mappingTransports.has(operationId)) {
    return callback('callback', {type: 'failed', reason: 'Connection already exists:'+operationId});
  }
  var conn = null;
  if (connectionType === 'webrtc') {
    if (!options.transportId) {
      // Generate a transportId
    }
    // 1. Create the WebRTCConnection
    // options.transportId: similar to a connection id
    // options.controller: the conference agent the message came from; used when sending messages back
    // options.owner: the user id
    conn = createWebRTCConnection(options.transportId, options.controller, options.owner);
    // 2. addTrackOperation
    options.tracks.forEach(function trackOp(t) {
      conn.addTrackOperation(operationId, 'recvonly', t);
    });
    mappingTransports.set(operationId, options.transportId);
    if (options.enableBWE) {
      conn.enableBWE = true;
    }
    callback('callback', 'ok');
  } else {
    log.error('Connection type invalid:' + connectionType);
  }
  if (!conn) {
    log.error('Create connection failed', operationId, connectionType);
    callback('callback', {type: 'failed', reason: 'Create Connection failed'});
  }
};

Open question: where does callback come from?
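One plausible answer, sketched under assumptions: the RPC message in step 1 carries corrID and replyTo, so the RPC server layer presumably appends a reply function as the last argument before invoking the method. The names dispatchRpc and sendReply below are hypothetical and only illustrate the pattern; they are not taken from the OWT source.

```javascript
// Hypothetical sketch of how an RPC layer could supply `callback`.
// dispatchRpc and sendReply are illustrative names, not OWT APIs.
function dispatchRpc(handlers, message, sendReply) {
  const { method, args, corrID, replyTo } = message;
  const handler = handlers[method];
  if (typeof handler !== 'function') {
    sendReply(replyTo, { corrID, type: 'callback', data: 'error' });
    return;
  }
  // The reply function is appended as the last argument, i.e. `callback`.
  const callback = (type, data) => sendReply(replyTo, { corrID, type, data });
  handler(...args, callback);
}

// Usage with a stub handler that has the same shape as that.subscribe.
const replies = [];
const handlers = {
  subscribe(operationId, connectionType, options, callback) {
    callback('callback', 'ok');
  },
};
dispatchRpc(
  handlers,
  {
    method: 'subscribe',
    args: ['op-1', 'webrtc', { transportId: 'op-1', tracks: [] }],
    corrID: 28,
    replyTo: 'reply-queue',
  },
  (queue, payload) => replies.push({ queue, payload })
);
console.log(replies[0]);
```

Under this assumption, callback('callback', 'ok') in subscribe would end up as a reply routed to the replyTo queue and matched to the request by corrID.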

2.1 createWebRTCConnection

Creates the WebRtcConnection.

See section 3.

2.2 addTrackOperation

See section 5.

3. WebrtcNode - createWebRTCConnection: returns a WrtcConnection

webrtc_agent/webrtc/index.js

var createWebRTCConnection = function (transportId, controller, owner) {
  if (peerConnections.has(transportId)) {
    log.debug('PeerConnection already created:', transportId);
    return peerConnections.get(transportId);
  }
  var connection = new WrtcConnection({
    connectionId: transportId,
    threadPool: threadPool,
    ioThreadPool: ioThreadPool,
    network_interfaces: global.config.webrtc.network_interfaces,
    owner,
  }, function onTransportStatus(status) {
    notifyTransportStatus(controller, transportId, status);
  }, function onTrack(trackInfo) {
    // track-related information
    handleTrackInfo(transportId, trackInfo, controller);
  });
  // store the WrtcConnection in the map
  peerConnections.set(transportId, connection);
  mappingPublicId.set(transportId, new Map());
  connection.controller = controller;
  return connection;
};

3.1 The peerConnections member

Stores the WrtcConnection objects; each transportId maps to exactly one connection.

// Map { transportId => WrtcConnection }
var peerConnections = new Map();
----------------------------------       
peerConnections.set(transportId, connection);
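A minimal sketch of the get-or-create behavior this map gives createWebRTCConnection. FakeConnection and getOrCreateConnection are stand-in names invented for the example; the real code constructs a WrtcConnection with thread pools and callbacks.

```javascript
// FakeConnection stands in for WrtcConnection (assumption for illustration).
class FakeConnection {
  constructor(connectionId) {
    this.connectionId = connectionId;
  }
}

// Map { transportId => connection }
const peerConnections = new Map();

function getOrCreateConnection(transportId) {
  if (peerConnections.has(transportId)) {
    // Same transportId: reuse the existing connection.
    return peerConnections.get(transportId);
  }
  const connection = new FakeConnection(transportId);
  peerConnections.set(transportId, connection);
  return connection;
}

const first = getOrCreateConnection('t1');
const second = getOrCreateConnection('t1');
const other = getOrCreateConnection('t2');
console.log(first === second, first === other, peerConnections.size);
```

This suggests that several subscribe calls sharing one transportId would share a single connection, each adding its own track operations to it.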

3.2 The mappingPublicId member

// Map { transportId => Map { trackId => publicTrackId } }
var mappingPublicId = new Map();
--------------------------------------------
mappingPublicId.set(transportId, new Map());

createWebRTCConnection only creates an empty Map. The { trackId => publicTrackId } entries are stored later, in handleTrackInfo, when trackInfo.type === 'track-added'.

dist-debug/webrtc_agent/webrtc/index.js

var handleTrackInfo = function (transportId, trackInfo, controller) {
  var publicTrackId;
  var updateInfo;
  if (trackInfo.type === 'track-added') {
    // Generate public track ID
    const track = trackInfo.track;
    publicTrackId = transportId + '-' + track.id;
    if (mediaTracks.has(publicTrackId)) {
      log.error('Conflict public track id:', publicTrackId, transportId, track.id);
      return;
    }
    ...
    mappingPublicId.get(transportId).set(track.id, publicTrackId);
    ...
  }
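The two-level map can be exercised in isolation. The sketch below keeps only the ID bookkeeping from the excerpt above; registerTransport and onTrackAdded are hypothetical helper names, and what the real code stores in mediaTracks is elided in the excerpt, so storing the track object here is an assumption.

```javascript
// Map { transportId => Map { trackId => publicTrackId } }
const mappingPublicId = new Map();
// Map { publicTrackId => track } (value type is an assumption)
const mediaTracks = new Map();

// Mirrors what createWebRTCConnection does: start with an empty inner Map.
function registerTransport(transportId) {
  mappingPublicId.set(transportId, new Map());
}

// Mirrors the 'track-added' branch: derive the public ID and record it.
function onTrackAdded(transportId, track) {
  const publicTrackId = transportId + '-' + track.id;
  if (mediaTracks.has(publicTrackId)) {
    return null; // conflicting public track id
  }
  mediaTracks.set(publicTrackId, track);
  mappingPublicId.get(transportId).set(track.id, publicTrackId);
  return publicTrackId;
}

registerTransport('t1');
const sizeBefore = mappingPublicId.get('t1').size;
const added = onTrackAdded('t1', { id: '0', type: 'audio' });
const duplicate = onTrackAdded('t1', { id: '0', type: 'audio' });
console.log(sizeBefore, added, duplicate);
```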

3.3 new WrtcConnection: create and initialize the RTC connection

See section 4 for details.

3.4 handleTrackInfo

4. new WrtcConnection: create and initialize the RTC connection

webrtc_agent/webrtc/wrtcConnection.js

module.exports = function (spec, on_status, on_track) {
  ...
  wrtc = new Connection(wrtcId, threadPool, ioThreadPool, { ipAddresses });
  wrtc.callBase = new CallBase();
  // wrtc.addMediaStream(wrtcId, {label: ''}, direction === 'in');
  initWebRtcConnection(wrtc);
  return that;
};

4.1 new Connection: create the C++ WebRtcConnection

webrtc_agent/webrtc/connection.js

2023-04-26T21:54:18.416  - INFO: Connection - message: Connection, id: b149e44bb10d4e91bd162a8c6806ae7bd
class Connection extends EventEmitter {
  constructor (id, threadPool, ioThreadPool, options = {}) {
    super();
    log.info(`message: Connection, id: ${id}`);
    this.id = id;
    this.threadPool = threadPool;
    this.ioThreadPool = ioThreadPool;
    this.mediaConfiguration = 'default';
    this.mediaStreams = new Map();
    this.initialized = false;
    this.options = options;
    this.ipAddresses = options.ipAddresses || '';
    this.trickleIce = options.trickleIce || false;
    this.metadata = this.options.metadata || {};
    this.isProcessingRemoteSdp = false;
    this.ready = false;
    // the native WebRtcConnection
    this.wrtc = this._createWrtc();
  }
  ...
}

4.1.1 Connection._createWrtc: create the WebRTC connection

_createWrtc() {
  var wrtc = new addon.WebRtcConnection(this.threadPool, this.ioThreadPool, this.id,
    global.config.webrtc.stunserver,
    global.config.webrtc.stunport,
    global.config.webrtc.minport,
    global.config.webrtc.maxport,
    false, //this.trickleIce,
    this._getMediaConfiguration(this.mediaConfiguration),
    false,
    '', // turnserver,
    '', // turnport,
    '', //turnusername,
    '', //turnpass,
    '', //networkinterface
    this.ipAddresses);
  return wrtc;
}

4.1.2 NAN_METHOD(WebRtcConnection::New)

source/agent/webrtc/rtcConn/WebRtcConnection.cc

NAN_METHOD(WebRtcConnection::New) {
  ...
  WebRtcConnection* obj = new WebRtcConnection();
  obj->me = std::make_shared<erizo::WebRtcConnection>(worker, io_worker, wrtcId, iceConfig,
                                                      rtp_mappings, ext_mappings, obj);
  uv_async_init(uv_default_loop(), &obj->async_, &WebRtcConnection::eventsCallback);
  obj->Wrap(info.This());
  info.GetReturnValue().Set(info.This());
  obj->asyncResource_ = new Nan::AsyncResource("WebRtcConnectionCallback");
  ...
}

4.1.3 erizo::WebRtcConnection::WebRtcConnection

source/agent/webrtc/rtcConn/erizo/src/erizo/WebRtcConnection.cpp

2023-04-26 21:54:18,417  - INFO: WebRtcConnection - id: b149e44bb10d4e91bd162a8c6806ae7b,  message: constructor,stunserver: , stunPort: 0, minPort: 0, maxPort: 0
WebRtcConnection::WebRtcConnection(std::shared_ptr<Worker> worker, std::shared_ptr<IOWorker> io_worker,
    const std::string& connection_id, const IceConfig& ice_config, const std::vector<RtpMap> rtp_mappings,
    const std::vector<erizo::ExtMap> ext_mappings, WebRtcConnectionEventListener* listener) :
    connection_id_{connection_id},
    audio_enabled_{false}, video_enabled_{false}, bundle_{false}, conn_event_listener_{listener},
    ice_config_{ice_config}, rtp_mappings_{rtp_mappings}, extension_processor_{ext_mappings},
    worker_{worker}, io_worker_{io_worker},
    remote_sdp_{std::make_shared<SdpInfo>(rtp_mappings)}, local_sdp_{std::make_shared<SdpInfo>(rtp_mappings)},
    audio_muted_{false}, video_muted_{false}, first_remote_sdp_processed_{false}
{
  ELOG_INFO("%s message: constructor, stunserver: %s, stunPort: %d, minPort: %d, maxPort: %d",
            toLog(), ice_config.stun_server.c_str(), ice_config.stun_port, ice_config.min_port, ice_config.max_port);
  stats_ = std::make_shared<Stats>();
  // distributor_ = std::unique_ptr<BandwidthDistributionAlgorithm>(new TargetVideoBWDistributor());
  global_state_ = CONN_INITIAL;
  trickle_enabled_ = ice_config_.should_trickle;
  slide_show_mode_ = false;
  sending_ = true;
}

4.2 WrtcConnection - initWebRtcConnection

webrtc_agent/webrtc/wrtcConnection.js

/*
 * Given a WebRtcConnection waits for the state CANDIDATES_GATHERED for set remote SDP.
 */
// wrtc is a Connection; Connection extends EventEmitter
var initWebRtcConnection = function (wrtc) {
  // EventEmitter.on() registers an event listener.
  // The event fires when C++ calls back into JS, i.e. it is emitted inside Connection.init.
  // Connection wrtc
  wrtc.on('status_event', (evt, status) => {
    if (evt.type === 'answer') {
      processAnswer(evt.sdp);
      const message = localSdp.toString();
      log.debug('Answer SDP', message);
      on_status({type: 'answer', sdp: message});
    } else if (evt.type === 'candidate') {
      let message = evt.candidate;
      networkInterfaces.forEach((i) => {
        if (i.ip_address && i.replaced_ip_address) {
          message = message.replace(new RegExp(i.ip_address, 'g'), i.replaced_ip_address);
        }
      });
      on_status({type: 'candidate', candidate: message});
    } else if (evt.type === 'failed') {
      log.warn('ICE failed, ', status, wrtc.id);
      on_status({type: 'failed', reason: 'Ice procedure failed.'});
    } else if (evt.type === 'ready') {
      log.debug('Connection ready, ', wrtc.wrtcId);
      on_status({type: 'ready'});
    }
  });
  // Connection wrtc
  wrtc.init(wrtcId);
};
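One detail in the candidate branch is worth a note: the pattern is built with new RegExp(i.ip_address, 'g'), and the dots of an IPv4 address are regex metacharacters that match any character. Real candidates are unlikely to trigger a false match, but escaping the address first is a cheap safeguard. The sketch below is one possible variant, not the OWT implementation; escapeRegExp and replaceCandidateIps are names introduced for the example.

```javascript
// Escape regex metacharacters so the address is matched literally.
function escapeRegExp(text) {
  return text.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
}

// Same loop as the 'candidate' branch, with the address escaped first.
function replaceCandidateIps(candidate, networkInterfaces) {
  let message = candidate;
  networkInterfaces.forEach((i) => {
    if (i.ip_address && i.replaced_ip_address) {
      const pattern = new RegExp(escapeRegExp(i.ip_address), 'g');
      message = message.replace(pattern, i.replaced_ip_address);
    }
  });
  return message;
}

// Example values are made up for the demonstration.
const interfaces = [{ ip_address: '192.168.1.5', replaced_ip_address: '203.0.113.7' }];
const candidate = 'a=candidate:1 1 udp 2013266431 192.168.1.5 40000 typ host';
const rewritten = replaceCandidateIps(candidate, interfaces);
console.log(rewritten);
```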

4.2.1 Connection/EventEmitter.on: listen for events
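Node's EventEmitter calls listeners synchronously from emit(), and an event emitted before any listener is attached is simply dropped. That is presumably why initWebRtcConnection registers the status_event listener before calling wrtc.init. A small sketch with a stand-in class (DemoConnection is not part of OWT):

```javascript
const { EventEmitter } = require('events');

// Stand-in for Connection: init() emits one status_event, as Connection.init does.
class DemoConnection extends EventEmitter {
  init() {
    this.emit('status_event', { type: 'initializing' });
    return true;
  }
}

const seen = [];

// Listener registered before init(): the event is delivered.
const early = new DemoConnection();
early.on('status_event', (evt) => seen.push('early:' + evt.type));
early.init();

// Listener registered after init(): the event was already emitted and is lost.
const late = new DemoConnection();
late.init();
late.on('status_event', (evt) => seen.push('late:' + evt.type));

console.log(seen);
```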

4.2.2 Connection.init: initialize the C++ WebRtcConnection

webrtc_agent/webrtc/connection.js

2023-04-26T21:54:18.417  - DEBUG: Connection - message: 
Init Connection, 
connectionId: b149e44bb10d4e91bd162a8c6806ae7b 
{"ipAddresses":[]}
init(streamId) {
  if (this.initialized) {
    return false;
  }
  const firstStreamId = streamId;
  this.initialized = true;
  log.debug(`message: Init Connection, connectionId: ${this.id} `+
            `${logger.objectToLog(this.options)}`);
  this.sessionVersion = 0;
  // WebRtcConnection C++ wrapper: this calls into C++
  this.wrtc.init((newStatus, mess, streamId) => {
    log.debug('message: WebRtcConnection status update, ' +
              'id: ' + this.id + ', status: ' + newStatus +
              ', ' + logger.objectToLog(this.metadata) + mess);
    switch(newStatus) {
      case CONN_INITIAL:
        // triggers the listener from 4.2.1
        this.emit('status_event', {type: 'started'}, newStatus);
        break;
      case CONN_SDP_PROCESSED:
        this.isProcessingRemoteSdp = false;
        // this.latestSdp = mess;
        // this._maybeSendAnswer(newStatus, streamId);
        break;
      case CONN_SDP:
        this.latestSdp = mess;
        this._maybeSendAnswer(newStatus, streamId);
        break;
      case CONN_GATHERED:
        this.alreadyGathered = true;
        this.latestSdp = mess;
        this._maybeSendAnswer(newStatus, firstStreamId);
        break;
      case CONN_CANDIDATE:
        mess = mess.replace(this.options.privateRegexp, this.options.publicIP);
        this.emit('status_event', {type: 'candidate', candidate: mess}, newStatus);
        break;
      case CONN_FAILED:
        log.warn('message: failed the ICE process, ' + 'code: ' + WARN_BAD_CONNECTION +
                 ', id: ' + this.id);
        this.emit('status_event', {type: 'failed', sdp: mess}, newStatus);
        break;
      case CONN_READY:
        log.debug('message: connection ready, ' + 'id: ' + this.id +
                  ', ' + 'status: ' + newStatus + ' ' + mess + ',' + streamId);
        if (!this.ready) {
          this.ready = true;
          this.emit('status_event', {type: 'ready'}, newStatus);
        }
        break;
    }
  });
  if (this.options.createOffer) {
    log.debug('message: create offer requested, id:', this.id);
    const audioEnabled = this.options.createOffer.audio;
    const videoEnabled = this.options.createOffer.video;
    const bundle = this.options.createOffer.bundle;
    // WebRtcConnection C++ wrapper: this calls into C++
    this.wrtc.createOffer(videoEnabled, audioEnabled, bundle);
  }
  // triggers the listener from 4.2.1
  this.emit('status_event', {type: 'initializing'});
  return true;
}

4.2.3 NAN_METHOD(WebRtcConnection::init)

source/agent/webrtc/rtcConn/WebRtcConnection.cc

NAN_METHOD(WebRtcConnection::init) {
  WebRtcConnection* obj = Nan::ObjectWrap::Unwrap<WebRtcConnection>(info.Holder());
  std::shared_ptr<erizo::WebRtcConnection> me = obj->me;
  obj->eventCallback_ = new Nan::Callback(info[0].As<Function>());
  bool r = me->init();
  info.GetReturnValue().Set(Nan::New(r));
}

4.2.4 erizo::WebRtcConnection::init()

source/agent/webrtc/rtcConn/erizo/src/erizo/WebRtcConnection.cpp

bool WebRtcConnection::init() {
  maybeNotifyWebRtcConnectionEvent(global_state_, "");
  return true;
}

The status is then reported to the JS layer.

4.2.5 erizo::WebRtcConnection::maybeNotifyWebRtcConnectionEvent

void WebRtcConnection::maybeNotifyWebRtcConnectionEvent(const WebRTCEvent& event, const std::string& message,
    const std::string& stream_id) {
  boost::mutex::scoped_lock lock(event_listener_mutex_);
  if (!conn_event_listener_) {
    return;
  }
  conn_event_listener_->notifyEvent(event, message, stream_id);
}

-> 4.2.2 (the callback passed to this.wrtc.init)

this.wrtc.init((newStatus, mess, streamId) => {
  log.debug('message: WebRtcConnection status update, ' +
            'id: ' + this.id + ', status: ' + newStatus +
            ', ' + logger.objectToLog(this.metadata) + mess);
  switch(newStatus) {
    case CONN_INITIAL:
      this.emit('status_event', {type: 'started'}, newStatus);
      break;
    ....
});

-> 4.2.1 (the status_event listener)

wrtc.on('status_event', (evt, status) => {...
});
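The chain from 4.2.5 back to 4.2.1 can be sketched end to end with a fake native object. FakeNativeConnection and DemoConnection are stand-ins written for this example. The fake invokes the status callback synchronously, whereas the real addon sets up a uv_async handle (see WebRtcConnection::New), so the relative order of the two events in a real run may differ from this sketch.

```javascript
const { EventEmitter } = require('events');

const CONN_INITIAL = 101;
const CONN_READY = 104;

// Stand-in for addon.WebRtcConnection: reports the initial state on init().
class FakeNativeConnection {
  init(statusCallback) {
    statusCallback(CONN_INITIAL, '', '');
    return true;
  }
}

// Reduced version of Connection.init: map native status codes to status_event.
class DemoConnection extends EventEmitter {
  constructor() {
    super();
    this.wrtc = new FakeNativeConnection();
    this.ready = false;
  }

  init() {
    this.wrtc.init((newStatus, mess, streamId) => {
      switch (newStatus) {
        case CONN_INITIAL:
          this.emit('status_event', { type: 'started' }, newStatus);
          break;
        case CONN_READY:
          if (!this.ready) {
            this.ready = true;
            this.emit('status_event', { type: 'ready' }, newStatus);
          }
          break;
      }
    });
    this.emit('status_event', { type: 'initializing' });
    return true;
  }
}

const events = [];
const conn = new DemoConnection();
conn.on('status_event', (evt, status) => events.push([evt.type, status]));
conn.init();
console.log(events);
```

Note that the listener in 4.2 only handles the answer, candidate, failed and ready types, so the started and initializing events shown here pass through it without calling on_status.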

4.2.6 log

2023-04-26T21:54:18.419  - DEBUG: Connection - message: 
WebRtcConnection status update, 
id: b149e44bb10d4e91bd162a8c6806ae7b, 
status: 101, 
{}

CONN_INITIAL = 101

4.2.7 status

const CONN_INITIAL        = 101;
const CONN_STARTED        = 102;
const CONN_GATHERED       = 103;
const CONN_READY          = 104;
const CONN_FINISHED       = 105;
const CONN_CANDIDATE      = 201;
const CONN_SDP            = 202;
const CONN_SDP_PROCESSED  = 203;
const CONN_FAILED         = 500;
const WARN_BAD_CONNECTION = 502;
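When reading logs such as "status: 101", a reverse lookup from code to name can help. The helper below is a convenience sketch built from the constants listed above; STATUS_NAMES and describeStatus are names introduced here and are not part of connection.js.

```javascript
// Reverse lookup table built from the status constants listed above.
const STATUS_NAMES = new Map([
  [101, 'CONN_INITIAL'],
  [102, 'CONN_STARTED'],
  [103, 'CONN_GATHERED'],
  [104, 'CONN_READY'],
  [105, 'CONN_FINISHED'],
  [201, 'CONN_CANDIDATE'],
  [202, 'CONN_SDP'],
  [203, 'CONN_SDP_PROCESSED'],
  [500, 'CONN_FAILED'],
  [502, 'WARN_BAD_CONNECTION'],
]);

// Translate a numeric status from a log line into its constant name.
function describeStatus(code) {
  return STATUS_NAMES.get(code) || `UNKNOWN(${code})`;
}

console.log(describeStatus(101));
console.log(describeStatus(202));
console.log(describeStatus(999));
```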

5. WrtcConnection.addTrackOperation

// option = {mid, type, formatPreference, scalabilityMode}
that.addTrackOperation = function (operationId, sdpDirection, option) {
  var ret = false;
  var {mid, type, formatPreference, scalabilityMode} = option;
  if (!operationMap.has(mid)) {
    log.debug(`MID ${mid} for operation ${operationId} add`);
    const enabled = true;
    // map
    operationMap.set(mid, {operationId, type, sdpDirection, formatPreference, enabled});
    if (scalabilityMode) {
      operationMap.get(mid).scalabilityMode = scalabilityMode;
    }
    ret = true;
  } else {
    log.warn(`MID ${mid} has mapped operation ${operationMap.get(mid).operationId}`);
  }
  return ret;
};

5.1 WrtcConnection.operationMap

// mid => { operationId, sdpDirection, type, formatPreference, rids, enabled, finalFormat }
var operationMap = new Map();
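Since operationMap is keyed by mid, the first operation registered for a mid wins and later ones are rejected. A standalone sketch of that behavior, with logging removed and the function pulled out of the WrtcConnection closure (a simplification made for this example):

```javascript
// mid => { operationId, type, sdpDirection, formatPreference, enabled }
const operationMap = new Map();

// Standalone copy of the addTrackOperation logic, without logging.
function addTrackOperation(operationId, sdpDirection, option) {
  const { mid, type, formatPreference, scalabilityMode } = option;
  if (operationMap.has(mid)) {
    return false; // this mid is already mapped to another operation
  }
  operationMap.set(mid, { operationId, type, sdpDirection, formatPreference, enabled: true });
  if (scalabilityMode) {
    operationMap.get(mid).scalabilityMode = scalabilityMode;
  }
  return true;
}

const results = [
  addTrackOperation('op-1', 'recvonly', { mid: '0', type: 'audio' }),
  addTrackOperation('op-1', 'recvonly', { mid: '1', type: 'video' }),
  addTrackOperation('op-2', 'recvonly', { mid: '0', type: 'audio' }),
];
console.log(results, operationMap.get('0').operationId);
```

In the subscribe code in section 2, the return value of addTrackOperation is not checked inside the forEach, so a rejected mid would still be followed by callback('callback', 'ok').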

6. Flow diagram

[Flow diagram image not included]


