首页
学习
活动
专区
圈层
工具
发布
49 篇文章
1
YARN
2
Hadoop前世今生
3
AI分类
4
人工智能综述
5
随机森林
6
【HBase】HBase之what
7
【HBase】HBase之how
8
HBase篇--HBase常用优化
9
Hbase优化
10
flink源码从头分析第一篇之WordCount DataStream操作
11
大数据Flink-Java学习之旅第一篇
12
flink(12)-flink on yarn
13
Flink学习——Flink概述
14
Flink学习笔记:2、Flink介绍
15
Flink学习笔记(2) -- Flink部署
16
Flink入门(一)——Apache Flink介绍
17
Flink1.4 Flink程序剖析
18
Flink SQL 优化实战 - 维表 JOIN 优化
19
flink sql 知其所以然(十四):维表 join 的性能优化之路(上)附源码
20
Flink重点难点:维表关联理论和Join实战
21
Flink重点难点:状态(Checkpoint和Savepoint)容错与两阶段提交
22
详解flink中Look up维表的使用
23
Flink 1.11中对接Hive新特性及如何构建数仓体系
24
Flink 实时计算 - SQL 维表 Join 的实现
25
大数据技术周报第 010 期
26
实时数仓在有赞的实践
27
美团基于 Flink 的实时数仓平台建设新进展
28
基于Flink+Hive构建流批一体准实时数仓
29
实时数仓:基于流计算 Oceanus 实现 MySQL 和 HBase 维表到 ClickHouse 的实时分析
30
当 TiDB 与 Flink 相结合:高效、易用的实时数仓
31
flink维表关联系列之Mysql维表关联:全量加载
32
基于Flink的高可靠实时ETL系统
33
基于 Flink 实现的商品实时推荐系统(附源码)
34
【Flink】基于 Flink 的流式数据实时去重
35
Flink 实战 | 贝壳找房基于Flink的实时平台建设
36
Apache Hudi在华米科技的应用-湖仓一体化改造
38
Flink checkpoint
39
理解Flink checkpoint
40
flink checkpoint配置整理
41
flink checkpoint 源码分析 (二)
42
聊聊flink的checkpoint配置
43
Flink中案例学习--State与CheckPoint
44
Flink源码阅读(一)--Checkpoint触发机制
45
Flink企业级优化全面总结(3万字长文,15张图)
46
Flink高频面试题,附答案解析
47
学习Flink,看这篇就够了
48
【最全的大数据面试系列】Flink面试题大全
49
Flink SQL Client综合实战

【Flink】基于 Flink 的流式数据实时去重

在实时计算 PV 信息时,用户短时间内重复点击并不会增加点击次数,基于此需求,我们需要对流式数据进行实时去重。

一想到大数据去重,我们立刻可以想到布隆过滤器、HyperLogLog 去重、Bitmap 去重等方法。对于实时数据处理引擎 Flink 来说,除了上述方法外还可以通过 Flink SQL 方式或 Flink 状态管理的方式进行去重。

本文主要介绍基于 Flink 状态管理的方式进行实时去重。

1.状态管理

虽然 Flink 的很多操作都是基于事件解析器进行一次的事件处理,但也有很多操作需要记住多个事件的信息,比如窗口运算等。这些操作便称为有状态的操作。

有状态的操作有一些经典案例,比如说:

  • 计算每分钟/小时/天的统计量等;
  • 实时计算 PV、UV,需要维护目前已有的 PV、UV 信息;
  • 实时更新机器学习模型,需要记住模型的参数;

我们在上一篇内容中介绍了如何计算分钟级的统计量,我们采用的方法是开一个窗口函数进行统计;而现在的任务是数据去重,对于增量数据来说没法进行开窗运算。

针对这种情况,Flink 提供了基于事件驱动的处理函数(ProcessFunction),其将事件处理与 Timer、State 结合在一起,提供了更加强大和丰富的功能。

Flink 子任务状态更新和获取的流程如下图所示,一个算子子任务接收输入流,获取对应的状态,根据新的计算结果更新状态。

获取和更新状态的逻辑其实并不复杂,但流处理框架还需要解决以下几类问题:

  • 数据的产出要保证实时性,延迟不能太高;
  • 需要保证数据不丢不重,恰好计算一次,尤其是当状态数据非常大或者应用出现故障需要恢复时,要保证状态的计算不出任何错误;
  • 一般流处理任务都是 7*24 小时运行的,程序的可靠性需要非常高。

基于上述要求,我们不能将状态仅交由内存管理,因为内存的容量是有限制的,当状态数据稍微大一些时,就会出现内存不够的问题。由于 Flink 本身提供了有状态的计算,并且封装了一些底层的实现,比如状态的高效存储、Checkpoint 和 Savepoint 持久化备份机制、计算资源扩缩容等问题,所以我们只需要调用 Flink API,专注于业务逻辑即可。

2.状态类型

Managed State 和 Raw State

Flink有两种基本类型的状态:托管状态(Managed State)和原生状态(Raw State)。从名称中也能读出两者的区别:Managed State 是由 Flink 管理的,Flink 帮忙存储、恢复和优化,Raw State 是开发者自己管理的,需要自己序列化。两者对比如下:

Managed State

Raw State

状态管理方式

Flink Runtime 托管,自动存储、自动恢复、自动伸缩

用户自己管理

状态数据结构

Flink提供的常用数据结构,如 ListState、MapState 等

字节数组:byte[]

使用场景

绝大多数 Flink 算子

用户自定义算子

大部分情况下我们使用 Managed State 便可满足需求。

Keyed State 和 Operator State

我们对 Managed State 继续细分,它又有两种类型:Keyed State 和 Operator State。

Keyed State 是 KeyedStream 上的状态。假如输入流按照 id 为 Key 进行了 keyBy 分组,形成一个 KeyedStream。下图为 Keyed State,因为一个算子子任务可以处理一到多个Key,算子子任务 1 处理了两种 Key,两种 Key 分别对应自己的状态。

Operator State 可以用在所有算子上,每个算子子任务或者说每个算子实例共享一个状态,流入这个算子子任务的数据可以访问和更新这个状态。下图为 Operator State。

下图为两者的区别:

Keyed State

Operator State

适用算子类型

只适用于KeyedStream上的算子

可以用于所有算子

状态分配

每个 Key 对应一个状态

一个算子子任务对应一个状态

创建和访问方式

重写 Rich Function 通过里面的 RuntimeContext 访问

实现 CheckpointedFunction 等接口

支持的数据结构

ValueState、ListState、MapState 等

ListState、BroadcastState 等

无论是 Keyed State 还是 Operator State,Flink 的状态都是基于本地的,即每个算子子任务维护着这个算子子任务对应的状态存储,算子子任务之间的状态不能相互访问。

3.代码实践

3.1 数据准备

准备一些数据 demo,数据格式和之前的一样,依次为 user、item、catelog、behavior、timestamp。

其中第一个数据和最后一行数据视为重复数据(两次点击间隔一秒)。

代码语言:javascript
复制
952483,310884,4580532,pv,1511712000
794777,5119439,982926,pv,1511712000
875914,4484065,1320293,pv,1511712000
980877,5097906,149192,pv,1511712000
944074,2348702,3002561,pv,1511712000
973127,1132597,4181361,pv,1511712000
84681,3505100,2465336,pv,1511712000
732136,3815446,2342116,pv,1511712000
940143,2157435,1013319,pv,1511712000
655789,4945338,4145813,pv,1511712000
952483,310884,4580532,pv,1511713000

3.2 数据源

代码语言:javascript
复制
package com.aze.producer;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.io.BufferedReader;
import java.io.FileReader;

/**
 * @Author: aze
 * @Date: 2020-09-16 14:41
 */
public class ReadLineSource  implements SourceFunction<String> {

    private String filePath;
    private boolean canceled = false;

    public ReadLineSource(String filePath){
        this.filePath = filePath;
    }

    @Override
    public void run(SourceContext<String> sourceContext) throws Exception {
        BufferedReader reader = new BufferedReader(new FileReader(filePath));
        while (!canceled && reader.ready()){
            String line = reader.readLine();
            sourceContext.collect(line);
            Thread.sleep(10);
        }
    }

    @Override
    public void cancel() {
        canceled = true;
    }
}

3.3 主程序

为了增加可读性,主程序介绍时不采用链式编程,后面放出完整程序时会采用链式编程。

3.3.1 创建环境

代码语言:javascript
复制
val env = StreamExecutionEnvironment.getExecutionEnvironment();
// 注意设置 EventTime,而不是默认的 ProcessTime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);

3.3.2 数据接入及清洗

这里用 lombok 包的 val 进行修饰,这样便不必编写实际类型。

我们利用 flatMap 清洗掉出了 "pv" 外的其他用户行为,并传出 Tuple2<String, Long> 类型。String 为 key,Long 为 timestamp。

代码语言:javascript
复制
val dataStream = env.addSource(new ReadLineSource("src/main/resources/data.txt"))
     // 注意 FlatMapFunction 不要写成 Lambda 表达式
     // 我们使用了泛型,所以没有显式地指明返回值的类型的话会出错
        .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Long>> out) {
                String[] split = s.split(",");
                if ("pv".equals(split[3])) {
                    val res = new Tuple2<>(split[0] + "-" + split[1], Long.parseLong(split[4]));
                    out.collect(res);
                }
            }
        });

3.3.3 定义事件时间

代码语言:javascript
复制
val process = dataStream.assignTimestampsAndWatermarks(WatermarkStrategy
            .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofMillis(1000))
            .withTimestampAssigner((SerializableTimestampAssigner<Tuple2<String, Long>>)
                    (s, l) -> s.f1));

我们为 flatMap 传出的 Tuple2<String, Long> 类型的数据定义一下时间时间。

3.3.4 定义内部类

为了方便处理,我们定义个内部类方便进行状态存储。

同样我们也引用了 lombok 为类添加注解(减少代码量)。

代码语言:javascript
复制
@Data
@ToString
@AllArgsConstructor
private static class UserBehavior {
    private String id;
    private long timestamp;
}

3.3.5 事件处理

通过注释的形式讲解下这段代码

代码语言:javascript
复制
// 基于 key 进行分流
process.keyBy(s -> s.f0)
     // 每一个 key 都会维护一个 KeyedProcessFunction
        .process(new KeyedProcessFunction<String, Tuple2<String, Long>, Object>() {

            // 为每个 key 创建一个私有的状态
            private ValueState<UserBehavior> state;
            
            // KeyedProcessFunction 创建时会调用 open 方法
            @Override
            public void open(Configuration parameters) {
                // 创建一个状态描述器
                val stateDescriptor = new ValueStateDescriptor<>("mystate", UserBehavior.class);
                // 设置状态的生存时间,过时销毁,主要是为减少内存
                stateDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.seconds(60)).build());
                // 完成 Keyed State 的创建。
                state = getRuntimeContext().getState(stateDescriptor);
            }

            // 处理事件
            @Override
            public void processElement(Tuple2<String, Long> in,
                                       Context ctx,
                                       Collector<Object> out) throws Exception {
                // 从状态中拿出对象
                UserBehavior cur = state.value();
                // 如果为空则为新数据,否则就是重复数据
                if (cur == null) {
                    cur = new UserBehavior(in.f0, in.f1);
                    // 记得更新下状态
                    state.update(cur);
                    // 注册个定时器任务,60 秒后可以不算是新数据
                    // 即用户 60 秒点击多次只能算一次有效点击
                    ctx.timerService().registerEventTimeTimer(cur.getTimestamp() + 60000);
     // 新数据可以向下传递
                    out.collect(cur);
                } else {
                    // 打印一下
                    System.out.println("[Duplicate Data] " + in.f0 + " " + in.f1);
                }
            }

            // 触发定时任务
            @Override
            public void onTimer(long timestamp,
                                OnTimerContext ctx,
                                Collector<Object> out) throws Exception {
                UserBehavior cur = state.value();
                // 利用定时任务将状态清空
                if (cur.getTimestamp() + 60000 <= timestamp) {
                    System.out.printf("[Overdue] now: %d obj_time: %d Date: %s%n",
                            timestamp, cur.getTimestamp(), cur.getId());
                    state.clear();
                }
            }
        })
        .print();

注意,我这里用的是 ValueState,如果想要更好的性能,可以使用 MapState

3.3.6 执行

代码语言:javascript
复制
env.execute("flink");

输出为:

代码语言:javascript
复制
DataDeduplicate.UserBehavior(id=952483-310884, timestamp=1511712000)
DataDeduplicate.UserBehavior(id=794777-5119439, timestamp=1511712000)
DataDeduplicate.UserBehavior(id=875914-4484065, timestamp=1511712000)
DataDeduplicate.UserBehavior(id=980877-5097906, timestamp=1511712000)
DataDeduplicate.UserBehavior(id=944074-2348702, timestamp=1511712000)
DataDeduplicate.UserBehavior(id=973127-1132597, timestamp=1511712000)
DataDeduplicate.UserBehavior(id=84681-3505100, timestamp=1511712000)
DataDeduplicate.UserBehavior(id=732136-3815446, timestamp=1511712000)
DataDeduplicate.UserBehavior(id=940143-2157435, timestamp=1511712000)
DataDeduplicate.UserBehavior(id=655789-4945338, timestamp=1511712000)
[Duplicate Data] 952483-310884 1511713000
[Overdue] now: 1511772000 obj_time: 1511712000 Date: 952483-310884
[Overdue] now: 1511772000 obj_time: 1511712000 Date: 655789-4945338
[Overdue] now: 1511772000 obj_time: 1511712000 Date: 940143-2157435
[Overdue] now: 1511772000 obj_time: 1511712000 Date: 732136-3815446
[Overdue] now: 1511772000 obj_time: 1511712000 Date: 84681-3505100
[Overdue] now: 1511772000 obj_time: 1511712000 Date: 973127-1132597
[Overdue] now: 1511772000 obj_time: 1511712000 Date: 944074-2348702
[Overdue] now: 1511772000 obj_time: 1511712000 Date: 980877-5097906
[Overdue] now: 1511772000 obj_time: 1511712000 Date: 875914-4484065
[Overdue] now: 1511772000 obj_time: 1511712000 Date: 794777-5119439

可以看到,碰到最后一条数据是提示了为重复数据。

后面定时器也全部触发了,大概率是因为触发水印。

3.4 完整程序

放上完整程序。

代码语言:javascript
复制
package com.aze.consumer;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.ToString;
import lombok.val;
import com.aze.producer.ReadLineSource;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * @Author: aze
 * @Date: 2020-09-16 14:45
 */
public class DataDeduplicate {

    public static void main(String[] args) throws Exception {
        val env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        val dataStream = env.addSource(new ReadLineSource("src/main/resources/data.txt"));
        val process = dataStream
                .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public void flatMap(String s, Collector<Tuple2<String, Long>> out) throws Exception {
                        String[] split = s.split(",");
                        if ("pv".equals(split[3])) {
                            val res = new Tuple2<>(split[0] + "-" + split[1], Long.parseLong(split[4]));
                            out.collect(res);
                        }
                    }
                })
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofMillis(1000))
                        .withTimestampAssigner((SerializableTimestampAssigner<Tuple2<String, Long>>)
                                (s, l) -> s.f1))
                .keyBy(s -> s.f0)
                .process(new KeyedProcessFunction<String, Tuple2<String, Long>, Object>() {

                    private ValueState<UserBehavior> state;

                    @Override
                    public void open(Configuration parameters) {
                        val stateDescriptor = new ValueStateDescriptor<>("mystate", UserBehavior.class);
                        stateDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.seconds(60)).build());
                        state = getRuntimeContext().getState(stateDescriptor);
                    }

                    @Override
                    public void processElement(Tuple2<String, Long> in,
                                               Context ctx,
                                               Collector<Object> out) throws Exception {
                        UserBehavior cur = state.value();
                        if (cur == null) {
                            cur = new UserBehavior(in.f0, in.f1);
                            state.update(cur);
                            ctx.timerService().registerEventTimeTimer(cur.getTimestamp() + 60000);
                            out.collect(cur);
                        } else {
                            System.out.println("[Duplicate Data] " + in.f0 + " " + in.f1);
                        }
                    }

                    @Override
                    public void onTimer(long timestamp,
                                        OnTimerContext ctx,
                                        Collector<Object> out) throws Exception {
                        UserBehavior cur = state.value();
                        if (cur.getTimestamp() + 1000 <= timestamp) {
                            System.out.printf("[Overdue] now: %d obj_time: %d Date: %s%n",
                                    timestamp, cur.getTimestamp(), cur.getId());
                            state.clear();
                        }
                    }
                });
        process.print();
        env.execute("flink");

    }

    @Data
    @ToString
    @AllArgsConstructor
    private static class UserBehavior {
        private String id;
        private long timestamp;
    }
}

4.总结

以上便是基于 Flink 数据实时去重的所有情况,目前还只是单机处理,也不知道碰到大数据集会不会出现内存爆炸的情况。

5.参考

  1. 《Flink状态管理详解:Keyed State和Operator List State深度解析》
  2. 《有状态流处理》
  3. 《事件驱动应用》
下一篇
举报
领券