首页
学习
活动
专区
工具
TVP
发布

SpringBoot整合Kafka

前言

一直用公司的kafka组件,对kafka原生实现比较模糊,作为一名优秀(渣渣)程序员,必须自己搭建kafka环境调试一把,Let`s go~

环境

:0.11.0.1

: 1.8.0_161

: 1.1.1.RELEASE

: 10.12.6

spring-kafka是什么呢?首先我们要知道,kafka服务端运行起来之后,消息的发送和消费都是从服务端(broker)获取,那么就涉及到业务代码和kafka服务端的连接(client)。kafka提供client来建立连接,但是提供的实现Api较为底层,而spring-kafka项目则是为了方便集成spring项目,而封装了client Api,方便java业务代码接入。

spring-kafka 主页地址: https://spring.io/projects/spring-kafka,其中,需要注意spring-kafka的版本和kafka-client版本的对应关系,官网有对照表,红框为本次实验所使用的版本。

名词解释

消息生产者:Producer,负责生产消息并发送到kafka服务器

消息消费者:Consumer,消息的使用方,负责消费kafka服务器上的消息

主题:topic,由使用方(用户)定义,并配置在kafka服务器,用来建立生产者和消费者之间的关系

消息分区:Partition,一个topic可以有多个分区,同一个topic的消息,生产者发送到kafka服务器后,会根据相应的负载均衡写入到某个partition分区中,写入时按顺序追加写入到尾部

Broker:即kafka服务器,用于存储消息。kafka消息以log方式存储,但是其中并不是字符串,而是标准格式的二进制文件,使用kafka提供的shell工具可以查看内容,使用 也可以直接查看log文件内容

offset:消息存储在kafka的Broker上,消费者拉取消息数据过程中需要知道消息在文件中的偏移量,偏移量即是offset

服务启动

启动步骤:

1 启动zookeeper

kafka的启动是依赖zk的,kafka启动时会首先连接上zk进行节点注册(该节点为临时节点,随着kafka的宕机或下线,节点会被删除),注册成功后,kafka服务器(Broker)会将自己的IP地址和端口信息写入到节点下。

同时同一个topic的消息会被分别存储在多个分区(Partition)并分布到多个Broker上,这些分区信息和Broker的对应关系也是由zk来维护的。

解压下载的kafka,进入到最上层目录,运行 启动单机zk

2 启动kafka

打开另一个终端Tab,运行

再打开一个终端Tab,使用jps查看,出现 和 即可。kafka默认端口是9092,zk默认端口是2181

建立springboot工程

常规springboot工程,版本为 1.5.9 ,工程目录如下:

maven依赖如下:

配置如下,简单整合如下配置就好:

消息实体:

生产者:

其中topic主题不需要提前在kafka上建立,没有该topic,kafka会自动建立

消费者:

其中 注解接收一组topic,可以订阅多个主题

触发controller:

运行工程:在浏览器中输入:

IDEA 输出:

可以看到消息成功发送并消费到了。

消息在哪里

到这里完成kafka消息的生产和消费,那么我们推送到kafka服务器的消息,到底在哪里呢?

上面提到过,kafka会持久化接收到的消息,以log的形式进行写入,其中log以标准二进制存储数据。存储文件的配置路径就在

文件中,其中属性: 指明了消息存储路径,打开该路径,可以看到一个文件夹

前面部分是topic名称,后面数字0表示分区,意为topic为 的消息,分布在分区0的消息存储在该文件夹中,打开文件夹看到: 文件,该文件即是消息存储的文件了。使用

可以看到:

注意:有些可能会遇到提示说 实际是因为没有权限,加上sudo即可

一些报错和解决

1、NoSuchMethodException

原因:应该是spring-kafka的版本用的太高了,用了2.x.x的版本,版本需要spring5的支持,但是springboot使用了1.5.x,集成的是spring4

解决方法:使用springboot版本 2.x.x

2、zk启动以后会有下面的Error信息:

原因:暂时不是很清楚,google了好像是zk的机制,第一次注册上去会提示说没有节点,但紧接着会创建节点(不是很确定)

解决办法:暂时忽略,后面找时间详细研究下,从结果看不影响使用kafka

3、发送消息会失败,大约1分钟后提示超时

原因:可能是没有连上kafka服务器

解决办法:这里只能说是有可能的解决方式。

1、 文件中指明本机地址,不要使用

2、打开kafka的配置文件

图中圈红的一行,放开注释,并且写上本机的ip地址。

小提示:关闭的时候,先关闭kafka,再关闭zk;否则先关掉这zk,kafka会一直报错,导致 失效....

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20180923G0IQKO00?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券