前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink中: 你的Function是如何被执行的

Flink中: 你的Function是如何被执行的

作者头像
Flink实战剖析
发布2022-11-21 20:21:16
8170
发布2022-11-21 20:21:16
举报
文章被收录于专栏:Flink实战剖析Flink实战剖析

在Flink编程中,不管你是使用DataStream api还是 Table/SQL ,接触最多的就是UserFunction , 比喻说MapFunction、ScalarFunction, 在这些Function 里面可以自定义用户的业务处理逻辑,但是这些Function是如何被调用的呢?本文主要介绍Function 被调用的流程以及对应的方法如何被调用的。

核心调用逻辑

当我们编写完成一个Flink-Job 就会将代码打包成为jar提交到集群中去,当整个资源申请、任务调度完成之后就开始执行这个job,从source到transform 到最后sink 都是在TaskManager 资源节点中执行。Flink-Job 会被划分为一个个Task(整个任务中的一部分处理逻辑)节点, 每一个Task节点都在一个Thread中执行,在这个Thread中会不断的调用UserFunction的相应方法(如上图)。这个是一个大概的处理流程, 让用户只需要关心自身业务处理逻辑,无须关心网络通信、数据传输等流程。

接下来介绍具体的调用逻辑:

当JobMaster 向TaskManager 提交Task(整个任务中的一部分处理逻辑)时,会携带该Task的相关信息, 之后:

  1. org.apache.flink.runtime.taskmanager.Task

创建一个Task对象,代表了并发Task中一个subTask, 里面包含了一个Thread对象,也就是提交的任务会在该Thread中执行。

  1. org.apache.flink.streaming.runtime.tasks.StreamTask

在Task中会创建StreamTask对象, 在StreamTask中完成任务的初始化工作(配置、operatorchain构建、状态恢复等)之后,执行数据处理流程。

  1. org.apache.flink.streaming.runtime.tasks.OperatorChain

Flink优化中有一环是operator-chain, 即将满足一定规则的operator链在一起,他们之前以函数调用的方式执行,减少(网络)数据传输,OperatorChain就代表了多个Operator。

  1. org.apache.flink.streaming.api.operators.StreamOperator

StreamOperator 代表了具体执行的某一个Operator, 每一个UserFunction都会有对应的StreamOperator, 例如MapFunction对应 StreamMap、KeyedProcessFunction 对应KeyedProcessOperator, 也就是在这些Operator中完成对应Function的调用。

Method 是如何被调用的

我们通常定义一个Function , 实现其相关的方法,例如MapFunction 实现map方法、WindowFunction 实现apply方法、KeyedProcessFunction 实现open/processElement/onTimer 方法,如果你的Function 还实现了CheckpointedFunction/CheckpointListener接口, 那么还得实现对应的initializeState、snapshotState、notifyCheckpointComplete方法等等。这些所有的方法都由对应的Operator调用, 下面以MapFunction 对应的StreamMap 这个operator 为例理解Function的调用。

代码语言:javascript
复制
public class StreamMap<IN, OUT> extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT> {

    private static final long serialVersionUID = 1L;

    public StreamMap(MapFunction<IN, OUT> mapper) {
        super(mapper);
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        output.collect(element.replace(userFunction.map(element.getValue())));
    }
}
  • userFunction 即代表自定义的MapFunction , 其被processElement 方法内部调用, 而processElement 是被上面提到的StreamTask以OperatorChain 方式不断被调用。
  • StreamMap 继承了抽象的AbstractUdfStreamOperator 类, 该operator 包含了所有userFunction 的方法调用。(部分代码如下)
代码语言:javascript
复制
public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
extends AbstractStreamOperator<OUT> implements OutputTypeConfigurable<OUT> {

private static final long serialVersionUID = 1L;

/** The user function. */
protected final F userFunction;


@Override
public void setup(
StreamTask<?, ?> containingTask,
StreamConfig config,
Output<StreamRecord<OUT>> output) {
super.setup(containingTask, config, output);
FunctionUtils.setFunctionRuntimeContext(userFunction, getRuntimeContext());
}

@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
super.snapshotState(context);
StreamingFunctionUtils.snapshotFunctionState(
context, getOperatorStateBackend(), userFunction);
}

@Override
public void initializeState(StateInitializationContext context) throws Exception {
super.initializeState(context);
StreamingFunctionUtils.restoreFunctionState(context, userFunction);
}

@Override
public void open() throws Exception {
super.open();
FunctionUtils.openFunction(userFunction, new Configuration());
}

@Override
public void close() throws Exception {
super.close();
functionsClosed = true;
FunctionUtils.closeFunction(userFunction);
}

@Override
public void dispose() throws Exception {
super.dispose();
if (!functionsClosed) {
functionsClosed = true;
FunctionUtils.closeFunction(userFunction);
}
}

// ------------------------------------------------------------------------
//  checkpointing and recovery
// ------------------------------------------------------------------------

@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
super.notifyCheckpointComplete(checkpointId);

if (userFunction instanceof CheckpointListener) {
((CheckpointListener) userFunction).notifyCheckpointComplete(checkpointId);
}
}

@Override
public void notifyCheckpointAborted(long checkpointId) throws Exception {
super.notifyCheckpointAborted(checkpointId);

if (userFunction instanceof CheckpointListener) {
((CheckpointListener) userFunction).notifyCheckpointAborted(checkpointId);
}
}
}

可以看出对于UserFunction 中method的调用核心点就在operator,每个不同的UserFunction 会对应不同的operator, 但是都会继承这个抽象的

AbstractUdfStreamOperator类, 通过这个类可以熟知其整体调用链路。

总结

本文主要以调用者的视角窥探UserFunction 的调用流程以及具体method 调用逻辑。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2022-06-13,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Flink实战剖析 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 核心调用逻辑
  • Method 是如何被调用的
  • 总结
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档