前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >大话ion系列(一)

大话ion系列(一)

作者头像
LiveVideoStack
发布2021-09-23 16:43:17
1.5K0
发布2021-09-23 16:43:17
举报
文章被收录于专栏:音视频技术音视频技术

一、为什么用ion-sfu

1.简介

ion-sfu作为ion分布式架构里的核心模块,SFU是选择转发单元的简称,可以分发WebRTC的媒体流。ion-sfu从pion/ion拆分出来,经过社区打磨,是目前GO方案中最成熟且使用最广的SFU。

https://github.com/pion/ion

已经有多家开始商用了,这点国外公司比较快,比如:100ms、Screenleap和Tandem等。

100ms:https://www.100ms.live/

Screenleap:https://www.screenleap.com/

Tandem:https://tandem.chat/

2.ion-sfu优点

  • 纯GO,开发效率高,且能帮你绕过很多坑
  • 单进程多协程模型: - 可以利用多核 - 大大降低级联/单端口复杂度(其他SFU,可能存在本机不同worker间relay的问题;监听单端口时,存在worker间抢包的问题)
  • 高并发,曾在谷歌云4核压测到单房间50方会议 (大概2500路流-0.5Mbps)
  • 功能全面: - 双PeerConnection+多Track设计,有良好的浏览器兼容性,节省系统资源 - 支持多对多音视频通信 - 支持大小流Simulcast - 支持屏幕分享Screenshare - 支持发言方自动检测Audio-Level-Detect - 支持定制DataChannel - 支持节点间Relay - 支持单端口,大大降低部署难度 - 完善的抗弱网机制,抗丢包40%左右,支持TWCC/REMB+PLI/FIR+Nack+SR/RR等
  • 配套SDK完善,JS/Flutter/GO等

3.使用方式

ion-sfu使用方式有两种:

  • 作为服务使用,比如编译带grpc或jsonrpc信令的ion-sfu,然后再做一个自己的信令服务(推荐ion分布式套装),远程调用即可。
  • 作为包使用,import导入,然后做二次开发。此时抛弃了cmd下边的信令层,只需导入pkg/sfu下边的包即可,然后自行定制信令层,可以在sfu、session、peer层面,通过继承接口定制自己的业务,比较复杂。
import (
      sfu "github.com/pion/ion-sfu/pkg/sfu"
)

二、架构与模块

上面给一个简单架构图,很多细节表示不出来,需要看代码。

1.简介

得益于GO,ion-sfu整体代码精简,拥有极高的开发效率。结合现有SDK使用,可以避免很多坑:ion-sdk-js等。

ion-sfu基于pion/webrtc,所以代码风格偏标准webrtc,比如:PeerConnection。因为是使用了标准API,熟悉了之后很容易看懂其他工程,比如:ion-sdk-go/js/flutter。

这样从前到后,整体门槛都降低了。

2.工程组织

这里给出主要模块列表:

├── Makefile //用来编译二进制和grpc文件
├── bin //编译好的二进制目录
├── cmd
│   └── signal //包含三个主文件 grpc、jsonrpc、allrpc
├── config.toml //配置文件
├── examples //网页示例目录
├── pkg
    ├── buffer //buffer包,用于缓存包
    ├── logger //日志
    ├── middlewares //中间件,主要是支持自定义datachannel
    ├── relay //中继
    ├── sfu //sfu主模块,包含router、session、peer等
    ├── stats //状态统计
    └── twcc //transport-cc

3.信令层

信令代码和主程序在一起,在cmd/signal/下边。

  • 支持jsonrpc,主要处理逻辑在:
func (p *JSONSignal) Handle(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2.Request) {
  • 支持grpc,主要处理逻辑在:
func (s *SFUServer) Signal(stream pb.SFU_SignalServer) error {

而allrpc,是jsonrpc和grpc的合体封装,运行时会进入上面两个函数。

信令很简单:

  • join:加入一个session。
  • description:发起offer或回复answer,用于协商和重协商。
  • trickle:发送trickle candidate。

另外,出于简单考虑,一些信令和事件,直接走datachannel了,比如:大小流切换、声音检测、自定义信令等。

4.媒体层

媒体层的主要模块:

├── audioobserver.go //声音检测
├── datachannel.go //dc中间件的封装
├── downtrack.go //下行track
├── helpers.go //工具函数集
├── mediaengine.go //SDP相关codec、rtp参数设置
├── peer.go //peer封装,一个peer包含一个publisher和一个subscriber,双pc设计
├── publisher.go //publisher,封装上行pc
├── receiver.go //subscriber,封装下行pc
├── router.go //router,包含pc、session、一组receivers
├── sequencer.go //记录包的信息:序列号sn、偏移、时间戳ts等
├── session.go //会话,包含多个peer、dc
├── sfu.go //分发单元,包含多个session、dc
├── simulcast.go //大小流配置
├── subscriber.go //subscriber,封装下行pc、DownTrack、dc
└── turn.go //内置turn server

相比以前版本,增加了一些interface,主要是为了作为包使用时,封装自己的类。

三、主函数与信令流程

1.主函数

这里拿jsonrpc来分析,其他rpc流程上是一样的。

func main() {
      if !parse() {
            showHelp()
            os.Exit(-1)
      }


      //创建SFU和DC,这里的DC用于Simulcast和AudioLevel
      s := sfu.NewSFU(conf)
      dc := s.NewDatachannel(sfu.APIChannelLabel)
      dc.Use(datachannel.SubscriberAPI)


      //接下来是标准websocket服务器启动的流程
      upgrader := websocket.Upgrader{
            CheckOrigin: func(r *http.Request) bool {
                  return true
            },
            ReadBufferSize:  1024,
            WriteBufferSize: 1024,
      }


      //这里jsonrpc基于websocket,websocket从标准http upgrade过来的
      http.Handle("/ws", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
            c, err := upgrader.Upgrade(w, r, nil)
            if err != nil {
                  panic(err)
            }
            defer c.Close()


            //这里创建了JSONSignal,每次真实请求到来时会新建一个Peer,进入Handle函数处理
            p := server.NewJSONSignal(sfu.NewPeer(s), logger)
            defer p.Close()


            jc := jsonrpc2.NewConn(r.Context(), websocketjsonrpc2.NewObjectStream(c), p)
            <-< span="">jc.DisconnectNotify()
      }))


      go startMetrics(metricsAddr)


      var err error
      if key != "" && cert != "" {
            logger.Info("Started listening", "addr", "https://"+addr)
            err = http.ListenAndServeTLS(addr, cert, key, nil)
      } else {
            logger.Info("Started listening", "addr", "http://"+addr)
            err = http.ListenAndServe(addr, nil)
      }
      if err != nil {
            panic(err)
      }
}

2.协商&重协商

协商(negotiate):

WebRTC对外的类是PeerConnection,简称PC,通过信令服务交换SDP给PC进行操作。协商就是指双方通过信令交换SDP,通过PC的一些接口,达到协商双方的媒体格式、传输地址端口等信息,从而实现推流和播放的目的。

一次协商完整流程:

本端CreateOffer-》本端SetLocalDescription(offer)-》本端发送offer-》对端SetRemoteDescription(offer)-》对端CreateAnswer-》SetLocalDescription(answer)-》对端对端返回answer-》本端SetRemoteDescription(answer)

重协商(renegotiate):

就是指再次协商。

为什么要重协商?

因为客户端和服务器的track都是变化的,重协商是通知对端的必要手段,比如:客户端发起屏幕分享,服务器有人进出房间等。

重协商的原则:

谁变化谁发起(offer)。

3.信令流程

首先,客户端ws连接成功:

服务端会建立一个Peer,可以参考上边代码。

然后,客户端发起第一次协商:

客户端pc.CreateOffer(一个只包含dc的offer)-》pc.SetLocalDescription(offer),然后把offer放入Join信令,发送给服务端,然后服务器协商【pc.SetRemoteDescription(offer)-》pc.CreateAnswer-》pc.SetLocalDescription(answer)】,返回answer给客户端,至此完成数据通道(datachannel)建立。首先打通dc,是为了方便audio-level/simulcast通道的建立,此时也可以创建自定dc做定制业务。

接下来,服务端发起第二次协商:

服务端pc.CreateOffer,SetLocalDescription,发送offer,此时offer会携带上此房间内的所有track信息,客户端收到后会CreateAnswer,SetLocalDescription,把answer返回来,然后服务端pc.SetRemoteDescription(answer),此时客户端可以收到服务器此房间内的所有流了。

最后,客户端publish发流时会发起第三次协商:

同第一次流程一样,不同的是同时携带了音视频的track,本次协商完成后,服务器可以收到客户端的流了,收到之后会对同房间内的其他客户端发起重协商。

往后只要客户端或服务器track有变化,都会再次发起重协商。

4.代码分析

JsonRPC所有的信令都会进入Handle函数。为了简化流程,可以暂时不看Trickle和OnIceCandidate函数,这个是开启trickle-ICE时才会有。

// Handle incoming RPC call events like join, answer, offer and trickle
// JSONSignal是继承了LocalPeer,所以会继承一些属性和回调:OnOffer等。
// 可以在浏览器端ws网络工具里查看具体信令内容
func (p *JSONSignal) Handle(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2.Request) {
      replyError := func(err error) {
            _ = conn.ReplyWithError(ctx, req.ID, &jsonrpc2.Error{
                  Code:    500,
                  Message: fmt.Sprintf("%s", err),
            })
      }


      switch req.Method {
      case "join":// 首先客户端会发join信令过来
            var join Join
            err := json.Unmarshal(*req.Params, &join)
            if err != nil {
                  p.Logger.Error(err, "connect: error parsing offer")
                  replyError(err)
                  break
            }


            //设置OnOffer,即SFU发起offer时(重协商),会使用这个回调,比如重协商时,因为有很多客户端peer同时连到SFU,每个Peer的Track增删时,SFU需要向其他Peer重协商来告诉Track的变更
            p.OnOffer = func(offer *webrtc.SessionDescription) {
                  if err := conn.Notify(ctx, "offer", offer); err != nil {
                        p.Logger.Error(err, "error sending offer")
                  }


            }


            //设置OnIceCandidate,即SFU在ICE流程获取到新候选时,会回调这个函数,告诉客户端新增了啥候选
            p.OnIceCandidate = func(candidate *webrtc.ICECandidateInit, target int) {
                  if err := conn.Notify(ctx, "trickle", Trickle{
                        Candidate: *candidate,
                        Target:    target,
                  }); err != nil {
                        p.Logger.Error(err, "error sending ice candidate")
                  }
            }
            //加入某个会话(房间)
            err = p.Join(join.SID, join.UID, join.Config)
            if err != nil {
                  replyError(err)
                  break
            }


            //根据offer回复answer
            answer, err := p.Answer(join.Offer)
            if err != nil {
                  replyError(err)
                  break
            }


            _ = conn.Reply(ctx, req.ID, answer)


      //如果是客户端发offer,回复answer,此时为客户端发起重协商
      case "offer":
            var negotiation Negotiation
            err := json.Unmarshal(*req.Params, &negotiation)
            if err != nil {
                  p.Logger.Error(err, "connect: error parsing offer")
                  replyError(err)
                  break
            }


            answer, err := p.Answer(negotiation.Desc)
            if err != nil {
                  replyError(err)
                  break
            }
            _ = conn.Reply(ctx, req.ID, answer)


      //如果是客户端发answer,设置SetRemoteDescription即可
      case "answer":
            var negotiation Negotiation
            err := json.Unmarshal(*req.Params, &negotiation)
            if err != nil {
                  p.Logger.Error(err, "connect: error parsing offer")
                  replyError(err)
                  break
            }


            err = p.SetRemoteDescription(negotiation.Desc)
            if err != nil {
                  replyError(err)
            }


      //如果是客户端发送Trickle-ICE的候选过来,设置即可
      case "trickle":
            var trickle Trickle
            err := json.Unmarshal(*req.Params, &trickle)
            if err != nil {
                  p.Logger.Error(err, "connect: error parsing candidate")
                  replyError(err)
                  break
            }


            err = p.Trickle(trickle.Candidate, trickle.Target)
            if err != nil {
                  replyError(err)
            }
      }
}

注意OnOffer是服务器重协商的回调,即房间内某客户端track有变化,服务器会回调此函数通知其他客户端。

总结一句话,客户端《---》SFU的核心逻辑就是不断重协商,谁变化谁发起offer


本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-09-16,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 LiveVideoStack 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 2.ion-sfu优点
  • 3.使用方式
  • 1.简介
  • 2.工程组织
  • 3.信令层
  • 4.媒体层
  • 1.主函数
  • 2.协商&重协商
  • 3.信令流程
  • 4.代码分析
相关产品与服务
消息队列 TDMQ
消息队列 TDMQ (Tencent Distributed Message Queue)是腾讯基于 Apache Pulsar 自研的一个云原生消息中间件系列,其中包含兼容Pulsar、RabbitMQ、RocketMQ 等协议的消息队列子产品,得益于其底层计算与存储分离的架构,TDMQ 具备良好的弹性伸缩以及故障恢复能力。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档