专栏首页Spark学习技巧Kafka源码系列之源码解析SimpleConsumer的消费过程

Kafka源码系列之源码解析SimpleConsumer的消费过程

Kafka源码系列是以kafka 0.8.2.2源码为例讲解。浪尖纯属个人爱好,才写想写这个系列。希望对kafka的使用者们带来帮助。

一,消费者消费的过程讲解

<Kafka源码系列之以kafka为例讲解分布式存储系统>这篇文章已经讲过,在一个分布式存储系统中,客户端访问数据一般会分两个步骤,一个是向元数据服务器获取存储的元数据,另一个则是正式发起数据的访问。对于kafka呢?本讲只是针对SimpleConsumer为例讲解,为啥突出讲解这个呢?只是由于这SimpleConsumer是Broker之间用来同步的方式,而且也是SparkStreaming的Direct Stream这种方式来获取数据的方式。所以,此讲为了后面讲spark也是打下基础。

为了使用SampleConsumer从kafka获取数据,我们需要有以下三个请求步骤:

1,随便选取一个Broker,请求元数据,找到leader。

请求的key为:RequestKeys.MetadataKey。

Broker的处理函数为(kafkaApis):

handleTopicMetadataRequest(request)

此举只是为了获取leader信息,而且要强调的一点是一个SampleConsumer只会向一个Broker的分区请求数据。

2,获取请求数据的偏移

请求的key为:RequestKeys.OffsetsKey

Broker的处理函数为(kafkaApis):

handleOffsetRequest(request)

此举是为指定偏移获取数据打下基础。

3,指定偏移获取数据

请求的key为:RequestKeys.FetchKey。

Broker的处理函数为(kafkaApis):

handleFetchRequest(request)

整个过程通讯过程<Kafka源码系列之Broker的IO服务及业务处理>这个文章里面已经讲过了,SimpleConsumer的方式对于使用者来说比较灵活,方便使用者针对主机消费特定的分区或者指定分区进行消费。假如我们自己设计存储系统的话也客户端访问数据的过程也可采用这种方式。

二,以具体的源码过程讲解SimpleConsumer消费数据的过程

1,请求元数据,获取leader位置的过程

首先是遍历我们给的Broker地址列表,然后构建SimpleConsumer,然后构建topic的元数据请求TopicMetadataRequest,遍历获取到的结果,跟我们指定的分区进行匹配,得到我们指定分区的元数据,等到之后停止遍历Broker地址列表返回。这个是指定分区进行消费消费这端的过程。

for (String seed : a_seedBrokers) { //
 SimpleConsumer consumer = null;
 try {
    SimpleConsumer consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
 List<String> topics = Collections.singletonList(a_topic);
 TopicMetadataRequest req = new TopicMetadataRequest(topics);
 kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

 List<TopicMetadata> metaData = resp.topicsMetadata();
 for (TopicMetadata item : metaData) {
 for (PartitionMetadata part : item.partitionsMetadata()) {
 if (part.partitionId() == a_partition) {
        returnMetaData = part;
 break loop;
 }
    }
  }
  } catch (Exception e) {
    System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic
      + ", " + a_partition + "] Reason: " + e);
 } finally {
 if (consumer != null) consumer.close();
 }
}

Broker端的过程是

消息具体接受过程已经讲过了,这里只讲处理函数。

首先是KakaApis里

case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)

根据key的类型,调用具体的处理函数重要步骤,获取topic信息,获取存活的Broker构建响应。

val topicMetadata = getTopicMetadata(metadataRequest.topics.toSet)
val brokers = metadataCache.getAliveBrokers
val response = new TopicMetadataResponse(brokers, topicMetadata, metadataRequest.correlationId)

从getTopicMetadata深挖的话,会发现最终构建的一个重要对象是分区的元数据信息,消费者就能够根据这个获取获取到leader信息,向leader请求,偏移信息和具体数据了。

new PartitionMetadata(partitionId, leaderInfo, replicaInfo, isrInfo, ErrorMapping.NoError)

2,请求偏移的具体过程

实际过程是,我们SimpleConsumer会根据上步获取到的分区信息,构建偏移请求,然后请求的具体的偏移,这个过程中会涉及到一个配置就是是获取最大偏移,还是最小偏移,还是我们指定的偏移。

kafka.api.OffsetRequest.EarliestTime()
kafka.api.OffsetRequest.LatestTime()

也即是我们常见的auto.offset.reset这个配置的两个值smallest和largest两个配置。当然了,这个偏移使我们可以指定的,比如SparkStreaming的directStreaming这种策略下,我们就需要自己手动维护偏移或者进行Checkpoint,否则的话每次重启它都会采用auto.offset.reset指定的偏移来作为初始偏移去获取数据,造成数据多次消费或者数据丢失。

TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
  requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
OffsetResponse response = consumer.getOffsetsBefore(request);

if (response.hasError()) {
  System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition) );
 return 0;
}
long[] offsets = response.offsets(topic, partition);

此种情况下,kafka的Broker端的处理是

首先,还是根据消息请求的key找到处理函数

case RequestKeys.OffsetsKey => handleOffsetRequest(request)

接着是在处理函数里面调用具体的函数,根据我们的要求去获取偏移

val allOffsets = fetchOffsets(replicaManager.logManager,
 topicAndPartition,
 partitionOffsetRequestInfo.time,
 partitionOffsetRequestInfo.maxNumOffsets)

最终,在fetchOffsetsBefore方法里获取我们上次的偏移。

timestamp match {
 case OffsetRequest.LatestTime =>
    startIndex = offsetTimeArray.length - 1
 case OffsetRequest.EarliestTime =>
    startIndex = 0
 case _ =>
 var isFound = false
 debug("Offset time array = " + offsetTimeArray.foreach(o => "%d, %d".format(o._1, o._2)))
    startIndex = offsetTimeArray.length - 1
 while (startIndex >= 0 && !isFound) {
 if (offsetTimeArray(startIndex)._2 <= timestamp)
        isFound = true
      else
 startIndex -=1
 }
}

val retSize = maxNumOffsets.min(startIndex + 1)
val ret = new Array[Long](retSize)
for(j <- 0 until retSize) {
  ret(j) = offsetTimeArray(startIndex)._1
  startIndex -= 1
}
// ensure that the returned seq is in descending order of offsets
ret.toSeq.sortBy(-_)

最后是,构建响应,结束

val response = OffsetResponse(offsetRequest.correlationId, responseMap)

3,根据获得的偏移请求数据的过程

首先是,构建数据请求

FetchRequest req = new FetchRequestBuilder()
          .clientId(clientName)
          .addFetch(a_topic, a_partition, readOffset, 10000) 
 .build();

我们可以通过多次调用addFetch,来访问一个Broker的多个分区的数据。

需要fetchSize。最终实际获取数据的大小,会在服务端根据情况返回给消费者。最后就可以遍历读取数据了。

for ( Map.Entry<String, List<Integer>> entry : topicMap.entrySet() ) {
  String topic = entry.getKey();
  for ( Integer offset : entry.getValue()) {
    System.out.println("Response from fetch request no: " + ++fetchReq);
 printMessages((ByteBufferMessageSet) fetchResponse.messageSet(topic, offset));
 }
}

此种情况下,kafka的Broker端的处理是

首先,还是根据消息请求的key找到处理函数

case RequestKeys.FetchKey => handleFetchRequest(request)

在处理函数里

val dataRead = replicaManager.readMessageSets(fetchRequest)

由于此处调用比较深,最终是在LogSegment的read方法里根据实际数据大小做响应的处理。

def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int): FetchDataInfo = {
 if(maxSize < 0)
 throw new IllegalArgumentException("Invalid max size for log read (%d)".format(maxSize))

 val logSize = log.sizeInBytes // this may change, need to save a consistent copy
 val startPosition = translateOffset(startOffset)

 // if the start position is already off the end of the log, return null
 if(startPosition == null)
 return null

  val offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition.position)

 // if the size is zero, still return a log segment but with zero size
 if(maxSize == 0)
 return FetchDataInfo(offsetMetadata, MessageSet.Empty)

 // calculate the length of the message set to read based on whether or not they gave us a maxOffset
 val length = 
    maxOffset match {
 case None =>
 // no max offset, just use the max size they gave unmolested
 maxSize
 case Some(offset) => {
 // there is a max offset, translate it to a file position and use that to calculate the max read size
 if(offset < startOffset)
 throw new IllegalArgumentException("Attempt to read with a maximum offset (%d) less than the start offset (%d).".format(offset, startOffset))
 val mapping = translateOffset(offset, startPosition.position)
 val endPosition = 
 if(mapping == null)
            logSize // the max offset is off the end of the log, use the end of the file
 else
 mapping.position
 min(endPosition - startPosition.position, maxSize) 
      }
    }
 FetchDataInfo(offsetMetadata, log.read(startPosition.position, length))
}

三,总结

本文主要是将simpleConsumer从kafka消费数据的过程。在这里在总结一下:

1,获取leader位置。

2,获取上次偏移。

3,请求数据。

其实少了一点,就是容错处理:动态感知leader位置,实现很简单,SimpleConsumer为空进行重新构建SimpleConsumer即可。

从kafka消费数据,还可以总结出亮点:

1,指定分区消费,这个SparkStreaming的directStreaming采用了。

2,消费指定Broker的分区数据。这个适合数据量大,消费者部署在kafka的Broker节点,每台消费者只消费当前Broker上的分区可以减少夸主机流量传输,节省带宽。

熟练掌握SimpleConsumer,对我们了解kafka数据副本同步,和spark Streaming的directStreaming原理有很大帮助,也有利于做我们自己高效的消费者。希望大家能从这篇文章中吸取到又有的知识。

本文分享自微信公众号 - Spark学习技巧(bigdatatip),作者:浪尖

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2017-06-13

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Flink与Spark Streaming在与kafka结合的区别!

    本文主要是想聊聊flink与kafka结合。当然,单纯的介绍flink与kafka的结合呢,比较单调,也没有可对比性,所以的准备顺便帮大家简单回顾一下Spark...

    Spark学习技巧
  • kafka管理神器-kafkamanager

    https://github.com/yahoo/kafka-manager/releases

    Spark学习技巧
  • 老司机常用的kafka监控-eagle

    前面有文章说到了一个叫kafka manager的kafka管理工具,这个工具管理kafka确实很强大,但是没有安全认证,随便都可以创建,删除,修改topic,...

    Spark学习技巧
  • SAP Data Intelligence Modeler里的Kafka Producer和Kafka Consumer

    新建一个graph,使用典型的生产者-消费者模型:将Data Generator生成的数据交给kafka Producer operator;

    Jerry Wang
  • 如何在Ubuntu 18.04上安装Apache Kafka

    Apache Kafka是一种流行的分布式消息代理,旨在有效处理大量实时数据。Kafka集群不仅具有高度可扩展性和容错性,而且与其他消息代理(如ActiveMQ...

    你在哪里
  • 如何在CentOS 7上安装Apache Kafka

    Apache Kafka是一种流行的分布式消息代理,旨在有效处理大量实时数据。Kafka集群不仅具有高度可扩展性和容错性,而且与其他消息代理(如ActiveMQ...

    好烟
  • 孟德尔随机化之研究背景

    久违的孟德尔随机化开始更新了,在今天的内容中,我将向大家介绍孟德尔随机化的基本概念及其背景知识,并举例说明何时可以使用该方法以及该方法为何能有效解释因果关系。本...

    生信与临床
  • ELK+filebeat+kafka+zooKeeper搭建(单机版)

    关于elk的配置参考我之前的一篇文章,不在累述: elk安装地址: https://jjlu521016.github.io/2018/05/01/spring...

    日薪月亿
  • kafka系列之camel-kafka

    首先关于 camel 的基本概念和用法,以及 kafka 的基本概念和用法,这里就不啰嗦了。这篇文章假设你对二者都有基本的认识。

    用户7634691
  • 分布式存储系统性能对比

    Here’s how Gaia stacks up against other decentralized storage systems. Features ...

    rectinajh

扫码关注云+社区

领取腾讯云代金券