前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Kafka源码系列之源码解析SimpleConsumer的消费过程

Kafka源码系列之源码解析SimpleConsumer的消费过程

作者头像
Spark学习技巧
发布2018-01-30 18:15:24
1.4K0
发布2018-01-30 18:15:24
举报
文章被收录于专栏:Spark学习技巧Spark学习技巧

Kafka源码系列是以kafka 0.8.2.2源码为例讲解。浪尖纯属个人爱好,才写想写这个系列。希望对kafka的使用者们带来帮助。

一,消费者消费的过程讲解

<Kafka源码系列之以kafka为例讲解分布式存储系统>这篇文章已经讲过,在一个分布式存储系统中,客户端访问数据一般会分两个步骤,一个是向元数据服务器获取存储的元数据,另一个则是正式发起数据的访问。对于kafka呢?本讲只是针对SimpleConsumer为例讲解,为啥突出讲解这个呢?只是由于这SimpleConsumer是Broker之间用来同步的方式,而且也是SparkStreaming的Direct Stream这种方式来获取数据的方式。所以,此讲为了后面讲spark也是打下基础。

为了使用SampleConsumer从kafka获取数据,我们需要有以下三个请求步骤:

1,随便选取一个Broker,请求元数据,找到leader。

请求的key为:RequestKeys.MetadataKey。

Broker的处理函数为(kafkaApis):

handleTopicMetadataRequest(request)

此举只是为了获取leader信息,而且要强调的一点是一个SampleConsumer只会向一个Broker的分区请求数据。

2,获取请求数据的偏移

请求的key为:RequestKeys.OffsetsKey

Broker的处理函数为(kafkaApis):

handleOffsetRequest(request)

此举是为指定偏移获取数据打下基础。

3,指定偏移获取数据

请求的key为:RequestKeys.FetchKey。

Broker的处理函数为(kafkaApis):

handleFetchRequest(request)

整个过程通讯过程<Kafka源码系列之Broker的IO服务及业务处理>这个文章里面已经讲过了,SimpleConsumer的方式对于使用者来说比较灵活,方便使用者针对主机消费特定的分区或者指定分区进行消费。假如我们自己设计存储系统的话也客户端访问数据的过程也可采用这种方式。

二,以具体的源码过程讲解SimpleConsumer消费数据的过程

1,请求元数据,获取leader位置的过程

首先是遍历我们给的Broker地址列表,然后构建SimpleConsumer,然后构建topic的元数据请求TopicMetadataRequest,遍历获取到的结果,跟我们指定的分区进行匹配,得到我们指定分区的元数据,等到之后停止遍历Broker地址列表返回。这个是指定分区进行消费消费这端的过程。

代码语言:js
复制
for (String seed : a_seedBrokers) { //
 SimpleConsumer consumer = null;
 try {
    SimpleConsumer consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
 List<String> topics = Collections.singletonList(a_topic);
 TopicMetadataRequest req = new TopicMetadataRequest(topics);
 kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

 List<TopicMetadata> metaData = resp.topicsMetadata();
 for (TopicMetadata item : metaData) {
 for (PartitionMetadata part : item.partitionsMetadata()) {
 if (part.partitionId() == a_partition) {
        returnMetaData = part;
 break loop;
 }
    }
  }
  } catch (Exception e) {
    System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic
      + ", " + a_partition + "] Reason: " + e);
 } finally {
 if (consumer != null) consumer.close();
 }
}

Broker端的过程是

消息具体接受过程已经讲过了,这里只讲处理函数。

首先是KakaApis里

代码语言:js
复制
case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)

根据key的类型,调用具体的处理函数重要步骤,获取topic信息,获取存活的Broker构建响应。

代码语言:js
复制
val topicMetadata = getTopicMetadata(metadataRequest.topics.toSet)
val brokers = metadataCache.getAliveBrokers
val response = new TopicMetadataResponse(brokers, topicMetadata, metadataRequest.correlationId)

从getTopicMetadata深挖的话,会发现最终构建的一个重要对象是分区的元数据信息,消费者就能够根据这个获取获取到leader信息,向leader请求,偏移信息和具体数据了。

代码语言:js
复制
new PartitionMetadata(partitionId, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError)

2,请求偏移的具体过程

实际过程是,我们SimpleConsumer会根据上步获取到的分区信息,构建偏移请求,然后请求的具体的偏移,这个过程中会涉及到一个配置就是是获取最大偏移,还是最小偏移,还是我们指定的偏移。

代码语言:js
复制
kafka.api.OffsetRequest.EarliestTime()
kafka.api.OffsetRequest.LatestTime()

也即是我们常见的auto.offset.reset这个配置的两个值smallest和largest两个配置。当然了,这个偏移使我们可以指定的,比如SparkStreaming的directStreaming这种策略下,我们就需要自己手动维护偏移或者进行Checkpoint,否则的话每次重启它都会采用auto.offset.reset指定的偏移来作为初始偏移去获取数据,造成数据多次消费或者数据丢失。

代码语言:js
复制
TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
  requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
OffsetResponse response = consumer.getOffsetsBefore(request);

if (response.hasError()) {
  System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition) );
 return 0;
}
long[] offsets = response.offsets(topic, partition);

此种情况下,kafka的Broker端的处理是

首先,还是根据消息请求的key找到处理函数

代码语言:js
复制
case RequestKeys.OffsetsKey => handleOffsetRequest(request)

接着是在处理函数里面调用具体的函数,根据我们的要求去获取偏移

代码语言:js
复制
val allOffsets = fetchOffsets(replicaManager.logManager,
 topicAndPartition,
 partitionOffsetRequestInfo.time,
 partitionOffsetRequestInfo.maxNumOffsets)

最终,在fetchOffsetsBefore方法里获取我们上次的偏移。

代码语言:js
复制
timestamp match {
 case OffsetRequest.LatestTime =>
    startIndex = offsetTimeArray.length - 1
 case OffsetRequest.EarliestTime =>
    startIndex = 0
 case _ =>
 var isFound = false
 debug("Offset time array = " + offsetTimeArray.foreach(o => "%d, %d".format(o._1, o._2)))
    startIndex = offsetTimeArray.length - 1
 while (startIndex >= 0 && !isFound) {
 if (offsetTimeArray(startIndex)._2 <= timestamp)
        isFound = true
      else
 startIndex -=1
 }
}

val retSize = maxNumOffsets.min(startIndex + 1)
val ret = new Array[Long](retSize)
for(j <- 0 until retSize) {
  ret(j) = offsetTimeArray(startIndex)._1
  startIndex -= 1
}
// ensure that the returned seq is in descending order of offsets
ret.toSeq.sortBy(-_)

最后是,构建响应,结束

代码语言:js
复制
val response = OffsetResponse(offsetRequest.correlationId, responseMap)

3,根据获得的偏移请求数据的过程

首先是,构建数据请求

代码语言:js
复制
FetchRequest req = new FetchRequestBuilder()
          .clientId(clientName)
          .addFetch(a_topic, a_partition, readOffset, 10000) 
 .build();

我们可以通过多次调用addFetch,来访问一个Broker的多个分区的数据。

需要fetchSize。最终实际获取数据的大小,会在服务端根据情况返回给消费者。最后就可以遍历读取数据了。

代码语言:js
复制
for ( Map.Entry<String, List<Integer>> entry : topicMap.entrySet() ) {
  String topic = entry.getKey();
  for ( Integer offset : entry.getValue()) {
    System.out.println("Response from fetch request no: " + ++fetchReq);
 printMessages((ByteBufferMessageSet) fetchResponse.messageSet(topic, offset));
 }
}

此种情况下,kafka的Broker端的处理是

首先,还是根据消息请求的key找到处理函数

代码语言:js
复制
case RequestKeys.FetchKey => handleFetchRequest(request)

在处理函数里

代码语言:js
复制
val dataRead = replicaManager.readMessageSets(fetchRequest)

由于此处调用比较深,最终是在LogSegment的read方法里根据实际数据大小做响应的处理。

代码语言:js
复制
def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int): FetchDataInfo = {
 if(maxSize < 0)
 throw new IllegalArgumentException("Invalid max size for log read (%d)".format(maxSize))

 val logSize = log.sizeInBytes // this may change, need to save a consistent copy
 val startPosition = translateOffset(startOffset)

 // if the start position is already off the end of the log, return null
 if(startPosition == null)
 return null

  val offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition.position)

 // if the size is zero, still return a log segment but with zero size
 if(maxSize == 0)
 return FetchDataInfo(offsetMetadata, MessageSet.Empty)

 // calculate the length of the message set to read based on whether or not they gave us a maxOffset
 val length = 
    maxOffset match {
 case None =>
 // no max offset, just use the max size they gave unmolested
 maxSize
 case Some(offset) => {
 // there is a max offset, translate it to a file position and use that to calculate the max read size
 if(offset < startOffset)
 throw new IllegalArgumentException("Attempt to read with a maximum offset (%d) less than the start offset (%d).".format(offset, startOffset))
 val mapping = translateOffset(offset, startPosition.position)
 val endPosition = 
 if(mapping == null)
            logSize // the max offset is off the end of the log, use the end of the file
 else
 mapping.position
 min(endPosition - startPosition.position, maxSize) 
      }
    }
 FetchDataInfo(offsetMetadata, log.read(startPosition.position, length))
}

三,总结

本文主要是将simpleConsumer从kafka消费数据的过程。在这里在总结一下:

1,获取leader位置。

2,获取上次偏移。

3,请求数据。

其实少了一点,就是容错处理:动态感知leader位置,实现很简单,SimpleConsumer为空进行重新构建SimpleConsumer即可。

从kafka消费数据,还可以总结出亮点:

1,指定分区消费,这个SparkStreaming的directStreaming采用了。

2,消费指定Broker的分区数据。这个适合数据量大,消费者部署在kafka的Broker节点,每台消费者只消费当前Broker上的分区可以减少夸主机流量传输,节省带宽。

熟练掌握SimpleConsumer,对我们了解kafka数据副本同步,和spark Streaming的directStreaming原理有很大帮助,也有利于做我们自己高效的消费者。希望大家能从这篇文章中吸取到又有的知识。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2017-06-13,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 浪尖聊大数据 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
对象存储
对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档