专栏首页SmartSiFlink1.4 累加器与计数器

Flink1.4 累加器与计数器

1. 概述

累加器(Accumulators)是一个简单的构造器,具有加法操作和获取最终累加结果操作,在作业结束后可以使用。

最直接的累加器是一个计数器(counter):你可以使用Accumulator.add()方法对其进行累加。在作业结束时,Flink将合并所有部分结果并将最终结果发送给客户端。在调试过程中,或者你快速想要了解有关数据的更多信息,累加器很有用。

目前Flink拥有以下内置累加器。它们中的每一个都实现了累加器接口:

(1) IntCounter, LongCounter 以及 DoubleCounter: 参阅下面示例中使用的计数器。

(2) Histogram:为离散数据的直方图(A histogram implementation for a discrete number of bins.)。内部它只是一个整数到整数的映射。你可以用它来计算值的分布,例如 单词计数程序的每行单词分配。

2. 如何使用

首先,你必须在你要使用的用户自定义转换函数中创建一个累加器(accumulator)对象(这里是一个计数器):

private IntCounter numLines = new IntCounter();

其次,你必须注册累加器(accumulator)对象,通常在rich函数的open()方法中注册。在这里你也可以自定义累加器的名字:

getRuntimeContext().addAccumulator("num-lines", this.numLines);

现在你就可以在算子函数中的任何位置使用累加器,包括在open()close()方法中:

this.numLines.add(1);

最后结果将存储在JobExecutionResult对象中,该对象从执行环境的execute()方法返回(当前仅当执行等待作业完成时才起作用):

JobExecutionResult result = env.execute();
long lineCounter = result.getAccumulatorResult("num-lines");
System.out.println(lineCounter);

每个作业的所有累加器共享一个命名空间。因此,你可以在作业的不同算子函数中使用同一个累加器。Flink在内部合并所有具有相同名称的累加器。

备注:

目前累加器的结果只有在整个工作结束之后才可以使用。我们还计划在下一次迭代中可以使用前一次迭代的结果。你可以使用聚合器来计算每次迭代的统计信息,并基于此类统计信息来终止迭代。

3. Example

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.qunar.innovation.data.bean.AdsPushBehavior;
import com.qunar.innovation.data.utils.ConstantUtil;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class AdsPushParseMap extends RichMapFunction<String, AdsPushBehavior> {

    private static Gson gson = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss").create();
    private final LongCounter behaviorCounter = new LongCounter();

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        getRuntimeContext().addAccumulator(ConstantUtil.ADS_PUSH_APP_CODE, behaviorCounter);
    }

    @Override
    public AdsPushBehavior map(String content) throws Exception {

        try{
            // 解析
            AdsPushBehavior adsPushBehavior = gson.fromJson(content, AdsPushBehavior.class);
            this.behaviorCounter.add(1);
            return adsPushBehavior;
        }
        catch (Exception e){
            e.printStackTrace();
        }
        return null;

    }
}
import com.qunar.innovation.data.TestFlink;
import com.qunar.innovation.data.functions.*;
import com.qunar.innovation.data.utils.ConstantUtil;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.FileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AdsPushLocalStream {

    private final static Logger LOGGER = LoggerFactory.getLogger(TestFlink.class);

    public static void main(String[] args) {

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> dataSet = env.readTextFile("file:///home/xiaosi/input.txt");

        // 处理数据
        DataSet<String> adsPushDataSet = dataSet.map(new ContentMap()).name("contentMap").setParallelism(1).
                map(new AdsPushParseMap()).name("behaviorMap").setParallelism(1)
                .map(new AdsPushFeatureMap()).name("featureMap").setParallelism(1)
                .filter(new AdsPushFeatureFilter()).name("featureFilter").setParallelism(1);

        adsPushDataSet.writeAsText("file:///home/xiaosi/output", FileSystem.WriteMode.OVERWRITE);

        try {
            JobExecutionResult result = env.execute();
            long behaviorCounter = result.getAccumulatorResult(ConstantUtil.ADS_PUSH_APP_CODE);
            System.out.println(behaviorCounter);
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
        }

    }
}

3. 自定义累加器

为了实现你自己的累加器,你只需要编写你的Accumulator接口的实现。如果你认为你的自定义累加器应与Flink一起传输,请随意创建一个拉取请求(Feel free to create a pull request if you think your custom accumulator should be shipped with Flink.)。

你可以选择实现AccumulatorSimpleAccumulator

Accumulator<V,R>非常灵活:它为要添加的值定义一个类型V,并为最终结果定义一个结果类型R。例如,对于直方图,V是数字,R是直方图。SimpleAccumulator适用于两种类型相同的情况,例如,计数器。

备注:

Flink版本:1.4

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Flink1.4 安装与启动

    Flink 可以运行在 Linux, Mac OS X和Windows上。为了运行Flink, 唯一的要求是必须在Java 7.x (或者更高版本)上安装。Wi...

    smartsi
  • Spark 如何使用累加器Accumulator

    Accumulator 是 spark 提供的累加器,累加器可以用来实现计数器(如在 MapReduce 中)或者求和。Spark 本身支持数字类型的累加器,程...

    smartsi
  • Spark2.3.0 共享变量

    通常情况下,传递给 Spark 操作(例如 map 或 reduce)的函数是在远程集群节点上执行的,函数中使用的变量,在多个节点上执行时是同一变量的多个副本。...

    smartsi
  • 这是中国为什么投资印尼的原因

    印尼不仅拥有发展电子商务的巨大潜力,据预测印尼电子商务市场规模在2020年达到1300亿美元,并且这一市场还在不断的扩大。雅加达和爪哇岛的其他大城市仍然是核心...

    点滴科技资讯
  • 上千家现金贷盯上印尼,这是逃生出口,还是新的狂欢?

    文 | 洛桑 零和 现金贷的监管落地,行业的野蛮生长结束,进入强监管阶段。 行业正在苦寻出口,而想到的第一站,就是印尼。 每月上百家企业组团去印尼考察,且已有百...

    企鹅号小编
  • 项目不知道如何做性能优化?不妨试一下代码分割

    最近我们在前端校招面试冲刺群里辅导简历的时候,经常有同学感叹不知道怎么优化项目,那不妨尝试下在项目中引入代码分割的方式提升性能。

    前端劝退师
  • LeetCode 82. 删除排序链表中的重复元素 II(链表)

    给定一个排序链表,删除所有含有重复数字的节点,只保留原始链表中 没有重复出现 的数字。

    Michael阿明
  • 基于keras实现VGG-19网络的音频分类

    在这篇文章中,我将针对音频分类的问题。我将根据音频波形训练VGG-19的音频分类器。下边是整个项目的步骤和代码:

    深度学习与Python
  • [Leetcode][python]Valid Sudoku/有效的数独

    判断一个数度棋盘是否合理,不需要能解。 1. 横向0-9 2. 纵向0-9 3. 小方格0-9

    后端技术漫谈
  • spring-boot 速成(8) 集成druid+mybatis

    spring-boot与druid、mybatis集成(包括pageHelper分页插件), 要添加以下几个依赖项: compile('mysql:my...

    菩提树下的杨过

扫码关注云+社区

领取腾讯云代金券