前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >002. Flink入门案例-WordCount批处理

002. Flink入门案例-WordCount批处理

作者头像
CoderJed
发布2019-07-25 16:50:53
8190
发布2019-07-25 16:50:53
举报
文章被收录于专栏:Jed的技术阶梯Jed的技术阶梯

1. WordCount批处理Java版

代码语言:javascript
复制
package com.bairong.flink.java;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * author: YangYunhe
 * date: 2019/7/23 19:41
 * description:
 */
public class BatchWordCountJava {

    public static void main(String[] args) throws Exception {

        /*
         * 输入路径可以是一个目录,代表读取该目录下的所有文件,也可以是一个具体的文件
         * 输出路径:当最终的文件只有一个的时候,把输出路径看成一个文件
         * 当最终的文件有多个的时候,把输出路径看成一个目录
         */
        String inputPath = "D:\\space\\idea\\course\\learning-flink\\inputPath\\words.txt";
        String outputPath = "D:\\space\\idea\\course\\learning-flink\\outputPath\\wordcount_batch_result.txt";

        // 1. 获取运行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 2. 获取文件内容
        DataSource<String> source = env.readTextFile(inputPath);

        // 3. 扁平化,每个单次计数为1,分组,累加次数
        AggregateOperator<Tuple2<String, Long>> sum = source.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Long>> collector) throws Exception {
                String[] words = line.toLowerCase().split("\\W+");
                for (String word : words) {
                    collector.collect(new Tuple2<String, Long>(word, 1L));
                }
            }
        }).groupBy(0).sum(1);

        // 4. 写到指定目录下,设置换行符和Tuple元素之间的分隔符
        sum.writeAsCsv(outputPath, "\n", ", ").setParallelism(1);

        env.execute("BatchWordCountJava");

    }

}

2. WordCount批处理Scala版

代码语言:javascript
复制
package com.bairong.flink.scala

import org.apache.flink.api.scala._

/**
  * author: YangYunhe
  * date: 2019/7/23 20:13
  * description: 
  */
object BatchWordCountScala {

  def main(args: Array[String]): Unit = {

    val inputPath = "D:\\space\\idea\\course\\learning-flink\\inputPath\\words.txt"
    val outputPath = "D:\\space\\idea\\course\\learning-flink\\outputPath\\wordcount_batch_result.txt"

    val environment = ExecutionEnvironment.getExecutionEnvironment

    val source: DataSet[String] = environment.readTextFile(inputPath)

    source.flatMap(_.toLowerCase.split("\\W+"))
      .map((_, 1))
      .groupBy(0)
      .sum(1)
      .writeAsCsv(outputPath, "\n", ", ")
      .setParallelism(1)

    environment.execute("BatchWordCountScala")

  }

}

3. 流处理和批处理在代码层面的区别

  • Flink运行环境
代码语言:javascript
复制
# 流处理
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

# 批处理
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  • 获取数据后的类型
代码语言:javascript
复制
# 流处理
DataStreamSource<String> source = env.socketTextStream(host, port);

# 批处理
DataSource<String> source = env.readTextFile(inputPath);
  • 数据处理后类型
代码语言:javascript
复制
# 流处理
SingleOutputStreamOperator<WordWithCount> counts = source.flatMap(...)...

# 即处理数据后的类型为DataStream
SingleOutputStreamOperator<T> extends DataStream<T>

# 批处理
AggregateOperator<Tuple2<String, Long>> sum = source.flatMap(...)...

# 即处理数据后的类型为DataSet
AggregateOperator<IN> extends SingleInputOperator<IN, IN, AggregateOperator<IN>>
SingleInputOperator<IN, OUT, O extends SingleInputOperator<IN, OUT, O>> extends Operator<OUT, O>
Operator<OUT, O extends Operator<OUT, O>> extends DataSet<OUT>
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019.07.24 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. WordCount批处理Java版
  • 2. WordCount批处理Scala版
  • 3. 流处理和批处理在代码层面的区别
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档