前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink写Elasticsearch导致Checkpoint频繁失败的解决方案

Flink写Elasticsearch导致Checkpoint频繁失败的解决方案

作者头像
大数据真好玩
发布2022-06-17 14:01:05
8920
发布2022-06-17 14:01:05
举报

最近做的一个需求,在一个Flink程序中,根据数据里面的ip进行分流,每个流对应一个ES的索引,一共有14个索引,开启checkpoint。

运行报错:

有些sink始终会导致ck失败,数据量也就100条。

出现这个问题后,把ck去掉,程序正常写入,不报错了。

没有ck肯定不行,所以将ck加回来,后来看了下ElasticsearchSinkBase类的代码,实现了CheckpointedFunction接口,重写了snapshotState方法,里面会根据flushOnCheckpoint成员变量判断是否进行flush。

public abstract class ElasticsearchSinkBase<T, C extends AutoCloseable> extends RichSinkFunction<T> implements CheckpointedFunction {

 /** If true, the producer will wait until all outstanding action requests have been sent to Elasticsearch. */
 private boolean flushOnCheckpoint = true;

 /**
  * Disable flushing on checkpoint. When disabled, the sink will not wait for all
  * pending action requests to be acknowledged by Elasticsearch on checkpoints.
  *
  * <p>NOTE: If flushing on checkpoint is disabled, the Flink Elasticsearch Sink does NOT
  * provide any strong guarantees for at-least-once delivery of action requests.
  */
 public void disableFlushOnCheckpoint() {
  this.flushOnCheckpoint = false;
 }

 @Override
 public void snapshotState(FunctionSnapshotContext context) throws Exception {
  checkAsyncErrorsAndRequests();

  if (flushOnCheckpoint) {
   while (numPendingRequests.get() != 0) {
    bulkProcessor.flush();
    checkAsyncErrorsAndRequests();
   }
  }
 }
}

很明显,是这里导致的问题,调用disableFlushOnCheckpoint关闭flush

ElasticsearchSink<Row> elasticsearchSink = esSinkBuilder.build();
// 关掉基于Checkpoint的flush
elasticsearchSink.disableFlushOnCheckpoint();

问题解决

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2022-05-24,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据真好玩 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档