前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >SpringBoot系列之集成kafka实现事件发布

SpringBoot系列之集成kafka实现事件发布

作者头像
SmileNicky
发布2022-01-04 16:46:14
8970
发布2022-01-04 16:46:14
举报
文章被收录于专栏:Nicky's blog

事件发布订阅实现,我们经常使用到spring框架提供的ApplicationEventPublisher,基于kafka的特性,我们也可以简单实现类似的效果

1、kafka环境部署搭建

官网下载链接:https://kafka.apache.org/downloads,最开始用最新版的,发现在我的win10系统没部署成功,所以还是选择2.8.1版本的

在D:\kafka_2.12-2.8.1\bin\windows,使用cmd命令启动zookeeper,window系统修改conf文件夹下面的zookeeper.properties里面的dataDir

代码语言:javascript
复制
zookeeper-server-start.bat ..\..\config\zookeeper.properties
在这里插入图片描述
在这里插入图片描述

window系统修改conf文件夹下面的log.dirs路径

代码语言:javascript
复制
kafka-server-start.bat ..\..\config\server.properties
在这里插入图片描述
在这里插入图片描述

2、kafka常用命令使用

启动另外一个cmd参考,创建一个命令为test-topic的topic

代码语言:javascript
复制
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-topic

查看kafkatopic列表

代码语言:javascript
复制
kafka-topics.bat --list --zookeeper localhost:2181

启动kafka的生产者,发送topic数据

代码语言:javascript
复制
kafka-console-producer.bat --broker-list localhost:9092 --topic test-topic
在这里插入图片描述
在这里插入图片描述

启动一个kafka消费者端,可以接收到消息数据

代码语言:javascript
复制
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test-topic --from-beginning
在这里插入图片描述
在这里插入图片描述

3、创建一个kafka starter工程

创建一个工程,实现对kafka的api简单封装

在这里插入图片描述
在这里插入图片描述

jdk选择jdk8的

在这里插入图片描述
在这里插入图片描述

选择需要的依赖

在这里插入图片描述
在这里插入图片描述

基于kafka的EventPublisher

代码语言:javascript
复制
package com.example.ebus.publisher;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationEvent;
import org.springframework.kafka.core.KafkaTemplate;

@Slf4j
public class MyEventPublisher {

    private KafkaTemplate<String, Object> kafkaTemplate;

    @Value("${app.ebus.topic:ebus}")
    private String topic;

    public MyEventPublisher(KafkaTemplate<String, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void publishEvent(Object event) {
        if (log.isInfoEnabled()) {
            log.info("topic发送:{}", event.getClass().getName());
        }
        kafkaTemplate.send(topic, event);
    }

}

自动配置类

代码语言:javascript
复制
package com.example.ebus.configuration;

import com.example.ebus.publisher.MyEventPublisher;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;

@Configuration
public class EbusAutoConfiguration {

    @Bean
    public MyEventPublisher myEventPublisher(@Qualifier("kafkaTemplate") KafkaTemplate<String, Object> kafkaTemplate) {
        return new MyEventPublisher(kafkaTemplate);
    }

}

META-INF/spring.factories,加上配置

代码语言:javascript
复制
org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.example.ebus.configuration.EbusAutoConfiguration

4、kafka生产端实现

同样创建一个生产者端工程,引入starter,加上yml配置

代码语言:javascript
复制
spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    producer:
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

写个测试接口测试数据:

代码语言:javascript
复制
package com.example.producer.controller;

import com.example.ebus.event.ShopOrderEvent;
import com.example.ebus.publisher.MyEventPublisher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping(value = "/api")
public class ShopOrderController {

    @Autowired
    private MyEventPublisher eventPublisher;

    @PostMapping("/order")
    public String placeOrder(@RequestBody ShopOrderEvent orderEvent) {
        eventPublisher.publishEvent(orderEvent);
        return orderEvent.getOrderCode();
    }
}
在这里插入图片描述
在这里插入图片描述
代码语言:javascript
复制
{
    "orderCode":"123456",
    "productName":"三星手机",
    "price":1122,
    "productDesc":"三星手机"
}

5、kafka消费者端实现

同样创建一个消费者工程

代码语言:javascript
复制
spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    consumer:
      enable-auto-commit: true
      group-id: consumer1
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      max-poll-records: 1
      properties:
        spring:
          json:
            trusted:
              packages: '*'

进行监听,使用kafka的KafkaListener

代码语言:javascript
复制
package com.example.consumer.handler;


import com.example.ebus.event.ShopOrderEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class OrderListenerHandler {

    @KafkaListener(topics = {"${app.ebus.topic:ebus}"})
    public void obtainTopicData(ShopOrderEvent event) {
        log.info("下单成功,orderCode:{}" , event.getOrderCode());
    }

}

本博客代码例子可以在GitHub找到下载链接

参考资料

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022/01/01 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1、kafka环境部署搭建
  • 2、kafka常用命令使用
  • 3、创建一个kafka starter工程
  • 4、kafka生产端实现
  • 5、kafka消费者端实现
  • 参考资料
相关产品与服务
命令行工具
腾讯云命令行工具 TCCLI 是管理腾讯云资源的统一工具。使用腾讯云命令行工具,您可以快速调用腾讯云 API 来管理您的腾讯云资源。此外,您还可以基于腾讯云的命令行工具来做自动化和脚本处理,以更多样的方式进行组合和重用。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档