前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >零距离接触Flink:全面解读流计算框架入门与实操指南

零距离接触Flink:全面解读流计算框架入门与实操指南

原创
作者头像
司夜
修改2023-09-14 10:48:52
6390
修改2023-09-14 10:48:52
举报
文章被收录于专栏:开发三两事开发三两事

前言

Apache Flink作为开源的分布式流处理框架,受到了广泛的关注和应用。本文将分享如何从零开始搭建一个Flink运行环境,并在其上运行一个“WordCount”的例子程序。

Flink环境搭建

1. 环境准备

Flink支持在Linux、MacOS和Windows三大平台上部署。本文以Linux环境为例。

需要的软件依赖如下:

  • JDK 8或以上版本
  • Maven 3.5+
  • Flink 1.14.5版本
代码语言:javascript
复制
# 安装JDK
yum install -y java-1.8.0-openjdk-devel

# 安装Maven
yum install -y maven

接着下载Flink压缩包进行解压:

代码语言:javascript
复制
wget https://dlcdn.apache.org/flink/flink-1.14.5/flink-1.14.5-bin-scala_2.12.tgz 
tar -xvf flink-1.14.5-bin-scala_2.12.tgz

2.单机模式运行Flink

单机模式下,JobManager和TaskManager均运行在同一台机器上。

代码语言:javascript
复制
# 启动JobManager
./bin/start-cluster.sh

# 提交并运行WordCount程序
./bin/flink run examples/streaming/WordCount.jar

本文以单机模式为例进行讲解。实际生产环境中,建议部署在集群模式下运行。

3. 分布式集群模式

在集群模式下,JobManager和TaskManager会部署在不同节点上。

  • 首先在一台机器上启动ResourceManager
  • 在其他Worker节点上启动TaskManager
  • 提交Job到JobManager进行调度和运行

以此实现Flink在分布式环境下高可靠且高性能的计算。

4. 编写WordCount程序

WordCount是一个流式WordCount程序,读取文本源头,以单词为单位进行计数统计。

代码语言:javascript
复制
// 定义文本源DataStream
DataStream<String> text = env.socketTextStream("localhost", 9999); 

//将每行内容切分转换成单词列表
DataStream<String> words = text
  .flatMap(new FlatMapFunction<String, String>() {
     public void flatMap(String value, Collector<String> out) {
       String[] split = value.toLowerCase().split("\\W+");
       // ...
    }
  });
  
//按单词进行计数统计        
DataStream<Tuple2<String, Long>> counts = words
  .keyBy(value -> value)
  .sum(1);
  
//输出结果
counts.print();

5. 运行和结果

编译打包项目,使用FlinkClient提交Job:

代码语言:javascript
复制
mvn clean package

bin/flink run target/wordcount-1.0-SNAPSHOT.jar

运行程序,使用netcat工具发送输入字符串,可以实时看到统计结果:

代码语言:javascript
复制
nc localhost 9999
hello world bye
hello again

6.代码示例

这里提供一个完整的WordCount流处理程序代码示例:

代码语言:javascript
复制
// 导入Flink相关包
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class WordCount {

  public static void main(String[] args) throws Exception {

    // 创建执行环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // 从文件读取文本行数据源
    DataStream<String> text = env.addSource(new MySourceFunction());

    // 将每行内容切分成单词
    DataStream<String> words = text.flatMap(new FlatMapFunction<String, String>() {
      public void flatMap(String value, Collector<String> out) {
        String[] splits = value.split("\\s+");
        for (String word : splits) {
          out.collect(word);
        }
      }
    });

    // 按单词进行分组计数
    DataStream<Tuple2<String, Long>> result = words.keyBy(e -> e)
      .timeWindow(Time.seconds(5)) 
      .sum(1);

    // 打印最终结果
    result.print();

    // 执行任务
    env.execute("WordCount");

  }

  // 自定义文本数据源
  public static class MySourceFunction implements SourceFunction<String> {

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
      // 从文件或集合读取文本 
      // ...
      ctx.collect("hello world"); 
    }

    @Override
    public void cancel() {

    }
  }

}

该示例从文件读取文本行,进行词频统计,并以对象流的方式输出结果。希望能给您一个完整代码实例的参考!

Flink与Yarn集成

Flink可以利用Yarn资源管理器来管理和调度Flink作业的执行。主要有以下步骤:

1. 安装和配置Yarn

安装Hadoop并配置Yarn资源管理器。

2. 配置Flink支持Yarn

修改flink-conf.yaml配置文件,添加如下配置:

代码语言:javascript
复制
yarn.distributed.enabled: true

3. 打包Flink项目为Yarn应用

代码语言:javascript
复制
mvn package -Pyarn 

4. 提交Flink作业到Yarn

代码语言:javascript
复制
./bin/flink run -m yarn-cluster -yn 1 -ys 1 /path/to/job.jar

-m 参数指定使用Yarn作为资源管理器,-yn -ys 分配给任务的Container数量。

5. Yarn WebUI监控作业

可以在Yarn ResourceManager WebUI中查看和监控Flink作业状态。

6. 停止和重启作业

使用Flink Cli同样可以停止和重启在Yarn上运行的作业。

与此同时,Yarn也能根据负载自动扩缩容Flink作业上的Container数量。这样实现了Flink与Yarn的良好集成。

通过上述步骤就可以利用Yarn的资源管理能力来管理Flink分布式作业的执行了。

Flink通过时间窗口操作sql

Flink通过Table API和SQL来支持时间窗口的操作。

下面通过一个例子来说明:

1. 定义数据源

导入Flink的TableEnvironment:

代码语言:javascript
复制
TableEnvironment tableEnv = TableEnvironment.create(env);

从Kafka读取数据注册成Table:

代码语言:javascript
复制
tableEnv.connect(new FlinkKafkaConsumer<>(...)
  .property(...));

2. 定义表结构

使用DDL定义Table结构:

代码语言:javascript
复制
CREATE TABLE inputTable (
  id STRING, 
  timestamp TIMESTAMP,
  ...)
WITH (...); 

3. 定义窗口

使用TUMBLE或HOP动态时间窗口

代码语言:javascript
复制
SELECT 
  id, 
  COUNT(*) 
FROM 
  inputTable
GROUP BY 
  TUMBLE(timestamp, INTERVAL '5' MINUTE)

4. 窗口转换

支持窗口函数如SUM、COUNT、MAX等聚合计算:

代码语言:javascript
复制
SELECT 
  SUM(amount) 
FROM 
  inputTable
GROUP BY 
  HOP(timestamp, INTERVAL '1' HOUR, INTERVAL '30' MINUTE)

5. 输出结果

将结果输出到Kafka或打印:

代码语言:javascript
复制
tableEnv.toRetractStream[Row]...

通过Table API和SQL的时间窗口支持,可以更高效地操作和处理时间序列数据流。开发者可以使用熟悉的SQL语法进行流处理。

6. sql任务代码示例

这里提供一个完整的使用SQL实现单词计数的示例:

代码语言:javascript
复制
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnv = TableEnvironment.create(env); 

// 从Kafka读取文本行数据
tableEnv.connect(new FlinkKafkaConsumer<>(...)
  .topic("kafka_topic"))
  .withFormat(new SimpleStringSchema())
  .createTemporaryTable("lines");

// 分词表
tableEnv.executeSql(
  "CREATE TABLE words WITH ('connector' = 'upsert', 'url' = '...") AS " +
  "SELECT " +
  "  ROW_NUMBER() OVER() AS id, " +
  "  word " + 
  "FROM lines, LATERAL(FLATTEN(SPLIT(lines, ' ')))";

// 窗口聚合表   
tableEnv.executeSql(
  "CREATE TABLE word_counts WITH ('connector' = 'upsert', 'url' = '...'") AS " +
  "SELECT " +
  "  word, " +
  "  COUNT(*) AS count " +
  "FROM words " +
  "GROUP BY TUMBLE(rowtime, INTERVAL '5' SECOND), word");

// 输出结果
tableEnv.executeSql("INSERT INTO sink SELECT * FROM word_counts");

// 执行程序
env.execute();

这个完整示例包含数据输入、分词、窗口聚合和结果输出的全流程SQL定义。希望对您理解SQL实现流处理过程有帮助。

时间窗口说明

1. 滚动窗口

  • 滚动窗口分为定长窗口(TUMBLE)和滑动窗口(HOP)两种。
  • 定长窗口将事件锁定到连续的固定大小时间窗口中,窗口不重合。
  • 滑动窗口以固定时间间隔滑动,窗口重合部分可重复计算。

2. 窗口分配

  • 每条事件根据时间戳分配到对应的窗口份组中。
  • 窗口分配采用窗函数TIMESTAMP_WINDOW(timeField,窗口大小)实现。

3. 窗口聚合

  • 事件分配完毕后,对每个窗口执行聚合操作(如COUNT、SUM等)。
  • 窗口会将中间结果保存在状态后端(如RocksDB)。

4. 窗口结果输出

  • 窗口被关闭时(到期),将最终结果输出。
  • 也可以提前输出或定期输出中间结果。

5. 状态管理

  • 窗口状态会进行快照保存,实现断点续传重启能力。
  • 状态由KeyedStateBackend管理,比如RocksDB。

所以Flink时间窗口的原理就是:根据时间戳分配事件到窗口,窗口聚合操作更新状态,窗口关闭时输出结果。它独立于算子,为流处理引入了时间的概念。

6. 同批次时间窗口处理逻辑

如果一次从Kafka拉取的数据中,有一半的数据在当前时间窗口内,一半在窗口外,Flink会进行如下处理:

  • 先根据事件时间戳,将数据分配到对应的时间窗口分区组(keyed state)中。
  • 对每个时间窗口分区组单独处理:
    • 时间窗口内的数据按正常流程进行聚合计算。
    • 时间窗口外的数据不会参与当前窗口的聚合,但是会加入该key的back pressure。
  • 窗口结果输出时:
    • 只输出当前窗口已经关闭的分区组的结果。其他分区组处于开启状态,不会输出。
  • 周期性检查窗口状态:
    • 关闭那些超出时间范围的过期窗口。
    • 对还未到期的窗口继续累积状态,待到期后输出结果。

所以Flink可以正确区分时间窗口内外的数据:

  • 窗口内数据参与当前窗口计算
  • 窗口外数据加入back pressure,未来窗口处理
  • 只输出实际到期窗口的结果

这样保证了时间正确性,不会导致窗口结果计算错误

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • Flink环境搭建
    • 1. 环境准备
      • 2.单机模式运行Flink
        • 3. 分布式集群模式
          • 4. 编写WordCount程序
            • 5. 运行和结果
              • 6.代码示例
              • Flink与Yarn集成
                • 1. 安装和配置Yarn
                  • 2. 配置Flink支持Yarn
                    • 3. 打包Flink项目为Yarn应用
                      • 4. 提交Flink作业到Yarn
                        • 5. Yarn WebUI监控作业
                          • 6. 停止和重启作业
                          • Flink通过时间窗口操作sql
                            • 1. 定义数据源
                              • 2. 定义表结构
                                • 3. 定义窗口
                                  • 4. 窗口转换
                                    • 5. 输出结果
                                      • 6. sql任务代码示例
                                      • 时间窗口说明
                                        • 1. 滚动窗口
                                          • 2. 窗口分配
                                            • 3. 窗口聚合
                                              • 4. 窗口结果输出
                                                • 5. 状态管理
                                                  • 6. 同批次时间窗口处理逻辑
                                                  领券
                                                  问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档