前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊spring for kafka对consumer的封装与集成

聊聊spring for kafka对consumer的封装与集成

作者头像
code4it
发布2018-09-17 15:12:16
1.2K0
发布2018-09-17 15:12:16
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要解析一下spring for kafka对原生的kafka client consumer的封装与集成。

consumer工厂

spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java

代码语言:javascript
复制
protected KafkaConsumer<K, V> createKafkaConsumer(String clientIdSuffix) {
        if (!this.configs.containsKey(ConsumerConfig.CLIENT_ID_CONFIG) || clientIdSuffix == null) {
            return createKafkaConsumer();
        }
        else {
            Map<String, Object> modifiedClientIdConfigs = new HashMap<>(this.configs);
            modifiedClientIdConfigs.put(ConsumerConfig.CLIENT_ID_CONFIG,
                    modifiedClientIdConfigs.get(ConsumerConfig.CLIENT_ID_CONFIG) + clientIdSuffix);
            return createKafkaConsumer(modifiedClientIdConfigs);
        }
    }

    protected KafkaConsumer<K, V> createKafkaConsumer(Map<String, Object> configs) {
        return new KafkaConsumer<K, V>(configs, this.keyDeserializer, this.valueDeserializer);
    }

小结

  • 对于生产者来说,封装KafkaProducer到KafkaTemplate相对简单
  • 对于消费者来说,由于spring是采用注解的形式去标注消息处理方法的,所以这里稍微费劲一点:
    • 先在KafkaListenerAnnotationBeanPostProcessor中扫描bean,然后注册到KafkaListenerEndpointRegistrar
    • 而KafkaListenerEndpointRegistrar在afterPropertiesSet的时候去创建MessageListenerContainer
    • messageListener包含了原始endpoint携带的bean以及method转换成的InvocableHandlerMethod
    • ConcurrentMessageListenerContainer这个衔接上,根据配置的spring.kafka.listener.concurrency来生成多个并发的KafkaMessageListenerContainer实例
    • 每个KafkaMessageListenerContainer都自己创建一个ListenerConsumer,然后自己创建一个独立的kafka consumer,每个ListenerConsumer在线程池里头运行,这样来实现并发
    • 每个ListenerConsumer里头都有一个recordsToProcess队列,从原始的kafka consumer poll出来的记录会放到这个队列里头,
    • 然后有一个ListenerInvoker线程循环超时等待从recordsToProcess取出记录,然后调用messageListener的onMessage方法(即KafkaListener注解标准的方法)

ListenerConsumer是重点,里头还有包括offset的提交,这里改天再详解一下。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2017-10-09,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码匠的流水账 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • consumer工厂
  • 小结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档