前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >深入Spring Boot (十三):整合Kafka详解

深入Spring Boot (十三):整合Kafka详解

作者头像
JavaQ
发布2019-11-12 13:18:57
1.6K0
发布2019-11-12 13:18:57
举报
文章被收录于专栏:JavaQ

Kafka是一种高吞吐量的分布式流处理平台,它具有高可用、高吞吐量、速度快、易扩展等特性。本篇将介绍如何使用Spring Boot整合Kafka及使用Kafka实现简单的消息发送和消费,主要包括以下3部分内容:

  1. Kafka
  2. 整合Kafka
  3. 小结

Kafka

Kafka是Apache组织下的一个分布式流处理平台,它具有以下三个功能特性:

  • 作为消息系统,发布和订阅流式的记录,这个与消息队列或者企业消息系统类似。
  • 作为存储系统,储存流式的记录,并且有较好的容错性。
  • 作为流处理,在流式记录产生时就进行实时处理。

Kafka可用于构建以下两大类别的应用:

  • 构造实时流数据管道,它可以在系统或应用之间可靠地获取数据,相当于消息队列。
  • 构建实时流式应用程序,对这些流数据进行转换或者影,也就是流处理。

Kafka的内容比较多,这里只简单介绍相关基本概念,更多kafka知识请浏览http://kafka.apache.org/。

topic

topic直译为主题,在kafka中就是数据主题,是数据记录发布的地方,可用来区分数据、业务系统。

producer

producer就是生产者,在kafka中Producer API允许一个应用程序发布一串流式的数据到一个或者多个topic。

consumer

consumer就是消费者,在kafka中Consumer API允许一个应用程序订阅一个或多个topic ,并且对发布给他们的流式数据进行处理。

Stream Processors

kafka中的Connector API允许构建并运行可重用的生产者或者消费者,将topics连接到已存在的应用程序或者数据系统,例如连接到一个关系型数据库,捕捉表的内容变更。

整合Kafka

使用IDEA新建项目,选择maven管理依赖和构建项目,在pom.xml中添加spring-boot-starter和spring-kafka依赖配置,项目中会使用单元测试检查整合是否正确,所以需要添加spring-boot-starter-test依赖,pom.xml详细内容如下。

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">    <modelVersion>4.0.0</modelVersion>    <groupId>com.kafka</groupId>    <artifactId>demo</artifactId>    <version>1.0-SNAPSHOT</version>
    <parent>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-parent</artifactId>        <version>2.2.0.RELEASE</version>    </parent>
    <dependencies>        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter</artifactId>        </dependency>        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-test</artifactId>            <scope>test</scope>        </dependency>        <dependency>            <groupId>org.springframework.kafka</groupId>            <artifactId>spring-kafka</artifactId>            <version>2.3.1.RELEASE</version>        </dependency>        <dependency>            <groupId>org.slf4j</groupId>            <artifactId>jcl-over-slf4j</artifactId>            <version>1.7.28</version>        </dependency>    </dependencies>
    <build>        <plugins>            <plugin>                <groupId>org.springframework.boot</groupId>                <artifactId>spring-boot-maven-plugin</artifactId>            </plugin>        </plugins>    </build></project>

在resources目录下新增application.properties,并在其中配置生产者和消费者的相关参数,application.properties中参数会在应用启动时被加载解析并初始化,更多生产者和消费者的参数配置请查阅官方文档。

代码语言:javascript
复制
# kafka server的地址,如果有多个,使用逗号分割spring.kafka.bootstrap-servers=127.0.0.1:9092# 生产者发送失败时,重试次数spring.kafka.producer.retries=0# 生产者消息key和消息value的序列化处理类spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializerspring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer# 默认消费者group idspring.kafka.consumer.group-id=testGroup# 消费者消息key和消息value的序列化处理类spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializerspring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

项目目录结构如下图所示。

DemoApplication.java

代码语言:javascript
复制
import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplicationpublic class DemoApplication {    public static void main(String[] args) {        SpringApplication.run(DemoApplication.class, args);    }}

ProducerService.java

代码语言:javascript
复制
public interface ProducerService {    void send(String topic, String msg);}

ProducerServiceImpl.java

代码语言:javascript
复制
import com.kafka.demo.service.ProducerService;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.lang.Nullable;import org.springframework.stereotype.Service;import org.springframework.util.concurrent.ListenableFuture;import org.springframework.util.concurrent.ListenableFutureCallback;@Service("ProducerService")public class ProducerServiceImpl implements ProducerService {    private Logger log = LoggerFactory.getLogger(ProducerServiceImpl.class);    @Autowired    private KafkaTemplate kafkaTemplate;
    @Override    public void send(String topic, String msg) {        ListenableFuture future = kafkaTemplate.send(topic, msg);        future.addCallback(new ListenableFutureCallback() {            @Override            public void onFailure(Throwable throwable) {                log.info("send failure");            }
            @Override            public void onSuccess(@Nullable Object o) {                log.info("send success");            }          });      }}

ConsumerService.java

代码语言:javascript
复制
public interface ConsumerService {    void onReceived(String msg);}

ConsumerServiceImpl.java

代码语言:javascript
复制
import com.kafka.demo.service.ConsumerService;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.stereotype.Service;@Servicepublic class ConsumerServiceImpl implements ConsumerService {    private Logger log = LoggerFactory.getLogger(ConsumerServiceImpl.class);    @KafkaListener(topics="test")
    @Override    public void onReceived(String msg) {        log.info("receive msg=" + msg);    }}

DemoApplicationTest.java

代码语言:javascript
复制
import com.kafka.demo.service.ProducerService;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)@SpringBootTest(classes={com.kafka.demo.DemoApplication.class})public class DemoApplicationTest {
    @Autowired    private ProducerService producerService;
    @Test    public void test(){        producerService.send("test", "hello world");    }}

运行单元测试之前,需要下载并启动Kafka服务器。进入http://kafka.apache.org/downloads下载最新版本并解压。压缩包中Kafka脚本在Unix和Windows平台是不同的,下面使用到的相关命令,如果在Unix平台下请使用bin/,如果在Windows平台下请使用bin\windows\,并且脚本扩展名分别为.bat和.sh。因为kafka使用zookeeper来实现动态的集群扩展,所以要先启动zookeeper,使用如下命令:

代码语言:javascript
复制
bin/zookeeper-server-start.sh config/zookeeper.properties

然后使用如下命令启动kafka:

代码语言:javascript
复制
bin/kafka-server-start.sh config/server.properties

使用如下命令创建一个名为"test"的topic:

代码语言:javascript
复制
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

上面各步骤完成后,即可执行单元测试验证了。

小结

本文通读下来,你会发现整合kafka很简单,添加kafka依赖、使用KafkaTemplate、使用@KafkaListener注解就完成了,其实是SpringBoot在背后默默的做了很多工作,如果想深入了解这部分工作做了什么,入口就是@SpringBootApplication注解。@SpringBootApplication是一个组合注解,它包含了@SpringBootConfiguration、

@EnableAutoConfiguration和@ComponentScan等注解,通过这三个注解实现了bean的配置和加载。深入@EnableAutoConfiguration注解源码,你会发现加载了KafkaAutoConfiguration,在这里加载并实例化了kafka相关的类。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-11-06,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 JavaQ 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档