专栏首页SmartSiFlink1.4 并发执行

Flink1.4 并发执行

本节介绍如何在Flink中配置程序的并行执行。一个Flink程序由多个任务(transformations/operatorsdata sourcessinks)组成。一个任务被分成多个并发实例来执行,每个并发实例只处理任务输入数据的一个子集。一个任务的并发实例的个数称为并发度(parallelism)。

如果你想使用保存点,也应该考虑设置最大并发度。从保存点恢复时,可以更改特定算子或整个程序的并发度,并且此配置指定了并发的上限。

1. 设置并发度

一个任务的并发度可以在Flink中指定不同级别。

1.1 算子级别

单个算子,数据源,sink可以通过调用setParallelism()方法来定义并发度。例如,像这样:

Java版本:

DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = text
    .flatMap(new LineSplitter())
    .keyBy(0)
    .timeWindow(Time.seconds(5))
    .sum(1).setParallelism(5);

wordCounts.print();

env.execute("Word Count Example");

Scala版本:

val env = StreamExecutionEnvironment.getExecutionEnvironment

val text = [...]
val wordCounts = text
    .flatMap{ _.split(" ") map { (_, 1) } }
    .keyBy(0)
    .timeWindow(Time.seconds(5))
    .sum(1).setParallelism(5)
wordCounts.print()

env.execute("Word Count Example")

1.2 执行环境级别

如这所述,Flink程序是在执行环境的上下文中执行的。执行环境为它执行的所有算子,数据源和数据sink提供了默认的并发度。执行环境的并发度可以通过显式配置一个算子的并发度来覆盖。

执行环境的默认并发度可以通过调用setParallelism()方法来指定。要为执行的所有算子,数据源和sink设置并发度为3,请按如下方式设置执行环境的默认并发度:

Java版本:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);

DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = [...]
wordCounts.print();

env.execute("Word Count Example");

Scala版本:

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(3)

val text = [...]
val wordCounts = text
    .flatMap{ _.split(" ") map { (_, 1) } }
    .keyBy(0)
    .timeWindow(Time.seconds(5))
    .sum(1)
wordCounts.print()

env.execute("Word Count Example")

1.3 客户端级别

在向Flink提交作业时,可以在客户端设置并发度。客户端可以是JavaScala程序。Flink的命令行接口(CLI)就是一种客户端。

对于CLI客户端,可以使用-p指定并发度参数。 例如:

./bin/flink run -p 10 ../examples/*WordCount-java*.jar

Java/Scala程序中,并发度设置如下:

Java版本:

try {
    PackagedProgram program = new PackagedProgram(file, args);
    InetSocketAddress jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123");
    Configuration config = new Configuration();

    Client client = new Client(jobManagerAddress, config, program.getUserCodeClassLoader());

    // set the parallelism to 10 here
    client.run(program, 10, true);

} catch (ProgramInvocationException e) {
    e.printStackTrace();
}

Scala版本:

try {
    PackagedProgram program = new PackagedProgram(file, args)
    InetSocketAddress jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123")
    Configuration config = new Configuration()

    Client client = new Client(jobManagerAddress, new Configuration(), program.getUserCodeClassLoader())

    // set the parallelism to 10 here
    client.run(program, 10, true)

} catch {
    case e: Exception => e.printStackTrace
}

1.4 系统级别

可以通过在./conf/flink-conf.yaml中设置parallelism.default属性来为所有执行环境定义全系统默认并发度。详细信息请参阅配置文档。

2. 设置最大并发度

最大并发度可以在可以设置并发度的地方设置(客户端级别和系统级别除外)。你可以调用setMaxParallelism()取代setParallelism()方法来设置最大并发度。

最大并发度的默认设置大致为operatorParallelism +(operatorParallelism / 2),下限为127,上限为32768

备注:

将最大并发度设置为非常大的数值可能会对性能造成不利影响,因为一些后端状态必须保持在内部数据结构,而这些内部数据结构随key-groups(这是可扩展状态的内部实现机制)的数量进行扩展。(some state backends have to keep internal data structures that scale with the number of key-groups (which are the internal implementation mechanism for rescalable state).)

备注:

Flink版本:1.4

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Hadoop 数据压缩简介

    文件压缩带来两大好处:它减少了存储文件所需的空间,并加速了数据在网络或者磁盘上的传输速度。在处理大量数据时,这两项节省可能非常重要,因此需要仔细考虑如何在 Ha...

    smartsi
  • Storm与Redis集成

    Storm-redis 提供了基本的 Bolt 实现:RedisLookupBolt,RedisStoreBolt 以及 RedisFilterBolt。

    smartsi
  • Spark内部原理之内存管理

    Spark 作为一个基于内存的分布式计算引擎,其内存管理模块在整个系统中扮演着非常重要的角色。理解 Spark 内存管理的基本原理,有助于更好地开发 Spark...

    smartsi
  • Python 高级并发

    就是直接用『原子操作』(atomic operation)所实现的并发。这种并发是给程序库的编写者用的, 而应用程序开发者则不需要它,因为这种写法很容易出错,而...

    用户1416054
  • 到底什么级别才算是高并发?

    https://segmentfault.com/a/1190000010844969

    Java技术栈
  • 聊聊面试中关于并发问题的应对方案

    技术zhai
  • Web开发中说高并发的时候,我们在说什么

    大家先心里仔细想想,当你们听到高并发网站时,心里对这个网站是个什么概念?首先想到的是淘宝吗?带着问题,我们一起思考技术

    大愚
  • 突破Java面试(45)-高并发系统的架构设计

    所以如果你确实有真才实学,在互联网公司里干过高并发系统,那你确实拿offer基本如探囊取物,没啥问题。

    JavaEdge
  • 百度压测,分析性能拐点

    空闲之余用jmeter对百度进行了一次压测,目的是分析一下性能的拐点,验证一下理论知识

    飞天小子
  • 学习PHP性能优化正式篇(一)

    首先我们要先了解到如何判断一个的性能上限是多少,这就为我们引入了压测工具的了解和使用,常用的压测工具当然就是Apache 开源基金会的 ab工具了。

    Hi胡瀚

扫码关注云+社区

领取腾讯云代金券