前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spring Boot 整合 Kafka 详解

Spring Boot 整合 Kafka 详解

作者头像
九转成圣
发布2024-08-09 10:26:56
1810
发布2024-08-09 10:26:56
举报
文章被收录于专栏:csdn

Spring Boot 整合 Kafka 详解

本文将详细介绍如何在 Spring Boot 项目中整合 Apache Kafka,包括 Kafka 的配置、消息的同步和异步发送。

1. 环境准备

在开始之前,请确保你已经安装并配置好 Kafka 集群。如果还没有,请参考 Kafka 官方文档进行安装和配置。

2. 创建 Spring Boot 项目
2.1 使用 Spring Initializr 创建项目

访问 Spring Initializr,选择以下配置:

  • Project: Maven Project
  • Language: Java
  • Spring Boot: 2.2.2.RELEASE
  • Dependencies: Spring for Apache Kafka

点击 “Generate” 按钮,下载生成的项目,并解压到本地。

3. 添加依赖

pom.xml 文件中添加 Kafka 依赖:

代码语言:javascript
复制
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
</dependencies>
4. 配置 Kafka

src/main/resources 目录下创建 application.yml 文件,并添加以下配置:

代码语言:javascript
复制
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: my-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

配置说明:

  • bootstrap-servers: Kafka broker 的地址列表。
  • consumer: 消费者配置,包括消费者组 ID、偏移量重置策略、键和值的反序列化器。
  • producer: 生产者配置,包括键和值的序列化器。
5. 创建 Kafka 生产者

src/main/java/com/example/demo 目录下创建 KafkaProducerConfig.java 文件,并添加以下代码:

代码语言:javascript
复制
package com.example.demo;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
6. 发送消息

src/main/java/com/example/demo 目录下创建 KafkaProducerService.java 文件,并添加以下代码:

代码语言:javascript
复制
package com.example.demo;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

@Service
public class KafkaProducerService {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    private static final String TOPIC = "my-topic";

    // 同步发送消息
    public void sendMessageSync(String message) {
        try {
            kafkaTemplate.send(TOPIC, message).get();
            System.out.println("同步消息发送成功: " + message);
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("同步消息发送失败: " + message);
        }
    }

    // 异步发送消息
    public void sendMessageAsync(String message) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(TOPIC, message);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                System.out.println("异步消息发送成功: " + message);
            }

            @Override
            public void onFailure(Throwable ex) {
                ex.printStackTrace();
                System.out.println("异步消息发送失败: " + message);
            }
        });
    }
}
7. 测试 Kafka 生产者

src/main/java/com/example/demo 目录下创建 DemoApplication.java 文件,并添加以下代码:

代码语言:javascript
复制
package com.example.demo;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class DemoApplication implements CommandLineRunner {

    @Autowired
    private KafkaProducerService kafkaProducerService;

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        kafkaProducerService.sendMessageSync("Hello, Kafka (Sync)!");
        kafkaProducerService.sendMessageAsync("Hello, Kafka (Async)!");
    }
}
8. 运行效果

运行 DemoApplication 类,将看到控制台输出如下消息:

代码语言:javascript
复制
同步消息发送成功: Hello, Kafka (Sync)!
异步消息发送成功: Hello, Kafka (Async)!

如果 Kafka 生产者发送消息失败,将看到错误信息。

9. 总结

本文详细介绍了如何在 Spring Boot 项目中整合 Apache Kafka,包括 Kafka 的配置、消息的同步和异步发送。通过理解和实践这些内容,可以帮助你更好地掌握 Spring Boot 与 Kafka 的整合与应用。希望本文对你有所帮助,如有任何疑问或建议,欢迎留言讨论。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2024-08-09,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Spring Boot 整合 Kafka 详解
    • 1. 环境准备
      • 2. 创建 Spring Boot 项目
        • 2.1 使用 Spring Initializr 创建项目
      • 3. 添加依赖
        • 4. 配置 Kafka
          • 5. 创建 Kafka 生产者
            • 6. 发送消息
              • 7. 测试 Kafka 生产者
                • 8. 运行效果
                  • 9. 总结
                  领券
                  问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档