前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【Flink】第二十六篇:源码角度分析Task执行过程

【Flink】第二十六篇:源码角度分析Task执行过程

作者头像
章鱼carl
发布2022-03-31 11:20:09
6080
发布2022-03-31 11:20:09
举报
文章被收录于专栏:章鱼carl的专栏章鱼carl的专栏

源码分析系列推荐:

【Flink】第四篇:【迷思】对update语义拆解D-、I+后造成update原子性丢失

【Flink】第十五篇:Redis Connector 数据保序思考

【Flink】第十六篇:源码角度分析 sink 端的数据一致性

【Flink】第二十四篇:源码角度分析 DataStream API 调用逻辑

【Flink】第二十五篇:源码角度分析作业提交逻辑

继上篇 【Flink】第二十五篇:源码角度分析作业提交逻辑 我们分析了Flink在执行execute提交作业前,将用户编写的业务UDF逻辑封装成List<Transformation>数据结构,然后,在执行execute提交作业中,又用递归算法将其绘制成DAG数据结构,并且进行了四层的DAG转换,最终,转换为可调度的ExecutionGraph。

本文接着分析Task被调度到TaskManager上后,Task是如何处理输入数据和输出数据。

依旧以socket window wordcount程序为例,

代码语言:javascript
复制
public class WindowWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        env
                .socketTextStream("127.0.0.1", 5555)
                .flatMap(new Splitter())
                .keyBy(value -> value.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .sum(1)
                .print();
        env.execute("Window WordCount");
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word : sentence.split(" ")) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }
}

这次笔者尝试优化之前文章的行文逻辑,将结论进行切分,然后一段结论再结合一段源码分析,本文的主要线索以wrodcount程序处理一次输入数据的过程为线索,在探索这个线索的过程中,以期能达到抽丝剥茧的方式为读者呈现。主要内容有两点:

  1. 从TaskManager的subtask线程如何执行调用到了用户的自定义UDF业务逻辑代码
  2. 输入分区和输出分区的对应关系

直接上flatmap算子的调用栈,如下

可以看到,栈底是Thread,这个Thread应该是MiniCluster启动的subtask的执行线程,在往上就是flink抽象的运行时角色的实例了,例如,Task,StreamTask,自底向上逐渐由面向Thread的层面过渡到面向flink的udf用户逻辑层面。

我们直接从Task调起StreamTask的入口看起,Task将接收到的输入数据给了StreamTask的processInput,

StreamTask又将其交给inputProcessor(StreamInputProcessor)处理。

而在StreamTask与StreamInputProcessor之间使用了Mailbox线程模型,它是一个单线程的模型,在此只做简单介绍,

先来看下这个改造/改进最初的动机,在之前 Flink 的线程模型中,会有多个潜在的线程去并发访问其内部的状态,比如 event-processing 和 checkpoint triggering,它们都是通过一个全局锁(checkpoint lock)来保证线程安全,这种实现方案带来的问题是:

1. 锁对象会在多个类中传递,代码的可读性比较差

2. 在使用时,如果没有获取锁,可能会造成很多问题,使得问题难以定位

3. 锁对象还暴露给了面向用户的 API

基于上面的这些问题,关于线程模型,提出了一个全新的解决方案 —— MailBox 模型,它可以让 StreamTask 中所有状态的改变都会像在单线程中实现得一样简单。方案借鉴了 Actor 模型的 MailBox 设计理念,它会让这些 action 操作(需要获取 checkpoint lock 的操作)先加入到一个 阻塞队列,然后主线程再从队列取相应的 mail task 去执行。

最后真正执行的是 MailboxProcessor 中的 runMailboxLoop() 方法,也就是上面说的 MailBox 主线程,StreamTask 运行的核心流程也是在这个方法中,其实现如下:

上面的方法中,最关键的有两个地方:

1. processMail(): 它会检测 MailBox 中是否有 mail 需要处理,如果有的话,就做相应的处理,一直将全部的 mail 处理完才会返回,只要 loop 还在进行,这里就会返回 true,否则会返回 false

2. runDefaultAction(): 这个最终调用的 StreamTask 的 processInput() 方法,event-processing 的处理就是在这个方法中进行的

我们沿着StreamTask的线索继续探索,在processInput中,StreamTask将消息交给了StreamInputProcessor,而StreamInputProcessor是对StreamTask中读取数据的行为抽象,具体由StreamTaskInput完成,如下就是StreamInputProcessor调用StreamTaskInput的emitNext处理输入数据,

而StreamTaskInput是StreamTask输入数据的抽象,将输入数据反序列后交给StreamTaskNetWorkOutput。同时,StreamTaskInput有两个主要子类:

1. StreamTaskNetworkInput:使用InputGate读取数据

2. StreamTaskSourceInput:使用SourceFunction读取数据

那么,接着来看StreamTaskNetworkInput是如何处理StreamTask传递进来的输入数据的,

这里把流数据元素的抽象StreamElement划分为了四类,与我们在之前介绍的一致:

  • Record
  • Watermark
  • LatencyMarker
  • StreamStatus

在此我们先顺着调用栈的线索,进入OneInputStreamTask,

OneInputStreamTask持有了OneInputStreamOperator对输入进行处理,而OneInputStreamOperator我们在之前已经介绍过,它其实就是用户的UDF业务逻辑的封装,在这里因为我们进入的是FlatMap的调用栈,所以,运行时的实例是StreamFlatMap,所以继续进入这个类的处理元素的方法,

在这里,我们就和wordcount自定义的FlatMapFunction对接上了,他调用了userFunction的flatMap接口运行wordcount中的分词逻辑,即最终执行了如下wordcount代码,

代码语言:javascript
复制
public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
        for (String word : sentence.split(" ")) {
            out.collect(new Tuple2<>(word, 1));
        }
    }
}

讲完本文的第一个内容再来看看第二个:输入分区和输出分区的对应关系

以上是一个典型的物理执行计划,

在数据输出方面,主要包含两个核心抽象:

  • ResultPartition:是一个Task的输出的抽象,包含若干ResultSubPartition。
  • ResultSubPartition:下游请求数据是请求ResultSubPartition,而不是ResultPartition,负责实际上存储Buffer

ResultPartition数量决定因素主要是上游并行度。

ResultSubPartition数量决定因素主要是:下游并行度 + 上游数据分发模式

另外,关于Buffer,我们在【Flink】第八篇:Flink 内存管理 中已经介绍过Flink的内存模型。在Flink中Java对象的有效信息被序列化,在内存中连续存储,保存在预分配的内存块上,内存块叫作MemorySegment,即内存分配的最小单元。很多运算可以直接操作序列化的二进制数据,而不需要反序列化。MemorySegment可以在堆上:Java byte数组;也可以在堆外:ByteBuffer。Task算子处理完数据后,将结果交给下游的时候,使用的抽象或者说内存对象是Buffer。其实现类是NetworkBuffer。一个NetworkBuffer包装了一个MemorySegment。

在数据输入方面,主要包含两个核心抽象:

  • InputGate:是一个Task的输入数据的抽象,包含若干InputChannel,主要包含SignleInputGate和UnionInputGate两个实现类
  • InputChannel:实际负责数据消费的是InputChannel,主要包含LocalInputChannel,即数据本地性;RmoteInputChannel,即跨网络数据交换Flink选择了Netty

一个InputChannel对应上游一个ResultSubPartition。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-09-22,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 章鱼沉思录 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
文件存储
文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档