前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink实战:消费Wikipedia实时消息

Flink实战:消费Wikipedia实时消息

作者头像
程序员欣宸
发布2019-06-02 00:35:29
8200
发布2019-06-02 00:35:29
举报
文章被收录于专栏:实战docker

版权声明:欢迎转载,请注明出处,谢谢。

https://blog.csdn.net/boling_cavalry/article/details/85205622

关于Wikipedia Edit Stream

Wikipedia Edit Stream是Flink官网提供的一个经典demo,该应用消费的消息来自维基百科,消息中包含了用户名对wiki的编辑情况,demo的官方资料地址:https://ci.apache.org/projects/flink/flink-docs-release-1.2/quickstart/run_example_quickstart.html

消息来源

消息的DataSource是个名为WikipediaEditsSource的类,这里面建立了到irc.wikimedia.org的Socker连接,再通过Internet Relay Chat (IRC) 协议接收对方的数据,收到数据后保存在阻塞队列中,通过一个while循环不停的从队列取出数据,再调用SourceContext的collect方法,就在Flink中将这条数据生产出来了;

IRC是应用层协议,更多细节请看:https://en.wikipedia.org/wiki/Internet_Relay_Chat

关于WikipediaEditsSource类的深入分析,请参考《Flink数据源拆解分析(WikipediaEditsSource)》

实战简介

本次实战就是消费上述消息,然后统计每个用户十五秒内所有的消息,将每次操作的字节数累加起来,就得到用户十五秒内操作的字节数总和,并且每次累加了多少都会记录下来并最终和聚合结果一起展示;

和官网demo的不同之处

和官网的demo略有不同,官网用的是Tuple2来处理数据,但我这里用了Tuple3,多保存了一个StringBuilder对象,用来记录每次聚合时加了哪些值,这样在结果中通过这个字段就能看出来这个时间窗口内每个用户做了多少次聚合,每次是什么值:

环境信息

Flink:1.7;

运行模式:单机(官网称之为Local Flink Cluster);

Flink所在机器的操作系统:CentOS Linux release 7.5.1804;

开发环境JDK:1.8.0_181;

开发环境Maven:3.5.0;

操作步骤简介

今天的实战分为以下步骤:

  1. 创建应用;
  2. 编码;
  3. 构建;
  4. 部署运行;

创建应用

  1. 应用基本代码是通过mvn命令创建的,在命令行输入以下命令:
代码语言:txt
复制
mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.7.0
  1. 按控制台的提示输入groupId、artifactId、version、package等信息,一路回车确认后,会生成一个和你输入的artifactId同名的文件夹(我这里是wikipediaeditstreamdemo),里面是个maven工程:
代码语言:txt
复制
Define value for property 'groupId': com.bolingcavalry
Define value for property 'artifactId': wikipediaeditstreamdemo
Define value for property 'version' 1.0-SNAPSHOT: :
Define value for property 'package' com.bolingcavalry: :
Confirm properties configuration:
groupId: com.bolingcavalry
artifactId: wikipediaeditstreamdemo
version: 1.0-SNAPSHOT
package: com.bolingcavalry
 Y: :
  1. 用IEDA导入这个maven工程,如下图,已经有了两个类:BatchJob和StreamingJob,BatchJob是用于批处理的,本次实战用不上,因此可以删除,只保留流处理的StreamingJob:

应用创建成功,接下来可以开始编码了;

编码

您可以选择直接从GitHub下载这个工程的源码,地址和链接信息如下表所示:

名称

链接

备注

项目主页

该项目在GitHub上的主页

git仓库地址(https)

该项目源码的仓库地址,https协议

git仓库地址(ssh)

git@github.com:zq2599/blog_demos.git

该项目源码的仓库地址,ssh协议

这个git项目中有多个文件夹,本章源码在wikipediaeditstreamdemo这个文件夹下,如下图红框所示:

接下来开始编码:

  1. 在pom.mxl文件中增加wikipedia相关的库依赖:
代码语言:txt
复制
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-connector-wikiedits_2.11</artifactId>
	<version>${flink.version}</version>
</dependency>
  1. 在类中增加代码,如下所示,源码中已加详细注释:
代码语言:txt
复制
package com.bolingcavalry;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;

public class StreamingJob {

	public static void main(String[] args) throws Exception {
		// 环境信息
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		env.addSource(new WikipediaEditsSource())
				//以用户名为key分组
				.keyBy((KeySelector<WikipediaEditEvent, String>) wikipediaEditEvent -> wikipediaEditEvent.getUser())
				//时间窗口为5秒
				.timeWindow(Time.seconds(15))
				//在时间窗口内按照key将所有数据做聚合
				.aggregate(new AggregateFunction<WikipediaEditEvent, Tuple3<String, Integer, StringBuilder>, Tuple3<String, Integer, StringBuilder>>() {
					@Override
					public Tuple3<String, Integer, StringBuilder> createAccumulator() {
						//创建ACC
						return new Tuple3<>("", 0, new StringBuilder());
					}

					@Override
					public Tuple3<String, Integer, StringBuilder> add(WikipediaEditEvent wikipediaEditEvent, Tuple3<String, Integer, StringBuilder> tuple3) {

						StringBuilder sbud = tuple3.f2;

						//如果是第一条记录,就加个"Details :"作为前缀,
						//如果不是第一条记录,就用空格作为分隔符
						if(StringUtils.isBlank(sbud.toString())){
							sbud.append("Details : ");
						}else {
							sbud.append(" ");
						}

						//聚合逻辑是将改动的字节数累加
						return new Tuple3<>(wikipediaEditEvent.getUser(),
								wikipediaEditEvent.getByteDiff() + tuple3.f1,
								sbud.append(wikipediaEditEvent.getByteDiff()));
					}

					@Override
					public Tuple3<String, Integer, StringBuilder> getResult(Tuple3<String, Integer, StringBuilder> tuple3) {
						return tuple3;
					}

					@Override
					public Tuple3<String, Integer, StringBuilder> merge(Tuple3<String, Integer, StringBuilder> tuple3, Tuple3<String, Integer, StringBuilder> acc1) {
						//合并窗口的场景才会用到
						return new Tuple3<>(tuple3.f0,
								tuple3.f1 + acc1.f1, tuple3.f2.append(acc1.f2));
					}
				})
				//聚合操作后,将每个key的聚合结果单独转为字符串
				.map((MapFunction<Tuple3<String, Integer, StringBuilder>, String>) tuple3 -> tuple3.toString())
				//输出方式是STDOUT
				.print();

		// 执行
		env.execute("Flink Streaming Java API Skeleton");
	}
}

至此编码结束;

构建

  1. 在pom.xml文件所在目录下执行命令:
代码语言:txt
复制
mvn clean package -U
  1. 命令执行完毕后,在target目录下的wikipediaeditstreamdemo-1.0-SNAPSHOT.jar文件就是构建成功的jar包;

在Flink验证

  1. Flink的安装和启动请参考《Flink1.7从安装到体验》
  2. 我这边Flink所在机器的IP地址是192.168.1.103,因此用浏览器访问的Flink的web地址为:http://192.168.1.103:8081;
  3. 选择刚刚生成的jar文件作为一个新的任务,如下图:
  1. 点击下图红框中的"upload",将文件提交:
  1. 目前还只是将jar文件上传了而已,接下来就是手工设置执行类并启动任务,操作如下图,红框2中填写的前面编写的StreamingJob类的完整名称:
  1. 提交后的页面效果如下图所示,可见一个job已经在运行中了:
  1. 接下来看看我们的job的执行效果,如下图,以用户名聚合后的字数统计已经被打印出来了,并且Details后面的内容还展示了具体的聚合情况:

至此,一个实施处理的Flink应用就开发完成了,希望能给您的开发过程提供一些参考,后面的实战中咱们一起继续深入学习和探讨Flink;

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2018年12月22日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 关于Wikipedia Edit Stream
  • 消息来源
  • 实战简介
  • 和官网demo的不同之处
  • 环境信息
  • 操作步骤简介
  • 创建应用
  • 编码
  • 构建
  • 在Flink验证
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档