前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spring boot + Kafka踩坑之路

Spring boot + Kafka踩坑之路

作者头像
雷子
发布2021-03-15 15:08:12
5460
发布2021-03-15 15:08:12
举报
文章被收录于专栏:雷子说测试开发

近期学习Kafka,在搭建过程中遇到了一些坑,总结下,分享了出来。

首先,我们要先知道什么是Kafaka。

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。

那么我们来看看,怎么搭建的环境,这里使用的是docker。

代码语言:javascript
复制
docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka

启动 zookeeper

代码语言:javascript
复制
docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper

我们可以看下对应的启动的日志

这里面的,我们要记录下对应的ip,我这里的是172.17.0.1 在启动Kafka需要用到。

代码语言:javascript
复制
docker run  -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=172.17.0.1:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.1.100:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka

对应的ip要对应一致才能启动。IP要根据自己的实际项目中来配置。否则启动配置的时候,会失败的,启动完docker的镜像后,我们一定要及时的查看日志,启动是否正常。

然后我们看下在spring boot的工程中怎么用。我们利用的是注解的模式。我们考虑着是绑定发送邮件走异步消息队列,就不用担心同步发送消息过长,接口响应过慢。我们看下在pom.xml如何配置。这里不用去配置版本,自动化根据spring boot版本选择适合的,在实际中,我刚开始选择配置版本,结果就出问题了,所以我去掉了版本。

代码语言:javascript
复制
  <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

下载完,源码包之后,我们在application.yaml如下配置。

代码语言:javascript
复制
kafka:
    bootstrap-servers:  0.0.0.0:9092
    producer:
      retries: 1
      batch-size: 16384
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      enable-auto-commit: true
      group-id:  testGroup
      auto-offset-reset: earliest
    template:
      default-topic:  testTopic

配置完毕只有呢,我们去开发消息的产生端。

代码语言:javascript
复制
 @Override
    @Transactional
    public User bindemail(String email, String username) {
        User user = userRepository.findByUsername(username);
        if (user != null) {
            try {
                if (user.getEmail().isEmpty() == false) {
                    throw new PanExection(ResultEmus.USER_BIND);
                }
                user.setEmail(email);
                userRepository.save(user);
                Map<String ,Object> map=new HashMap<>();
                map.put("username",user.getUsername());
                map.put("email",email);
                map.put("subject","全栈测试平台绑定邮箱");
                map.put("from","全栈测试平台管理员");
                map.put("type","bangding");
                kafkaTemplate.send("testTopic",map.toString());
                redisTemplate.delete("Plan_user");
                return user;
            } catch (NullPointerException e) {
                User emailuser = userRepository.findByEmail(email);
                if (emailuser == null) {
                    user.setEmail(email);
                    userRepository.save(user);
                    Map<String ,Object> map=new HashMap<>();
                    map.put("username",user.getUsername());
                    map.put("email",email);
                    map.put("subject","全栈测试平台绑定邮箱");
                    map.put("from","全栈测试平台管理员");
                    map.put("type","bangding");
                    kafkaTemplate.send("testTopic",map);
                    redisTemplate.delete("Plan_user");
                    return user;
                }
                throw new PanExection(ResultEmus.USER_EMAIL);

            }
        }
        throw new PanExection(ResultEmus.USER_NOT_EXIT);
    }

我们去开发下,对应的消费端。

代码语言:javascript
复制
@Component
@Slf4j
public class ConsumerListener {
    @Autowired
    private EmailServer emailServerl;

    @KafkaListener(topics = "testTopic")
    public void onMessage(String message){
        StringJsonParson stringJsonParson=new StringJsonParson();
        Map<String,Object> map=stringJsonParson.json2map(message);

        if (map.get("type").equals("bangding")){
            System.out.println(map.get("type"));
            log.info("lafka异步发送邮件--绑定成功");
            emailServerl.sendemail(map.get("subject").toString(), map.get("from").toString(),
                    map.get("email").toString(), "你已经成功绑定了登录邮箱,用户名:" + map.get("username"));
        }
    }

}

这样我们就可以启动我们的项目了。

这样是脚本调试,我们看下对应的数据库

提示已经绑定成功了,我们看下日志输出。

日志已经打印出来了,这样spring boot结合kafka的实践已经完成,后续还会有其他的交给Kafka来完成的。这只是一个简单的demo,结合docker部署Kafka环境。

Spring boot系列文章:

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-05-04,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 雷子说测试开发 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
容器镜像服务
容器镜像服务(Tencent Container Registry,TCR)为您提供安全独享、高性能的容器镜像托管分发服务。您可同时在全球多个地域创建独享实例,以实现容器镜像的就近拉取,降低拉取时间,节约带宽成本。TCR 提供细颗粒度的权限管理及访问控制,保障您的数据安全。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档