前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【Flink】第三十三篇: 任务线程模型

【Flink】第三十三篇: 任务线程模型

作者头像
章鱼carl
发布2022-03-31 11:26:24
2K0
发布2022-03-31 11:26:24
举报
文章被收录于专栏:章鱼carl的专栏

源码系列推荐:

【Flink】第二十四篇:源码角度分析 DataStream API 调用逻辑

【Flink】第二十五篇:源码角度分析作业提交逻辑

【Flink】第二十六篇:源码角度分析Task执行过程

【Flink】第二十八篇:Flink SQL 与 Apache Calcite

【Flink】第二十九篇:源码分析 Blink Planner

线程模型能帮助我们更深刻的理解Flink任务执行原理,更精确的控制Flink程序,这些是使用Flink解决复杂问题、写出高性能和高可用程序的基础。

例如,在运用DataStream API完成业务需求时,可以更精确的把控Function中每个field、state field的作用范围及其完整的生命周期,也可以帮助我们进一步思考线程并发、线程安全、线程同步等问题。

梗概

本文从JobMaster向TaskManager,通过RPC远程过程调用的方式,提交一次Task为分析线索,以线程调度为主题,对Flink这一过程的源码进行分析。

先对分析的结论进行简单阐述,以防止读者迷路。

1. 远端JobMaster通过TaskExecutor# submitTask,向其提交Task

2. 在TaskExecutor# submitTask中,

(1) 首先,完成Task的实例化,注意在实例化Task的过程中为其构造方法中提供了一个重要的新线程

(2) 然后,调用TaskExecutor# startTaskThread,运行Task中为其初始化的新线程,来运行task

在 (1) 中,完成Task实例化时,为其生成了一个新的Thread,这个Thread就是Mailbox线程模型的执行线程。

在 (2) 中,线程首先会去按照提交的StreamTask,进行反序列化生成相应的StreamTask,而在实例化StreamTaskl的过程中,将当前线程传递给了StreamTask,并进一步传递了TaskMailbox,并调用StreamTask# invoke,执行MailboxProcessor# runMailboxLoop,进入Mailbox的单线程循环执行模式。

Mailbox模型

Mailbox线程模型,它是Actor模型的理念,简单来说,本质就是一种生产者-消费者模型

包含一个阻塞队列和一个执行线程,有多个消费者,即任何线程都可以通过同步互斥的方式向阻塞队列(即Box)中添加Task(及Mail),而消费者只有一个线程,消费MailBox中的Mail并线程安全的执行它。

笔者在阅读源码的过程中发现Flink多处都用了这种模型,例如,akka、Flink的RPC等均是Mailbox模型。而且,这个模型从 Flink 1.9 开始实现,之前通过一个全局锁(checkpoint lock)来保证线程安全。

这部分推荐阅读:https://blog.csdn.net/yuchuanchen/article/details/105677408

源码阅读

TaskExecutor

TaskExecutor其实就是TaskManager的最外层封装了,而TaskExecutor又是通过TaskExecutorGateway来抽象所有的与TaskExecutor交互的Gateway,这种交互是基于RPC的。

为什么说TaskExecutor就是taskmanager的本身了?

TaskExecutor的注释:TaskExecutor负责执行多个Task。即是说taskmanager被分配了虚拟化的资源槽:taskslot,而这就是taskslot就可以被分配运行task。

在TaskExecutor中,我们最关心的是submitTask:

在其中就有JM从远程进行任务调度后进行Task初始化的代码:

生成Task实例后,便是调度其进行异步执行:

我们依然沿着任务调度,线程模型的线索,先来看看Task的实例化过程。

Task

Task的注释中非常重要的一句话,描述了Mailbox的影子:

Each Task is run by one dedicated thread. // 每个任务由一个专用线程运行。

并且所有为Task创建的Task的线程都从属于"Flink Task Threads"

这也是我们调试Flink源码时,发现Task线程的从属线程组都是Flink Task Threads:

接着我们在Task中寻找这个线程:

接着在构造方法中看到对它的初始化逻辑:

就是在这里,为这个Task的运行提供了一个新的线程,并且指定了线程组、Runnable逻辑、线程名称。

至此,完成了Task实例的初始化。第二个重要的操作便是执行这个task,即调用Task# startTaskThread:

Task# startTaskThrea仅仅是简单的调用了Thread# start,而Runnable我们也刚刚分析过了,就是刚刚初始化的Task实例,所以Task实例里的run方法就是线程执行的逻辑,run中又执行了doRun,doRun便是Task的核心执行逻辑,

在源码中可以看到,先通过反射的方式对具体的Task的逻辑进行了加载,然后便是调用其的invoke进行执行。对于我们用户定义的普通Stream代码,这里的invokable实例就是StreamTask。

StreamTask

这里我们要特别注意StreamTask和StreamOperator 的关系,以下是StreamTask的注释的翻译:

所有流式传输任务的基类。任务是由 TaskManager 部署和执行的本地处理单元。每个任务运行一个或多个StreamOperator ,它们形成了任务的操作员链。链接在一起的运算符在同一个线程中同步执行,因此在同一个流分区上。这些链的常见情况是连续的 map/flatmap/filter 任务。

所以,StreamTask包含一个或多个Operator,而例如连续的 map/flatmap/filter会形成operator-chain,便交给一个StreamTask执行。而StreamTask又是和线程对应的。

我们关心的是线程模型,所以相关的field是:

以及两个构造方法:

这里便可以看到Mailbox的身影了,用当前线程构造了TaskMailboxImpl实例,即StreamTask的子类。

又用这个mailbox构造了两个同样关键的实例:mailboxProcessor、mainMailboxExecutor。

在被Task# doRun中调用了StreamTask# invoke后,接着再来看看invoke:

很容易的看出, 核心的执行方法是runMailboxLoop,

而StreamTask# runMailboxLoop调用了MailboxProcessor# runMailboxLoop,所以我们来到MailboxProcessor的runMailboxLoop一探究竟,

MailboxProcessor

我们最关心的依然是mailbox的传递,这个mailbox成员便是我们刚刚在StreamTask的构造方法里传递给MailboxProcessor的传参,接着继续看调用方法,

所以,任务线程是在一个循环中,不断的从Mailbox中取出Mail,然后执行,这也和我们在Mailbox部分的介绍是一致的,源码及笔者的注释如下,

TaskMailboxImpl

顺着这个思路,我们继续来看看TaskMailbox,核心的代码及注释如下图,

而消费的方法tryTake如下,

主要是先对当前线程进行check,如果当前线程不是唯一的消费者线程,不允许消费。

接着是执行Mail:

底层是StreamTaskActionExecutor用一个mutex互斥同步监视器实现互斥调用:

那么Task主要包含哪些类别?

总结

至此,我们通过源码分析了,TaskManager是如何接收JobManager调度给它的Task,并且又是如何创建执行线程,通过构造方法一步一步传递给了Mailbox线程模型,完成单消费者线程安全的执行各类Stream消息,即Mail。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-02-21,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 章鱼沉思录 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档