前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Golang流媒体实战之五:lal推流服务源码阅读

Golang流媒体实战之五:lal推流服务源码阅读

作者头像
程序员欣宸
发布2023-04-09 09:13:18
6450
发布2023-04-09 09:13:18
举报
文章被收录于专栏:实战docker实战docker

欢迎访问我的GitHub

这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos

《Golang流媒体实战》系列的链接

  1. 体验开源项目lal
  2. 回源
  3. 转推和录制
  4. lalserver的启动源码阅读

准备工作

  • 本文要学习的是lalserver处理rtmp推流的功能代码,因此必须要对rtmp协议有所了解,至少要知道握手、chunk、message、messageType、amf0命令的基本概念,有关rtmp协议的资料在网上已经很丰富了,这里就不展开了,仅提供一个wiki作为参考:https://en.wikipedia.org/wiki/Real-Time_Messaging_Protocol
  • lalserver源码仓库:https://github.com/q191201771/lal
  • 在源码的逻辑和分支较多时,可以结合lal日志来确定真实的执行顺序,让日志输出更丰富的内容,因此这里也会先推流,拿到lal的日志,推流操作曾在《体验开源项目lal》一文中执行过,命令如下
代码语言:javascript
复制
./ffmpeg \
-re \
-stream_loop -1 \
-i ../videos/sample.mp4 \
-c copy \
-f flv \
'rtmp://127.0.0.1:1935/live/test110'

本篇概览

  • 推拉流,这是流媒体技术中的基本功能,本篇通过阅读lal源码,了解推流功能的具体实现
  • 本次学习的是rtmp推流服务端源码,总的来说,处理推流的流程如下
  • 接收TCP连接
  • 握手
  • 接收chunk包,组成mssage
  • 根据messageType的不同,分别处理message
  • 对于amf0类型的message,还要根据amf0命令的不同分别处理6. 音视频数据的处理
  • 结束推流时的相关操作
  • 接下来就一起来学习lal的推流源码,并借此对推流功能做深入了解

处理rtmp推流请求的入口

  • 在lalserver侧,起点是rtmp server收到一个远程TCP连接(默认1935端口),来看看lal是如何处理的,也就是rtmp server的处理逻辑,代码在lal/pkg/rtmp/server.go
代码语言:javascript
复制
func (server *Server) RunLoop() error {
	for {
		conn, err := server.ln.Accept()
		if err != nil {
			return err
		}
		go server.handleTcpConnect(conn)
	}
}
  • 从上述代码可见,每当收到一个TCP连接,就在一个协程中用handleTcpConnect方法处理这个连接,handleTcpConnect方法不涉及细节,内容很简单:为TCP连接创建ServerSession对象,将具体处理交给ServerSession对象执行
代码语言:javascript
复制
func (server *Server) handleTcpConnect(conn net.Conn) {
	Log.Infof("accept a rtmp connection. remoteAddr=%s", conn.RemoteAddr().String())
	session := NewServerSession(server, conn)
	_ = session.RunLoop()

	if session.DisposeByObserverFlag {
		return
	}
	switch session.sessionStat.BaseType() {
	case base.SessionBaseTypePubStr:
		server.observer.OnDelRtmpPubSession(session)
	case base.SessionBaseTypeSubStr:
		server.observer.OnDelRtmpSubSession(session)
	}
}
  • 从上面的代码可以看出,主要业务逻辑都在ServerSession对象的RunLoop方法中,展开后如下所示,先握手(handshake),再执行具体的逻辑(runReadLoop)
代码语言:javascript
复制
func (s *ServerSession) RunLoop() (err error) {
	if err = s.handshake(); err != nil {
		_ = s.dispose(err)
		return err
	}

	err = s.runReadLoop()
	_ = s.dispose(err)

	return err
}

握手

  • 看握手(handshake)代码之前,先温习一下rtmp的握手流程
  • 再看握手的源码,一目了然,不过代码和图略有点不同:lal的代码中,S0S1S2是连发的
代码语言:javascript
复制
func (s *ServerSession) handshake() error {
	if err := s.hs.ReadC0C1(s.conn); err != nil {
		return err
	}
	Log.Infof("[%s] < R Handshake C0+C1.", s.UniqueKey())

	Log.Infof("[%s] > W Handshake S0+S1+S2.", s.UniqueKey())
	if err := s.hs.WriteS0S1S2(s.conn); err != nil {
		return err
	}

	if err := s.hs.ReadC2(s.conn); err != nil {
		return err
	}
	Log.Infof("[%s] < R Handshake C2.", s.UniqueKey())
	return nil
}
  • 想看下具体读取C0、C1、C2这些数据的具体逻辑?如下图,最普通的io读取而已

读取和处理chunk

  • 握手成功后,接着就是lal/pkg/rtmp/server_session.go#runReadLoop方法,里面直接调用lal/pkg/rtmp/chunk_composer.go#RunLoop,记住RunLoop的第二个入参,那是从chunk中获取到完整消息后的回调方法(即lal/pkg/rtmp/server_session.go#doMsg),也是个重点
代码语言:javascript
复制
func (s *ServerSession) runReadLoop() error {
	return s.chunkComposer.RunLoop(s.conn, s.doMsg)
}
  • 真正的核心代码到了,推流客户端和lalserver握手成功后,推流操作的所有逻辑都在这里:lal/pkg/rtmp/chunk_composer.go#RunLoop,这里面的代码分为几部分,代码有点长就不贴出来了,来看几个关键逻辑
  • 首先看到的是个无限循环,每当处理完一个chunk包后,就继续处理下一个chunk包
  • 根据chunk steam id(csid)确定当前包对应的消息,该消息由一个或多个chunk包组成
  • 下面这段比较重要,每个消息都有自己的csid,进而对应一个stream对象,该消息对应的所有包都保存在stream的成员变量中,并记录已经保存的长度,当保存的内容长度达到消息长度时,意味着该消息对应的所有数据已经全部获取完成了,可以执行处理该消息的代码了
  • 上图红色箭头2所指的Flush方法,其实并没有内存或者硬盘的读写操作,而是仅修改了位置变量,表示buffer中那些是正式的消息内容,这个设计值得学习
  • 从chunk中拿到了完整的消息,接下来就要执行处理消息的逻辑了,如下图,之所以有两处回调代码,是因为如果消息类型是聚集消息(Aggregate Message,22),就意味着从chunk中取得的一条消息,实际上是多条消息,需要拆分后逐一回调处理
  • 以上就是chunk的处理逻辑了,现在已从chunk中得到完整消息,该看看消息的处理逻辑了

处理消息

  • 前面的图中可以看出,处理消息的代码是红色箭头所指的cb(stream),实际对应的代码是server_session.go#doMsg
  • doMsg的代码简单明了,根据不同消息类型做不同的操作
代码语言:javascript
复制
func (s *ServerSession) doMsg(stream *Stream) error {
	if err := s.writeAcknowledgementIfNeeded(stream); err != nil {
		return err
	}

	//log.Debugf("%d %d %v", stream.header.msgTypeId, stream.msgLen, stream.header)
	switch stream.header.MsgTypeId {
	case base.RtmpTypeIdWinAckSize:
		return s.doWinAckSize(stream)
	case base.RtmpTypeIdSetChunkSize:
		// noop
		// 因为底层的 chunk composer 已经处理过了,这里就不用处理
	case base.RtmpTypeIdCommandMessageAmf0:
		return s.doCommandMessage(stream)
	case base.RtmpTypeIdCommandMessageAmf3:
		return s.doCommandAmf3Message(stream)
	case base.RtmpTypeIdMetadata:
		return s.doDataMessageAmf0(stream)
	case base.RtmpTypeIdAck:
		return s.doAck(stream)
	case base.RtmpTypeIdUserControl:
		s.doUserControl(stream)
	case base.RtmpTypeIdAudio:
		fallthrough
	case base.RtmpTypeIdVideo:
		if s.sessionStat.BaseType() != base.SessionBaseTypePubStr {
			return nazaerrors.Wrap(base.ErrRtmpUnexpectedMsg)
		}
		s.avObserver.OnReadRtmpAvMsg(stream.toAvMsg())
	default:
		Log.Warnf("[%s] read unknown message. typeid=%d, %s", s.UniqueKey(), stream.header.MsgTypeId, stream.toDebugString())

	}
	return nil
}
  • 那么问题来了,为了学习推流的知识,每种消息的处理逻辑代码都要去读一遍吗?貌似有点多,而且这样顺序读也不清楚各消息之间的顺序或者依赖关系,这时候需要偷懒了…
  • 为了搞清楚推流时协议交互的具体情况,对doMsg方法略作修改,增加下图黄色箭头这行代码,然后编译运行
  • 再用FFmpeg做一次推流,拿到新的日志
  • 推流后的日志如下(通过grep命令,只看上面那行日志)
代码语言:javascript
复制
msg header {Csid:3 MsgLen:139 MsgTypeId:20 MsgStreamId:0 TimestampAbs:0} - server_session.go:215
msg header {Csid:2 MsgLen:4 MsgTypeId:1 MsgStreamId:0 TimestampAbs:0} - server_session.go:215
msg header {Csid:3 MsgLen:36 MsgTypeId:20 MsgStreamId:0 TimestampAbs:0} - server_session.go:215
msg header {Csid:3 MsgLen:32 MsgTypeId:20 MsgStreamId:0 TimestampAbs:0} - server_session.go:215
msg header {Csid:3 MsgLen:25 MsgTypeId:20 MsgStreamId:0 TimestampAbs:0} - server_session.go:215
msg header {Csid:8 MsgLen:37 MsgTypeId:20 MsgStreamId:1 TimestampAbs:0} - server_session.go:215
msg header {Csid:4 MsgLen:388 MsgTypeId:18 MsgStreamId:1 TimestampAbs:0} - server_session.go:215
msg header {Csid:6 MsgLen:43 MsgTypeId:9 MsgStreamId:1 TimestampAbs:0} - server_session.go:215
msg header {Csid:4 MsgLen:4 MsgTypeId:8 MsgStreamId:1 TimestampAbs:0} - server_session.go:215
msg header {Csid:6 MsgLen:105227 MsgTypeId:9 MsgStreamId:1 TimestampAbs:0} - server_session.go:215
msg header {Csid:4 MsgLen:969 MsgTypeId:8 MsgStreamId:1 TimestampAbs:0} - server_session.go:215
msg header {Csid:4 MsgLen:1013 MsgTypeId:8 MsgStreamId:1 TimestampAbs:21} - server_session.go:215
msg header {Csid:6 MsgLen:1559 MsgTypeId:9 MsgStreamId:1 TimestampAbs:40} - server_session.go:215
msg header {Csid:4 MsgLen:1028 MsgTypeId:8 MsgStreamId:1 TimestampAbs:43} - server_session.go:215
msg header {Csid:4 MsgLen:1032 MsgTypeId:8 MsgStreamId:1 TimestampAbs:64} - server_session.go:215
msg header {Csid:6 MsgLen:2158 MsgTypeId:9 MsgStreamId:1 TimestampAbs:80} - server_session.go:215
msg header {Csid:4 MsgLen:992 MsgTypeId:8 MsgStreamId:1 TimestampAbs:85} - server_session.go:215
msg header {Csid:4 MsgLen:960 MsgTypeId:8 MsgStreamId:1 TimestampAbs:107} - server_session.go:215
msg header {Csid:6 MsgLen:2213 MsgTypeId:9 MsgStreamId:1 TimestampAbs:120} - server_session.go:215
msg header {Csid:4 MsgLen:975 MsgTypeId:8 MsgStreamId:1 TimestampAbs:128} - server_session.go:215
msg header {Csid:4 MsgLen:991 MsgTypeId:8 MsgStreamId:1 TimestampAbs:149} - server_session.go:215
msg header {Csid:6 MsgLen:2528 MsgTypeId:9 MsgStreamId:1 TimestampAbs:160} - server_session.go:215
msg header {Csid:4 MsgLen:1011 MsgTypeId:8 MsgStreamId:1 TimestampAbs:171} - server_session.go:215
msg header {Csid:4 MsgLen:1002 MsgTypeId:8 MsgStreamId:1 TimestampAbs:192} - server_session.go:215
  • 再来看wiki上对消息类型的解释
  • 结合协议和日志可以看出,推流开始后,除了第二个设置chunk,其他主要是AMF0编码的RTMP命令消息,以及音频视频数据包
  • 日志中有大量MsgTypeId等于20的消息,对应16进制是0x14,也就是AMF0命令,所以这类消息的处理逻辑需要重点关注,doMsg代码中可见,处理此类消息的方法是doCommandMessage
代码语言:javascript
复制
func (s *ServerSession) doCommandMessage(stream *Stream) error {
	cmd, err := stream.msg.readStringWithType()
	if err != nil {
		return err
	}
	tid, err := stream.msg.readNumberWithType()
	if err != nil {
		return err
	}

	switch cmd {
	case "connect":
		return s.doConnect(tid, stream)
	case "createStream":
		return s.doCreateStream(tid, stream)
	case "publish":
		return s.doPublish(tid, stream)
	case "play":
		return s.doPlay(tid, stream)
	case "releaseStream":
		fallthrough
	case "FCPublish":
		fallthrough
	case "FCUnpublish":
		fallthrough
	case "getStreamLength":
		fallthrough
	case "deleteStream":
		Log.Debugf("[%s] read command message, ignore it. cmd=%s, %s", s.UniqueKey(), cmd, stream.toDebugString())
	default:
		Log.Errorf("[%s] read unknown command message. cmd=%s, %s", s.UniqueKey(), cmd, stream.toDebugString())
	}
	return nil
}
  • 各个命令的处理方法,doConnect、doCreateStream、doPublish、doPlay它们的内部都有代表自己特征的日志,因此,根据日志内容很容易梳理出推流时收到命令的顺序,如下:
代码语言:javascript
复制
connect 
-> 
releaseStream 
-> 
FCPublish 
-> 
createStream
-> 
publish
-> 
MetaDzta
-> 
然后就是音频视频的chunk包
  • 打开doConnect方法,看lalserver收到FFmpeg的connect命令后做了什么,如下,可见从connect命令的参数中拿到了appName(本例中是live),tcUrl(本例中是rtmp://127.0.0.1:1935/live),然后以命令的形式向FFmpeg连续回复消息
代码语言:javascript
复制
func (s *ServerSession) doConnect(tid int, stream *Stream) error {
	val, err := stream.msg.readObjectWithType()
	if err != nil {
		return err
	}
	s.appName, err = val.FindString("app")
	if err != nil {
		return err
	}
	s.tcUrl, err = val.FindString("tcUrl")
	if err != nil {
		Log.Warnf("[%s] tcUrl not exist.", s.UniqueKey())
	}
	Log.Infof("[%s] < R connect('%s'). tcUrl=%s", s.UniqueKey(), s.appName, s.tcUrl)

	s.observer.OnRtmpConnect(s, val)

	Log.Infof("[%s] > W Window Acknowledgement Size %d.", s.UniqueKey(), windowAcknowledgementSize)
	if err := s.packer.writeWinAckSize(s.conn, windowAcknowledgementSize); err != nil {
		return err
	}

	Log.Infof("[%s] > W Set Peer Bandwidth.", s.UniqueKey())
	if err := s.packer.writePeerBandwidth(s.conn, peerBandwidth, peerBandwidthLimitTypeDynamic); err != nil {
		return err
	}

	Log.Infof("[%s] > W SetChunkSize %d.", s.UniqueKey(), LocalChunkSize)
	if err := s.packer.writeChunkSize(s.conn, LocalChunkSize); err != nil {
		return err
	}

	Log.Infof("[%s] > W _result('NetConnection.Connect.Success').", s.UniqueKey())
	oe, err := val.FindNumber("objectEncoding")
	if oe != 0 && oe != 3 {
		oe = 0
	}
	if err := s.packer.writeConnectResult(s.conn, tid, oe); err != nil {
		return err
	}
	return nil
}
  • connect完成后是createStream命令,对应的doCreateStream方法如下,非常简单,就是立即回复,消息类型还是createStream,想想也是,流相关的信息,lal这边都已经准备好了,收到创建流的命令也无需什么操作
代码语言:javascript
复制
func (s *ServerSession) doCreateStream(tid int, stream *Stream) error {
	Log.Infof("[%s] < R createStream().", s.UniqueKey())
	Log.Infof("[%s] > W _result().", s.UniqueKey())
	if err := s.packer.writeCreateStreamResult(s.conn, tid); err != nil {
		return err
	}
	return nil
}
  • 接下来就是publish命令,代码就不贴出了,主要是获取流名、回复onStatus、设置连接的超时时间等,另外如果有观察者,还会向其发送publish相关的事件,而消费publish事件的代码也值得一看,在lal/pkg/logic/server_manager__.go#OnNewRtmpPubSession,如下,可见主要是鉴权、该流相关的Group处理,以及外部监听的通知,另外,如果配置中开启了录制功能,在group.AddRtmpPubSession方法中就会做相关的初始化操作
代码语言:javascript
复制
func (sm *ServerManager) OnNewRtmpPubSession(session *rtmp.ServerSession) error {
	sm.mutex.Lock()
	defer sm.mutex.Unlock()

	info := base.Session2PubStartInfo(session)

	// 先做simple auth鉴权
	if err := sm.option.Authentication.OnPubStart(info); err != nil {
		return err
	}

	group := sm.getOrCreateGroup(session.AppName(), session.StreamName())
	if err := group.AddRtmpPubSession(session); err != nil {
		return err
	}

	info.HasInSession = group.HasInSession()
	info.HasOutSession = group.HasOutSession()

	sm.option.NotifyHandler.OnPubStart(info)
	return nil
}
  • 接下来是Metadata类型的消息处理,该消息中包含了要推来的流的详细参数,例如宽高、码率、帧率等,对应方法是doDataMessageAmf0,该方法中最重要的是对观察者执行了通知,对应的方法在lal/pkg/logic/group__core_streaming.go#OnReadRtmpAvMsg,
  • OnReadRtmpAvMsg方法中会调用broadcastByRtmpMsg,这是个重点,里面会根据group中保存的订阅者的情况,开始发送广播,让所有订阅者都能获得媒体流的详细参数(这里面代码非常复杂,涉及到不同协议的录制、转推、拉流等),在推流的场景,其中这段代码可以重点看下
  • 等到上述命令全部响应完毕,意味着准备工作已经完成,可以静候媒体流数据的到来了

音频和视频消息的处理

  • 回到doMsg方法看看音视频消息的处理,如下图,先做个最基本的检查,然后交给lal/pkg/logic/group__core_streaming.go#OnReadRtmpAvMsg处理
  • 又是这个OnReadRtmpAvMsg方法,前面响应Metadata的时候就是它,现在处理音视频消息还是它,那阅读代码时就轻车熟路了,还是交给了broadcastByRtmpMsg方法去处理的
  • broadcastByRtmpMsg中,重点关注lal/pkg/remux/gop_cache.go#Feed,这里面会对关键帧的seqheader和内容进行缓存
  • broadcastByRtmpMsg方法中,还有本篇最重要的两处代码(个人是这么认为的)
  • 第一处,如下图,如果是一个新来的拉流请求,会将媒体流属性写到拉流端,另外就是刚才缓存的关键帧及其seqheader信息,这样的结果是拉流端刚建立连接就能拿到关键帧(快速出首帧的效果),而不用等待推流端推来的最新关键帧,毕竟一个gop时间可能很长
  • 第二段关键代码如下图,调用write2RtmpSubSessions,将本次收到的音视频消息转发
  • 展开write2RtmpSubSessions,恍然大悟了,个人认为,这就是推拉流的核心代码,将每一个收到的音视频数据,直接从拉流端的TCP连接中写进去(展开session.Write方法即可见到)
代码语言:javascript
复制
func (group *Group) write2RtmpSubSessions(b []byte) {
	for session := range group.rtmpSubSessionSet {
		if session.IsFresh || session.ShouldWaitVideoKeyFrame {
			continue
		}
		_ = session.Write(b)
	}
}

推流结束的处理

  • 如果用ctrl+c结束FFmpeg推流,lal这边会做什么处理呢?还是偷个懒,先看日志吧
代码语言:javascript
复制
2023/04/02 09:47:24.346491 ^[[22;36m INFO ^[[0mmsg header {Csid:4 MsgLen:986 MsgTypeId:8 MsgStreamId:1 TimestampAbs:9323} - server_session.go:215
2023/04/02 09:47:24.346558 ^[[22;36m INFO ^[[0mmsg header {Csid:4 MsgLen:950 MsgTypeId:8 MsgStreamId:1 TimestampAbs:9344} - server_session.go:215
2023/04/02 09:47:24.346605 ^[[22;36m INFO ^[[0mmsg header {Csid:6 MsgLen:5 MsgTypeId:9 MsgStreamId:1 TimestampAbs:9320} - server_session.go:215
2023/04/02 09:47:24.346654 ^[[22;33m WARN ^[[0m[RTMP2MPEGTS1] rtmp msg too short, ignore. header={Csid:6 MsgLen:5 MsgTypeId:9 MsgStreamId:1 TimestampAbs:9320}, payload=00000000  17 02 00 00 00                                    |.....|
 - rtmp2mpegts.go:196
2023/04/02 09:47:24.346681 ^[[22;33m WARN ^[[0mrtmp msg too short, ignore. header={Csid:6 MsgLen:5 MsgTypeId:9 MsgStreamId:1 TimestampAbs:9320}, payload=00000000  17 02 00 00 00                                    |.....|
 - rtmp2rtsp.go:102
2023/04/02 09:47:24.346958 ^[[22;36m INFO ^[[0mmsg header {Csid:3 MsgLen:34 MsgTypeId:20 MsgStreamId:0 TimestampAbs:0} - server_session.go:215
2023/04/02 09:47:24.346987 ^[[22;34mDEBUG ^[[0m[RTMPPUBSUB1] read command message, ignore it. cmd=FCUnpublish, header={Csid:3 MsgLen:34 MsgTypeId:20 MsgStreamId:0 TimestampAbs:0}, b=len(core)=128, rpos=23, wpos=34, hex=00000000  05 02 00 07 74 65 73 74  31 31 30                 |....test110|
 - server_session.go:357
2023/04/02 09:47:24.347012 ^[[22;36m INFO ^[[0mmsg header {Csid:3 MsgLen:34 MsgTypeId:20 MsgStreamId:0 TimestampAbs:0} - server_session.go:215
2023/04/02 09:47:24.347028 ^[[22;34mDEBUG ^[[0m[RTMPPUBSUB1] read command message, ignore it. cmd=deleteStream, header={Csid:3 MsgLen:34 MsgTypeId:20 MsgStreamId:0 TimestampAbs:0}, b=len(core)=128, rpos=24, wpos=34, hex=00000000  05 00 3f f0 00 00 00 00  00 00                    |..?.......|
 - server_session.go:357
2023/04/02 09:47:24.347050 ^[[22;34mDEBUG ^[[0m[NAZACONN1] close once. err=EOF - connection.go:504
2023/04/02 09:47:24.347168 ^[[22;36m INFO ^[[0m[RTMPPUBSUB1] lifecycle dispose rtmp ServerSession. err=EOF - server_session.go:538
2023/04/02 09:47:24.347183 ^[[22;34mDEBUG ^[[0m[NAZACONN1] Close. - connection.go:376
2023/04/02 09:47:24.347199 ^[[22;34mDEBUG ^[[0m[GROUP1] [RTMPPUBSUB1] del rtmp PubSession from group. - group__in.go:318
2023/04/02 09:47:24.347303 ^[[22;36m INFO ^[[0m[HLSMUXER1] lifecycle dispose hls muxer. - muxer.go:126
2023/04/02 09:47:24.570509 ^[[22;36m INFO ^[[0merase inactive group. [GROUP1] - server_manager__.go:299
2023/04/02 09:47:24.570639 ^[[22;36m INFO ^[[0m[GROUP1] lifecycle dispose group. - group__.go:207
  • 从上述日志可见,lal会收到FFmpeg发来的FCUnpublish、deleteStream等命令,但lal并不理会这些命令,如下图,而是等到TCP连接出现EOF错误的时候,由该错误出发
  • 具体的,处理TCP错误,结束推流的代码如下图所示
  • 至此,推流服务的相关源码的学习就完成了,借助lal第一次了解到rtmp推流服务的细节,基本功扎实了,接下来的学习就会事半功倍,接下来咱们去挑战另一个基础功能:rtmp拉流

你不孤单,欣宸原创一路相伴

  1. Java系列
  2. Spring系列
  3. Docker系列
  4. kubernetes系列
  5. 数据库+中间件系列
  6. DevOps系列
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2023-04-08,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 欢迎访问我的GitHub
  • 《Golang流媒体实战》系列的链接
  • 准备工作
  • 本篇概览
  • 处理rtmp推流请求的入口
  • 握手
  • 读取和处理chunk
  • 处理消息
  • 音频和视频消息的处理
  • 推流结束的处理
  • 你不孤单,欣宸原创一路相伴
相关产品与服务
消息队列 TDMQ
消息队列 TDMQ (Tencent Distributed Message Queue)是腾讯基于 Apache Pulsar 自研的一个云原生消息中间件系列,其中包含兼容Pulsar、RabbitMQ、RocketMQ 等协议的消息队列子产品,得益于其底层计算与存储分离的架构,TDMQ 具备良好的弹性伸缩以及故障恢复能力。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档