Kafka源码系列是以kafka 0.8.2.2源码为例讲解。浪尖纯属个人爱好,才写想写这个系列。希望对kafka的使用者们带来帮助。
一,消费者消费的过程讲解
<Kafka源码系列之以kafka为例讲解分布式存储系统>这篇文章已经讲过,在一个分布式存储系统中,客户端访问数据一般会分两个步骤,一个是向元数据服务器获取存储的元数据,另一个则是正式发起数据的访问。对于kafka呢?本讲只是针对SimpleConsumer为例讲解,为啥突出讲解这个呢?只是由于这SimpleConsumer是Broker之间用来同步的方式,而且也是SparkStreaming的Direct Stream这种方式来获取数据的方式。所以,此讲为了后面讲spark也是打下基础。
为了使用SampleConsumer从kafka获取数据,我们需要有以下三个请求步骤:
1,随便选取一个Broker,请求元数据,找到leader。
请求的key为:RequestKeys.MetadataKey。
Broker的处理函数为(kafkaApis):
handleTopicMetadataRequest(request)
此举只是为了获取leader信息,而且要强调的一点是一个SampleConsumer只会向一个Broker的分区请求数据。
2,获取请求数据的偏移
请求的key为:RequestKeys.OffsetsKey
Broker的处理函数为(kafkaApis):
handleOffsetRequest(request)
此举是为指定偏移获取数据打下基础。
3,指定偏移获取数据
请求的key为:RequestKeys.FetchKey。
Broker的处理函数为(kafkaApis):
handleFetchRequest(request)
整个过程通讯过程<Kafka源码系列之Broker的IO服务及业务处理>这个文章里面已经讲过了,SimpleConsumer的方式对于使用者来说比较灵活,方便使用者针对主机消费特定的分区或者指定分区进行消费。假如我们自己设计存储系统的话也客户端访问数据的过程也可采用这种方式。
二,以具体的源码过程讲解SimpleConsumer消费数据的过程
1,请求元数据,获取leader位置的过程
首先是遍历我们给的Broker地址列表,然后构建SimpleConsumer,然后构建topic的元数据请求TopicMetadataRequest,遍历获取到的结果,跟我们指定的分区进行匹配,得到我们指定分区的元数据,等到之后停止遍历Broker地址列表返回。这个是指定分区进行消费消费这端的过程。
for (String seed : a_seedBrokers) { //
SimpleConsumer consumer = null;
try {
SimpleConsumer consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
List<String> topics = Collections.singletonList(a_topic);
TopicMetadataRequest req = new TopicMetadataRequest(topics);
kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
List<TopicMetadata> metaData = resp.topicsMetadata();
for (TopicMetadata item : metaData) {
for (PartitionMetadata part : item.partitionsMetadata()) {
if (part.partitionId() == a_partition) {
returnMetaData = part;
break loop;
}
}
}
} catch (Exception e) {
System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic
+ ", " + a_partition + "] Reason: " + e);
} finally {
if (consumer != null) consumer.close();
}
}
Broker端的过程是
消息具体接受过程已经讲过了,这里只讲处理函数。
首先是KakaApis里
case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)
根据key的类型,调用具体的处理函数重要步骤,获取topic信息,获取存活的Broker构建响应。
val topicMetadata = getTopicMetadata(metadataRequest.topics.toSet)
val brokers = metadataCache.getAliveBrokers
val response = new TopicMetadataResponse(brokers, topicMetadata, metadataRequest.correlationId)
从getTopicMetadata深挖的话,会发现最终构建的一个重要对象是分区的元数据信息,消费者就能够根据这个获取获取到leader信息,向leader请求,偏移信息和具体数据了。
new PartitionMetadata(partitionId, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError)
2,请求偏移的具体过程
实际过程是,我们SimpleConsumer会根据上步获取到的分区信息,构建偏移请求,然后请求的具体的偏移,这个过程中会涉及到一个配置就是是获取最大偏移,还是最小偏移,还是我们指定的偏移。
kafka.api.OffsetRequest.EarliestTime()
kafka.api.OffsetRequest.LatestTime()
也即是我们常见的auto.offset.reset这个配置的两个值smallest和largest两个配置。当然了,这个偏移使我们可以指定的,比如SparkStreaming的directStreaming这种策略下,我们就需要自己手动维护偏移或者进行Checkpoint,否则的话每次重启它都会采用auto.offset.reset指定的偏移来作为初始偏移去获取数据,造成数据多次消费或者数据丢失。
TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
OffsetResponse response = consumer.getOffsetsBefore(request);
if (response.hasError()) {
System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition) );
return 0;
}
long[] offsets = response.offsets(topic, partition);
此种情况下,kafka的Broker端的处理是
首先,还是根据消息请求的key找到处理函数
case RequestKeys.OffsetsKey => handleOffsetRequest(request)
接着是在处理函数里面调用具体的函数,根据我们的要求去获取偏移
val allOffsets = fetchOffsets(replicaManager.logManager,
topicAndPartition,
partitionOffsetRequestInfo.time,
partitionOffsetRequestInfo.maxNumOffsets)
最终,在fetchOffsetsBefore方法里获取我们上次的偏移。
timestamp match {
case OffsetRequest.LatestTime =>
startIndex = offsetTimeArray.length - 1
case OffsetRequest.EarliestTime =>
startIndex = 0
case _ =>
var isFound = false
debug("Offset time array = " + offsetTimeArray.foreach(o => "%d, %d".format(o._1, o._2)))
startIndex = offsetTimeArray.length - 1
while (startIndex >= 0 && !isFound) {
if (offsetTimeArray(startIndex)._2 <= timestamp)
isFound = true
else
startIndex -=1
}
}
val retSize = maxNumOffsets.min(startIndex + 1)
val ret = new Array[Long](retSize)
for(j <- 0 until retSize) {
ret(j) = offsetTimeArray(startIndex)._1
startIndex -= 1
}
// ensure that the returned seq is in descending order of offsets
ret.toSeq.sortBy(-_)
最后是,构建响应,结束
val response = OffsetResponse(offsetRequest.correlationId, responseMap)
3,根据获得的偏移请求数据的过程
首先是,构建数据请求
FetchRequest req = new FetchRequestBuilder()
.clientId(clientName)
.addFetch(a_topic, a_partition, readOffset, 10000)
.build();
我们可以通过多次调用addFetch,来访问一个Broker的多个分区的数据。
需要fetchSize。最终实际获取数据的大小,会在服务端根据情况返回给消费者。最后就可以遍历读取数据了。
for ( Map.Entry<String, List<Integer>> entry : topicMap.entrySet() ) {
String topic = entry.getKey();
for ( Integer offset : entry.getValue()) {
System.out.println("Response from fetch request no: " + ++fetchReq);
printMessages((ByteBufferMessageSet) fetchResponse.messageSet(topic, offset));
}
}
此种情况下,kafka的Broker端的处理是
首先,还是根据消息请求的key找到处理函数
case RequestKeys.FetchKey => handleFetchRequest(request)
在处理函数里
val dataRead = replicaManager.readMessageSets(fetchRequest)
由于此处调用比较深,最终是在LogSegment的read方法里根据实际数据大小做响应的处理。
def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int): FetchDataInfo = {
if(maxSize < 0)
throw new IllegalArgumentException("Invalid max size for log read (%d)".format(maxSize))
val logSize = log.sizeInBytes // this may change, need to save a consistent copy
val startPosition = translateOffset(startOffset)
// if the start position is already off the end of the log, return null
if(startPosition == null)
return null
val offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition.position)
// if the size is zero, still return a log segment but with zero size
if(maxSize == 0)
return FetchDataInfo(offsetMetadata, MessageSet.Empty)
// calculate the length of the message set to read based on whether or not they gave us a maxOffset
val length =
maxOffset match {
case None =>
// no max offset, just use the max size they gave unmolested
maxSize
case Some(offset) => {
// there is a max offset, translate it to a file position and use that to calculate the max read size
if(offset < startOffset)
throw new IllegalArgumentException("Attempt to read with a maximum offset (%d) less than the start offset (%d).".format(offset, startOffset))
val mapping = translateOffset(offset, startPosition.position)
val endPosition =
if(mapping == null)
logSize // the max offset is off the end of the log, use the end of the file
else
mapping.position
min(endPosition - startPosition.position, maxSize)
}
}
FetchDataInfo(offsetMetadata, log.read(startPosition.position, length))
}
三,总结
本文主要是将simpleConsumer从kafka消费数据的过程。在这里在总结一下:
1,获取leader位置。
2,获取上次偏移。
3,请求数据。
其实少了一点,就是容错处理:动态感知leader位置,实现很简单,SimpleConsumer为空进行重新构建SimpleConsumer即可。
从kafka消费数据,还可以总结出亮点:
1,指定分区消费,这个SparkStreaming的directStreaming采用了。
2,消费指定Broker的分区数据。这个适合数据量大,消费者部署在kafka的Broker节点,每台消费者只消费当前Broker上的分区可以减少夸主机流量传输,节省带宽。
熟练掌握SimpleConsumer,对我们了解kafka数据副本同步,和spark Streaming的directStreaming原理有很大帮助,也有利于做我们自己高效的消费者。希望大家能从这篇文章中吸取到又有的知识。