首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >Kafka消费者(0.8.2.2)是否可以批量读取消息

Kafka消费者(0.8.2.2)是否可以批量读取消息
EN

Stack Overflow用户
提问于 2016-02-25 17:22:24
回答 3查看 19.9K关注 0票数 5

根据我的理解,Kafka消费者顺序地从分配的分区读取消息...

我们计划让多个Kafka客户(Java)拥有相同的组I'd ..so,如果它按顺序从指定的分区读取数据,那么我们如何实现高吞吐量..i.e。例如,Producer每秒发布40条消息...消费者进程消息1每秒..though我们可以有多个消费者,但不能有40个rt?如果我说错了请纠正我。

在我们的例子中,消费者只有在消息成功处理后才必须提交偏移量,..else消息将被重新处理...还有更好的解决方案吗?

EN

回答 3

Stack Overflow用户

发布于 2016-02-25 23:00:52

基于你的问题澄清。

一个Kafka消费者可以同时读取多条消息。但是Kafka消费者并不真正读取消息,更准确的说法是,消费者读取一定数量的字节,然后根据单个消息的大小来确定将读取多少消息。通过读取Kafka Consumer Configs,您不允许指定要获取的消息数量,您需要指定消费者可以获取的最大/最小数据大小。无论多少消息适合在这个范围内,您将获得多少消息。正如您所指出的,您将始终按顺序获得消息。

相关的消费者配置(适用于0.9.0.0及更高版本)

  • fetch.min.bytes
  • max.partition.fetch.bytes

更新

在注释中使用您的示例,“我的理解是,如果我在配置中指定读取10字节,并且如果每条消息为2字节,则消费者一次读取5条消息。”这是事实。您的下一条语句“这意味着这5条消息的偏移量在分区中是随机的”是假的。顺序阅读并不意味着一个接一个地读,它只是意味着它们保持有序。您可以对项目进行批处理,并使其保持顺序/有序。请看下面的例子。

在Kafka日志中,如果有10条消息(每条消息2字节),偏移量为0,1,2,3,4,5,6,7,8,9。

如果读取10个字节,您将获得一个包含偏移量0、1、2、3、4处的消息的批处理。

如果读取6个字节,您将获得一个包含偏移量0、1、2处的消息的批处理。

如果先读取6个字节,然后再读取6个字节,就会得到包含消息0、1、2和3、4、5的两个批次。

如果先读取8个字节,然后读取4个字节,就会得到包含消息0、1、2、3和4、5的两个批次。

更新:澄清提交

我不是100%确定提交是如何工作的,我主要是在Storm环境中使用Kafka。提供的KafkaSpout会自动提交Kafka消息。

但是浏览一下0.9.0.1 Consumer APIs,我建议你这样做。似乎有三种方法特别与此讨论相关。

长偏移量轮询(long timeout)

  • commitSync()

  • commitSync(java.util.Map

  • )

poll方法检索消息,可能只有1,也可能是20,对于您的示例,假设返回了3条消息,0,1,2。您现在就有了这三条消息。现在该由您决定如何处理它们了。你可以处理它们0 => 1 => 2,1 => 0 => 2,2 => 0 => 1,这取决于。无论您如何处理它们,在处理之后,您将想要提交,这将告诉Kafka服务器您已处理完这些消息。

使用commitSync()提交上次轮询返回的所有内容,在本例中它将提交偏移量0,1,2。

另一方面,如果您选择使用commitSync(java.util.Map偏移量),则可以手动指定要提交的偏移量。如果你按顺序处理它们,你可以处理偏移量0然后提交它,处理偏移量1然后提交它,最后处理偏移量2并提交。

总而言之,Kafka让你可以自由地处理你想要的消息,你可以选择按顺序或完全随机地处理它们。

票数 16
EN

Stack Overflow用户

发布于 2016-02-26 04:04:05

为了实现并行性,这似乎就是您所要求的,您可以使用主题分区(您将主题划分为N个部分,称为分区)。然后,在使用者中,从这些分区派生多个线程进行消费。

在生产者端,您将消息发布到随机分区(默认),或者为Kafka提供一些消息属性来计算散列(如果需要排序),这将确保具有相同散列的所有msg都转到相同的分区。

编辑(偏移量提交请求示例):

我就是这么做的。所有未提供的方法都是非必要的。

代码语言:javascript
运行
复制
 /**
   * Commits the provided offset for the current client (i.e. unique topic/partition/clientName combination)
   * 
   * @param offset
   * @return {@code true} or {@code false}, depending on whether commit succeeded
   * @throws Exception
   */
  public static boolean commitOffset(String topic, int partition, String clientName, SimpleConsumer consumer,
      long offset) throws Exception {
    try {
      TopicAndPartition tap = new TopicAndPartition(topic, partition);
      OffsetAndMetadata offsetMetaAndErr = new OffsetAndMetadata(offset, OffsetAndMetadata.NoMetadata(), -1L);
      Map<TopicAndPartition, OffsetAndMetadata> mapForCommitOffset = new HashMap<>(1);
      mapForCommitOffset.put(tap, offsetMetaAndErr);

      kafka.javaapi.OffsetCommitRequest offsetCommitReq = new kafka.javaapi.OffsetCommitRequest(
          ConsumerContext.getMainIndexingConsumerGroupId(), mapForCommitOffset, 1, clientName,
          ConsumerContext.getOffsetStorageType());

      OffsetCommitResponse offsetCommitResp = consumer.commitOffsets(offsetCommitReq);
      Short errCode = (Short) offsetCommitResp.errors().get(tap);
      if (errCode != 0) {
        processKafkaOffsetCommitError(tap, offsetCommitResp, BrokerInfo.of(consumer.host()));
        ErrorMapping.maybeThrowException(errCode);
      }
      LOG.debug("Successfully committed offset [{}].", offset);
    } catch (Exception e) {
      LOG.error("Error while committing offset [" + offset + "].", e);
      throw e;
    }
    return true;
  }
票数 1
EN

Stack Overflow用户

发布于 2018-07-27 17:26:49

您可以批量消费消息,也可以批量处理消息。batch.max.wait.ms (属性)使用者将等待这段时间并轮询新消息

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/35623057

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档