前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink实现WordCount(实操详细步骤)

Flink实现WordCount(实操详细步骤)

作者头像
bboy枫亭
发布2020-09-22 11:00:30
3.7K0
发布2020-09-22 11:00:30
举报
文章被收录于专栏:csdn_blog

本文使用 Flink 的两种方式实现 WordCount

  1. 基于流计算
  2. 基于批计算

文章目录
  • 1. Idea 新建 Maven 项目并配置以下依赖
  • 2. 实现代码及详细注释
    • 2.1 Flink 基于流计算实现 WordCount
    • 2.2 Flink 基于批计算实现 WordCount
    • 2.3 附件:完整代码

先说一下我的环境: Flink 1.9 开发工具:Idea Maven版本:3.3.9 Linux:CentOS 7 演示语言:Scala 2.11

1. Idea 新建 Maven 项目并配置以下依赖

代码语言:javascript
复制
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.11</artifactId>
        <version>1.9.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>1.9.1</version>
    </dependency>
</dependencies>

2. 实现代码及详细注释

2.1 Flink 基于流计算实现 WordCount

案例需求:采用 Netcat 数据源发送数据,使用Flink统计每个单词的数量

Idea执行代码 –> 打开 Linux 使用 nc(netcat)命令发送数据测试 nc -lk 8888

2.2 Flink 基于批计算实现 WordCount

需求:读取本地数据文件,统计文件中每个单词出现的次数

wc.txt文件的内容

代码语言:javascript
复制
hadoop hbase hello
hello hadoop apache apache
flink hello

执行代码结果

2.3 附件:完整代码

代码语言:javascript
复制
package com.bigdataBC.flink

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

/**
  * 基于流计算的WordCount案例
  */
object WordCountBySrteaming {
  def main(args: Array[String]): Unit = {
    // 初始化Flink的Streaming(流计算)上下文执行环境
    val streamEvn: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    //导入隐式转换,建议写在这里,可以防止IDEA代码提示出错的问题
    import org.apache.flink.streaming.api.scala._

    // 设置默认的分区(分区优先级:先找单独设置的分区,若没有就用默认的)
    streamEvn.setParallelism(1)

    // 读取流数据
    val ds: DataStream[String] = streamEvn.socketTextStream("node1",8888)
    // 转换计算
    val result: DataStream[(String, Int)] = ds.flatMap(_.split(" "))
      .map((_, 1))
      .setParallelism(2) //设置单独的分区
      .keyBy(0) // 分组:必须制定根据哪个字段分组,参数代表当前要分组的字段的下标(另外还有fieldsname)
      .sum(1) // 1代表下标,下标为1的进行累加

    //打印结果到控制台
    result.print()
      .setParallelism(4) //设置单独的分区
    //启动流式处理,如果没有该行代码上面的程序不会运行
    streamEvn.execute("wordcount")

  }
}
代码语言:javascript
复制
package com.bigdataBC.flink

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}

/**
  * 基于批计算的WordCount案例
  */
object WordCountByBatch {
  def main(args: Array[String]): Unit = {
    // 初始化Flink批计算环境、
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

    // 导入隐式转换
    import org.apache.flink.api.scala._

    // 设置默认的分区
//    env.setParallelism(1)

    // 读取数据
    val ds: DataSet[String] = env.readTextFile("D:\\workspace\\Idea-workspace\\Flinkdemo\\src\\main\\resources\\wc.txt")

    // 转换计算
    val result: AggregateDataSet[(String, Int)] = ds.flatMap(_.split(" "))
      .map((_, 1))
      .groupBy(0)
      .sum(1)

    // 打印(这里的print不能设置分区)
    result.print()

  }
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020/08/10 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 文章目录
  • 1. Idea 新建 Maven 项目并配置以下依赖
  • 2. 实现代码及详细注释
    • 2.1 Flink 基于流计算实现 WordCount
      • 2.2 Flink 基于批计算实现 WordCount
        • 2.3 附件:完整代码
        相关产品与服务
        大数据
        全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档