前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink 流计算算子函数详解

Flink 流计算算子函数详解

作者头像
Tim在路上
发布2020-08-04 20:11:41
1.8K0
发布2020-08-04 20:11:41
举报
文章被收录于专栏:后台技术底层理解

Flink 的算子函数和spark的大致一样,但是由于其是流处理的模式,所有还要有需要加强理解的地方

Flink 中 和spark算子一致的算子

  1. Map, FlaMap 做一对一,一对多映射
  2. Reuce 多对一进行聚合
  3. 聚合函数,sum,min,minBy,MaxBy 等
  4. keyBy 按Key进行分组 名字不一样但是操作一样。

Flink 特有的或需要重新理解的算子

  1. 窗口函数: 窗口函数用于对每一个key开窗口,windowsAll 全体元素开窗口
代码语言:javascript
复制
text.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5)))
text.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))

窗口函数实际上分为滚动时间窗口,滑动时间窗口,会话窗口

滚动时间窗口不会发生重叠, 滑动时间窗口,当步长小于窗口大小,就会重叠。

会话窗口是根据相邻时间间隔确定窗口边界

全局窗口必须定义触发器

在窗口内也可以进行其他的操作

  1. 窗口连接

两个数据源相同窗口内的连接

代码语言:javascript
复制
text.join(windowCounts)
        .where()
        .equalTo()
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .apply((e1,e2) => e1 + "," + e2)
代码语言:javascript
复制
0 1      2
0 1   2  3

0,1 0,1 1,0 1,0      2,2  3,2   一个窗口一个窗口内连接

间隔连接,以一个元素去连接一个窗口形成锥形

代码语言:javascript
复制
  text.keyBy(0).intervalJoin(windowCounts.keyBy(0))
      .between(Time.milliseconds(-2),Time.milliseconds(2))
        .process(new ProcessJoinFunction[Integer,Integer,String] {
          override def processElement(in1: Integer, in2: Integer, context: ProcessJoinFunction[Integer, Integer, String]#Context, collector: Collector[String]): Unit = {
            collector.collect(in1 + "," + in2)
          }
        })
    ```
    ```
0  1         6

0     2     3

以2 进行聚合 2,0 2,1 
  1. 数据分区

数据分区的好处是,如果分区数和算子数一致,则他们会直接运行到一个节点,通过内存进行传输,减少网络带宽的压力

自定义分区 :

代码语言:javascript
复制
text.partitionCustom(partitioner,"key")

使用shuffle() 进行均匀分区

代码语言:javascript
复制
text.shuffle()`

使用负载均衡的轮询调度算法进行数据分区

代码语言:javascript
复制
text.rebalance

可伸缩动态分区,使数据尽可能在一个slot内流转,减少网络开销

代码语言:javascript
复制
dataStream.rescale()

广播分区,每一个元素广播到下一个节点

代码语言:javascript
复制
text.broadcast()
  1. 资源共享

Flink 将多个任务连接成一个任务在一个线程中执行,以实现资源共享

(1) 创建链, 开启作业优化

代码语言:javascript
复制
dataStream.map(..).map(...).startNewChain().map(...)

(2) Slot共享组

在同一个组所有任务在同一个实例中运行

代码语言:javascript
复制
dataStream.map(...).slotSharingGroup("name")

(3) 关闭作业优化

代码语言:javascript
复制
dataStream.map(...).disableChaining()
  1. RichFunction函数

处理函数生命周期和获取函数上下文能力的算子

代码语言:javascript
复制
@Public
public abstract class AbstractRichFunction implements RichFunction, Serializable {
    private static final long serialVersionUID = 1L;
    private transient RuntimeContext runtimeContext;

    public AbstractRichFunction() {
    }
    
    public void setRuntimeContext(RuntimeContext t) {
        this.runtimeContext = t;
    }
    // 得到上下文函数 
    public RuntimeContext getRuntimeContext() {
        if (this.runtimeContext != null) {
            return this.runtimeContext;
        } else {
            throw new IllegalStateException("The runtime context has not been initialized.");
        }
    }

    public IterationRuntimeContext getIterationRuntimeContext() {
        if (this.runtimeContext == null) {
            throw new IllegalStateException("The runtime context has not been initialized.");
        } else if (this.runtimeContext instanceof IterationRuntimeContext) {
            return (IterationRuntimeContext)this.runtimeContext;
        } else {
            throw new IllegalStateException("This stub is not part of an iteration step function.");
        }
    }
    // 自定义初始化和销毁函数
    public void open(Configuration parameters) throws Exception {
    }

    public void close() throws Exception {
    }
}
  1. 触发器

基于事件的触发器

(1)onElement 窗口没收到一个元素,调用该方法

(2)onProcessingTime 根据注册处理时间进行触发,定时可以参数设定

(3)onEventTime 根据注册事件时间进行触发,定时可以参数设定

(4)onMerge 两个窗口合并时触发

  1. 清除器

在触发器后函数执行窗口前或者后执行清除的操作

evictor()可以在触发器后,窗口执行前或者后都可以触发

  1. 状态分类
代码语言:javascript
复制
val env = StreamExecutionEnvironment.getExecutionEnvironment()

env.setStateBackend(...)

设置状态后端,内存,JVM堆内存,JVM堆外内存,

9.检查点

检查点是Flink实现 exactly-once 语义的核心机制,启用检测点,需要:

(1) 支持时空穿梭的外部数据源, kafka 和 分布式文件系统

(2) 可持久化状态的外部存储, 如分布式文件系统。

检查点默认是关闭的,启用检查点需要配置 一致性级别, exactly-once

检测超时时间,

Kafka进行流计算实例

  1. 创建连接器

添加kafka source

代码语言:javascript
复制
  // 设置配置文件
          val properties = new Properties()
          properties.setProperty("bootstraps.servers","localhost:9092")
          properties.setProperty("zookeeper.connect","localhost:2181")

          // 设置消费组
          properties.setProperty("group.id","test")
          val myConsumer = new FlinkKafkaConsumer09[String]("topic",
                           new SimpleStringSchema(),properties)
          stream = env.addSource(myConsumer)
  1. 创建反序列化器
代码语言:javascript
复制
// https://mvnrepository.com/artifact/org.apache.flink/flink-avro
compile group: 'org.apache.flink', name: 'flink-avro', version: '1.7.1'
  1. 设置消息起始位置的偏移 设置 据上一次的偏移位置
代码语言:javascript
复制
myConsumer.setStartFromGroupOffsets()

// 从最早和最晚开始记录

代码语言:javascript
复制
myConsumer.setStartFromEarliest()

myConsumer.setStartFromLatest()

// 从固定的时间点开始

代码语言:javascript
复制
myConsumer.setStartFromTimestamp(23L)

// 设置分区的起始位置

代码语言:javascript
复制
val specificStartOffsets = new java.util.HashMap[]
  1. 设置检查点
代码语言:javascript
复制
env.enableCheckpointing(5000)
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Flink 中 和spark算子一致的算子
  • Flink 特有的或需要重新理解的算子
  • Kafka进行流计算实例
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档