前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink 消费 kafka 时如何处理空闲 task

Flink 消费 kafka 时如何处理空闲 task

作者头像
shengjk1
发布2020-03-26 12:33:08
5700
发布2020-03-26 12:33:08
举报
文章被收录于专栏:码字搬砖

我们都知道 flink 消费 kafka 是一个 partition 对应一个 task,但比如说 flink task 数多于 kafka partition 时。flink 是如何处理这个空闲的 task 的。

代码语言:javascript
复制
@Override
	public void run(SourceContext<T> sourceContext) throws Exception {
		if (subscribedPartitionsToStartOffsets == null) {
			throw new Exception("The partitions were not set for the consumer");
		}

		// initialize commit metrics and default offset callback method
		this.successfulCommits = this.getRuntimeContext().getMetricGroup().counter(COMMITS_SUCCEEDED_METRICS_COUNTER);
		this.failedCommits =  this.getRuntimeContext().getMetricGroup().counter(COMMITS_FAILED_METRICS_COUNTER);

		// offset commit 的回调方法,当 notifyCheckpointComplete 时,会调用此方法
		this.offsetCommitCallback = new KafkaCommitCallback() {
			@Override
			public void onSuccess() {
				successfulCommits.inc();
			}

			@Override
			public void onException(Throwable cause) {
				LOG.warn("Async Kafka commit failed.", cause);
				failedCommits.inc();
			}
		};

		// mark the subtask as temporarily idle if there are no initial seed partitions;
		// once this subtask discovers some partitions and starts collecting records, the subtask's
		// status will automatically be triggered back to be active.
		//标记为该 task 为空闲状态。什么样的场景会被标记为空闲状态呢?当 Flink 的并行度大于 partitions 数时,有一个 task 就会被标记为空闲状态
		//标记为空闲状态时,就会通知下游,我不在发送任何 recode 和 watermarks,可以理解为我不存在
		if (subscribedPartitionsToStartOffsets.isEmpty()) {
			sourceContext.markAsTemporarilyIdle();
		}

		......
	}

标记为空闲状态就完了

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020/03/25 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档