专栏首页码字搬砖一文搞定 Flink 消费消息的全流程

一文搞定 Flink 消费消息的全流程

我们以下面代码为例:

FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("canal_monitor_order_astable", new SimpleStringSchema(), properties);
		consumer.setStartFromEarliest();
		
env.addSource(consumer).flatMap(...).print()

当 Flink 程序启动,leader、blobServer 等都创建完毕,当 ExecutionGraph 构建完成,提交成功之后。就到了,task 正式执行的阶段了。这个时候,一条消息是如何流转的呢? 首先,进入了 Task 的 run 方法

......
/*
			这个方法就是用户代码所真正被执行的入口。比如我们写的什么 new MapFunction() 的逻辑,最终就是在这里被执行的
			 */
			// run the invokable
			invokable.invoke();
......

然后就到了 StreamTask 的 invoke 方法,这里是每个算子真正开始执行的地方

......
run();
.....

最为关键的就是 run 方法。 进入 SourceStreamTask run 方法

@Override
	// source task 获取数据的入口方法
	protected void run() throws Exception {
		headOperator.run(getCheckpointLock(), getStreamStatusMaintainer());
	}

继续追踪就到了 StreamSource 的 run 方法

......
	// 生成上下文之后,接下来就是把上下文交给 SourceFunction 去执行,用户自定义的 run 方法开始正式运行
			userFunction.run(ctx);
......

此处的 userFunction 实际上就是 FlinkKafkaConsumer 具体是如何消费消息的可以参考 写给大忙人看的Flink 消费 Kafka 彻底搞懂 Flink Kafka OffsetState 存储 继续追踪到 RecordWriter

private void emit(T record, int targetChannel) throws IOException, InterruptedException {
		// 最底层的抽象是 MemorySegment,用于数据传输的是 Buffer,将 java 对象转化为 buffer 是这个
		// Flink 把对象调用该对象所属的序列化器序列化为字节数组
		serializer.serializeRecord(record);
		
		if (copyFromSerializerToTargetChannel(targetChannel)) {
			serializer.prune();
		}
	}

RecordWriter 还是比较有意思的,RecordWriter 主要就是把 java 对象转化为 byte 数组( 也就是 flink 自己管理内存,不借助与 JVM )。而后面的传输也是基于 byte 数组的。

copyFromSerializerToTargetChannel 会将 byte 数据 flush 到 相应的 targetChannel ( targetChannel 对于下游来说就是 InputChannel 具体可以参考一下 Flink反压机制 ) 底层通过 netty 进行数据的传送,传送至 PartitionRequestQueue

......
if (cause != null) {
						ErrorResponse msg = new ErrorResponse(
							new ProducerFailedException(cause),
							reader.getReceiverId());

						// 真正往 netty 的 nio 通道里写入.
						// 在这里,写入的是一个 RemoteInputChannel,对应的就是下游节点的 InputGate 的 channels。
						ctx.writeAndFlush(msg);
					}
......

这个时候,这条数据就进入了下游的 InputChannel 。 有写得需要有读,进入到 CreditBasedPartitionRequestClientHandler

// nio 通道的另一端( 下游 )需要读入 buffer
	// 上游的算子写入,下游的算子读取,这也是反压的原理
	// 为什么叫 decodeMsg,主要上游传过来的是 byte 数组,这个将 byte 数组 转化为 record
	private void decodeMsg(Object msg) throws Throwable {
		final Class<?> msgClazz = msg.getClass();

		// ---- Buffer --------------------------------------------------------
		if (msgClazz == NettyMessage.BufferResponse.class) {
			NettyMessage.BufferResponse bufferOrEvent = (NettyMessage.BufferResponse) msg;

			RemoteInputChannel inputChannel = inputChannels.get(bufferOrEvent.receiverId);
			if (inputChannel == null) {
				bufferOrEvent.releaseBuffer();

				cancelRequestFor(bufferOrEvent.receiverId);

				return;
			}

			decodeBufferOrEvent(inputChannel, bufferOrEvent);

		} else if (msgClazz == NettyMessage.ErrorResponse.class) {
			// ---- Error ---------------------------------------------------------
			NettyMessage.ErrorResponse error = (NettyMessage.ErrorResponse) msg;

			SocketAddress remoteAddr = ctx.channel().remoteAddress();

			if (error.isFatalError()) {
				notifyAllChannelsOfErrorAndClose(new RemoteTransportException(
					"Fatal error at remote task manager '" + remoteAddr + "'.",
					remoteAddr,
					error.cause));
			} else {
				RemoteInputChannel inputChannel = inputChannels.get(error.receiverId);

				if (inputChannel != null) {
					if (error.cause.getClass() == PartitionNotFoundException.class) {
						inputChannel.onFailedPartitionRequest();
					} else {
						inputChannel.onError(new RemoteTransportException(
							"Error at remote task manager '" + remoteAddr + "'.",
							remoteAddr,
							error.cause));
					}
				}
			}
		} else {
			throw new IllegalStateException("Received unknown message from producer: " + msg.getClass());
		}
	}

至此呢,就该下游算子 flapMap 运行处理了。(当然啦,实际上应该是先 print 对应的 task 运行,然后 flatMap 对应的 task 运行,最后才是 source 对应的 task 运行 )。

我们得回到 Task 的 run 方法

......
/*
			这个方法就是用户代码所真正被执行的入口。比如我们写的什么 new MapFunction() 的逻辑,最终就是在这里被执行的
			 */
			// run the invokable
			invokable.invoke();
......

然后就到了 StreamTask 的 invoke 方法,这里是每个算子真正开始执行的地方

......
run();
.....

最为关键的就是 run 方法。 这次调用的是 flatMap 对应 task 的 run 方法,所以进入 OneInputStreamTask

	@Override
	protected void run() throws Exception {
		// cache processor reference on the stack, to make the code more JIT friendly
		final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;
		//处理输入的消息
		while (running && inputProcessor.processInput()) {
			// all the work happens in the "processInput" method
		}
	}

进入 processInput 方法

//            程序首先获取下一个 buffer
			// 主要是尝试获取 buffer,然后赋值给当前的反序列化器
			// 处理 barrier 的逻辑,被包含在了getNextNonBlocked 中
			final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();
			if (bufferOrEvent != null) {
				if (bufferOrEvent.isBuffer()) {
					currentChannel = bufferOrEvent.getChannelIndex();
					currentRecordDeserializer = recordDeserializers[currentChannel];
					currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
				}
				else {
					// Event received
					final AbstractEvent event = bufferOrEvent.getEvent();
					if (event.getClass() != EndOfPartitionEvent.class) {
						throw new IOException("Unexpected event: " + event);
					}
				}
			}

获取到 buffer 之后

// 这里就是真正的,用户的代码即将被执行的地方
						// now we can do the actual processing
						StreamRecord<IN> record = recordOrMark.asRecord();
						synchronized (lock) {
							numRecordsIn.inc();
							//set KeyContext setCurrentKey
							streamOperator.setKeyContextElement1(record);
							streamOperator.processElement(record);
						}
						return true;

交给 flatMap 去处理。处理完了之后就又把数据发往 RecordWriter 的 emit 然后就这样反复执行,直到最后一个 operator ,这个消息也就消费完毕了。当然了,这仅仅是跨 taskManager 的消息流程,同一个 taskMananger 的消息流程就很简单了,就是简单的消息传递,不需要序列化成 byte 数组

总结一下

整体流程

1. 第一步必然是准备一个ResultPartition; 2. 通知JobMaster; 3. JobMaster通知下游节点;如果下游节点尚未部署,则部署之; 4. 下游节点向上游请求数据 5. 开始传输数据

数据跨 task 传输

1. 数据在本operator处理完后,交给RecordWriter。每条记录都要选择一个下游节点,所以要经过ChannelSelector。 2. 每个channel都有一个serializer(我认为这应该是为了避免多线程写的麻烦),把这条Record序列化为ByteBuffer 3. 接下来数据被写入ResultPartition下的各个subPartition里,此时该数据已经存入DirectBuffer(MemorySegment) 4. 单独的线程控制数据的flush速度,一旦触发flush,则通过Netty的nio通道向对端写入 5. 对端的netty client接收到数据,decode出来,把数据拷贝到buffer里,然后通知InputChannel 6. 有可用的数据时,下游算子从阻塞醒来,从InputChannel取出buffer,再解序列化成record,交给算子执行用户代码

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • kafka权限认证

    背景: 最近公司因为用的云服务器,需要保证kafka的安全性。可喜的是kafka0.9开始,已经支持权限控制了。网上中文资料又少,特此基于kafka0.9,...

    shengjk1
  • Flink 自定义 countAndTimeTrigger

    项目中需要自定义 trigger,需要基于两个条件:1. count 即 msg 的个数,当个数大于某个数时触发窗口 2. time 即每个固定的时间触发窗口

    shengjk1
  • 一文搞定 Flink Task 提交执行全流程

    这里创建了一个 Task 对象并启动,我们来看一下 Task 启动的时候都做了什么

    shengjk1
  • Linux内核源代码情景分析-访问权限与文件安全性

    在Linux内核源代码情景分析-从路径名到目标节点,一文中path_walk代码中,err = permission(inode, MAY_EXEC)当前...

    小小科
  • MySQL:The server quit without updating PID file

    按照字面意思么,就是没有更新PID文件,于是乎我就去我的MySQL目录,看了一下,并建了一个PID文件,随便弄了个进程号进去(童鞋们这么搞首先得确定这个进程号,...

    用户2353021
  • .NET Core 中的路径问题

    .NET Core 应用程序相对于以前的.NET Framework 应用程序在启动运行的方式上有一定的差异,今天就来谈一谈这个获取应用程序启动路径的问题。

    晓晨
  • .NET Core 中的路径问题

    晓晨
  • retrofit 源码分析

    loadServiceMethod: 拿到对应的解析器,根据注解解析方法的返回类型,方法参数,网络请求的一系列参数 封装成一个对象

    曾大稳
  • 这一次搞懂Spring代理创建及AOP链式调用过程

    AOP,也就是面向切面编程,它可以将公共的代码抽离出来,动态的织入到目标类、目标方法中,大大提高我们编程的效率,也使程序变得更加优雅。如事务、操作日志等都可以使...

    夜勿语
  • itext7知识点研究(PDF编辑)

    static class MyEventListener implements IEventListener { private List<Recta...

    老梁

扫码关注云+社区

领取腾讯云代金券