前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink1.4 累加器与计数器

Flink1.4 累加器与计数器

作者头像
smartsi
发布2019-08-07 08:31:01
2.6K0
发布2019-08-07 08:31:01
举报
文章被收录于专栏:SmartSi

1. 概述

累加器(Accumulators)是一个简单的构造器,具有加法操作和获取最终累加结果操作,在作业结束后可以使用。

最直接的累加器是一个计数器(counter):你可以使用Accumulator.add()方法对其进行累加。在作业结束时,Flink将合并所有部分结果并将最终结果发送给客户端。在调试过程中,或者你快速想要了解有关数据的更多信息,累加器很有用。

目前Flink拥有以下内置累加器。它们中的每一个都实现了累加器接口:

(1) IntCounter, LongCounter 以及 DoubleCounter: 参阅下面示例中使用的计数器。

(2) Histogram:为离散数据的直方图(A histogram implementation for a discrete number of bins.)。内部它只是一个整数到整数的映射。你可以用它来计算值的分布,例如 单词计数程序的每行单词分配。

2. 如何使用

首先,你必须在你要使用的用户自定义转换函数中创建一个累加器(accumulator)对象(这里是一个计数器):

代码语言:javascript
复制
private IntCounter numLines = new IntCounter();

其次,你必须注册累加器(accumulator)对象,通常在rich函数的open()方法中注册。在这里你也可以自定义累加器的名字:

代码语言:javascript
复制
getRuntimeContext().addAccumulator("num-lines", this.numLines);

现在你就可以在算子函数中的任何位置使用累加器,包括在open()close()方法中:

代码语言:javascript
复制
this.numLines.add(1);

最后结果将存储在JobExecutionResult对象中,该对象从执行环境的execute()方法返回(当前仅当执行等待作业完成时才起作用):

代码语言:javascript
复制
JobExecutionResult result = env.execute();
long lineCounter = result.getAccumulatorResult("num-lines");
System.out.println(lineCounter);

每个作业的所有累加器共享一个命名空间。因此,你可以在作业的不同算子函数中使用同一个累加器。Flink在内部合并所有具有相同名称的累加器。

备注:

代码语言:javascript
复制
目前累加器的结果只有在整个工作结束之后才可以使用。我们还计划在下一次迭代中可以使用前一次迭代的结果。你可以使用聚合器来计算每次迭代的统计信息,并基于此类统计信息来终止迭代。

3. Example

代码语言:javascript
复制
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.qunar.innovation.data.bean.AdsPushBehavior;
import com.qunar.innovation.data.utils.ConstantUtil;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class AdsPushParseMap extends RichMapFunction<String, AdsPushBehavior> {

    private static Gson gson = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss").create();
    private final LongCounter behaviorCounter = new LongCounter();

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        getRuntimeContext().addAccumulator(ConstantUtil.ADS_PUSH_APP_CODE, behaviorCounter);
    }

    @Override
    public AdsPushBehavior map(String content) throws Exception {

        try{
            // 解析
            AdsPushBehavior adsPushBehavior = gson.fromJson(content, AdsPushBehavior.class);
            this.behaviorCounter.add(1);
            return adsPushBehavior;
        }
        catch (Exception e){
            e.printStackTrace();
        }
        return null;

    }
}
代码语言:javascript
复制
import com.qunar.innovation.data.TestFlink;
import com.qunar.innovation.data.functions.*;
import com.qunar.innovation.data.utils.ConstantUtil;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.FileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AdsPushLocalStream {

    private final static Logger LOGGER = LoggerFactory.getLogger(TestFlink.class);

    public static void main(String[] args) {

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> dataSet = env.readTextFile("file:///home/xiaosi/input.txt");

        // 处理数据
        DataSet<String> adsPushDataSet = dataSet.map(new ContentMap()).name("contentMap").setParallelism(1).
                map(new AdsPushParseMap()).name("behaviorMap").setParallelism(1)
                .map(new AdsPushFeatureMap()).name("featureMap").setParallelism(1)
                .filter(new AdsPushFeatureFilter()).name("featureFilter").setParallelism(1);

        adsPushDataSet.writeAsText("file:///home/xiaosi/output", FileSystem.WriteMode.OVERWRITE);

        try {
            JobExecutionResult result = env.execute();
            long behaviorCounter = result.getAccumulatorResult(ConstantUtil.ADS_PUSH_APP_CODE);
            System.out.println(behaviorCounter);
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
        }

    }
}

3. 自定义累加器

为了实现你自己的累加器,你只需要编写你的Accumulator接口的实现。如果你认为你的自定义累加器应与Flink一起传输,请随意创建一个拉取请求(Feel free to create a pull request if you think your custom accumulator should be shipped with Flink.)。

你可以选择实现AccumulatorSimpleAccumulator

Accumulator<V,R>非常灵活:它为要添加的值定义一个类型V,并为最终结果定义一个结果类型R。例如,对于直方图,V是数字,R是直方图。SimpleAccumulator适用于两种类型相同的情况,例如,计数器。

备注:

代码语言:javascript
复制
Flink版本:1.4
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2018-01-04,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 概述
  • 2. 如何使用
  • 3. Example
  • 3. 自定义累加器
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档