--------------------------------------------------------------------------------...
代码版本 Flink : 1.10.0 Scala : 2.12.6 侧输出流(SideOutput) 本文介绍的内容是侧输出流(SideOutput),在平时大部分的 DataStream API...ProcessWindowFunction ProcessAllWindowFunction 案例 下面举一个例子是将含有特殊字符串的流区分开,数据由两个定义好的工具类向Kafka灌入不同内容的数据,然后通过侧输出流(SideOutput...= dstream .map(line => { JSON.parseObject(line, classOf[Person_t]) }) val sideOutput...out.collect(value.toString) } else { // 测输出流输出的部分 ctx.output(outputTag, "sideOutput...(outputTag) // 测输出流处理 sideOutputStream.print("测输出流") // 常规数据处理 sideOutput.print("常规数据
, OutputTag source2SideOutput) { super(); this.source1SideOutput = source1SideOutput...; this.source2SideOutput = source2SideOutput; } private OutputTag source1SideOutput...final OutputTag source1SideOutput = new OutputTag("source1-sideoutput"){};...// 假设aaa流入2号源后,如果1号源超过10秒没有收到aaa,那么2号源的aaa就会流入source2SideOutput final OutputTag source2SideOutput..., source2SideOutput); } @Override protected void doSideOutput(SingleOutputStreamOperator
第二个demo 第二个demo是实现旁路输出(Side Outputs),对于一个DataStream来说,可以通过旁路输出将数据输出到其他算子中去,而不影响原有的算子的处理,下面来演示旁路输出: 创建SideOutput...import org.apache.flink.util.OutputTag; import java.util.ArrayList; import java.util.List; public class SideOutput...> fromCollectionDataStream = env.fromCollection(list); //所有元素都进入mainDataStream,f1字段为奇数的元素进入SideOutput...mainDataStream.print(); sideDataStream.print(); env.execute("processfunction demo : sideoutput...发给主流程算子,再将f1字段为奇数的元素发到旁路输出; 数据源发出元素时,提前把元素的f0、f1、时间戳打印出来,和后面的数据核对是否一致; 将主流程和旁路输出的元素都打印出来,验证处理结果是否符合预期; 执行SideOutput
侧输出SideOutput ProcessFunction的另一大特色功能是可以将一部分数据发送到另外一个流中,而且输出到的两个流数据类型可以不一样,我们通过OutputTag[T]来标记另外一个数据流...out: Collector[String]): Unit = { // 其他业务逻辑... // 定义一个OutputTag,Stock为这个SideOutput...筛选出来发送到该OutputTag下 context.output(highVolumeOutput, stock) } } } 在主逻辑中,通过下面的方法获取侧输出: // 收集SideOutput...DataStream[Stock] = mainStream.getSideOutput(outputTag) 从这个例子中可以看到,KeyedProcessFunction的输出类型是String,而SideOutput
sideOutPut 是最后兜底操作,所有过期延迟数据,指定窗口已经彻底关闭了,就会把数据放到侧输出流。...resultStream.print(); //把迟到的数据暂时打印到控制台,实际中可以保存到其他存储介质中 DataStream> sideOutput...= resultStream.getSideOutput(outputTag); sideOutput.print(); env.execute(); } }
map.get("pay").get(0); } }); //result.print(); DataStream sideOutput...= result.getSideOutput(orderTimeoutOutput); sideOutput.print(); 运行结果: 二、连续登录失败 需求:找出那些 5 秒钟内连续登录失败的账号
其解决方案就是 Watermark / allowLateNess / sideOutPut 这一组合拳。 Watermark 的作用是防止 数据乱序 / 指定时间内获取不到全部数据。...**sideOutPut **是最后兜底操作,当指定窗口已经彻底关闭后,就会把所有过期延迟数据放到侧输出流,让用户决定如何处理。...总结起来就是说 Windows -----> Watermark -----> allowLateNess -----> sideOutPut 用Windows把流数据分块处理,用Watermark...用sideOutPut 最后兜底把数据导出到其他地方。 问题2. Watermark应该翻译成水位线 我最初看的一篇文章中把Watermark翻译成“水印”。我当时比较晕。...sideOutPut是最后兜底操作,所有过期延迟数据,指定窗口已经彻底关闭了,就会把数据放到侧输出流。 4. 实例 采用系统时间做Watermark 我们将水位线设置为当前系统时间间-5秒。
任务咋报错了呢,查看错误日志,大概说的是写入mysql唯一键冲突,内心万马鹏腾,老老实实翻开代码看一下写入逻辑:做了一个窗口聚合的业务逻辑将窗口聚合的结果写入mysql,延时数据也不能丢弃,因此加了一个sideOutput
= null){ sideOutput(element); } else { this.numLateRecordsDropped.inc(); } } } 这个里的
该方法已经标记为废弃,推荐使用SideOutput。...4.3 旁路输出 旁路输出在Flink中叫做SideOutput,类似于DataStream#split,本质上是一个数据流的切分行为,按照条件将DataStream切分为多个子数据流,
admin. 5.1 启动一个Flink Job flink run -m yarn-cluster -ynm LateDataProcess -yn 1 -c com.venn.stream.api.sideoutput.lateDataProcess.LateDataProcess
= null){ sideOutput(element); } else { this.numLateRecordsDropped.inc
一个案例: 监控传感器温度值,将温度值低于 30 度的数据输出到 SideOutput public class ProcessTest3_SideOutputCase { public
此时,可以这个事件放到 sideoutput 队列中,额外逻辑处理。 ? 四、Flink 1.11 版本 中,如何定义水印 所以在 1.11 版本中,重构了水印生成接口。
此时,可以这个事件放到 sideoutput 队列中,额外逻辑处理。
领取专属 10元无门槛券
手把手带您无忧上云