来源:https://blog.csdn.net/ldw201510803006/article/details/116176711
由spring提供用于监听以及拉取消息,并将这些消息按指定格式转换后交给由@KafkaListener注解的方法处理,相当于一个消费者;
看看其整体代码结构:
protected void doStart() {
if (isRunning()) {
return;
}
if (this.clientIdSuffix == null) { // stand-alone container
checkTopics();
}
ContainerProperties containerProperties = getContainerProperties();
checkAckMode(containerProperties);
......
// 创建ListenerConsumer消费者并放入到线程池中执行
this.listenerConsumer = new ListenerConsumer(listener, listenerType);
setRunning(true);
this.startLatch = new CountDownLatch(1);
this.listenerConsumerFuture = containerProperties
.getConsumerTaskExecutor()
.submitListenable(this.listenerConsumer);
......
}
public void run() { // NOSONAR complexity
.......
Throwable exitThrowable = null;
while (isRunning()) {
try {
// 拉去消息并处理消息
pollAndInvoke();
}
catch (@SuppressWarnings(UNUSED) WakeupException e) {
......
}
......
}
wrapUp(exitThrowable);
}
并发消息监听,相当于创建消费者;其底层逻辑仍然是通过KafkaMessageListenerContainer实现处理;从实现上看就是在KafkaMessageListenerContainer上做了层包装,有多少的concurrency就创建多个KafkaMessageListenerContainer,也就是concurrency个消费者
protected void doStart() {
if (!isRunning()) {
checkTopics();
......
setRunning(true);
for (int i = 0; i < this.concurrency; i++) {
KafkaMessageListenerContainer<K, V> container =
constructContainer(containerProperties, topicPartitions, i);
String beanName = getBeanName();
container.setBeanName((beanName != null ? beanName : "consumer") + "-" + i);
......
if (isPaused()) {
container.pause();
}
// 这里调用KafkaMessageListenerContainer启动相关监听方法
container.start();
this.containers.add(container);
}
}
}
上面已经介绍了
KafkaMessageListenerContainer
的作用是拉取并处理消息,但还缺少关键的一步,即 如何将我们的业务逻辑与KafkaMessageListenerContainer的处理逻辑联系起来?
那么这个桥梁就是@KafkaListener注解
@Configuration
@EnableKafka
public class KafkaConfig {
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
...
return props;
}
}
@KafkaListener(id = "myListener", topics = "myTopic",
autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:3}")
public void listen(String data) {
...
}
@Configuration
@EnableKafka
public class KafkaConfig {
@Bean
public KafkaListenerContainerFactory<?, ?> batchFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true); // <<<<<<<<<<<<<<<<<<<<<<<<<
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
...
return props;
}
}
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list) {
...
}
场景:
生产上最初都采用单条消费模式,随着量的积累,部分topic常常出现消息积压,最开始通过新增消费者实例和分区来提升消费端的能力;一段时间后又开始出现消息积压,由此便从代码层面通过批量消费来提升消费能力。
简单的说就是需要配置批量消费和单条记录消费(从单条消费逐步向批量消费演进)
@Configuration
@EnableKafka
public class KafkaConfig {
@Bean(name = [batchListenerContainerFactory])
public KafkaListenerContainerFactory<?, ?> batchFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// 开启批量处理
factory.setBatchListener(true);
return factory;
}
@Bean(name = [batchConsumerFactory])
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean(name = [batchConsumerConfig])
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
...
return props;
}
}
注意点:
调试及相关源码版本:
我们创建了一个高质量的技术交流群,与优秀的人在一起,自己也会优秀起来,赶紧点击加群,享受一起成长的快乐。另外,如果你最近想跳槽的话,年前我花了2周时间收集了一波大厂面经,节后准备跳槽的可以点击这里领取!
··································
你好,我是程序猿DD,10年开发老司机、阿里云MVP、腾讯云TVP、出过书创过业、国企4年互联网6年。从普通开发到架构师、再到合伙人。一路过来,给我最深的感受就是一定要不断学习并关注前沿。只要你能坚持下来,多思考、少抱怨、勤动手,就很容易实现弯道超车!所以,不要问我现在干什么是否来得及。如果你看好一个事情,一定是坚持了才能看到希望,而不是看到希望才去坚持。相信我,只要坚持下来,你一定比现在更好!如果你还没什么方向,可以先关注我,这里会经常分享一些前沿资讯,帮你积累弯道超车的资本。