Apache Flink作为开源的分布式流处理框架,受到了广泛的关注和应用。本文将分享如何从零开始搭建一个Flink运行环境,并在其上运行一个“WordCount”的例子程序。
Flink支持在Linux、MacOS和Windows三大平台上部署。本文以Linux环境为例。
需要的软件依赖如下:
# 安装JDK
yum install -y java-1.8.0-openjdk-devel
# 安装Maven
yum install -y maven
接着下载Flink压缩包进行解压:
wget https://dlcdn.apache.org/flink/flink-1.14.5/flink-1.14.5-bin-scala_2.12.tgz
tar -xvf flink-1.14.5-bin-scala_2.12.tgz
单机模式下,JobManager和TaskManager均运行在同一台机器上。
# 启动JobManager
./bin/start-cluster.sh
# 提交并运行WordCount程序
./bin/flink run examples/streaming/WordCount.jar
本文以单机模式为例进行讲解。实际生产环境中,建议部署在集群模式下运行。
在集群模式下,JobManager和TaskManager会部署在不同节点上。
以此实现Flink在分布式环境下高可靠且高性能的计算。
WordCount是一个流式WordCount程序,读取文本源头,以单词为单位进行计数统计。
// 定义文本源DataStream
DataStream<String> text = env.socketTextStream("localhost", 9999);
//将每行内容切分转换成单词列表
DataStream<String> words = text
.flatMap(new FlatMapFunction<String, String>() {
public void flatMap(String value, Collector<String> out) {
String[] split = value.toLowerCase().split("\\W+");
// ...
}
});
//按单词进行计数统计
DataStream<Tuple2<String, Long>> counts = words
.keyBy(value -> value)
.sum(1);
//输出结果
counts.print();
编译打包项目,使用FlinkClient提交Job:
mvn clean package
bin/flink run target/wordcount-1.0-SNAPSHOT.jar
运行程序,使用netcat工具发送输入字符串,可以实时看到统计结果:
nc localhost 9999
hello world bye
hello again
这里提供一个完整的WordCount流处理程序代码示例:
// 导入Flink相关包
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
public class WordCount {
public static void main(String[] args) throws Exception {
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 从文件读取文本行数据源
DataStream<String> text = env.addSource(new MySourceFunction());
// 将每行内容切分成单词
DataStream<String> words = text.flatMap(new FlatMapFunction<String, String>() {
public void flatMap(String value, Collector<String> out) {
String[] splits = value.split("\\s+");
for (String word : splits) {
out.collect(word);
}
}
});
// 按单词进行分组计数
DataStream<Tuple2<String, Long>> result = words.keyBy(e -> e)
.timeWindow(Time.seconds(5))
.sum(1);
// 打印最终结果
result.print();
// 执行任务
env.execute("WordCount");
}
// 自定义文本数据源
public static class MySourceFunction implements SourceFunction<String> {
@Override
public void run(SourceContext<String> ctx) throws Exception {
// 从文件或集合读取文本
// ...
ctx.collect("hello world");
}
@Override
public void cancel() {
}
}
}
该示例从文件读取文本行,进行词频统计,并以对象流的方式输出结果。希望能给您一个完整代码实例的参考!
Flink可以利用Yarn资源管理器来管理和调度Flink作业的执行。主要有以下步骤:
安装Hadoop并配置Yarn资源管理器。
修改flink-conf.yaml配置文件,添加如下配置:
yarn.distributed.enabled: true
mvn package -Pyarn
./bin/flink run -m yarn-cluster -yn 1 -ys 1 /path/to/job.jar
-m 参数指定使用Yarn作为资源管理器,-yn -ys 分配给任务的Container数量。
可以在Yarn ResourceManager WebUI中查看和监控Flink作业状态。
使用Flink Cli同样可以停止和重启在Yarn上运行的作业。
与此同时,Yarn也能根据负载自动扩缩容Flink作业上的Container数量。这样实现了Flink与Yarn的良好集成。
通过上述步骤就可以利用Yarn的资源管理能力来管理Flink分布式作业的执行了。
Flink通过Table API和SQL来支持时间窗口的操作。
下面通过一个例子来说明:
导入Flink的TableEnvironment:
TableEnvironment tableEnv = TableEnvironment.create(env);
从Kafka读取数据注册成Table:
tableEnv.connect(new FlinkKafkaConsumer<>(...)
.property(...));
使用DDL定义Table结构:
CREATE TABLE inputTable (
id STRING,
timestamp TIMESTAMP,
...)
WITH (...);
使用TUMBLE或HOP动态时间窗口
SELECT
id,
COUNT(*)
FROM
inputTable
GROUP BY
TUMBLE(timestamp, INTERVAL '5' MINUTE)
支持窗口函数如SUM、COUNT、MAX等聚合计算:
SELECT
SUM(amount)
FROM
inputTable
GROUP BY
HOP(timestamp, INTERVAL '1' HOUR, INTERVAL '30' MINUTE)
将结果输出到Kafka或打印:
tableEnv.toRetractStream[Row]...
通过Table API和SQL的时间窗口支持,可以更高效地操作和处理时间序列数据流。开发者可以使用熟悉的SQL语法进行流处理。
这里提供一个完整的使用SQL实现单词计数的示例:
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnv = TableEnvironment.create(env);
// 从Kafka读取文本行数据
tableEnv.connect(new FlinkKafkaConsumer<>(...)
.topic("kafka_topic"))
.withFormat(new SimpleStringSchema())
.createTemporaryTable("lines");
// 分词表
tableEnv.executeSql(
"CREATE TABLE words WITH ('connector' = 'upsert', 'url' = '...") AS " +
"SELECT " +
" ROW_NUMBER() OVER() AS id, " +
" word " +
"FROM lines, LATERAL(FLATTEN(SPLIT(lines, ' ')))";
// 窗口聚合表
tableEnv.executeSql(
"CREATE TABLE word_counts WITH ('connector' = 'upsert', 'url' = '...'") AS " +
"SELECT " +
" word, " +
" COUNT(*) AS count " +
"FROM words " +
"GROUP BY TUMBLE(rowtime, INTERVAL '5' SECOND), word");
// 输出结果
tableEnv.executeSql("INSERT INTO sink SELECT * FROM word_counts");
// 执行程序
env.execute();
这个完整示例包含数据输入、分词、窗口聚合和结果输出的全流程SQL定义。希望对您理解SQL实现流处理过程有帮助。
所以Flink时间窗口的原理就是:根据时间戳分配事件到窗口,窗口聚合操作更新状态,窗口关闭时输出结果。它独立于算子,为流处理引入了时间的概念。
如果一次从Kafka拉取的数据中,有一半的数据在当前时间窗口内,一半在窗口外,Flink会进行如下处理:
所以Flink可以正确区分时间窗口内外的数据:
这样保证了时间正确性,不会导致窗口结果计算错误
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。