前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink1.13架构全集| 一文带你由浅入深精通Flink方方面面(二)

Flink1.13架构全集| 一文带你由浅入深精通Flink方方面面(二)

作者头像
857技术社区
发布2022-12-18 10:25:00
1.3K0
发布2022-12-18 10:25:00
举报
文章被收录于专栏:857-Bigdata857-Bigdata

十一、处理函数

之前所介绍的流处理API,无论是基本的转换、聚合,还是更为复杂的窗口操作,其实都是基于DataStream进行转换的;所以可以统称为DataStream API,这也是Flink编程的核心。而我们知道,为了让代码有更强大的表现力和易用性,Flink本身提供了多层API,DataStream API只是中间的一环,如图所示:

在更底层,我们可以不定义任何具体的算子(比如map,filter,或者window),而只是提炼出一个统一的“处理”(process)操作——它是所有转换算子的一个概括性的表达,可以自定义处理逻辑,所以这一层接口就被叫作“处理函数”(process function)。

11.1 基本处理函数(ProcessFunction)

处理函数主要是定义数据流的转换操作,所以也可以把它归到转换算子中。我们知道在Flink中几乎所有转换算子都提供了对应的函数类接口,处理函数也不例外;它所对应的函数类,就叫作ProcessFunction。

11.1.1 处理函数的功能和使用

我们之前学习的转换算子,一般只是针对某种具体操作来定义的,能够拿到的信息比较有限。如果我们想要访问事件的时间戳,或者当前的水位线信息,都是完全做不到的。跟时间相关的操作,目前我们只会用窗口来处理。而在很多应用需求中,要求我们对时间有更精细的控制,需要能够获取水位线,甚至要“把控时间”、定义什么时候做什么事,这就不是基本的时间窗口能够实现的了。

这时就需要使用底层的处理函数(ProcessFunction)。处理函数提供了一个“定时服务”(TimerService),我们可以通过它访问流中的事件(event)、时间戳(timestamp)、水位线(watermark),甚至可以注册“定时事件”。而且处理函数继承了AbstractRichFunction抽象类,所以拥有富函数类的所有特性,同样可以访问状态(state)和其他运行时信息。此外,处理函数还可以直接将数据输出到侧输出流(side output)中。所以,处理函数是最为灵活的处理方法,可以实现各种自定义的业务逻辑;同时也是整个DataStream API的底层基础。处理函数的使用与基本的转换操作类似,只需要直接基于DataStream调用.process()方法就可以了。方法需要传入一个ProcessFunction作为参数,用来定义处理逻辑。

stream.process(new MyProcessFunction()) 这里ProcessFunction不是接口,而是一个抽象类,继承了AbstractRichFunction;MyProcessFunction是它的一个具体实现。所以所有的处理函数,都是富函数(RichFunction),富函数可以调用的东西这里同样都可以调用。

11.1.2 ProcessFunction解析

在源码中我们可以看到,抽象类ProcessFunction继承了AbstractRichFunction,有两个泛型类型参数:I表示Input,也就是输入的数据类型;O表示Output,也就是处理完成之后输出的数据类型。内部单独定义了两个方法:一个是必须要实现的抽象方法.processElement();另一个是非抽象方法.onTimer()。

代码语言:javascript
复制
public abstract class ProcessFunction<I, O> extends AbstractRichFunction {
    ...
public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
...
}

1. 抽象方法.processElement()

用于“处理元素”,定义了处理的核心逻辑。这个方法对于流中的每个元素都会调用一次,参数包括三个:输入数据值value,上下文ctx,以及“收集器”(Collector)out。方法没有返回值,处理之后的输出数据是通过收集器out来定义的。

代码语言:javascript
复制
value:当前流中的输入元素,也就是正在处理的数据,类型与流中数据类型一致。
ctx:类型是ProcessFunction中定义的内部抽象类Context,表示当前运行的上下文,可以获取到当前的时间戳,并提供了用于查询时间和注册定时器的“定时服务”(TimerService),以及可以将数据发送到“侧输出流”(side output)的方法.output()。
out:“收集器”(类型为Collector),用于返回输出数据。使用方式与flatMap算子中的收集器完全一样,直接调用out.collect()方法就可以向下游发出一个数据。这个方法可以多次调用,也可以不调用。
通过几个参数的分析不难发现,ProcessFunction可以轻松实现flatMap这样的基本转换功能(当然map、filter更不在话下);而通过富函数提供的获取上下文方法.getRuntimeContext(),也可以自定义状态(state)进行处理,这也就能实现聚合操作的功能了。

2. 非抽象方法.onTimer()

用于定义定时触发的操作,这是一个非常强大、也非常有趣的功能。这个方法只有在注册好的定时器触发的时候才会调用,而定时器是通过“定时服务”TimerService来注册的。打个比方,注册定时器(timer)就是设了一个闹钟,到了设定时间就会响;而.onTimer()中定义的,就是闹钟响的时候要做的事。所以它本质上是一个基于时间的“回调”(callback)方法,通过时间的进展来触发;在事件时间语义下就是由水位线(watermark)来触发了。

与.processElement()类似,定时方法.onTimer()也有三个参数:时间戳(timestamp),上下文(ctx),以及收集器(out)。这里的timestamp是指设定好的触发时间,事件时间语义下当然就是水位线了。另外这里同样有上下文和收集器,所以也可以调用定时服务(TimerService),以及任意输出处理之后的数据。既然有.onTimer()方法做定时触发,我们用ProcessFunction也可以自定义数据按照时间分组、定时触发计算输出结果;这其实就实现了窗口(window)的功能。所以说ProcessFunction是真正意义上的终极奥义,用它可以实现一切功能。

这里需要注意的是,上面的.onTimer()方法只是定时器触发时的操作,而定时器(timer)真正的设置需要用到上下文ctx中的定时服务。在Flink中,只有“按键分区流”KeyedStream才支持设置定时器的操作,所以之前的代码中我们并没有使用定时器。所以基于不同类型的流,可以使用不同的处理函数,它们之间还是有一些微小的区别的。接下来我们就介绍一下处理函数的分类。

11.1.3 处理函数的分类

Flink中的处理函数其实是一个大家族,ProcessFunction只是其中一员。

我们知道,DataStream在调用一些转换方法之后,有可能生成新的流类型;例如调用.keyBy()之后得到KeyedStream,进而再调用.window()之后得到WindowedStream。对于不同类型的流,其实都可以直接调用.process()方法进行自定义处理,这时传入的参数就都叫作处理函数。当然,它们尽管本质相同,都是可以访问状态和时间信息的底层API,可彼此之间也会有所差异。

Flink提供了8个不同的处理函数:

代码语言:javascript
复制
(1)ProcessFunction
最基本的处理函数,基于DataStream直接调用.process()时作为参数传入。
(2)KeyedProcessFunction
对流按键分区后的处理函数,基于KeyedStream调用.process()时作为参数传入。要想使用定时器,比如基于KeyedStream。
(3)ProcessWindowFunction
开窗之后的处理函数,也是全窗口函数的代表。基于WindowedStream调用.process()时作为参数传入。
(4)ProcessAllWindowFunction
同样是开窗之后的处理函数,基于AllWindowedStream调用.process()时作为参数传入。
(5)CoProcessFunction
合并(connect)两条流之后的处理函数,基于ConnectedStreams调用.process()时作为参数传入。关于流的连接合并操作,我们会在后续章节详细介绍。
(6)ProcessJoinFunction
间隔连接(interval join)两条流之后的处理函数,基于IntervalJoined调用.process()时作为参数传入。
(7)BroadcastProcessFunction
广播连接流处理函数,基于BroadcastConnectedStream调用.process()时作为参数传入。这里的“广播连接流”BroadcastConnectedStream,是一个未keyBy的普通DataStream与一个广播流(BroadcastStream)做连接(conncet)之后的产物。关于广播流的相关操作,我们会在后续章节详细介绍。
(8)KeyedBroadcastProcessFunction
按键分区的广播连接流处理函数,同样是基于BroadcastConnectedStream调用.process()时作为参数传入。与BroadcastProcessFunction不同的是,这时的广播连接流,是一个KeyedStream与广播流(BroadcastStream)做连接之后的产物。
接下来,我们就对KeyedProcessFunction和ProcessWindowFunction的具体用法展开详细说明。
11.2 按键分区处理函数(KeyedProcessFunction)
在上节中提到,只有在KeyedStream中才支持使用TimerService设置定时器的操作。所以一般情况下,我们都是先做了keyBy分区之后,再去定义处理操作;代码中更加常见的处理函数是KeyedProcessFunction。

11.2.1定时器(Timer)和定时服务(TimerService)

KeyedProcessFunction的一个特色,就是可以使用定时器。

定时器(timers)是处理函数中进行时间相关操作的主要机制。在.onTimer()方法中可以实现定时处理的逻辑,而它能触发的前提,就是之前曾经注册过定时器、并且现在已经到了触发时间。注册定时器的功能,是通过上下文中提供的“定时服务”(TimerService)来实现的。定时服务与当前运行的环境有关。前面已经介绍过,ProcessFunction的上下文(Context)中提供了.timerService()方法,可以直接返回一个TimerService对象。TimerService是Flink关于时间和定时器的基础服务接口,包含以下六个方法:

代码语言:javascript
复制
// 获取当前的处理时间
long currentProcessingTime();

// 获取当前的水位线(事件时间)
long currentWatermark();

// 注册处理时间定时器,当处理时间超过time时触发
void registerProcessingTimeTimer(long time);

// 注册事件时间定时器,当水位线超过time时触发
void registerEventTimeTimer(long time);

// 删除触发时间为time的处理时间定时器
void deleteProcessingTimeTimer(long time);

// 删除触发时间为time的处理时间定时器
void deleteEventTimeTimer(long time);

六个方法可以分成两大类:基于处理时间和基于事件时间。而对应的操作主要有三个:获取当前时间,注册定时器,以及删除定时器。需要注意,尽管处理函数中都可以直接访问TimerService,不过只有基于KeyedStream的处理函数,才能去调用注册和删除定时器的方法;未作按键分区的DataStream不支持定时器操作,只能获取当前时间。

TimerService会以键(key)和时间戳为标准,对定时器进行去重;也就是说对于每个key和时间戳,最多只有一个定时器,如果注册了多次,onTimer()方法也将只被调用一次。这样一来,我们在代码中就方便了很多,可以肆无忌惮地对一个key注册定时器,而不用担心重复定义——因为一个时间戳上的定时器只会触发一次。

11.2.2 KeyedProcessFunction的使用

基于keyBy之后的KeyedStream,直接调用.process()方法,这时需要传入的参数就是KeyedProcessFunction的实现类。

代码语言:javascript
复制
stream.keyBy( t -> t.f0 )
.process(new MyKeyedProcessFunction())

类似地,KeyedProcessFunction也是继承自AbstractRichFunction的一个抽象类,与ProcessFunction的定义几乎完全一样,区别只是在于类型参数多了一个K,这是当前按键分区的key的类型。同样地,我们必须实现一个.processElement()抽象方法,用来处理流中的每一个数据;另外还有一个非抽象方法.onTimer(),用来定义定时器触发时的回调操作。

下面是一个使用定时器的具体示例:

代码语言:javascript
复制
public class ProcessingTimeTimerExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env
                .addSource(new CustomSource())
                .keyBy(r -> true)
                .process(new KeyedProcessFunction<Boolean, String, String>() {
                    // 每来一条数据都会调用一次
                    @Override
                    public void processElement(String s, Context context, Collector<String> collector) throws Exception {
                        long currTs = context.timerService().currentProcessingTime();
                        collector.collect("数据到达,到达时间是:" + new Timestamp(currTs));
                        // 注册10s之后的定时器
                        context.timerService().registerProcessingTimeTimer(currTs + 10 * 1000L);
                    }

                    // 定时器触发时的操作
                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                        out.collect("定时器触发,触发时间是:" + new Timestamp(timestamp));
                    }
                })
                .print();

        env.execute();
    }

    public static class CustomSource implements SourceFunction<String> {
        @Override
        public void run(SourceFunction.SourceContext<String> ctx) throws Exception {
            ctx.collect("a");
            // 为了让程序不直接退出,等待20秒
            Thread.sleep(20 * 1000L);
        }

        @Override
        public void cancel() {
        }
    }
}

在上面的代码中,我们单独定义了一个数据源。这个数据源只发出一条数据,之后等待20秒,这样程序就不会在处理完数据之后直接退出,方便我们看到定时器的行为。由于定时器只能在KeyedStream上使用,所以先要进行keyBy;这里的.keyBy(r -> true)是将所有数据的key都指定为了true,其实就是所有数据拥有相同的key,会分配到同一个分区。

之后我们自定义了一个KeyedProcessFunction,其中.processElement()方法是每来一个数据都会调用一次,主要是定义了一个10秒之后的定时器;而.onTimer()方法则会在定时器触发时调用。所以我们会看到,程序运行后先在控制台输出“数据到达”的信息,等待10秒之后,又会输出“定时器触发”的信息,打印出的时间间隔正是10秒。

11.3 窗口处理函数

除了KeyedProcessFunction,另外一大类常用的处理函数,就是基于窗口的ProcessWindowFunction和ProcessAllWindowFunction了。在第六章窗口函数的介绍中,我们之前已经简单地使用过窗口处理函数了。

11.3.1 窗口处理函数的使用

进行窗口计算,我们可以直接调用现成的简单聚合方法(sum/max/min),也可以通过调用.reduce()或.aggregate()来自定义一般的增量聚合函数(ReduceFunction/AggregateFucntion);而对于更加复杂、需要窗口信息和额外状态的一些场景,我们还可以直接使用全窗口函数、把数据全部收集保存在窗口内,等到触发窗口计算时再统一处理。窗口处理函数就是一种典型的全窗口函数。

窗口处理函数ProcessWindowFunction的使用与其他窗口函数类似,也是基于WindowedStream直接调用方法就可以,只不过这时调用的是.process()。

代码语言:javascript
复制
stream.keyBy( t -> t.f0 )
        .window( TumblingEventTimeWindows.of(Time.seconds(10)) )
        .process(new MyProcessWindowFunction())

11.3.2 ProcessWindowFunction解析

ProcessWindowFunction既是处理函数又是全窗口函数。从名字上也可以推测出,它的本质似乎更倾向于“窗口函数”一些。事实上它的用法也确实跟其他处理函数有很大不同。我们可以从源码中的定义看到这一点:

代码语言:javascript
复制
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window>
        extends AbstractRichFunction {
...
public abstract void process(
        KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;
public void clear(Context context) throws Exception {}
public abstract class Context implements java.io.Serializable {...}
}

ProcessWindowFunction依然是一个继承了AbstractRichFunction的抽象类,它有四个类型参数:

代码语言:javascript
复制
IN:input,数据流中窗口任务的输入数据类型。
OUT:output,窗口任务进行计算之后的输出数据类型。
KEY:数据中键key的类型。
W:窗口的类型,是Window的子类型。一般情况下我们定义时间窗口,W就是TimeWindow。
而内部定义的方法,跟我们之前熟悉的处理函数就有所区别了。因为全窗口函数不是逐个处理元素的,所以处理数据的方法在这里并不是.processElement(),而是改成了.process()。方法包含四个参数。
key:窗口做统计计算基于的键,也就是之前keyBy用来分区的字段。
context:当前窗口进行计算的上下文,它的类型就是ProcessWindowFunction内部定义的抽象类Context。
elements:窗口收集到用来计算的所有数据,这是一个可迭代的集合类型。
out:用来发送数据输出计算结果的收集器,类型为Collector。
可以明显看出,这里的参数不再是一个输入数据,而是窗口中所有数据的集合。而上下文context所包含的内容也跟其他处理函数有所差别:
代码语言:javascript
复制
public abstract class Context implements java.io.Serializable {
    public abstract W window();

    public abstract long currentProcessingTime();
    public abstract long currentWatermark();

    public abstract KeyedStateStore windowState();
    public abstract KeyedStateStore globalState();
    public abstract <X> void output(OutputTag<X> outputTag, X value);
}

除了可以通过.output()方法定义侧输出流不变外,其他部分都有所变化。这里不再持有TimerService对象,只能通过currentProcessingTime()和currentWatermark()来获取当前时间,所以失去了设置定时器的功能;另外由于当前不是只处理一个数据,所以也不再提供.timestamp()方法。与此同时,也增加了一些获取其他信息的方法:比如可以通过.window()直接获取到当前的窗口对象,也可以通过.windowState()和.globalState()获取到当前自定义的窗口状态和全局状态。注意这里的“窗口状态”是自定义的,不包括窗口本身已经有的状态,针对当前key、当前窗口有效;而“全局状态”同样是自定义的状态,针对当前key的所有窗口有效。

所以我们会发现,ProcessWindowFunction中除了.process()方法外,并没有.onTimer()方法,而是多出了一个.clear()方法。从名字就可以看出,这主要是方便我们进行窗口的清理工作。如果我们自定义了窗口状态,那么必须在.clear()方法中进行显式地清除,避免内存溢出。

至于另一种窗口处理函数ProcessAllWindowFunction,它的用法非常类似。区别在于它基于的是AllWindowedStream,相当于对没有keyBy的数据流直接开窗并调用.process()方法:

代码语言:javascript
复制
stream.windowAll( TumblingEventTimeWindows.of(Time.seconds(10)) )
.process(new MyProcessAllWindowFunction())

11.4 应用案例——Top N

窗口的计算处理,在实际应用中非常常见。对于一些比较复杂的需求,如果增量聚合函数无法满足,我们就需要考虑使用窗口处理函数这样的“大招”了。网站中一个非常经典的例子,就是实时统计一段时间内的热门url。例如,需要统计最近10秒钟内最热门的两个url链接,并且每5秒钟更新一次。我们知道,这可以用一个滑动窗口来实现,而“热门度”一般可以直接用访问量来表示。于是就需要开滑动窗口收集url的访问数据,按照不同的url进行统计,而后汇总排序并最终输出前两名。这其实就是著名的“Top N”问题。很显然,简单的增量聚合可以得到url链接的访问量,但是后续的排序输出Top N就很难实现了。所以接下来我们用窗口处理函数进行实现。

11.4.1 使用ProcessAllWindowFunction

一种最简单的想法是,我们干脆不区分url链接,而是将所有访问数据都收集起来,统一进行统计计算。所以可以不做keyBy,直接基于DataStream开窗,然后使用全窗口函数ProcessAllWindowFunction来进行处理。在窗口中可以用一个HashMap来保存每个url的访问次数,只要遍历窗口中的所有数据,自然就能得到所有url的热门度。最后把HashMap转成一个列表ArrayList,然后进行排序、取出前两名输出就可以了。代码具体实现如下:

代码语言:javascript
复制
public class ProcessAllWindowTopN {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Event> pvStream = env
                .addSource(new ClickSource())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                                    @Override
                                    public long extractTimestamp(Event element, long recordTimestamp) {
                                        return element.timestamp;
                                    }
                                })
                );

        pvStream
                .map(new MapFunction<Event, String>() {
                    @Override
                    public String map(Event value) throws Exception {
                        return value.url;
                    }
                })    // 只需要url,提取转换成String
                .windowAll(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))    // 开滑动窗口
                .process(new ProcessAllWindowFunction<String, String, TimeWindow>() {
                    @Override
                    public void process(Context context, Iterable<String> elements, Collector<String> out) throws Exception {
                        HashMap<String, Long> urlCountMap = new HashMap<>();
                        // 遍历窗口中数据,将浏览量保存到HashMap中
                        for (String url : elements) {
                            if (urlCountMap.containsKey(url)) {
                                long count = urlCountMap.get(url);
                                urlCountMap.put(url, count + 1L);
                            } else {
                                urlCountMap.put(url, 1L);
                            }
                        }
                        ArrayList<Tuple2<String, Long>> mapList = new ArrayList<Tuple2<String, Long>>();
                        // 将浏览量数据放入ArrayList,进行排序
                        for (String key : urlCountMap.keySet()) {
                            mapList.add(Tuple2.of(key, urlCountMap.get(key)));
                        }
                        mapList.sort(new Comparator<Tuple2<String, Long>>() {
                            @Override
                            public int compare(Tuple2<String, Long> o1, Tuple2<String, Long> o2) {
                                return o2.f1.intValue() - o1.f1.intValue();
                            }
                        });
                        StringBuilder result = new StringBuilder();
                        result.append("========================================\n");
                        // 取前两名,构建输出结果
                        for (int i = 0; i < 2; i++) {
                            Tuple2<String, Long> temp = mapList.get(i);
                            result
                                    .append("浏览量No." + (i + 1) + " ")
                                    .append("url:" + temp.f0 + " ")
                                    .append("浏览量:" + temp.f1 + " ")
                                    .append("窗口结束时间:" + new Timestamp(context.window().getEnd()) + "\n");
                        }
                        result
                                .append("========================================\n\n\n");
                        out.collect(result.toString());
                    }
                })
                .print();
        env.execute();
    }
}

11.4.2 使用KeyedProcessFunction

在上一小节的实现过程中,我们没有进行按键分区,直接将所有数据放在一个分区上进行了开窗操作。这相当于将并行度强行设置为1,在实际应用中是要尽量避免的,所以Flink官方也并不推荐使用AllWindowedStream进行处理。另外,我们在全窗口函数中定义了HashMap来统计url链接的浏览量,计算过程是要先收集齐所有数据、然后再逐一遍历更新HashMap,这显然不够高效。基于这样的想法,我们可以从两个方面去做优化:一是对数据进行按键分区,分别统计浏览量;二是进行增量聚合,得到结果最后再做排序输出。所以,我们可以使用增量聚合函数AggregateFunction进行浏览量的统计,然后结合ProcessWindowFunction排序输出来实现Top N的需求。具体实现可以分成两步:先对每个url链接统计出浏览量,然后再将统计结果收集起来,排序输出最终结果。由于最后的排序还是基于每个时间窗口的,所以为了让输出的统计结果中包含窗口信息,我们可以借用第六章中定义的POJO类UrlViewCount来表示,它包含了url、浏览量(count)以及窗口的起始结束时间。之后对UrlViewCount的处理,可以先按窗口分区,然后用KeyedProcessFunction来实现。

代码语言:javascript
复制
总结处理流程如下:
(1)读取数据源;
(2)筛选浏览行为(pv);
(3)提取时间戳并生成水位线;
(4)按照url进行keyBy分区操作;
(5)开长度为1小时、步长为5分钟的事件时间滑动窗口;
(6)使用增量聚合函数AggregateFunction,并结合全窗口函数WindowFunction进行窗口聚合,得到每个url、在每个统计窗口内的浏览量,包装成UrlViewCount;
(7)按照窗口进行keyBy分区操作;
(8)对同一窗口的统计结果数据,使用KeyedProcessFunction进行收集并排序输出。
用KeyedProcessFunction来收集数据做排序,这时面对的是窗口聚合之后的数据流,而窗口已经不存在了;我们需要确保能够收集齐所有数据,所以应该在窗口结束时间基础上再“多等一会儿”。具体实现上,可以采用一个延迟触发的事件时间定时器。基于窗口的结束时间来设定延迟,其实并不需要等太久——因为我们是靠水位线的推进来触发定时器,而水位线的含义就是“之前的数据都到齐了”。所以我们只需要设置1毫秒的延迟,就一定可以保证这一点。而在等待过程中,之前已经到达的数据应该缓存起来,我们这里用一个自定义的“列表状态”(ListState)来进行存储,如图所示。这个状态需要使用富函数类的getRuntimeContext()方法获取运行时上下文来定义,我们一般把它放在open()生命周期方法中。之后每来一个UrlViewCount,就把它添加到当前的列表状态中,并注册一个触发时间为窗口结束时间加1毫秒(windowEnd + 1)的定时器。待到水位线到达这个时间,定时器触发,我们可以保证当前窗口所有url的统计结果UrlViewCount都到齐了;于是从状态中取出进行排序输出。

具体代码实现如下:

代码语言:javascript
复制
public class KeyedProcessTopN {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Event> pvStream = env
                .addSource(new ClickSource())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                                    @Override
                                    public long extractTimestamp(Event element, long recordTimestamp) {
                                        return element.timestamp;
                                    }
                                })
                );

        // 先求出每个url在每个窗口的浏览量
        SingleOutputStreamOperator<UrlViewCount> uvcStream = pvStream
                .keyBy(r -> r.url)
                .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                .aggregate(new CountAgg(), new WindowResult());    // 增量聚合,并结合全窗口函数包装UrlViewCount

        // 针对同一个窗口中的不同url的UrlViewCount次数,进行排序输出
        KeyedStream<UrlViewCount, Long> uvcKeyedStream = uvcStream
                .keyBy(r -> r.windowEnd);

        uvcKeyedStream
                .process(new TopN(2))
                .print();

        env.execute();
    }

    public static class TopN extends KeyedProcessFunction<Long, UrlViewCount, String> {
        // 列表状态变量
        private ListState<UrlViewCount> UrlViewCountListState;
        private Integer threshold;

        public TopN(Integer threshold) {
            this.threshold = threshold;
        }
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            // 声明一个列表状态,保存已经到达的统计结果
            UrlViewCountListState = getRuntimeContext().getListState(
                    new ListStateDescriptor<UrlViewCount>("list-state", Types.POJO(UrlViewCount.class))
            );
        }

        @Override
        public void processElement(UrlViewCount value, Context ctx, Collector<String> out) throws Exception {
            // 添加到列表状态变量中
            UrlViewCountListState.add(value);
            // 水位线达到 窗口结束时间 + 1毫秒 时触发定时器来进行排序
            ctx.timerService().registerEventTimeTimer(value.windowEnd + 1);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            super.onTimer(timestamp, ctx, out);
            // 将数据从列表状态变量中取出,放入ArrayList
            ArrayList<UrlViewCount> UrlViewCountArrayList = new ArrayList<>();
            for (UrlViewCount uvc : UrlViewCountListState.get()) {
                UrlViewCountArrayList.add(uvc);
            }
            // 清空状态释放资源
            UrlViewCountListState.clear();

            // 排序
            UrlViewCountArrayList.sort(new Comparator<UrlViewCount>() {
                @Override
                public int compare(UrlViewCount o1, UrlViewCount o2) {
                    return o2.count.intValue() - o1.count.intValue();
                }
            });

            // 取前三名,构建输出结果
            StringBuilder result = new StringBuilder();
            result.append("========================================\n");
            for (int i = 0; i < this.threshold; i++) {
                UrlViewCount UrlViewCount = UrlViewCountArrayList.get(i);
                result
                        .append("浏览量No." + (i + 1) + " ")
                        .append("url:" + UrlViewCount.url + " ")
                        .append("浏览量:" + UrlViewCount.count + " ")
                        .append("窗口结束时间:" + new Timestamp(timestamp - 1) + "\n");
            }
            result
                    .append("========================================\n\n\n");
            out.collect(result.toString());
        }
    }

    public static class CountAgg implements AggregateFunction<Event, Long, Long> {
        @Override
        public Long createAccumulator() {
            return 0L;
        }

        @Override
        public Long add(Event value, Long accumulator) {
            return accumulator + 1L;
        }

        @Override
        public Long getResult(Long accumulator) {
            return accumulator;
        }

        @Override
        public Long merge(Long a, Long b) {
            return null;
        }
    }

    public static class WindowResult extends ProcessWindowFunction<Long, UrlViewCount, String, TimeWindow> {
        @Override
        public void process(String s, Context context, Iterable<Long> elements, Collector<UrlViewCount> out) throws Exception {
            out.collect(new UrlViewCount(s, elements.iterator().next(), context.window().getStart(), context.window().getEnd()));
        }
    }

	public static class UrlViewCount {
		public String url;
		public Long count;
		public Long windowStart;
		public Long windowEnd;
	
		public UrlViewCount() {
		}
	
		public UrlViewCount(String url, Long count, Long windowStart, Long windowEnd) {
			this.url = url;
			this.count = count;
			this.windowStart = windowStart;
			this.windowEnd = windowEnd;
		}
	
		@Override
		public String toString() {
			return "UrlViewCount{" +
					"url='" + url + '\'' +
					", count=" + count +
					", windowStart=" + new Timestamp(windowStart) +
					", windowEnd=" + new Timestamp(windowEnd) +
					'}';
		}
	}
}

我们在上面的代码中使用了后面要讲解的ListState。这里可以先简单说明一下。我们先声明一个列表状态变量:

代码语言:javascript
复制
private ListState<Event> UrlViewCountListState;

然后在open方法中初始化了列表状态变量,我们初始化的时候使用了ListStateDescriptor描述符,这个描述符用来告诉Flink列表状态变量的名字和类型。列表状态变量是单例,也就是说只会被实例化一次。这个列表状态变量的作用域是当前key所对应的逻辑分区。我们使用add方法向列表状态变量中添加数据,使用get方法读取列表状态变量中的所有元素。

11.5 侧输出流(Side Output)

处理函数还有另外一个特有功能,就是将自定义的数据放入“侧输出流”(side output)输出。这个概念我们并不陌生,之前在讲到窗口处理迟到数据时,最后一招就是输出到侧输出流。而这种处理方式的本质,其实就是处理函数的侧输出流功能。我们之前讲到的绝大多数转换算子,输出的都是单一流,流里的数据类型只能有一种。而侧输出流可以认为是“主流”上分叉出的“支流”,所以可以由一条流产生出多条流,而且这些流中的数据类型还可以不一样。利用这个功能可以很容易地实现“分流”操作。具体应用时,只要在处理函数的.processElement()或者.onTimer()方法中,调用上下文的.output()方法就可以了。

代码语言:javascript
复制
DataStream<Integer> stream = env.addSource(...);
SingleOutputStreamOperator<Long> longStream = stream.process(new ProcessFunction<Integer, Long>() {
      @Override
      public void processElement( Integer value, Context ctx, Collector<Integer> out) throws Exception {
        // 转换成Long,输出到主流中
        out.collect(Long.valueOf(value));
        // 转换成String,输出到侧输出流中
        ctx.output(outputTag, "side-output: " + String.valueOf(value));
      }
});

这里output()方法需要传入两个参数,第一个是一个“输出标签”OutputTag,用来标识侧输出流,一般会在外部统一声明;第二个就是要输出的数据。

我们可以在外部先将OutputTag声明出来:

代码语言:javascript
复制
OutputTag<String> outputTag = new OutputTag<String>("side-output") {};

如果想要获取这个侧输出流,可以基于处理之后的DataStream直接调用.getSideOutput()方法,传入对应的OutputTag,这个方式与窗口API中获取侧输出流是完全一样的。

代码语言:javascript
复制
DataStream<String> stringStream = longStream.getSideOutput(outputTag);

十二、多流转换

多流转换可以分为“分流”和“合流”两大类。目前分流的操作一般是通过侧输出流(side output)来实现,而合流的算子比较丰富,根据不同的需求可以调用union、connect、join以及coGroup等接口进行连接合并操作。

12.1 分流

所谓“分流”,就是将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个DataStream,得到完全平等的多个子DataStream,如图所示。一般来说,我们会定义一些筛选条件,将符合条件的数据拣选出来放到对应的流里。

12.1.1 简单实现

其实根据条件筛选数据的需求,本身非常容易实现:只要针对同一条流多次独立调用.filter()方法进行筛选,就可以得到拆分之后的流了。例如,我们可以将电商网站收集到的用户行为数据进行一个拆分,根据类型(type)的不同,分为“Mary”的浏览数据、“Bob”的浏览数据等等。那么代码就可以这样实现:

代码语言:javascript
复制
public class SplitStreamByFilter {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<Event> stream = env
                .addSource(new ClickSource());
        // 筛选Mary的浏览行为放入MaryStream流中
        DataStream<Event> MaryStream = stream.filter(new FilterFunction<Event>() {
            @Override
            public boolean filter(Event value) throws Exception {
                return value.user.equals("Mary");
            }
        });
        // 筛选Bob的购买行为放入BobStream流中
        DataStream<Event> BobStream = stream.filter(new FilterFunction<Event>() {
            @Override
            public boolean filter(Event value) throws Exception {
                return value.user.equals("Bob");
            }
        });
        // 筛选其他人的浏览行为放入elseStream流中
        DataStream<Event> elseStream = stream.filter(new FilterFunction<Event>() {
            @Override
            public boolean filter(Event value) throws Exception {
                return !value.user.equals("Mary") && !value.user.equals("Bob") ;
            }
        });

        MaryStream.print("Mary pv");
        BobStream.print("Bob pv");
        elseStream.print("else pv");

        env.execute();

    }

    // 这里重新实现了ClickSource(),因为我们在自定义数据源中发送了水位线
    public static class ClickSource implements SourceFunction<Event> {
        private boolean running = true;
        @Override
        public void run(SourceContext<Event> sourceContext) throws Exception {
            Random random = new Random();
            String[] userArr = {"Mary", "Bob", "Alice"};
            String[] urlArr  = {"./home", "./cart", "./prod?id=1"};
            while (running) {
                long currTs = Calendar.getInstance().getTimeInMillis(); // 毫秒时间戳
                String username = userArr[random.nextInt(userArr.length)];
                String url      = urlArr[random.nextInt(urlArr.length)];
                Event event = new Event(username, url, currTs);
                // 使用collectWithTimestamp方法将数据发送出去,并指明数据中的时间戳的字段
                sourceContext.collectWithTimestamp(event, event.timestamp);
                // 发送水位线
                sourceContext.emitWatermark(new Watermark(event.timestamp - 1L));
                Thread.sleep(1000L);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}

这种实现非常简单,但代码显得有些冗余——我们的处理逻辑对拆分出的三条流其实是一样的,却重复写了三次。而且这段代码背后的含义,是将原始数据流stream复制三份,然后对每一份分别做筛选;这明显是不够高效的。我们自然想到,能不能不用复制流,直接用一个算子就把它们都拆分开呢?在早期的版本中,DataStream API中提供了一个.split()方法,专门用来将一条流“切分”成多个。它的基本思路其实就是按照给定的筛选条件,给数据分类“盖戳”;然后基于这条盖戳之后的流,分别拣选想要的“戳”就可以得到拆分后的流。这样我们就不必再对流进行复制了。不过这种方法有一个缺陷:因为只是“盖戳”拣选,所以无法对数据进行转换,分流后的数据类型必须跟原始流保持一致。这就极大地限制了分流操作的应用场景。现在split方法已经淘汰掉了,我们以后分流只使用下面要讲的侧输出流。

12.1.2 使用侧输出流

在Flink 1.13版本中,已经弃用了.split()方法,取而代之的是直接用处理函数(process function)的侧输出流(side output)。关于处理函数中侧输出流的用法,我们已经在7.5节做了详细介绍。简单来说,只需要调用上下文ctx的.output()方法,就可以输出任意类型的数据了。而侧输出流的标记和提取,都离不开一个“输出标签”(OutputTag),它就相当于split()分流时的“戳”,指定了侧输出流的id和类型。

我们可以使用侧输出流将上一小节的分流代码改写如下:

代码语言:javascript
复制
public class SplitStreamByOutputTag {
    // 定义输出标签,侧输出流的数据类型为三元组(user, url, timestamp)
    private static OutputTag<Tuple3<String, String, Long>> MaryTag = new OutputTag<Tuple3<String, String, Long>>("Mary-pv"){};
    private static OutputTag<Tuple3<String, String, Long>> BobTag = new OutputTag<Tuple3<String, String, Long>>("Bob-pv"){};
    
public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<Event> stream = env
                .addSource(new ClickSource());

        SingleOutputStreamOperator<Event> processedStream = stream.process(new ProcessFunction<Event, Event>() {
            @Override
            public void processElement(Event value, Context ctx, Collector<Event> out) throws Exception {
                if (value.user.equals("Mary")){
                    ctx.output(MaryTag, new Tuple3<>(value.user, value.url, value.timestamp));
                } else if (value.user.equals("Bob")){
                    ctx.output(BobTag, new Tuple3<>(value.user, value.url, value.timestamp));
                } else {
                    out.collect(value);
                }
            }
        });

        processedStream.getSideOutput(MaryTag).print("Mary pv");
        processedStream.getSideOutput(BobTag).print("Bob pv");
        processedStream.print("else");

        env.execute();
    }

    // 这里重新实现了ClickSource(),因为我们在自定义数据源中发送了水位线
    public static class ClickSource implements SourceFunction<Event> {
        private boolean running = true;
        @Override
        public void run(SourceContext<Event> sourceContext) throws Exception {
            Random random = new Random();
            String[] userArr = {"Mary", "Bob", "Alice"};
            String[] urlArr  = {"./home", "./cart", "./prod?id=1"};
            while (running) {
                long currTs = Calendar.getInstance().getTimeInMillis(); // 毫秒时间戳
                String username = userArr[random.nextInt(userArr.length)];
                String url      = urlArr[random.nextInt(urlArr.length)];
                Event event = new Event(username, url, currTs);
                // 使用collectWithTimestamp方法将数据发送出去,并指明数据中的时间戳的字段
                sourceContext.collectWithTimestamp(event, event.timestamp);
                // 发送水位线
                sourceContext.emitWatermark(new Watermark(event.timestamp - 1L));
                Thread.sleep(1000L);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}

12.2 基本合流操作

在实际应用中,我们经常会遇到来源不同的多条流,需要将它们的数据进行联合处理。所以Flink中合流的操作会更加普遍,对应的API也更加丰富。

12.2.1 联合(Union)

最简单的合流操作,就是直接将多条流合在一起,叫作流的“联合”(union),如图所示。联合操作要求必须流中的数据类型必须相同,合并之后的新流会包括所有流中的元素,数据类型不变。

在代码中,我们只要基于DataStream直接调用.union()方法,传入其他DataStream作为参数,就可以实现流的联合了;得到的依然是一个DataStream:

代码语言:javascript
复制
stream1.union(stream2, stream3, ...)

注意:union()的参数可以是多个DataStream,所以联合操作可以实现多条流的合并。

这里需要考虑一个问题。在事件时间语义下,水位线是时间的进度标志;不同的流中可能水位线的进展快慢完全不同,如果它们合并在一起,水位线又该以哪个为准呢?还以要考虑水位线的本质含义,是“之前的所有数据已经到齐了”;所以对于合流之后的水位线,也是要以最小的那个为准,这样才可以保证所有流都不会再传来之前的数据。换句话说,多流合并时处理的时效性是以最慢的那个流为准的。我们自然可以想到,这与之前介绍的并行任务水位线传递的规则是完全一致的;多条流的合并,某种意义上也可以看作是多个并行任务向同一个下游任务汇合的过程。

我们可以用下面的代码做一个简单测试:

代码语言:javascript
复制
public class UnionExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Tuple2<String, Long>> stream1 = env
                .socketTextStream("localhost", 8888)
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String value) throws Exception {
                        String[] arr = value.split(" ");
                        return Tuple2.of(arr[0], Long.parseLong(arr[1]) * 1000L);
                    }
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                                    @Override
                                    public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                                        return element.f1;
                                    }
                                })
                );

        SingleOutputStreamOperator<Tuple2<String, Long>> stream2 = env
                .socketTextStream("localhost", 9999)
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String value) throws Exception {
                        String[] arr = value.split(" ");
                        return Tuple2.of(arr[0], Long.parseLong(arr[1]) * 1000L);
                    }
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                                    @Override
                                    public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                                        return element.f1;
                                    }
                                })
                );

        // 合并两条流进行处理
        stream1
                .union(stream2)
                .process(new ProcessFunction<Tuple2<String, Long>, String>() {
                    @Override
                    public void processElement(Tuple2<String, Long> in, Context context, Collector<String> collector) throws Exception {
                        collector.collect("当前水位线是:" + context.timerService().currentWatermark());
                    }
                })
                .print();

        env.execute();
    }
}

这里为了更清晰地看到水位线的进展,我们创建了两条流来读取socket文本数据,并从数据中提取时间戳作为生成水位线的依据。用union将两条流合并后,用一个ProcessFunction来进行处理,获取当前的水位线进行输出。我们会发现两条流中每输入一个数据,合并之后的流中都会有数据出现;而水位线只有在两条流中水位线最小值增大的时候,才会真正向前推进。

12.2.2 连接(Connect)

流的联合虽然简单,不过受限于数据类型不能改变,灵活性大打折扣,所以实际应用较少出现。除了联合(union),Flink还提供了另外一种方便的合流操作——连接(connect)。

1. 连接流(ConnectedStreams)为了处理更加灵活,连接操作允许流的数据类型不同。但我们知道一个DataStream中的数据只能有唯一的类型,所以连接得到的并不是DataStream,而是一个“连接流”(ConnectedStreams)。连接流可以看成是两条流形式上的“统一”,被放在了一个同一个流中;事实上内部仍保持各自的数据形式不变,彼此之间是相互独立的。要想得到新的DataStream,还需要进一步定义一个“同处理”(co-process)转换操作,用来说明对于不同来源、不同类型的数据,怎样分别进行处理转换、得到统一的输出类型。所以整体上来,两条流的连接就像是“一国两制”,两条流可以保持各自的数据类型、处理方式也可以不同,不过最终还是会统一到同一个DataStream中,如图所示。

在代码实现上,需要分为两步:首先基于一条DataStream调用.connect()方法,传入另外一条DataStream作为参数,将两条流连接起来,得到一个ConnectedStreams;然后再调用同处理方法得到DataStream。这里可以的调用的同处理方法有.map()/.flatMap(),以及.process()方法。

代码语言:javascript
复制
public class CoMapExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<Integer> stream1 = env.fromElements(1,2,3);
        DataStream<Long> stream2 = env.fromElements(1L,2L,3L);

        ConnectedStreams<Integer, Long> connectedStreams = stream1.connect(stream2);
        SingleOutputStreamOperator<String> result = connectedStreams.map(new CoMapFunction<Integer, Long, String>() {
            @Override
            public String map1(Integer value) {
                return "Integer: " + value;
            }

            @Override
            public String map2(Long value) {
                return "Long: " + value;
            }
        });

        result.print();

        env.execute();
    }
}

上面的代码中,ConnectedStreams有两个类型参数,分别表示内部包含的两条流各自的数据类型;由于需要“一国两制”,因此调用.map()方法时传入的不再是一个简单的MapFunction,而是一个CoMapFunction,表示分别对两条流中的数据执行map操作。这个接口有三个类型参数,依次表示第一条流、第二条流,以及合并后的流中的数据类型。需要实现的方法也非常直白:.map1()就是对第一条流中数据的map操作,.map2()则是针对第二条流。

值得一提的是,ConnectedStreams也可以直接调用.keyBy()进行按键分区的操作,得到的还是一个ConnectedStreams:

connectedStreams.keyBy(keySelector1, keySelector2); 这里传入两个参数keySelector1和keySelector2,是两条流中各自的键选择器;当然也可以直接传入键的位置值(keyPosition),或者键的字段名(field),这与普通的keyBy用法完全一致。ConnectedStreams进行keyBy操作,其实就是把两条流中key相同的数据放到了一起,然后针对来源的流再做各自处理,这在一些场景下非常有用。

2. CoProcessFunction对于连接流ConnectedStreams的处理操作,需要分别定义对两条流的处理转换,因此接口中就会有两个相同的方法需要实现,用数字“1”“2”区分,在两条流中的数据到来时分别调用。我们把这种接口叫作“协同处理函数”(co-process function)。与CoMapFunction类似,如果是调用.flatMap()就需要传入一个CoFlatMapFunction,需要实现flatMap1()、flatMap2()两个方法;而调用.process()时,传入的则是一个CoProcessFunction。

CoProcessFunction也是“处理函数”家族中的一员,用法非常相似。它需要实现的就是processElement1()、processElement2()两个方法,在每个数据到来时,会根据来源的流调用其中的一个方法进行处理。CoProcessFunction同样可以通过上下文ctx来访问timestamp、水位线,并通过TimerService注册定时器;另外也提供了.onTimer()方法,用于定义定时触发的处理操作。

下面是CoProcessFunction的一个具体示例:我们可以实现一个实时对账的需求,也就是app的支付操作和第三方的支付操作的一个双流Join。App的支付事件和第三方的支付事件将会互相等待5秒钟,如果等不来对应的支付事件,那么就输出报警信息。程序如下:

代码语言:javascript
复制
// 实时对账
public class BillCheckExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Tuple3<String, String, Long>> appStream = env
                .fromElements(
                        Tuple3.of("order-1", "app", 1000L),
                        Tuple3.of("order-2", "app", 2000L)
                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple3<String, String, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {
                                    @Override
                                    public long extractTimestamp(Tuple3<String, String, Long> in, long l) {
                                        return in.f2;
                                    }
                                })
                );

        SingleOutputStreamOperator<Tuple3<String, String, Long>> thirdPartyStream = env
                .fromElements(
                        Tuple3.of("order-1", "third-party", 3000L),
                        Tuple3.of("order-3", "third-party", 4000L)
                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple3<String, String, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {
                                    @Override
                                    public long extractTimestamp(Tuple3<String, String, Long> in, long l) {
                                        return in.f2;
                                    }
                                })
                );

        appStream.keyBy(r -> r.f0).connect(thirdPartyStream.keyBy(r -> r.f0)).process(new Match()).print();

        env.execute();
    }

    public static class Match extends CoProcessFunction<Tuple3<String, String, Long>, Tuple3<String, String, Long>, String> {
        // 声明状态变量用来保存app的支付事件
        private ValueState<Tuple3<String, String, Long>> appEvent;
        // 声明状态变量用来保存第三方平台的到账事件
        private ValueState<Tuple3<String, String, Long>> thirdPartyEvent;
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            appEvent = getRuntimeContext().getState(
                    new ValueStateDescriptor<Tuple3<String, String, Long>>("app", Types.TUPLE(
                            Types.STRING, Types.STRING, Types.LONG
                    ))
            );
            thirdPartyEvent = getRuntimeContext().getState(
                    new ValueStateDescriptor<Tuple3<String, String, Long>>("third-party", Types.TUPLE(
                            Types.STRING, Types.STRING, Types.LONG
                    ))
            );
        }

        @Override
        public void processElement1(Tuple3<String, String, Long> app, Context context, Collector<String> collector) throws Exception {
            if (thirdPartyEvent.value() != null) {
                collector.collect(app.f0 + " 对账成功");
                thirdPartyEvent.clear();
            } else {
                appEvent.update(app);
                context.timerService().registerEventTimeTimer(app.f2 + 5000L);
            }
        }

        @Override
        public void processElement2(Tuple3<String, String, Long> thirdParty, Context context, Collector<String> collector) throws Exception {
            if (appEvent.value() != null) {
                collector.collect(thirdParty.f0 + " 对账成功");
                appEvent.clear();
            } else {
                thirdPartyEvent.update(thirdParty);
                context.timerService().registerEventTimeTimer(thirdParty.f2 + 5000L);
            }
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            super.onTimer(timestamp, ctx, out);
            if (appEvent.value() != null) {
                out.collect(appEvent.value().f0 + "对账失败,订单的第三方支付信息未到");
                appEvent.clear();
            }
            if (thirdPartyEvent.value() != null) {
                out.collect(thirdPartyEvent.value().f0 + "对账失败,订单的app支付信息未到");
                thirdPartyEvent.clear();
            }
        }
    }
}

在程序中,我们声明了两个值状态(ValueState)分别用来保存App的支付信息和第三方的支付信息。App的支付信息到达以后,会检查对应的第三方支付信息是否已经先到达(先到达会保存在对应的状态变量中),如果已经到达了,那么对账成功,直接输出对账成功的信息,并将保存第三方支付消息的状态变量清空。如果App对应的第三方支付信息没有到来,那么我们会注册一个5秒钟之后的定时器,也就是说等待第三方支付事件5秒钟。当定时器触发时,检查保存app支付信息的状态变量是否还在,如果还在,说明对应的第三方支付信息没有到来,所以输出报警信息。

12.3 基于时间的合流——双流联结(Join)

可以发现,根据某个key合并两条流,与关系型数据库中表的join操作非常相近。事实上,Flink中两条流的connect操作,就可以通过keyBy指定键进行分组后合并,实现了类似于SQL中的join操作;另外connect支持处理函数,可以使用自定义状态和TimerService灵活实现各种需求,其实已经能够处理双流join的大多数场景。

不过处理函数是底层接口,所以尽管connect能做的事情多,但在一些具体应用场景下还是显得太过抽象了。比如,如果我们希望统计固定时间内两条流数据的匹配情况,那就需要设置定时器、自定义触发逻辑来实现——其实这完全可以用窗口(window)来表示。为了更方便地实现基于时间的合流操作,Flink的DataStrema API提供了内置的join算子。

12.3.1 窗口联结(Window Join)

Flink为基于一段时间的双流合并专门提供了一个窗口联结(window join)算子,可以定义时间窗口,并将两条流中共享一个公共键(key)的数据放在窗口中进行配对处理。

1. 窗口联结的调用窗口联结在代码中的实现,首先需要调用DataStream的.join()方法来合并两条流,得到一个JoinedStreams;接着通过.where()和.equalTo()方法指定两条流中联结的key;然后通过.window()开窗口,并调用.apply()传入联结窗口函数进行处理计算。通用调用形式如下:

代码语言:javascript
复制
stream1.join(stream2)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(<WindowAssigner>)
    .apply(<JoinFunction>)

上面代码中.where()的参数是键选择器(KeySelector),用来指定第一条流中的key;而.equalTo()传入的KeySelector则指定了第二条流中的key。两者相同的元素,如果在同一窗口中,就可以匹配起来,并通过一个“联结函数”(JoinFunction)进行处理了。

这里.window()传入的就是窗口分配器,之前讲到的三种时间窗口都可以用在这里:滚动窗口(tumbling window)、滑动窗口(sliding window)和会话窗口(session window)。

而后面调用.apply()可以看作实现了一个特殊的窗口函数。注意这里只能调用.apply(),没有其他替代的方法。

传入的JoinFunction也是一个函数类接口,使用时需要实现内部的.join()方法。这个方法有两个参数,分别表示两条流中成对匹配的数据。其实仔细观察可以发现,窗口join的调用语法和我们熟悉的SQL中表的join非常相似:

代码语言:javascript
复制
SELECT * FROM table1 t1, table2 t2 WHERE t1.id = t2.id;

这句SQL中where子句的表达,等价于inner join ... on,所以本身表示的是两张表基于id的“内连接”(inner join)。而Flink中的window join,同样类似于inner join。也就是说,最后处理输出的,只有两条流中数据按key配对成功的那些;如果某个窗口中一条流的数据没有任何另一条流的数据匹配,那么就不会调用JoinFunction的.join()方法,也就没有任何输出了。

2. 窗口联结实例在电商网站中,往往需要统计用户不同行为之间的转化,这就需要对不同的行为数据流,按照用户ID进行分组后再合并,以分析它们之间的关联。如果这些是以固定时间周期(比如1小时)来统计的,那我们就可以使用窗口join来实现这样的需求。

下面是一段示例代码:

代码语言:javascript
复制
// 基于窗口的join
public class WindowJoinExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<Tuple2<String, Long>> stream1 = env
                .fromElements(
                        Tuple2.of("a", 1000L),
                        Tuple2.of("b", 1000L),
                        Tuple2.of("a", 2000L),
                        Tuple2.of("b", 2000L)
                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple2<String, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner(
                                        new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                                            @Override
                                            public long extractTimestamp(Tuple2<String, Long> stringLongTuple2, long l) {
                                                return stringLongTuple2.f1;
                                            }
                                        }
                                )
                );

        DataStream<Tuple2<String, Long>> stream2 = env
                .fromElements(
                        Tuple2.of("a", 3000L),
                        Tuple2.of("b", 3000L),
                        Tuple2.of("a", 4000L),
                        Tuple2.of("b", 4000L)
                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple2<String, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner(
                                        new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                                            @Override
                                            public long extractTimestamp(Tuple2<String, Long> stringLongTuple2, long l) {
                                                return stringLongTuple2.f1;
                                            }
                                        }
                                )
                );

        stream1
                .join(stream2)
                .where(r -> r.f0)
                .equalTo(r -> r.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new JoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
                    @Override
                    public String join(Tuple2<String, Long> left, Tuple2<String, Long> right) throws Exception {
                        return left + "=>" + right;
                    }
                })
                .print();

        env.execute();
    }
}

12.3.2 间隔联结(Interval Join)

在有些场景下,我们要处理的时间间隔可能并不是固定的。这时显然不应该用滚动窗口或滑动窗口来处理——因为匹配的两个数据有可能刚好“卡在”窗口边缘两侧,于是窗口内就都没有匹配了;会话窗口虽然时间不固定,但也明显不适合这个场景。基于时间的窗口联结已经无能为力了。

为了应对这样的需求,Flink提供了一种叫作“间隔联结”(interval join)的合流操作。顾名思义,间隔联结的思路就是针对一条流的每个数据,开辟出其时间戳前后的一段时间间隔,看这期间是否有来自另一条流的数据匹配。

1. 间隔联结的原理间隔联结具体的定义方式是,我们给定两个时间点,分别叫作间隔的“上界”(upperBound)和“下界”(lowerBound);于是对于一条流(不妨叫作A)中的任意一个数据元素a,就可以开辟一段时间间隔:[a.timestamp + lowerBound, a.timestamp + upperBound],即以a的时间戳为中心,下至下界点、上至上界点的一个闭区间:我们就把这段时间作为可以匹配另一条流数据的“窗口”范围。所以对于另一条流(不妨叫B)中的数据元素b,如果它的时间戳落在了这个区间范围内,a和b就可以成功配对,进而进行计算输出结果。所以匹配的条件为:

代码语言:javascript
复制
a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound

这里需要注意,做间隔联结的两条流A和B,也必须基于相同的key;下界lowerBound应该小于等于上界upperBound,两者都可正可负;间隔联结目前只支持事件时间语义。如图所示,我们可以清楚地看到间隔联结的方式:

下方的流A去间隔联结上方的流B,所以基于A的每个数据元素,都可以开辟一个间隔区间。我们这里设置下界为-2毫秒,上界为1毫秒。于是对于时间戳为2的A中元素,它的可匹配区间就是[0, 3],流B中有时间戳为0、1的两个元素落在这个范围内,所以就可以得到匹配数据对(2, 0)和(2, 1)。同样地,A中时间戳为3的元素,可匹配区间为[1, 4],B中只有时间戳为1的一个数据可以匹配,于是得到匹配数据对(3, 1)。

所以我们可以看到,间隔联结同样是一种内连接(inner join)。与窗口联结不同的是,interval join做匹配的时间段是基于流中数据的,所以并不确定;而且流B中的数据可以不只在一个区间内被匹配。

2. 间隔联结的调用间隔联结在代码中,是基于KeyedStream的联结(join)操作。DataStream在keyBy得到KeyedStream之后,可以调用.intervalJoin()来合并两条流,传入的参数同样是一个KeyedStream,两者的key类型应该一致;得到的是一个IntervalJoin类型。后续的操作同样是完全固定的:先通过.between()方法指定间隔的上下界,再调用.process()方法,定义对匹配数据对的处理操作。调用.process()需要传入一个处理函数,这是处理函数家族的最后一员:“处理联结函数”ProcessJoinFunction。

通用调用形式如下:

代码语言:javascript
复制
stream1
    .keyBy(<KeySelector>)
    .intervalJoin(stream2.keyBy(<KeySelector>))
    .between(Time.milliseconds(-2), Time.milliseconds(1))
    .process (new ProcessJoinFunction<Integer, Integer, String(){
        @Override
        public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
            out.collect(left + "," + right);
        }
    });

可以看到,抽象类ProcessJoinFunction就像是ProcessFunction和JoinFunction的结合,内部同样有一个抽象方法.processElement()。与其他处理函数不同的是,它多了一个参数,这自然是因为有来自两条流的数据。参数中left指的就是第一条流中的数据,right则是第二条流中与它匹配的数据。每当检测到一组匹配,就会调用这里的.processElement()方法,经处理转换之后输出结果。

3. 间隔联结实例在电商网站中,某些用户行为往往会有短时间内的强关联。我们这里举一个例子,我们有两条流,一条是下订单的流,一条是浏览数据的流。我们可以针对同一个用户,来做这样一个联结。也就是使用一个用户的下订单的事件和这个用户的最近十分钟的浏览数据进行一个联结查询。

下面是一段示例代码:

代码语言:javascript
复制
// 基于间隔的join
public class IntervalJoinExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Tuple3<String, String, Long>> orderStream = env
                .fromElements(
                        Tuple3.of("Mary", "order", 20 * 60 * 1000L)
                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple3<String, String, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {
                                    @Override
                                    public long extractTimestamp(Tuple3<String, String, Long> e, long l) {
                                        return e.f2;
                                    }
                                })
                );

        SingleOutputStreamOperator<Tuple3<String, String, Long>> pvStream = env
                .fromElements(
                        Tuple3.of("Mary", "pv", 15 * 60 * 1000L),
                        Tuple3.of("Mary", "pv", 11 * 60 * 1000L),
                        Tuple3.of("Mary", "pv", 9 * 60 * 1000L),
                        Tuple3.of("Mary", "pv", 21 * 60 * 1000L)
                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple3<String, String, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {
                                    @Override
                                    public long extractTimestamp(Tuple3<String, String, Long> e, long l) {
                                        return e.f2;
                                    }
                                })
                );

        // a -> b
        // (lower, higher)
        orderStream
                .keyBy(r -> r.f0)
                .intervalJoin(pvStream.keyBy(r -> r.f0))
                .between(Time.minutes(-10), Time.minutes(5))
                .process(new ProcessJoinFunction<Tuple3<String, String, Long>, Tuple3<String, String, Long>, String>() {
                    @Override
                    public void processElement(Tuple3<String, String, Long> left, Tuple3<String, String, Long> right, Context context, Collector<String> collector) throws Exception {
                        collector.collect(left + "=>" + right);
                    }
                })
                .print();

        // b -> a
        // (-higher, -lower)
        pvStream
                .keyBy(r -> r.f0)
                .intervalJoin(orderStream.keyBy(r -> r.f0))
                .between(Time.minutes(-5), Time.minutes(10))
                .process(new ProcessJoinFunction<Tuple3<String, String, Long>, Tuple3<String, String, Long>, String>() {
                    @Override
                    public void processElement(Tuple3<String, String, Long> left, Tuple3<String, String, Long> right, Context context, Collector<String> collector) throws Exception {
                        collector.collect(right + "->" + left);
                    }
                })
                .print();

        env.execute();
    }
}

十三、状态管理

Flink处理机制的核心,就是“有状态的流式计算”。我们之前多次提到了“状态”(state),不论是简单聚合、窗口聚合,还是处理函数的应用,都会有状态的身影出现。在Flink这样的分布式系统中,我们不仅需要定义出状态在任务并行时的处理方式,还需要考虑如何持久化保存、以便发生故障时正确地恢复。这就需要一套完整的管理机制来处理所有的状态。

13.1 Flink中的状态

在流处理中,数据是连续不断到来和处理的。每个任务进行计算处理时,可以基于当前数据直接转换得到输出结果;也可以依赖一些其他数据。这些由一个任务维护,并且用来计算输出结果的所有数据,就叫作这个任务的状态。

13.1.1 有状态算子

在Flink中,算子任务可以分为无状态和有状态两种情况。无状态的算子任务只需要观察每个独立事件,根据当前输入的数据直接转换输出结果,如图所示。例如,可以将一个字符串类型的数据拆分开作为元组输出;也可以对数据做一些计算,比如每个代表数量的字段加1。我们之前讲到的基本转换算子,如map、filter、flatMap,计算时不依赖其他数据,就都属于无状态的算子。

而有状态的算子任务,则除当前数据之外,还需要一些其他数据来得到计算结果。这里的“其他数据”,就是所谓的状态(state),最常见的就是之前到达的数据,或者由之前数据计算出的某个结果。比如,做求和(sum)计算时,需要保存之前所有数据的和,这就是状态;窗口算子中会保存已经到达的所有数据,这些也都是它的状态。另外,如果我们希望检索到某种“事件模式”(event pattern),比如“先有下单行为,后有支付行为”,那么也应该把之前的行为保存下来,这同样属于状态。容易发现,之前讲过的聚合算子、窗口算子都属于有状态的算子。

如图所示为有状态算子的一般处理流程,具体步骤如下。

代码语言:javascript
复制
(1)算子任务接收到上游发来的数据;
(2)获取当前状态;
(3)根据业务逻辑进行计算,更新状态;
(4)得到计算结果,输出发送到下游任务。

13.1.2 状态的管理

在传统的事务型处理架构中,这种额外的状态数据是保存在数据库中的。而对于实时流处理来说,这样做需要频繁读写外部数据库,如果数据规模非常大肯定就达不到性能要求了。所以Flink的解决方案是,将状态直接保存在内存中来保证性能,并通过分布式扩展来提高吞吐量。在Flink中,每一个算子任务都可以设置并行度,从而可以在不同的slot上并行运行多个实例,我们把它们叫作“并行子任务”。而状态既然在内存中,那么就可以认为是子任务实例上的一个本地变量,能够被任务的业务逻辑访问和修改。

这样看来状态的管理似乎非常简单,我们直接把它作为一个对象交给JVM就可以了。然而大数据的场景下,我们必须使用分布式架构来做扩展,在低延迟、高吞吐的基础上还要保证容错性,一系列复杂的问题就会随之而来了。

代码语言:javascript
复制
1.状态的访问权限。我们知道Flink上的聚合和窗口操作,一般都是基于KeyedStream的,数据会按照key的哈希值进行分区,聚合处理的结果也应该是只对当前key有效。然而同一个分区(也就是slot)上执行的任务实例,可能会包含多个key的数据,它们同时访问和更改本地变量,就会导致计算结果错误。所以这时状态并不是单纯的本地变量。

2.容错性,也就是故障后的恢复。状态只保存在内存中显然是不够稳定的,我们需要将它持久化保存,做一个备份;在发生故障后可以从这个备份中恢复状态。

3.我们还应该考虑到分布式应用的横向扩展性。比如处理的数据量增大时,我们应该相应地对计算资源扩容,调大并行度。这时就涉及到了状态的重组调整。

可见状态的管理并不是一件轻松的事。好在Flink作为有状态的大数据流式处理框架,已经帮我们搞定了这一切。Flink有一套完整的状态管理机制,将底层一些核心功能全部封装起来,包括状态的高效存储和访问、持久化保存和故障恢复,以及资源扩展时的调整。这样,我们只需要调用相应的API就可以很方便地使用状态,或对应用的容错机制进行配置,从而将更多的精力放在业务逻辑的开发上。

13.1.3 状态的分类

1. 托管状态(Managed State)和原始状态(Raw State)

Flink的状态有两种:托管状态(Managed State)和原始状态(Raw State)。托管状态就是由Flink统一管理的,状态的存储访问、故障恢复和重组等一系列问题都由Flink实现,我们只要调接口就可以;而原始状态则是自定义的,相当于就是开辟了一块内存,需要我们自己管理,实现状态的序列化和故障恢复。具体来讲,托管状态是由Flink的运行时(Runtime)来托管的;在配置容错机制后,状态会自动持久化保存,并在发生故障时自动恢复。当应用发生横向扩展时,状态也会自动地重组分配到所有的子任务实例上。对于具体的状态内容,Flink也提供了值状态(ValueState)、列表状态(ListState)、映射状态(MapState)、聚合状态(AggregateState)等多种结构,内部支持各种数据类型。聚合、窗口等算子中内置的状态,就都是托管状态;我们也可以在富函数类(RichFunction)中通过上下文来自定义状态,这些也都是托管状态。而对比之下,原始状态就全部需要自定义了。Flink不会对状态进行任何自动操作,也不知道状态的具体数据类型,只会把它当作最原始的字节(Byte)数组来存储。我们需要花费大量的精力来处理状态的管理和维护。所以只有在遇到托管状态无法实现的特殊需求时,我们才会考虑使用原始状态;一般情况下不推荐使用。绝大多数应用场景,我们都可以用Flink提供的算子或者自定义托管状态来实现需求。

2. 算子状态(Operator State)和按键分区状态(Keyed State)

接下来我们的重点就是托管状态(Managed State)。我们知道在Flink中,一个算子任务会按照并行度分为多个并行子任务执行,而不同的子任务会占据不同的任务槽(task slot)。由于不同的slot在计算资源上是物理隔离的,所以Flink能管理的状态在并行任务间是无法共享的,每个状态只能针对当前子任务的实例有效。而很多有状态的操作(比如聚合、窗口)都是要先做keyBy进行按键分区的。按键分区之后,任务所进行的所有计算都应该只针对当前key有效,所以状态也应该按照key彼此隔离。在这种情况下,状态的访问方式又会有所不同。基于这样的想法,我们又可以将托管状态分为两类:算子状态和按键分区状态。

(1)算子状态(Operator State)

状态作用范围限定为当前的算子任务实例,也就是只对当前并行子任务实例有效。这就意味着对于一个并行子任务,占据了一个“分区”,它所处理的所有数据都会访问到相同的状态,状态对于同一任务而言是共享的,如图所示。

算子状态可以用在所有算子上,使用的时候其实就跟一个本地变量没什么区别——因为本地变量的作用域也是当前任务实例。在使用时,我们还需进一步实现CheckpointedFunction接口。

(2)按键分区状态(Keyed State)

状态是根据输入流中定义的键(key)来维护和访问的,所以只能定义在按键分区流(KeyedStream)中,也就keyBy之后才可以使用,如图所示。

按键分区状态应用非常广泛。之前讲到的聚合算子必须在keyBy之后才能使用,就是因为聚合的结果是以Keyed State的形式保存的。另外,也可以通过富函数类(Rich Function)来自定义Keyed State,所以只要提供了富函数类接口的算子,也都可以使用Keyed State。

所以即使是map、filter这样无状态的基本转换算子,我们也可以通过富函数类给它们“追加”Keyed State,或者实现CheckpointedFunction接口来定义Operator State;从这个角度讲,Flink中所有的算子都可以是有状态的,不愧是“有状态的流处理”。

无论是Keyed State还是Operator State,它们都是在本地实例上维护的,也就是说每个并行子任务维护着对应的状态,算子的子任务之间状态不共享。

13.2 按键分区状态(Keyed State)

在实际应用中,我们一般都需要将数据按照某个key进行分区,然后再进行计算处理;所以最为常见的状态类型就是Keyed State。之前介绍到keyBy之后的聚合、窗口计算,算子所持有的状态,都是Keyed State。另外,我们还可以通过富函数类(Rich Function)对转换算子进行扩展、实现自定义功能,比如RichMapFunction、RichFilterFunction。在富函数中,我们可以调用.getRuntimeContext()获取当前的运行时上下文(RuntimeContext),进而获取到访问状态的句柄;这种富函数中自定义的状态也是Keyed State。

13.2.1 基本概念和特点

按键分区状态(Keyed State)顾名思义,是任务按照键(key)来访问和维护的状态。它的特点非常鲜明,就是以key为作用范围进行隔离。

我们知道,在进行按键分区(keyBy)之后,具有相同键的所有数据,都会分配到同一个并行子任务中;所以如果当前任务定义了状态,Flink就会在当前并行子任务实例中,为每个键值维护一个状态的实例。于是当前任务就会为分配来的所有数据,按照key维护和处理对应的状态。

因为一个并行子任务可能会处理多个key的数据,所以Flink需要对Keyed State进行一些特殊优化。在底层,Keyed State类似于一个分布式的映射(map)数据结构,所有的状态会根据key保存成键值对(key-value)的形式。这样当一条数据到来时,任务就会自动将状态的访问范围限定为当前数据的key,从map存储中读取出对应的状态值。所以具有相同key的所有数据都会到访问相同的状态,而不同key的状态之间是彼此隔离的。

这种将状态绑定到key上的方式,相当于使得状态和流的逻辑分区一一对应了:不会有别的key的数据来访问当前状态;而当前状态对应key的数据也只会访问这一个状态,不会分发到其他分区去。这就保证了对状态的操作都是本地进行的,对数据流和状态的处理做到了分区一致性。

另外,在应用的并行度改变时,状态也需要随之进行重组。不同key对应的Keyed State可以进一步组成所谓的键组(key groups),每一组都对应着一个并行子任务。键组是Flink重新分配Keyed State的单元,键组的数量就等于定义的最大并行度。当算子并行度发生改变时,Keyed State就会按照当前的并行度重新平均分配,保证运行时各个子任务的负载相同。

需要注意,使用Keyed State必须基于KeyedStream。没有进行keyBy分区的DataStream,即使转换算子实现了对应的富函数类,也不能通过运行时上下文访问Keyed State。

13.2.2 支持的结构类型

实际应用中,需要保存为状态的数据会有各种各样的类型,有时还需要复杂的集合类型,比如列表(List)和映射(Map)。对于这些常见的用法,Flink的按键分区状态(Keyed State)提供了足够的支持。接下来我们就来了解一下Keyed State 所支持的结构类型.

1. 值状态(ValueState)

顾名思义,状态中只保存一个“值”(value)。ValueState本身是一个接口,源码中定义如下:

代码语言:javascript
复制
  public interface ValueState<T> extends State {
T value() throws IOException;
void update(T value) throws IOException;
}

这里的T是泛型,表示状态的数据内容可以是任何具体的数据类型。如果想要保存一个长整型值作为状态,那么类型就是ValueState。我们可以在代码中读写值状态,实现对于状态的访问和更新。

代码语言:javascript
复制
T value():获取当前状态的值;
update(T value):对状态进行更新,传入的参数value就是要覆写的状态值。

在具体使用时,为了让运行时上下文清楚到底是哪个状态,我们还需要创建一个“状态描述器”(StateDescriptor)来提供状态的基本信息。例如源码中,ValueState的状态描述器构造方法如下:

代码语言:javascript
复制
  public ValueStateDescriptor(String name, Class<T> typeClass) {
    super(name, typeClass, null);
}

这里需要传入状态的名称和类型——这跟我们声明一个变量时做的事情完全一样。有了这个描述器,运行时环境就可以获取到状态的控制句柄(handler)了。关于代码中状态的使用,我们会在下一节详细介绍。

2. 列表状态(ListState)

将需要保存的数据,以列表(List)的形式组织起来。在ListState接口中同样有一个类型参数T,表示列表中数据的类型。ListState也提供了一系列的方法来操作状态,使用方式与一般的List非常相似。

代码语言:javascript
复制
Iterable<T> get():获取当前的列表状态,返回的是一个可迭代类型Iterable<T>;
update(List<T> values):传入一个列表values,直接对状态进行覆盖;
add(T value):在状态列表中添加一个元素value;
addAll(List<T> values):向列表中添加多个元素,以列表values形式传入。

类似地,ListState的状态描述器就叫作ListStateDescriptor,用法跟ValueStateDescriptor完全一致。

3. 映射状态(MapState)

把一些键值对(key-value)作为状态整体保存起来,可以认为就是一组key-value映射的列表。对应的MapState<UK, UV>接口中,就会有UK、UV两个泛型,分别表示保存的key和value的类型。同样,MapState提供了操作映射状态的方法,与Map的使用非常类似。

代码语言:javascript
复制
UV get(UK key):传入一个key作为参数,查询对应的value值;
put(UK key, UV value):传入一个键值对,更新key对应的value值;
putAll(Map<UK, UV> map):将传入的映射map中所有的键值对,全部添加到映射状态中;
remove(UK key):将指定key对应的键值对删除;
boolean contains(UK key):判断是否存在指定的key,返回一个boolean值。
另外,MapState也提供了获取整个映射相关信息的方法:
Iterable<Map.Entry<UK, UV>> entries():获取映射状态中所有的键值对;
Iterable<UK> keys():获取映射状态中所有的键(key),返回一个可迭代Iterable类型;
Iterable<UV> values():获取映射状态中所有的值(value),返回一个可迭代Iterable类型;
boolean isEmpty():判断映射是否为空,返回一个boolean值。

4. 归约状态(ReducingState)

类似于值状态(Value),不过需要对添加进来的所有数据进行归约,将归约聚合之后的值作为状态保存下来。ReducintState这个接口调用的方法类似于ListState,只不过它保存的只是一个聚合值,所以调用.add()方法时,不是在状态列表里添加元素,而是直接把新数据和之前的状态进行归约,并用得到的结果更新状态。

归约逻辑的定义,是在归约状态描述器(ReducingStateDescriptor)中,通过传入一个归约函数(ReduceFunction)来实现的。这里的归约函数,就是我们之前介绍reduce聚合算子时讲到的ReduceFunction,所以状态类型跟输入的数据类型是一样的。

代码语言:javascript
复制
public ReducingStateDescriptor(
        String name, ReduceFunction<T> reduceFunction, Class<T> typeClass) {...}

这里的描述器有三个参数,其中第二个参数就是定义了归约聚合逻辑的ReduceFunction,另外两个参数则是状态的名称和类型。

5. 聚合状态(AggregatingState)

与归约状态非常类似,聚合状态也是一个值,用来保存添加进来的所有数据的聚合结果。与ReducingState不同的是,它的聚合逻辑是由在描述器中传入一个更加一般化的聚合函数(AggregateFunction)来定义的;这也就是之前我们讲过的AggregateFunction,里面通过一个累加器(Accumulator)来表示状态,所以聚合的状态类型可以跟添加进来的数据类型完全不同,使用更加灵活。

同样地,AggregatingState接口调用方法也与ReducingState相同,调用.add()方法添加元素时,会直接使用指定的AggregateFunction进行聚合并更新状态。

13.2.3 代码实现

了解了按键分区状态(Keyed State)的基本概念和类型,接下来我们就可以尝试在代码中使用状态了。

1. 整体介绍

在 Flink 中,状态始终是与特定算子相关联的;算子在使用状态前首先需要“注册”,其实就是告诉Flink当前上下文中定义状态的信息,这样运行时的 Flink 才能知道算子有哪些状态。

状态的注册,主要是通过“状态描述器”(StateDescriptor)来实现的。状态描述器中最重要的内容,就是状态的名称(name)和类型(type)。我们知道Flink中的状态,可以认为是加了一些复杂操作的内存中的变量;而当我们在代码中声明一个局部变量时,都需要指定变量类型和名称,名称就代表了变量在内存中的地址,类型则指定了占据内存空间的大小。同样地,我们一旦指定了名称和类型,Flink就可以在运行时准确地在内存中找到对应的状态,进而返回状态对象供我们使用了。所以在一个算子中,我们也可以定义多个状态,只要它们的名称不同就可以了。

另外,状态描述器中还可能需要传入一个用户自定义函数(user-defined-function,UDF),用来说明处理逻辑,比如前面提到的ReduceFunction和AggregateFunction。以ValueState为例,我们可以定义值状态描述器如下:

代码语言:javascript
复制
ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>(
"my state", // 状态名称
Types.LONG // 状态类型
);

这里我们定义了一个叫作“my state”的长整型ValueState的描述器。代码中完整的操作是,首先定义出状态描述器;然后调用.getRuntimeContext()方法获取运行时上下文;继而调用RuntimeContext的获取状态的方法,将状态描述器传入,就可以得到对应的状态了。因为状态的访问需要获取运行时上下文,这只能在富函数类(Rich Function)中获取到,所以自定义的Keyed State只能在富函数中使用。当然,底层的处理函数(Process Function)本身继承了AbstractRichFunction抽象类,所以也可以使用。在富函数中,调用.getRuntimeContext()方法获取到运行时上下文之后,RuntimeContext有以下几个获取状态的方法:

代码语言:javascript
复制
ValueState<T> getState(ValueStateDescriptor<T>)
MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)
ListState<T> getListState(ListStateDescriptor<T>)
ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)

对于不同结构类型的状态,只要传入对应的描述器、调用对应的方法就可以了。获取到状态对象之后,就可以调用它们各自的方法进行读写操作了。另外,所有类型的状态都有一个方法.clear(),用于清除当前状态。代码中使用状态的整体结构如下:

代码语言:javascript
复制
public static class MyFlatMapFunction extends RichFlatMapFunction<Long, String> {
    // 声明状态
private transient ValueState<Long> state;

    @Override
public void open(Configuration config) {
    // 在open生命周期方法中获取状态
        ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>(
"my state", // 状态名称
Types.LONG // 状态类型 
);
        state = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(Long input, Collector<String> out) throws Exception {
        // 访问状态
        Long currentState = state.value();
        currentState += 1;    // 状态数值加1
        // 更新状态
        state.update(currentState);
        if (currentState >= 100) {
            out.collect(“state: ” + currentState);
            state.clear();    // 清空状态
        }
    }
}

因为RichFlatmapFunction中的.flatmap()是每来一条数据都会调用一次的,所以我们不应该在这里调用运行时上下文的.getState()方法,而是在生命周期方法.open()中获取状态对象。另外还有一个问题,我们获取到的状态对象也需要有一个变量名称state,但这个变量不应该在open中声明——否则在.flatmap()里就访问不到了。所以我们还需要在外面直接把它定义为类的属性,这样就可以在不同的方法中通用了。而在外部又不能直接获取状态,因为编译时是无法拿到运行时上下文的。所以最终的解决方案就变成了:在外部声明状态对象,在open生命周期方法中通过运行时上下文获取状态。

这里需要注意,这种方式定义的都是Keyed State,它对于每个key都会保存一份状态实例。所以对状态进行读写操作时,获取到的状态跟当前输入数据的key有关;只有相同key的数据,才会操作同一个状态,不同key的数据访问到的状态值是不同的。而且上面提到的.clear()方法,也只会清除当前key对应的状态。下面我们给出一些不同类型状态的应用实例。

1. 值状态(ValueState)

我们这里会使用用户id来进行分流,然后分别统计每个用户的pv数据,由于我们并不想每次pv加一,就将统计结果发送到下游去,所以这里我们注册了一个定时器,用来隔一段时间发送pv的统计结果,这样对下游算子的压力不至于太大。

具体实现方式是定义一个用来保存定时器时间戳的值状态变量。当定时器触发并向下游发送数据以后,便清空储存定时器时间戳的状态变量,这样当新的数据到来时,发现并没有定时器存在,就可以注册新的定时器了,注册完定时器之后将定时器的时间戳继续保存在状态变量中。

代码语言:javascript
复制
public class ValueStateExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env
                .addSource(new ClickSource())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Event>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                                    @Override
                                    public long extractTimestamp(Event element, long recordTimestamp) {
                                        return element.timestamp;
                                    }
                                })
                )
                .keyBy(r -> r.user)
                .process(new KeyedProcessFunction<String, Event, String>() {
                    private ValueState<Long> valueState;
                    private ValueState<Long> timerTs;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        valueState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("pv", Types.LONG));
                    }

                    @Override
                    public void processElement(Event value, Context ctx, Collector<String> out) throws Exception {
                        if (valueState.value() == null) {
                            valueState.update(1L);
                        } else {
                            valueState.update(valueState.value() + 1L);
                        }
                        if (timerTs.value() == null) {
ctx.timerService().registerEventTimeTimer(value.timestamp + 10 * 1000L);
                            timerTs.update(value.timestamp + 10 * 1000L);
                        }
                    }

                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                        super.onTimer(timestamp, ctx, out);
                        out.collect("用户 " + ctx.getCurrentKey() + " 的PV是:" + valueState.value());
                        timerTs.clear();
                    }
                })
                .print();

        env.execute();
    }
}

2. 映射状态(MapState)

映射状态的用法和Java中的HashMap很相似。在这里我们可以通过MapState的使用来探索一下窗口的底层实现,也就是我们要用映射状态来完整模拟窗口的功能。这里我们模拟一个滚动窗口。我们要计算的是每一个url在每一个窗口中的pv数据。我们之前使用增量聚合和全窗口聚合结合的方式实现过这个需求。这里我们用MapState再来实现一下。

代码语言:javascript
复制
// 使用KeyedProcessFunction模拟滚动窗口
public class FakeWindowExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env
                .addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                    @Override
                    public long extractTimestamp(Event event, long l) {
                        return event.timestamp;
                    }
                }))
                .keyBy(r -> r.url)
                .process(new FakeWindow(5000L))
                .print();

        env.execute();
    }

    public static class FakeWindow extends KeyedProcessFunction<String, Event, String> {
        // 窗口的开始时间 -> 窗口中的pv
        private MapState<Long, Long> mapState;
        // 滚动窗口的长度
        private Long windowSize;

        public FakeWindow(Long windowSize) {
            this.windowSize = windowSize;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            mapState = getRuntimeContext().getMapState(
                    new MapStateDescriptor<Long, Long>("window-pv", Types.LONG, Types.LONG)
            );
        }

        @Override
        public void processElement(Event event, Context context, Collector<String> collector) throws Exception {
            long windowStart = event.timestamp - event.timestamp % windowSize;
            long windowEnd = windowStart + windowSize;
            context.timerService().registerEventTimeTimer(windowEnd – 1);
            if (mapState.contains(windowStart)) {
                long pv = mapState.get(windowStart);
                mapState.put(windowStart, pv + 1L);
            } else {
                mapState.put(windowStart, 1L);
            }
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            super.onTimer(timestamp, ctx, out);
            long start = timestamp + 1L - windowSize;
            long end = timestamp + 1L;
            out.collect(ctx.getCurrentKey() + ":" + new Timestamp(start) + "~" + new Timestamp(end) + ":" + mapState.get(start));
// 删除窗口,因为窗口的默认操作是计算完成以后销毁窗口
            mapState.remove(start);
        }
    }

3. 聚合状态(AggregatingState)

我们举一个简单的例子,首先自定义一个产生随机整数的自定义数据源,然后进行累加。当累加到999时,清空聚合状态变量,然后重新累加。可以看到我们这里使用RichFlatMapFunction实现了sum的功能。

代码语言:javascript
复制
public class AggregateStateExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env
                .addSource(new SourceFunction<Tuple2<String, Integer>>() {
                    private boolean running = true;
                    private Random random = new Random();
                    @Override
                    public void run(SourceContext<Tuple2<String, Integer>> sourceContext) throws Exception {
                        while (true) {
                            sourceContext.collect(Tuple2.of("key", random.nextInt()));
                        }
                    }

                    @Override
                    public void cancel() {
                        running = false;
                    }
                })
                .keyBy(r -> r.f0)
                .flatMap(new CountFunction())
                .print();
        env.execute();
    }

    public static class CountFunction extends RichFlatMapFunction<Tuple2<String, Integer>, Integer> {
        private int count = 0;
        // 声明聚合状态变量
        private AggregatingState<Tuple2<String, Integer>, Integer> aggregatingState;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            AggregatingStateDescriptor<Tuple2<String, Integer>, Integer, Integer> descriptor = new AggregatingStateDescriptor<Tuple2<String, Integer>, Integer, Integer>(
                    "aggregatingState", new AggregateFunction<Tuple2<String, Integer>, Integer, Integer>() {
                @Override
                public Integer createAccumulator() {
                    return 0;
                }

                @Override
                public Integer add(Tuple2<String, Integer> value, Integer accumulator) {
                    return accumulator + 1;
                }

                @Override
                public Integer getResult(Integer accumulator) {
                    return accumulator;
                }

                @Override
                public Integer merge(Integer a, Integer b) {
                    return a + b;
                }
            }, Types.INT);
            aggregatingState = getRuntimeContext().getAggregatingState(descriptor);
        }

        @Override
        public void flatMap(Tuple2<String, Integer> value, Collector<Integer> out) throws Exception {
            count++;
            if (count % 1000 == 0) {
                out.collect(aggregatingState.get());
                aggregatingState.clear(); // 清空状态变量
            } else {
                // 增量更新AggregatingState,这里每来一个新元素,对ACC累加1
                aggregatingState.add(value);
            }
        }
    }
}

13.2.4 状态生存时间(TTL)

在实际应用中,很多状态会随着时间的推移逐渐增长,如果不加以限制,最终就会导致存储空间的耗尽。一个优化的思路是直接在代码中调用.clear()方法去清除状态,但是有时候我们的逻辑要求不能直接清除。这时就需要配置一个状态的“生存时间”(time-to-live,TTL),当状态在内存中存在的时间超出这个值时,就将它清除。

具体实现上,如果用一个进程不停地扫描所有状态看是否过期,显然会占用大量资源做无用功。状态的失效其实不需要立即删除,所以我们可以给状态附加一个属性,也就是状态的“失效时间”。状态创建的时候,设置 失效时间 = 当前时间 + TTL;之后如果有对状态的访问和修改,我们可以再对失效时间进行更新;当设置的清除条件被触发时(比如,状态被访问的时候,或者每隔一段时间扫描一次失效状态),就可以判断状态是否失效、从而进行清除了。

配置状态的TTL时,需要创建一个StateTtlConfig配置对象,然后调用状态描述器的.enableTimeToLive()方法启动TTL功能。

代码语言:javascript
复制
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(10))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("my state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);

这里用到了几个配置项:

代码语言:javascript
复制
.newBuilder()
状态TTL配置的构造器方法,必须调用,返回一个Builder之后再调用.build()方法就可以得到StateTtlConfig了。方法需要传入一个Time作为参数,这就是设定的状态生存时间。
.setUpdateType()
设置更新类型。更新类型指定了什么时候更新状态失效时间,这里的OnCreateAndWrite表示只有创建状态和更改状态(写操作)时更新失效时间。另一种类型OnReadAndWrite则表示无论读写操作都会更新失效时间,也就是只要对状态进行了访问,就表明它是活跃的,从而延长生存时间。这个配置默认为OnCreateAndWrite。
.setStateVisibility()
设置状态的可见性。所谓的“状态可见性”,是指因为清除操作并不是实时的,所以当状态过期之后还有可能继续存在,这时如果对它进行访问,能否正常读取到就是一个问题了。这里设置的NeverReturnExpired是默认行为,表示从不返回过期值,也就是只要过期就认为它已经被清除了,应用不能继续读取;这在处理会话或者隐私数据时比较重要。对应的另一种配置是ReturnExpireDefNotCleanedUp,就是如果过期状态还存在,就返回它的值。

除此之外,TTL配置还可以设置在保存检查点(checkpoint)时触发清除操作,或者配置增量的清理(incremental cleanup),还可以针对RocksDB状态后端使用压缩过滤器(compaction filter)进行后台清理。这里需要注意,目前的TTL设置只支持处理时间。

13.3 算子状态(Operator State)

除按键分区状态(Keyed State)之外,另一大类受控状态就是算子状态(Operator State)。从某种意义上说,算子状态是更底层的状态类型,因为它只针对当前算子并行任务有效,不需要考虑不同key的隔离。算子状态功能不如按键分区状态丰富,应用场景较少,它的调用方法也会有一些区别。

13.3.1 基本概念和特点

算子状态(Operator State)就是一个算子并行实例上定义的状态,作用范围被限定为当前算子任务。算子状态跟数据的key无关,所以不同key的数据只要被分发到同一个并行子任务,就会访问到同一个Operator State。算子状态的实际应用场景不如Keyed State多,一般用在Source或Sink等与外部系统连接的算子上,或者完全没有key定义的场景。比如Flink的Kafka连接器中,就用到了算子状态。当算子的并行度发生变化时,算子状态也支持在并行的算子任务实例之间做重组分配。根据状态的类型不同,重组分配的方案也会不同。

13.3.2 状态类型

算子状态也支持不同的结构类型,主要有三种:ListState、UnionListState和BroadcastState。

1. 列表状态(ListState)

与Keyed State中的ListState一样,将状态表示为一组数据的列表。与Keyed State中的列表状态的区别是:在算子状态的上下文中,不会按键(key)分别处理状态,所以每一个并行子任务上只会保留一个“列表”(list),也就是当前并行子任务上所有状态项的集合。列表中的状态项就是可以重新分配的最细粒度,彼此之间完全独立。当算子并行度进行缩放调整时,算子的列表状态中的所有元素项会被统一收集起来,相当于把多个分区的列表合并成了一个“大列表”,然后再均匀地分配给所有并行任务。这种“均匀分配”的具体方法就是“轮询”(round-robin),与之前介绍的rebanlance数据传输方式类似,是通过逐一“发牌”的方式将状态项平均分配的。这种方式也叫作“平均分割重组”(even-split redistribution)。算子状态中不会存在“键组”(key group)这样的结构,所以为了方便重组分配,就把它直接定义成了“列表”(list)。这也就解释了,为什么算子状态中没有最简单的值状态(ValueState)。

2. 联合列表状态(UnionListState)

与ListState类似,联合列表状态也会将状态表示为一个列表。它与常规列表状态的区别在于,算子并行度进行缩放调整时对于状态的分配方式不同。UnionListState的重点就在于“联合”(union)。在并行度调整时,常规列表状态是轮询分配状态项,而联合列表状态的算子则会直接广播状态的完整列表。这样,并行度缩放之后的并行子任务就获取到了联合后完整的“大列表”,可以自行选择要使用的状态项和要丢弃的状态项。这种分配也叫作“联合重组”(union redistribution)。如果列表中状态项数量太多,为资源和效率考虑一般不建议使用联合重组的方式。

3. 广播状态(BroadcastState)

有时我们希望算子并行子任务都保持同一份“全局”状态,用来做统一的配置和规则设定。这时所有分区的所有数据都会访问到同一个状态,状态就像被“广播”到所有分区一样,这种特殊的算子状态,就叫作广播状态(BroadcastState)。因为广播状态在每个并行子任务上的实例都一样,所以在并行度调整的时候就比较简单,只要复制一份到新的并行任务就可以实现扩展;而对于并行度缩小的情况,可以将多余的并行子任务连同状态直接砍掉——因为状态都是复制出来的,并不会丢失。

13.4 状态持久化和状态后端

在Flink的状态管理机制中,很重要的一个功能就是对状态进行持久化(persistence)保存,这样就可以在发生故障后进行重启恢复。Flink对状态进行持久化的方式,就是将当前所有分布式状态进行“快照”保存,写入一个“检查点”(checkpoint)或者保存点(savepoint)保存到外部存储系统中。具体的存储介质,一般是分布式文件系统(distributed file system)。

13.4.1 检查点(Checkpoint)

有状态流应用中的检查点(checkpoint),其实就是所有任务的状态在某个时间点的一个快照(一份拷贝)。简单来讲,就是一次“存盘”,让我们之前处理数据的进度不要丢掉。在一个流应用程序运行时,Flink 会定期保存检查点,在检查点中会记录每个算子的id和状态;如果发生故障,Flink就会用最近一次成功保存的检查点来恢复应用的状态,重新启动处理流程,就如同“读档”一样。

如果保存检查点之后又处理了一些数据,然后发生了故障,那么重启恢复状态之后这些数据带来的状态改变会丢失。为了让最终处理结果正确,我们还需要让源(Source)算子重新读取这些数据,再次处理一遍。这就需要流的数据源具有“数据重放”的能力,一个典型的例子就是Kafka,我们可以通过保存消费数据的偏移量、故障重启后重新提交来实现数据的重放。这是对“至少一次”(at least once)状态一致性的保证,如果希望实现“精确一次”(exactly once)的一致性,还需要数据写入外部系统时的相关保证。关于这部分内容我们会在第10章继续讨论。

默认情况下,检查点是被禁用的,需要在代码中手动开启。直接调用执行环境的.enableCheckpointing()方法就可以开启检查点。

代码语言:javascript
复制
StreamExecutionEnvironment env = StreamExecutionEnvironment.getEnvironment();
env.enableCheckpointing(1000);

这里传入的参数是检查点的间隔时间,单位为毫秒。关于检查点的详细配置,可以参考第10章的内容。除了检查点之外,Flink还提供了“保存点”(savepoint)的功能。保存点在原理和形式上跟检查点完全一样,也是状态持久化保存的一个快照;区别在于,保存点是自定义的镜像保存,所以不会由Flink自动创建,而需要用户手动触发。这在有计划地停止、重启应用时非常有用。

13.4.2 状态后端(State Backends)

检查点的保存离不开JobManager和TaskManager,以及外部存储系统的协调。在应用进行检查点保存时,首先会由JobManager向所有TaskManager发出触发检查点的命令;TaskManger收到之后,将当前任务的所有状态进行快照保存,持久化到远程的存储介质中;完成之后向JobManager返回确认信息。这个过程是分布式的,当JobManger收到所有TaskManager的返回信息后,就会确认当前检查点成功保存,如图所示。而这一切工作的协调,就需要一个“专职人员”来完成。

在Flink中,状态的存储、访问以及维护,都是由一个可插拔的组件决定的,这个组件就叫作状态后端(state backend)。状态后端主要负责两件事:一是本地的状态管理,二是将检查点(checkpoint)写入远程的持久化存储。

1. 状态后端的分类

状态后端是一个“开箱即用”的组件,可以在不改变应用程序逻辑的情况下独立配置。Flink中提供了两类不同的状态后端,一种是“哈希表状态后端”(HashMapStateBackend),另一种是“内嵌RocksDB状态后端”(EmbeddedRocksDBStateBackend)。如果没有特别配置,系统默认的状态后端是HashMapStateBackend。

(1)哈希表状态后端(HashMapStateBackend)

这种方式就是我们之前所说的,把状态存放在内存里。具体实现上,哈希表状态后端在内部会直接把状态当作对象(objects),保存在Taskmanager的JVM堆(heap)上。普通的状态,以及窗口中收集的数据和触发器(triggers),都会以键值对(key-value)的形式存储起来,所以底层是一个哈希表(HashMap),这种状态后端也因此得名。对于检查点的保存,一般是放在持久化的分布式文件系统(file system)中,也可以通过配置“检查点存储”(CheckpointStorage)来另外指定。HashMapStateBackend是将本地状态全部放入内存的,这样可以获得最快的读写速度,使计算性能达到最佳;代价则是内存的占用。它适用于具有大状态、长窗口、大键值状态的作业,对所有高可用性设置也是有效的。

(2)内嵌RocksDB状态后端(EmbeddedRocksDBStateBackend)

RocksDB是一种内嵌的key-value存储介质,可以把数据持久化到本地硬盘。配置EmbeddedRocksDBStateBackend后,会将处理中的数据全部放入RocksDB数据库中,RocksDB默认存储在TaskManager的本地数据目录里。与HashMapStateBackend直接在堆内存中存储对象不同,这种方式下状态主要是放在RocksDB中的。数据被存储为序列化的字节数组(Byte Arrays),读写操作需要序列化/反序列化,因此状态的访问性能要差一些。另外,因为做了序列化,key的比较也会按照字节进行,而不是直接调用.hashCode()和.equals()方法。对于检查点,同样会写入到远程的持久化文件系统中。EmbeddedRocksDBStateBackend始终执行的是异步快照,也就是不会因为保存检查点而阻塞数据的处理;而且它还提供了增量式保存检查点的机制,这在很多情况下可以大大提升保存效率。由于它会把状态数据落盘,而且支持增量化的检查点,所以在状态非常大、窗口非常长、键/值状态很大的应用场景中是一个好选择,同样对所有高可用性设置有效。

2. 如何选择正确的状态后端

HashMap和RocksDB两种状态后端最大的区别,就在于本地状态存放在哪里:前者是内存,后者是RocksDB。在实际应用中,选择那种状态后端,主要是需要根据业务需求在处理性能和应用的扩展性上做一个选择。HashMapStateBackend是内存计算,读写速度非常快;但是,状态的大小会受到集群可用内存的限制,如果应用的状态随着时间不停地增长,就会耗尽内存资源。而RocksDB是硬盘存储,所以可以根据可用的磁盘空间进行扩展,而且是唯一支持增量检查点的状态后端,所以它非常适合于超级海量状态的存储。不过由于每个状态的读写都需要做序列化/反序列化,而且可能需要直接从磁盘读取数据,这就会导致性能的降低,平均读写性能要比HashMapStateBackend慢一个数量级。我们可以发现,实际应用就是权衡利弊后的取舍。最理想的当然是处理速度快且内存不受限制可以处理海量状态,那就需要非常大的内存资源了,这会导致成本超出项目预算。比起花更多的钱,稍慢的处理速度或者稍小的处理规模,老板可能更容易接受一点。

3. 状态后端的配置

在不做配置的时候,应用程序使用的默认状态后端是由集群配置文件flink-conf.yaml中指定的,配置的键名称为state.backend。这个默认配置对集群上运行的所有作业都有效,我们可以通过更改配置值来改变默认的状态后端。另外,我们还可以在代码中为当前作业单独配置状态后端,这个配置会覆盖掉集群配置文件的默认值。

(1)配置默认的状态后端

在flink-conf.yaml中,可以使用state.backend来配置默认状态后端。配置项的可能值为hashmap,这样配置的就是HashMapStateBackend;也可以是rocksdb,这样配置的就是EmbeddedRocksDBStateBackend。另外,也可以是一个实现了状态后端工厂StateBackendFactory的类的完全限定类名。

下面是一个配置HashMapStateBackend的例子:

代码语言:javascript
复制
# 默认状态后端
state.backend: hashmap
# 存放检查点的文件路径
state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints
这里的state.checkpoints.dir配置项,定义了状态后端将检查点和元数据写入的目录。

(2)为每个作业(Per-job)单独配置状态后端

每个作业独立的状态后端,可以在代码中,基于作业的执行环境直接设置。代码如下:

代码语言:javascript
复制
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new HashMapStateBackend());

上面代码设置的是HashMapStateBackend,如果想要设置EmbeddedRocksDBStateBackend,可以用下面的配置方式:

代码语言:javascript
复制
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new EmbeddedRocksDBStateBackend());

需要注意,如果想在IDE中使用EmbeddedRocksDBStateBackend,需要为Flink项目添加依赖:

代码语言:javascript
复制
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

而由于Flink发行版中默认就包含了RocksDB,所以只要我们的代码中没有使用RocksDB的相关内容,就不需要引入这个依赖。

十四、容错机制

在分布式架构中,当某个节点出现故障,其它节点基本不受影响。这时只需要重启应用,恢复之前某个时间点的状态继续处理就可以了。这一切看似简单,可是在实时流处理中,我们不仅需要保证故障后能够重启继续运行,还要保证结果的正确性、故障恢复的速度、对处理性能的影响,这就需要在架构上做出更加精巧的设计。在Flink中,有一套完整的容错机制(fault tolerance)来保证故障后的恢复,其中最重要的就是检查点(checkpoint)。在第九章中,我们已经介绍过检查点的基本概念和用途,接下来我们就深入探讨一下检查点的原理和Flink的容错机制。

14.1 检查点(Checkpoint)

在流处理中,我们可以用存档读档的思路,把之前的计算结果做个保存,这样重启之后就可以继续处理新数据、而不需要重新计算了。进一步地,我们知道在有状态的流处理中,任务继续处理新数据,并不需要“之前的计算结果”,而是需要任务“之前的状态”。所以我们最终的选择,就是将之前某个时间点所有的状态保存下来,这份“存档”就是所谓的“检查点”(checkpoint)。遇到故障重启的时候,我们可以从检查点中“读档”,恢复出之前的状态,这样就可以回到当时保存的一刻接着处理数据了。检查点是Flink容错机制的核心。这里所谓的“检查”,其实是针对故障恢复的结果而言的:故障恢复之后继续处理的结果,应该与发生故障前完全一致,我们需要“检查”结果的正确性。所以,有时又会把checkpoint叫做“一致性检查点”。

14.1.1 检查点的保存

1. 周期性的触发保存

“随时存档”确实恢复起来方便,可是需要我们不停地做存档操作。如果每处理一条数据就进行检查点的保存,当大量数据同时到来时,就会耗费很多资源来频繁做检查点,数据处理的速度就会收到影响。所以更好的方式是,每隔一段时间去做一次存档,这样既不会影响数据的正常处理,也不会有太大的延迟——毕竟故障恢复的情况不是随时发生的。在Flink中,检查点的保存是周期性触发的,间隔时间可以进行设置。

所以检查点作为应用状态的一份“存档”,其实就是所有任务状态在同一时间点的一个“快照”(snapshot),它的触发是周期性的。具体来说,当每隔一段时间检查点保存操作被触发时,就把每个任务当前的状态复制一份,按照一定的逻辑结构放在一起持久化保存起来,就构成了检查点。

2. 保存的时间点

我们应该在所有任务都恰好处理完一个相同的输入数据的时候,将它们的状态保存下来。首先,这样避免了除状态之外其它额外信息的存储,提高了检查点保存的效率。其次,一个数据要么就是被所有任务完整地处理完,状态得到了保存;要么就是没处理完,状态全部没保存:这就相当于构建了一个“事务”(transaction)。如果出现故障,我们恢复到之前保存的状态,故障时正在处理的所有数据都需要重新处理;所以我们只需要让源(source)任务向数据源重新提交偏移量、请求重放数据就可以了。当然这需要源任务可以把偏移量作为算子状态保存下来,而且外部数据源能够重置偏移量;kafka就是满足这些要求的一个最好的例子。

3. 保存的具体流程

检查点的保存,最关键的就是要等所有任务将“同一个数据”处理完毕。下面我们通过一个具体的例子,来详细描述一下检查点具体的保存过程。回忆一下我们最初实现的统计词频的程序——word count。这里为了方便,我们直接从数据源读入已经分开的一个个单词,例如这里输入的就是:

代码语言:javascript
复制
“hello”,“world”,“hello”,“flink”,“hello”,“world”,“hello”,“flink”…

对应的代码就可以简化为:

代码语言:javascript
复制
SingleOutputStreamOperator<Tuple2<String, Long>> wordCountStream = 
env.addSource(...)
        .map(word -> Tuple2.of(word, 1L))
        .returns(Types.TUPLE(Types.STRING, Types.LONG));
        .keyBy(t -> t.f0);
        .sum(1);

源(Source)任务从外部数据源读取数据,并记录当前的偏移量,作为算子状态(Operator State)保存下来。然后将数据发给下游的Map任务,它会将一个单词转换成(word, count)二元组,初始count都是1,也就是(“hello”, 1)、(“world”, 1)这样的形式;这是一个无状态的算子任务。进而以word作为键(key)进行分区,调用.sum()方法就可以对count值进行求和统计了;Sum算子会把当前求和的结果作为按键分区状态(Keyed State)保存下来。最后得到的就是当前单词的频次统计(word, count),如图所示。

当我们需要保存检查点(checkpoint)时,就是在所有任务处理完同一条数据后,对状态做个快照保存下来。例如上图中,已经处理了3条数据:“hello”“world”“hello”,所以我们会看到Source算子的偏移量为3;后面的Sum算子处理完第三条数据“hello”之后,此时已经有2个“hello”和1个“world”,所以对应的状态为“hello”-> 2,“world”-> 1(这里KeyedState底层会以key-value形式存储)。此时所有任务都已经处理完了前三个数据,所以我们可以把当前的状态保存成一个检查点,写入外部存储中。至于具体保存到哪里,这是由状态后端的配置项“检查点存储”(CheckpointStorage)来决定的,可以有作业管理器的堆内存(JobManagerCheckpointStorage)和文件系统(FileSystemCheckpointStorage)两种选择。一般情况下,我们会将检查点写入持久化的分布式文件系统。

14.1.2 从检查点恢复状态

在运行流处理程序时,Flink会周期性地保存检查点。当发生故障时,就需要找到最近一次成功保存的检查点来恢复状态。例如在上节的word count示例中,我们处理完三个数据后保存了一个检查点。之后继续运行,又正常处理了一个数据“flink”,在处理第五个数据“hello”时发生了故障,如图所示。

这里Source任务已经处理完毕,所以偏移量为5;Map任务也处理完成了。而Sum任务在处理中发生了故障,此时状态并未保存。接下来就需要从检查点来恢复状态了。具体的步骤为:

(1)重启应用

遇到故障之后,第一步当然就是重启。我们将应用重新启动后,所有任务的状态会清空,如图所示。

(2)读取检查点,重置状态

找到最近一次保存的检查点,从中读出每个算子任务状态的快照,分别填充到对应的状态中。这样,Flink内部所有任务的状态,就恢复到了保存检查点的那一时刻,也就是刚好处理完第三个数据的时候,如图所示。

(3)重放数据

从检查点恢复状态后还有一个问题:如果直接继续处理数据,那么保存检查点之后、到发生故障这段时间内的数据,也就是第4、5个数据(“flink”“hello”)就相当于丢掉了;这会造成计算结果的错误。为了不丢数据,我们应该从保存检查点后开始重新读取数据,这可以通过Source任务向外部数据源重新提交偏移量(offset)来实现,如图所示。

这样,整个系统的状态已经完全回退到了检查点保存完成的那一时刻。

(4)继续处理数据

接下来,我们就可以正常处理数据了。首先是重放第4、5个数据,然后继续读取后面的数据,如图所示。

当处理到第5个数据时,就已经追上了发生故障时的系统状态。之后继续处理,就好像没有发生过故障一样;我们既没有丢掉数据、也没有重复计算数据,这就保证了计算结果的正确性。在分布式系统中,这叫做实现了“精确一次”(exactly-once)的状态一致性保证。

14.1.3 检查点算法

在Flink中,采用了基于Chandy-Lamport算法的分布式快照,可以在不暂停整体流处理的前提下,将状态备份保存到检查点。

1. 检查点分界线(Barrier)

我们可以借鉴水位线(watermark)的设计,在数据流中插入一个特殊的数据结构,专门用来表示触发检查点保存的时间点。收到保存检查点的指令后,Source任务可以在当前数据流中插入这个结构;之后的所有任务只要遇到它就开始对状态做持久化快照保存。由于数据流是保持顺序依次处理的,因此遇到这个标识就代表之前的数据都处理完了,可以保存一个检查点;而在它之后的数据,引起的状态改变就不会体现在这个检查点中,而需要保存到下一个检查点。

这种特殊的数据形式,把一条流上的数据按照不同的检查点分隔开,所以就叫做检查点的“分界线”(Checkpoint Barrier)。与水位线很类似,检查点分界线也是一条特殊的数据,由Source算子注入到常规的数据流中,它的位置是限定好的,不能超过其他数据,也不能被后面的数据超过。检查点分界线中带有一个检查点ID,这是当前要保存的检查点的唯一标识,如图所示。

这样,分界线就将一条流逻辑上分成了两部分:分界线之前到来的数据导致的状态更改,都会被包含在当前分界线所表示的检查点中;而基于分界线之后的数据导致的状态更改,则会被包含在之后的检查点中。

在JobManager中有一个“检查点协调器”(checkpoint coordinator),专门用来协调处理检查点的相关工作。检查点协调器会定期向TaskManager发出指令,要求保存检查点(带着检查点ID);TaskManager会让所有的Source任务把自己的偏移量(算子状态)保存起来,并将带有检查点ID的分界线(barrier)插入到当前的数据流中,然后像正常的数据一样像下游传递;之后Source任务就可以继续读入新的数据了。

每个算子任务只要处理到这个barrier,就把当前的状态进行快照;在收到barrier之前,还是正常地处理之前的数据,完全不受影响。比如上图中,Source任务收到1号检查点保存指令时,读取完了三个数据,所以将偏移量3保存到外部存储中;而后将ID为1的barrier注入数据流;与此同时,Map任务刚刚收到上一条数据“hello”,而Sum任务则还在处理之前的第二条数据(world, 1)。下游任务不会在这时就立刻保存状态,而是等收到barrier时才去做快照,这时可以保证前三个数据都已经处理完了。同样地,下游任务做状态快照时,也不会影响上游任务的处理,每个任务的快照保存并行不悖,不会有暂停等待的时间。

2. 分布式快照算法

watermark指示的是“之前的数据全部到齐了”,而barrier指示的是“之前所有数据的状态更改保存入当前检查点”:它们都是一个“截止时间”的标志。所以在处理多个分区的传递时,也要以是否还会有数据到来作为一个判断标准。

具体实现上,Flink使用了Chandy-Lamport算法的一种变体,被称为“异步分界线快照”(asynchronous barrier snapshotting)算法。算法的核心就是两个原则:当上游任务向多个并行下游任务发送barrier时,需要广播出去;而当多个上游任务向同一个下游任务传递barrier时,需要在下游任务执行“分界线对齐”(barrier alignment)操作,也就是需要等到所有并行分区的barrier都到齐,才可以开始状态的保存。

为了详细解释检查点算法的原理,我们对之前的word count程序进行扩展,考虑所有算子并行度为2的场景,如图所示。

我们有两个并行的Source任务,会分别读取两个数据流(或者是一个源的不同分区)。这里每条流中的数据都是一个个的单词:“hello”“world”“hello”“flink”交替出现。此时第一条流的Source任务(为了方便,下文中我们直接叫它“Source 1”,其它任务类似)读取了3个数据,偏移量为3;而第二条流的Source任务(Source 2)只读取了一个“hello”数据,偏移量为1。

接下来就是检查点保存的算法。具体过程如下:

(1)JobManager发送指令,触发检查点的保存;Source任务保存状态,插入分界线

JobManager 会周期性地向每个 TaskManager发送一条带有新检查点 ID 的消息,通过这种方式来启动检查点。收到指令后,TaskManger会在所有Source 任务中插入一个分界线(barrier),并将偏移量保存到远程的持久化存储中,如图所示。

并行的Source任务保存的状态为3和1,表示当前的1号检查点应该包含:第一条流中截至第三个数据、第二条流中截至第一个数据的所有状态更改。可以发现Source任务做这些的时候并不影响后面任务的处理,Sum任务已经处理完了第一条流中传来的(world, 1),对应的状态也有了更改。

(2)状态快照保存完成,分界线向下游传递

状态存入持久化存储之后,会返回通知给 Source 任务;Source 任务就会向 JobManager 确认检查点完成,然后跟数据一样把barrier向下游任务传递,如图所示。

由于Source和Map之间是一对一(forward)的传输关系(这里没有考虑算子链operator chain),所以barrier可以直接传递给对应的Map任务。之后Source任务就可以继续读取新的数据了。与此同时,Sum 1已经将第二条流传来的(hello,1)处理完毕,更新了状态。

(3)向下游多个并行子任务广播分界线,执行分界线对齐

Map任务没有状态,所以直接将barrier继续向下游传递。这时由于进行了keyBy分区,所以需要将barrier广播到下游并行的两个Sum任务,如图所示。同时,Sum任务可能收到来自上游两个并行Map任务的barrier,所以需要执行“分界线对齐”操作。

此时的Sum 2收到了来自上游两个Map任务的barrier,说明第一条流第三个数据、第二条流第一个数据都已经处理完,可以进行状态的保存了;而Sum 1只收到了来自Map 2的barrier,所以这时需要等待分界线对齐。在等待的过程中,如果分界线尚未到达的分区任务Map 1又传来了数据(hello, 1),说明这是需要保存到检查点的,Sum任务应该正常继续处理数据,状态更新为3;而如果分界线已经到达的分区任务Map 2又传来数据,这已经是下一个检查点要保存的内容了,就不应立即处理,而是要缓存起来、等到状态保存之后再做处理。

(4)分界线对齐后,保存状态到持久化存储

各个分区的分界线都对齐后,就可以对当前状态做快照,保存到持久化存储了。存储完成之后,同样将barrier向下游继续传递,并通知JobManager保存完毕,如图所示。

这个过程中,每个任务保存自己的状态都是相对独立的,互不影响。我们可以看到,当Sum将当前状态保存完毕时,Source 1任务已经读取到第一条流的第五个数据了。

(5)先处理缓存数据,然后正常继续处理

完成检查点保存之后,任务就可以继续正常处理数据了。这时如果有等待分界线对齐时缓存的数据,需要先做处理;然后再按照顺序依次处理新到的数据。当JobManager收到所有任务成功保存状态的信息,就可以确认当前检查点成功保存。之后遇到故障就可以从这里恢复了。由于分界线对齐要求先到达的分区做缓存等待,一定程度上会影响处理的速度;当出现背压(backpressure)时,下游任务会堆积大量的缓冲数据,检查点可能需要很久才可以保存完毕。为了应对这种场景,Flink 1.11之后提供了不对齐的检查点保存方式,可以将未处理的缓冲数据(in-flight data)也保存进检查点。这样,当我们遇到一个分区barrier时就不需等待对齐,而是可以直接启动状态的保存了。

14.1.4 检查点配置

检查点的作用是为了故障恢复,我们不能因为保存检查点占据了大量时间、导致数据处理性能明显降低。为了兼顾容错性和处理性能,我们可以在代码中对检查点进行各种配置。

1. 启用检查点

默认情况下,Flink程序是禁用检查点的。如果想要为Flink应用开启自动保存快照的功能,需要在代码中显式地调用执行环境的.enableCheckpointing()方法:

代码语言:javascript
复制
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
// 每隔1秒启动一次检查点保存
env.enableCheckpointing(1000);

这里需要传入一个长整型的毫秒数,表示周期性保存检查点的间隔时间。如果不传参数直接启用检查点,默认的间隔周期为500毫秒,这种方式已经被弃用。检查点的间隔时间是对处理性能和故障恢复速度的一个权衡。如果我们希望对性能的影响更小,可以调大间隔时间;而如果希望故障重启后迅速赶上实时的数据处理,就需要将间隔时间设小一些。

2. 检查点存储(Checkpoint Storage)

检查点具体的持久化存储位置,取决于“检查点存储”(CheckpointStorage)的设置。默认情况下,检查点存储在JobManager的堆(heap)内存中。而对于大状态的持久化保存,Flink也提供了在其他存储位置进行保存的接口,这就是CheckpointStorage。具体可以通过调用检查点配置的.setCheckpointStorage()来配置,需要传入一个CheckpointStorage的实现类。Flink主要提供了两种CheckpointStorage:作业管理器的堆内存(JobManagerCheckpointStorage)和文件系统(FileSystemCheckpointStorage)。

代码语言:javascript
复制
// 配置存储检查点到JobManager堆内存
env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());
// 配置存储检查点到文件系统
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("hdfs://namenode:40010/flink/checkpoints"));

对于实际生产应用,我们一般会将CheckpointStorage配置为高可用的分布式文件系统(HDFS,S3等)。

3. 其它高级配置

检查点还有很多可以配置的选项,可以通过获取检查点配置(CheckpointConfig)来进行设置。CheckpointConfig checkpointConfig = env.getCheckpointConfig(); 我们这里做一个简单的列举说明:

代码语言:javascript
复制
1.检查点模式(CheckpointingMode)
设置检查点一致性的保证级别,有“精确一次”(exactly-once)和“至少一次”(at-least-once)两个选项。默认级别为exactly-once,而对于大多数低延迟的流处理程序,at-least-once就够用了,而且处理效率会更高。关于一致性级别,我们会在10.2节继续展开。
2.超时时间(checkpointTimeout)
用于指定检查点保存的超时时间,超时没完成就会被丢弃掉。传入一个长整型毫秒数作为参数,表示超时时间。
3.最小间隔时间(minPauseBetweenCheckpoints)
用于指定在上一个检查点完成之后,检查点协调器(checkpoint coordinator)最快等多久可以出发保存下一个检查点的指令。这就意味着即使已经达到了周期触发的时间点,只要距离上一个检查点完成的间隔不够,就依然不能开启下一次检查点的保存。这就为正常处理数据留下了充足的间隙。当指定这个参数时,maxConcurrentCheckpoints的值强制为1。
4.最大并发检查点数量(maxConcurrentCheckpoints)
用于指定运行中的检查点最多可以有多少个。由于每个任务的处理进度不同,完全可能出现后面的任务还没完成前一个检查点的保存、前面任务已经开始保存下一个检查点了。这个参数就是限制同时进行的最大数量。
如果前面设置了minPauseBetweenCheckpoints,则maxConcurrentCheckpoints这个参数就不起作用了。
5.开启外部持久化存储(enableExternalizedCheckpoints)
用于开启检查点的外部持久化,而且默认在作业失败的时候不会自动清理,如果想释放空间需要自己手工清理。里面传入的参数ExternalizedCheckpointCleanup指定了当作业取消的时候外部的检查点该如何清理。
DELETE_ON_CANCELLATION:在作业取消的时候会自动删除外部检查点,但是如果是作业失败退出,则会保留检查点。
RETAIN_ON_CANCELLATION:作业取消的时候也会保留外部检查点。
6.检查点异常时是否让整个任务失败(failOnCheckpointingErrors)
用于指定在检查点发生异常的时候,是否应该让任务直接失败退出。默认为true,如果设置为false,则任务会丢弃掉检查点然后继续运行。
7..不对齐检查点(enableUnalignedCheckpoints)
不再执行检查点的分界线对齐操作,启用之后可以大大减少产生背压时的检查点保存时间。这个设置要求检查点模式(CheckpointingMode)必须为exctly-once,并且并发的检查点个数为1。

代码中具体设置如下:

代码语言:javascript
复制
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();

// 启用检查点,间隔时间1秒
env.enableCheckpointing(1000);
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
// 设置精确一次模式
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 最小间隔时间500毫秒
checkpointConfig.setMinPauseBetweenCheckpoints(500);
// 超时时间1分钟
checkpointConfig.setCheckpointTimeout(60000);
// 同时只能有一个检查点
checkpointConfig.setMaxConcurrentCheckpoints(1);
// 开启检查点的外部持久化保存,作业取消后依然保留
checkpointConfig.enableExternalizedCheckpoints(
    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 启用不对齐的检查点保存方式
checkpointConfig.enableUnalignedCheckpoints();
// 设置检查点存储,可以直接传入一个String,指定文件系统的路径
checkpointConfig.setCheckpointStorage("hdfs://my/checkpoint/dir")

14.1.5 保存点(Savepoint)

除了检查点(checkpoint)外,Flink还提供了另一个非常独特的镜像保存功能——保存点(savepoint)。从名称就可以看出,这也是一个存盘的备份,它的原理和算法与检查点完全相同,只是多了一些额外的元数据。事实上,保存点就是通过检查点的机制来创建流式作业状态的一致性镜像(consistent image)的。保存点中的状态快照,是以算子ID和状态名称组织起来的,相当于一个键值对。从保存点启动应用程序时,Flink会将保存点的状态数据重新分配给相应的算子任务。

1. 保存点的用途

保存点与检查点最大的区别,就是触发的时机。检查点是由Flink自动管理的,定期创建,发生故障之后自动读取进行恢复,这是一个“自动存盘”的功能;而保存点不会自动创建,必须由用户明确地手动触发保存操作,所以就是“手动存盘”。因此两者尽管原理一致,但用途就有所差别了:检查点主要用来做故障恢复,是容错机制的核心;保存点则更加灵活,可以用来做有计划的手动备份和恢复。保存点可以当作一个强大的运维工具来使用。我们可以在需要的时候创建一个保存点,然后停止应用,做一些处理调整之后再从保存点重启。它适用的具体场景有:

代码语言:javascript
复制
1.版本管理和归档存储
对重要的节点进行手动备份,设置为某一版本,归档(archive)存储应用程序的状态。
2.更新Flink版本
目前Flink的底层架构已经非常稳定,所以当Flink版本升级时,程序本身一般是兼容的。这时不需要重新执行所有的计算,只要创建一个保存点,停掉应用、升级Flink后,从保存点重启就可以继续处理了。
3.更新应用程序
我们不仅可以在应用程序不变的时候,更新Flink版本;还可以直接更新应用程序。前提是程序必须是兼容的,也就是说更改之后的程序,状态的拓扑结构和数据类型都是不变的,这样才能正常从之前的保存点去加载。
这个功能非常有用。我们可以及时修复应用程序中的逻辑bug,更新之后接着继续处理;也可以用于有不同业务逻辑的场景,比如A/B测试等等。
4.调整并行度
如果应用运行的过程中,发现需要的资源不足、或者已经有了大量剩余,也可以通过从保存点重启的方式,将应用程序的并行度增大或减小。
5.暂停应用程序
有时候我们不需要调整集群或者更新程序,只是单纯地希望把应用暂停、释放一些资源来处理更重要的应用程序。使用保存点就可以灵活实现应用的暂停和重启,可以对有限的集群资源做最好的优化配置。
需要注意的是,保存点能够在程序更改的时候依然兼容,前提是状态的拓扑结构和数据类型不变。我们知道保存点中状态都是以算子ID-状态名称这样的key-value组织起来的,算子ID可以在代码中直接调用SingleOutputStreamOperator的.uid()方法来进行指定:
代码语言:javascript
复制
DataStream<String> stream = env
  .addSource(new StatefulSource())
  .uid("source-id")
  .map(new StatefulMapper())
  .uid("mapper-id")
  .print();

对于没有设置ID的算子,Flink默认会自动进行设置,所以在重新启动应用后可能会导致ID不同而无法兼容以前的状态。所以为了方便后续的维护,强烈建议在程序中为每一个算子手动指定ID。

2. 使用保存点

保存点的使用非常简单,我们可以使用命令行工具来创建保存点,也可以从保存点恢复作业。

(1)创建保存点

要在命令行中为运行的作业创建一个保存点镜像,只需要执行:

代码语言:javascript
复制
bin/flink savepoint :jobId [:targetDirectory]

这里jobId需要填充要做镜像保存的作业ID,目标路径targetDirectory可选,表示保存点存储的路径。对于保存点的默认路径,可以通过配置文件flink-conf.yaml中的state.savepoints.dir项来设定:

代码语言:javascript
复制
state.savepoints.dir: hdfs:///flink/savepoints

当然对于单独的作业,我们也可以在程序代码中通过执行环境来设置:

代码语言:javascript
复制
env.setDefaultSavepointDir("hdfs:///flink/savepoints");

由于创建保存点一般都是希望更改环境之后重启,所以创建之后往往紧接着就是停掉作业的操作。除了对运行的作业创建保存点,我们也可以在停掉一个作业时直接创建保存点:

代码语言:javascript
复制
bin/flink stop --savepointPath [:targetDirectory] :jobId

(2)从保存点重启应用

我们已经知道,提交启动一个Flink作业,使用的命令是flink run;现在要从保存点重启一个应用,其实本质是一样的:

代码语言:javascript
复制
bin/flink run -s :savepointPath [:runArgs]

这里只要增加一个-s参数,指定保存点的路径就可以了,其它启动时的参数还是完全一样的。细心的读者可能还记得我们在第三章使用web UI进行作业提交时,可以填入的参数除了入口类、并行度和运行参数,还有一个“Savepoint Path”,这就是从保存点启动应用的配置。

14.2 状态一致性

之前讲到检查点又叫做“一致性检查点”,是Flink容错机制的核心。接下来我们就对状态一致性的概念进行展开,结合理论和实际应用场景,讨论一下Flink流式处理架构中的应对机制。

14.2.1 一致性的概念和级别

简单来讲,一致性其实就是结果的正确性。对于分布式系统而言,强调的是不同节点中相同数据的副本应该总是“一致的”,也就是从不同节点读取时总能得到相同的值;而对于事务而言,是要求提交更新操作后,能够读取到新的数据。对于Flink来说,多个节点并行处理不同的任务,我们要保证计算结果是正确的,就必须不漏掉任何一个数据,而且也不会重复处理同一个数据。流式计算本身就是一个一个来的,所以正常处理的过程中结果肯定是正确的;但在发生故障、需要恢复状态进行回滚时就需要更多的保障机制了。我们通过检查点的保存来保证状态恢复后结果的正确,所以主要讨论的就是“状态的一致性”。一般说来,状态一致性有三种级别:

代码语言:javascript
复制
1.最多一次(AT-MOST-ONCE) 
当任务发生故障时,最简单的做法就是直接重启,别的什么都不干;既不恢复丢失的状态,也不重放丢失的数据。每个数据在正常情况下会被处理一次,遇到故障时就会丢掉,所以就是“最多处理一次”。
我们发现,如果数据可以直接被丢掉,那其实就是没有任何操作来保证结果的准确性;所以这种类型的保证也叫“没有保证”。尽管看起来比较糟糕,不过如果我们的主要诉求是“快”,而对近似正确的结果也能接受,那这也不失为一种很好的解决方案。
2.至少一次(AT-LEAST-ONCE) 
在实际应用中,我们一般会希望至少不要丢掉数据。这种一致性级别就叫做“至少一次”(at-least-once),就是说是所有数据都不会丢,肯定被处理了;不过不能保证只处理一次,有些数据会被重复处理。
在有些场景下,重复处理数据是不影响结果的正确性的,这种操作具有“幂等性”。比如,如果我们统计电商网站的UV,需要对每个用户的访问数据进行去重处理,所以即使同一个数据被处理多次,也不会影响最终的结果,这时使用at-least-once语义是完全没问题的。当然,如果重复数据对结果有影响,比如统计的是PV,或者之前的统计词频word count,使用at-least-once语义就可能会导致结果的不一致了。
为了保证达到at-least-once 的状态一致性,我们需要在发生故障时能够重放数据。最常见的做法是,可以用持久化的事件日志系统,把所有的事件写入到持久化存储中。这时只要记录一个偏移量,当任务发生故障重启后,重置偏移量就可以重放检查点之后的数据了。Kafka就是这种架构的一个典型实现。
3.精确一次(EXACTLY-ONCE) 
最严格的一致性保证,就是所谓的“精确一次”(exactly-once,有时也译作“恰好一次”)。这也是最难实现的状态一致性语义。exactly-once意味着所有数据不仅不会丢失,而且只被处理一次,不会重复处理。也就是说对于每一个数据,最终体现在状态和输出结果上,只能有一次统计。
exactly-once可以真正意义上保证结果的绝对正确,在发生故障恢复后,就好像从未发生过故障一样。

很明显,要做的exactly-once,首先必须能达到at-least-once的要求,就是数据不丢。所以同样需要有数据重放机制来保证这一点。另外还需要有专门的设计保证每个数据只被处理一次。Flink中使用的是一种轻量级快照机制——检查点(checkpoint)来保证exactly-once语义。

14.2.2 端到端的状态一致性

我们已经知道检查点可以保证Flink内部状态的一致性,而且可以做到精确一次(exactly-once)。那是不是说,只要开启了检查点,发生故障进行恢复,结果就不会有任何问题呢?

没那么简单。在实际应用中,一般要保证从用户的角度看来,最终消费的数据是正确的。而用户或者外部应用不会直接从Flink内部的状态读取数据,往往需要我们将处理结果写入外部存储中。这就要求我们不仅要考虑Flink内部数据的处理转换,还涉及到从外部数据源读取,以及写入外部持久化系统,整个应用处理流程从头到尾都应该是正确的。

所以完整的流处理应用,应该包括了数据源、流处理器和外部存储系统三个部分。这个完整应用的一致性,就叫做“端到端(end-to-end)的状态一致性”,它取决于三个组件中最弱的那一环。一般来说,能否达到at-least-once一致性级别,主要看数据源能够重放数据;而能否达到exactly-once级别,流处理器内部、数据源、外部存储都要有相应的保证机制。在下一节,我们就将详细讨论端到端的exactly-once一致性语义如何保证。

14.3 端到端精确一次(end-to-end exactly-once)

实际应用中,最难做到、也最希望做到的一致性语义,无疑就是端到端(end-to-end)的“精确一次”(exactly-once)。我们知道,对于Flink内部来说,检查点机制可以保证故障恢复后数据不丢(在能够重放的前提下),并且只处理一次,所以已经可以做到exactly-once的一致性语义了。所以端到端一致性的关键点,就在于输入的数据源端和输出的外部存储端。

14.3.1 输入端保证

输入端主要指的就是Flink读取的外部数据源。对于一些数据源来说,并不提供数据的缓冲或是持久化保存,数据被消费之后就彻底不存在了,例如socket文本流。对于这样的数据源,故障后我们即使通过检查点恢复之前的状态,可保存检查点之后到发生故障期间的数据已经不能重发了,这就会导致数据丢失。所以就只能保证at-most-once的一致性语义,相当于没有保证。

想要在故障恢复后不丢数据,外部数据源就必须拥有重放数据的能力。常见的做法就是对数据进行持久化保存,并且可以重设数据的读取位置。一个最经典的应用就是Kafka。在Flink的Source任务中将数据读取的偏移量保存为状态,这样就可以在故障恢复时从检查点中读取出来,对数据源重置偏移量,重新获取数据。

数据源可重放数据,或者说可重置读取数据偏移量,加上Flink的Source算子将偏移量作为状态保存进检查点,就可以保证数据不丢。这是达到at-least-once一致性语义的基本要求,当然也是实现端到端exactly-once的基本要求。

14.3.2 输出端保证

有了Flink的检查点机制,以及可重放数据的外部数据源,我们已经能做到at-least-once了。但是想要实现exactly-once却有更大的困难:数据有可能重复写入外部系统。

因为检查点保存之后,继续到来的数据也会一一处理,任务的状态也会更新,最终通过Sink任务将计算结果输出到外部系统;只是状态改变还没有存到下一个检查点中。这时如果出现故障,这些数据都会重新来一遍,就计算了两次。我们知道对Flink内部状态来说,重复计算的动作是没有影响的,因为状态已经回滚,最终改变只会发生一次;但对于外部系统来说,已经写入的结果就是泼出去的水,已经无法收回了,再次执行写入就会把同一个数据写入两次。

所以这时,我们只保证了端到端的at-least-once语义。为了实现端到端exactly-once,我们还需要对外部存储系统、以及Sink连接器有额外的要求。能够保证exactly-once一致性的写入方式有两种:

代码语言:javascript
复制
幂等写入
事务写入

我们需要外部存储系统对这两种写入方式的支持,而Flink也为提供了一些Sink连接器接口。接下来我们进行展开讲解。

1. 幂等(idempotent)写入

所谓“幂等”操作,就是说一个操作可以重复执行很多次,但只导致一次结果更改。也就是说,后面再重复执行就不会对结果起作用了。这相当于说,我们并没有真正解决数据重复计算、写入的问题;而是说,重复写入也没关系,结果不会改变。所以这种方式主要的限制在于外部存储系统必须支持这样的幂等写入:比如Redis中键值存储,或者关系型数据库(如MySQL)中满足查询条件的更新操作。需要注意,对于幂等写入,遇到故障进行恢复时,有可能会出现短暂的不一致。因为保存点完成之后到发生故障之间的数据,其实已经写入了一遍,回滚的时候并不能消除它们。如果有一个外部应用读取写入的数据,可能会看到奇怪的现象:短时间内,结果会突然“跳回”到之前的某个值,然后“重播”一段之前的数据。不过当数据的重放逐渐超过发生故障的点的时候,最终的结果还是一致的。

2. 事务(transactional)写入

如果说幂等写入对应用场景限制太多,那么事务写入可以说是更一般化的保证一致性的方式。输出端最大的问题,就是写入到外部系统的数据难以撤回。而利用事务就可以实现对已写入数据的撤回。事务(transaction)是应用程序中一系列严密的操作,所有操作必须成功完成,否则在每个操作中所作的所有更改都会被撤消。事务有四个基本特性:原子性(Atomicity)、一致性(Correspondence)、隔离性(Isolation)和持久性(Durability),这就是著名的ACID。在Flink流处理的结果写入外部系统时,如果能够构建一个事务,让写入操作可以随着检查点来提交和回滚,那么自然就可以解决重复写入的问题了。所以事务写入的基本思想就是:用一个事务来进行数据向外部系统的写入,这个事务是与检查点绑定在一起的。当Sink任务遇到barrier时,开始保存状态的同时就开启一个事务,接下来所有数据的写入都在这个事务中;待到当前检查点保存完毕时,将事务提交,所有写入的数据就真正可用了。如果中间过程出现故障,状态会回退到上一个检查点,而当前事务没有正常关闭(因为当前检查点没有保存完),所以也会回滚,写入到外部的数据就被撤销了。具体来说,又有两种实现方式:预写日志(WAL)和两阶段提交(2PC)

代码语言:javascript
复制
(1)预写日志(write-ahead-log,WAL)
我们发现,事务提交是需要外部存储系统支持事务的,否则没有办法真正实现写入的回撤。那对于一般不支持事务的存储系统,能够实现事务写入呢?
预写日志(WAL)就是一种非常简单的方式。具体步骤是:
①先把结果数据作为日志(log)状态保存起来
②进行检查点保存时,也会将这些结果数据一并做持久化存储
③在收到检查点完成的通知时,将所有结果一次性写入外部系统。
我们会发现,这种方式类似于检查点完成时做一个批处理,一次性的写入会带来一些性能上的问题;而优点就是比较简单,由于数据提前在状态后端中做了缓存,所以无论什么外部存储系统,理论上都能用这种方式一批搞定。在Flink中DataStream API提供了一个模板类GenericWriteAheadSink,用来实现这种事务型的写入方式。
需要注意的是,预写日志这种一批写入的方式,有可能会写入失败;所以在执行写入动作之后,必须等待发送成功的返回确认消息。在成功写入所有数据后,在内部再次确认相应的检查点,这才代表着检查点的真正完成。这里需要将确认信息也进行持久化保存,在故障恢复时,只有存在对应的确认信息,才能保证这批数据已经写入,可以恢复到对应的检查点位置。
但这种“再次确认”的方式,也会有一些缺陷。如果我们的检查点已经成功保存、数据也成功地一批写入到了外部系统,但是最终保存确认信息时出现了故障,Flink最终还是会认为没有成功写入。于是发生故障时,不会使用这个检查点,而是需要回退到上一个;这样就会导致这批数据的重复写入。
代码语言:javascript
复制
(2)两阶段提交(two-phase-commit,2PC)
前面提到的各种实现exactly-once的方式,多少都有点缺陷;而更好的方法就是传说中的两阶段提交(2PC)。
顾名思义,它的想法是分成两个阶段:先做“预提交”,等检查点完成之后再正式提交。这种提交方式是真正基于事务的,它需要外部系统提供事务支持。
具体的实现步骤为:
①当第一条数据到来时,或者收到检查点的分界线时,Sink任务都会启动一个事务。
②接下来接收到的所有数据,都通过这个事务写入外部系统;这时由于事务没有提交,所以数据尽管写入了外部系统,但是不可用,是“预提交”的状态。
③当Sink任务收到JobManager发来检查点完成的通知时,正式提交事务,写入的结果就真正可用了。
当中间发生故障时,当前未提交的事务就会回滚,于是所有写入外部系统的数据也就实现了撤回。这种两阶段提交(2PC)的方式充分利用了Flink现有的检查点机制:分界线的到来,就标志着开始一个新事务;而收到来自JobManager的checkpoint成功的消息,就是提交事务的指令。每个结果数据的写入,依然是流式的,不再有预写日志时批处理的性能问题;最终提交时,也只需要额外发送一个确认信息。所以2PC协议不仅真正意义上实现了exactly-once,而且通过搭载Flink的检查点机制来实现事务,只给系统增加了很少的开销。

Flink提供了TwoPhaseCommitSinkFunction接口,方便我们自定义实现两阶段提交的SinkFunction的实现,提供了真正端到端的exactly-once保证。不过两阶段提交虽然精巧,却对外部系统有很高的要求。这里将2PC对外部系统的要求列举如下:

代码语言:javascript
复制
1.外部系统必须提供事务支持,或者Sink任务必须能够模拟外部系统上的事务。
2.在检查点的间隔期间里,必须能够开启一个事务并接受数据写入。
3.在收到检查点完成的通知之前,事务必须是“等待提交”的状态。在故障恢复的情况下,这可能需要一些时间。如果这个时候外部系统关闭事务(例如超时了),那么未提交的数据就会丢失。
4.Sink任务必须能够在进程失败后恢复事务。
5.提交事务必须是幂等操作。也就是说,事务的重复提交应该是无效的。

可见,2PC在实际应用同样会受到比较大的限制。具体在项目中的选型,最终还应该是一致性级别和处理性能的权衡考量。

14.3.3 Flink和Kafka连接时的精确一次保证

在流处理的应用中,最佳的数据源当然就是可重置偏移量的消息队列了;它不仅可以提供数据重放的功能,而且天生就是以流的方式存储和处理数据的。所以作为大数据工具中消息队列的代表,Kafka可以说与Flink是天作之合,实际项目中也经常会看到以Kafka作为数据源和写入的外部系统的应用。在本小节中,我们就来具体讨论一下Flink和Kafka连接时,怎样保证端到端的exactly-once状态一致性。

1. 整体介绍

既然是端到端的exactly-once,我们依然可以从三个组件的角度来进行分析:

代码语言:javascript
复制
(1)Flink内部
Flink内部可以通过检查点机制保证状态和处理结果的exactly-once语义。
(2)输入端
输入数据源端的Kafka可以对数据进行持久化保存,并可以重置偏移量(offset)。所以我们可以在Source任务(FlinkKafkaConsumer)中将当前读取的偏移量保存为算子状态,写入到检查点中;当发生故障时,从检查点中读取恢复状态,并由连接器FlinkKafkaConsumer向Kafka重新提交偏移量,就可以重新消费数据、保证结果的一致性了。
(3)输出端
输出端保证exactly-once的最佳实现,当然就是两阶段提交(2PC)。作为与Flink天生一对的Kafka,自然需要用最强有力的一致性保证来证明自己。

Flink官方实现的Kafka连接器中,提供了写入到Kafka的FlinkKafkaProducer,它就实现了TwoPhaseCommitSinkFunction接口:

代码语言:javascript
复制
public class FlinkKafkaProducer<IN> extends TwoPhaseCommitSinkFunction<IN, FlinkKafkaProducer.KafkaTransactionState, FlinkKafkaProducer.KafkaTransactionContext> {
...
}

也就是说,我们写入Kafka的过程实际上是一个两段式的提交:处理完毕得到结果,写入Kafka时是基于事务的“预提交”;等到检查点保存完毕,才会提交事务进行“正式提交”。如果中间出现故障,事务进行回滚,预提交就会被放弃;恢复状态之后,也只能恢复所有已经确认提交的操作。

2. 具体步骤

为了方便说明,我们来考虑一个具体的流处理系统,由Flink从Kafka读取数据、并将处理结果写入Kafka,如图所示。

这是一个Flink与Kafka构建的完整数据管道,Source任务从Kafka读取数据,经过一系列处理(比如窗口计算),然后由Sink任务将结果再写入Kafka。Flink与Kafka连接的两阶段提交,离不开检查点的配合,这个过程需要JobManager协调各个TaskManager进行状态快照,而检查点具体存储位置则是由状态后端(State Backend)来配置管理的。一般情况,我们会将检查点存储到分布式文件系统上。实现端到端exactly-once的具体过程可以分解如下:

(1)启动检查点保存

检查点保存的启动,标志着我们进入了两阶段提交协议的“预提交”阶段。当然,现在还没有具体提交的数据。

如图所示,JobManager通知各个TaskManager启动检查点保存,Source任务会将检查点分界线(barrier)注入数据流。这个barrier可以将数据流中的数据,分为进入当前检查点的集合和进入下一个检查点的集合。

(2)算子任务对状态做快照

分界线(barrier)会在算子间传递下去。每个算子收到barrier时,会将当前的状态做个快照,保存到状态后端。

如图所示,Source任务将barrier插入数据流后,也会将当前读取数据的偏移量作为状态写入检查点,存入状态后端;然后把barrier向下游传递,自己就可以继续读取数据了。接下来barrier传递到了内部的Window算子,它同样会对自己的状态进行快照保存,写入远程的持久化存储。

(3)Sink任务开启事务,进行预提交

如图所示,分界线(barrier)终于传到了Sink任务,这时Sink任务会开启一个事务。接下来到来的所有数据,Sink任务都会通过这个事务来写入Kafka。这里barrier是检查点的分界线,也是事务的分界线。由于之前的检查点可能尚未完成,因此上一个事务也可能尚未提交;此时barrier的到来开启了新的事务,上一个事务尽管可能没有被提交,但也不再接收新的数据了。对于Kafka而言,提交的数据会被标记为“未确认”(uncommitted)。这个过程就是所谓的“预提交”(pre-commit)。

(4)检查点保存完成,提交事务

当所有算子的快照都完成,也就是这次的检查点保存最终完成时,JobManager会向所有任务发确认通知,告诉大家当前检查点已成功保存,如图所示。

当Sink任务收到确认通知后,就会正式提交之前的事务,把之前“未确认”的数据标为“已确认”,接下来就可以正常消费了。在任务运行中的任何阶段失败,都会从上一次的状态恢复,所有没有正式提交的数据也会回滚。这样,Flink和Kafka连接构成的流处理系统,就实现了端到端的exactly-once状态一致性。

3. 需要的配置

在具体应用中,实现真正的端到端exactly-once,还需要有一些额外的配置:

代码语言:javascript
复制
(1)必须启用检查点
(2)在FlinkKafkaProducer的构造函数中传入参数Semantic.EXACTLY_ONCE
(3)配置Kafka读取数据的消费者的隔离级别
这里所说的Kafka,是写入的外部系统。预提交阶段数据已经写入,只是被标记为“未提交”(uncommitted),而Kafka中默认的隔离级别isolation.level是read_uncommitted,也就是可以读取未提交的数据。这样一来,外部应用就可以直接消费未提交的数据,对于事务性的保证就失效了。所以应该将隔离级别配置
为read_committed,表示消费者遇到未提交的消息时,会停止从分区中消费数据,直到消息被标记为已提交才会再次恢复消费。当然,这样做的话,外部应用消费数据就会有显著的延迟。
(4)事务超时配置
Flink的Kafka连接器中配置的事务超时时间transaction.timeout.ms默认是1小时,而Kafka集群配置的事务最大超时时间transaction.max.timeout.ms默认是15分钟。所以在检查点保存时间很长时,有可能出现Kafka已经认为事务超时了,丢弃了预提交的数据;而Sink任务认为还可以继续等待。如果接下来检查点保存成功,发生故障后回滚到这个检查点的状态,这部分数据就被真正丢掉了。所以这两个超时时间,前者应该小于等于后者。

由于微信正文只允许5万字,导致没办法全篇都放进来,欢迎关注我们,后续正在路上!

让我们拭目以待!

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2022-12-05,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 857Hub 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 十一、处理函数
    • 11.1 基本处理函数(ProcessFunction)
      • 11.1.1 处理函数的功能和使用
      • 11.1.2 ProcessFunction解析
      • 11.1.3 处理函数的分类
      • 11.2.1定时器(Timer)和定时服务(TimerService)
      • 11.2.2 KeyedProcessFunction的使用
      • 11.3 窗口处理函数
      • 11.3.1 窗口处理函数的使用
      • 11.3.2 ProcessWindowFunction解析
      • 11.4 应用案例——Top N
      • 11.4.1 使用ProcessAllWindowFunction
      • 11.4.2 使用KeyedProcessFunction
    • 11.5 侧输出流(Side Output)
    • 十二、多流转换
      • 12.1 分流
        • 12.1.1 简单实现
        • 12.1.2 使用侧输出流
      • 12.2 基本合流操作
        • 12.2.1 联合(Union)
        • 12.2.2 连接(Connect)
      • 12.3 基于时间的合流——双流联结(Join)
        • 12.3.1 窗口联结(Window Join)
        • 12.3.2 间隔联结(Interval Join)
    • 十三、状态管理
      • 13.1 Flink中的状态
        • 13.1.1 有状态算子
        • 13.1.2 状态的管理
        • 13.1.3 状态的分类
      • 13.2 按键分区状态(Keyed State)
        • 13.2.1 基本概念和特点
        • 13.2.2 支持的结构类型
        • 13.2.3 代码实现
        • 13.2.4 状态生存时间(TTL)
      • 13.3 算子状态(Operator State)
        • 13.3.1 基本概念和特点
        • 13.3.2 状态类型
      • 13.4 状态持久化和状态后端
        • 13.4.1 检查点(Checkpoint)
        • 13.4.2 状态后端(State Backends)
    • 十四、容错机制
      • 14.1 检查点(Checkpoint)
        • 14.1.1 检查点的保存
        • 14.1.2 从检查点恢复状态
        • 14.1.3 检查点算法
        • 14.1.4 检查点配置
        • 14.1.5 保存点(Savepoint)
      • 14.2 状态一致性
        • 14.2.1 一致性的概念和级别
        • 14.2.2 端到端的状态一致性
      • 14.3 端到端精确一次(end-to-end exactly-once)
        • 14.3.1 输入端保证
        • 14.3.2 输出端保证
        • 14.3.3 Flink和Kafka连接时的精确一次保证
    相关产品与服务
    数据库
    云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档