这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos
server_session.go#doCommandMessage
->
doPublish
->
server.go#OnNewRtmpPubSession
->
server_manager__.go#OnNewRtmpPubSession
->
group__in.go#AddRtmpPubSession
->
addIn
group.rtmp2MpegtsRemuxer = remux.NewRtmp2MpegtsRemuxer(group)
// # mpegts remuxer
if group.rtmp2MpegtsRemuxer != nil {
group.rtmp2MpegtsRemuxer.FeedRtmpMessage(msg)
}
rtmp2mpegts.go#FeedRtmpMessage
->
rtmp2mpegts_filter_.go#Push
->
rtmp2mpegts.go#onPop
->
feedVideo (这段代码比较复杂,值得细看)
->
onFrame
->
muxer.go#OnTsPackets
->
FeedMpegts
->
fragment.go#WriteFile
func (q *rtmp2MpegtsFilter) Push(msg base.RtmpMsg) {
// q.done是个标志,一旦等于true,今后收到的消息都直接给观察者,
// 但是等于true之前,收到的消息都放在切片中缓存起来,
// 如果从消息中成功取得音频和视频的codecID,就在drain方法中把标准设置为true
if q.done {
q.observer.onPop(msg)
return
}
// 将数据缓存到q.data
q.data = append(q.data, msg.Clone())
// 如果是音频消息或者视频消息,就可以得到对应的codecID
switch msg.Header.MsgTypeId {
case base.RtmpTypeIdAudio:
q.audioCodecId = int(msg.Payload[0] >> 4)
case base.RtmpTypeIdVideo:
q.videoCodecId = int(msg.Payload[0] & 0xF)
}
// 一旦音频和视频的codecID都搜集到了,就执行drain,
if q.videoCodecId != -1 && q.audioCodecId != -1 {
q.drain()
return
}
// 缓存存不下的时候也会执行drain
if len(q.data) >= q.maxMsgSize {
q.drain()
return
}
}
func (q *rtmp2MpegtsFilter) drain() {
// 根据当前视频的codecId,确定ts文件的PAT,PMT格式
switch q.videoCodecId {
case int(base.RtmpCodecIdAvc):
q.observer.onPatPmt(mpegts.FixedFragmentHeader)
case int(base.RtmpCodecIdHevc):
q.observer.onPatPmt(mpegts.FixedFragmentHeaderHevc)
default:
// TODO(chef) 正确处理只有音频或只有视频的情况 #56
q.observer.onPatPmt(mpegts.FixedFragmentHeader)
}
// 将缓存的所有消息输出给观察者
for i := range q.data {
q.observer.onPop(q.data[i])
}
q.data = nil
q.done = true
}
func (group *Group) OnPatPmt(b []byte) {
group.patpmt = b
if group.hlsMuxer != nil {
group.hlsMuxer.FeedPatPmt(b)
}
if group.recordMpegts != nil {
if err := group.recordMpegts.Write(b); err != nil {
Log.Errorf("[%s] record mpegts write fragment header error. err=%+v", group.UniqueKey, err)
}
}
}
func (s *Rtmp2MpegtsRemuxer) onPop(msg base.RtmpMsg) {
switch msg.Header.MsgTypeId {
case base.RtmpTypeIdAudio:
s.feedAudio(msg)
case base.RtmpTypeIdVideo:
s.feedVideo(msg)
}
}
func (s *Rtmp2MpegtsRemuxer) onFrame(frame *mpegts.Frame) {
s.adjustDtsPts(frame)
//Log.Debugf("Rtmp2MpegtsRemuxer::onFrame, frame=%s", frame.DebugString())
var boundary bool
if frame.Sid == mpegts.StreamIdAudio {
// 为了考虑没有视频的情况也能切片,所以这里判断spspps为空时,也建议生成fragment
boundary = !s.videoSeqHeaderCached()
} else {
// 收到视频,可能触发建立fragment的条件是:
// 关键帧数据 &&
// (
// (没有收到过音频seq header) || 说明 只有视频
// (收到过音频seq header && fragment没有打开) || 说明 音视频都有,且都已ready
// (收到过音频seq header && fragment已经打开 && 音频缓存数据不为空) 说明 为什么音频缓存需不为空?
// )
boundary = frame.Key && (!s.audioSeqHeaderCached() || !s.opened || !s.audioCacheEmpty())
}
if boundary {
s.opened = true
}
packets := frame.Pack()
s.observer.OnTsPackets(packets, frame, boundary)
}
func (m *Muxer) FeedMpegts(tsPackets []byte, frame *mpegts.Frame, boundary bool) {
//Log.Debugf("> FeedMpegts. boundary=%v, frame=%p, sid=%d", boundary, frame, frame.Sid)
if frame.Sid == mpegts.StreamIdAudio {
// TODO(chef): 为什么音频用pts,视频用dts
if err := m.updateFragment(frame.Pts, boundary, frame); err != nil {
Log.Errorf("[%s] update fragment error. err=%+v", m.UniqueKey, err)
return
}
if !m.opened {
Log.Warnf("[%s] FeedMpegts A not opened. boundary=%t", m.UniqueKey, boundary)
return
}
//Log.Debugf("[%s] WriteFrame A. dts=%d, len=%d", m.UniqueKey, frame.DTS, len(frame.Raw))
} else {
if err := m.updateFragment(frame.Dts, boundary, frame); err != nil {
Log.Errorf("[%s] update fragment error. err=%+v", m.UniqueKey, err)
return
}
if !m.opened {
// 走到这,可能是第一个包并且boundary为false
Log.Warnf("[%s] FeedMpegts V not opened. boundary=%t, key=%t", m.UniqueKey, boundary, frame.Key)
return
}
//Log.Debugf("[%s] WriteFrame V. dts=%d, len=%d", m.UniqueKey, frame.Dts, len(frame.Raw))
}
if err := m.fragment.WriteFile(tsPackets); err != nil {
Log.Errorf("[%s] fragment write error. err=%+v", m.UniqueKey, err)
return
}
}
func (m *Muxer) updateFragment(ts uint64, boundary bool, frame *mpegts.Frame) error {
discont := true
// 如果已经有TS切片,检查是否需要强制开启新的切片,以及切片是否发生跳跃
// 注意,音频和视频是在一起检查的
if m.opened {
f := m.getCurrFrag()
// 以下情况,强制开启新的分片:
// 1. 当前时间戳 - 当前分片的初始时间戳 > 配置中单个ts分片时长的10倍
// 原因可能是:
// 1. 当前包的时间戳发生了大的跳跃
// 2. 一直没有I帧导致没有合适的时间重新切片,堆积的包达到阈值
// 2. 往回跳跃超过了阈值
//
maxfraglen := uint64(m.config.FragmentDurationMs * 90 * 10)
if (ts > m.fragTs && ts-m.fragTs > maxfraglen) || (m.fragTs > ts && m.fragTs-ts > negMaxfraglen) {
Log.Warnf("[%s] force fragment split. fragTs=%d, ts=%d, frame=%s", m.UniqueKey, m.fragTs, ts, frame.DebugString())
if err := m.closeFragment(false); err != nil {
return err
}
if err := m.openFragment(ts, true); err != nil {
return err
}
}
// 更新当前分片的时间长度
//
// TODO chef:
// f.duration(也即写入m3u8中记录分片时间长度)的做法我觉得有问题
// 此处用最新收到的数据更新f.duration
// 但是假设fragment翻滚,数据可能是写入下一个分片中
// 是否就导致了f.duration和实际分片时间长度不一致
if ts > m.fragTs {
duration := float64(ts-m.fragTs) / 90000
if duration > f.duration {
f.duration = duration
}
}
discont = false
// 已经有TS切片,切片时长没有达到设置的阈值,则不开启新的切片
if f.duration < float64(m.config.FragmentDurationMs)/1000 {
return nil
}
}
// 开启新的fragment
// 此时的情况是,上层认为是合适的开启分片的时机(比如是I帧),并且
// 1. 当前是第一个分片
// 2. 当前不是第一个分片,但是上一个分片已经达到配置时长
if boundary {
if err := m.closeFragment(false); err != nil {
return err
}
if err := m.openFragment(ts, discont); err != nil {
return err
}
}
return nil
}
func (m *Muxer) writePlaylist(isLast bool) {
// 找出时长最长的fragment
maxFrag := float64(m.config.FragmentDurationMs) / 1000
m.iterateFragsInPlaylist(func(frag *fragmentInfo) {
if frag.duration > maxFrag {
maxFrag = frag.duration + 0.5
}
})
// TODO chef 优化这块buffer的构造
var buf bytes.Buffer
buf.WriteString("#EXTM3U\n")
buf.WriteString("#EXT-X-VERSION:3\n")
buf.WriteString("#EXT-X-ALLOW-CACHE:NO\n")
buf.WriteString(fmt.Sprintf("#EXT-X-TARGETDURATION:%d\n", int(maxFrag)))
buf.WriteString(fmt.Sprintf("#EXT-X-MEDIA-SEQUENCE:%d\n\n", m.extXMediaSeq()))
m.iterateFragsInPlaylist(func(frag *fragmentInfo) {
if frag.discont {
buf.WriteString("#EXT-X-DISCONTINUITY\n")
}
buf.WriteString(fmt.Sprintf("#EXTINF:%.3f,\n%s\n", frag.duration, frag.filename))
})
if isLast {
buf.WriteString("#EXT-X-ENDLIST\n")
}
if err := writeM3u8File(buf.Bytes(), m.playlistFilename, m.playlistFilenameBak); err != nil {
Log.Errorf("[%s] write live m3u8 file error. err=%+v", m.UniqueKey, err)
}
}
func (m *Muxer) openFragment(ts uint64, discont bool) error {
if m.opened {
return nazaerrors.Wrap(base.ErrHls)
}
id := m.getFragmentId()
filename := PathStrategy.GetTsFileName(m.streamName, id, int(Clock.Now().UnixNano()/1e6))
filenameWithPath := PathStrategy.GetTsFileNameWithPath(m.outPath, filename)
if err := m.fragment.OpenFile(filenameWithPath); err != nil {
return err
}
if err := m.fragment.WriteFile(m.patpmt); err != nil {
return err
}
m.opened = true
frag := m.getCurrFrag()
frag.discont = discont
frag.id = id
frag.filename = filename
frag.duration = 0
m.fragTs = ts
// nrm said: start fragment with audio to make iPhone happy
m.observer.OnFragmentOpen()
m.observer.OnHlsMakeTs(base.HlsMakeTsInfo{
Event: "open",
StreamName: m.streamName,
Cwd: base.GetWd(),
TsFile: filenameWithPath,
LiveM3u8File: m.playlistFilename,
RecordM3u8File: m.recordPlayListFilename,
Id: id,
Duration: frag.duration,
})
return nil
}
func (*DefaultPathStrategy) GetTsFileName(streamName string, index int, timestamp int) string {
return fmt.Sprintf("%s-%d-%d.ts", streamName, timestamp, index)
}
main()
->
logic.go#NewLalServer
->
server_manager__.go#NewServerManager
->
hls/server_handler.go#NewServerHandler
if err := addMux(sm.config.HlsConfig.CommonHttpServerConfig, sm.serveHls, "hls"); err != nil {
return err
}
func (sm *ServerManager) serveHls(writer http.ResponseWriter, req *http.Request) {
urlCtx, err := base.ParseUrl(base.ParseHttpRequest(req), 80)
if err != nil {
Log.Errorf("parse url. err=%+v", err)
return
}
if urlCtx.GetFileType() == "m3u8" {
// TODO(chef): [refactor] 需要整理,这里使用 hls.PathStrategy 不太好 202207
streamName := hls.PathStrategy.GetRequestInfo(urlCtx, sm.config.HlsConfig.OutPath).StreamName
if err = sm.option.Authentication.OnHls(streamName, urlCtx.RawQuery); err != nil {
Log.Errorf("simple auth failed. err=%+v", err)
return
}
}
sm.hlsServerHandler.ServeHTTP(writer, req)
}
server_handler.go#ServeHTTP
->
ServeHTTPWithUrlCtx
// 根据请求信息生成读取TS或者M3U8文件的关键参数,例如流名和文件路径
ri := PathStrategy.GetRequestInfo(urlCtx, s.outPath)
//Log.Debugf("%+v", ri)
// 合法性检查
if filename == "" || (filetype != "m3u8" && filetype != "ts") || ri.StreamName == "" || ri.FileNameWithPath == "" {
err = errors.New(fmt.Sprintf("invalid hls request. url=%+v, request=%+v", urlCtx, ri))
Log.Warnf(err.Error())
resp.WriteHeader(http.StatusFound)
return
}
// 抽象过的读取文件操作,放入二进制切片,
// 具体的读取操作有两种:从磁盘读取或者从内存读取,这取决于配置的是写入磁盘还是内存
content, _err := ReadFile(ri.FileNameWithPath)
if _err != nil {
err = errors.New(fmt.Sprintf("read hls file failed. request=%+v, err=%+v", ri, _err))
Log.Warnf(err.Error())
resp.WriteHeader(http.StatusNotFound)
return
}
// 根据文件类型不同,设置不同的响应header
switch filetype {
case "m3u8":
resp.Header().Add("Content-Type", "application/x-mpegurl")
resp.Header().Add("Server", base.LalHlsM3u8Server)
// 给ts文件都携带上session_id字段
if sessionIdHash != "" {
content = bytes.ReplaceAll(content, []byte(".ts"), []byte(".ts?session_id="+sessionIdHash))
}
case "ts":
resp.Header().Add("Content-Type", "video/mp2t")
resp.Header().Add("Server", base.LalHlsTsServer)
}
resp.Header().Add("Cache-Control", "no-cache")
resp.Header().Add("Access-Control-Allow-Origin", "*")
if sessionIdHash != "" {
session := s.getSubSession(sessionIdHash)
if session != nil {
session.AddWroteBytesSum(uint64(len(content)))
}
}
// 响应
_, _ = resp.Write(content)
return