我们有一个包含操作的管道,分为两个工作负载-- Source -> Transform
属于第一个组,并且是CPU密集型的工作负载,它们被放入相同的时隙共享组,比如source
。和Sink
,内存密集型的工作负载,因为它使用批量上传和存储的数据量在内存中.它被发送到sink
插槽共享组。
此外,Source -> Transform
工作负载和Sink
工作负载具有不同的并行级别,因为第一种工作负载受源并行性的限制。例如,我们的Source -> Transform
并行度为50,而Sink
并行度为78。我们有8个TMs,每个核都有16个核(因此也有槽)。
在这种情况下,我们理想的时隙分配策略似乎是在每个TM上为Source -> Transform
分配6-7个插槽,而对于Sink
领先的CPU-RAM工作负载,则在所有TMs中分配大致均匀。
那么,我想知道是否有一些配置设置,它将告诉分配槽共享组均匀?
我只找到了cluster.evenly-spread-out-slots配置参数,但我不确定它是否实际上均匀地分配了时隙共享组,而不仅仅是时隙--例如,我得到了TMs,同时我期望有6个或7个Source -> Transform
任务。
那么,问题是是否可以告诉Flink在集群内平均分配时隙共享组?或者还有其他的可能吗?
在任务管理器中均匀分配Flink运算符似乎有点类似于我的问题,但我主要问的是插槽共享组的分布。本主题还仅包含使用cluster.evenly-spread-out-slots的建议,但此后可能发生了一些变化。
发布于 2020-11-05 10:29:47
我找到了一个解决办法,使插槽共享组的分布均匀。
从flink 1.9.2开始,甚至引入了任务分发特性,它可以通过cluster.evenly-spread-out-slots: true
在flink-conf.yaml
:FLINK-12122在所有可用的注册TaskManagers中均匀地分配任务。中打开。我试着启用它,但没有起作用。在深入研究之后,我找到了开发人员的评论,其中指出,该功能只在独立模式下工作,因为它需要预先分配的资源- https://issues.apache.org/jira/browse/FLINK-12122?focusedCommentId=17013089&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17013089"。
该功能只保证将任务分散到在调度时注册的TMs集合中。因此,当您使用active Yarn模式并提交第一个作业时,就不会有任何TMs注册。因此,Flink将分配第一个容器,填充它,然后只分配一个新容器。但是,如果您在独立模式下启动Flink,或者在您的第一份工作完成后,仍然有一些TMs注册,那么下一个工作将被分散。
因此,我们的想法是使用增加的空闲容器超时设置启动分离纱段,首先提交一些短活假作业,这将简单地从纱线中获取所需的资源并完成,然后立即启动将分配给已经分配的容器的主管道,在这种情况下,cluster.evenly-spread-out-slots: true
完成了对所有插槽共享组的分配。
因此,总括而言,以下是为了在工作中得到分布均匀的时隙共享组而做的:
resourcemanager.taskmanager-timeout
,以便在为空闲任务管理器释放的容器之前提交主作业。我把这个增加到1分钟,这就足够了。yarn-session
并向其动态提交作业。val env = StreamExecutionEnvironment.getExecutionEnvironment
val job = env
.fromElements(0)
.map { x =>
x * 2
}
.setParallelism(parallelismMax)
.print()
val jobResult = env.execute("Resources pre-allocation job")
println(jobResult)
print("Done. Starting main job!")
发布于 2020-10-30 16:21:17
我试过一次来实现这一点,但问题是,Flink没有提供一个功能来启用操作员的位置。我可以得到的接近是使用.map(...).slotSharingGroup("name");
。正如有关"集槽共享组“的文档所述:
设置操作的时隙共享组。Flink将把具有相同时隙共享组的操作放到相同的时隙中,同时保持在其他时隙中没有时隙共享组的操作。这可以用来隔离插槽。如果所有输入操作都在同一个时隙共享组中,则从输入操作继承时隙共享组。默认插槽共享组的名称为“默认”,可以通过调用slotSharingGroup(“默认”)显式地将操作放入该组。 someStream.filter(...).slotSharingGroup("name");
因此,我根据任务插槽的数量以及并行性定义了不同的组。
https://stackoverflow.com/questions/64607665
复制相似问题