Apache Flink® - Stateful Computations over Data Streams
Apache Flink是一个框架和分布式处理引擎,用于对无限制和有限制的数据流进行有状态的计算。 Flink被设计为可以在所有常见的集群环境中运行,以内存速度和任何规模执行计算。 Flink 官网网址:https://flink.apache.org/
任何类型的数据都是作为事件流产生的。信用卡交易,传感器测量,机器日志或网站或移动应用程序上的用户交互,所有这些数据都作为流生成。
数据可以作为无界流或有界流处理。
Apache Flink擅长处理无边界和有边界的数据集。对时间和状态的精确控制使Flink的运行时能够在无限制的流上运行任何类型的应用程序。有界流由专门为固定大小的数据集设计的算法和数据结构在内部进行处理,从而产生出色的性能。
Flink旨在运行任何规模的有状态流应用程序。将应用程序并行化为可能在群集中分布并同时执行的数千个任务。因此,应用程序几乎可以利用无限数量的CPU,主内存,磁盘和网络IO。而且,Flink易于维护非常大的应用程序状态。它的异步和增量检查点算法可确保对处理延迟的影响最小,同时可保证一次状态一致性。
用户报告了其生产环境中运行的Flink应用程序的可伸缩性数字,例如
有状态Flink应用程序针对本地状态访问进行了优化。任务状态始终保持在内存中,或者,如果状态大小超出可用内存,则始终保持在访问有效的磁盘数据结构中。因此,任务通过访问通常处于内存中的状态来执行所有计算,从而产生非常低的处理延迟。Flink通过定期将本地状态异步指向持久性存储,从而确保在故障情况下一次准确的状态一致性。
Apache Flink是用于无限制和有限制的数据流上的有状态计算的框架。由于许多流应用程序的设计目的是在最少的停机时间内连续运行,因此流处理器必须提供出色的故障恢复能力,以及在运行时监视和维护应用程序的工具。 Apache Flink将重点放在流处理的操作方面。在这里,我们将说明Flink的故障恢复机制,并介绍其功能来管理和监督正在运行的应用程序
机器和过程故障在分布式系统中无处不在。像Flink这样的分布式流处理器必须从故障中恢复,才能运行24/7的流应用程序。 显然,这不仅意味着失败后重新启动应用程序,而且还确保其内部状态保持一致,使应用程序可以像从未发生过失败那样继续进行处理。
Flink提供了一些功能来确保应用程序保持运行并保持一致:
需要维护支持关键业务服务的流应用程序。需要修复错误,并需要改进或实现新功能。但是,更新有状态流应用程序并非易事。通常,一个人无法简单地停止应用程序并重新启动一个固定或改进的版本,因为一个人无法承受失去应用程序状态的负担。
Flink的保存点是一项独特而强大的功能,可以解决更新有状态应用程序的问题以及许多其他相关挑战。保存点是应用程序状态的一致快照,因此与检查点非常相似。但是,与检查点相比,保存点需要手动触发,并且在停止应用程序时不会自动将其删除。保存点可用于启动状态兼容的应用程序并初始化其状态。保存点启用以下功能:
就像其他任何服务一样,需要监视连续运行的流应用程序并将其集成到组织的操作基础架构(即监视和日志记录服务)中。 监视有助于预测问题并提前做出反应。通过日志记录可以进行根本原因分析以调查故障。易于访问的界面是控制运行中的应用程序的重要功能。
Flink与许多常用的日志记录和监视服务很好地集成在一起,并提供REST API来控制应用程序和查询信息。
Flink使用java语言开发,提供了scala编程的接口。 使用java或者scala开发Flink是需要使用jdk8版本。 如果使用Maven,maven版本需要使用3.0.4及以上。
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<flink.version>1.7.1</flink.version>
</properties>
<dependencies>
<!-- Flink 依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-wikiedits_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink Kafka连接器的依赖 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<!-- Flink Scala2.11 版本 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>1.7.1</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.12</version>
</dependency>
<!-- log4j 和slf4j 包,如果在控制台不想看到日志,可以将下面的包注释掉-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-nop</artifactId>
<version>1.7.25</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.5</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- 在maven项目中既有java又有scala代码时配置 maven-scala-plugin 插件打包时可以将两类代码一起打包 -->
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.4</version>
<configuration>
<!-- 设置false后是去掉 MySpark-1.0-SNAPSHOT-jar-with-dependencies.jar 后的 “-jar-with-dependencies” -->
<!--<appendAssemblyId>false</appendAssemblyId>-->
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>com.lw.myflink.Streaming.FlinkSocketWordCount</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>assembly</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
package ah.szxy.flink;
import akka.stream.impl.fusing.GroupBy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.*;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* 创建Flink程序,统计单词数目
* 1.创建环境
* 批处理: ExecutionEnvironment env=ExecutionEnvironment.getExecutionEnvironment();
* 流处理: StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
* 2.在批处理中Flink处理的数据对象是DataSet
* 在流处理中Flink处理的数据对象是DataStream
* 3.代码流程必须符合 source ->transformation->sink
* transformation 都是懒执行, 需要最后使用env.execute()触发执行或者使用print(),count(),collect()触发执行
* 4.Flink编程不是K.V格式的编程, 通过某些方式来虚拟key
* 5.Flink中的tuple最多支持25个元素, 每个元素都是从0开始
*
* @author TimePause
* @create 2019-12-08 21:04
*/
public class WordCont {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env=ExecutionEnvironment.getExecutionEnvironment();
DataSource<String> dataSource=env.readTextFile("./data/word.txt");
FlatMapOperator<String,String> words=dataSource.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String w, Collector<String> collector) throws Exception {
String[] split = w.split(" ");
for (String word : split) {
collector.collect(word);
}
}
});
MapOperator<String,Tuple2<String,Integer>> reduceWords =words.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String w) throws Exception {
return new Tuple2<String, Integer>(w,1);
}
});
UnsortedGrouping<Tuple2<String, Integer>> grouping = reduceWords.groupBy(0);
//输出到控制台
AggregateOperator<Tuple2<String, Integer>> result = grouping.sum(1);
result.print();
//输出到文件中
// DataSet<Tuple2<String, Integer>> dataSet = grouping.sum(1);
// DataSink<Tuple2<String, Integer>> dataSink = dataSet.writeAsText("./data/reslut/r1");
// env.execute("myflink");
}
}
Flink处理数据流程 Source -> Transformations ->Sink 数据源头 -> 数据转换 -> 数据输出
Flink程序的执行过程:
Flink中数据类型
Flink三种处理数据模型
env.setParallelism(1);
DataSet<Tuple2<String, Integer>> dataSet = grouping.sum(1).setParallelism(1);
public class WordCont {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//设置全局分区
// env.setParallelism(1);
DataSource<String> dataSource = env.readTextFile("./data/word.txt");
FlatMapOperator<String, String> words = dataSource.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String w, Collector<String> collector) throws Exception {
String[] split = w.split(" ");
for (String word : split) {
collector.collect(word);
}
}
});
MapOperator<String, Tuple2<String, Integer>> reduceWords = words.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String w) throws Exception {
return new Tuple2<String, Integer>(w, 1);
}
});
UnsortedGrouping<Tuple2<String, Integer>> grouping = reduceWords.groupBy(0);
//输出到控制台
// AggregateOperator<Tuple2<String, Integer>> result = grouping.sum(1);
DataSet<Tuple2<String, Integer>> dataSet = grouping.sum(1).setParallelism(1); //为单个的算子设置分区
SortPartitionOperator<Tuple2<String, Integer>> result = dataSet.sortPartition(1, Order.DESCENDING);
result.print();
}
}
DataSource<String> dataSource = env.readTextFile("./data/word.txt");
DataSet<Tuple2<String, Integer>> dataSet = grouping.sum(1).setParallelism(1); //为单个的算子设置分区
SortPartitionOperator<Tuple2<String, Integer>> result = dataSet.sortPartition(1, Order.DESCENDING);
//csv文件, 生成的文件以指定分隔符分隔,默认为逗号
result.writeAsCsv("./data/result/r2", "\n", "&", FileSystem.WriteMode.OVERWRITE);
实现过滤以 https:// 开头的网址
/**
* 统计网站链接情况
*
* @author TimePause
* @create 2019-12-09 10:11
*/
public class Flink360site {
public static void main(String[] args) throws Exception {
//创建批处理环境
ExecutionEnvironment env=ExecutionEnvironment.getExecutionEnvironment();
//设置分区
env.setParallelism(1);
//指定数据源
DataSource<String> dataSource = env.readTextFile("./data/360index");
//编写过滤规则
FilterOperator<String> filter= dataSource.filter(new FilterFunction<String>() {
@Override
public boolean filter(String s) throws Exception {
return s.startsWith("https://");
}
});
//对符合过滤的数据计数
long count = filter.count();
//指定文件要写入的目的地
DataSink<String> stringDataSink = filter.writeAsText("./result/data/r2", FileSystem.WriteMode.OVERWRITE);
env.execute("统计网站链接");
}
}
结果展示
/**
* 利用计数器进行网站链接计数
*
* @author TimePause
* @create 2019-12-09 10:53
*/
public class FlinkAccumulator {
public static void main(String[] args) throws Exception {
//创建批处理环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//指定数据源
DataSource<String> dataSource = env.readTextFile("./data/360index");
//创建过滤器
FilterOperator<String> filter = dataSource.filter(new FilterFunction<String>() {
@Override
public boolean filter(String s) throws Exception {
return s.startsWith("https://");
}
});
// 创建map操作器
MapOperator<String, String> map = filter.map(new RichMapFunction<String, String>() {
//创建计数器
private IntCounter intCount= new IntCounter();
//创建算子
@Override
public void open(Configuration parameters) throws Exception {
getRuntimeContext().addAccumulator("myacc",intCount);
}
@Override
public String map(String s) throws Exception {
intCount.add(1);
return s;
}
});
// 指定文件输出的目的地
DataSink<String> dataSink = map.writeAsText("./data/ressult/accumulator/r1");
// 创建任务执行结果对象
JobExecutionResult mycounter = env.execute("mycounter");
// 创建计数器的计数结果
Integer myacc = mycounter.getAccumulatorResult("myacc");
System.out.println("myCounter value= " + myacc);
System.out.println("-----------------------------");
}
}
Flink 使用 java 语言开发,提供了 scala 编程的接口。使用 java 或者 scala 开发 Flink 是需 要使用 jdk8 版本,如果使用 Maven,maven 版本需要使用 3.0.4 及以上。
并行数据流
Flink 运行时包含两种类型的进程:
jobmanager.rpc.address: node1
指定主节点
配置…/conf/slaves (指定从节点)
node2 node3
start-cluster.sh
,访问webui。node1:8081
package ah.szxy.flink4;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* 读取Socket数据(流式数据)
*
* @author TimePause
* @create 2019-12-09 14:56
*/
public class FlinkReadSocketData {
public static void main(String[] args) throws Exception {
//创建流式处理环境
StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
// 参数管理工具类,帮助我们管理集群相关参数
ParameterTool parameterTool = ParameterTool.fromArgs(args);
String node = "";
Integer port = 0;
if (parameterTool.has("node") && parameterTool.has("port")) {
node = parameterTool.get("node");
port = Integer.valueOf(parameterTool.get("port"));
} else {
System.out.println("集群提交需要参数");
System.exit(1);
}
// 创建数据来源
DataStreamSource<String> dataStreamSource = env.socketTextStream(node, port);
// 切分单词
SingleOutputStreamOperator<String> flatMap = dataStreamSource.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String s, Collector<String> collector) throws Exception {
String[] split = s.split(" ");
for (String s1 : split) {
collector.collect(s1);
}
}
});
// 规定输出格式
SingleOutputStreamOperator<Tuple3<String,String,Integer>> map = flatMap.map(new MapFunction<String, Tuple3<String, String, Integer>>() {
@Override
public Tuple3<String, String, Integer> map(String s) throws Exception {
return new Tuple3<>(s, s, 1);
}
});
// 统计单词
KeyedStream<Tuple3<String,String,Integer>,Tuple> objectTupleKeyedStream = map.keyBy(0, 1);
SingleOutputStreamOperator<Tuple3<String, String, Integer>> sum = objectTupleKeyedStream.sum(2);
sum.print();
env.execute("MySocketProject");
}
}
图2
图3
将上个实例项目打包, 放到集群中运行
图2
# flink run -c 全限定类名 jar所在目录 --node 需要监听的节点 --port 需要监听的端口
flink run -c ah.szxy.flink4.FlinkReadSocketData ~/chy/software/MyFlinkCode-1.0-SNAPSHOT-jar-with-dependencies.jar --node node4 --port 9999
图2
前提: 需要在node4中开启netcat, 运行程序后,在五秒内输入随机数据, 查看控制台打印结果
nc -lk 9999
相关代码
/**
* Flink窗口操作
*
* @author TimePause
* @create 2019-12-09 19:44
*/
public class FlinkWindowOperator {
public static void main(String[] args) throws Exception {
// 创建流处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 创建数据源
DataStreamSource<String>dataStreamSource=env.socketTextStream("node4",9999);
// 切分单词
SingleOutputStreamOperator<String> flatmap=dataStreamSource.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String s, Collector<String> collector) throws Exception {
String[] split = s.split(" ");
for (String s1 : split) {
collector.collect(s1);
}
}
});
// 规定输出格式
SingleOutputStreamOperator<Tuple2<String,Integer>> map=flatmap.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String s) throws Exception {
return new Tuple2<>(s,1);
}
});
// 统计并输出结果
KeyedStream<Tuple2<String, Integer>, Tuple> keyby = map.keyBy(0);
// 一个参数: 每隔n个时间单位计算数目, 两个参数, 每隔后一个时间单位计算前一个时间单位的数据
WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> timeWindow = keyby.timeWindow(Time.seconds(15), Time.seconds(5));
SingleOutputStreamOperator<Tuple2<String, Integer>> sum = timeWindow.sum(1);
sum.print();
env.execute("FlinkWindowsOperator");
}
}
结果
启动kafka集群后 通过Flink代码自动生成topic-ReadKafkaTopic,我们将这个topic作为生产者
kafka-console-producer.sh --broker-list node2:9092,node3:9092,node4:9092 --topic ReadKafkaTopic
然后监听另一个topic-ResultKafkaTopic(Flink代码作用是将ReadKafkaTopic中数据传到ResultKafkaTopic )
kafka-console-consumer.sh --zookeeper node2:2181,node3:2181,node4:2181 --from-beginning --topic ResultKafkaTopic
Flink整合Kafka代码
package ah.szxy.flink6.kafka;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.util.Collector;
import java.util.Properties;
/**
* 使用Flink读取Kafka中的数据
*
* @author TimePause
* @create 2019-12-09 20:27
*/
public class FlinkReadKafka {
public static void main(String[] args) throws Exception {
//创建流处理环境
StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "node2:9092,node3:9092,node4:9092");
properties.setProperty("group.id", "myflink.group");
//在kafka中创建topic
FlinkKafkaConsumer011<String> readKafkaTopic = new FlinkKafkaConsumer011<String>("ReadKafkaTopic", new SimpleStringSchema(), properties);
//创建数据源
DataStreamSource<String> dataStreamSource = env.addSource(readKafkaTopic);
//分词
SingleOutputStreamOperator<Tuple2<String, Integer>> flatMap = dataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
String[] split = s.split(" ");
for (String s1 : split) {
collector.collect(new Tuple2<>(s1,1));
}
}
});
//分组, 通过0号位置单词进行分组
KeyedStream<Tuple2<String, Integer>, Tuple> keyBy = flatMap.keyBy(0);
//统计
SingleOutputStreamOperator<Tuple2<String, Integer>> sum = keyBy.sum(1);
//
FlinkKafkaProducer011<String> resultKafkaTopic = new FlinkKafkaProducer011<String>("ResultKafkaTopic", new SimpleStringSchema(), properties);
sum.map(new MapFunction<Tuple2<String, Integer>, String>() {
@Override
public String map(Tuple2<String, Integer> value) throws Exception {
return value.f0 + "#" + value.f1;
}
}).addSink(resultKafkaTopic);
env.execute("readKafka");
}
}
链接:https://pan.baidu.com/s/1BXFZV7xiD4Pj9mHvB56MkQ 提取码:hqwe