package cn.itcast.flink.basestone;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
/**
* Author itcast
* Date 2021/6/18 15:00
* 开发步骤
* 1. 将 字符串 9,3 转换成 CartInfo
* 2. 使用 滚动窗口, 滑动窗口
* 3. 分组和聚合
* 4. 打印输出
* 5. 执行环境
*/
public class WindowDemo01 {
public static void main(String[] args) throws Exception {
//1.env 创建流执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//2.读取 socket 数据源
DataStreamSource<String> source = env.socketTextStream("192.168.88.161", 9999);
//3.将9,3转为CartInfo(9,3)
DataStream<CartInfo> mapDS = source.map(new MapFunction<String, CartInfo>() {
@Override
public CartInfo map(String value) throws Exception {
String[] kv = value.split(",");
return new CartInfo(kv[0], Integer.parseInt(kv[1]));
}
});
//4.按照 sensorId 分组并划分滚动窗口为5秒,在窗口上求和
// Tumbling(滚动)Processing(处理)TimeWindows(时间窗口)
//需求1:每5秒钟统计一次,最近5秒钟内,各个路口/信号灯通过红绿灯汽车的数量
SingleOutputStreamOperator<CartInfo> result1 = mapDS.keyBy(t -> t.sensorId)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.sum("count");
//需求2:每5秒钟统计一次,最近10秒钟内,各个路口/信号灯通过红绿灯汽车的数量
SingleOutputStreamOperator<CartInfo> result2 = mapDS.keyBy(t -> t.sensorId)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5)))
.sum("count");
//5.打印输出
//result1.print();
result2.print();
//6.execute
env.execute();
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class CartInfo {
private String sensorId;//信号灯id
private Integer count;//通过该信号灯的车的数量
}
}
package cn.itcast.flink.basestone;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* Author itcast
* Date 2021/6/18 15:46
* Desc TODO
*/
public class CountWindowDemo01 {
public static void main(String[] args) throws Exception {
//1.env 创建流执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//2.读取 socket 数据源
DataStreamSource<String> source = env.socketTextStream("192.168.88.161", 9999);
//3.将9,3转为CartInfo(9,3)
DataStream<WindowDemo01.CartInfo> mapDS = source.map(new MapFunction<String, WindowDemo01.CartInfo>() {
@Override
public WindowDemo01.CartInfo map(String value) throws Exception {
String[] kv = value.split(",");
return new WindowDemo01.CartInfo(kv[0], Integer.parseInt(kv[1]));
}
});
// * 需求1:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现5次进行统计--基于数量的滚动窗口
// //countWindow(long size, long slide)
SingleOutputStreamOperator<WindowDemo01.CartInfo> result1 = mapDS.keyBy(t -> t.getSensorId())
.countWindow(5)
.sum("count");
// * 需求2:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现3次进行统计--基于数量的滑动窗口
SingleOutputStreamOperator<WindowDemo01.CartInfo> result2 = mapDS.keyBy(t -> t.getSensorId())
.countWindow(5, 3)
.sum("count");
//打印输出
//result1.print();
result2.print();
//执行环境
env.execute();
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class CartInfo {
private String sensorId;//信号灯id
private Integer count;//通过该信号灯的车的数量
}
}
package cn.itcast.flink.basestone;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* Author itcast
* Date 2021/6/18 15:46
* Desc TODO
*/
public class CountWindowDemo01 {
public static void main(String[] args) throws Exception {
//1.env 创建流执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//2.读取 socket 数据源
DataStreamSource<String> source = env.socketTextStream("192.168.88.161", 9999);
//3.将9,3转为CartInfo(9,3)
DataStream<WindowDemo01.CartInfo> mapDS = source.map(new MapFunction<String, WindowDemo01.CartInfo>() {
@Override
public WindowDemo01.CartInfo map(String value) throws Exception {
String[] kv = value.split(",");
return new WindowDemo01.CartInfo(kv[0], Integer.parseInt(kv[1]));
}
});
// * 需求1:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现5次进行统计--基于数量的滚动窗口
// //countWindow(long size, long slide)
SingleOutputStreamOperator<WindowDemo01.CartInfo> result1 = mapDS.keyBy(t -> t.getSensorId())
.countWindow(5)
.sum("count");
// * 需求2:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现3次进行统计--基于数量的滑动窗口
SingleOutputStreamOperator<WindowDemo01.CartInfo> result2 = mapDS.keyBy(t -> t.getSensorId())
.countWindow(5, 3)
.sum("count");
//打印输出
//result1.print();
result2.print();
//执行环境
env.execute();
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class CartInfo {
private String sensorId;//信号灯id
private Integer count;//通过该信号灯的车的数量
}
}
有订单数据,格式为: (订单ID,用户ID,时间戳/事件时间,订单金额)
要求每隔5s, 计算5秒内,每个用户的订单总金额
并添加Watermark来解决一定程度上的数据延迟和数据乱序(最多延时 3 秒)问题。
package cn.itcast.flink.basestone;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.time.Duration;
import java.util.Random;
import java.util.UUID;
/**
* Author itcast
* Date 2021/6/18 16:54
* Desc TODO
*/
public class WatermarkDemo01 {
public static void main(String[] args) throws Exception {
//1.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设置属性 ProcessingTime , 新版本 默认设置 EventTime
//env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//2.Source 创建 Order 类 orderId:String userId:Integer money:Integer eventTime:Long
DataStreamSource<Order> source = env.addSource(new SourceFunction<Order>() {
boolean flag = true;
Random rm = new Random();
@Override
public void run(SourceContext<Order> ctx) throws Exception {
while (flag) {
ctx.collect(new Order(
UUID.randomUUID().toString(),
rm.nextInt(3),
rm.nextInt(101),
//模拟生成 Order 数据 事件时间=当前时间-5秒钟随机*1000
System.currentTimeMillis() - rm.nextInt(5) * 1000
));
Thread.sleep(1000);
}
}
@Override
public void cancel() {
flag = false;
}
});
//3.Transformation
//-告诉Flink要基于事件时间来计算!
//env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);//新版本默认就是EventTime
DataStream<Order> result = source.assignTimestampsAndWatermarks(
WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner((element, recordTimestamp) -> element.eventTime)
)
//-分配水印机制,最多延迟3秒,告诉Flink数据中的哪一列是事件时间,因为Watermark = 当前最大的事件时间 - 最大允许的延迟时间或乱序时间
//代码走到这里,就已经被添加上Watermark了!接下来就可以进行窗口计算了
//要求每隔5s,计算5秒内(基于时间的滚动窗口),每个用户的订单总金额
.keyBy(t -> t.userId)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.sum("money");
//4.Sink
result.print();
//5.execute
env.execute();
}
//创建订单类
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class Order{
private String orderId;
private Integer userId;
private Integer money;
private Long eventTime;
}
}
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.time.Duration;
import java.util.Random;
import java.util.UUID;
/**
* Author itcast
* Date 2021/6/18 16:54
* Desc TODO
*/
public class WatermarkDemo01 {
public static void main(String[] args) throws Exception {
//1.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//设置属性 ProcessingTime , 新版本 默认设置 EventTime
//env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//2.Source 创建 Order 类 orderId:String userId:Integer money:Integer eventTime:Long
DataStreamSource<Order> source = env.addSource(new SourceFunction<Order>() {
boolean flag = true;
Random rm = new Random();
@Override
public void run(SourceContext<Order> ctx) throws Exception {
while (flag) {
ctx.collect(new Order(
UUID.randomUUID().toString(),
rm.nextInt(3),
rm.nextInt(101),
//模拟生成 Order 数据 事件时间=当前时间-5秒钟随机*1000
System.currentTimeMillis() - rm.nextInt(5) * 1000
));
Thread.sleep(1000);
}
}
@Override
public void cancel() {
flag = false;
}
});
//3.Transformation
//-告诉Flink要基于事件时间来计算!
//env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);//新版本默认就是EventTime
DataStream<Order> result = source.assignTimestampsAndWatermarks(
WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner((element, recordTimestamp) -> element.eventTime)
)
//-分配水印机制,最多延迟3秒,告诉Flink数据中的哪一列是事件时间,因为Watermark = 当前最大的事件时间 - 最大允许的延迟时间或乱序时间
//代码走到这里,就已经被添加上Watermark了!接下来就可以进行窗口计算了
//要求每隔5s,计算5秒内(基于时间的滚动窗口),每个用户的订单总金额
.keyBy(t -> t.userId)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.sum("money");
//4.Sink
result.print();
//5.execute
env.execute();
}
//创建订单类
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class Order{
private String orderId;
private Integer userId;
private Integer money;
private Long eventTime;
}
}
IndexOfThisSubtask(); System.out.println(“index:”+idx+" offset:"+offset); Thread.sleep(1000); if(offset % 5 ==0){ System.out.println(“当前程序出错了…”); throw new Exception(“程序出BUG…”); } } } //重写cancel方法 @Override public void cancel() { flag = false; }
//重写snapshotState方法 , 清空 offsetState ,并将最新的offset添加进去
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
offsetState.clear();
offsetState.add(offset);
}
}
}