Hello everyone, I'm ChinaManor, which translates literally as "Chinese code farmer". I hope to be a paver on the road to national rejuvenation, a cultivator in the field of big data, an ordinary person who refuses to settle for mediocrity.
Below I bring you Flink, which Alibaba is heavily promoting. Real-time data warehousing is the direction of the future; master Flink well, and a monthly salary of 10,000+ RMB is not just a dream!!
Other installments in this series:
2021 Latest and Most Complete Flink Series Tutorial: A First Look at Flink Internals and the Unified Stream/Batch API (Part 2)
2021 Latest and Most Complete Flink Series Tutorial: A First Look at Flink Internals and the Unified Stream/Batch API (Part 2.5)
2021 Latest and Most Complete Flink Series Tutorial: Advanced Flink API (Part 3)
In Flink, batch processing is a special case of stream processing: a batch job is simply a streaming job over a bounded (finite) input.
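Concretely, since Flink 1.12 the unified DataStream API lets the same pipeline run in either mode just by switching the runtime mode. A minimal sketch (RuntimeModeDemo is an illustrative class name, not code from this tutorial):

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RuntimeModeDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // STREAMING: process records as they arrive (the default)
        // BATCH:     bounded input only; batch-style scheduling, one final result per key
        // AUTOMATIC: pick BATCH automatically when every source is bounded
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        env.fromElements("itcast hadoop spark", "itcast")  // a bounded source
           .print();
        env.execute("runtime mode demo");
    }
}

The three WordCount programs below walk through the classic batch (DataSet) style, the streaming (DataStream) style, and a lambda-based variant.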
package cn.itcast.sz22.day01;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * Author itcast
 * Date 2021/5/4 15:25
 * WordCount implemented with Flink batch processing (DataSet API).
 * Development steps:
 * 1. Get the execution environment
 * 2. Read the data source
 * 3. Apply transformations
 * 4. Sink the data: print it to the console
 * 5. Execute (only needed in a streaming environment)
 */
public class Wordcount {
    public static void main(String[] args) throws Exception {
        // 1. Get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 2. Read the data:
        //    either from a file, or from local elements (handy for development and testing)
        DataSource<String> source = env
                .fromElements("itcast hadoop spark", "itcast hadoop spark", "itcast hadoop", "itcast");
        // 3.1 Split each line into individual words
        DataSet<String> flatMapDS = source.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(word);
                }
            }
        });
        // 3.2 Map each word to a (word, 1) tuple
        MapOperator<String, Tuple2<String, Integer>> mapDS = flatMapDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                return Tuple2.of(value, 1);
            }
        });
        // 3.3 Aggregate: group by the word (field 0) and sum the counts (field 1)
        AggregateOperator<Tuple2<String, Integer>> result = mapDS.groupBy(0).sum(1);
        // 4. Print the result to the console
        result.print();
    }
}
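Running the batch job above prints one final count per word; with the sample input the output should look like this (group order is not guaranteed):

(hadoop,3)
(spark,2)
(itcast,4)

Note that in the batch environment print() itself triggers execution, so no explicit execute() call is needed. The same job written against the streaming API: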
package cn.itcast.sz22.day01;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * Author itcast
 * Date 2021/5/4 15:55
 * WordCount implemented with stream processing (DataStream API).
 * Coding steps:
 * 1. Prepare the environment - env
 * 2. Prepare the data - source
 * 3. Process the data - transformation
 * 4. Output the result - sink
 * 5. Trigger execution - execute
 */
public class Wordcount2 {
    public static void main(String[] args) throws Exception {
        // 1. Prepare the environment - env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        env.setParallelism(1);
        // 2. Prepare the data - source
        DataStream<String> linesDS = env
                .fromElements("itcast hadoop spark", "itcast hadoop spark", "itcast hadoop", "itcast");
        // 3. Process the data - transformation
        // 3.1 Split each line into individual words
        SingleOutputStreamOperator<String> flatMap = linesDS.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(word);
                }
            }
        });
        // 3.2 Map each word to a (word, 1) tuple
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapDS = flatMap.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                return Tuple2.of(value, 1);
            }
        });
        // 3.3 Key by the word and sum the counts
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = mapDS.keyBy(t -> t.f0)
                .sum(1);
        // 4. Output the result - sink
        result.print();
        // 5. Trigger execution - execute (required in a streaming environment)
        env.execute();
    }
}
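In STREAMING mode, sum(1) maintains a running total per key and emits an updated count for every incoming record, rather than a single final value per word as in the batch version. With parallelism 1, the output should look roughly like:

(itcast,1)
(hadoop,1)
(spark,1)
(itcast,2)
(hadoop,2)
(spark,2)
(itcast,3)
(hadoop,3)
(itcast,4)

The same streaming job can be written more compactly with Java lambda expressions: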
package cn.itcast.sz22.day01;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

/**
 * Author itcast
 * Date 2021/5/4 16:05
 * WordCount implemented with Java lambda expressions.
 */
public class Wordcount3 {
    public static void main(String[] args) throws Exception {
        // 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 2. Read the data source
        DataStreamSource<String> source = env.fromElements("itcast hadoop spark", "itcast hadoop spark", "itcast hadoop", "itcast");
        // 3. Transformations
        // void flatMap(T value, Collector<O> out)
        // returns(...) supplies the output type that Java's type erasure hides from Flink
        DataStream<String> flatMapDS = source.flatMap((String value, Collector<String> out) ->
                Arrays.stream(value.split(" "))
                        .forEach(out::collect))
                .returns(Types.STRING);
        // O map(T value)
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapDS = flatMapDS
                .map((word) -> Tuple2.of(word, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT));
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = mapDS.keyBy(t -> t.f0).sum(1);
        // 4. Sink the data: print it to the console
        result.print();
        // 5. Trigger execution (required in a streaming environment)
        env.execute();
    }
}
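The returns(...) calls are essential in the lambda version: Java erases generic type parameters at compile time, so Flink cannot infer that the flatMap lambda emits Strings through its Collector, or that the map lambda emits Tuple2<String, Integer>; omitting the hints typically fails with an InvalidTypesException when the job graph is built. An equivalent way to declare the tuple type is an anonymous TypeHint subclass, which preserves the generic information at runtime (a drop-in alternative for the returns(Types.TUPLE(...)) line above, assuming import org.apache.flink.api.common.typeinfo.TypeHint):

// Alternative to Types.TUPLE(Types.STRING, Types.INT):
.map(word -> Tuple2.of(word, 1))
.returns(new TypeHint<Tuple2<String, Integer>>() {});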
Finally, a quick review of Flink's runtime architecture.

Flink runtime roles:
1. JobManager
2. TaskManager
3. Client

TaskManager execution capacity:
1. Task slot: a static concept, the fixed number of parallel pipelines a TaskManager can host
2. Parallelism: a dynamic concept, the number of parallel instances an operator actually runs with

Each node in the dataflow graph is a task; each task is split into multiple instances that are processed in parallel, called subtasks.
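To make the slot/parallelism distinction concrete, here is a minimal sketch (ParallelismDemo is an illustrative name, not code from this tutorial): the environment sets a default parallelism, individual operators may override it, and with Flink's default slot sharing the job needs as many task slots as its highest operator parallelism.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelismDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);           // default parallelism for the job's operators
        env.fromElements(1, 2, 3, 4)     // fromElements is a non-parallel source (1 subtask)
           .map(n -> n * n)
           .setParallelism(4)            // this map operator runs as 4 parallel subtasks
           .print();                     // print inherits the environment default of 2
        env.execute("parallelism demo");
        // With default slot sharing, this job needs 4 task slots: one per
        // subtask of the operator with the highest parallelism.
    }
}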
That wraps up the 2021 Latest and Most Complete Flink Series Tutorial: Flink Overview, Installation and Deployment, and Introductory Examples (Part 1).