专栏首页Java探索之路Flink——运行在数据流上的有状态计算框架和处理引擎

Flink——运行在数据流上的有状态计算框架和处理引擎

第一章 是什么

Apache Flink® - Stateful Computations over Data Streams

Apache Flink是一个框架和分布式处理引擎,用于对无限制和有限制的数据流进行有状态的计算。 Flink被设计为可以在所有常见的集群环境中运行,以内存速度和任何规模执行计算。 Flink 官网网址:https://flink.apache.org/

一 Flink架构相关概念

架构图

处理无界和有界数据

任何类型的数据都是作为事件流产生的。信用卡交易,传感器测量,机器日志或网站或移动应用程序上的用户交互,所有这些数据都作为流生成。

数据可以作为无界流或有界流处理。

  • 无界流有一个起点,但没有定义的终点。它们不会终止并在生成数据时提供数据。无限制的流必须被连续处理,即,事件被摄取后必须立即处理。无法等待所有输入数据到达,因为输入是无界的,并且在任何时间都不会完成。处理无限制的数据通常要求以特定顺序(例如事件发生的顺序)提取事件,以便能够推断出结果的完整性。
  • 有界流具有定义的开始和结束。可以通过在执行任何计算之前提取所有数据来处理有界流。由于有界数据集始终可以排序,因此不需要有序摄取即可处理有界流。绑定流的处理也称为批处理。

Apache Flink擅长处理无边界和有边界的数据集。对时间和状态的精确控制使Flink的运行时能够在无限制的流上运行任何类型的应用程序。有界流由专门为固定大小的数据集设计的算法和数据结构在内部进行处理,从而产生出色的性能。

在任何地方部署应用程序

  • Apache Flink是一个分布式系统,需要计算资源才能执行应用程序。Flink与所有常见的群集资源管理器(如Hadoop YARN,Apache Mesos和Kubernetes)集成,但也可以设置为作为独立群集运行。
  • Flink旨在与前面列出的每个资源管理器配合使用。这是通过特定于资源管理器的部署模式实现的,该模式允许Flink以其惯用方式与每个资源管理器进行交互。
  • 部署Flink应用程序时,Flink会根据应用程序配置的并行性自动识别所需的资源,并向资源管理器请求它们。如果发生故障,Flink会通过请求新资源来替换发生故障的容器。提交或控制应用程序的所有通信均通过REST调用进行。这简化了Flink在许多环境中的集成。

任意规模运行应用程序

Flink旨在运行任何规模的有状态流应用程序。将应用程序并行化为可能在群集中分布并同时执行的数千个任务。因此,应用程序几乎可以利用无限数量的CPU,主内存,磁盘和网络IO。而且,Flink易于维护非常大的应用程序状态。它的异步和增量检查点算法可确保对处理延迟的影响最小,同时可保证一次状态一致性。

用户报告了其生产环境中运行的Flink应用程序的可伸缩性数字,例如

  • 每天处理数万亿事件的应用程序,
  • 维护多个TB状态的应用程序,以及
  • 运行在数千个内核上的应用程序。

利用内存性能

有状态Flink应用程序针对本地状态访问进行了优化。任务状态始终保持在内存中,或者,如果状态大小超出可用内存,则始终保持在访问有效的磁盘数据结构中。因此,任务通过访问通常处于内存中的状态来执行所有计算,从而产生非常低的处理延迟。Flink通过定期将本地状态异步指向持久性存储,从而确保在故障情况下一次准确的状态一致性。

二 应用领域

  • 流应用程序的构建块 流处理框架可以构建和执行的应用程序的类型由框架控制流,状态和时间的能力定义。
  • 流 显然,流是流处理的基本方面。但是,流可能具有不同的特性,这些特性会影响流的处理方式。 Flink是一个通用的处理框架,可以处理任何类型的流。
  • 状态 每个非平凡的流应用程序都是有状态的,即,仅对单个事件应用转换的应用程序不需要状态。 任何运行基本业务逻辑的应用程序都需要记住事件或中间结果,以便在以后的某个时间点访问它们. 例如,在收到下一个事件时或在特定的持续时间之后。 应用程序状态是Flink中的一等公民。通过查看Flink在状态处理上下文中提供的所有功能
  • 时间 时间是流应用程序的另一个重要组成部分。大多数事件流具有固有的时间语义,因为每个事件都是在特定的时间点产生。 此外,许多常见的流计算都是基于时间的,例如窗口聚合,会话化,模式检测和基于时间的联接。 流处理的一个重要方面是应用程序如何测量时间,即事件时间与处理时间之差。
  • 分层API Flink提供了三层API。每个API在简洁性和表达性之间提供了不同的权衡,并且针对不同的用例。
  • ProcessFunctions ProcessFunctions是Flink提供的最具表现力的功能接口。Flink提供了ProcessFunctions来处理来自一个或两个输入流或分组在一个窗口中的事件的单个事件。ProcessFunctions提供对时间和状态的细粒度控制。ProcessFunction可以任意修改其状态并注册计时器,这些计时器将来会触发回调函数。因此,ProcessFunctions可以根据许多有状态事件驱动的应用程序的需要实现复杂的每事件业务逻辑。

三 运作方式

Apache Flink是用于无限制和有限制的数据流上的有状态计算的框架。由于许多流应用程序的设计目的是在最少的停机时间内连续运行,因此流处理器必须提供出色的故障恢复能力,以及在运行时监视和维护应用程序的工具。 Apache Flink将重点放在流处理的操作方面。在这里,我们将说明Flink的故障恢复机制,并介绍其功能来管理和监督正在运行的应用程序

不间断运行应用程序24/7

机器和过程故障在分布式系统中无处不在。像Flink这样的分布式流处理器必须从故障中恢复,才能运行24/7的流应用程序。 显然,这不仅意味着失败后重新启动应用程序,而且还确保其内部状态保持一致,使应用程序可以像从未发生过失败那样继续进行处理。

Flink提供了一些功能来确保应用程序保持运行并保持一致:

  • 一致的检查点:Flink的恢复机制基于应用程序状态的一致的检查点。如果发生故障,将重新启动应用程序,并从最新的检查点加载其状态。与可重置的流源结合使用时,此功能可以保证一次状态一致性。
  • 高效的检查点:如果应用程序的状态保持TB级,则对应用程序的状态进行检查会非常昂贵。Flink可以执行异步和增量检查点,以使检查点对应用程序延迟SLA的影响很小。
  • 端到端精确一次:Flink具有特定存储系统的事务接收器,即使在发生故障的情况下,也可以保证数据仅被精确地写入一次。
  • 与集群管理器集成:Flink与集群管理器紧密集成,例如Hadoop YARN,Mesos或Kubernetes。当流程失败时,新流程将自动开始以接管其工作。
  • 高可用性设置:Flink具有高可用性模式,可消除所有单点故障。HA模式基于Apache ZooKeeper,这是一项经过实践检验的服务,可实现可靠的分布式协调。

更新,迁移,暂停和恢复应用程序

需要维护支持关键业务服务的流应用程序。需要修复错误,并需要改进或实现新功能。但是,更新有状态流应用程序并非易事。通常,一个人无法简单地停止应用程序并重新启动一个固定或改进的版本,因为一个人无法承受失去应用程序状态的负担。

Flink的保存点是一项独特而强大的功能,可以解决更新有状态应用程序的问题以及许多其他相关挑战。保存点是应用程序状态的一致快照,因此与检查点非常相似。但是,与检查点相比,保存点需要手动触发,并且在停止应用程序时不会自动将其删除。保存点可用于启动状态兼容的应用程序并初始化其状态。保存点启用以下功能:

  • 应用程序演化:保存点可用于演化应用程序。可以从先前版本的应用程序中获取的保存点重新启动应用程序的固定版本或改进版本。也可以从较早的时间点启动应用程序(如果存在这样的保存点),以修复有缺陷的版本产生的错误结果。
  • 集群迁移:使用保存点,可以将应用程序迁移(或克隆)到不同的集群。 Flink版本更新:可以使用保存点迁移应用程序以在新的Flink版本上运行。
  • 应用程序缩放:保存点可用于增加或减少应用程序的并行性。 A / B测试和假设方案:可以通过从同一保存点启动所有版本来比较应用程序的两个(或多个)不同版本的性能或质量。
  • 暂停和恢复:可以通过保存一个点并停止它来暂停应用程序。在以后的任何时间点,都可以从保存点恢复应用程序。 归档:可以将保存点归档,以便将应用程序的状态重置为较早的时间点。

监视和控制应用程序

就像其他任何服务一样,需要监视连续运行的流应用程序并将其集成到组织的操作基础架构(即监视和日志记录服务)中。 监视有助于预测问题并提前做出反应。通过日志记录可以进行根本原因分析以调查故障。易于访问的界面是控制运行中的应用程序的重要功能。

Flink与许多常用的日志记录和监视服务很好地集成在一起,并提供REST API来控制应用程序和查询信息。

  • Web UI:Flink具有Web UI,可检查,监视和调试正在运行的应用程序。它也可以用于提交执行以执行或取消执行。
  • 日志记录:Flink实现了流行的slf4j日志记录接口,并与日志记录框架log4j或logback集成。
  • 指标:Flink具有完善的指标系统,可收集和报告系统和用户定义的指标。指标可以导出到多个报告器,包括JMX,Ganglia,Graphite,Prometheus,StatsD,Datadog和Slf4j。
  • REST API:Flink公开REST API来提交新应用程序,获取正在运行的应用程序的保存点或取消应用程序。REST API还公开了正在运行或已完成的应用程序的元数据和收集的指标。
  • Strom:纯实时处理数据,吞吐量小 --水龙头滴水
  • SparkStreaming : 准实时处理数据,微批处理数据,吞吐量大 --河道中开闸关闸
  • Flink:纯实时处理数据,吞吐量大 --河流远远不断
  • MR & Flink & Storm & SparkStreaming 对比

Flink使用java语言开发,提供了scala编程的接口。 使用java或者scala开发Flink是需要使用jdk8版本。 如果使用Maven,maven版本需要使用3.0.4及以上。


第二章 编程模型

一 第一个Flink程序-WordCount

步骤

  1. 在IDEA中创建Maven项目
  1. 导入pom.xml:
 <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <flink.version>1.7.1</flink.version>
    </properties>

    <dependencies>
        <!-- Flink 依赖 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-wikiedits_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Flink Kafka连接器的依赖 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>

        <!-- Flink Scala2.11 版本 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.7.1</version>
        </dependency>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.12</version>
        </dependency>

        <!-- log4j 和slf4j 包,如果在控制台不想看到日志,可以将下面的包注释掉-->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-nop</artifactId>
            <version>1.7.25</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.5</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- 在maven项目中既有java又有scala代码时配置 maven-scala-plugin 插件打包时可以将两类代码一起打包 -->
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.4</version>
                <configuration>
                    <!-- 设置false后是去掉 MySpark-1.0-SNAPSHOT-jar-with-dependencies.jar 后的 “-jar-with-dependencies” -->
                    <!--<appendAssemblyId>false</appendAssemblyId>-->
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>com.lw.myflink.Streaming.FlinkSocketWordCount</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>assembly</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
 
  1. 创建被统计的文本
  1. 程序代码
package ah.szxy.flink;

import akka.stream.impl.fusing.GroupBy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.*;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * 创建Flink程序,统计单词数目
 *  1.创建环境
 *      批处理: ExecutionEnvironment env=ExecutionEnvironment.getExecutionEnvironment();
 *      流处理: StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
 *  2.在批处理中Flink处理的数据对象是DataSet
 *    在流处理中Flink处理的数据对象是DataStream
 *  3.代码流程必须符合    source ->transformation->sink
 *     transformation 都是懒执行, 需要最后使用env.execute()触发执行或者使用print(),count(),collect()触发执行
 *  4.Flink编程不是K.V格式的编程, 通过某些方式来虚拟key
 *  5.Flink中的tuple最多支持25个元素, 每个元素都是从0开始
 *
 * @author TimePause
 * @create 2019-12-08 21:04
 */
public class WordCont {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env=ExecutionEnvironment.getExecutionEnvironment();
        DataSource<String> dataSource=env.readTextFile("./data/word.txt");
        FlatMapOperator<String,String> words=dataSource.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String w, Collector<String> collector) throws Exception {
                String[] split = w.split(" ");
                for (String word : split) {
                    collector.collect(word);
                }
            }
        });
        MapOperator<String,Tuple2<String,Integer>> reduceWords =words.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String w) throws Exception {
                return new Tuple2<String, Integer>(w,1);
            }
        });

        UnsortedGrouping<Tuple2<String, Integer>> grouping = reduceWords.groupBy(0);
        //输出到控制台
       AggregateOperator<Tuple2<String, Integer>> result = grouping.sum(1);
       result.print();
        //输出到文件中
       // DataSet<Tuple2<String, Integer>> dataSet = grouping.sum(1);
       // DataSink<Tuple2<String, Integer>> dataSink = dataSet.writeAsText("./data/reslut/r1");
       // env.execute("myflink");


    }
} 
  1. 执行结果

总结

Flink处理数据流程 Source -> Transformations ->Sink 数据源头 -> 数据转换 -> 数据输出

Flink程序的执行过程:

  1. 获取flink的执行环境(execution environment)
  2. 加载数据-- soure
  3. 对加载的数据进行转换 – transformation
  4. 对结果进行保存或者打印 --sink
  5. 触发flink程序的执行(execute(),count(),collect(),print()),例如:调用ExecutionEnvironment或者StreamExecutionEnvironment的execute()方法。

Flink中数据类型

  • 有界数据流
  • 无界数据流

Flink三种处理数据模型

  • Flink批处理 Flink批处理中处理的是有界数据流 --Dataset
  • Flink流式处理 Flink流式处理中有界数据流也有无界数据流 --DataStream
  • FlinkSQL处理 有界数据流也有无界数据流

二 分区设置和排序

  • 设置全局分区 env.setParallelism(1);
  • 为某个算子设置分区 DataSet<Tuple2<String, Integer>> dataSet = grouping.sum(1).setParallelism(1);
  • 分区排序演示代码
public class WordCont {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //设置全局分区
        // env.setParallelism(1);
        DataSource<String> dataSource = env.readTextFile("./data/word.txt");
        FlatMapOperator<String, String> words = dataSource.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String w, Collector<String> collector) throws Exception {
                String[] split = w.split(" ");
                for (String word : split) {
                    collector.collect(word);
                }
            }
        });
        MapOperator<String, Tuple2<String, Integer>> reduceWords = words.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String w) throws Exception {
                return new Tuple2<String, Integer>(w, 1);
            }
        });

        UnsortedGrouping<Tuple2<String, Integer>> grouping = reduceWords.groupBy(0);
        //输出到控制台
//       AggregateOperator<Tuple2<String, Integer>> result = grouping.sum(1);
        DataSet<Tuple2<String, Integer>> dataSet = grouping.sum(1).setParallelism(1);  //为单个的算子设置分区
        SortPartitionOperator<Tuple2<String, Integer>> result = dataSet.sortPartition(1, Order.DESCENDING);
        result.print();
    }
}

三 设置 source和 sink

  • 设置数据源头 DataSource<String> dataSource = env.readTextFile("./data/word.txt");
  • 设置数据输出:
   DataSet<Tuple2<String, Integer>> dataSet = grouping.sum(1).setParallelism(1);  //为单个的算子设置分区
   SortPartitionOperator<Tuple2<String, Integer>> result = dataSet.sortPartition(1, Order.DESCENDING);
   //csv文件, 生成的文件以指定分隔符分隔,默认为逗号
   result.writeAsCsv("./data/result/r2", "\n", "&", FileSystem.WriteMode.OVERWRITE);

四 Flink网址分析案例

实现过滤以 https:// 开头的网址

/**
 * 统计网站链接情况
 *
 * @author TimePause
 * @create 2019-12-09 10:11
 */
public class Flink360site {
    public static void main(String[] args) throws Exception {
        //创建批处理环境
        ExecutionEnvironment env=ExecutionEnvironment.getExecutionEnvironment();
        //设置分区
        env.setParallelism(1);
        //指定数据源
        DataSource<String> dataSource = env.readTextFile("./data/360index");

        //编写过滤规则
        FilterOperator<String> filter= dataSource.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String s) throws Exception {
                return s.startsWith("https://");
            }
        });

        //对符合过滤的数据计数
        long count = filter.count();

        //指定文件要写入的目的地
        DataSink<String> stringDataSink = filter.writeAsText("./result/data/r2", FileSystem.WriteMode.OVERWRITE);
        env.execute("统计网站链接");
    }
}

结果展示

五 计数器

/**
 * 利用计数器进行网站链接计数
 *
 * @author TimePause
 * @create 2019-12-09 10:53
 */
public class FlinkAccumulator {
    public static void main(String[] args) throws Exception {
        //创建批处理环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //指定数据源
        DataSource<String> dataSource = env.readTextFile("./data/360index");

        //创建过滤器
        FilterOperator<String> filter = dataSource.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String s) throws Exception {
                return s.startsWith("https://");
            }
        });

        // 创建map操作器
        MapOperator<String, String> map = filter.map(new RichMapFunction<String, String>() {
            //创建计数器
            private IntCounter intCount= new IntCounter();

            //创建算子
            @Override
            public void open(Configuration parameters) throws Exception {
                getRuntimeContext().addAccumulator("myacc",intCount);
            }

            @Override
            public String map(String s) throws Exception {
                intCount.add(1);
                return s;
            }
        });

        // 指定文件输出的目的地
        DataSink<String> dataSink = map.writeAsText("./data/ressult/accumulator/r1");
        // 创建任务执行结果对象
        JobExecutionResult mycounter = env.execute("mycounter");
        // 创建计数器的计数结果
        Integer myacc = mycounter.getAccumulatorResult("myacc");
        System.out.println("myCounter value= " + myacc);
        System.out.println("-----------------------------");
    }
}

六 Flink术语

Flink 使用 java 语言开发,提供了 scala 编程的接口。使用 java 或者 scala 开发 Flink 是需 要使用 jdk8 版本,如果使用 Maven,maven 版本需要使用 3.0.4 及以上。

DataFlows

parallel Dataflows

并行数据流

Task和算子链

JobManager、TaskManager和clients:

Flink 运行时包含两种类型的进程:

  1. JobManger:也叫作 masters,协调分布式执行,调度 task,协调 checkpoint,协调故障恢复。在 Flink 程序中至少有一个 JobManager,高可用可以设置多个 JobManager,其中 一个是 Leader,其他都是 standby 状态。
  2. TaskManager:也叫 workers,执行 dataflow 生成的 task,负责缓冲数据,及 TaskManager 之间的交换数据。Flink 程序中必须有一个 TaskManager. Flink 程序可以运行在 standalone 集群,Yarn 或者 Mesos 资源调度框架中。 clients不是Flink程序运行时的一部分,作用是向JobManager准备和发送dataflow,之后, 客户端可以断开连接或者保持连接。

TaskSlots 任务槽

  • TaskSlots 任务槽: 每个Worker(TaskManager)是一个JVM进程,可以执行一个或者多个task,这些task可以运行在任务槽上,每个worker上至少有一个任务槽。每个任务槽都有固定的资源,例如:TaskManager有三个TaskSlots,那么每个TaskSlot会将TaskMananger中的内存均分,即每个任务槽的内存是总内存的1/3。任务槽的作用就是分离任务的托管内存,不会发生cpu隔离。 通过调整任务槽的数据量,用户可以指定每个TaskManager有多少任务槽,更多的任务槽意味着更多的task可以共享同一个JVM,同一个JVM中的task共享TCP连接和心跳信息,共享数据集和数据结构,从而减少TaskManager中的task开销。
  • 总结:task slot的个数代表TaskManager可以并行执行的task数。

第三章 安装

一 集群搭建步骤

  1. 进入https://flink.apache.org/downloads.html 下载flink. 下载好Flink之后( 资料分享至底部 )上传到Master(node1)节点上解压。
  2. 配置…/conf/flink-conf.yaml jobmanager.rpc.address: node1指定主节点 配置…/conf/slaves (指定从节点) node2 node3
  3. 将配置好的Flink发送到其他worker节点(node2,node3)上。 启动Flink集群 start-cluster.sh,访问webui。node1:8081

二 Flink读取Socket数据

  1. 编写java代码
package ah.szxy.flink4;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * 读取Socket数据(流式数据)
 *
 * @author TimePause
 * @create 2019-12-09 14:56
 */
public class FlinkReadSocketData {
    public static void main(String[] args) throws Exception {
        //创建流式处理环境
        StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
        // 参数管理工具类,帮助我们管理集群相关参数
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String node = "";
        Integer port = 0;
        if (parameterTool.has("node") && parameterTool.has("port")) {
            node = parameterTool.get("node");
            port = Integer.valueOf(parameterTool.get("port"));
        } else {
            System.out.println("集群提交需要参数");
            System.exit(1);
        }
        // 创建数据来源
        DataStreamSource<String> dataStreamSource = env.socketTextStream(node, port);
        // 切分单词
        SingleOutputStreamOperator<String> flatMap = dataStreamSource.flatMap(new FlatMapFunction<String, String>() {

            @Override
            public void flatMap(String s, Collector<String> collector) throws Exception {
                String[] split = s.split(" ");
                for (String s1 : split) {
                    collector.collect(s1);
                }
            }
        });

        // 规定输出格式
        SingleOutputStreamOperator<Tuple3<String,String,Integer>> map =  flatMap.map(new MapFunction<String, Tuple3<String, String, Integer>>() {
            @Override
            public Tuple3<String, String, Integer> map(String s) throws Exception {
                return new Tuple3<>(s, s, 1);
            }
        });
        // 统计单词
        KeyedStream<Tuple3<String,String,Integer>,Tuple> objectTupleKeyedStream = map.keyBy(0, 1);
        SingleOutputStreamOperator<Tuple3<String, String, Integer>> sum = objectTupleKeyedStream.sum(2);
        sum.print();

        env.execute("MySocketProject");
    }
}
 
  1. 在运行时, 编辑运行的参数
  1. 参数指定接收node4的 9999端口发来信息,图1, 然后运行程序 注意: 需要在node4上面下载网络工具netcat 下载工具 yum install -y nc 启动阻塞式窗口, 可以在里面输入相关字符数据( 图2 ) nc -lk 9999 图1

图2

图3

上传Flink程序到集群中运行

将上个实例项目打包, 放到集群中运行

  1. 打包程序 先clean一下,目的是清除原有的jar(图1) 然后package进行打包, 然后将含有相关jar包的jar上传到虚拟机(jar)

图2

  1. 集群的运行jar 需要jdk 1.8.0_191或者更高,本人的是 1.8.0_11版本,因此在上传jar到Flink集群会报错 flink job 运行时报错 AskTimeoutException, 解决方案是升级JDK # 删除原来的版本, 通过下面的命令找到对应的jdk存放目录, 删除 which java which javac # 上传 高版本jdk,解压, 重新配置环境变量以后重新加载一下 profile文件即可
  2. 上传任务到flink集群
# flink run -c 全限定类名 jar所在目录 --node 需要监听的节点 --port 需要监听的端口
flink run -c ah.szxy.flink4.FlinkReadSocketData ~/chy/software/MyFlinkCode-1.0-SNAPSHOT-jar-with-dependencies.jar --node node4 --port 9999
  1. node4通过netcat发送消息数据, 然后通过flink的集群查看(图1,图2) 图1

图2

三 Flink窗口操作

前提: 需要在node4中开启netcat, 运行程序后,在五秒内输入随机数据, 查看控制台打印结果

nc -lk 9999

相关代码

/**
 * Flink窗口操作
 *
 * @author TimePause
 * @create 2019-12-09 19:44
 */
public class FlinkWindowOperator {
    public static void main(String[] args) throws Exception {
        // 创建流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 创建数据源
        DataStreamSource<String>dataStreamSource=env.socketTextStream("node4",9999);
        // 切分单词
        SingleOutputStreamOperator<String> flatmap=dataStreamSource.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String s, Collector<String> collector) throws Exception {
                String[] split = s.split(" ");
                for (String s1 : split) {
                    collector.collect(s1);
                }
            }
        });
        // 规定输出格式
        SingleOutputStreamOperator<Tuple2<String,Integer>> map=flatmap.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                return new Tuple2<>(s,1);
            }
        });
        // 统计并输出结果
        KeyedStream<Tuple2<String, Integer>, Tuple> keyby = map.keyBy(0);
        // 一个参数: 每隔n个时间单位计算数目, 两个参数, 每隔后一个时间单位计算前一个时间单位的数据
        WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> timeWindow = keyby.timeWindow(Time.seconds(15), Time.seconds(5));
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = timeWindow.sum(1);
        sum.print();
        env.execute("FlinkWindowsOperator");

    }
}

结果

四 整合Kafka

启动kafka集群后 通过Flink代码自动生成topic-ReadKafkaTopic,我们将这个topic作为生产者

kafka-console-producer.sh --broker-list node2:9092,node3:9092,node4:9092 --topic ReadKafkaTopic

然后监听另一个topic-ResultKafkaTopic(Flink代码作用是将ReadKafkaTopic中数据传到ResultKafkaTopic )

kafka-console-consumer.sh --zookeeper node2:2181,node3:2181,node4:2181 --from-beginning --topic ResultKafkaTopic

Flink整合Kafka代码

package ah.szxy.flink6.kafka;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.util.Collector;

import java.util.Properties;

/**
 * 使用Flink读取Kafka中的数据
 *
 * @author TimePause
 * @create 2019-12-09 20:27
 */
public class FlinkReadKafka {
    public static void main(String[] args) throws Exception {
        //创建流处理环境
        StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "node2:9092,node3:9092,node4:9092");
        properties.setProperty("group.id", "myflink.group");

        //在kafka中创建topic
        FlinkKafkaConsumer011<String> readKafkaTopic = new FlinkKafkaConsumer011<String>("ReadKafkaTopic", new SimpleStringSchema(), properties);
        //创建数据源
        DataStreamSource<String> dataStreamSource = env.addSource(readKafkaTopic);
        //分词
        SingleOutputStreamOperator<Tuple2<String, Integer>> flatMap = dataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] split = s.split(" ");
                for (String s1 : split) {
                    collector.collect(new Tuple2<>(s1,1));
                }

            }
        });
        //分组, 通过0号位置单词进行分组
        KeyedStream<Tuple2<String, Integer>, Tuple> keyBy = flatMap.keyBy(0);
        //统计
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = keyBy.sum(1);
        //
        FlinkKafkaProducer011<String> resultKafkaTopic = new FlinkKafkaProducer011<String>("ResultKafkaTopic", new SimpleStringSchema(), properties);

        sum.map(new MapFunction<Tuple2<String, Integer>, String>() {
            @Override
            public String map(Tuple2<String, Integer> value) throws Exception {
                return value.f0 + "#" + value.f1;
            }
        }).addSink(resultKafkaTopic);

        env.execute("readKafka");
    }
}

链接:https://pan.baidu.com/s/1BXFZV7xiD4Pj9mHvB56MkQ 提取码:hqwe

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Spark——底层操作RDD,基于内存处理数据的计算引擎

    Apache Spark是一个快速的通用集群计算框架 / 殷勤。它提供Java,Scala,Python和R中的高级API,以及支持常规执行图的优化引擎。它还支...

    时间静止不是简史
  • feign.FeignException$MethodNotAllowed: status 405 reading xxx#yyy(Integer)

    使用feign 调用异常 feign.FeignException$MethodNotAllowed: status 405 reading Consumer...

    时间静止不是简史
  • [数据结构与算法] 树结构之二叉排序树、平衡二叉树、多路查找树

    二叉排序树:BST: (Binary Sort(Search) Tree), 对于二叉排序树的任何一个非叶子节点,要求左子节点的值比当前节点的值小,右子节...

    时间静止不是简史
  • 关于网络钓鱼的深入讨论

    网络钓鱼相信大家都不会太陌生。近年来,随着人们网络安全意识的提升,网络钓鱼的手法也变得越来越高明。攻击者的社工经验愈加丰富,钓鱼技术也愈加的先进和新颖。作为企业...

    FB客服
  • Spring Cloud Zuul记录接口响应数据

    系统在生产环境出现问题时,排查问题最好的方式就是查看日志了,日志的记录尽量详细,这样你才能快速定位问题。

    猿天地
  • python读取json文件转成exce

    python处理excel有xlwt,openpyxl等,而xlwt只支持excel2003,也就是最多有256列,而openpyxl则支持excel2007以...

    py3study
  • (译)Buildpacks 进入 CNCF 沙箱

    今天 Pivotal、Salesforce Heroku 和 CNCF 联合宣布,云原生 Buildpacks 技术被接纳为 CNCF 沙箱项目。

    崔秀龙
  • Human Interface Guidelines — Sliders

    霖酱
  • 中文点选验证码之自动识别

    某次测试中遇到了汉字点选的验证码,看着很简单,尝试了一下发现有两种简单的识别方法,终于有空给重新整理一下,分享出来。

    FB客服
  • 第三十九章:基于SpringBoot & Quartz完成定时任务分布式单节点持久化

    恒宇少年

扫码关注云+社区

领取腾讯云代金券