前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Oceanus 在腾讯微视数据的实践-统计某时间段内的uv、pv

Oceanus 在腾讯微视数据的实践-统计某时间段内的uv、pv

作者头像
吴云涛
修改2021-11-02 12:21:32
1.7K0
修改2021-11-02 12:21:32
举报

导语

在实时计算中,经常会遇到需要计算某个时间段内的pv、uv这类需求,完成该类需求有多种方式,本文以微视数据端内计算启动数据的pv、uv为应用场景,来介绍常用的两种实现方式。

业务背景

为了实时监控微视端内app启动以及启动方式的情况,需要实时的统计每10分钟及每小时pv、uv。这里pv,每收到一条启动日志即+1,uv则需要依据启动的唯一标识qimei来做去重处理。

实现介绍

实现pv、uv的统计主要微视数据尝试过两种方式,一是窗口方式:主要是使用flink window+valueState,统计的结果可以直接输出;另外一种是使用redis,借用外部存储系统redis来完成,两种实现方式各有优劣吧。

窗口方式:使用窗口的方式,来计算pv、uv,即根据需求的时间段,来设定窗口的大小,例如需要计算10分钟内的pv、uv则需要开一个10分钟时长的统计窗口,对于pv不需要做去重处理,对于uv,需要借用flink自带的valueState来保存中间数据,同时需要借用set、hyperloglog或者bitmap(roaringbitmap)等数据结构来做去重。计算pv较简单,在这里不做介绍,例如下面使用hyperloglog来做去重,来计算uv,在maven中添加导入hyperloglog的依赖:

代码语言:javascript
复制
<dependency>
      <groupId>com.clearspring.analytics</groupId>
      <artifactId>stream</artifactId>
      <version>2.9.8</version>
</dependency>

 程序执行主体:构造flink的执行环境、从kafka中读取数据,对数据流做map、aggregate等操作,将处理的数据写入到虫洞kafka中。

代码语言:javascript
复制
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
    env.getConfig().setAutoWatermarkInterval(5000L);
    env.getConfig().setUseSnapshotCompression(true);

    //kafka配置
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", KAFKA_BROKERS);
    properties.setProperty("enable.auto.commit", "true");
    properties.setProperty("auto.commit.interval.ms", "5000");

    properties.setProperty("group.id", CONSUMER_GROUP);
    properties.setProperty("client.id", CONSUMER_GROUP);  //用于虫洞验证
    FlinkKafkaConsumer011<AppActionModel> kafkaConsumer =
        new FlinkKafkaConsumer011<>(KAFKA_TOPIC, new AppActionDeSerializer(), properties);
    kafkaConsumer.setStartFromLatest();
    kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
    DataStream<AppActionModel> dataStream = env.addSource(kafkaConsumer).setParallelism(80);

    DataStream<StatPvUv> resultStream = dataStream.map(new Dau10minMapFunction())
        .timeWindowAll(Time.minutes(10)).aggregate(new DauHllAggregateFuction());

    //写kafka配置
    Properties propsProducer = new Properties();
    propsProducer.setProperty("bootstrap.servers", "9.144.224.168:9092");
    propsProducer.put("serializer.class", "kafka.serializer.StringEncoder");
    //propsProducer.setProperty("metadata.broker.list", "9.144.224.168:9092");
    //propsProducer.setProperty("client.id", "wesee_core_kpi_10min");  //用于虫洞验证
    FlinkKafkaProducer011 flinkKafkaProducer = new FlinkKafkaProducer011("wesee_core_kpi_10min",
        new SimpleStringSchema(),propsProducer);

    resultStream.map(new MapFunction<StatPvUv, String>() {
      @Override
      public String map(StatPvUv statPvUv) throws Exception {

        String time = DateUtil.dateTo12String(new Date());
        StringBuilder sb = new StringBuilder();
        sb.append(time).append("|").append(statPvUv.getUv()).append("|")
            .append(statPvUv.getPv());
        return sb.toString();
      }
    }).addSink(flinkKafkaProducer);

    env.execute("Dau10minUvPvToKafkaTopology");

 Dau10minMapFuction主要是获取数据流中的qimei:

代码语言:javascript
复制
public class Dau10minMapFunction extends RichMapFunction<AppActionModel, String> {
  @Override
  public String map(AppActionModel value) throws Exception {
    String qimei = value.getQimei();
    return qimei;
  }
}

DauHllAggregateFuction主要是借用HyperloglogPlus来做去重,该数据结构是一种模糊的去重,其原理可参见https://en.wikipedia.org/wiki/HyperLogLog,如果需要精确去重,可以将HyperloglogPlus结构替换为roaringbitmap等数据结构。采用HyperloglogPlus的优势在于,该数据结构占用内存空间减少。

代码语言:javascript
复制
public class DauHllAggregateFuction implements AggregateFunction<String, AccHllBean, StatPvUv> {
  @Override
  public AccHllBean createAccumulator() {
    return new AccHllBean(0, new HyperLogLogPlus(5,18));
  }
  @Override
  public AccHllBean add(String s, AccHllBean accBean) {

    HyperLogLogPlus hll=accBean.getSets();
    hll.offer(s);
    return new AccHllBean(accBean.getPv() + 1, hll);
  }
  @Override
  public StatPvUv getResult(AccHllBean accr) {
    long pv = accr.getPv();
    long uv = accr.getSets().cardinality();
    return new StatPvUv(pv, uv);
  }
  @Override
  public AccHllBean merge(AccHllBean acc1, AccHllBean acc2) {
    HyperLogLogPlus tmp = new HyperLogLogPlus(5,18);
    long resutl = acc1.getPv() + acc2.getPv();
    try {
      tmp.addAll(acc1.getSets());
      tmp.addAll(acc2.getSets());
    }catch (Exception e){
      System.out.println("hll merger erro");
    }
    return new AccHllBean(resutl, tmp);
  }
}

 AccHllBean是封装的javaBean:

代码语言:javascript
复制
public class AccHllBean implements Serializable {

  private long pv;

  private HyperLogLogPlus sets;

  public long getPv() {
    return pv;
  }

  public void setPv(long pv) {
    this.pv = pv;
  }

  public HyperLogLogPlus getSets() {
    return sets;
  }

  public void setSets(HyperLogLogPlus sets) {
    this.sets = sets;
  }

  public AccHllBean(long pv, HyperLogLogPlus sets) {
    this.pv = pv;
    this.sets = sets;
  }
}

 对于数据量比较少的需求,去重结构还可以使用set集合。

借用redis:使用redis方式来计算某时间段的pv、uv,如果是需要计算任意时间段内,可以使用redis的zset结构或者是通过hash分片,都是把统计的时间窗口放在redis的key上,计算uv,可以借用redis提供的bitmap或者hyperloglog来完成。实现如下:构造flink运行环境,对数据流做简单的清洗(构造写入redis的key和value)

代码语言:javascript
复制
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(60 * 1000 * 2);
    //kafka配置
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", KAFKA_BROKERS);
    properties.setProperty("enable.auto.commit", "true");
    properties.setProperty("auto.commit.interval.ms", "5000");

    properties.setProperty("group.id", CONSUMER_GROUP);
    properties.setProperty("client.id", CONSUMER_GROUP);  //用于虫洞验证
    FlinkKafkaConsumer011<AppActionModel> kafkaConsumer =
        new FlinkKafkaConsumer011<>(KAFKA_TOPIC, new AppActionDauPvUvDeSerializer(), properties);
    kafkaConsumer.setStartFromLatest();
    kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
    // 使用日志里面的 reportTime作为 eventTime
    DataStream<AppActionModel> dataStream = env.addSource(kafkaConsumer);
    dataStream.addSink(new SinkDau10minAnd1hourPvUvToRedis());
    env.execute("Dau10minUvPvToRedis");

如果计算的pv、uv需要以接口方式提供或者写入到kafka,增需要再写一个程序 ,定时读取redis。

两种方式对比:采用窗口的方式来计算pv、uv,代码实现起来更复杂一下,可以直接将统计的结果写入到kafka中,并且不需要额外的存储资源。借用redis来计算pv、uv,代码实现较简单,统计的数据,可以按照实际需要直接保存在redis中,由于构造存储统计数据的key是按照日志上报的时间,该方式具有更长的延迟数据处理能力。

转自:merlingyu(余杨),研发工程师

本文系转载,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文系转载前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 业务背景:
  • 实现介绍:
相关产品与服务
流计算 Oceanus
流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的企业级实时大数据分析平台,具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档