前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >大话ion系列(四)

大话ion系列(四)

作者头像
LiveVideoStack
发布2021-11-26 11:45:38
1.2K0
发布2021-11-26 11:45:38
举报
文章被收录于专栏:音视频技术
作者 | 王朋闯 本

七、Simulcast流程

1. Simulcast概念

先介绍WebRTC的一个概念——Simulcast(联播,俗称大小流):

代码语言:javascript
复制
推流端===f/h/q==>SFU--f--->收流端A
                 |---q--->收流端B
                 |---h--->收流端C
  • 上行一般是三路流,按分辨率和码率,一般分为fhq(大中小)三层
  • 下行可以分给不同的用户不同的流,比如网不好时分发个小流q,网变好了再切回大流f
  • 三层的streamId、trackId是一样的,但是rid和ssrc是不同的,rid一般是f、h、q
  • 对应的SDP部分
代码语言:javascript
复制
.........
a=rid:f send
a=rid:h send
a=rid:q send
a=simulcast:send f;h;q

2.收发流程

看本章之前,最好看一下前一章,熟悉一下收发流程,本文只重点介绍其中的Simulcast部分。

收发包逻辑打通步骤:

SDK推流---->OnTrack---->router.AddReceiver(设置Buffer和上行Track)------>SessionLocal.Publish(设置下行Track)---->收发包逻辑打通

3.Simulcast上行流程

非Simulcast情况,OnTrack一般会触发两次:一个audioTrack+一个videoTrack。

Simulcast下,OnTrack一般会触发四次:一个audioTrack+三个videoTrack(rid分别为fhq)。

这个流程会触发四次:

代码语言:javascript
复制
OnTrack--->router.AddReceiver--->WebRTCReceiver.AddUpTrack

三个videoTrack,共用同一个WebRTCReceiver。

代码语言:javascript
复制
type WebRTCReceiver struct {
。。。
    receiver       *webrtc.RTPReceiver
    codec          webrtc.RTPCodecParameters
    rtcpCh         chan []rtcp.Packet
    buffers        [3]*buffer.Buffer//需要三个buffer
    upTracks       [3]*webrtc.TrackRemote//三个TrackRemote
。。。
    pendingTracks  [3][]*DownTrack//三个层,每层来订阅的downtrack
。。。
}

接下来看一下AddUpTrack是如何工作的:

代码语言:javascript
复制
func (w *WebRTCReceiver) AddUpTrack(track *webrtc.TrackRemote,buff *buffer.Buffer, bestQualityFirst bool) {
    if w.closed.get() {
        return
    }
 
  //根据RID来区分layer
    var layer int
    switch track.RID() {//如果没开simulcast,为""
    case fullResolution:
        layer = 2
    case halfResolution:
        layer = 1
    default:
        layer = 0//如果没开simulcast,为0
    }
 
    w.Lock()
  //设置空域层layer的track
    w.upTracks[layer] = track
 
  //设置空域层layer的buff
    w.buffers[layer] = buff
    w.available[layer].set(true)
 
  //设置空域层layer的downtrack
    w.downTracks[layer].Store(make([]*DownTrack,0, 10))
    w.pendingTracks[layer] = make([]*DownTrack,0, 10)
    w.Unlock()
 
  //闭包函数,按最佳质量订阅,切到f层
    subBestQuality := func(targetLayerint) {
        for l := 0; l <targetLayer; l++ {
            dts :=w.downTracks[l].Load()
            if dts == nil{
                continue
            }
            for _, dt :=range dts.([]*DownTrack) {
                _ = dt.SwitchSpatialLayer(int32(targetLayer), false)
            }
        }
    }
 
  //闭包函数,按最差质量订阅,切到q层
    subLowestQuality := func(targetLayerint) {
        for l := 2; l !=targetLayer; l-- {
            dts :=w.downTracks[l].Load()
            if dts == nil{
                continue
            }
            for _, dt :=range dts.([]*DownTrack) {
                _ = dt.SwitchSpatialLayer(int32(targetLayer), false)
            }
        }
    }
 
  //是否开启大小流
    if w.isSimulcast {
    //如果配置最佳质量,则等到f层到来时,订阅它
        if bestQualityFirst &&(!w.available[2].get() || layer == 2) {
            subBestQuality(layer)
      //如果配置最差质量,则等到q层到来时,订阅它
        } else if!bestQualityFirst && (!w.available[0].get() ||layer == 0) {
            subLowestQuality(layer)
        }
    }
 
  //启动读写流程
    go w.writeRTP(layer)
}

真正的收发包流程来了:

代码语言:javascript
复制
func (w *WebRTCReceiver) writeRTP(layer int) {
    defer func() {//这里设置自动清理函数
        w.closeOnce.Do(func() {
            w.closed.set(true)
            w.closeTracks()
        })
    }()
 
  //创建一个PLI包,后边要用
    pli := []rtcp.Packet{
        &rtcp.PictureLossIndication{SenderSSRC:rand.Uint32(), MediaSSRC: w.SSRC(layer)},
    }
 
    for {
    //这里可以看到,真正读包是从buffer里读出来的,正是前边讲到的自定义buffer
        pkt, err :=w.buffers[layer].ReadExtended()
        if err ==io.EOF {
            return
        }
 
    //如果开启大小流
        if w.isSimulcast {
      //一开始是pending状态
            ifw.pending[layer].get() {
        //如果收到的包是关键帧
                ifpkt.KeyFrame {
                    w.Lock()
          //如果有切换中的layer,那就切一下
                    for idx,dt := range w.pendingTracks[layer] {
                        w.deleteDownTrack(dt.CurrentSpatialLayer(), dt.peerID)
                        w.storeDownTrack(layer, dt)
                        dt.SwitchSpatialLayerDone(int32(layer))
                       w.pendingTracks[layer][idx] = nil
                    }
                   w.pendingTracks[layer] = w.pendingTracks[layer][:0]
                   w.pending[layer].set(false)
                    w.Unlock()
                } else {
          //如果是非关键字,说明需要发送PLI
                    w.SendRTCP(pli)
                }
            }
        }
 
    //这里是不是有疑问,[]*downTracks是SessionLocal.Publish里塞过来的,后边会介绍:)
        for _, dt := rangew.downTracks[layer].Load().([]*DownTrack){
      //下行track写入rtp包
            if err = dt.WriteRTP(pkt, layer);err != nil {
                if err ==io.EOF && err == io.ErrClosedPipe {
                    w.Lock()
                    w.deleteDownTrack(layer, dt.id)
                    w.Unlock()
                }
                log.Error().Err(err).Str("id", dt.id).Msg("Errorwriting to down track")
            }
        }
    }
 
}

至此一个简单的Simulcast收发模型:

代码语言:javascript
复制
   SFU--->WebRTCReceiver(audio).buffer[0].ReadExtended---->downTracks[0][0].WriteRTP->SDK
       |                                                    |....
       |                                                    |--->downTracks[0][N].WriteRTP
       |
       |---->WebRTCReceiver(video).buffer[0].ReadExtended---->downTracks[0][0].WriteRTP
                    |                                      |....
                    |                                      |---->downTracks[0][N].WriteRTP
                    |
                    |------------->buffer[1].ReadExtended---->downTracks[1][0].WriteRTP
                    |                                      |....
                    |                                      |----->downTracks[1][N].WriteRTP
                    |
                    |------------->buffer[2].ReadExtended---->downTracks[2][0].WriteRTP
                                                           |....
                                                           |------>downTracks[2][N].WriteRTP                             

上面省略了SDK--->ReadStreamSRTP.buffer.Write,这个buffer和WebRTCReceiver.buffer是同一个。

订阅端SDK的切大小流操作,其实就是在0-2来回挂载downTrack而已。

4.Simulcast下行流程

读者前边的疑问,downTracks是哪里塞过来的?流程在这里:

OnTrack--->SessionLocal.Publish--->router.AddDownTracks--->router.AddDownTrack--->WebRTCReceiver.AddDownTrack--->WebRTCReceiver.storeDownTrack

代码语言:javascript
复制
pc.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver){
    //Simulcast一般会触发OnTrack四次,一个audio,三个video
    //由于三个video的trackId一样,共用一个WebRTCReceiver
        r, pub := p.router.AddReceiver(receiver,track)
        if pub {//这里video到来的第一次pub才为true
      //这里把receiver发布到router里,其他peer的downtrack会挂载到receiver下
            p.session.Publish(p.router, r)

这里为了方便,再贴一下整个流程的代码,比较繁琐,可以跳过。

SessionLocal.Publish

代码语言:javascript
复制
func (s *SessionLocal) Publish(router Router,r Receiver) {
    for _, p := ranges.Peers() {
        // Don't sub toself
        if router.ID() == p.ID() || p.Subscriber() == nil{
            continue
        }
        //表示根据r的信息创建downtrack,并增加到p.Subscriber()和r中
        if err :=router.AddDownTracks(p.Subscriber(), r); err !=nil {
            Logger.Error(err, "Errorsubscribing transport to Router")
            continue
        }
    }
}

router.AddDownTracks

代码语言:javascript
复制
func (r *router) AddDownTracks(s *Subscriber,recv Receiver) error {
。。。
//如果recv不为空,表示根据recv的信息创建downtrack,并增加到s和recv中
    if recv != nil{
        if _, err :=r.AddDownTrack(s, recv); err != nil {
            return err
        }
        s.negotiate()
        return nil
    }
//如果recv为空,表示遍历房间中所有的receivers,并增加到s和recv中
    if len(r.receivers)> 0 {
        for _, rcv := ranger.receivers {
            if _, err :=r.AddDownTrack(s, rcv); err != nil {
                return err
            }
        }
        s.negotiate()
    }
    return nil
}

router.AddDownTrack

根据recv的信息创建downtrack,并增加到sub和recv中。

代码语言:javascript
复制
func (r *router) AddDownTrack(sub *Subscriber,recv Receiver) (*DownTrack, error) {
    for _, dt := rangesub.GetDownTracks(recv.StreamID()) {//避免重复添加
        if dt.ID() ==recv.TrackID() {
            return dt, nil
        }
    }
 
    codec := recv.Codec()
    if err := sub.me.RegisterCodec(codec, recv.Kind()); err !=nil {
        return nil,err
    }
    //创建downtrack,downtrack用来给客户端下发流
    downTrack, err := NewDownTrack(webrtc.RTPCodecCapability{
        MimeType:     codec.MimeType,
        ClockRate:    codec.ClockRate,
        Channels:     codec.Channels,
        SDPFmtpLine:  codec.SDPFmtpLine,
        RTCPFeedback:[]webrtc.RTCPFeedback{{"goog-remb", ""}, {"nack", ""}, {"nack", "pli"}},
    }, recv, r.bufferFactory,sub.id, r.config.MaxPacketTrack)
    if err != nil{
        return nil,err
    }
    //把downtrack增加到pc中
    if downTrack.transceiver,err = sub.pc.AddTransceiverFromTrack(downTrack,webrtc.RTPTransceiverInit{
        Direction:webrtc.RTPTransceiverDirectionSendonly,
    }); err != nil {
        return nil,err
    }
 
    // 设置关闭回调,关闭时pc自动删除track
    downTrack.OnCloseHandler(func() {
        if sub.pc.ConnectionState() !=webrtc.PeerConnectionStateClosed {
            if err :=sub.pc.RemoveTrack(downTrack.transceiver.Sender()); err !=nil {
                if err ==webrtc.ErrConnectionClosed {
                    return
                }
                Logger.Error(err, "Errorclosing down track")
            } else {//如果删除成功,再从sub中删除,然后重协商
                sub.RemoveDownTrack(recv.StreamID(), downTrack)
                sub.negotiate()
            }
        }
    })
 
  //设置OnBind回调,DownTrack.Bind()里会调用这个;PC协商完成时,DownTrack.Bind()会触发
    downTrack.OnBind(func() {
        go sub.sendStreamDownTracksReports(recv.StreamID())
    })
 
  //增加downTrack到sub中,sub只是用来管理downtracks和生成SenderReport等
    sub.AddDownTrack(recv.StreamID(), downTrack)
 
  //增加downTrack到WebRTCReceiver中,实际收发包是WebRTCReceiver来控制,在writeRTP中
    recv.AddDownTrack(downTrack,r.config.Simulcast.BestQualityFirst)
    return downTrack, nil
}

5.Simulcast切换流程

第一种,自动切换。

上边的subBestQuality,会在f层receiver到来时,自动订阅f层。

第二种,手动切换。

通过信令或datachannel控制来切换。

先来讲一下datachannel信令通道,在main里创建了一个内置dc,处理函数为datachannel.SubscriberAPI。

代码语言:javascript
复制
func main() {
    nsfu := sfu.NewSFU(conf.Config)
    dc :=nsfu.NewDatachannel(sfu.APIChannelLabel)
   dc.Use(datachannel.SubscriberAPI)
    s :=server.NewWrapperedGRPCWebServer(options, nsfu)
    if err := s.Serve(); err != nil{
        logger.Error(err,"failed to serve")
        os.Exit(1)
    }
    select {}
}

客户端发过来的切大小流指令会进入此函数。

代码语言:javascript
复制
funcSubscriberAPI(nextsfu.MessageProcessor) sfu.MessageProcessor {
    return sfu.ProcessFunc(func(ctxcontext.Context, args sfu.ProcessArgs) {
        srm := &setRemoteMedia{}
        if err :=json.Unmarshal(args.Message.Data, srm); err != nil {
            return
        }
        // Publisherchanging active layers
        if srm.Layers !=nil && len(srm.Layers) > 0 {
。。。//当前sdk逻辑不会进入这里
        } else {
      //按流ID查找downTracks
            downTracks :=args.Peer.Subscriber().GetDownTracks(srm.StreamID)
            for _, dt :=range downTracks {
                switch dt.Kind() {
                casewebrtc.RTPCodecTypeAudio:
                    dt.Mute(!srm.Audio)//音频是否需要mute/unmute
                casewebrtc.RTPCodecTypeVideo:
                    switchsrm.Video {//视频是否需要切大小流/mute
                    casehighValue:
            //这里把d.reSync.set设置为true了,writeSimulcastRTP里会自动发PLI
                        dt.Mute(false)
                        dt.SwitchSpatialLayer(2, true)
                    casemediumValue:
                        dt.Mute(false)
                        dt.SwitchSpatialLayer(1, true)
                    caselowValue:
                        dt.Mute(false)
                        dt.SwitchSpatialLayer(0, true)
                    casemutedValue:
                        dt.Mute(true)
                    }
                    switchsrm.Framerate {//当前sdk逻辑也不会进入这里,srm.Framerate=""
                    }
                }
 
            }
        }
        next.Process(ctx, args)
    })
}

DownTrack.SwitchSpatialLayer

代码语言:javascript
复制
func (d *DownTrack) SwitchSpatialLayer(targetLayer int32, setAsMax bool) error {
    if d.trackType ==SimulcastDownTrack {
        // Don't switchuntil previous switch is done or canceled
        csl := atomic.LoadInt32(&d.currentSpatialLayer)
 
        //如果当前运行layer不是正在切的layer,或当前layer是要切的
        //换句话说,如果当前layer没切完成,或者当前layer和要切的一样,那就返回错误
        if csl !=atomic.LoadInt32(&d.targetSpatialLayer) || csl ==targetLayer {
            returnErrSpatialLayerBusy
        }
        //切换layer
        if err :=d.receiver.SwitchDownTrack(d, int(targetLayer));err == nil {
            atomic.StoreInt32(&d.targetSpatialLayer,targetLayer)
            if setAsMax {
                atomic.StoreInt32(&d.maxSpatialLayer,targetLayer)
            }
        }
        return nil
    }
    returnErrSpatialNotSupported
}

WebRTCReceiver.SwitchDownTrack

代码语言:javascript
复制
func (w *WebRTCReceiver) SwitchDownTrack(track *DownTrack,layer int) error {
    if w.closed.get() {
        returnerrNoReceiverFound
    }
    //切换就是把track放入pending
    if w.available[layer].get() {
        w.Lock()
        w.pending[layer].set(true)
        w.pendingTracks[layer] = append(w.pendingTracks[layer],track)
        w.Unlock()
        return nil
    }
    return errNoReceiverFound
}

然后在writeRTP里切换:

代码语言:javascript
复制
func (w *WebRTCReceiver) writeRTP(layer int) {
....
    for {
        pkt, err :=w.buffers[layer].ReadExtended()
        if err ==io.EOF {
            return
        }
 
        //如果是大小流
        if w.isSimulcast {
            //如果正在切换,pending[layer]get()为true
            ifw.pending[layer].get() {
                // 如果是关键帧,才会切换,好在前边Mute流程里发送了PLI,这里应该很快来一个关键帧
                ifpkt.KeyFrame {
                    w.Lock()
 
                    //=========这里切换
                    for idx, dt:= range w.pendingTracks[layer] {
                    //删除原来的
                        w.deleteDownTrack(dt.CurrentSpatialLayer(), dt.peerID)
                        //存储新的dt,以后writeRTP会写入新的dt
                        w.storeDownTrack(layer, dt)
                        //设置切换完成
                        dt.SwitchSpatialLayerDone(int32(layer))
                        //pending中此dt置空
                       w.pendingTracks[layer][idx] = nil
                    }
                    //清空pendingTracks此layer
                   w.pendingTracks[layer] = w.pendingTracks[layer][:0]
                    //标志位置为false
                   w.pending[layer].set(false)
                    w.Unlock()
                } else {
                    // 如果不是关键帧,再次发送PLI
                    w.SendRTCP(pli)
                }
            }
        }
 
        for _, dt := rangew.downTracks[layer].Load().([]*DownTrack){
            if err = dt.WriteRTP(pkt, layer);err != nil {
                if err ==io.EOF && err == io.ErrClosedPipe {
                    w.Lock()
                    w.deleteDownTrack(layer, dt.id)
                    w.Unlock()
                }
                log.Error().Err(err).Str("id", dt.id).Msg("Errorwriting to down track")
            }
        }
    }
}

6. 总结

Simulcast在ion-sfu中,默认是通过datachannel来操作切换的。

首先,切换是操作pendingTracks:

SubscriberAPI---》dt.SwitchSpatialLayer-->WebRTCReceiver.SwitchDownTrack--->写入pendingTracks

然后,在WebRTCReceiver.writeRTP里进行实质切换:

WebRTCReceiver.writeRTP--->读取pendingTracks---》更换downTracks--》storeDownTrack--》OK

之后,写包就会写入新track。至此一个简单的Simulcast收发模型就建成了:

代码语言:javascript
复制
SDK---SFU--->WebRTCReceiver(audio).buffer[0].ReadExtended---->downTracks[0][0].WriteRTP->SDK
       |                                              |....
       |                                              |--->downTracks[0][N].WriteRTP
       |
       |---->WebRTCReceiver(video).buffer[0].ReadExtended---->downTracks[0][0].WriteRTP
                    |                                    |....
                    |                                    |---->downTracks[0][N].WriteRTP
                    |
                    |------------->buffer[1].ReadExtended---->downTracks[1][0].WriteRTP
                    |                                     |....
                    |                                     |----->downTracks[1][N].WriteRTP
                    |
                    |------------->buffer[2].ReadExtended---->downTracks[2][0].WriteRTP
                                                         |....
                                                         |------>downTracks[2][N].WriteRTP


 
 

作者简介:

王朋闯:前百度RTN资深工程师,前金山云RTC技术专家,前VIPKID流媒体架构师,ION开源项目发起人。

特别说明: 本文发布于知乎,已获得作者授权转载。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-11-17,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 LiveVideoStack 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档