前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >使用Kafka SQL Windowing进行自定义分区和分析

使用Kafka SQL Windowing进行自定义分区和分析

作者头像
游城
发布2018-06-01 10:24:17
1.7K0
发布2018-06-01 10:24:17

Apache Kafka利用循环技术为多个分区生产信息。其中自定义分区技术常用于为已经定义好的分区生产特定类型的信息,并使生产出来的信息能被特定类型的消费者使用。这种技术使我们能够掌控信息的生成和使用。Windowing使用基于时间限制的事件时间驱动分析以及数据分组。有三种不同的Windowing方式,分别是Tumbling,Session和Hopping。

在本文中,我们将通过下列方式讨论如何处理Citi Bike(美国的共享单车)的骑行数据:

  • 使用自定义分区技术根据用户类型来划分行程数据。
  • 使用Kafka SQL Windowing在数据流中分析行程的详细信息。

准备工作

安装以下内容:

  • Scala
  • Java
  • Kafka
  • Confluent
  • KSQL

数据描述

使用Citi Bike公司在2017年3月的骑行数据作为源数据。这些数据包含诸如行程持续时间,开始时间,停止时间,站台名称,站台ID,站台纬度和站台经度等基本信息。

示例数据:

用例

  • 通过根据用户类型(普通用户或已经订阅的用户)的不同来划分信息,再将Citi Bike的骑行数据按这种划分分别传送给两个不同的代理。
  • 使用Kafka SQL Windowing的三种不同方法来分析以下信息:
    • 使用Window Tumbling来分析特定时间范围内的行程数量。
    • 使用Hopping Window来分析一定前进时间间隔的行程数量。
    • 使用Session Window来分析一定会话时间间隔的行程数量。

概要

  • 设置Kafka集群。
  • 使用自定义分区技术来生成并使用行程的详细信息。
  • 创建行程数据流。
  • 使用Window Tumbling执行流式分析。
  • 使用Window Session执行流式分析。
  • 使用Window Hopping执行流式分析。

设置Kafka集群

如果你要通过更改集群的代理端口的方法在一台服务器上设置集群,请执行以下步骤:

  • 在默认端口2181上运行ZooKeeper。ZooKeeper数据默认存储在路径/ tmp / data中。
  • 将默认路径(/ tmp / data)换成具有足够内存空间的其他路径,以满足生产和消费的需要。
  • 编辑根目录下的etc / kafka / zookeeper.properties中的zookeeper.properties文件中的ZooKeeper配置,如下图所示:
  • 使用以下命令启动ZooKeeper:
代码语言:txt
复制
./bin/zookeeper-server-start etc/kafka/zookeeper.properties

您可以在下面的图片中看到ZooKeeper启动后的一些信息:

  • 在端口9092中运行默认的Kafka代理并将代理ID设置为0,这样就启动了集群中的第一个代理。默认的日志存储路径是/ tmp / kafka-logs。
  • 在默认的日志存储路径下编辑日志文件(/ tmp / kafka-logs)并编辑根目录下的server.properties文件,配置第一个代理。 输入命令 vi etc / kafka / server.properties(启动vi编辑器来读写文件)
  • 使用以下命令启动代理:
代码语言:txt
复制
./bin/kafka-server-start etc/kafka/server.properties

您可以使用代理ID 0和端口9092查看第一个代理的启动信息:

  • 复制etc / kafka /下的server.properties文件并改名为server1.properties,这样就启动了集群中的第二个代理。
  • 编辑server1.properties文件进行第二个代理的配置 输入命令 vi etc / kafka / server1.properties。
  • 使用以下命令启动代理:
代码语言:txt
复制
./bin/kafka-server-start etc/kafka/server1.properties

您可以使用代理ID 1和端口9093查看第二个代理的启动信息:

  • 使用以下命令来列出集群中可被使用的代理:
代码语言:txt
复制
./bin/zookeeper-shell localhost:2181 ls /brokers/ids

运行命令后,可以看到如下图所示的信息。

上面我们是让两个代理在同一个节点上启动。如果让代理在不同节点上启用的话,就可以更快地并行地去处理信息,而且我们还可以通过在不同节点上的存储器之间共享信息的方式来实现信息的复用,这样可以解决内存不够的问题。

使用自定义分区技术生成和使用行程的详细信息

若要使用自定义分区技术生成和使用行程的详细信息,请执行以下步骤:

  • 使用下面的命令创建具有两个分区的行程数据主题:
代码语言:txt
复制
./bin/kafka-topics --create --zookeeper localhost:2181 --topic trip-data --replication-factor 1 --partitions 1
  • 查看该主题各个分区的代理。

您可以看到代理0负责分区0的信息传输,代理1负责分区1的信息传输,如下图所示

  • 使用自定义分区技术来生产信息。
  • 通过使用以下命令来覆盖分区器接口,创建CustomPartitioner类:
代码语言:txt
复制
override def partition(topic: String, key: Any, keyBytes: Array[Byte], value: Any, valueBytes: Array[Byte], cluster: Cluster): Int = {
 var partition = 0
 val keyInt = Integer.parseInt(key.asInstanceOf[String])
 val tripData = value.asInstanceOf[String]
 //从产生的信息中得到用户类型
 val userType = tripData.split(",")(12)
 //根据用户类型将分区和信息进行匹配
 if ("Subscriber".equalsIgnoreCase(userType)) {
  partition = 0;
 } else if ("Customer".equalsIgnoreCase(userType)) {
  partition = 1;
 }
 println("Partition for message " + value + " is " + partition)
 partition
}

您可以查看匹配到分区0中的Subscriber类型用户的信息,并将Customer类型用户的信息存储到分区1。

  • 在生产者属性中定义CustomPartitioner类,如下所示:
代码语言:txt
复制
//将信息拆分到特定的分区
props.put("partitioner.class", "com.treselle.core.CustomPartitioner");
  • 通过消费者匹配到的分区类型来定义消费者的分区主题,如下所示:
代码语言:txt
复制
val topicPartition = new TopicPartition(TOPIC,partition)
consumer.assign(Collections.singletonList(topicPartition))
  • 当同时有多个消费者,并且每个消费者接收不同的分区的信息时,可以将分区类型作为消费者的一个属性。
  • 用不同的分区服务多个消费者。
  • 使用以下命令启动Consumer1:
代码语言:txt
复制
java -cp custom_partitioner.jar com.treselle.core.ConsumerBasedOnPartition trip-data localhost:9092 0
  • 使用以下命令启动Consumer2:
代码语言:txt
复制
java -cp custom_partitioner.jar com.treselle.core.ConsumerBasedOnPartition trip-data localhost:9092 1
  • 使用以下命令启动自定义分区程序来生产行程的详细信息:
代码语言:txt
复制
java –cp custom_partitioner.jar com.treselle.core.CustomPartionedProducer trip-data localhost:9092

您可以单独查看消费者1拥有的来自分区0的Subscriber类型的信息,也单独查看消费者2拥有的来自分区1的Customer类型的信息。

Consumer1:

Consumer2:

  • 在两个消费者使用了所有的信息后,检查代理中的内存。其中包括代理之间共享的内存以及代理的日志文件所占用的内存。如下图所示:

这里,Customer类型的信息被代理localhost:9092使用,Subscriber类型的信息被代理localhost:9093使用。由于Customer类型的信息较少,因此其在kafka-logs(localhost:9092)中占用的内存相对就较少。

创建行程数据流

在KSQL中,并不选择使用那些基于分区的信息。而是从指定主题的所有分区中取出信息,用来创建流或表。要创建行程数据流,请执行以下步骤:

  • 使用Window processing的条件分离Subscriber类型和Customer类型的数据。
  • 使用以下命令生成行程数据并用来创建存放数据的trip_data_stream:
代码语言:txt
复制
CREATE STREAM
trip_data_stream
(
tripduration BIGINT,
starttime VARCHAR,
stoptime VARCHAR,
start_station_id BIGINT,
start_station_name VARCHAR,
start_station_latitude DOUBLE,
start_station_longitude DOUBLE,
end_station_id BIGINT,
end_station_name VARCHAR,
end_station_latitude DOUBLE,
end_station_longitude DOUBLE,
bikeid INT,usertype VARCHAR,
birth_year VARCHAR,
gender VARCHAR
)
WITH
(
kafka_topic='trip-data',
value_format='DELIMITED'
);
  • 根据行程的开始时间提取Unix TIMESTAMP进行Windowing。
  • 根据行程的开始时间而不是信息的生成时间来将提取的Unix TIMESTAMP设置为数据流的属性。
  • 使用以下命令查找subscriber的行程详细信息,使用提取的Unix TIMESTAMP和subscriber的信息创建流,:
代码语言:txt
复制
CREATE STREAM
subscribers_trip_data_stream
WITH
(
TIMESTAMP='startime_timestamp',
PARTITIONS=2
) AS
select
STRINGTOTIMESTAMP(starttime, 'yyyy-MM-dd HH:mm:ss') AS startime_timestamp,
tripduration,
starttime,
usertype
FROM TRIP_DATA_STREAM
where usertype='Subscriber';

使用Window Tumbling来执行流式分析

Window Tumbling将给定时间间隔内的数据分组到大小固定的不重叠的窗口中。它被用于在一定时间间隔内对流进行异常检测。如下图,以5分钟的时间间隔为例进行分析。

要以五分钟的时间为间隔查找Subscribers开始的行程数,请执行以下命令:

代码语言:txt
复制
SELECT
COUNT(*),
starttime
FROM subscribers_trip_data_stream
WINDOW TUMBLING (SIZE 5 MINUTE)
GROUP BY usertype;

从上述结果可以看出,第四分钟结束时已经开始了19次车程,第九分钟结束时开始了25次车程,第14分钟结束时开始了26次车程。由此可以看出在每个给定的时间间隔内所开始的行程都被记录了下来。

使用Window Session执行流式分析

在Window session中,数据被分组在特定的Session中。例如,如果设置一个1分钟的Session,并且在一分钟的时间间隔内让数据不可用,则会开始一个新的Session来进行数据分组。如下图所示,以一分钟的Session为例进行分析:

要将特定Session中的用户的行程详细信息进行分组,请使用以下命令将Session的间隔设置为20秒:

代码语言:txt
复制
SELECT
count(*),
starttime
FROM subscribers_trip_data_stream
WINDOW SESSION (20 SECOND)
GROUP BY usertype;

从上图可以看出,数据分组是在特定时间间隔的Session中进行的。当数据在20秒的时间间隔内不可用时,就会开始一个新的Session来进行数据分组。

例如00:01:09到00:01:57之间的时间间隔。在00:01:09和00:01:33之间的时间间隔内,您可以查看20秒或者更长的时间间隔内的内容。由此可以看出行程的数量在增加。在00:01:33和00:01:57之间的间隔内,您可以查看超过20秒间隔的无活动状态。由此可以看出在第57秒时开始了新的Session。

使用Window Hopping执行流分析

在Window Hopping中,通过前进给定的时间间隔,将数据按给定的时间间隔分组到重叠的窗口中。如下图所示,以前进间隔为一分钟、工作时间为五分钟的行程信息为例进行分析:

要将前进间隔为一分钟的五分钟行程详细信息分组,请执行以下Hopping Window 分析命令:

代码语言:txt
复制
SELECT
count(*),
starttime
FROM subscribers_trip_data_stream
WINDOW HOPPING (SIZE 5 MINUTE, ADVANCE BY 1 MINUTE)
GROUP BY usertype;

从上面的图表可以看出,每个记录的五个条目在5分钟的时间内被使用并且其前进时间间隔为一分钟。条目大小根据时间间隔大小和给定的前进间隔的变化而变化。

在上面的例子中,以一个00:02:12的时间记录场景为例,用5分钟的时间检查Hopping的工作情况,并将前进时间间隔设为一分钟。00:02:12场景有5个条目,其中旅行计数为7,7,7,6,1。在两分钟内,前三项只有两个1分钟的前进间隔。00:00:00至00:02:12的时间间隔内开始了七次行程。第四项前进了一分钟。00:01:00至00:02:12的时间间隔内有六次行程记录,第五次的时候进入了另一个一分钟的前进间隔。由此可以看出从00:02:00到00:02:12的时间里只有一次行程被分析了。

参考

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 准备工作
  • 数据描述
  • 用例
  • 概要
  • 设置Kafka集群
  • 使用自定义分区技术生成和使用行程的详细信息
  • 创建行程数据流
  • 使用Window Tumbling来执行流式分析
  • 使用Window Session执行流式分析
  • 使用Window Hopping执行流分析
  • 参考
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档