专栏首页码字搬砖FlinkConsumer是如何保证一个partition对应一个thread的

FlinkConsumer是如何保证一个partition对应一个thread的

版权声明:本文为博主原创,欢迎转载,转载请标明出处 Blog Address:http://blog.csdn.net/jsjsjs1789 https://blog.csdn.net/jsjsjs1789/article/details/90576232

我们都知道flink 连接kafka时,默认是一个partition对应一个thread,它究竟是怎么实现的呢?以及到我们自己定义 RichParallelSourceFunction 的时候如何借鉴这部分代码呢? 我们一起来看一下(基于flink-1.8) 看过flink kafka连接器源码的同学对 FlinkKafkaConsumerBase 应该不陌生(没有看过的也无所谓,我们一起来看就好) 一起来看一下 FlinkKafkaConsumerBase 的 open 方法中关键的部分

//获取fixed topic's or topic pattern 's   partitions of this subtask
		final List<KafkaTopicPartition> allPartitions = partitionDiscoverer.discoverPartitions();

没错这就是查看Flink Consumer 保证 一个partition对应一个Thread的入口方法

public List<KafkaTopicPartition> discoverPartitions() throws WakeupException, ClosedException {
		if (!closed && !wakeup) {
			try {
			...
				// (2) eliminate partition that are old partitions or should not be subscribed by this subtask
				if (newDiscoveredPartitions == null || newDiscoveredPartitions.isEmpty()) {
					throw new RuntimeException("Unable to retrieve any partitions with KafkaTopicsDescriptor: " + topicsDescriptor);
				} else {
					Iterator<KafkaTopicPartition> iter = newDiscoveredPartitions.iterator();
					KafkaTopicPartition nextPartition;
					while (iter.hasNext()) {
						nextPartition = iter.next();
						//从之前已经发现的KafkaTopicPartition中移除,其二可以保证仅仅是这个subtask的partition
						if (!setAndCheckDiscoveredPartition(nextPartition)) {
							iter.remove();
						}
					}
				}

				return newDiscoveredPartitions;
			...
	}

关键性的部分 setAndCheckDiscoveredPartition 方法,点进去

public boolean setAndCheckDiscoveredPartition(KafkaTopicPartition partition) {
		if (isUndiscoveredPartition(partition)) {
			discoveredPartitions.add(partition);
			
			//kafkaPartition与indexOfThisSubTask --对应
			return KafkaTopicPartitionAssigner.assign(partition, numParallelSubtasks) == indexOfThisSubtask;
		}
		return false;
	}

indexOfThisSubtask 表示当前线程是那个subtask,numParallelSubtasks 表示总共并行的subtask 的个数, 当其返回true的时候,表示此partition 属于此indexOfThisSubtask。 下面来看一下具体是怎么划分的

public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
		int startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;

		// here, the assumption is that the id of Kafka partitions are always ascending
		// starting from 0, and therefore can be used directly as the offset clockwise from the start index
		return (startIndex + partition.getPartition()) % numParallelSubtasks;
	}

基于topic 和 partition,然后对numParallelSubtasks取余。

那么,当我们自己去定义RichParallelSourceFunction的时候如何去借鉴它呢,直接上代码:

public class WordSource extends RichParallelSourceFunction<Tuple2<Long, Long>> {
	
	private Boolean isRun = true;
	
	@Override
	public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
		int start = 0;
		int numberOfParallelSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
		while (isRun) {
			start += 1;
			if (start % numberOfParallelSubtasks == getRuntimeContext().getIndexOfThisSubtask()) {
				ctx.collect(new Tuple2<>(
						Long.parseLong(start+""),
						1L));
				Thread.sleep(1000);
				System.out.println("Thread.currentThread().getName()=========== " + Thread.currentThread().getName());
			}
		}
	}
	
	@Override
	public void cancel() {
		isRun = false;
	}
}

当当当,自此,自己定义个RichParallelSourceFunction也可以并行发数据了,啦啦啦啦!

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Comparator 和 Comparable的区别

    1.位置 Comparable 位于 java.lang,我们都知道 java.lang包可以说是 java体系中基础包,有 ClassLoader、Clas...

    shengjk1
  • CDH离线安装文档

    准备工作 1, 配置hostname vi /etc/sysconfig/network 修改hostname: NETWORKING=yes ...

    shengjk1
  • hive原理分析

    基于hive1.1(从hive2.0开始,hive开始基于内存了,底层的实现框架不在基于MR)

    shengjk1
  • Spring Security 实战干货——搞清楚UserDetails

    前一篇介绍了 Spring Security 入门的基础准备。从今天开始我们来一步步窥探它是如何工作的。我们又该如何驾驭它。请多多关注公众号: Felordcn...

    码农小胖哥
  • 【JDK1.8】JDK1.8集合源码阅读——Set汇总

    joemsu
  • 【JDK1.8】JDK1.8集合源码阅读——Set汇总

    joemsu
  • 运维是做什么的?史上最全互联网Linux工作规划!十分钟找到linux运维工程师职业方向!

    首先祝贺你选择学习Linux,你可能即将踏上Linux的工作之旅,出发之前,让我带你来看一看关于Linux和Linux运维的一切。

    马哥教育
  • 新手司机带你看神经网络

    什么是神经网络 我们现在所谈论的神经网络不是动物或者人上的神经网络,而是为计算机量身定制的神经系统。 计算机神经网络是一种模仿生物的神经中枢或者动物的神经网络...

    云时之间
  • System Generator & HLS数字信号处理教程

    System Generator & Vivado HLS数字信号处理教程(暨FPGA高级数字信号处理教程)已经发布,包含如下内容:

    狂人V
  • 程序员进阶之路 -- 算法刷题必备神器

    不少程序员提起算法可能都心惊胆战,但是又逃不过真香定理,因为会算法的程序员真的惹不起,先不说会算法的要比不会算法的薪资多好多,单是哪个思维逻辑能力就能甩开好大一...

    周三不加班

扫码关注云+社区

领取腾讯云代金券