前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink 的三种WordCount(文末领取Flink书籍)

Flink 的三种WordCount(文末领取Flink书籍)

作者头像
Python编程爱好者
发布2022-09-21 21:09:11
8340
发布2022-09-21 21:09:11
举报
文章被收录于专栏:Python编程爱好者
Hi,大家好,我是 Johngo 呀!

今天是 Flink 从 0 到 1 系列的第 2 篇:《WordCount及FlinkSQL》。

目标:通过每天一小会儿,熟悉 Flink 大大小小知识点。

环境版本

JDK:1.8 Flink:1.13.6 Scala:2.12 github:https://github.com/xiaozhutec/FlinkProject1.13.6.git

创建Flink 工程网上已经很多说明方法了,这里先不赘述,以下全部的代码使用 IDEA 进行编码。

本文讲解的 WordCount 程序是大数据的入门程序。

WordCount 程序是在不同上下文环境下实现的,是一个入门版本,可以跟着一步一步实现起来。包括 Streaming 和 Batch 以及 SQL 的简单案例。

上述所有的 Flink 语义都会在后面分篇章详细赘述。

基础配置

首先pom.xml 中要配置的依赖是:

provided 选项在这表示此依赖只在代码编译的时候使用,运行和打包的时候不使用。

版本依赖

代码语言:javascript
复制
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.13.6</flink.version>
    <scala.version>2.12</scala.version>
</properties>

java 相关依赖:

代码语言:javascript
复制
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
  <!-- <scope>provided</scope> -->
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.version}</artifactId>
    <version>${flink.version}</version>
  <!-- <scope>provided</scope>-->
</dependency>

scala 相关依赖:

代码语言:javascript
复制
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
  <!-- <scope>provided</scope> -->
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
  <!-- <scope>provided</scope>-->
</dependency>

另外,pom文件中镜像文件建议配置maven仓库,国内下载速度会快,如果找不到对应的镜像文件,需要切换到国外仓库。

代码语言:javascript
复制
<repositories>
    <repository>
        <id>central</id>
        <name>aliyun maven</name>
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        <layout>default</layout>
        <!-- 是否开启发布版构件下载 -->
        <releases>
            <enabled>true</enabled>
        </releases>
        <!-- 是否开启快照版构件下载 -->
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
    </repository>
</repositories>

语言界的 hello word,大数据界的 WordCount,都是一个入门Demo。

今天咱们也按照这个入门的 Demo,把 Flink 相关代码捋顺。

包括 Streaming、Batch 以及 Flink Sql 三方面分别来实现。

Streaming WordCount

先来分析一个 Streaming WordCount。

为了模仿流式计算,咱们在本地利用 netcat 命令 nc -l {port}来进行模仿数据产出。

同时,咱们实现的功能是:每隔 1s 计算过去 2s 内产出数据各单词的个数,也就是实现每隔1s计算过去 2s 的 WordCount 程序。

将窗口内接收到的数据进行拆分致每一行,然后分别赋值为1,之后进行分组求和。

大致处理的流程如上所示,现在来一步一步实现这个案例。

先开始创建 Flink 的运行环境:

代码语言:javascript
复制
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

然后指定了数据 Source 源,以及 Source 源的一些配置:

代码语言:javascript
复制
String hostname = "127.0.0.1";
int port = 8899;
String delimiter = "\n";
// 链接 socket 获取数据
DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);

之后就进行了数据的平铺,分组,窗口计算等操作。

另外,程序中实现了一个内部类WordWithCount,用来表示单词的 key 和 count。

利用 keyBy()函数对 key进行分组。

window函数表示每一个滑动窗口,SlidingProcessingTimeWindows实现每隔 1s 对过去 2s 进行计数。

后面的教程会详细讲解 Windows 相关知识,这里仅做入门学习。

下面整体看下代码:

代码语言:javascript
复制
public class SocketWindowWCJava {
    public static void main(String[] args) throws Exception {
        // 获取流式运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        String hostname = "127.0.0.1";
        int port = 8899;
        String delimiter = "\n";
        // 获取数据源(Socket数据源,单词以逗号分割)
        DataStreamSource<String> source = env.socketTextStream(hostname, port, delimiter);
        SingleOutputStreamOperator<WC> res = source.flatMap(new FlatMapFunction<String, WC>() {

                    @Override
                    public void flatMap(String value, Collector<WC> out) throws Exception {
                        String[] splits = value.split(",");
                        for (String split : splits) {
                            out.collect(new WC(split, 1));
                        }
                    }
                }).keyBy(x -> x.word)
                .window(SlidingProcessingTimeWindows.of(Time.seconds(1), Time.seconds(2)))  // 每隔1秒,统计过去2秒的数据
                // .sum("count");
                .reduce(new ReduceFunction<WC>() {
                    @Override
                    public WC reduce(WC t1, WC t2) throws Exception {
                        return new WC(t1.word, t1.count+t2.count);
                    }
                });
        
        res.print().setParallelism(1);
        env.execute("SocketWindowWCJava");
    }

    public static class WC {
        public String word;
        public int count;

        public WC() {}
        public WC(String word, int count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return "WC{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }
}

现在把程序执行起来,先在本地起一个netcat程序,然后启动Flink程序:

代码语言:javascript
复制
$ nc -lk 8899
flink,flink,spark
hadoop,flink

之后,控制台进行了相应的打印:

用 java 实现完,接下来用 scala 也实现一下相同的逻辑,有兴趣的朋友可作参考:

代码语言:javascript
复制
object SocketWindowWCScala {
  def main(args: Array[String]): Unit = {
    // 获取运行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val hostname = "localhost"
    val port = 8899
    val delimiter = '\n'
    val source = env.socketTextStream(hostname, port, delimiter)

    import org.apache.flink.api.scala._
    // 数据格式:word,word2,word3
    val res = source.flatMap(line => line.split(',')) // 将每一行按照逗号打平
      .map(word => WC(word, 1))
      .keyBy(x => x.word)
      .window(SlidingProcessingTimeWindows.of(Time.seconds(1), Time.seconds(2)))
      .reduce((v1, v2) => WC(v1.word, v1.count + v2.count))

    res.print("data: ").setParallelism(1)

    env.execute("SocketWindowWCScala")
  }

  case class WC(word: String, count: Long)
}

依然是启动 flink 程序和 nc:

代码语言:javascript
复制
nc -lk 8888
flink,flink,spark
hadoop,flink

再看控制台的打印结果,是和咱们想实现的一致:

再次注意:窗口的使用方式在新版本中有较大的区别,这个咱们在后面会详细把这部分进行讲解。

Batch WordCount

批处理程序,这里用一个文本来作为数据源。

将文本中的数据进行拆分致每一行,然后分别赋值为1,之后进行分组求和。

处理逻辑依然如图所示,然后下面咱们也创建一个文本如图里的内容(src/main/datas/dm.csv):

代码语言:javascript
复制
Java,Fink
Scala 
Streaming
Flink,Java 
Scala
Batch,Scala

首先创建 Flink 运行环境

代码语言:javascript
复制
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

之后进行读取文件

代码语言:javascript
复制
DataSource text = env.readTextFile(filePath);

然后通过实现 FlatMapFunction 接口进行数据的打平操作(上面类 Tokenizer 的实现)。

最后进行分组求和,Batch WordCount 全部完成!

下面看 Batch 整体代码:

代码语言:javascript
复制
public class WordCountJava {
    public static void main(String[] args) throws Exception {
        String filePath = "./datas/dm.csv";
        String resultPath = "./datas/wc_rst.csv";

        // 获取运行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSource<String> source = env.readTextFile(filePath);

        AggregateOperator<Tuple2<String, Integer>> res = source.flatMap(new JGFlatMapFunction())
                .groupBy(0)
                .sum(1);
        res.print();
        res.writeAsCsv(resultPath).setParallelism(1);

        env.execute("WordCountJava");
    }

    public static class JGFlatMapFunction implements FlatMapFunction<String, Tuple2<String, Integer>> {
 
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            String[] splits = value.split(",");
            for (String split : splits) {
                out.collect(Tuple2.of(split, 1));
            }
        }
    }
}

程序中,通过读取./datas/dm.csv中的数据,最后计算结果打印到控制台以及存储结果数据到./datas/wc_rst.csv

执行起来,看打印结果:

求得给定文件的 WordCount 的结果。

下面用 Scala 实现一次:

代码语言:javascript
复制
object WordCountScala {
  def main(args: Array[String]): Unit = {
    val filePath = "./datas/dm.csv"
    val resultPath = "./datas/wc_rst.csv"

    // 获取运行环境
    val env = ExecutionEnvironment.getExecutionEnvironment

    val text = env.readTextFile(filePath)

    //引入隐式转换
    import org.apache.flink.api.scala._
    val counts = text.flatMap { _.toLowerCase.split(",") filter { _.nonEmpty } }
      .map((_, 1))
      .groupBy(0)
      .sum(1)
    counts.print()
    counts.writeAsCsv(resultPath, "\n", " ")
  }
}

用 Scala 实现起来就很简单了。

注意:这块如果代码出错的话,试着找找导入的包是否正确。

Flink SQL WordCount

尤其是有过 MapReduce 和 Hive 经历的朋友,就可以和它们放在一起做比较,一个复杂,一个简单。

比如说下面的 SQL 语句,就一句就可以省去上面那么多的代码工作量。

代码语言:javascript
复制
SELECT word, COUNT(*) FROM table GROUP BY word;

下面利用 FlinkSQL 实现 WordCount 功能。

首先,pom 文件必须要添加的依赖:

代码语言:javascript
复制
<!-- use the Table API & SQL for defining pipelines.-->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_${scala.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- run the Table API & SQL programs locally within your IDE,-->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

<!-- SQL Client-->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
    <!-- <scope>provided</scope>-->
</dependency>

先用 Java 来实现 FlinkSQL,将 nc 程序起来,进行按照逗号分割进行测试。

代码语言:javascript
复制
$ nc -lk 8899
spark,flink,spark
spark,flink,spark
...

a. 首先创建 Flink 的运行环境以及 SQL api 环境:

代码语言:javascript
复制
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

b. nc 输出,将字符串转换为 (word, 1L) 的格式:

代码语言:javascript
复制
SingleOutputStreamOperator<WC> dataStream = env.socketTextStream("localhost", 8899)
        .flatMap(new FlatMapFunction<String, WC>() {
    @Override
    public void flatMap(String value, Collector<WC> out) throws Exception {
        String[] splits = value.split(",");
        for (String split : splits) {
            out.collect(new WC(split, 1L));
        }
    }
});

c. 注册成表,转为视图&查询

代码语言:javascript
复制
Table WordCountTable = tableEnv.fromDataStream(dataStream);
tableEnv.createTemporaryView("WC", WordCountTable);
Table resultTable = tableEnv.sqlQuery("SELECT word, SUM(`count`) FROM WC group by word");

d. 转为 Stream 并且打印出来

代码语言:javascript
复制
tableEnv.toRetractStream(resultTable, Row.class).print().setParallelism(1);

下面看整体代码:

代码语言:javascript
复制
public class WordCountWithSQLJava {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        SingleOutputStreamOperator<WC> dataStream = env.socketTextStream("localhost", 8899)
                .flatMap(new FlatMapFunction<String, WC>() {
            @Override
            public void flatMap(String value, Collector<WC> out) throws Exception {
                String[] splits = value.split(",");
                for (String split : splits) {
                    out.collect(new WC(split, 1L));
                }
            }
        });

        //DataStream 转sql & 查询
        Table WordCountTable = tableEnv.fromDataStream(dataStream);
        tableEnv.createTemporaryView("WC", WordCountTable);
        Table resultTable = tableEnv.sqlQuery("SELECT word, SUM(`count`) FROM WC group by word");

        // 将结果数据转换为DataStream toRetractStream toAppendStream
        tableEnv.toRetractStream(resultTable, Row.class).print().setParallelism(1);
        env.execute("WCSQLJava");
    }

    public static class WC {
        public String word;
        public long count;

        public  WC() {}
        public WC(String word, long count) {
            this.word = word;
            this.count = count;
        }
        @Override
        public String toString() {
            return "WC {" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }
}

整体代码执行结果:

其中,+是操作后,I 是插入,U是更新,D是删除。例如:-U是撤回前的数据,+U是更新后的数据

true代表数据插入,false代表数据的撤回

Java 实现后,下面再用 Scala 来实现一次,代码逻辑一致,可以参考:

代码语言:javascript
复制
object WordCountSQLScala {
  def main(args: Array[String]): Unit = {
    // 创建运行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    import org.apache.flink.api.scala._
    // 从 nc 接入数据, 数据格式:word,word2,word3
    val dataStream = env.socketTextStream("localhost", 8899, '\n')
      .flatMap(line => line.split(','))
      .map(word => WC(word, 1L))

    // 转换为一个表(table) & 查询
    val inputTable = tableEnv.fromDataStream(dataStream)
    tableEnv.createTemporaryView("WC", inputTable)
    val resultTable = tableEnv.sqlQuery("SELECT word, SUM(`count`) FROM WC GROUP BY word")

    // toAppendStream toRetractStream
    val resValue = tableEnv.toChangelogStream(resultTable)
    resValue.print().setParallelism(1)

    env.execute("WordCountSQLScala")
  }

  case class WC(word: String, count: Long)
}

代码执行的结果也一致:

总结

今天实现了大数据的经典案例 WordCount,然后在不同场景下的实现。包括 Streaming 和 Batch,以及 Flink SQL 的实现。

该篇文章还只是一个入门级的程序,后面将会各重要点进行详细阐述。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-04-14,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Johngo学长 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 环境版本
  • 基础配置
  • Streaming WordCount
  • Batch WordCount
  • Flink SQL WordCount
  • 总结
相关产品与服务
批量计算
批量计算(BatchCompute,Batch)是为有大数据计算业务的企业、科研单位等提供高性价比且易用的计算服务。批量计算 Batch 可以根据用户提供的批处理规模,智能地管理作业和调动其所需的最佳资源。有了 Batch 的帮助,您可以将精力集中在如何分析和处理数据结果上。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档