前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >flink exactly-once系列之事务性输出实现

flink exactly-once系列之事务性输出实现

作者头像
Flink实战剖析
发布2022-04-18 11:27:35
5660
发布2022-04-18 11:27:35
举报
文章被收录于专栏:Flink实战剖析Flink实战剖析

flink exactly-once系列目录: 一、两阶段提交概述 二、两阶段提交实现分析 三、StreamingFileSink分析 四、事务性输出实现 五、最终一致性实现 前几篇分析到Flink 是可以通过状态与checkpoint机制实现内部Exactly-Once 的语义,对于端到端的Exactly-Once语义,Flink 通过两阶段提交方式提供了对Kafka/HDFS输出支持,两阶段提交实现是结合checkpoint流程提供的hook来实现的,实现CheckpointedFunction与CheckpointListener接口: 1. initializeState 方法里面做事务状态的恢复与重新提交 2. snapshotState 方法里面开启事务与将需要输出的数据写到状态中容错 3. notifyCheckpointComplete方法提交事务 使用flink自带的实现要求继承TwoPhaseCommitSinkFunction类,并且实现beginTransaction、preCommit、commit、abort这几个方法,虽然说使用起来很方便,但是其有一个限制那就是所提供的事务hook(比喻Connection)能够被序列化,并且反序列化之后能够继续提交之前的事务,这个对于很多事务性的数据库是无法做到的,所以需要实现一套特有的事务提交。 之前分析到两阶段提交的主要问题是在第二阶段,commit有可能会存在部分成功与部分失败,所以才有了事务容错恢复,提交失败的重启继续提交,提交成功的重启再次提交是幂等的不会影响数据的结果,现在没有了这样一个可序列化的事务hook,另外需要提交的数据也做了状态容错。但是Flink 在checkpoint机制中提供了一个唯一的标识checkpointId, 它是用户可访问的、单调递增的、容错的,任务失败之后会从最近一次成功点继续递增,那么就可以使用checkpointId 来作为事务提交的句柄,首先看一下逻辑流程:

1. invoke 方法:将需要提交的数据添加到内存List中 2. snapshotState方法:将checkpointId与list存放在状态中 3. notifyCheckpointComplete方法:将list与checkpointId做事务性提交,并且使用checkpointId做CAS机制 4. initializeState方法:从状态中恢复checkpointId与list数据,同样做事务性提交

代码实现:

代码语言:javascript
复制
public abstract class CommonTwoPhaseCommit<IN extends Serializable> extends RichSinkFunction<IN>

        implements CheckpointedFunction, CheckpointListener {





    private long checkpointId;

    private List<IN> dataList;



    private ListState<IN> dataListState;

    private ListState<Long> checkpointIdState;





    @Override public void initializeState(FunctionInitializationContext context) throws Exception {



        dataList=new ArrayList<>();

        dataListState=context.getOperatorStateStore().getSerializableListState("listdata");

        checkpointIdState=context.getOperatorStateStore().getListState(new ListStateDescriptor<Long>("checkpointI",Long.class));

        if(context.isRestored())

        {

            dataListState.get().forEach(x->{

                dataList.add(x);

            });

            Iterator<Long> ckIdIter=checkpointIdState.get().iterator();

            checkpointId=ckIdIter.next();

            commit(dataList,checkpointId);

        }

    }





    @Override public void invoke(IN value, Context context) throws Exception {

        dataList.add(value);

    }





    @Override public void snapshotState(FunctionSnapshotContext context) throws Exception {



        dataListState.clear();

        dataListState.addAll(dataList);

        dataList.clear();



        checkpointIdState.clear();

        checkpointId=context.getCheckpointId();

        checkpointIdState.addAll(Collections.singletonList(checkpointId));

    }



    @Override public void notifyCheckpointComplete(long checkpointId) throws Exception {

        commit(dataListState.get(),checkpointId);

    }



    /**

     * 使用checkpoint与数据库已经存在值进行比较,要求正好比其大1

     * @param data

     * @param checkpointId

     */

    public abstract void commit(Iterable<IN> data,long checkpointId);



}
 

那么只需要继承CommonTwoPhaseCommit 类,实现其commit方法,做事务提交即可。目前该方案用于对window窗口聚合的延时补偿处理中,输出端为MySql,后期将会研究对Redis等其他数据库如何做一致性处理。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-10-17,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Flink实战剖析 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
文件存储
文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档