这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos
./ffmpeg \
-re \
-stream_loop -1 \
-i ../videos/sample.mp4 \
-c copy \
-f flv \
'rtmp://127.0.0.1:1935/live/test110'
func (server *Server) RunLoop() error {
for {
conn, err := server.ln.Accept()
if err != nil {
return err
}
go server.handleTcpConnect(conn)
}
}
func (server *Server) handleTcpConnect(conn net.Conn) {
Log.Infof("accept a rtmp connection. remoteAddr=%s", conn.RemoteAddr().String())
session := NewServerSession(server, conn)
_ = session.RunLoop()
if session.DisposeByObserverFlag {
return
}
switch session.sessionStat.BaseType() {
case base.SessionBaseTypePubStr:
server.observer.OnDelRtmpPubSession(session)
case base.SessionBaseTypeSubStr:
server.observer.OnDelRtmpSubSession(session)
}
}
func (s *ServerSession) RunLoop() (err error) {
if err = s.handshake(); err != nil {
_ = s.dispose(err)
return err
}
err = s.runReadLoop()
_ = s.dispose(err)
return err
}
func (s *ServerSession) handshake() error {
if err := s.hs.ReadC0C1(s.conn); err != nil {
return err
}
Log.Infof("[%s] < R Handshake C0+C1.", s.UniqueKey())
Log.Infof("[%s] > W Handshake S0+S1+S2.", s.UniqueKey())
if err := s.hs.WriteS0S1S2(s.conn); err != nil {
return err
}
if err := s.hs.ReadC2(s.conn); err != nil {
return err
}
Log.Infof("[%s] < R Handshake C2.", s.UniqueKey())
return nil
}
func (s *ServerSession) runReadLoop() error {
return s.chunkComposer.RunLoop(s.conn, s.doMsg)
}
func (s *ServerSession) doMsg(stream *Stream) error {
if err := s.writeAcknowledgementIfNeeded(stream); err != nil {
return err
}
//log.Debugf("%d %d %v", stream.header.msgTypeId, stream.msgLen, stream.header)
switch stream.header.MsgTypeId {
case base.RtmpTypeIdWinAckSize:
return s.doWinAckSize(stream)
case base.RtmpTypeIdSetChunkSize:
// noop
// 因为底层的 chunk composer 已经处理过了,这里就不用处理
case base.RtmpTypeIdCommandMessageAmf0:
return s.doCommandMessage(stream)
case base.RtmpTypeIdCommandMessageAmf3:
return s.doCommandAmf3Message(stream)
case base.RtmpTypeIdMetadata:
return s.doDataMessageAmf0(stream)
case base.RtmpTypeIdAck:
return s.doAck(stream)
case base.RtmpTypeIdUserControl:
s.doUserControl(stream)
case base.RtmpTypeIdAudio:
fallthrough
case base.RtmpTypeIdVideo:
if s.sessionStat.BaseType() != base.SessionBaseTypePubStr {
return nazaerrors.Wrap(base.ErrRtmpUnexpectedMsg)
}
s.avObserver.OnReadRtmpAvMsg(stream.toAvMsg())
default:
Log.Warnf("[%s] read unknown message. typeid=%d, %s", s.UniqueKey(), stream.header.MsgTypeId, stream.toDebugString())
}
return nil
}
msg header {Csid:3 MsgLen:139 MsgTypeId:20 MsgStreamId:0 TimestampAbs:0} - server_session.go:215
msg header {Csid:2 MsgLen:4 MsgTypeId:1 MsgStreamId:0 TimestampAbs:0} - server_session.go:215
msg header {Csid:3 MsgLen:36 MsgTypeId:20 MsgStreamId:0 TimestampAbs:0} - server_session.go:215
msg header {Csid:3 MsgLen:32 MsgTypeId:20 MsgStreamId:0 TimestampAbs:0} - server_session.go:215
msg header {Csid:3 MsgLen:25 MsgTypeId:20 MsgStreamId:0 TimestampAbs:0} - server_session.go:215
msg header {Csid:8 MsgLen:37 MsgTypeId:20 MsgStreamId:1 TimestampAbs:0} - server_session.go:215
msg header {Csid:4 MsgLen:388 MsgTypeId:18 MsgStreamId:1 TimestampAbs:0} - server_session.go:215
msg header {Csid:6 MsgLen:43 MsgTypeId:9 MsgStreamId:1 TimestampAbs:0} - server_session.go:215
msg header {Csid:4 MsgLen:4 MsgTypeId:8 MsgStreamId:1 TimestampAbs:0} - server_session.go:215
msg header {Csid:6 MsgLen:105227 MsgTypeId:9 MsgStreamId:1 TimestampAbs:0} - server_session.go:215
msg header {Csid:4 MsgLen:969 MsgTypeId:8 MsgStreamId:1 TimestampAbs:0} - server_session.go:215
msg header {Csid:4 MsgLen:1013 MsgTypeId:8 MsgStreamId:1 TimestampAbs:21} - server_session.go:215
msg header {Csid:6 MsgLen:1559 MsgTypeId:9 MsgStreamId:1 TimestampAbs:40} - server_session.go:215
msg header {Csid:4 MsgLen:1028 MsgTypeId:8 MsgStreamId:1 TimestampAbs:43} - server_session.go:215
msg header {Csid:4 MsgLen:1032 MsgTypeId:8 MsgStreamId:1 TimestampAbs:64} - server_session.go:215
msg header {Csid:6 MsgLen:2158 MsgTypeId:9 MsgStreamId:1 TimestampAbs:80} - server_session.go:215
msg header {Csid:4 MsgLen:992 MsgTypeId:8 MsgStreamId:1 TimestampAbs:85} - server_session.go:215
msg header {Csid:4 MsgLen:960 MsgTypeId:8 MsgStreamId:1 TimestampAbs:107} - server_session.go:215
msg header {Csid:6 MsgLen:2213 MsgTypeId:9 MsgStreamId:1 TimestampAbs:120} - server_session.go:215
msg header {Csid:4 MsgLen:975 MsgTypeId:8 MsgStreamId:1 TimestampAbs:128} - server_session.go:215
msg header {Csid:4 MsgLen:991 MsgTypeId:8 MsgStreamId:1 TimestampAbs:149} - server_session.go:215
msg header {Csid:6 MsgLen:2528 MsgTypeId:9 MsgStreamId:1 TimestampAbs:160} - server_session.go:215
msg header {Csid:4 MsgLen:1011 MsgTypeId:8 MsgStreamId:1 TimestampAbs:171} - server_session.go:215
msg header {Csid:4 MsgLen:1002 MsgTypeId:8 MsgStreamId:1 TimestampAbs:192} - server_session.go:215
func (s *ServerSession) doCommandMessage(stream *Stream) error {
cmd, err := stream.msg.readStringWithType()
if err != nil {
return err
}
tid, err := stream.msg.readNumberWithType()
if err != nil {
return err
}
switch cmd {
case "connect":
return s.doConnect(tid, stream)
case "createStream":
return s.doCreateStream(tid, stream)
case "publish":
return s.doPublish(tid, stream)
case "play":
return s.doPlay(tid, stream)
case "releaseStream":
fallthrough
case "FCPublish":
fallthrough
case "FCUnpublish":
fallthrough
case "getStreamLength":
fallthrough
case "deleteStream":
Log.Debugf("[%s] read command message, ignore it. cmd=%s, %s", s.UniqueKey(), cmd, stream.toDebugString())
default:
Log.Errorf("[%s] read unknown command message. cmd=%s, %s", s.UniqueKey(), cmd, stream.toDebugString())
}
return nil
}
connect
->
releaseStream
->
FCPublish
->
createStream
->
publish
->
MetaDzta
->
然后就是音频视频的chunk包
func (s *ServerSession) doConnect(tid int, stream *Stream) error {
val, err := stream.msg.readObjectWithType()
if err != nil {
return err
}
s.appName, err = val.FindString("app")
if err != nil {
return err
}
s.tcUrl, err = val.FindString("tcUrl")
if err != nil {
Log.Warnf("[%s] tcUrl not exist.", s.UniqueKey())
}
Log.Infof("[%s] < R connect('%s'). tcUrl=%s", s.UniqueKey(), s.appName, s.tcUrl)
s.observer.OnRtmpConnect(s, val)
Log.Infof("[%s] > W Window Acknowledgement Size %d.", s.UniqueKey(), windowAcknowledgementSize)
if err := s.packer.writeWinAckSize(s.conn, windowAcknowledgementSize); err != nil {
return err
}
Log.Infof("[%s] > W Set Peer Bandwidth.", s.UniqueKey())
if err := s.packer.writePeerBandwidth(s.conn, peerBandwidth, peerBandwidthLimitTypeDynamic); err != nil {
return err
}
Log.Infof("[%s] > W SetChunkSize %d.", s.UniqueKey(), LocalChunkSize)
if err := s.packer.writeChunkSize(s.conn, LocalChunkSize); err != nil {
return err
}
Log.Infof("[%s] > W _result('NetConnection.Connect.Success').", s.UniqueKey())
oe, err := val.FindNumber("objectEncoding")
if oe != 0 && oe != 3 {
oe = 0
}
if err := s.packer.writeConnectResult(s.conn, tid, oe); err != nil {
return err
}
return nil
}
func (s *ServerSession) doCreateStream(tid int, stream *Stream) error {
Log.Infof("[%s] < R createStream().", s.UniqueKey())
Log.Infof("[%s] > W _result().", s.UniqueKey())
if err := s.packer.writeCreateStreamResult(s.conn, tid); err != nil {
return err
}
return nil
}
func (sm *ServerManager) OnNewRtmpPubSession(session *rtmp.ServerSession) error {
sm.mutex.Lock()
defer sm.mutex.Unlock()
info := base.Session2PubStartInfo(session)
// 先做simple auth鉴权
if err := sm.option.Authentication.OnPubStart(info); err != nil {
return err
}
group := sm.getOrCreateGroup(session.AppName(), session.StreamName())
if err := group.AddRtmpPubSession(session); err != nil {
return err
}
info.HasInSession = group.HasInSession()
info.HasOutSession = group.HasOutSession()
sm.option.NotifyHandler.OnPubStart(info)
return nil
}
func (group *Group) write2RtmpSubSessions(b []byte) {
for session := range group.rtmpSubSessionSet {
if session.IsFresh || session.ShouldWaitVideoKeyFrame {
continue
}
_ = session.Write(b)
}
}
2023/04/02 09:47:24.346491 ^[[22;36m INFO ^[[0mmsg header {Csid:4 MsgLen:986 MsgTypeId:8 MsgStreamId:1 TimestampAbs:9323} - server_session.go:215
2023/04/02 09:47:24.346558 ^[[22;36m INFO ^[[0mmsg header {Csid:4 MsgLen:950 MsgTypeId:8 MsgStreamId:1 TimestampAbs:9344} - server_session.go:215
2023/04/02 09:47:24.346605 ^[[22;36m INFO ^[[0mmsg header {Csid:6 MsgLen:5 MsgTypeId:9 MsgStreamId:1 TimestampAbs:9320} - server_session.go:215
2023/04/02 09:47:24.346654 ^[[22;33m WARN ^[[0m[RTMP2MPEGTS1] rtmp msg too short, ignore. header={Csid:6 MsgLen:5 MsgTypeId:9 MsgStreamId:1 TimestampAbs:9320}, payload=00000000 17 02 00 00 00 |.....|
- rtmp2mpegts.go:196
2023/04/02 09:47:24.346681 ^[[22;33m WARN ^[[0mrtmp msg too short, ignore. header={Csid:6 MsgLen:5 MsgTypeId:9 MsgStreamId:1 TimestampAbs:9320}, payload=00000000 17 02 00 00 00 |.....|
- rtmp2rtsp.go:102
2023/04/02 09:47:24.346958 ^[[22;36m INFO ^[[0mmsg header {Csid:3 MsgLen:34 MsgTypeId:20 MsgStreamId:0 TimestampAbs:0} - server_session.go:215
2023/04/02 09:47:24.346987 ^[[22;34mDEBUG ^[[0m[RTMPPUBSUB1] read command message, ignore it. cmd=FCUnpublish, header={Csid:3 MsgLen:34 MsgTypeId:20 MsgStreamId:0 TimestampAbs:0}, b=len(core)=128, rpos=23, wpos=34, hex=00000000 05 02 00 07 74 65 73 74 31 31 30 |....test110|
- server_session.go:357
2023/04/02 09:47:24.347012 ^[[22;36m INFO ^[[0mmsg header {Csid:3 MsgLen:34 MsgTypeId:20 MsgStreamId:0 TimestampAbs:0} - server_session.go:215
2023/04/02 09:47:24.347028 ^[[22;34mDEBUG ^[[0m[RTMPPUBSUB1] read command message, ignore it. cmd=deleteStream, header={Csid:3 MsgLen:34 MsgTypeId:20 MsgStreamId:0 TimestampAbs:0}, b=len(core)=128, rpos=24, wpos=34, hex=00000000 05 00 3f f0 00 00 00 00 00 00 |..?.......|
- server_session.go:357
2023/04/02 09:47:24.347050 ^[[22;34mDEBUG ^[[0m[NAZACONN1] close once. err=EOF - connection.go:504
2023/04/02 09:47:24.347168 ^[[22;36m INFO ^[[0m[RTMPPUBSUB1] lifecycle dispose rtmp ServerSession. err=EOF - server_session.go:538
2023/04/02 09:47:24.347183 ^[[22;34mDEBUG ^[[0m[NAZACONN1] Close. - connection.go:376
2023/04/02 09:47:24.347199 ^[[22;34mDEBUG ^[[0m[GROUP1] [RTMPPUBSUB1] del rtmp PubSession from group. - group__in.go:318
2023/04/02 09:47:24.347303 ^[[22;36m INFO ^[[0m[HLSMUXER1] lifecycle dispose hls muxer. - muxer.go:126
2023/04/02 09:47:24.570509 ^[[22;36m INFO ^[[0merase inactive group. [GROUP1] - server_manager__.go:299
2023/04/02 09:47:24.570639 ^[[22;36m INFO ^[[0m[GROUP1] lifecycle dispose group. - group__.go:207