❝本系列每篇文章都是从一些实际的 case 出发,分析一些生产环境中经常会遇到的问题,抛砖引玉,以帮助小伙伴们解决一些实际问题。本篇文章是「直播实时数据建设」系列的第三篇,主要介绍直播间生产侧指标的建设过程,如果对小伙伴有帮助的话,欢迎点赞 + 再看~ ❞
本文是「直播实时数据建设」系列的第三篇,主要介绍「生产侧指标的建设」,比如当前正在直播直播间数,或者主播数等。在介绍生产侧指标的建设过程之前,先回顾下上一节的「架构」图。
架构
而本篇要介绍的「生产侧指标」的数据链路主要对应以下几个模块。
图中「标红」模块为生产侧指标的数据链路涉及到的模块。用另一张图进行了标注。
生产侧架构
其中直播间实时画像维表的介绍已经在上节进行了介绍,感兴趣的话可以点击以下链接,跳转到上节进行阅读~
生产实践 | Flink + 直播(二)| 如何建设实时公共画像维表?
本小节就不针对「生产侧指标的建设」中所有涉及指标的建设过程进行详细介绍了,主要以「当前分钟正在开播直播间数」作为「生产侧指标建设」的一个代表性案例,介绍这个指标的整个建设过程。来为大家还原生产侧指标的业务过程以及技术方案。
仍然从几个问题入手,介绍「当前分钟正在开播直播间数」的建设过程。
当前分钟正在开播直播间数,其定义就是整个平台中,当前分钟正在开播的直播间数 + 单层维度下钻的当前分钟正在开播的直播间数。
举例:
现在的时间点是 2020-11-11 12:42,当前分钟直播的直播间数为 3000 个(其中根据平台维度下钻得到:IOS 平台为 1500,安卓平台为 1500)
到了 12:43 时,有 200 个直播间进行了关播(其中 100 个为 IOS,100 个为安卓),有 100 个直播间开播(全部为 IOS),则当前分钟正在直播的直播间数为 2900(根据平台维度下钻得到:IOS 平台为 1500,安卓平台为 1400)。
其中 2020-11-11 12:42 的 3000 以及 2020-11-11 12:43 的 2900 以及按照平台下钻的数值就为当前时间正在开播的直播间数,也就是最终产出的结果。
根据上述定义和分析,可以明确整个过程中涉及到的数据源和数据汇的 schema 信息。
数据源 schema 如下。
字段 | 备注 |
---|---|
live_stream_id | 直播间 id |
author_id | 主播 id |
start_or_end | 开播还是关播 |
timestamp | 时间戳 |
... | ... |
根据整体处理过程以及最终想要获取的结果,将数据汇 schema 信息确定如下。
字段 | 备注 |
---|---|
timestamp | 时间戳,汇总到分钟粒度 |
metric_name | 指标名,举例:开播直播间数 |
metric_value | 指标值,举例:3000(开播直播间数) |
dim_name | 维度名,举例:平台,版本 |
dim_value | 维度值,举例:IOS,8.1 |
... | ... |
❝Notes: 「metric_name 和 metric_value」: 这两个字段是为了之后进行指标扩充时进行的设计。比如后续如果需要加入开播主播数,开播时长等指标,不用修改数据汇 schema,只需要加一种 metric_name,就可以使用原有 schema 进行数据产出。 「dim_name 和 dim_value」: 目前建设的指标只提供了进行单维度下钻的能力,所以设计了 dim_name 和 dim_value 两个字段,可满足用户查看平台为 IOS 的当前开播直播间数或者使用开播软件版本为 8.1 的当前开播直播间数。如果后续业务场景需要多维下钻能力,可以在字段上面进行扩充。或者也可以提供明细数据在 OLAP 中进行多维下钻。 ❞
对于当前分钟正在开播直播间数来说,其计算方式很简单,就是下面这个数学公式:
「当前分钟正在开播直播间数」 = 「上一分钟正在开播直播间数」 + 「当前分钟开播直播间数」 - 「当前分钟关播直播间数」
目前我们已经有了计算的公式,那么就可以详细分析下指标的计算处理逻辑。并且还可以获取到另一个信息,对于当前分钟正在开播直播间数的计算来说,是依赖上下文信息的,即「上一分钟正在开播直播间数」,这也就是「状态」。
首先介绍指标处理逻辑。
从获取到数据源,到产出指标的整体处理逻辑如下图所示。这里就不使用文字进行赘述了。
数据流转
其中标为「粉色」的模块为任务中的「状态」,即任务中一直存储的当前分钟正在开播直播间数。
既然涉及到了状态,那么这里就展开介绍一下我对「状态」的理解。如有错误,请在文末讨论中进行指出,我会和大家讨论。
状态其实就是一个记录上下文信息的东西,如果当前的计算过程依赖到上次计算的结果,那么上次计算的结果就是状态。举几个?;
生活中随处可见「状态」,即使不是程序员,我相信也都可以理解「状态」的概念。
实现方式举例如下。
public class LiveStreamRealtimeMetricProdProcessorJob {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<SourceModel> source = SourceFactory.getSourceDataStream(...);
DataStream<SinkModel> result = source
.keyBy(new KeySelector<SourceModel, Long>() {
@Override
public Long getKey(SourceModel commonModel) throws Exception {
return commonModel.getLiveStreamId() % 1000;
}
})
.timeWindow(Time.seconds(60))
.process(new ProcessWindowFunction<SourceModel, SinkModel, Long, TimeWindow>() {
private ValueState<Long> playingLiveStreamNumberValueState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
this.playingLiveStreamNumberValueState = getRuntimeContext().getState(...);
}
@Override
public void process(Long bucket, Context context, Iterable<SourceModel> iterable,
Collector<SinkModel> collector) throws Exception {
Long playingLiveStreamNumber = this.playingLiveStreamNumberValueState.value();
if (null == playingLiveStreamNumber) {
playingLiveStreamNumber = 0L;
}
List<SourceModel> sourceModels = (List<SourceModel>) iterable;
for (SourceModel sourceModel : sourceModels) {
if (BizType.I == sourceModel.getBizType()) {
playingLiveStreamNumber++;
} else {
playingLiveStreamNumber--;
}
}
this.playingLiveStreamNumberValueState.update(playingLiveStreamNumber);
collector.collect(
SinkModel.builder().build()
);
}
});
SinkFactory.setSinkDataStream(...);
env.execute();
}
@Data
@Builder
static class SourceModel {
// 直播间id
private Long liveStreamId;
// 开播时间,关播时间
private Long time;
// 主播id
private Long authorId;
// binlog 时间戳
private long binlogTimestamp;
// 开播,关播
private BizType bizType;
}
enum BizType {
I, // 开播
D, // 关播
;
}
@Data
@Builder
static class SinkModel {
// 时间戳,汇总到分钟粒度
private Long timestamp;
// 指标名
private String metricName;
// 指标值
private double metricValue;
// 维度名
private String dimName;
// 维度值
private String dimValue;
}
}
本文衔接上文,主要介绍直播间「生产侧指标的建设」,以「当前分钟正在开播直播间数」为代表举例。提出定义以及建设过程相关的问题,以这两个个问题出发,引出了以下两小节。
第一节简单介绍了当前分钟正在开播直播间数的定义。
第二节主要介绍了当前分钟正在开播直播间数的建设逻辑以及过程,并对「状态」这个概念进行了一个拓展介绍。
最后一节对本文进行了总结。
如果你也有相同的指标建设需求,或者存在一些指标建设过程中的问题,欢迎关注博主公众号,或者添加博主微信,互相交流~