专栏首页一名叫大蕉的程序员求求你大蕉别学了之 Flink No.127

求求你大蕉别学了之 Flink No.127

求求你大蕉别学了。我偏不。

Flink ,为纯粹的流计算为生的一个大数据项目,玩一波先。跟 Spark 有什么区别呢?其实就一个区别,Spark 永远是批量处理,Flink 可以批量也可以实时流。啥意思呢?就是 Spark 没有一批就不处理就存着,永远只能准实时,而 Flink 拿到就处理拿到就处理,跟家里自来水似的,来多少处理多少。

废话不多说,先上手再说,什么原理什么的,后边再看,属于重要但不紧急的事情。我先当你装了 maven 和 IDEA 了,先用maven 初始化一下项目,试试看这个没有毒,是官网上的初始化方法。

mvn archetype:generate \

-DarchetypeGroupId=org.apache.flink \

-DarchetypeArtifactId=flink-quickstart-java \

-DarchetypeVersion=1.7.0 \

-DgroupId=wiki-edits \

-DartifactId=wiki-edits \

-Dversion=0.1 \

-Dpackage=wikiedits \

-DinteractiveMode=false

下边介绍几种学完基本能吹牛逼的用法。

首先命令行先初始化一个 scoket 输入源,下边所有的步骤都用得到,对就这样开。

nc -lk 9999

招式1 : 纯流式处理

val text = env.socketTextStream("localhost",9999);
val count = text.flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty))
          .map( (_,1))

count.print();
System.out.println(env.getExecutionPlan(1));
env.execute("Flink Streaming Scala API Skeleton")

用过Spark 的同学应该都很清楚, flatMap 就是把每一个值变成一个列表然后所有的小列表打平成一个大的列表,filter 就是过滤出符合条件的值们。所以这坨代码的大概意思就是把输入的字符串,先按空格切割一下,然后输出掉,这里其中的 count 是 Flink 的其中一种 sink 。当然你也可以自己定义自己的 sink ,譬如这样。

count.addSink(new SinkFunction[(String, Int)] {
  // DO SOME THING
})

ps: sink 是 Flink 的输出叫法,一次对外的输出叫一次 sink,sink 的方式有很多种,比如写文件,写数据库,调用其他 rpc 接口等等。

招式2 : 每隔三个进行进行一次处理

val count = text.flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty))
  .map( (_,1))
  .countWindowAll(3L)
  .aggregate(new MyAggregate())


这就是传说中的小量批处理啦,Spark 就无法原生支持这种吧?每三个作为一批进行批量处理。

招式3 : 每隔三个对过去十个进行一次处理

val count = text.flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty))
  .map( (_,1))
  .countWindowAll(10L,3L)
  .aggregate(new MyAggregate())
这个就更加棒棒了,我们经常都会有这样的需求,每接收到3笔订单统计过去10笔订单的订单总额,这也是传说中的一个数据在多个窗口里统计。这个有什么卵用呢?

招式4 : 每隔三秒进行进行一次处理

val count = text.flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty))
  .map( (_,1))
    .timeWindowAll(Time.seconds(3))
  .aggregate(new MyAggregate())

这种小批量就是每隔一定的时间窗口进行一次数据处理,这就不多说了,跟没什么特殊了。大概就是类似金拱门工作人员,每隔三秒钟看看有多少个麦包包,一起拿起来打包这样 (卧槽好饿.(๑>؂<๑)۶好吃)。

招式5 : 每隔三秒对过去十秒的数据进行进行一次处理

val count = text.flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty))
  .map( (_,1))
    .timeWindowAll(Time.seconds(10),Time.seconds(3))
  .aggregate(new MyAggregate())

这就牛逼了,老板每天都会有这样的需求,你给我看看过去十秒咱赚了多少钱?每隔三秒钟计算一次。这种需求你还要存储过去十秒的数据,其实是比较困难的,在以前那种没有状态的流计算里边。

大蕉 : 1、2、3 老板咱赚了一栋楼

老板 : 好好干,明年哥给你娶个嫂子。

大蕉 : zzz...

Flink 常规用法就是这么简单,自己玩玩看吧。那么上边那个myAggregate 是啥玩意呢?说实话我找了半天没找到 Flink 中类似Spark 的reduceByKey,很生气,自己实现了一个统计数字的版本,丢一波吧,自己随意看看,大概就是实现一个 aggregate 的方法。其中需要实现四个方法。

1、创建一个收集器

2、定义怎么merge 两个收集器

3、定义怎么获取收集器的结果

4、定义一个元素怎么加入到收集器中。

这样四个步骤都实现完,就实现了reduce,至于怎么reduceByKey,自己想想然后看看下边的步骤,相信你能看懂的,看不懂留言。。

关于AggregateFunction 的接口定义是长这样的,有三个泛型,分别是输入元素类型 IN(input),收集器类型 ACC(accumulator),返回结果的类型 OUT(output)。

public interface AggregateFunction<IN, ACC, OUT> {
  ACC createAccumulator();
  ACC add(IN value, ACC accumulator);
  OUT getResult(ACC accumulator);
  ACC merge(ACC a, ACC b);
}
class MyAggregate extends AggregateFunction[(String, Int), Map[String, Int], Map[String, Int]] {

  var map = new HashMap[String, Int];
  override def createAccumulator(): Map[String, Int] = {
    return map;
  }

  override def merge(a: Map[String, Int], b: Map[String, Int]): Map[String, Int] = {
/**
  * 把两个Map里边的值拿出来,key如果一样的把他们的value加到一起
  */
val keySet1 = a.keySet;
    val keySet2 = b.keySet;
    val keySetAll = keySet1.++(keySet2);
    var result = new HashMap[String, Int];
    keySetAll.foreach(key => {
      val aCounting = a.get(key).getOrElse(0);
      val bCounting = b.get(key).getOrElse(0);
      val countingAll = (key, aCounting + bCounting);
      result = result.+(countingAll);
    });
    return result;
  }

  override def getResult(accumulator: Map[String, Int]): Map[String, Int] = {
    return accumulator;
  }

  override def add(value: (String, Int), accumulator: Map[String, Int]): Map[String, Int] = {
/**
  * 如果收集器中没有,那么把自己加进去.如果收集器中有,把自己的value和收集器中的value相加一下然后放回去
  */
    val key = value._1;
    val counting = value._2;
    if (!accumulator.contains(key)) {
      return accumulator.+(value);
    }

    val currentCounting = accumulator.get(key).getOrElse(0);
    val tempValue = (key, counting + currentCounting);

    return accumulator.+(tempValue);

  }
}

最近买了极客时间的 10x 程序员工作法,感觉还是挺不错的,看一看不亏。

有一段时间,网上流传着一个帖子,亚马逊 CTO 介绍亚马逊是如何开发一项产品的,简单来说,他们采用向后工作的方法,开发一项产品的顺序为: 写新闻稿 。 写 FAQ 。 写用户文档 。 写代码 。

本文分享自微信公众号 - 一名叫大蕉的程序员(DaBananaTalk)

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2018-12-30

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 分布式文件系统.get(LEVEL1)No.104

    如果你有两个小时,还有IDE,还懂一点点 Java ,来坐下来,半个小时教你自己写一个分布式文件系统,还有电脑。因为这篇文章在手机完全没法看。。

    大蕉
  • 大数据计数原理1+0=1这你都不会算(三)No.51

    这是本坑的第三篇,之前已经说了关于 HashSet 和 BitMap 了,这次说说 Bloom Filter 布隆过滤器,要是还不知道前面讲了啥的,可以点一下下...

    大蕉
  • Spark你一定学得会(一)No.7

    我是小蕉。 上一篇大家说没有干货,妈蛋回南天哪来的干货你告诉我!!!还好这几天天气还不错,干货来了。 首先祭上今天关键代码,要做的事情就是从Hive表中取得年龄...

    大蕉
  • Swift3.0 - 扩展

    b.如果想要在定义协议的时候,不指定变量名称,在实现协议的时候,再去设定变量类型,应该怎么写?

    酷走天涯
  • 类似微博等社交软件中用户关注关系的存储实现方案遐想

    本文主要对设计方案进行一些思考及测试,思考结果的正确性无法保证,测试结果保证正确.

    呼延十
  • 【Flink】Flink流应用开发

    (1)Flink简介 Flink是一个低延迟、高吞吐、统一的大数据分布式实时计算引擎,使用官网的一句话来介绍Flink就是“Stateful Computat...

    魏晓蕾
  • Android:关于Kotlin的入门语法指南(类、变量 & 函数)都总结在这里了!

    Kotlin的基本数值类型有六种:Byte、Short、Int、Long、Float、Double

    Carson.Ho
  • Java 数据表映射

    Mirror王宇阳
  • 你说你是高工,String有多长也不知道?

    String类是由final修饰的,所以是不能被继承的①,我们在对字符串进行比较时,一般是期望对比其中的字符串是否一样,所以这里我们不能用"=="进行字符串的比...

    吴延宝
  • R包vegan执行非参数多元方差分析(置换多元方差分析)

    当因变量不止一个时,即一个或多个因子变量对应了多个因变量时,可使用多元方差分析(MANOVA)。但是MANOVA的条件非常苛刻,以前文简述的单因素MANOVA为...

    用户7585161

扫码关注云+社区

领取腾讯云代金券