前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink1.4 并发执行

Flink1.4 并发执行

作者头像
smartsi
发布2019-08-07 08:32:52
1K0
发布2019-08-07 08:32:52
举报
文章被收录于专栏:SmartSiSmartSi

本节介绍如何在Flink中配置程序的并行执行。一个Flink程序由多个任务(transformations/operatorsdata sourcessinks)组成。一个任务被分成多个并发实例来执行,每个并发实例只处理任务输入数据的一个子集。一个任务的并发实例的个数称为并发度(parallelism)。

如果你想使用保存点,也应该考虑设置最大并发度。从保存点恢复时,可以更改特定算子或整个程序的并发度,并且此配置指定了并发的上限。

1. 设置并发度

一个任务的并发度可以在Flink中指定不同级别。

1.1 算子级别

单个算子,数据源,sink可以通过调用setParallelism()方法来定义并发度。例如,像这样:

Java版本:

代码语言:javascript
复制
DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = text
    .flatMap(new LineSplitter())
    .keyBy(0)
    .timeWindow(Time.seconds(5))
    .sum(1).setParallelism(5);

wordCounts.print();

env.execute("Word Count Example");

Scala版本:

代码语言:javascript
复制
val env = StreamExecutionEnvironment.getExecutionEnvironment

val text = [...]
val wordCounts = text
    .flatMap{ _.split(" ") map { (_, 1) } }
    .keyBy(0)
    .timeWindow(Time.seconds(5))
    .sum(1).setParallelism(5)
wordCounts.print()

env.execute("Word Count Example")
1.2 执行环境级别

如这所述,Flink程序是在执行环境的上下文中执行的。执行环境为它执行的所有算子,数据源和数据sink提供了默认的并发度。执行环境的并发度可以通过显式配置一个算子的并发度来覆盖。

执行环境的默认并发度可以通过调用setParallelism()方法来指定。要为执行的所有算子,数据源和sink设置并发度为3,请按如下方式设置执行环境的默认并发度:

Java版本:

代码语言:javascript
复制
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);

DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = [...]
wordCounts.print();

env.execute("Word Count Example");

Scala版本:

代码语言:javascript
复制
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(3)

val text = [...]
val wordCounts = text
    .flatMap{ _.split(" ") map { (_, 1) } }
    .keyBy(0)
    .timeWindow(Time.seconds(5))
    .sum(1)
wordCounts.print()

env.execute("Word Count Example")
1.3 客户端级别

在向Flink提交作业时,可以在客户端设置并发度。客户端可以是JavaScala程序。Flink的命令行接口(CLI)就是一种客户端。

对于CLI客户端,可以使用-p指定并发度参数。 例如:

代码语言:javascript
复制
./bin/flink run -p 10 ../examples/*WordCount-java*.jar

Java/Scala程序中,并发度设置如下:

Java版本:

代码语言:javascript
复制
try {
    PackagedProgram program = new PackagedProgram(file, args);
    InetSocketAddress jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123");
    Configuration config = new Configuration();

    Client client = new Client(jobManagerAddress, config, program.getUserCodeClassLoader());

    // set the parallelism to 10 here
    client.run(program, 10, true);

} catch (ProgramInvocationException e) {
    e.printStackTrace();
}

Scala版本:

代码语言:javascript
复制
try {
    PackagedProgram program = new PackagedProgram(file, args)
    InetSocketAddress jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123")
    Configuration config = new Configuration()

    Client client = new Client(jobManagerAddress, new Configuration(), program.getUserCodeClassLoader())

    // set the parallelism to 10 here
    client.run(program, 10, true)

} catch {
    case e: Exception => e.printStackTrace
}
1.4 系统级别

可以通过在./conf/flink-conf.yaml中设置parallelism.default属性来为所有执行环境定义全系统默认并发度。详细信息请参阅配置文档。

2. 设置最大并发度

最大并发度可以在可以设置并发度的地方设置(客户端级别和系统级别除外)。你可以调用setMaxParallelism()取代setParallelism()方法来设置最大并发度。

最大并发度的默认设置大致为operatorParallelism +(operatorParallelism / 2),下限为127,上限为32768

备注:

代码语言:javascript
复制
将最大并发度设置为非常大的数值可能会对性能造成不利影响,因为一些后端状态必须保持在内部数据结构,而这些内部数据结构随key-groups(这是可扩展状态的内部实现机制)的数量进行扩展。(some state backends have to keep internal data structures that scale with the number of key-groups (which are the internal implementation mechanism for rescalable state).)

备注:

代码语言:javascript
复制
Flink版本:1.4
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018-01-04,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 设置并发度
    • 1.1 算子级别
      • 1.2 执行环境级别
        • 1.3 客户端级别
          • 1.4 系统级别
          • 2. 设置最大并发度
          相关产品与服务
          大数据
          全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档