专栏首页zhisheng基于Kafka+Flink+Redis的电商大屏实时计算案例

基于Kafka+Flink+Redis的电商大屏实时计算案例

前言

阿里的双11销量大屏可以说是一道特殊的风景线。实时大屏(real-time dashboard)正在被越来越多的企业采用,用来及时呈现关键的数据指标。并且在实际操作中,肯定也不会仅仅计算一两个维度。由于Flink的“真·流式计算”这一特点,它比Spark Streaming要更适合大屏应用。本文从笔者的实际工作经验抽象出简单的模型,并简要叙述计算流程(当然大部分都是源码)。

数据格式与接入

简化的子订单消息体如下。

{
    "userId": 234567,
    "orderId": 2902306918400,
    "subOrderId": 2902306918401,
    "siteId": 10219,
    "siteName": "site_blabla",
    "cityId": 101,
    "cityName": "北京市",
    "warehouseId": 636,
    "merchandiseId": 187699,
    "price": 299,
    "quantity": 2,
    "orderStatus": 1,
    "isNewOrder": 0,
    "timestamp": 1572963672217
}

由于订单可能会包含多种商品,故会被拆分成子订单来表示,每条JSON消息表示一个子订单。现在要按照自然日来统计以下指标,并以1秒的刷新频率呈现在大屏上:

  • 每个站点(站点ID即siteId)的总订单数、子订单数、销量与GMV;
  • 当前销量排名前N的商品(商品ID即merchandiseId)与它们的销量。

由于大屏的最大诉求是实时性,等待迟到数据显然不太现实,因此我们采用处理时间作为时间特征,并以1分钟的频率做checkpointing。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.enableCheckpointing(60 * 1000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(30 * 1000);

然后订阅Kafka的订单消息作为数据源。

    Properties consumerProps = ParameterUtil.getFromResourceFile("kafka.properties");
    DataStream<String> sourceStream = env
      .addSource(new FlinkKafkaConsumer011<>(
        ORDER_EXT_TOPIC_NAME,                        // topic
        new SimpleStringSchema(),                    // deserializer
        consumerProps                                // consumer properties
      ))
      .setParallelism(PARTITION_COUNT)
      .name("source_kafka_" + ORDER_EXT_TOPIC_NAME)
      .uid("source_kafka_" + ORDER_EXT_TOPIC_NAME);

给带状态的算子设定算子ID(通过调用uid()方法)是个好习惯,能够保证Flink应用从保存点重启时能够正确恢复状态现场。为了尽量稳妥,Flink官方也建议为每个算子都显式地设定ID,参考:https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#should-i-assign-ids-to-all-operators-in-my-job

接下来将JSON数据转化为POJO,JSON框架采用FastJSON。

    DataStream<SubOrderDetail> orderStream = sourceStream
      .map(message -> JSON.parseObject(message, SubOrderDetail.class))
      .name("map_sub_order_detail").uid("map_sub_order_detail");

JSON已经是预先处理好的标准化格式,所以POJO类SubOrderDetail的写法可以通过Lombok极大地简化。如果JSON的字段有不规范的,那么就需要手写Getter和Setter,并用@JSONField注解来指明。

@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class SubOrderDetail implements Serializable {
  private static final long serialVersionUID = 1L;
  
  private long userId;
  private long orderId;
  private long subOrderId;
  private long siteId;
  private String siteName;
  private long cityId;
  private String cityName;
  private long warehouseId;
  private long merchandiseId;
  private long price;
  private long quantity;
  private int orderStatus;
  private int isNewOrder;
  private long timestamp;
}

统计站点指标

将子订单流按站点ID分组,开1天的滚动窗口,并同时设定ContinuousProcessingTimeTrigger触发器,以1秒周期触发计算。注意处理时间的时区问题,这是老生常谈了。

    WindowedStream<SubOrderDetail, Tuple, TimeWindow> siteDayWindowStream = orderStream
      .keyBy("siteId")
      .window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
      .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)));

接下来写个聚合函数。

    DataStream<OrderAccumulator> siteAggStream = siteDayWindowStream
      .aggregate(new OrderAndGmvAggregateFunc())
      .name("aggregate_site_order_gmv").uid("aggregate_site_order_gmv");
  public static final class OrderAndGmvAggregateFunc
    implements AggregateFunction<SubOrderDetail, OrderAccumulator, OrderAccumulator> {
    private static final long serialVersionUID = 1L;

    @Override
    public OrderAccumulator createAccumulator() {
      return new OrderAccumulator();
    }

    @Override
    public OrderAccumulator add(SubOrderDetail record, OrderAccumulator acc) {
      if (acc.getSiteId() == 0) {
        acc.setSiteId(record.getSiteId());
        acc.setSiteName(record.getSiteName());
      }
      acc.addOrderId(record.getOrderId());
      acc.addSubOrderSum(1);
      acc.addQuantitySum(record.getQuantity());
      acc.addGmv(record.getPrice() * record.getQuantity());
      return acc;
    }

    @Override
    public OrderAccumulator getResult(OrderAccumulator acc) {
      return acc;
    }

    @Override
    public OrderAccumulator merge(OrderAccumulator acc1, OrderAccumulator acc2) {
      if (acc1.getSiteId() == 0) {
        acc1.setSiteId(acc2.getSiteId());
        acc1.setSiteName(acc2.getSiteName());
      }
      acc1.addOrderIds(acc2.getOrderIds());
      acc1.addSubOrderSum(acc2.getSubOrderSum());
      acc1.addQuantitySum(acc2.getQuantitySum());
      acc1.addGmv(acc2.getGmv());
      return acc1;
    }
  }

累加器类OrderAccumulator的实现很简单,看源码就大概知道它的结构了,因此不再多废话。唯一需要注意的是订单ID可能重复,所以需要用名为orderIds的HashSet来保存它。HashSet应付我们目前的数据规模还是没太大问题的,如果是海量数据,就考虑换用HyperLogLog吧。

接下来就该输出到Redis供呈现端查询了。这里有个问题:一秒内有数据变化的站点并不多,而ContinuousProcessingTimeTrigger每次触发都会输出窗口里全部的聚合数据,这样做了很多无用功,并且还会增大Redis的压力。所以,我们在聚合结果后再接一个ProcessFunction,代码如下。

    DataStream<Tuple2<Long, String>> siteResultStream = siteAggStream
      .keyBy(0)
      .process(new OutputOrderGmvProcessFunc(), TypeInformation.of(new TypeHint<Tuple2<Long, String>>() {}))
      .name("process_site_gmv_changed").uid("process_site_gmv_changed");
  public static final class OutputOrderGmvProcessFunc
    extends KeyedProcessFunction<Tuple, OrderAccumulator, Tuple2<Long, String>> {
    private static final long serialVersionUID = 1L;

    private MapState<Long, OrderAccumulator> state;

    @Override
    public void open(Configuration parameters) throws Exception {
      super.open(parameters);
      state = this.getRuntimeContext().getMapState(new MapStateDescriptor<>(
        "state_site_order_gmv",
        Long.class,
        OrderAccumulator.class)
      );
    }

    @Override
    public void processElement(OrderAccumulator value, Context ctx, Collector<Tuple2<Long, String>> out) throws Exception {
      long key = value.getSiteId();
      OrderAccumulator cachedValue = state.get(key);

      if (cachedValue == null || value.getSubOrderSum() != cachedValue.getSubOrderSum()) {
        JSONObject result = new JSONObject();
        result.put("site_id", value.getSiteId());
        result.put("site_name", value.getSiteName());
        result.put("quantity", value.getQuantitySum());
        result.put("orderCount", value.getOrderIds().size());
        result.put("subOrderCount", value.getSubOrderSum());
        result.put("gmv", value.getGmv());
        out.collect(new Tuple2<>(key, result.toJSONString());
        state.put(key, value);
      }
    }

    @Override
    public void close() throws Exception {
      state.clear();
      super.close();
    }
  }

说来也简单,就是用一个MapState状态缓存当前所有站点的聚合数据。由于数据源是以子订单为单位的,因此如果站点ID在MapState中没有缓存,或者缓存的子订单数与当前子订单数不一致,表示结果有更新,这样的数据才允许输出。

最后就可以安心地接上Redis Sink了,结果会被存进一个Hash结构里。

    // 看官请自己构造合适的FlinkJedisPoolConfig
    FlinkJedisPoolConfig jedisPoolConfig = ParameterUtil.getFlinkJedisPoolConfig(false, true);
    siteResultStream
      .addSink(new RedisSink<>(jedisPoolConfig, new GmvRedisMapper()))
      .name("sink_redis_site_gmv").uid("sink_redis_site_gmv")
      .setParallelism(1);
  public static final class GmvRedisMapper implements RedisMapper<Tuple2<Long, String>> {
    private static final long serialVersionUID = 1L;
    private static final String HASH_NAME_PREFIX = "RT:DASHBOARD:GMV:";

    @Override
    public RedisCommandDescription getCommandDescription() {
      return new RedisCommandDescription(RedisCommand.HSET, HASH_NAME_PREFIX);
    }

    @Override
    public String getKeyFromData(Tuple2<Long, String> data) {
      return String.valueOf(data.f0);
    }

    @Override
    public String getValueFromData(Tuple2<Long, String> data) {
      return data.f1;
    }

    @Override
    public Optional<String> getAdditionalKey(Tuple2<Long, String> data) {
      return Optional.of(
        HASH_NAME_PREFIX +
        new LocalDateTime(System.currentTimeMillis()).toString(Consts.TIME_DAY_FORMAT) +
        "SITES"
      );
    }
  }

商品Top N

我们可以直接复用前面产生的orderStream,玩法与上面的GMV统计大同小异。这里用1秒滚动窗口就可以了。

    WindowedStream<SubOrderDetail, Tuple, TimeWindow> merchandiseWindowStream = orderStream
      .keyBy("merchandiseId")
      .window(TumblingProcessingTimeWindows.of(Time.seconds(1)));

    DataStream<Tuple2<Long, Long>> merchandiseRankStream = merchandiseWindowStream
      .aggregate(new MerchandiseSalesAggregateFunc(), new MerchandiseSalesWindowFunc())
      .name("aggregate_merch_sales").uid("aggregate_merch_sales")
      .returns(TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() { }));

聚合函数与窗口函数的实现更加简单了,最终返回的是商品ID与商品销量的二元组。

  public static final class MerchandiseSalesAggregateFunc
    implements AggregateFunction<SubOrderDetail, Long, Long> {
    private static final long serialVersionUID = 1L;

    @Override
    public Long createAccumulator() {
      return 0L;
    }

    @Override
    public Long add(SubOrderDetail value, Long acc) {
      return acc + value.getQuantity();
    }

    @Override
    public Long getResult(Long acc) {
      return acc;
    }

    @Override
    public Long merge(Long acc1, Long acc2) {
      return acc1 + acc2;
    }
  }


  public static final class MerchandiseSalesWindowFunc
    implements WindowFunction<Long, Tuple2<Long, Long>, Tuple, TimeWindow> {
    private static final long serialVersionUID = 1L;

    @Override
    public void apply(
      Tuple key,
      TimeWindow window,
      Iterable<Long> accs,
      Collector<Tuple2<Long, Long>> out) throws Exception {
      long merchId = ((Tuple1<Long>) key).f0;
      long acc = accs.iterator().next();
      out.collect(new Tuple2<>(merchId, acc));
    }
  }

既然数据最终都要落到Redis,那么我们完全没必要在Flink端做Top N的统计,直接利用Redis的有序集合(zset)就行了,商品ID作为field,销量作为分数值,简单方便。不过flink-redis-connector项目中默认没有提供ZINCRBY命令的实现(必须再吐槽一次),我们可以自己加,步骤参照之前写过的那篇加SETEX的命令的文章,不再赘述。RedisMapper的写法如下。

  public static final class RankingRedisMapper implements RedisMapper<Tuple2<Long, Long>> {
    private static final long serialVersionUID = 1L;
    private static final String ZSET_NAME_PREFIX = "RT:DASHBOARD:RANKING:";

    @Override
    public RedisCommandDescription getCommandDescription() {
      return new RedisCommandDescription(RedisCommand.ZINCRBY, ZSET_NAME_PREFIX);
    }

    @Override
    public String getKeyFromData(Tuple2<Long, Long> data) {
      return String.valueOf(data.f0);
    }

    @Override
    public String getValueFromData(Tuple2<Long, Long> data) {
      return String.valueOf(data.f1);
    }

    @Override
    public Optional<String> getAdditionalKey(Tuple2<Long, Long> data) {
      return Optional.of(
        ZSET_NAME_PREFIX +
        new LocalDateTime(System.currentTimeMillis()).toString(Consts.TIME_DAY_FORMAT) + ":" +
        "MERCHANDISE"
      );
    }
  }

后端取数时,用ZREVRANGE命令即可取出指定排名的数据了。只要数据规模不是大到难以接受,并且有现成的Redis,这个方案完全可以作为各类Top N需求的通用实现。

The End

大屏的实际呈现需要保密,截图自然是没有的。以下是提交执行时Flink Web UI给出的执行计划(实际有更多的统计任务,不止3个Sink)。通过复用源数据,可以在同一个Flink job内实现更多统计需求。

1、《从0到1学习Flink》—— Apache Flink 介绍 2、《从0到1学习Flink》—— Mac 上搭建 Flink 1.6.0 环境并构建运行简单程序入门 3、《从0到1学习Flink》—— Flink 配置文件详解 4、《从0到1学习Flink》—— Data Source 介绍 5、《从0到1学习Flink》—— 如何自定义 Data Source ? 6、《从0到1学习Flink》—— Data Sink 介绍 7、《从0到1学习Flink》—— 如何自定义 Data Sink ? 8、《从0到1学习Flink》—— Flink Data transformation(转换) 9、《从0到1学习Flink》—— 介绍 Flink 中的 Stream Windows 10、《从0到1学习Flink》—— Flink 中的几种 Time 详解 11、《从0到1学习Flink》—— Flink 读取 Kafka 数据写入到 ElasticSearch 12、《从0到1学习Flink》—— Flink 项目如何运行? 13、《从0到1学习Flink》—— Flink 读取 Kafka 数据写入到 Kafka 14、《从0到1学习Flink》—— Flink JobManager 高可用性配置 15、《从0到1学习Flink》—— Flink parallelism 和 Slot 介绍 16、《从0到1学习Flink》—— Flink 读取 Kafka 数据批量写入到 MySQL 17、《从0到1学习Flink》—— Flink 读取 Kafka 数据写入到 RabbitMQ 18、《从0到1学习Flink》—— 你上传的 jar 包藏到哪里去了 19、大数据“重磅炸弹”——实时计算框架 Flink 20、《Flink 源码解析》—— 源码编译运行 21、为什么说流处理即未来? 22、OPPO数据中台之基石:基于Flink SQL构建实时数据仓库 23、流计算框架 Flink 与 Storm 的性能对比 24、Flink状态管理和容错机制介绍 25、原理解析 | Apache Flink 结合 Kafka 构建端到端的 Exactly-Once 处理 26、Apache Flink 是如何管理好内存的? 27、从0到1学习Flink》——Flink 中这样管理配置,你知道? 28、从0到1学习Flink》——Flink 不可以连续 Split(分流)? 29、Flink 从0到1学习—— 分享四本 Flink 的书和二十多篇 Paper 论文 30、360深度实践:Flink与Storm协议级对比 31、Apache Flink 1.9 重大特性提前解读 32、如何基于Flink+TensorFlow打造实时智能异常检测平台?只看这一篇就够了 33、美团点评基于 Flink 的实时数仓建设实践 34、Flink 灵魂两百问,这谁顶得住? 35、一文搞懂 Flink 的 Exactly Once 和 At Least Once 36、你公司到底需不需要引入实时计算引擎? 37、Flink 从0到1学习 —— 如何使用 Side Output 来分流? 38、一文让你彻底了解大数据实时计算引擎 Flink 39、基于 Flink 实现的商品实时推荐系统(附源码) 40、如何使用 Flink 每天实时处理百亿条日志? 41、Flink 在趣头条的应用与实践 42、Flink Connector 深度解析 43、滴滴实时计算发展之路及平台架构实践 44、Flink Back Pressure(背压)是怎么实现的?有什么绝妙之处? 45、Flink 实战 | 贝壳找房基于Flink的实时平台建设 46、如何使用 Kubernetes 部署 Flink 应用 47、一文彻底搞懂 Flink 网络流控与反压机制 48、Flink中资源管理机制解读与展望 49、Flink 实时写入数据到 ElasticSearch 性能调优 50深入理解 Flink 容错机制 51吐血之作 | 流系统Spark/Flink/Kafka/DataFlow端到端一致性实现对比

本文分享自微信公众号 - zhisheng(zhisheng_blog),作者:LittleMagic

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-12-19

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 一文让你彻底了解大数据实时计算引擎 Flink

    在上一篇文章 你公司到底需不需要引入实时计算引擎? 中我讲解了日常中常见的实时需求,然后分析了这些需求的实现方式,接着对比了实时计算和离线计算。随着这些年大数据...

    zhisheng
  • IntelliJ IDEA 2019.3 这回真的要飞起来了,新特性抢先看!

    IntelliJ IDEA 上周才公布下一个主要版本 2019.3 的 Roadmap,近日就发布了 IntelliJ IDEA 2019.3 的首个早期访问版...

    zhisheng
  • 《从0到1学习Flink》—— Apache Flink 介绍

    Flink 是一种流式计算框架,为什么我会接触到 Flink 呢?因为我目前在负责的是监控平台的告警部分,负责采集到的监控数据会直接往 kafka 里塞,然后告...

    zhisheng
  • B站实时平台的架构与实践

    本文来自B站实时平台负责人郑志升在 Flink Forward Asia 2019 上的技术分享,重点介绍了B站基于 Apache Flink 的流式计算平台建...

    大数据技术架构
  • Flink面试通关手册

    2019 年是大数据实时计算领域最不平凡的一年,2019 年 1 月阿里巴巴 Blink (内部的 Flink 分支版本)开源,大数据领域一夜间从 Spark ...

    暴走大数据
  • 大数据计算引擎,选 Flink 还是 Spark?

    我是 2018 年 6 月加入公司,一直负责监控平台的告警系统。之后,我们的整个监控平台架构中途换过两次,其中一次架构发生了巨大的变化。我们监控告警平台最早的架...

    CSDN技术头条
  • 大数据计算引擎,你 pick 哪个?

    我是 2018 年 6 月加入公司,一直负责监控平台的告警系统。之后,我们的整个监控平台架构中途换过两次,其中一次架构发生了巨大的变化。我们监控告警平台最早的架...

    用户1737318
  • 带你走入 Flink 的世界

    在 18 年时,就听说过 Flink 流式计算引擎,是阿里调研选型选择的新一代大数据框计算架,当时就记住了这个新框架。

    纯洁的微笑
  • Flink面试通关手册

    2019 年是大数据实时计算领域最不平凡的一年,2019 年 1 月阿里巴巴 Blink (内部的 Flink 分支版本)开源,大数据领域一夜间从 Spark ...

    王知无
  • IntelliJ IDEA 2019.3 这回真的要飞起来了,新特性抢先看!

    IntelliJ IDEA 上周才公布下一个主要版本 2019.3 的 Roadmap,近日就发布了 IntelliJ IDEA 2019.3 的首个早期访问版...

    zhisheng

扫码关注云+社区

领取腾讯云代金券