前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >大话ion系列(五)

大话ion系列(五)

作者头像
LiveVideoStack
发布2021-11-26 14:39:15
7520
发布2021-11-26 14:39:15
举报
文章被收录于专栏:音视频技术音视频技术
作者 | 王朋闯 本

八、QOS之Buffer和NACK

1. buffer简介

大家都知道webrtc有jitterbuffer,ion-sfu里也有buffer,抗丢包40%的秘诀就在这里。

主要作用有:

  • 缓存rtp包,收到nack后,重传rtp包
  • 计算rtcp-nack,发送给客户端
  • 计算rtcp-twcc,发送给客户端
  • 计算rtcp-rr/sr/pli/等,发送给客户端

2.buffer结构

看了上边buffer的功能,基本buffer内的数据结构都是为功能服务的。

//待处理的包
type  pendingPackets struct {
    arrivalTime int64//到达时间
    packet      []byte//包数据
}

//扩展的包结构体
type ExtPacket struct {
    Head     bool//是否是第一个包
    Cycle    uint32//SN转了多少轮
    Arrival  int64//到达时间
    Packet   rtp.Packet//包
    Payload  interface{}//包payload
    KeyFrame bool//是否关键帧
}

// Buffer contains all packets
type Buffer struct {
    sync.Mutex
    bucket     *Bucket//定制的rtp包ringbuffer
    nacker     *nackQueue//nack计算队列
    videoPool  *sync.Pool//视频包临时缓存
    audioPool  *sync.Pool//视频包临时缓存
    codecType  webrtc.RTPCodecType
    extPackets deque.Deque//扩展包缓存
    pPackets   []pendingPackets//待处理包,用于缓存一些来不及处理的包
    closeOnce  sync.Once
    mediaSSRC  uint32//媒体源
    clockRate  uint32//时钟频率
    maxBitrate uint64//最大码率
    lastReport int64//上次报告时间
    twccExt    uint8//rtp扩展头twcc的id,sdp里有
    audioExt   uint8//rtp扩展头audiolevel的id,sdp里有
    bound      bool
    closed     atomicBool
    mime       string//媒体类型,如video/h264等

    // 是否开启remb nack twcc audiolevel
    remb       bool
    nack       bool
    twcc       bool
    audioLevel bool

    minPacketProbe     int
    lastPacketRead     int
    maxTemporalLayer   int32
    bitrate            uint64//存储码率
    bitrateHelper      uint64//用来计算码率
    lastSRNTPTime      uint64//最后一次SR的NTP时间
    lastSRRTPTime      uint32//最后一次SR的RTP时间
    lastSRRecv         int64//1970年1月1日0时0分0秒起到现在的总纳秒数
    baseSN             uint16//用来组装RR
    cycles             uint32//SN回环次数
    lastRtcpPacketTime int64//上一次rtcp
    lastRtcpSrTime     int64// Time the lastRTCP SR was received. Required for DLSR computation.
    lastTransit        uint32//用来计算jitter
    maxSeqNo           uint16//收到最大的SN

    stats Stats//状态统计

    latestTimestamp     uint32// latestreceived RTP timestamp on packet
    latestTimestampTime int64  // Time of the latest timestamp (innanos since unix epoch)

    // callbacks
    onClose      func()
    onAudioLevel func(level uint8)
    feedbackCB   func([]rtcp.Packet)
    feedbackTWCC func(sn uint16, timeNS int64, marker bool)

    // logger
    logger logr.Logger
}

3.buffer创建

Pion/webrtc支持自定义BufferFactory,设置好之后,pion/webrtc的组件会使用自定义buffer。

比如pion/srtp是实际收发srtp和srtcp包的类,它们也会使用自定义buffer。

首先来看一下ion-sfu是在哪里设置自定义buffer的:

func NewWebRTCTransportConfig(c Config)WebRTCTransportConfig {
  //这个SettingEngine是pion里很重要的设置类,可以控制pion/webrtc很多行为和参数,比如ice-lite等
    se :=webrtc.SettingEngine{}
    se.DisableMediaEngineCopy(true)
  ....
  //这里把自定义的BufferFactory给配置进去了
  //意思是pion/srtp会使用这个buffer来传包
    se.BufferFactory =c.BufferFactory.GetOrNew
}

srtp和srtcp流向是这样的:

客户端---srtp--->srtp.ReadStreamSRTP------->SFU
客户端<---srtcp---srtp.ReadStreamSRTCP<------SFU

当包到达pion/srtp时,就会触发ReadStreamSRTP.init函数和ReadStreamSRTCP.init函数。

  • ReadStreamSRTP.init调用自定义的BufferFactory.GetOrNew函数,new了一个buffer
  • ReadStreamSRTCP.init调用自定义的BufferFactory.GetOrNew函数,new了一个rtcpReader

之后收发rtp和rtcp包,就会流经这个buffer和rtcpReader:

https://github.com/pion/srtp/blob/3c34651fa0c6de900bdc91062e7ccb5992409643/stream_srtp.go#L53

func (r *ReadStreamSRTP) init(childstreamSession, ssrc uint32) error {
    sessionSRTP, ok :=child.(*SessionSRTP)
......
    ifr.session.bufferFactory != nil {
    //这里就是调用自定义的BufferFactory.GetOrNew函数了,new了一个buffer
        r.buffer = r.session.bufferFactory(packetio.RTPBufferPacket,ssrc)
    } else {
.......
    }
 
    return nil
}
srtp.(*ReadStreamSRTP).init--->session.bufferFactory(其实是buffer.BufferFactory.GetOrNew)--->buffer.NewBuffer
srtp.(*ReadStreamSRTCP).init--->session.bufferFactory(其实是buffer.BufferFactory.GetOrNew)--->buffer.NewNewRTCPReader

为什么这么搞呢?

仔细想想,如果控制了rtp和rtcp的buffer,是不是计算twcc、nack、stats等就很方便了,在buffer写入包的同时,就可以通过设置的回调函数搞各种复杂计算。

4.buffer收发包流程

收发rtp包流程图简单总结:

srtp.(*ReadStreamSRTP).write--->buffer.(*Buffer).Write--->buffer.(*Buffer).ReadExtended--->DownTrack.WriteRTP--->DownTrack.writeSimpleRTP/writeSimulcastRTP

贴一下代码:

func (r *ReadStreamSRTP) write(buf []byte) (n int, err error) {
    //这里就把包写入了自定义buffer
  n, err = r.buffer.Write(buf)
 
    if errors.Is(err,packetio.ErrFull) {
        // Silently dropdata when the buffer is full.
        return len(buf), nil
    }
 
    return n, err
}

downtrack收发rtcp包流程图:

srtp.(*ReadStreamSRTCP).write--->buffer.(*RTCPReader).Write--->DownTrack.Bind里rr.OnPacket

5.bucket存储rtp

如图,bucket是一个定制的ringbuffer,是用来存储rtp包的:

包含一个数组,step是索引,每来一个包,先把包长度存入2字节,再把包存入一个MTU;step递增,以此类推,达到最大再从0循环(从0到maxSteps)。

buf: [2][MTU][2][MTU][2][MTU][2][MTU]...[MTU]
             0       1       2       3      maxSteps
             |       |      |       |  ... |
         step------------------------------->
          <---------------------------------|

rtp包写入bucket,并被WebRTCReceiver用来查找+重传包的过程。

buffer.(*Buffer).Write-->buffer.Buffer.calc-->buffer.bucket.AddPacket-->buffer.bucket.GetPacket<---WebRTCReceiver.RetransmitPackets<---DownTrack.handleRTCP

看下代码细节:

const maxPktSize = 1500//一般MTU的大小
 
type Bucket struct {
    buf []byte//一块buffer,可以存多个包
    src *[]byte//存储原始buffer指针
 
    init     bool//是否初始化
    step     int//递增计数
    headSN   uint16//头部sn
    maxSteps int//最大计数,一般是(总buffer大小/mtu大小)来计算
}
 
//创建bucket,存储包
funcNewBucket(buf *[]byte) *Bucket{
    return &Bucket{
        src:      buf,
        buf:      *buf,
        maxSteps: int(math.Floor(float64(len(*buf))/float64(maxPktSize))) -1,
    }
}
 
//塞包
func (b *Bucket) AddPacket(pkt []byte, sn uint16, latest bool) ([]byte, error) {
    if !b.init {//如果没有初始化headSN
        b.headSN = sn - 1
        b.init = true
    }
  //如果不是最后一个包,即乱序
    if !latest {
        return b.set(sn, pkt)//存储or覆盖
    }
  //如果是最后一个包
    diff := sn -b.headSN
    b.headSN = sn
  //计算step
    for i := uint16(1); i < diff;i++ {
        b.step++
        if b.step >=b.maxSteps {
            b.step = 0
        }
    }
    return b.push(pkt), nil
}
 
//查找序号sn的包并写入buf
func (b *Bucket) GetPacket(buf []byte, sn uint16) (i int, err error) {
    p := b.get(sn)
    if p == nil{
        err = errPacketNotFound
        return
    }
    i = len(p)
    if cap(buf) < i {
        err = errBufferTooSmall
        return
    }
    if len(buf) < i {
        buf = buf[:i]
    }
    copy(buf, p)
    return
}
 
//存包
func (b *Bucket) push(pkt []byte) []byte {
  //先写入2字节长度
    binary.BigEndian.PutUint16(b.buf[b.step*maxPktSize:],uint16(len(pkt)))
    off := b.step*maxPktSize+ 2
  //再写入包长度
    copy(b.buf[off:],pkt)
    b.step++//递增
    if b.step > b.maxSteps{
        b.step = 0//归零
    }
    return b.buf[off : off+len(pkt)]//返回包数据
}
 
//查找包数据
func (b *Bucket) get(sn uint16) []byte {
    pos := b.step - int(b.headSN-sn+1)
    if pos < 0 {
        if pos*-1 > b.maxSteps+1 {
            return nil
        }
        pos = b.maxSteps +pos + 1
    }
    off := pos * maxPktSize
    if off > len(b.buf) {
        return nil
    }
    if binary.BigEndian.Uint16(b.buf[off+4:off+6]) != sn{
        return nil
    }
    sz := int(binary.BigEndian.Uint16(b.buf[off : off+2]))
    return b.buf[off+2 : off+2+sz]
}
 
//写入包数据
func (b *Bucket) set(sn uint16, pkt []byte) ([]byte, error) {
    if b.headSN-sn >=uint16(b.maxSteps+1) {
        return nil,errPacketTooOld
    }
    pos := b.step - int(b.headSN-sn+1)
    if pos < 0 {
        pos = b.maxSteps +pos + 1
    }
    off := pos *maxPktSize
    if off > len(b.buf) ||off < 0 {
        return nil,errPacketTooOld
    }
 
    // 如果已经存在则不写入
    if binary.BigEndian.Uint16(b.buf[off+4:off+6]) == sn{
        return nil,errRTXPacket
    }
 
    binary.BigEndian.PutUint16(b.buf[off:], uint16(len(pkt)))
    copy(b.buf[off+2:], pkt)
    return b.buf[off+2 : off+2+len(pkt)], nil
}

6. nackQueue计算nack

nack数组,用来存储nack信息并计算rtcp-nack。

[9316][9317]...[N]
 |
 sn

代码细节:

const maxNackTimes = 3   // 每个nack包发送的最大次数,防止客户端一直重传加重拥塞
const maxNackCache = 100// 最大缓存个数
 
type nack struct {
    sn     uint32//rtp序列号
    nacked uint8//发送的次数
}
 
type nackQueue struct {
    nacks []nack//nack数组
    kfSN  uint32//askKeyframeSN 要求发送PLI
}
 
//创建nackQueue
funcnewNACKQueue() *nackQueue{
    return &nackQueue{
        nacks: make([]nack, 0, maxNackCache+1),
    }
}
 
//删除
func (n *nackQueue) remove(extSN uint32) {
    i := sort.Search(len(n.nacks), func(iint) bool { return n.nacks[i].sn >= extSN })
    if i >= len(n.nacks) ||n.nacks[i].sn != extSN {
        return
    }
    copy(n.nacks[i:],n.nacks[i+1:])
    n.nacks = n.nacks[:len(n.nacks)-1]
}
 
//插入,extSN从大到小,查找效率高
func (n *nackQueue) push(extSN uint32) {
  //找到<=数组中sn的位置,一般是0
    i := sort.Search(len(n.nacks), func(iint) bool { return n.nacks[i].sn >= extSN })
    if i < len(n.nacks) &&n.nacks[i].sn == extSN {
        return
    }
 
    nck := nack{
        sn:     extSN,
        nacked: 0,
    }
 
    if i == len(n.nacks) {//如果是0,直接append
        n.nacks = append(n.nacks, nck)
    } else {//否则复制元素,到最前边
        n.nacks = append(n.nacks[:i+1], n.nacks[i:]...)
        n.nacks[i] = nck
    }
 
  //如果nack达到最大,删除最前一个
    if len(n.nacks) >=maxNackCache {
        copy(n.nacks,n.nacks[1:])
    }
}
 
//生成nack
func (n *nackQueue) pairs(headSN uint32)([]rtcp.NackPair, bool) {
    if len(n.nacks) ==0 {
        return nil, false
    }
    i := 0
    askKF := false
    var np rtcp.NackPair
    var nps []rtcp.NackPair
    for _, nck := rangen.nacks {
        if nck.nacked >=maxNackTimes {//如果nack重发>=3次
            if nck.sn >n.kfSN {//如果sn>上次请求关键帧SN
                n.kfSN = nck.sn//记录下来
                askKF = true//返回请求PLI
            }
            continue
        }
 
    //跳过比headSN大3的
        if nck.sn >=headSN-2 {//如:9316>=9320-2 不成立,跳过
            n.nacks[i] = nck
            i++
            continue
        }
    //过来的是3个包
    //这个值是个经验值:
    //如果太大,返回rtcp-nack包会delay太久,导致客户端重发包太迟,画面延迟
    //如果太小,比如2,则可能乱序的概率会大,因为等的越短,乱序包到来的概率越小
        n.nacks[i] = nack{
            sn:     nck.sn,
            nacked: nck.nacked +1,//计数器+1
        }
        i++
 
    //如果是np.PacketID==0,是第一个包,需要初始化np.PacketIDnp.LostPackets
        if np.PacketID ==0 || uint16(nck.sn) > np.PacketID+16 {
            if np.PacketID !=0 {
                nps = append(nps, np)
            }
            np.PacketID = uint16(nck.sn)
            np.LostPackets = 0
            continue
        }
    //如果是后续包,计算LostPackets
        np.LostPackets |= 1 <<(uint16(nck.sn) - np.PacketID - 1)
    }
 
    if np.PacketID != 0 {
        nps = append(nps, np)//追加到后边
    }
    n.nacks = n.nacks[:i]//去掉已经算过的包
 
    return nps, askKF
}

7. 总结

本文介绍了Qos中两个基础部分:

  • 使用bucket缓存rtp包,收到nack后,重传rtp包
  • 使用nackQueue,存储信息并计算rtcp-nack,发送给客户端
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-11-19,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 LiveVideoStack 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
对象存储
对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档