Apache Kafka利用循环技术为多个分区生产信息。其中自定义分区技术常用于为已经定义好的分区生产特定类型的信息,并使生产出来的信息能被特定类型的消费者使用。这种技术使我们能够掌控信息的生成和使用。Windowing使用基于时间限制的事件时间驱动分析以及数据分组。有三种不同的Windowing方式,分别是Tumbling,Session和Hopping。
在本文中,我们将通过下列方式讨论如何处理Citi Bike(美国的共享单车)的骑行数据:
安装以下内容:
使用Citi Bike公司在2017年3月的骑行数据作为源数据。这些数据包含诸如行程持续时间,开始时间,停止时间,站台名称,站台ID,站台纬度和站台经度等基本信息。
示例数据:
如果你要通过更改集群的代理端口的方法在一台服务器上设置集群,请执行以下步骤:
./bin/zookeeper-server-start etc/kafka/zookeeper.properties
您可以在下面的图片中看到ZooKeeper启动后的一些信息:
./bin/kafka-server-start etc/kafka/server.properties
您可以使用代理ID 0和端口9092查看第一个代理的启动信息:
./bin/kafka-server-start etc/kafka/server1.properties
您可以使用代理ID 1和端口9093查看第二个代理的启动信息:
./bin/zookeeper-shell localhost:2181 ls /brokers/ids
运行命令后,可以看到如下图所示的信息。
上面我们是让两个代理在同一个节点上启动。如果让代理在不同节点上启用的话,就可以更快地并行地去处理信息,而且我们还可以通过在不同节点上的存储器之间共享信息的方式来实现信息的复用,这样可以解决内存不够的问题。
若要使用自定义分区技术生成和使用行程的详细信息,请执行以下步骤:
./bin/kafka-topics --create --zookeeper localhost:2181 --topic trip-data --replication-factor 1 --partitions 1
您可以看到代理0负责分区0的信息传输,代理1负责分区1的信息传输,如下图所示
override def partition(topic: String, key: Any, keyBytes: Array[Byte], value: Any, valueBytes: Array[Byte], cluster: Cluster): Int = {
var partition = 0
val keyInt = Integer.parseInt(key.asInstanceOf[String])
val tripData = value.asInstanceOf[String]
//从产生的信息中得到用户类型
val userType = tripData.split(",")(12)
//根据用户类型将分区和信息进行匹配
if ("Subscriber".equalsIgnoreCase(userType)) {
partition = 0;
} else if ("Customer".equalsIgnoreCase(userType)) {
partition = 1;
}
println("Partition for message " + value + " is " + partition)
partition
}
您可以查看匹配到分区0中的Subscriber类型用户的信息,并将Customer类型用户的信息存储到分区1。
//将信息拆分到特定的分区
props.put("partitioner.class", "com.treselle.core.CustomPartitioner");
val topicPartition = new TopicPartition(TOPIC,partition)
consumer.assign(Collections.singletonList(topicPartition))
java -cp custom_partitioner.jar com.treselle.core.ConsumerBasedOnPartition trip-data localhost:9092 0
java -cp custom_partitioner.jar com.treselle.core.ConsumerBasedOnPartition trip-data localhost:9092 1
java –cp custom_partitioner.jar com.treselle.core.CustomPartionedProducer trip-data localhost:9092
您可以单独查看消费者1拥有的来自分区0的Subscriber类型的信息,也单独查看消费者2拥有的来自分区1的Customer类型的信息。
Consumer1:
Consumer2:
这里,Customer类型的信息被代理localhost:9092使用,Subscriber类型的信息被代理localhost:9093使用。由于Customer类型的信息较少,因此其在kafka-logs(localhost:9092)中占用的内存相对就较少。
在KSQL中,并不选择使用那些基于分区的信息。而是从指定主题的所有分区中取出信息,用来创建流或表。要创建行程数据流,请执行以下步骤:
CREATE STREAM
trip_data_stream
(
tripduration BIGINT,
starttime VARCHAR,
stoptime VARCHAR,
start_station_id BIGINT,
start_station_name VARCHAR,
start_station_latitude DOUBLE,
start_station_longitude DOUBLE,
end_station_id BIGINT,
end_station_name VARCHAR,
end_station_latitude DOUBLE,
end_station_longitude DOUBLE,
bikeid INT,usertype VARCHAR,
birth_year VARCHAR,
gender VARCHAR
)
WITH
(
kafka_topic='trip-data',
value_format='DELIMITED'
);
CREATE STREAM
subscribers_trip_data_stream
WITH
(
TIMESTAMP='startime_timestamp',
PARTITIONS=2
) AS
select
STRINGTOTIMESTAMP(starttime, 'yyyy-MM-dd HH:mm:ss') AS startime_timestamp,
tripduration,
starttime,
usertype
FROM TRIP_DATA_STREAM
where usertype='Subscriber';
Window Tumbling将给定时间间隔内的数据分组到大小固定的不重叠的窗口中。它被用于在一定时间间隔内对流进行异常检测。如下图,以5分钟的时间间隔为例进行分析。
要以五分钟的时间为间隔查找Subscribers开始的行程数,请执行以下命令:
SELECT
COUNT(*),
starttime
FROM subscribers_trip_data_stream
WINDOW TUMBLING (SIZE 5 MINUTE)
GROUP BY usertype;
从上述结果可以看出,第四分钟结束时已经开始了19次车程,第九分钟结束时开始了25次车程,第14分钟结束时开始了26次车程。由此可以看出在每个给定的时间间隔内所开始的行程都被记录了下来。
在Window session中,数据被分组在特定的Session中。例如,如果设置一个1分钟的Session,并且在一分钟的时间间隔内让数据不可用,则会开始一个新的Session来进行数据分组。如下图所示,以一分钟的Session为例进行分析:
要将特定Session中的用户的行程详细信息进行分组,请使用以下命令将Session的间隔设置为20秒:
SELECT
count(*),
starttime
FROM subscribers_trip_data_stream
WINDOW SESSION (20 SECOND)
GROUP BY usertype;
从上图可以看出,数据分组是在特定时间间隔的Session中进行的。当数据在20秒的时间间隔内不可用时,就会开始一个新的Session来进行数据分组。
例如00:01:09到00:01:57之间的时间间隔。在00:01:09和00:01:33之间的时间间隔内,您可以查看20秒或者更长的时间间隔内的内容。由此可以看出行程的数量在增加。在00:01:33和00:01:57之间的间隔内,您可以查看超过20秒间隔的无活动状态。由此可以看出在第57秒时开始了新的Session。
在Window Hopping中,通过前进给定的时间间隔,将数据按给定的时间间隔分组到重叠的窗口中。如下图所示,以前进间隔为一分钟、工作时间为五分钟的行程信息为例进行分析:
要将前进间隔为一分钟的五分钟行程详细信息分组,请执行以下Hopping Window 分析命令:
SELECT
count(*),
starttime
FROM subscribers_trip_data_stream
WINDOW HOPPING (SIZE 5 MINUTE, ADVANCE BY 1 MINUTE)
GROUP BY usertype;
从上面的图表可以看出,每个记录的五个条目在5分钟的时间内被使用并且其前进时间间隔为一分钟。条目大小根据时间间隔大小和给定的前进间隔的变化而变化。
在上面的例子中,以一个00:02:12的时间记录场景为例,用5分钟的时间检查Hopping的工作情况,并将前进时间间隔设为一分钟。00:02:12场景有5个条目,其中旅行计数为7,7,7,6,1。在两分钟内,前三项只有两个1分钟的前进间隔。00:00:00至00:02:12的时间间隔内开始了七次行程。第四项前进了一分钟。00:01:00至00:02:12的时间间隔内有六次行程记录,第五次的时候进入了另一个一分钟的前进间隔。由此可以看出从00:02:00到00:02:12的时间里只有一次行程被分析了。