根据我的理解,Kafka消费者顺序地从分配的分区读取消息...
我们计划让多个Kafka客户(Java)拥有相同的组I'd ..so,如果它按顺序从指定的分区读取数据,那么我们如何实现高吞吐量..i.e。例如,Producer每秒发布40条消息...消费者进程消息1每秒..though我们可以有多个消费者,但不能有40个rt?如果我说错了请纠正我。
在我们的例子中,消费者只有在消息成功处理后才必须提交偏移量,..else消息将被重新处理...还有更好的解决方案吗?
发布于 2016-02-25 23:00:52
基于你的问题澄清。
一个Kafka消费者可以同时读取多条消息。但是Kafka消费者并不真正读取消息,更准确的说法是,消费者读取一定数量的字节,然后根据单个消息的大小来确定将读取多少消息。通过读取Kafka Consumer Configs,您不允许指定要获取的消息数量,您需要指定消费者可以获取的最大/最小数据大小。无论多少消息适合在这个范围内,您将获得多少消息。正如您所指出的,您将始终按顺序获得消息。
与相关的消费者配置(适用于0.9.0.0及更高版本)
更新
在注释中使用您的示例,“我的理解是,如果我在配置中指定读取10字节,并且如果每条消息为2字节,则消费者一次读取5条消息。”这是事实。您的下一条语句“这意味着这5条消息的偏移量在分区中是随机的”是假的。顺序阅读并不意味着一个接一个地读,它只是意味着它们保持有序。您可以对项目进行批处理,并使其保持顺序/有序。请看下面的例子。
在Kafka日志中,如果有10条消息(每条消息2字节),偏移量为0,1,2,3,4,5,6,7,8,9。
如果读取10个字节,您将获得一个包含偏移量0、1、2、3、4处的消息的批处理。
如果读取6个字节,您将获得一个包含偏移量0、1、2处的消息的批处理。
如果先读取6个字节,然后再读取6个字节,就会得到包含消息0、1、2和3、4、5的两个批次。
如果先读取8个字节,然后读取4个字节,就会得到包含消息0、1、2、3和4、5的两个批次。
更新:澄清提交
我不是100%确定提交是如何工作的,我主要是在Storm环境中使用Kafka。提供的KafkaSpout会自动提交Kafka消息。
但是浏览一下0.9.0.1 Consumer APIs,我建议你这样做。似乎有三种方法特别与此讨论相关。
长偏移量轮询(long timeout)
poll方法检索消息,可能只有1,也可能是20,对于您的示例,假设返回了3条消息,0,1,2。您现在就有了这三条消息。现在该由您决定如何处理它们了。你可以处理它们0 => 1 => 2,1 => 0 => 2,2 => 0 => 1,这取决于。无论您如何处理它们,在处理之后,您将想要提交,这将告诉Kafka服务器您已处理完这些消息。
使用commitSync()提交上次轮询返回的所有内容,在本例中它将提交偏移量0,1,2。
另一方面,如果您选择使用commitSync(java.util.Map偏移量),则可以手动指定要提交的偏移量。如果你按顺序处理它们,你可以处理偏移量0然后提交它,处理偏移量1然后提交它,最后处理偏移量2并提交。
总而言之,Kafka让你可以自由地处理你想要的消息,你可以选择按顺序或完全随机地处理它们。
发布于 2016-02-26 04:04:05
为了实现并行性,这似乎就是您所要求的,您可以使用主题分区(您将主题划分为N个部分,称为分区)。然后,在使用者中,从这些分区派生多个线程进行消费。
在生产者端,您将消息发布到随机分区(默认),或者为Kafka提供一些消息属性来计算散列(如果需要排序),这将确保具有相同散列的所有msg都转到相同的分区。
编辑(偏移量提交请求示例):
我就是这么做的。所有未提供的方法都是非必要的。
/**
* Commits the provided offset for the current client (i.e. unique topic/partition/clientName combination)
*
* @param offset
* @return {@code true} or {@code false}, depending on whether commit succeeded
* @throws Exception
*/
public static boolean commitOffset(String topic, int partition, String clientName, SimpleConsumer consumer,
long offset) throws Exception {
try {
TopicAndPartition tap = new TopicAndPartition(topic, partition);
OffsetAndMetadata offsetMetaAndErr = new OffsetAndMetadata(offset, OffsetAndMetadata.NoMetadata(), -1L);
Map<TopicAndPartition, OffsetAndMetadata> mapForCommitOffset = new HashMap<>(1);
mapForCommitOffset.put(tap, offsetMetaAndErr);
kafka.javaapi.OffsetCommitRequest offsetCommitReq = new kafka.javaapi.OffsetCommitRequest(
ConsumerContext.getMainIndexingConsumerGroupId(), mapForCommitOffset, 1, clientName,
ConsumerContext.getOffsetStorageType());
OffsetCommitResponse offsetCommitResp = consumer.commitOffsets(offsetCommitReq);
Short errCode = (Short) offsetCommitResp.errors().get(tap);
if (errCode != 0) {
processKafkaOffsetCommitError(tap, offsetCommitResp, BrokerInfo.of(consumer.host()));
ErrorMapping.maybeThrowException(errCode);
}
LOG.debug("Successfully committed offset [{}].", offset);
} catch (Exception e) {
LOG.error("Error while committing offset [" + offset + "].", e);
throw e;
}
return true;
}发布于 2018-07-27 17:26:49
您可以批量消费消息,也可以批量处理消息。batch.max.wait.ms (属性)使用者将等待这段时间并轮询新消息
https://stackoverflow.com/questions/35623057
复制相似问题