
001. Flink Getting Started: Real-Time WordCount

CoderJed
Published 2019-07-25 16:56:41
Part of the column: Jed的技术阶梯

1. Maven dependencies

<properties>
    <!-- Flink version -->
    <flink.version>1.8.1</flink.version>
    <!-- Scala binary (major) version -->
    <scala.binary.version>2.11</scala.binary.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
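        <!-- provided: a Flink cluster supplies this jar at runtime; when running from an IDE, make sure provided-scope dependencies are on the run classpath -->
        <scope>provided</scope>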
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>

2. Flink WordCount, Java version

package com.bairong.flink.java;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * author: YangYunhe
 * date: 2019/7/22 19:32
 * description: every 1 s, run WordCount over the data of the previous 2 s
 */
public class SocketWindowWordCountJava {

    public static class WordWithCount {

        public String word;
        public long count;

        public WordWithCount() {}

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + ": " + count;
        }
    }

    public static void main(String[] args) throws Exception {

        // 1. Parse the command-line arguments for the host and port to connect to;
        //    defaults to localhost:9999 when they are not set
        ParameterTool tool = ParameterTool.fromArgs(args);
        String host = tool.get("host", "localhost");
        int port = tool.getInt("port", 9999);

        // 2. Get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 3. Create the source stream from the socket
        DataStreamSource<String> source = env.socketTextStream(host, port);

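        // 4. Compute: flatten each line into words, emit every word with a count of 1,
        //    group by word, and add up the counts per window
        SingleOutputStreamOperator<WordWithCount> counts = source.flatMap(new FlatMapFunction<String, WordWithCount>() {
            @Override
            public void flatMap(String line, Collector<WordWithCount> collector) throws Exception {
                String[] words = line.split("\\s+");
                for(String word : words) {
                    // split() yields an empty token for blank lines or leading whitespace; skip it
                    if (!word.isEmpty()) {
                        collector.collect(new WordWithCount(word, 1L));
                    }
                }

            }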
        }).keyBy("word")
        .timeWindow(Time.seconds(2), Time.seconds(1))
        .reduce(new ReduceFunction<WordWithCount>() {
            @Override
            public WordWithCount reduce(WordWithCount word1, WordWithCount word2) throws Exception {
                return new WordWithCount(word1.word, word1.count + word2.count);
            }
        });
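        /*
         * For a plain sum, sum() can be used directly instead of reduce():
         * .keyBy("word").timeWindow(Time.seconds(2), Time.seconds(1)).sum("count")
         */

        // 5. Print the results, using a parallelism of 1 for the print sink
        counts.print().setParallelism(1);

        // 6. Start the streaming job; the pipeline above is only a plan until execute() is called
        env.execute("SocketWindowWordCountJava");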

    }

}
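
The timeWindow(Time.seconds(2), Time.seconds(1)) call defines a sliding window: each window is 2 s long and a new one starts every 1 s, so every record is counted in two overlapping windows. The plain-Java sketch below (no Flink dependency) illustrates that assignment arithmetic. It assumes windows aligned to time 0 with no offset, and the class and method names are hypothetical; it is an illustration, not Flink's own implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only: shows how a record timestamp maps onto
// overlapping sliding windows of a given size and slide (both in milliseconds).
class SlidingWindowSketch {

    // Returns the [start, end) bounds of every window that contains the timestamp,
    // assuming windows are aligned to time 0 (no offset) and timestamp >= 0.
    static List<long[]> windowsFor(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        // Start of the most recent window that begins at or before the timestamp.
        long lastStart = timestamp - (timestamp % slide);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        // A record arriving at t = 2500 ms, with size = 2 s and slide = 1 s.
        for (long[] w : windowsFor(2500L, 2000L, 1000L)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

With these values the record falls into the windows [2000, 4000) and [1000, 3000), which is why a word typed once shows up in two consecutive results of the job.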

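3. Running the code

(1) Set the program arguments to specify the host and port, e.g. --host <server> --port 9999

(2) Run the nc command on the server to open a listening socket, e.g. nc -lk 9999 (the flags vary between netcat variants)

(3) Run the program

(4) Type some words on the server and check the results in the program's console

That completes the Java version of the Flink WordCount program.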
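
To make the console output of step (4) easier to check, here is a plain-Java sketch (no Flink dependency) of what the pipeline computes for the lines that land in a single window: split on whitespace, count 1 per word, group by word, and add up the counts. The class and method names are hypothetical and the input lines are made up for illustration. The sketch also chooses to skip the empty token that split("\\s+") produces for a line with leading whitespace.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only: the per-window result of the WordCount pipeline,
// computed over a fixed set of lines instead of a live stream.
class WindowCountSketch {

    // Counts the words of all given lines, keeping the order of first appearance.
    static Map<String, Long> countWords(String... lines) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String line : lines) {
            for (String word : line.split("\\s+")) {
                if (word.isEmpty()) {
                    continue; // produced by blank lines or leading whitespace
                }
                // Same effect as the reduce step: add 1 to the running count of the word.
                counts.merge(word, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Two lines assumed to arrive within the same window.
        countWords("hello flink", "hello world")
                .forEach((word, count) -> System.out.println(word + ": " + count));
    }
}
```

The sketch prints one "word: count" line per distinct word, matching the format of WordWithCount.toString(). In the real job the order of the lines across different words is not guaranteed.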

4. Flink WordCount, Scala version

package com.bairong.flink.scala

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

/**
  * author: YangYunhe
  * date: 2019/7/22 21:14
  * description: Flink WordCount, Scala version
  */
object SocketWindowWordCountScala {

  def main(args: Array[String]): Unit = {

    val tool = ParameterTool.fromArgs(args)
    val host = tool.get("host", "localhost")
    val port = tool.getInt("port", 9999)

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val source: DataStream[String] = env.socketTextStream(host, port)

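    source.flatMap(_.split("\\s+"))   // split each line into words
      .map((_, 1))                    // pair each word with an initial count of 1
      .keyBy(0)                       // group by the word (tuple field 0)
      .timeWindow(Time.seconds(2), Time.seconds(1)) // 2 s window sliding every 1 s
      .sum(1)                         // add up the counts (tuple field 1)
      .print()
      .setParallelism(1)              // print with a single sink task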

    env.execute("SocketWindowWordCountScala")

  }

}

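The test procedure is the same as above.

One thing to note: the Scala API must be imported with a wildcard so that everything in the package, including its implicit definitions, is in scope: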

import org.apache.flink.streaming.api.scala._

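Otherwise the code fails to compile, typically because the implicit TypeInformation values that the wildcard import brings into scope cannot be found.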

Shared from the author's personal site/blog; originally published 2019.07.24.