Kafka是由LinkedIn开发并开源的分布式消息系统,因其分布式及高吞吐率而被广泛使用,现已与Cloudera Hadoop,Apache Storm,Apache Spark集成,具备许多优秀的性能:高吞吐、分布式、跨平台、实时性以及伸缩性,本文我们就来看看如何将Spring Cloud Bus和Kafka进行整合。
Kafka现在是Apache上的开源项目,直接到官网下载即可(http://kafka.apache.org/),这个不用我多说。
下载成功之后,是一个压缩文件,解压该文件,我们可以看到一个bin目录,进入到bin目录中,bin目录下的.sh文件都是Linux/Unix下的shell脚本,在Linux/Unix环境下直接运行这些脚本即可,bin目录中还有一个windows目录,该目录下存储的都是windows中的批处理文件。我们在运行时根据自己的操作系统选择合适的命令去执行,本文以windows为例。解压后为了后面的命令操作方便,我将windows文件配置到环境变量中,我的是D:\Program\kafka_2.11-0.11.0.1\bin\windows
,然后在cmd中进入到解压目录下,执行zookeeper-server-start.bat .\config\zookeeper.properties
命令,表示启动zookeeper(由于Kafka依赖的zookeeper,所以我们要先启动zookeeper再启动Kafka),如下:
zookeeper在启动的过程中需要用到zookeeper.properties配置文件,这个文件中定义了zookeeper的端口为2181。zookeeper启动成功之后,接下来我们要启动Kafka,执行kafka-server-start.bat .\config\server.properties
命令,如下:
两者都启动成功之后,我们可以执行如下命令kafka-console-producer.bat --broker-list localhost:9092 --topic test
来发送一条消息,该命令可以启动Kafka基于命令行的消息生产客户端,启动成功之后,我们就可以直接在命令行发送消息了,如下:
消息发送了,当然要有人来接收,接下来我们来创建消息接收端,执行kafka-console-consumer.bat --zookeeper localhost:2181 --topic test --from-beginning
命令,启动成功之后,我们就可以收到刚刚发送的消息了,如下:
Spring Cloud Bus和Kafka的整合非常简单,如果我们使用了默认配置,就可以从RabbitMQ无缝切换过来,只需要修改一下我们之前config-server和config-client的依赖,将spring-cloud-starter-bus-amqp改为spring-cloud-starter-bus-kafka,如下:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-kafka</artifactId>
</dependency>
修改之后,我们分别启动eureka、config-server和config-client,测试方式还是和上文一样,该有的/bus/refresh接口的功能还是不变。这里我就不再赘述了。
好了,Kafka我们就先说这么多。有问题欢迎留言讨论。
参考资料:
1.《Spring Cloud微服务实战》