专栏首页share ai happinessFlink SQL 实时计算UV指标

Flink SQL 实时计算UV指标

用一个接地气的案例来介绍如何实时计算 UV 数据。大家都知道,在 ToC 的互联网公司,UV 是一个很重要的指标,对于老板、商务、运营的及时决策会产生很大的影响,笔者在电商公司,目前主要的工作就是计算 UV、销售等各类实时数据,体验就特别深刻, 因此就用一个简单demo 演示如何用 Flink SQL 消费 Kafka 中的 PV 数据,实时计算出 UV 指标后写入 Hbase。

Kafka 源数据解析输入标题

PV 数据来源于埋点数据经 FileBeat 上报清洗后,以 ProtoBuffer 格式写入下游 Kafka,消费时第一步要先反序列化 PB 格式的数据为 Flink 能识别的 Row 类型,因此也就需要自定义实现 DeserializationSchema 接口,具体如下代码, 这里只抽取计算用到的 PV 的 mid、事件时间 time_local,并从其解析得到 log_date 字段:

public class PageViewDeserializationSchema implements DeserializationSchema<Row> {

    public static final Logger LOG = LoggerFactory.getLogger(PageViewDeserializationSchema.class);
    protected SimpleDateFormat dayFormatter;

    private final RowTypeInfo rowTypeInfo;

    public PageViewDeserializationSchema(RowTypeInfo rowTypeInfo){
        dayFormatter = new SimpleDateFormat("yyyyMMdd", Locale.UK);
        this.rowTypeInfo = rowTypeInfo;
    }
    @Override
    public Row deserialize(byte[] message) throws IOException {
        Row row = new Row(rowTypeInfo.getArity());
        MobilePage mobilePage = null;
        try {
            mobilePage = MobilePage.parseFrom(message);
            String mid = mobilePage.getMid();
            row.setField(0, mid);
            Long timeLocal = mobilePage.getTimeLocal();
            String logDate = dayFormatter.format(timeLocal);
            row.setField(1, logDate);
            row.setField(2, timeLocal);
        }catch (Exception e){
            String mobilePageError = (mobilePage != null) ? mobilePage.toString() : "";
            LOG.error("error parse bytes payload is {}, pageview error is {}", message.toString(), mobilePageError, e);
        }
        return null;
    }

编写 Flink Job 主程序输入标题

将 PV 数据解析为 Flink 的 Row 类型后,接下来就很简单了,编写主函数,写 SQL 就能统计 UV 指标了,代码如下:

public class RealtimeUV {

    public static void main(String[] args) throws Exception {
        //step1 从properties配置文件中解析出需要的Kakfa、Hbase配置信息、checkpoint参数信息
        Map<String, String> config = PropertiesUtil.loadConfFromFile(args[0]);
        String topic = config.get("source.kafka.topic");
        String groupId = config.get("source.group.id");
        String sourceBootStrapServers = config.get("source.bootstrap.servers");
        String hbaseTable = config.get("hbase.table.name");
        String hbaseZkQuorum = config.get("hbase.zk.quorum");
        String hbaseZkParent = config.get("hbase.zk.parent");
        int checkPointPeriod = Integer.parseInt(config.get("checkpoint.period"));
        int checkPointTimeout = Integer.parseInt(config.get("checkpoint.timeout"));

        StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        //step2 设置Checkpoint相关参数,用于Failover容错
        sEnv.getConfig().registerTypeWithKryoSerializer(MobilePage.class,
                ProtobufSerializer.class);
        sEnv.getCheckpointConfig().setFailOnCheckpointingErrors(false);
        sEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        sEnv.enableCheckpointing(checkPointPeriod, CheckpointingMode.EXACTLY_ONCE);
        sEnv.getCheckpointConfig().setCheckpointTimeout(checkPointTimeout);
        sEnv.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        //step3 使用Blink planner、创建TableEnvironment,并且设置状态过期时间,避免Job OOM
        EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(sEnv, environmentSettings);
        tEnv.getConfig().setIdleStateRetentionTime(Time.days(1), Time.days(2));

        Properties sourceProperties = new Properties();
        sourceProperties.setProperty("bootstrap.servers", sourceBootStrapServers);
        sourceProperties.setProperty("auto.commit.interval.ms", "3000");
        sourceProperties.setProperty("group.id", groupId);

        //step4 初始化KafkaTableSource的Schema信息,笔者这里使用register TableSource的方式将源表注册到Flink中,而没有用register DataStream方式,也是因为想熟悉一下如何注册KafkaTableSource到Flink中
        TableSchema schema = TableSchemaUtil.getAppPageViewTableSchema();
        Optional<String> proctimeAttribute = Optional.empty();
        List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors = Collections.emptyList();
        Map<String, String> fieldMapping = new HashMap<>();
        List<String> columnNames = new ArrayList<>();
        RowTypeInfo rowTypeInfo = new RowTypeInfo(schema.getFieldTypes(), schema.getFieldNames());
        columnNames.addAll(Arrays.asList(schema.getFieldNames()));
        columnNames.forEach(name -> fieldMapping.put(name, name));
        PageViewDeserializationSchema deserializationSchema = new PageViewDeserializationSchema(
                rowTypeInfo);
        Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
        Kafka011TableSource kafkaTableSource = new Kafka011TableSource(
                schema,
                proctimeAttribute,
                rowtimeAttributeDescriptors,
                Optional.of(fieldMapping),
                topic,
                sourceProperties,
                deserializationSchema,
                StartupMode.EARLIEST,
                specificOffsets);
        tEnv.registerTableSource("pageview", kafkaTableSource);

        //step5 初始化Hbase TableSchema、写入参数,并将其注册到Flink中
        HBaseTableSchema hBaseTableSchema = new HBaseTableSchema();
        hBaseTableSchema.setRowKey("log_date", String.class);
        hBaseTableSchema.addColumn("f", "UV", Long.class);
        HBaseOptions hBaseOptions = HBaseOptions.builder()
                .setTableName(hbaseTable)
                .setZkQuorum(hbaseZkQuorum)
                .setZkNodeParent(hbaseZkParent)
                .build();
        HBaseWriteOptions hBaseWriteOptions = HBaseWriteOptions.builder()
                .setBufferFlushMaxRows(1000)
                .setBufferFlushIntervalMillis(1000)
                .build();
        HBaseUpsertTableSink hBaseSink = new HBaseUpsertTableSink(hBaseTableSchema, hBaseOptions, hBaseWriteOptions);
        tEnv.registerTableSink("uv_index", hBaseSink);

        //step6 实时计算当天UV指标sql, 这里使用最简单的group by agg,没有使用minibatch或窗口,在大数据量优化时最好使用后两种方式
        String uvQuery = "insert into uv_index "
                + "select log_date,\n"
                + "ROW(count(distinct mid) as UV)\n"
                + "from pageview\n"
                + "group by log_date";
        tEnv.sqlUpdate(uvQuery);
        //step7 执行Job
        sEnv.execute("UV Job");
    }
}

以上就是一个简单的使用 Flink SQL 统计 UV 的 case, 代码非常简单,只需要理清楚如何解析 Kafka 中数据,如何初始化 Table Schema,以及如何将表注册到 Flink中,即可使用 Flink SQL 完成各种复杂的实时数据统计类的业务需求,学习成本比API 的方式低很多。说明一下,笔者这个 demo 是基于目前业务场景而开发的,在生产环境中可以真实运行起来,可能不能拆箱即用,你需要结合自己的业务场景自定义相应的 kafka 数据解析类。

END

本文分享自微信公众号 - 1001次重燃(smile765999),作者:木野归郎

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2020-06-02

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 面试被问到Flink的checkpoint问题,给问懵逼了....

    Checkpoint 机制

    木野归郎
  • 拿美团offer,HIve基础篇(补)

    GROUP BY 语句通常会和聚合函数一起使用,按照一个或者多个列队结果进行分组,然后对每个组执行聚合操作。

    木野归郎
  • 项目实战中Hive注释乱码解决方案

    下面这些都是我在工作中总结出来的,希望对大家有帮助,如果有其他的问题或者解决方法可以留言给我。

    木野归郎
  • Apache Flink on Kubernetes运行模式分析

    Apache Flink是一个分布式流处理引擎,它提供了丰富且易用的API来处理有状态的流处理应用,并且在支持容错的前提下,高效、大规模的运行此类应用。通过支持...

    yujunwang
  • 使用 Kubernetes 部署 Flink 应用

    https://blog.csdn.net/zjerryj/article/details/100063858

    大数据技术与架构
  • Descheduler 实现 K8S Pod 二次调度

    Kubernetes中的调度是将待处理的pod绑定到节点的过程,由Kubernetes的一个名为kube-scheduler的组件执行。调度程序的决定,无论是否...

    YP小站
  • CornerStone的使用

    对于我们程序员来说,不管你是大神,还是小鱼小虾,进入公司之后,都用过源码管理工具,不然你就不是一个合格的程序员,现在各个公司用于源码管理工具通常有下面两种:

    s_在路上
  • Android CameraX NDK OpenCV(四)-- 二维码检测与识别

    OpenCV在4的版本后就有了二维码QRCode的检测和识别功能,当时刚出的时候效率及识别效果都还一般,在4.1.2的版本中也改善了精度和速度,然后后面4.3版...

    Vaccae
  • Flink入门宝典(详细截图版)

    本文基于java构建Flink1.9版本入门程序,需要Maven 3.0.4 和 Java 8 以上版本。需要安装Netcat进行简单调试。

    用户6070864
  • 如何使用 Kubernetes 部署 Flink 应用

    场景描述:Kubernetes 是目前非常流行的容器编排系统,在其之上可以运行 Web 服务、大数据处理等各类应用。这些应用被打包在一个个非常轻量的容器中,我们...

    zhisheng

扫码关注云+社区

领取腾讯云代金券

,,