前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >flink时间系统系列之实例讲解:如何做定时输出

flink时间系统系列之实例讲解:如何做定时输出

作者头像
Flink实战剖析
发布2022-04-18 11:22:28
7830
发布2022-04-18 11:22:28
举报
文章被收录于专栏:Flink实战剖析Flink实战剖析

flink时间系统系列篇幅目录:

一、时间系统概述介绍

二、Processing Time源码分析

三、Event Time源码分析

四、时间系统在窗口函数中的应用分析

五、ProcessFunction 使用分析

六、实例讲解:如何做定时输出

今天为大家带来flink时间系统系列最后一篇实战篇,同样也是查漏补缺篇:如何做定时输出,首先说一下定时输出的需求背景,在flink流处理中需要将任务处理的结果数据定时输出到外部存储中例如mysql/hbase等,如果我们单条输出就可能会造成对外部存储造成较大的压力,首先我们想到的批量输出,就是当需要输出的数据累计到一定大小然后批量写入外部存储,这种方式在flink 官方文档的operator state篇其实给了很好的实践例子,实现了批量输出并且对内存中缓存的数据做了state容错机制,保证数据不会丢失,但是同样存在这样的场景:某些业务可能有高低峰期,在高峰的时候,批量输出在外部存储中可以查到结果数据,但是在业务低峰期可能很长时间都满足输出条件,导致的结果是很长时间都看不到结果数据,这个时候就需要做定时输出。

先来看下一下大家可能想到的几种定时输出方案:

一、在sinkFunction 里面做一个定时器定时将内存的数据输出到外部存储,但是这里有两点需要考虑:a. 自己做定时器是一个异步执行过程,如果抛出异常是否能够被flink检测到并且使任务失败(经过实际测试是不能的);b. 异步输出所读取的数据与invoke里面写入的数据是存储在同于个存储(List)里面的,这里又要考虑线程安全的问题,那么需要做数据同步;

二、由于checkpoint是周期性的执行,那么我们可以利用在任务每次checkpoint的时候做一次数据检查将数据写入外部存储,也就是在CheckpointedFunction.snapshotState 方法中将数据输出,但是这种方式必须与checkpoint的时间同步,缺乏灵活性。

三、使用KeyedProcessFunction 来实现,在KeyedProcessFunction 可以使用flink提供的定时机制完成,但是有一个限制就是只针对KeyedStream流处理,在通常情况下输出的是一个DataStream.

由以上几种方案的弊端可知,要实现定时输出功能需要考虑以下几点:

1. 定时提供给用户灵活可配置

2. 缓存的数据必须提供容错

3. 定时输出错误必须能够抛出给flink

4. 定时输出读取的数据与invoke处理的数据同步性

5. 满足DataStream类型流输出

对于第一点很好实现做成参数配置即可,第二点缓存数据容错使用flink状态容错机制即可,重点看第三、四点。

首先声明一点定时输出是一个ProcessingTime的定时,在来看第三点异常捕获,在flink注册处理时间定时器所触发的定时处理同样是一个异步线程完成,那么在这里面是如何做到异步异常获取的,查看触发位置SystemProcessingTimeService.TriggerTask,

可以查看这里抛出的异常由一个AsyncExceptionHandler类型的exceptionHandler对象处理, 追踪其来源可发现AsyncExceptionHandler是有StreamTask 实现传入进来,也就是当定时调用出现异常会调用StreamTask.handleAsyncException ,而该方法可以使任务抛出异常并且失败;在看第四点,正常invoke处理数据,定时也处理数据,那么必然涉及到状态的切换,细心的同学可以看到在触发定时调用时使用了synchronizd(lock) 同步锁,追踪其来源可以找到lock 表示的就是StreamTask 中的lock对象,而这个对象又会传给StreamInputProcessor(可任务是数据处理的入口,会调用StreamOperator处理数据),而这里每次处理数据的时候也会使用synchronizd(lock) ,

到这里我想大家都应该明白了,正常的数据流处理与定时逻辑处理只能同时有一个进行,那么就解决key切换带来状态操作问题,同时也为我们提供的解决思路,使用flink自带定时来帮助我们完成定时输出处理。

使用flink自带定时功能,首先我们得能够获取到ProcessingTimeService这个对象,但是该对象的获取只能在AbstractStreamOperator通过getProcessingTimeService方法获取到,那么我们可以自定义一个StreamOperator 继承AbstractStreamOperator,首先看下代码实现:

代码语言:javascript
复制
public abstract class CommonSinkOperator<T extendsSerializable> extends AbstractStreamOperator<Object>implements ProcessingTimeCallback, OneInputStreamOperator<T, Object> {
    private List<T> list;
    private ListState<T> listState;
    private int batchSize;
    private long interval;
    private ProcessingTimeService processingTimeService;
    public CommonSinkOperator() {
    }

public CommonSinkOperator(int batchSize, long interval){        this.chainingStrategy = ChainingStrategy.ALWAYS;
        this.batchSize = batchSize;
        this.interval = interval;
    }

    @Override public void open() throws Exception {
        super.open();
        if (interval > 0 && batchSize > 1) {
  processingTimeService = getProcessingTimeService();
 long now=processingTimeService.getCurrentProcessingTime();
 processingTimeService.registerTimer(now + interval, this);
        }
    }

@Override public void initializeState(StateInitializationContext context) throws Exception{        super.initializeState(context);
        this.list = new ArrayList<T>();
        listState = context.getOperatorStateStore()      .getSerializableListState("batch-interval-sink");        if (context.isRestored()) {
            listState.get().forEach(x -> {
                list.add(x);
            });
        }

    }

@Override public void processElement(StreamRecord<T> element) throws Exception {
        list.add(element.getValue());
        if (list.size() >= batchSize) {
            saveRecords(list);
        }

    }

@Override public void snapshotState(StateSnapshotContext context) throws Exception {        super.snapshotState(context);
        if (list.size() > 0) {
            listState.clear();
            listState.addAll(list);
        }
    }

@Override public void onProcessingTime(long timestamp)throws Exception {        if (list.size() > 0) {
            saveRecords(list);
            list.clear();
        }
long now =        processingTimeService.getCurrentProcessingTime();
        processingTimeService.registerTimer(now + interval,                  this);
    }

    public abstract void saveRecords(List<T> datas);
}

定义一个CommonSinkOperator的抽象类,继承AbstractStreamOperator,并且实现ProcessingTimeCallback与OneInputStreamOperator接口,那么看下类里面的方法,

  1. 首先看构造函数传入批写大小batchSize与定时写入时间interval
  2. 重载了AbstractStreamOperator的open方法,在这个方法里面获取ProcessingTimeService,然后注册一个interval时长的定时器
  3. 重载AbstractStreamOperator的initializeState方法,用于恢复内存数据
  4. 重载AbstractStreamOperator的snapshotState方法,在checkpoint会将内存数据写入状态中容错
  5. 实现了OneInputStreamOperator接口的processElement方法,将结果数据写入到内存中,如果满足一定大小则输出
  6. 实现了ProcessingTimeCallback接口的onProcessingTime方法,注册定时器的执行方法,进行数据输出的同时,注册下一个定时器
  7. 定义了一个抽象saveRecords方法,实际输出操作

那么这个CommonSinkOperator就是一个模板方法,能够做到将任何类型的数据输出,只需要继承该类,并且实现saveRecords方法即可。

在这里也贴上一个测试的例子:

调用:

以上整个就是关于flink定时定量输出的实例分析,希望对大家有所帮助。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-10-10,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Flink实战剖析 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档