前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >SRS学习笔记(1)-推拉流代码阅读

SRS学习笔记(1)-推拉流代码阅读

原创
作者头像
EndevChen
发布2020-05-24 10:02:27
1.9K0
发布2020-05-24 10:02:27
举报
文章被收录于专栏:SRS学习

SRS是一个用C++开发的开源流媒体集群服务, 能够提供直播点播的功能. github链接: https://github.com/ossrs/srs, 官方架构图如下(3.0版本):

SRS3.0官方架构图
SRS3.0官方架构图

在src/main/srs_main_server.cpp中可以找到入口函数:

代码语言:javascript
复制
int main(int argc, char** argv) {
    srs_error_t err = do_main(argc, argv);
    ...
}

然后是配置适配(兼容旧的配置):

代码语言:javascript
复制
// 解析参数, 读取配置文件, 将旧版本的配置转为新的配置
if ((err = _srs_config->parse_options(argc, argv)) != srs_success) {
    return srs_error_wrap(err, "config parse options");
}

初始化日志:

代码语言:javascript
复制
srs_error_t SrsFastLog::initialize()
{
    if (_srs_config) {
        _srs_config->subscribe(this);
        ... 
}

这里有一个配置变动订阅操作_srs_config->subscribe, 该函数接收基类为ISrsReloadHandler的对象, 在配置有变动时, 会遍历所有订阅了的对象, 调用对应的实现了的方法, 比如日志实现了on_reload_log_level方法, 在日志配置有变动时会调用.

然后是注册部分接口(版本信息/RTMP转FLV/静态页入口(如配置)/FLV/MP4拉流入口):

代码语言:javascript
复制
// 注册接口
if ((err = http_server->initialize()) != srs_success) {
    return srs_error_wrap(err, "http server initialize");
}

用C后台常用的fork/kill后,到了主逻辑run_master:

代码语言:javascript
复制
srs_error_t run_master(SrsServer* svr)
{
    srs_error_t err = srs_success;

    // 初始化协程功能
    if ((err = svr->initialize_st()) != srs_success) {
        return srs_error_wrap(err, "initialize st");
    }

    // 初始化信号相关配置(创建信号管道)
    if ((err = svr->initialize_signal()) != srs_success) {
        return srs_error_wrap(err, "initialize signal");
    }

    // 获取句柄文件锁(防重复起进程)
    if ((err = svr->acquire_pid_file()) != srs_success) {
        return srs_error_wrap(err, "acquire pid file");
    }
    
    // 接口监听及处理
    if ((err = svr->listen()) != srs_success) {
        return srs_error_wrap(err, "listen");
    }
    
    // 注册信号
    if ((err = svr->register_signal()) != srs_success) {
        return srs_error_wrap(err, "register signal");
    }

    // 注册Http相关回调
    if ((err = svr->http_handle()) != srs_success) {
        return srs_error_wrap(err, "http handle");
    }

    // 注册MPEG提取(如需要)
    if ((err = svr->ingest()) != srs_success) {
        return srs_error_wrap(err, "ingest");
    }
    
    // loop: 判断信号/心跳/更新状态统计等
    if ((err = svr->cycle()) != srs_success) {
        return srs_error_wrap(err, "main cycle");
    }
    
    return err;
}

SRS是单进程的运行方式,使用协程来处理并发请求, 主逻辑在svr->listen里边:

代码语言:javascript
复制
srs_error_t SrsServer::listen()
{
    srs_error_t err = srs_success;

    // 监听RTMP
    if ((err = listen_rtmp()) != srs_success) {
        return srs_error_wrap(err, "rtmp listen");
    }

    // 监听Http的API
    if ((err = listen_http_api()) != srs_success) {
        return srs_error_wrap(err, "http api listen");
    }

    // 监听Http的拉流请求(HDS/HLS/DASH)
    if ((err = listen_http_stream()) != srs_success) {
        return srs_error_wrap(err, "http stream listen");
    }

    // 监听MPGE-TS/RTSP/FLV推流请求
    if ((err = listen_stream_caster()) != srs_success) {
        return srs_error_wrap(err, "stream caster listen");
    }
    
    // 定期清理待释放的连接
    if ((err = conn_manager->start()) != srs_success) {
        return srs_error_wrap(err, "connection manager");
    }
    
    return err;
}

这里边会分成四块来做监听:

1.rtmp: 推流拉流的主逻辑

2.http api: 查询/重载类请求

3.http stream: 自适性串流拉流逻辑

4.stream caster: 非rtmp格式推流逻辑

具体的逻辑如下:

SRS逻辑图
SRS逻辑图

主要看rtmp这块的, 层层跟进, 可以到SrsTcpListener:listen这个函数:

代码语言:javascript
复制
srs_error_t SrsTcpListener::listen()
{
    srs_error_t err = srs_success;

    // 监听端口
    if ((err = srs_tcp_listen(ip, port, &lfd)) != srs_success) {
        return srs_error_wrap(err, "listen at %s:%d", ip.c_str(), port);
    }
    
    srs_freep(trd);
    // 注册协程, 以下是SRS里边通用的框架处理逻辑
    trd = new SrsSTCoroutine("tcp", this);
    if ((err = trd->start()) != srs_success) {
        return srs_error_wrap(err, "start coroutine");
    }
    
    return err;
}

这是SRS里边很常见的模式, 用SrsSTCoroutine来注册协程, 然后start, start内部会判断是否重复调用,然后又会回调到本身的cycle函数, 所以只需要看cycle的实现即可:

代码语言:javascript
复制
srs_error_t SrsTcpListener::cycle()
{
    srs_error_t err = srs_success;
    
    while (true) {
        if ((err = trd->pull()) != srs_success) {
            return srs_error_wrap(err, "tcp listener");
        }
        ...
        if ((err = handler->on_tcp_client(fd)) != srs_success) {
            return srs_error_wrap(err, "handle fd=%d", srs_netfd_fileno(fd));
        }
    }
    
    return err;
}

cycle也是很模块化的一种实现, 都是在while循环里边, 用pull函数来判断是否出现错误了, 然后执行收包处理逻辑. 部分类的实现可能会调用do_cycle, 逻辑也和上述类似.

继续往下, 从SrsBufferListener:on_tcp_client到SrsServer:accept_client再到SrsConnection:start, 和上边一样, 直接看SrsConnection:cycle, 再到SrsRtmpConn:do_cycle进入rtmp的主逻辑:

代码语言:javascript
复制
srs_error_t SrsRtmpConn::do_cycle()
{
    ...
    // 握手
    if ((err = rtmp->handshake()) != srs_success) {
        return srs_error_wrap(err, "rtmp handshake");
    }
    ...
    // 读取请求包, 解析内容
    SrsRequest* req = info->req;
    if ((err = rtmp->connect_app(req)) != srs_success) {
        return srs_error_wrap(err, "rtmp connect tcUrl");
    }
    ...
    // 拉收推流或拉流请求
    if ((err = service_cycle()) != srs_success) {
        err = srs_error_wrap(err, "service cycle");
    }
    ...
}

先是握手(接收C0C1, 回S0S1S2, 接收C2), 设置窗口/带宽信息, 然后进入SrsRtmpConn:stream_service_cycle, 在做完请求包解析/权限校验后, 进入类型判断:

代码语言:javascript
复制
switch (info->type) {
    case SrsRtmpConnPlay: {
        // response connection start play
        // 按协议依次发送相应的包给对端
        if ((err = rtmp->start_play(info->res->stream_id)) != srs_success) {
            return srs_error_wrap(err, "rtmp: start play");
        }
        // 调用开始播放的Hook
        if ((err = http_hooks_on_play()) != srs_success) {
            return srs_error_wrap(err, "rtmp: callback on play");
        }
        // 播放
        err = playing(source);
        // 调用播放结束的Hook
        http_hooks_on_stop();
        
        return err;
    }
    case SrsRtmpConnFMLEPublish: {
        // 按标准的流程进行推流前的交互
        if ((err = rtmp->start_fmle_publish(info->res->stream_id)) != srs_success) {
            return srs_error_wrap(err, "rtmp: start FMLE publish");
        }
        // 接收推流
        return publishing(source);
    }
    case SrsRtmpConnHaivisionPublish: {
        if ((err = rtmp->start_haivision_publish(info->res->stream_id)) != srs_success) {
            return srs_error_wrap(err, "rtmp: start HAIVISION publish");
        }
        
        return publishing(source);
    }
    case SrsRtmpConnFlashPublish: {
        if ((err = rtmp->start_flash_publish(info->res->stream_id)) != srs_success) {
            return srs_error_wrap(err, "rtmp: start FLASH publish");
        }
        
        return publishing(source);
    }
    default: {
        return srs_error_new(ERROR_SYSTEM_CLIENT_INVALID, "rtmp: unknown client type=%d", info->type);
    }
}

这里有三种推流和一种拉流, 这里先看推流:

1.start_xxx_publish是按协议格式做接收推流前的交互, FMLE/Haivision/Flash的都不一样

2.publishing则是实际的接收推流的逻辑

代码语言:javascript
复制
// 执行配置的Hook
if ((err = http_hooks_on_publish()) != srs_success) {
    return srs_error_wrap(err, "rtmp: callback on publish");
}

// 接收推流前的准备逻辑, 还比较多, 比如创建目录文件/挂载到Http服务里/准备FFMPEG解码等
if ((err = acquire_publish(source)) == srs_success) {
    // use isolate thread to recv,
    // @see: https://github.com/ossrs/srs/issues/237
    // 接收推流的主要逻辑
    SrsPublishRecvThread rtrd(rtmp, req, srs_netfd_fileno(stfd), 0, this, source, _srs_context->get_id());
    err = do_publishing(source, &rtrd);
    rtrd.stop();
}

acquire_publish里边会执行流挂载逻辑, 以便http stream请求能够访问到, do_publish是实际接收推流的逻辑, 可以跟进到SrsRtmpConn:process_publish_message里边:

代码语言:javascript
复制
srs_error_t SrsRtmpConn::process_publish_message(SrsSource* source, SrsCommonMessage* msg)
{
    ...
    // process audio packet
    if (msg->header.is_audio()) {
        if ((err = source->on_audio(msg)) != srs_success) {
            return srs_error_wrap(err, "rtmp: consume audio");
        }
        return err;
    }
    // process video packet
    if (msg->header.is_video()) {
        if ((err = source->on_video(msg)) != srs_success) {
            return srs_error_wrap(err, "rtmp: consume video");
        }
        return err;
    }

根据音频或视频分别处理, 处理逻辑类似, 这里看视频的处理on_video:

代码语言:javascript
复制
if (!mix_correct) {
    return on_video_imp(&msg);
}

// insert msg to the queue.
mix_queue->push(msg.copy());

// fetch someone from mix queue.
SrsSharedPtrMessage* m = mix_queue->pop();
if (!m) {
    return err;
}

// consume the monotonically increase message.
if (m->is_audio()) {
    err = on_audio_imp(m);
} else {
    err = on_video_imp(m);
}

可以看到, 如果没有配置mix_correct, 则直接处理分发了, 如果配置了的话, 则会扔到队列里, 然后再取出时间最早的处理.

代码语言:javascript
复制
// pure video
if (nb_videos >= SRS_MIX_CORRECT_PURE_AV && nb_audios == 0) {
    mix_ok = true;
}

// pure audio
if (nb_audios >= SRS_MIX_CORRECT_PURE_AV && nb_videos == 0) {
    mix_ok = true;
}

// got 1 video and 1 audio, mix ok.
if (nb_videos >= 1 && nb_audios >= 1) {
    mix_ok = true;
}

if (!mix_ok) {
    return NULL;
}

这个队列只有纯视频或纯音频或同时有视频和音频时,才能取到数据, 取出时由于map的特性, 会按时间戳排序取出. 最后就是on_video_imp:

代码语言:javascript
复制
// Copy to hub to all utilities.
// 与音频不同, 这里是hub先处理(1. 解包, 2. 写hls/dash/dvr)
if ((err = hub->on_video(msg, is_sequence_header)) != srs_success) {
    return srs_error_wrap(err, "hub consume video");
}

// copy to all consumer
// 塞到所有消费者队列里
if (!drop_for_reduce) {
    for (int i = 0; i < (int)consumers.size(); i++) {
        SrsConsumer* consumer = consumers.at(i);
        if ((err = consumer->enqueue(msg, atc, jitter_algorithm)) != srs_success) {
            return srs_error_wrap(err, "consume video");
        }
    }
}

主要是遍历这个流下的所有消费者, 把流数据放到对应的队列里, 消费者就能看到直播了. 至于消费者列表如何来的, 前边的图也有列出:

1.通过rtmp的拉流接口来访问

2.通过http stream接口来访问

接下来看拉流的分支:

1.start_play是按RTMP协议的向请求方做确认拉流前的交互

2.http_hooks_on_play根据配置调用相应的开始时的hook

3.playing是拉流的主逻辑

4.http_hooks_on_stop根据配置调用相应的结束时的hook

playing的主逻辑如下:

代码语言:javascript
复制
// 为这个源创建一个消费者, 后续会遍历源下的消费者,进行推送
if ((err = source->create_consumer(this, consumer)) != srs_success) {
    return srs_error_wrap(err, "rtmp: create consumer");
}
SrsAutoFree(SrsConsumer, consumer);

// Use receiving thread to receive packets from peer.
// @see: https://github.com/ossrs/srs/issues/217
// 单独建一个接收队列来接收对端的控制请求
SrsQueueRecvThread trd(consumer, rtmp, SRS_PERF_MW_SLEEP, _srs_context->get_id());

if ((err = trd.start()) != srs_success) {
    return srs_error_wrap(err, "rtmp: start receive thread");
}

// Deliver packets to peer.
wakable = consumer;
err = do_playing(source, consumer, &trd);

首先往流里边挂个新的消费者, 然后单独创建队列来接收控制类消息(如暂停/开启/结束, 说性能能够提升33%), 主协程主要是不停地从队列里取控制消息或者数据包, 做相应的处理或发送给请求方:

代码语言:javascript
复制
while (true) {
        ...
        // 处理控制命令请求, 如关闭/暂停
        while (!rtrd->empty()) {
            SrsCommonMessage* msg = rtrd->pump();
            if ((err = process_play_control_msg(consumer, msg)) != srs_success) {
                return srs_error_wrap(err, "rtmp: play control message");
            }
        }
        
        ...
        // 从队列里边将数据包取出
        int count = (send_min_interval > 0)? 1 : 0;
        if ((err = consumer->dump_packets(&msgs, count)) != srs_success) {
            return srs_error_wrap(err, "rtmp: consumer dump packets");
        }
        
        ...
        // 发送数据包给对端
        if (count > 0 && (err = rtmp->send_and_free_messages(msgs.msgs, count, info->res->stream_id)) != srs_success) {
            return srs_error_wrap(err, "rtmp: send %d messages", count);
        }

至此, RTMP的推流和拉流流程, 已经都完成了.

其余几个入口的功能:

1. Http Api监听的是查询类的请求, 也有console下的文件拉取请求

2. Http Stream监听的是自适应串流的请求, 主要是在流下添加消费者来实现.

3. Http Caster监听的是非RTMP的推流请求, 通过格式转换后, 发送给自身的RTMP监听端口来实现.

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云直播
云直播(Cloud Streaming Services,CSS)为您提供极速、稳定、专业的云端直播处理服务,根据业务的不同直播场景需求,云直播提供了标准直播、快直播、云导播台三种服务,分别针对大规模实时观看、超低延时直播、便捷云端导播的场景,配合腾讯云视立方·直播 SDK,为您提供一站式的音视频直播解决方案。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档