专栏首页指尖数虫大数据HelloWorld-Flink实现WordCount

大数据HelloWorld-Flink实现WordCount

所有的语言开篇都是Hello Word,数据处理引擎也有Hello Word。那就是Word Count。MR,Spark,Flink以来开篇第一个程序都是Word Count。那么今天Flink开始目标就是在本地调试出Word Count。

开始Flink之前先在本机尝试安装一下Flink,当然FLink正常情况下是部署的集群方式。作者比较穷,机器配置太低开不了几个虚拟机。所以只能先演示个单机的安装。 Apache Flink需要在Java1.8+以上的环境中运行。 所以,先确保自己的JDK版本是1.8包含以上的。

Flink单机部署非常简单,只需安装下载安装即可。如果需要与Hadoop版本结合,那么下载相应的Hadoop关联版本即可。如果不与Hadoop结合就直接下载Scala版即可。我这里就直接下载了Scala2.11的相关版本。

点击进入Apache页面进行下载,大小约有283MB。

把下载下来的压缩包进行解压即可。

打开命令行直接执行 /bin/start-cluster.bat 进行启动。

浏览器打开 http://localhost:8081

至此在Windows10环境下即完成Flink的启动。

编写WordCount

因为Flink是由Scala进行开发的,而Scala是基于JVM的一种语言。所以最终也会转换为JAVA字节码文件,所以Flink程序可以由Java、Scala两种语言都可以进行开发。也可以同时开发。比如Java写一部分代码,Scala写另一部分代码。可以参考<Apache Flink利用Maven对Scala与Java进行混编>。

Flink官方提供快速生成工程的两种工具:SBT与Maven。由于作者比较熟悉Maven,(或者说没用过SBT)。所以直接使用Maven快速创建一个工程。

Java版本

mvn archetype:generate                               \
      -DarchetypeGroupId=org.apache.flink              \
      -DarchetypeArtifactId=flink-quickstart-java      \
      -DarchetypeVersion=1.8.0

Scala版本

mvn archetype:generate                               \
      -DarchetypeGroupId=org.apache.flink              \
      -DarchetypeArtifactId=flink-quickstart-scala     \
      -DarchetypeVersion=1.8.0

按照提示输入相关信息,即可生成最终的项目。

├── pom.xml
└── src
    └── main
        ├── resources
        │   └── log4j.properties
        └── scala/java
            └── org
                └── myorg
                    └── quickstart
                        ├── BatchJob.scala
                        └── StreamingJob.scala

把工程导入到IDEA中 如果使用Scala的话,那么需要安装Scala的插件。搜索安装同时需要把Scala语言包进行安装。不知道如何操作可以联系我 微信公号<指尖数虫>。

package jar;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class BatchJob {

	public static void main(String[] args) throws Exception {
		// set up the batch execution environment
		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		//读取目录下的文件
		DataSource<String> data = env.readTextFile("/opt/Server_Packets/log/ServerLog_1_runtime.log");
		//把文件中的内容按照空格进行拆分为 word,1    1 是为了能够在下面进行计算.
		data.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
			@Override
			public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
				for (String word : s.split(" ")){
					collector.collect(new Tuple2<>(word,1));
				}
			}
		})
		// 按照元组中的第1位进行分组
		.groupBy(0)
		// 分组的元组的计算方式为  value +value  也就是刚才的 同样的词 把 1+1
		.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
			@Override
			public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t2, Tuple2<String, Integer> t1) throws Exception {
				return new Tuple2<>(t1.f0,t1.f1+ t2.f1);
			}
		})
		//输出结果
		.print();
	}
}

本文分享自微信公众号 - 指尖数虫(zhijianshuchong),作者:CainGao

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-08-06

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Apache Flink中的各个窗口时间的概念区分

    “ Apache Flink中提供了基于时间的窗口计算,例如计算五分钟内的用户数量或每一分钟计算之前五分钟的服务器异常日志占比等。因此Apache Flink在...

    CainGao
  • Apache Flink基本编程模型

    “前一篇文章中<一文了解Flink数据-有界数据与无界数据>大致讲解了Apache Flink数据的形态问题。Apache Flink实现分布式集合数据集转换、...

    CainGao
  • Lucene的不同搜索类型及其作用

    “ Lucene对于查询的方式较多,可以实现TermQuery、BooleanQuery、PhraseQuery、 TermRangeQuery等一系列的基于不...

    CainGao
  • 译|通过构建自己的JavaScript测试框架来了解JS测试

    测试(单元或集成)是编程中非常重要的一部分。在当今的软件开发中,单元/功能测试已成为软件开发的组成部分。随着 Nodejs 的出现,我们已经看到了许多超级 JS...

    Dunizb
  • jedis:subscribe(订阅)断线重连(reconnect)

    版权声明:本文为博主原创文章,转载请注明源地址。 https://blog.csdn.net...

    用户1148648
  • 大数据实时处理的王者-Flink

    ​ 不熟悉流处理的同学可以关注下这两篇文章,什么是实时流式计算?https://mp.weixin.qq.com/s/1-rE6aayiDIK0dA0j_E...

    用户6070864
  • linux安装部署Flink

    1.1、官网下载:https://flink.apache.org/downloads.html#apache-flink-172

    猿码优创
  • Flink 1.7.0 安装、配置与使用

    Apache Flink是一个面向分布式数据流处理和批量数据处理的开源计算平台,它能够基于同一个Flink运行时,提供支持流处理和批处理两种类型应用的功能。

    用户1205080
  • Flink快速入门--安装与示例运行

    flink是一款开源的大数据流式处理框架,他可以同时批处理和流处理,具有容错性、高吞吐、低延迟等优势,本文简述flink在windows和linux中安装步骤,...

    用户6070864
  • Flink快速入门--安装与示例运行

    首先要想运行Flink,我们需要下载并解压Flink的二进制包,下载地址如下:https://flink.apache.org/downloads.html

    实时计算

扫码关注云+社区

领取腾讯云代金券