Flink是什么 Apache Flink 是一个框架和分布式处理引擎,用于对无界和有界数据流进行状态计算。
创建一个普通的maven工程,导入相关依赖
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.10.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.10.1</version>
    </dependency>
</dependencies>
导入成功之后有一点要注意:flink-streaming-java_2.12 中的后缀 2.12 指的是其依赖的 Scala 版本,而不是 Flink 的版本。依赖导入成功之后,即可在对应目录创建包与对应类,开始项目的编写。
package com.yo.wc;
/**
* created by YO
*/
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.util.Collector;
// 批处理word count
// Batch word count: reads a text file, splits each line on spaces,
// and prints the total count of every distinct word.
public class WordCount {

    public static void main(String[] args) throws Exception {
        // Create the batch execution environment (analogous to creating
        // a context in Spark).
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Input file: any text file containing space-separated words works;
        // the path can be changed freely.
        String inputPath = "D:\\hello.txt";

        // DataSet is Flink's processing model for bounded (offline) data;
        // readTextFile produces one String element per line.
        DataSet<String> inputDataSet = env.readTextFile(inputPath);

        // Tokenize every line into (word, 1) pairs, group by the word
        // (tuple field 0), then sum the counts (tuple field 1).
        DataSet<Tuple2<String, Integer>> result =
                inputDataSet
                        .flatMap(new MyFlatMapper())
                        .groupBy(0)
                        .sum(1);

        result.print();
    }

    // FlatMapFunction<String, Tuple2<String, Integer>>: the input is one
    // line of text, the output is one (word, 1) tuple per word on the line.
    // Tuple2 is Flink's built-in two-field tuple type.
    public static class MyFlatMapper implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            // Split the incoming line on single spaces.
            String[] tokens = value.split(" ");
            // Emit each token paired with an initial count of 1 via the collector.
            for (String token : tokens) {
                out.collect(new Tuple2<>(token, 1));
            }
        }
    }
}
输出: 文本内的单词不同输出也不同
(scala,1)
(flink,1)
(world,1)
(hello,4)
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.net.URL;
// Streaming word count over a (bounded) file source: the same logic as the
// batch version, expressed with the DataStream API.
public class StreamWordCount {

    public static void main(String[] args) throws Exception {
        // Create the streaming execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read the input file as a finite stream of text lines.
        String inputPath = "D:\\hello.txt";
        DataStream<String> lines = env.readTextFile(inputPath);

        // Reuse the batch job's tokenizer, key the stream by the word
        // (tuple field 0) and keep a running sum of the counts (field 1).
        DataStream<Tuple2<String, Integer>> counts = lines
                .flatMap(new WordCount.MyFlatMapper())
                .keyBy(0)
                .sum(1);
        counts.print();

        // The pipeline above is only a plan; nothing runs until execute().
        env.execute();
    }
}
输出: 与批处理版本统计的结果一致,但流式 print 的每条输出前会带并行子任务编号,并且同一个单词会随着数据到来输出多条逐步累加的中间结果,而不是只输出最终值。
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.net.URL;
// Streaming word count over a socket text stream. Host and port are read
// from the program arguments (--host <h> --port <p>); pass them in the IDEA
// run configuration to simulate what would be submitted on a cluster.
public class StreamWordCount {

    public static void main(String[] args) throws Exception {
        // Create the streaming execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Use ParameterTool to extract --host / --port from main's args.
        // Supply defaults so the demo still runs with no arguments:
        // previously a missing --port threw RuntimeException("No data for
        // required key 'port'") and a missing --host passed null on to
        // socketTextStream, both of which fail with confusing errors.
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String host = parameterTool.get("host", "localhost");
        int port = parameterTool.getInt("port", 7777);

        // Read lines from the socket text stream (e.g. `nc -lk 7777`).
        DataStream<String> inputDataStream = env.socketTextStream(host, port);

        // Reuse the batch job's tokenizer, key by the word (tuple field 0)
        // and keep a running sum of the counts (tuple field 1).
        DataStream<Tuple2<String, Integer>> resultStream = inputDataStream
                .flatMap(new WordCount.MyFlatMapper())
                .keyBy(0)
                .sum(1);
        resultStream.print();

        // Trigger job execution; without this call nothing runs.
        env.execute();
    }
}
Flink的第一课入门到这里就完成了,同学们有遇到问题可直接私信,博主会尽力解答!