前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >kafka之springboot集成kafka(四)

kafka之springboot集成kafka(四)

原创
作者头像
翰墨飘香
发布2024-01-18 23:22:36
2280
发布2024-01-18 23:22:36
举报
文章被收录于专栏:翰墨飘香翰墨飘香

参考

https://kafka.apache.org/22/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html

https://juejin.cn/post/7210225864355659835

https://thepracticaldeveloper.com/spring-boot-kafka-config/

https://reflectoring.io/spring-boot-kafka/

一、项目新建

1.1 方式一、spring项目自动生成

https://start.spring.io/

1.2 方式二、手动搭建引入kafka

1、pom引入

代码语言:js
复制
     <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

2、yaml文件配置

代码语言:js
复制
spring:
  kafka:
    producer:
    bootstrap-servers: 127.0.0.1:9092

二、代码编写

2.1 方式一、使用spring

2.1.1 创建主题(create Kafka Topic)

代码语言:java
复制
@Slf4j
@RestController
public class TopicCreateController {

    @Autowired
    private KafkaProperties properties;

    @GetMapping("/create/{topicName}")
    public String createTopic(@PathVariable String topicName) {
        AdminClient client = AdminClient.create(properties.buildAdminProperties());
        if (client != null) {
            try {
                Collection<NewTopic> newTopics = new ArrayList<>(1);
                newTopics.add(new NewTopic(topicName, 1, (short) 1));
                client.createTopics(newTopics);
            } catch (Throwable e) {
                e.printStackTrace();
            } finally {
                client.close();
            }
        }
        return topicName;
    }

    @GetMapping("/test")
    public String createTopic() {
        return "success";
    }
}

2.1.2 生产者(Spring Boot Kafka Producer)

Fire-and-forget模式

发送消息后不需要逻辑程序关心是否发送成功。

同步模式

即sender()方法后再调用get()方法会同步地等待结果返回,根据结果可以判断是否发送成功。

代码语言:java
复制
@Slf4j
@RestController
public class ProducerController {
    private static final String topic = "hello";
    @Autowired
    private KafkaTemplate<Object, Object> kafkaTemplate;
    /**
     * 同步获取通知结果
     * @param msg
     * @return
     */
    @GetMapping("/produce/{msg}")
    public String produce(@PathVariable String msg) {
        // 发送消息
        try {
            SendResult result = kafkaTemplate.send(topic, msg).get();
            System.out.println(result);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
        System.out.println("send"+ msg);
        return "send: "+ msg;
    }

异步生产者

代码语言:java
复制
 @GetMapping("/produceAsync/{msg}")
    public String hello2(@PathVariable String msg) {
        // 同步获取结果
        ListenableFuture<SendResult<Object,Object>> future = kafkaTemplate.send(topic,msg);
        try {
            SendResult<Object,Object> result = future.get();
            System.out.println("success >>> "+ result.getRecordMetadata().topic() + ",offset"+result.getRecordMetadata().offset()); // success >>> hello2);
        }catch (Throwable e){
            e.printStackTrace();
        }

        System.out.println("async send: " +msg);
        return "async send: " +msg;
    }

2.1.3 消费者(Spring Boot Kafka Consumer)

代码语言:java
复制
// 输入代码内容
  @KafkaListener(id = "helloGroup", topics = "hello")
    public void hello(String msg) {
        System.out.println(msg);
    }

2.2 方式二 使用Kafka原生

2.2.1 生产者

代码语言:java
复制
private static Properties props = new Properties();
static {
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
}
public void ProduceMsg(String topic,String msg){
        Producer<String, String> producer = new KafkaProducer<>(props);
        // 发送消息
        ProducerRecord<String,String> record =
                new ProducerRecord<String, String>(topic,msg);

        //producer.send(record);
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception == null) {
                    System.out.println("消息发送成功");
                } else {
                    System.out.println("消息发送失败");
                }
            }
        });
}

2.2.2 消费者

代码语言:java
复制
  private static Properties props = new Properties();
    static {
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id","tpd-loggers");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    }
    final KafkaConsumer<String, String> consumer;

    private volatile boolean isRunning = true;

    public AutoCommitConsumer(String topicName) {
        consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList(topicName));
    }

    public void printReceiveMsg() {
        try {
            while (isRunning) {
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1000));
                Thread.sleep(1000);
                if (!consumerRecords.isEmpty()) {
                    for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                        System.out.println("TopicName: " + consumerRecord.topic() + " Partition:" +
                                consumerRecord.partition() + " Offset:" + consumerRecord.offset() + "" +
                                " Msg:" + consumerRecord.value());
                    }

                }
            }
        }catch (Exception e){
            System.out.println(e.getMessage());
        }
        finally {
            close();
        }

    }

    public void close() {
        isRunning = false;
        if (consumer != null) {
            consumer.close();
        }
    }

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、项目新建
    • 1.1 方式一、spring项目自动生成
      • 1.2 方式二、手动搭建引入kafka
        • 1、pom引入
          • 2、yaml文件配置
          • 二、代码编写
            • 2.1 方式一、使用spring
              • 2.1.1 创建主题(create Kafka Topic)
            • 2.1.2 生产者(Spring Boot Kafka Producer)
              • Fire-and-forget模式
              • 同步模式
              • 异步生产者
              • 2.1.3 消费者(Spring Boot Kafka Consumer)
            • 2.2 方式二 使用Kafka原生
              • 2.2.1 生产者
              • 2.2.2 消费者
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档