
A Look at How Spring Integrates with Kafka

Author: code4it
Published 2018-09-17 15:05:27
Collected in the column: 码匠的流水账

This article briefly surveys the options for producing and consuming Kafka messages in a Java application.

Available libraries

  • kafka client
  • spring for apache kafka
  • spring integration kafka
  • spring cloud stream binder kafka

Besides the official Java API library, the Spring ecosystem adds several wrappers on top of it. Each is briefly introduced below.

spring for apache kafka

Integrates the Java Kafka client with Spring.

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>1.2.2.RELEASE</version>
</dependency>

Integration with Spring Boot

Before Spring Boot 1.5 you had to write the Java configuration yourself; from 1.5 onward auto-configuration is provided. See the org.springframework.boot.autoconfigure.kafka package for details. Its main classes are:

  • KafkaAutoConfiguration spring-boot-autoconfigure-1.5.7.RELEASE-sources.jar!/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java
@Configuration
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(KafkaProperties.class)
@Import(KafkaAnnotationDrivenConfiguration.class)
public class KafkaAutoConfiguration {

    private final KafkaProperties properties;

    public KafkaAutoConfiguration(KafkaProperties properties) {
        this.properties = properties;
    }

    @Bean
    @ConditionalOnMissingBean(KafkaTemplate.class)
    public KafkaTemplate<?, ?> kafkaTemplate(
            ProducerFactory<Object, Object> kafkaProducerFactory,
            ProducerListener<Object, Object> kafkaProducerListener) {
        KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<Object, Object>(
                kafkaProducerFactory);
        kafkaTemplate.setProducerListener(kafkaProducerListener);
        kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
        return kafkaTemplate;
    }

    @Bean
    @ConditionalOnMissingBean(ProducerListener.class)
    public ProducerListener<Object, Object> kafkaProducerListener() {
        return new LoggingProducerListener<Object, Object>();
    }

    @Bean
    @ConditionalOnMissingBean(ConsumerFactory.class)
    public ConsumerFactory<?, ?> kafkaConsumerFactory() {
        return new DefaultKafkaConsumerFactory<Object, Object>(
                this.properties.buildConsumerProperties());
    }

    @Bean
    @ConditionalOnMissingBean(ProducerFactory.class)
    public ProducerFactory<?, ?> kafkaProducerFactory() {
        return new DefaultKafkaProducerFactory<Object, Object>(
                this.properties.buildProducerProperties());
    }

}
  • KafkaAnnotationDrivenConfiguration spring-boot-autoconfigure-1.5.7.RELEASE-sources.jar!/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java
@Configuration
@ConditionalOnClass(EnableKafka.class)
class KafkaAnnotationDrivenConfiguration {

    private final KafkaProperties properties;

    KafkaAnnotationDrivenConfiguration(KafkaProperties properties) {
        this.properties = properties;
    }

    @Bean
    @ConditionalOnMissingBean
    public ConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurer() {
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer = new ConcurrentKafkaListenerContainerFactoryConfigurer();
        configurer.setKafkaProperties(this.properties);
        return configurer;
    }

    @Bean
    @ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<Object, Object>();
        configurer.configure(factory, kafkaConsumerFactory);
        return factory;
    }

    @EnableKafka
    @ConditionalOnMissingBean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
    protected static class EnableKafkaConfiguration {

    }
}
  • ConcurrentKafkaListenerContainerFactoryConfigurer spring-boot-autoconfigure-1.5.7.RELEASE-sources.jar!/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java
public class ConcurrentKafkaListenerContainerFactoryConfigurer {

    private KafkaProperties properties;

    /**
     * Set the {@link KafkaProperties} to use.
     * @param properties the properties
     */
    void setKafkaProperties(KafkaProperties properties) {
        this.properties = properties;
    }

    /**
     * Configure the specified Kafka listener container factory. The factory can be
     * further tuned and default settings can be overridden.
     * @param listenerContainerFactory the {@link ConcurrentKafkaListenerContainerFactory}
     * instance to configure
     * @param consumerFactory the {@link ConsumerFactory} to use
     */
    public void configure(
            ConcurrentKafkaListenerContainerFactory<Object, Object> listenerContainerFactory,
            ConsumerFactory<Object, Object> consumerFactory) {
        listenerContainerFactory.setConsumerFactory(consumerFactory);
        Listener container = this.properties.getListener();
        ContainerProperties containerProperties = listenerContainerFactory
                .getContainerProperties();
        if (container.getAckMode() != null) {
            containerProperties.setAckMode(container.getAckMode());
        }
        if (container.getAckCount() != null) {
            containerProperties.setAckCount(container.getAckCount());
        }
        if (container.getAckTime() != null) {
            containerProperties.setAckTime(container.getAckTime());
        }
        if (container.getPollTimeout() != null) {
            containerProperties.setPollTimeout(container.getPollTimeout());
        }
        if (container.getConcurrency() != null) {
            listenerContainerFactory.setConcurrency(container.getConcurrency());
        }
    }

}
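
The configure method above copies a listener setting only when it has been set, so anything left unconfigured keeps the container's default. The pattern can be sketched in plain Java as follows. Every class, field, and default value here is hypothetical and chosen only for illustration; this is a model of the pattern, not Spring Boot code.

```java
// Hypothetical names throughout: a plain-Java model of the pattern, not Spring Boot code.
class ListenerSettingsSketch {

    // User-supplied settings; null means "not configured".
    static class Overrides {
        Integer concurrency;
        Long pollTimeout;
    }

    // Container defaults (values chosen only for this sketch).
    static class Container {
        int concurrency = 1;
        long pollTimeout = 1000L;
    }

    // Copy a setting only when the user supplied it, so unset values keep their defaults.
    static void configure(Container container, Overrides overrides) {
        if (overrides.concurrency != null) {
            container.concurrency = overrides.concurrency;
        }
        if (overrides.pollTimeout != null) {
            container.pollTimeout = overrides.pollTimeout;
        }
    }

    public static void main(String[] args) {
        Overrides overrides = new Overrides();
        overrides.concurrency = 3; // pollTimeout deliberately left unset
        Container container = new Container();
        configure(container, overrides);
        System.out.println(container.concurrency + " " + container.pollTimeout); // prints "3 1000"
    }
}
```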

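This creates multiple concurrent KafkaMessageListenerContainer instances, which amounts to one application instance creating several consumers.

With Spring Boot 1.5 or later, usage is fairly simple: inject KafkaTemplate to send messages, and add a little configuration to consume them.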
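
As an illustration of that configuration step, here is a minimal application.properties sketch for Spring Boot 1.5. The broker address, group id, and topic name are placeholder assumptions, not values from the original article. The spring.kafka.listener.* keys correspond to the values the configurer above reads through getListener().

```properties
# Placeholder values: adjust the broker address, group id, and topic to your environment.
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=demo-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.template.default-topic=demo-topic

# Optional listener settings; anything left out keeps the container default.
spring.kafka.listener.concurrency=3
spring.kafka.listener.poll-timeout=3000
```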

spring integration kafka

Spring Integration is Spring's implementation of Enterprise Integration Patterns, and spring integration kafka builds on spring for apache kafka to provide inbound and outbound channel adapters. In the project's own words:

"Starting from version 2.0 version this project is a complete rewrite based on the new spring-kafka project which uses the pure java Producer and Consumer clients provided by Kafka 0.9.x.x and 0.10.x.x"

This option has no auto-configuration and also brings in Spring Integration concepts, so overall it is somewhat more complex.

Consumer configuration

    @Bean
    public KafkaMessageListenerContainer<String, String> container(
            ConsumerFactory<String, String> kafkaConsumerFactory) {
        return new KafkaMessageListenerContainer<>(kafkaConsumerFactory,
                new ContainerProperties(new TopicPartitionInitialOffset(topic, 0)));
    }
    @Bean
    public ConsumerFactory<?, ?> kafkaConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }
    @Bean
    public KafkaMessageDrivenChannelAdapter<String, String> adapter(KafkaMessageListenerContainer<String, String> container) {
        KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter =
                new KafkaMessageDrivenChannelAdapter<>(container);
        kafkaMessageDrivenChannelAdapter.setOutputChannel(fromKafka());
        return kafkaMessageDrivenChannelAdapter;
    }
    @Bean
    public PollableChannel fromKafka() {
        return new QueueChannel();
    }

Producer configuration

    @Bean
    @ServiceActivator(inputChannel = "toKafka")
    public MessageHandler handler() throws Exception {
        KafkaProducerMessageHandler<String, String> handler =
                new KafkaProducerMessageHandler<>(kafkaTemplate());
        handler.setTopicExpression(new LiteralExpression(topic));
        handler.setMessageKeyExpression(new LiteralExpression(messageKey));
        return handler;
    }
    @Bean
    public ProducerFactory<String, String> kafkaProducerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerAddress);
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(kafkaProducerFactory());
    }

Sending and receiving messages

    @Autowired
    @Qualifier("fromKafka")
    private PollableChannel fromKafka;

    @Autowired
    @Qualifier("toKafka")
    MessageChannel toKafka;

    Message<?> msg = fromKafka.receive(10000L);
    toKafka.send(new GenericMessage<Object>(UUID.randomUUID().toString()));
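
In the snippet above, the receive call with a timeout waits up to ten seconds and returns null if no message arrives in that time, so callers should check for null before using the result. The plain-Java sketch below mimics that behaviour with a BlockingQueue. PollableChannelSketch is a hypothetical stand-in written for this illustration, not a Spring Integration class.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a pollable channel; not a Spring Integration class.
class PollableChannelSketch {

    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    // Mirrors send(): enqueue a payload for a later receive().
    boolean send(String payload) {
        return queue.offer(payload);
    }

    // Mirrors receive(timeout): wait up to timeoutMillis, return null if nothing arrives.
    String receive(long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return null;
        }
    }

    public static void main(String[] args) {
        PollableChannelSketch channel = new PollableChannelSketch();
        channel.send("hello");
        System.out.println(channel.receive(100)); // a queued message is returned right away
        System.out.println(channel.receive(100)); // nothing queued: null after the timeout
    }
}
```

The same null check applies to the result of fromKafka.receive in the article's snippet when no message has reached the topic within the timeout.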

spring cloud stream

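Built on Spring Integration, with some further processing and a thin layer of extra encapsulation for the Spring Cloud environment. For details, see the articles on the spring cloud stream kafka example and on spring-cloud-stream-binder-kafka property configuration.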

doc

  • spring-kafka
  • spring-integration
  • spring-integration-kafka
  • spring-integration-samples-kafka
  • spring-cloud-stream
  • Integrating Spring Boot with Kafka
  • A summary of ways to handle low Kafka consumer throughput
Originally published 2017-10-06 on the 码匠的流水账 WeChat official account.
