我正在设置一个ConcurrentMessageListenerContainer
<bean class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer" id="messageListenerContainer">
<constructor-arg index="0" ref="consumerFactory"/>
<constructor-arg index="1" ref="containerProperties"/>
<property name="concurrency" value="2"/>
</bean>
ConsumerFactory使用此配置:
<util:map id="consumerConfig" map-class="java.util.HashMap">
<entry key="#{T(org.apache.kafka.clients.consumer.ConsumerConfig).BOOTSTRAP_SERVERS_CONFIG}"
value="${rp.kafka.bootstrap.servers}"/>
<entry key="#{T(org.apache.kafka.clients.consumer.ConsumerConfig).KEY_DESERIALIZER_CLASS_CONFIG}"
value="org.apache.kafka.common.serialization.StringDeserializer"/>
<entry key="#{T(org.apache.kafka.clients.consumer.ConsumerConfig).VALUE_DESERIALIZER_CLASS_CONFIG}"
value="org.springframework.kafka.support.serializer.JsonDeserializer"/>
<entry key="#{T(org.springframework.kafka.support.serializer.JsonDeserializer).TRUSTED_PACKAGES}"
value="*"/>
<entry key="#{T(org.apache.kafka.clients.consumer.ConsumerConfig).PARTITION_ASSIGNMENT_STRATEGY_CONFIG}"
value="org.apache.kafka.clients.consumer.RoundRobinAssignor"/>
<entry key="#{T(org.apache.kafka.clients.consumer.ConsumerConfig).ENABLE_AUTO_COMMIT_CONFIG}"
value="false"/>
</util:map>
和ContainerProperties是
<bean class="org.springframework.kafka.listener.ContainerProperties" id="containerProperties">
<constructor-arg>
<list>
<value>sendSMS</value>
</list>
</constructor-arg>
<property name="groupId" value="main"/>
<property name="messageListener" ref="messageListener"/>
<property name="ackMode" value="RECORD"/>
</bean>
我的主题"sendSMS“在3节点集群上有5个分区,rep因子为3,所以我预计由并发1创建的每个KafkaMessageListenerContainer (在这种情况下总共2个)将占用其分区的一部分来处理。Hovewer,在应用程序启动后,我在调试器窗口中看到每个侦听器正在处理所有5个侦听器!来自第4个分区的分区https://gyazo.com/183626ff60061b471858f8cc52573353和消息(我在其中有一条消息挂起处理,在重新启动后没有提交,但这与此问题无关)上的相同偏移量在具有不同使用者的不同线程中传递了2次!为什么会这样呢?这是一个bug还是预期的行为?
发布于 2019-03-27 12:49:27
您没有显示足够的信息。并发容器为子KafkaListenerContainer
聚合分配的分区(每个并发一个分区)。
@Override
public Collection<TopicPartition> getAssignedPartitions() {
return this.containers.stream()
.map(KafkaMessageListenerContainer::getAssignedPartitions)
.filter(Objects::nonNull)
.flatMap(Collection::stream)
.collect(Collectors.toList());
}
您需要显示重新交付的日志;有关详细信息,请打开调试日志记录。
https://stackoverflow.com/questions/55374697
复制