
Building Your First Flink Application: WordCount

Author: Eights
Published: 2020-07-10 11:59:26
Column: Eights做数据

This article is roughly 5,143 characters; reading time about 13 minutes.

Experience Flink's hello world.

Use Maven to scaffold a first Flink WordCount application, then package it, upload it to a Flink standalone cluster, and run it.

1. Purpose of This Document

  • Generate a Flink template application with Maven
  • Develop the WordCount application
  • Package the job and run it on a standalone Flink cluster

2. Creating the Maven Project

In the directory where the project should live, generate a Maven project from the Flink quickstart archetype:

mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.10.1

Running this command prompts for the Maven project's groupId, artifactId, and version; enter them as prompted.
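If you prefer a non-interactive run, the same archetype accepts the project coordinates as properties in batch mode. The groupId/artifactId/version values below are examples only; substitute your own:

```shell
# Batch-mode (-B) variant of the interactive command above.
# Coordinates after the archetype properties are example values.
mvn archetype:generate -B \
  -DarchetypeGroupId=org.apache.flink \
  -DarchetypeArtifactId=flink-quickstart-java \
  -DarchetypeVersion=1.10.1 \
  -DgroupId=com.eights \
  -DartifactId=flink-wordcount \
  -Dversion=1.0-SNAPSHOT \
  -Dpackage=com.eights
```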

Import the project into IDEA, add the flink-scala dependencies, and remove the provided scope from the Flink Java dependencies the template declares (so the job can run directly inside the IDE):

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-scala_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>

Add the Scala compiler plugin, plus the assembly plugin so that mvn package produces a fat jar:

      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.4.6</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>3.0.0</version>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

3. Scala

StreamingWordCount

Local debugging
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object StreamingWordCount {

  val HOST:String = "localhost"
  val PORT:Int = 9001

  /**
   * stream word count
   * @param args input params
   */
  def main(args: Array[String]): Unit = {

    //get streaming env
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    //get socket text stream
    val wordsDstream: DataStream[String] = env.socketTextStream(HOST, PORT)

    import org.apache.flink.api.scala._

    //word count
    val wordRes: DataStream[(String, Int)] = wordsDstream.flatMap(_.split(","))
      .filter(_.nonEmpty)
      .map((_, 1))
      .keyBy(0)
      .sum(1)

    wordRes.print()
    
    env.execute("Flink Streaming WordCount!")
  }
}

Start the application, then open a socket in a terminal to feed it words:

nc -lk 9001

Type a stream of comma-separated words into the terminal, and the running counts appear in the streaming application's console.

That completes local debugging of the streaming word count.

Running on the Cluster

Start the cluster using the Flink 1.10.1 build compiled in the earlier article:

./bin/start-cluster.sh

Visit localhost:8081 to open the Flink web UI.

Under Submit New Job, upload the packaged application jar (running mvn package is enough to produce it), then click Submit to run the job.
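As an alternative to the web UI, the job can also be submitted with the Flink CLI that ships in the same distribution. The main class here is the StreamingJob from the Java section; the jar path and name are examples, so match them to your own artifactId:

```shell
# Submit the fat jar built by mvn package (jar name is an example)
./bin/flink run -c com.eights.StreamingJob \
  /path/to/flink-wordcount-1.0-SNAPSHOT-jar-with-dependencies.jar
```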

Type comma-separated words into the terminal.

The counts show up under Task Managers → Stdout in the web UI.
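If the web UI is inconvenient, the same stdout can usually be tailed directly from the task manager's .out file in the distribution's log directory (the exact file name depends on user, host, and process number, so the glob below is an assumption about your setup):

```shell
# Task manager stdout lands in a .out file under log/
tail -f log/flink-*-taskexecutor-*.out
```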

BatchWordCount

import org.apache.flink.api.scala.ExecutionEnvironment

object BatchWordCount {

  /**
   * batch word count
   *
   * @param args input params
   */
  def main(args: Array[String]): Unit = {

    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

    import org.apache.flink.api.scala._

    val words: DataSet[String] = env.fromElements("spark,flink,hbase", "impala,hbase,kudu", "flink,flink,flink")

    //word count
    val wordRes: AggregateDataSet[(String, Int)] = words.flatMap(_.split(","))
      .map((_, 1))
      .groupBy(0)
      .sum(1)

    wordRes.print()
  }
}

Running it prints the count for each word.

4. Java

BatchWordCount

package com.eights;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.apache.flink.util.StringUtils;

public class BatchJob {

    public static void main(String[] args) throws Exception {
        // set up the batch execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSource<String> words = env.fromElements("spark,flink,hbase", "impala,hbase,kudu", "flink,flink,flink");

        AggregateOperator<Tuple2<String, Integer>> wordCount = words.flatMap(new WordLineSplitter())
                .groupBy(0)
                .sum(1);

        wordCount.print();

    }

    public static final class WordLineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) {
            String[] wordsArr = s.split(",");

            for (String word : wordsArr) {
                if (!StringUtils.isNullOrWhitespaceOnly(word)) {
                    collector.collect(new Tuple2<>(word, 1));
                }
            }

        }
    }
}

Running it prints the same word counts as the Scala version.
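Before involving Flink at all, the pipeline's logic (split on commas, drop blanks, group by word, sum) can be sanity-checked with plain Java streams. The LocalWordCount class below is illustrative only, not part of the generated project:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class LocalWordCount {

    // Mirrors the Flink pipeline: split each line on commas,
    // drop blank tokens, then count occurrences per word.
    static Map<String, Long> count(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split(",")))   // flatMap step
                .filter(w -> !w.trim().isEmpty())                  // drop blanks
                .collect(Collectors.groupingBy(w -> w,             // groupBy(0)
                        Collectors.counting()));                   // sum(1)
    }

    public static void main(String[] args) {
        Map<String, Long> counts = count(List.of(
                "spark,flink,hbase", "impala,hbase,kudu", "flink,flink,flink"));
        System.out.println(counts); // word -> count (map order unspecified)
    }
}
```

With the sample input used throughout this article, flink should come out to 4 and hbase to 2, matching what the Flink jobs print.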

StreamingWordCount

package com.eights;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.flink.util.StringUtils;

public class StreamingJob {

    public static void main(String[] args) throws Exception {
        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        String HOST = "localhost";
        int PORT = 9001;

        DataStreamSource<String> wordsSocketStream = env.socketTextStream(HOST, PORT);

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordRes = wordsSocketStream.flatMap(new WordsLineSplitter())
                .keyBy(0)
                .sum(1);

        wordRes.print();

        // execute program
        env.execute("Flink Streaming Java API Word Count");
    }

    private static class WordsLineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) {
            String[] wordsArr = s.split(",");

            for (String word : wordsArr) {
                if (!StringUtils.isNullOrWhitespaceOnly(word)) {
                    collector.collect(new Tuple2<>(word, 1));
                }
            }
        }
    }
}

The run output follows the same pattern as the batch version, with counts updating as words arrive.

P.S.

I write these docs mainly as notes to myself: recording my learning path through the big-data stack, the pitfalls, and how I worked through them. The plan is to keep up two posts a week; a year from now I'll look back on the road.

Originally published 2020-06-16 on the WeChat public account Eights做数据.
