七、Simulcast流程
1. Simulcast概念
先介绍WebRTC的一个概念——Simulcast(联播,俗称大小流):
推流端===f/h/q==>SFU--f--->收流端A
|---q--->收流端B
|---h--->收流端C
.........
a=rid:f send
a=rid:h send
a=rid:q send
a=simulcast:send f;h;q
2.收发流程
看本章之前,最好看一下前一章,熟悉一下收发流程,本文只重点介绍其中的Simulcast部分。
收发包逻辑打通步骤:
SDK推流---->OnTrack---->router.AddReceiver(设置Buffer和上行Track)------>SessionLocal.Publish(设置下行Track)---->收发包逻辑打通
3.Simulcast上行流程
非Simulcast情况,OnTrack一般会触发两次:一个audioTrack+一个videoTrack。
Simulcast下,OnTrack一般会触发四次:一个audioTrack+三个videoTrack(rid分别为fhq)。
这个流程会触发四次:
OnTrack--->router.AddReceiver--->WebRTCReceiver.AddUpTrack
三个videoTrack,共用同一个WebRTCReceiver。
type WebRTCReceiver struct {
。。。
receiver *webrtc.RTPReceiver
codec webrtc.RTPCodecParameters
rtcpCh chan []rtcp.Packet
buffers [3]*buffer.Buffer//需要三个buffer
upTracks [3]*webrtc.TrackRemote//三个TrackRemote
。。。
pendingTracks [3][]*DownTrack//三个层,每层来订阅的downtrack
。。。
}
接下来看一下AddUpTrack是如何工作的:
func (w *WebRTCReceiver) AddUpTrack(track *webrtc.TrackRemote,buff *buffer.Buffer, bestQualityFirst bool) {
if w.closed.get() {
return
}
//根据RID来区分layer
var layer int
switch track.RID() {//如果没开simulcast,为""
case fullResolution:
layer = 2
case halfResolution:
layer = 1
default:
layer = 0//如果没开simulcast,为0
}
w.Lock()
//设置空域层layer的track
w.upTracks[layer] = track
//设置空域层layer的buff
w.buffers[layer] = buff
w.available[layer].set(true)
//设置空域层layer的downtrack
w.downTracks[layer].Store(make([]*DownTrack,0, 10))
w.pendingTracks[layer] = make([]*DownTrack,0, 10)
w.Unlock()
//闭包函数,按最佳质量订阅,切到f层
subBestQuality := func(targetLayerint) {
for l := 0; l <targetLayer; l++ {
dts :=w.downTracks[l].Load()
if dts == nil{
continue
}
for _, dt :=range dts.([]*DownTrack) {
_ = dt.SwitchSpatialLayer(int32(targetLayer), false)
}
}
}
//闭包函数,按最差质量订阅,切到q层
subLowestQuality := func(targetLayerint) {
for l := 2; l !=targetLayer; l-- {
dts :=w.downTracks[l].Load()
if dts == nil{
continue
}
for _, dt :=range dts.([]*DownTrack) {
_ = dt.SwitchSpatialLayer(int32(targetLayer), false)
}
}
}
//是否开启大小流
if w.isSimulcast {
//如果配置最佳质量,则等到f层到来时,订阅它
if bestQualityFirst &&(!w.available[2].get() || layer == 2) {
subBestQuality(layer)
//如果配置最差质量,则等到q层到来时,订阅它
} else if!bestQualityFirst && (!w.available[0].get() ||layer == 0) {
subLowestQuality(layer)
}
}
//启动读写流程
go w.writeRTP(layer)
}
真正的收发包流程来了:
func (w *WebRTCReceiver) writeRTP(layer int) {
defer func() {//这里设置自动清理函数
w.closeOnce.Do(func() {
w.closed.set(true)
w.closeTracks()
})
}()
//创建一个PLI包,后边要用
pli := []rtcp.Packet{
&rtcp.PictureLossIndication{SenderSSRC:rand.Uint32(), MediaSSRC: w.SSRC(layer)},
}
for {
//这里可以看到,真正读包是从buffer里读出来的,正是前边讲到的自定义buffer
pkt, err :=w.buffers[layer].ReadExtended()
if err ==io.EOF {
return
}
//如果开启大小流
if w.isSimulcast {
//一开始是pending状态
ifw.pending[layer].get() {
//如果收到的包是关键帧
ifpkt.KeyFrame {
w.Lock()
//如果有切换中的layer,那就切一下
for idx,dt := range w.pendingTracks[layer] {
w.deleteDownTrack(dt.CurrentSpatialLayer(), dt.peerID)
w.storeDownTrack(layer, dt)
dt.SwitchSpatialLayerDone(int32(layer))
w.pendingTracks[layer][idx] = nil
}
w.pendingTracks[layer] = w.pendingTracks[layer][:0]
w.pending[layer].set(false)
w.Unlock()
} else {
//如果是非关键字,说明需要发送PLI
w.SendRTCP(pli)
}
}
}
//这里是不是有疑问,[]*downTracks是SessionLocal.Publish里塞过来的,后边会介绍:)
for _, dt := rangew.downTracks[layer].Load().([]*DownTrack){
//下行track写入rtp包
if err = dt.WriteRTP(pkt, layer);err != nil {
if err ==io.EOF && err == io.ErrClosedPipe {
w.Lock()
w.deleteDownTrack(layer, dt.id)
w.Unlock()
}
log.Error().Err(err).Str("id", dt.id).Msg("Errorwriting to down track")
}
}
}
}
至此一个简单的Simulcast收发模型:
SFU--->WebRTCReceiver(audio).buffer[0].ReadExtended---->downTracks[0][0].WriteRTP->SDK
| |....
| |--->downTracks[0][N].WriteRTP
|
|---->WebRTCReceiver(video).buffer[0].ReadExtended---->downTracks[0][0].WriteRTP
| |....
| |---->downTracks[0][N].WriteRTP
|
|------------->buffer[1].ReadExtended---->downTracks[1][0].WriteRTP
| |....
| |----->downTracks[1][N].WriteRTP
|
|------------->buffer[2].ReadExtended---->downTracks[2][0].WriteRTP
|....
|------>downTracks[2][N].WriteRTP
上面省略了SDK--->ReadStreamSRTP.buffer.Write,这个buffer和WebRTCReceiver.buffer是同一个。
订阅端SDK的切大小流操作,其实就是在0-2来回挂载downTrack而已。
4.Simulcast下行流程
读者前边的疑问,downTracks是哪里塞过来的?流程在这里:
OnTrack--->SessionLocal.Publish--->router.AddDownTracks--->router.AddDownTrack--->WebRTCReceiver.AddDownTrack--->WebRTCReceiver.storeDownTrack
pc.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver){
//Simulcast一般会触发OnTrack四次,一个audio,三个video
//由于三个video的trackId一样,共用一个WebRTCReceiver
r, pub := p.router.AddReceiver(receiver,track)
if pub {//这里video到来的第一次pub才为true
//这里把receiver发布到router里,其他peer的downtrack会挂载到receiver下
p.session.Publish(p.router, r)
这里为了方便,再贴一下整个流程的代码,比较繁琐,可以跳过。
SessionLocal.Publish
func (s *SessionLocal) Publish(router Router,r Receiver) {
for _, p := ranges.Peers() {
// Don't sub toself
if router.ID() == p.ID() || p.Subscriber() == nil{
continue
}
//表示根据r的信息创建downtrack,并增加到p.Subscriber()和r中
if err :=router.AddDownTracks(p.Subscriber(), r); err !=nil {
Logger.Error(err, "Errorsubscribing transport to Router")
continue
}
}
}
router.AddDownTracks
func (r *router) AddDownTracks(s *Subscriber,recv Receiver) error {
。。。
//如果recv不为空,表示根据recv的信息创建downtrack,并增加到s和recv中
if recv != nil{
if _, err :=r.AddDownTrack(s, recv); err != nil {
return err
}
s.negotiate()
return nil
}
//如果recv为空,表示遍历房间中所有的receivers,并增加到s和recv中
if len(r.receivers)> 0 {
for _, rcv := ranger.receivers {
if _, err :=r.AddDownTrack(s, rcv); err != nil {
return err
}
}
s.negotiate()
}
return nil
}
router.AddDownTrack
根据recv的信息创建downtrack,并增加到sub和recv中。
func (r *router) AddDownTrack(sub *Subscriber,recv Receiver) (*DownTrack, error) {
for _, dt := rangesub.GetDownTracks(recv.StreamID()) {//避免重复添加
if dt.ID() ==recv.TrackID() {
return dt, nil
}
}
codec := recv.Codec()
if err := sub.me.RegisterCodec(codec, recv.Kind()); err !=nil {
return nil,err
}
//创建downtrack,downtrack用来给客户端下发流
downTrack, err := NewDownTrack(webrtc.RTPCodecCapability{
MimeType: codec.MimeType,
ClockRate: codec.ClockRate,
Channels: codec.Channels,
SDPFmtpLine: codec.SDPFmtpLine,
RTCPFeedback:[]webrtc.RTCPFeedback{{"goog-remb", ""}, {"nack", ""}, {"nack", "pli"}},
}, recv, r.bufferFactory,sub.id, r.config.MaxPacketTrack)
if err != nil{
return nil,err
}
//把downtrack增加到pc中
if downTrack.transceiver,err = sub.pc.AddTransceiverFromTrack(downTrack,webrtc.RTPTransceiverInit{
Direction:webrtc.RTPTransceiverDirectionSendonly,
}); err != nil {
return nil,err
}
// 设置关闭回调,关闭时pc自动删除track
downTrack.OnCloseHandler(func() {
if sub.pc.ConnectionState() !=webrtc.PeerConnectionStateClosed {
if err :=sub.pc.RemoveTrack(downTrack.transceiver.Sender()); err !=nil {
if err ==webrtc.ErrConnectionClosed {
return
}
Logger.Error(err, "Errorclosing down track")
} else {//如果删除成功,再从sub中删除,然后重协商
sub.RemoveDownTrack(recv.StreamID(), downTrack)
sub.negotiate()
}
}
})
//设置OnBind回调,DownTrack.Bind()里会调用这个;PC协商完成时,DownTrack.Bind()会触发
downTrack.OnBind(func() {
go sub.sendStreamDownTracksReports(recv.StreamID())
})
//增加downTrack到sub中,sub只是用来管理downtracks和生成SenderReport等
sub.AddDownTrack(recv.StreamID(), downTrack)
//增加downTrack到WebRTCReceiver中,实际收发包是WebRTCReceiver来控制,在writeRTP中
recv.AddDownTrack(downTrack,r.config.Simulcast.BestQualityFirst)
return downTrack, nil
}
5.Simulcast切换流程
第一种,自动切换。
上边的subBestQuality,会在f层receiver到来时,自动订阅f层。
第二种,手动切换。
通过信令或datachannel控制来切换。
先来讲一下datachannel信令通道,在main里创建了一个内置dc,处理函数为datachannel.SubscriberAPI。
func main() {
nsfu := sfu.NewSFU(conf.Config)
dc :=nsfu.NewDatachannel(sfu.APIChannelLabel)
dc.Use(datachannel.SubscriberAPI)
s :=server.NewWrapperedGRPCWebServer(options, nsfu)
if err := s.Serve(); err != nil{
logger.Error(err,"failed to serve")
os.Exit(1)
}
select {}
}
客户端发过来的切大小流指令会进入此函数。
funcSubscriberAPI(nextsfu.MessageProcessor) sfu.MessageProcessor {
return sfu.ProcessFunc(func(ctxcontext.Context, args sfu.ProcessArgs) {
srm := &setRemoteMedia{}
if err :=json.Unmarshal(args.Message.Data, srm); err != nil {
return
}
// Publisherchanging active layers
if srm.Layers !=nil && len(srm.Layers) > 0 {
。。。//当前sdk逻辑不会进入这里
} else {
//按流ID查找downTracks
downTracks :=args.Peer.Subscriber().GetDownTracks(srm.StreamID)
for _, dt :=range downTracks {
switch dt.Kind() {
casewebrtc.RTPCodecTypeAudio:
dt.Mute(!srm.Audio)//音频是否需要mute/unmute
casewebrtc.RTPCodecTypeVideo:
switchsrm.Video {//视频是否需要切大小流/mute
casehighValue:
//这里把d.reSync.set设置为true了,writeSimulcastRTP里会自动发PLI
dt.Mute(false)
dt.SwitchSpatialLayer(2, true)
casemediumValue:
dt.Mute(false)
dt.SwitchSpatialLayer(1, true)
caselowValue:
dt.Mute(false)
dt.SwitchSpatialLayer(0, true)
casemutedValue:
dt.Mute(true)
}
switchsrm.Framerate {//当前sdk逻辑也不会进入这里,srm.Framerate=""
}
}
}
}
next.Process(ctx, args)
})
}
DownTrack.SwitchSpatialLayer
func (d *DownTrack) SwitchSpatialLayer(targetLayer int32, setAsMax bool) error {
if d.trackType ==SimulcastDownTrack {
// Don't switchuntil previous switch is done or canceled
csl := atomic.LoadInt32(&d.currentSpatialLayer)
//如果当前运行layer不是正在切的layer,或当前layer是要切的
//换句话说,如果当前layer没切完成,或者当前layer和要切的一样,那就返回错误
if csl !=atomic.LoadInt32(&d.targetSpatialLayer) || csl ==targetLayer {
returnErrSpatialLayerBusy
}
//切换layer
if err :=d.receiver.SwitchDownTrack(d, int(targetLayer));err == nil {
atomic.StoreInt32(&d.targetSpatialLayer,targetLayer)
if setAsMax {
atomic.StoreInt32(&d.maxSpatialLayer,targetLayer)
}
}
return nil
}
returnErrSpatialNotSupported
}
WebRTCReceiver.SwitchDownTrack
func (w *WebRTCReceiver) SwitchDownTrack(track *DownTrack,layer int) error {
if w.closed.get() {
returnerrNoReceiverFound
}
//切换就是把track放入pending
if w.available[layer].get() {
w.Lock()
w.pending[layer].set(true)
w.pendingTracks[layer] = append(w.pendingTracks[layer],track)
w.Unlock()
return nil
}
return errNoReceiverFound
}
然后在writeRTP里切换:
func (w *WebRTCReceiver) writeRTP(layer int) {
....
for {
pkt, err :=w.buffers[layer].ReadExtended()
if err ==io.EOF {
return
}
//如果是大小流
if w.isSimulcast {
//如果正在切换,pending[layer]get()为true
ifw.pending[layer].get() {
// 如果是关键帧,才会切换,好在前边Mute流程里发送了PLI,这里应该很快来一个关键帧
ifpkt.KeyFrame {
w.Lock()
//=========这里切换
for idx, dt:= range w.pendingTracks[layer] {
//删除原来的
w.deleteDownTrack(dt.CurrentSpatialLayer(), dt.peerID)
//存储新的dt,以后writeRTP会写入新的dt
w.storeDownTrack(layer, dt)
//设置切换完成
dt.SwitchSpatialLayerDone(int32(layer))
//pending中此dt置空
w.pendingTracks[layer][idx] = nil
}
//清空pendingTracks此layer
w.pendingTracks[layer] = w.pendingTracks[layer][:0]
//标志位置为false
w.pending[layer].set(false)
w.Unlock()
} else {
// 如果不是关键帧,再次发送PLI
w.SendRTCP(pli)
}
}
}
for _, dt := rangew.downTracks[layer].Load().([]*DownTrack){
if err = dt.WriteRTP(pkt, layer);err != nil {
if err ==io.EOF && err == io.ErrClosedPipe {
w.Lock()
w.deleteDownTrack(layer, dt.id)
w.Unlock()
}
log.Error().Err(err).Str("id", dt.id).Msg("Errorwriting to down track")
}
}
}
}
6. 总结
Simulcast在ion-sfu中,默认是通过datachannel来操作切换的。
首先,切换是操作pendingTracks:
SubscriberAPI---》dt.SwitchSpatialLayer-->WebRTCReceiver.SwitchDownTrack--->写入pendingTracks
然后,在WebRTCReceiver.writeRTP里进行实质切换:
WebRTCReceiver.writeRTP--->读取pendingTracks---》更换downTracks--》storeDownTrack--》OK
之后,写包就会写入新track。至此一个简单的Simulcast收发模型就建成了:
SDK---SFU--->WebRTCReceiver(audio).buffer[0].ReadExtended---->downTracks[0][0].WriteRTP->SDK
| |....
| |--->downTracks[0][N].WriteRTP
|
|---->WebRTCReceiver(video).buffer[0].ReadExtended---->downTracks[0][0].WriteRTP
| |....
| |---->downTracks[0][N].WriteRTP
|
|------------->buffer[1].ReadExtended---->downTracks[1][0].WriteRTP
| |....
| |----->downTracks[1][N].WriteRTP
|
|------------->buffer[2].ReadExtended---->downTracks[2][0].WriteRTP
|....
|------>downTracks[2][N].WriteRTP
作者简介:
王朋闯:前百度RTN资深工程师,前金山云RTC技术专家,前VIPKID流媒体架构师,ION开源项目发起人。
特别说明: 本文发布于知乎,已获得作者授权转载。
本文分享自 LiveVideoStack 微信公众号,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。
本文参与 腾讯云自媒体同步曝光计划 ,欢迎热爱写作的你一起参与!