前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >「Spring和Kafka」如何在您的Spring启动应用程序中使用Kafka

「Spring和Kafka」如何在您的Spring启动应用程序中使用Kafka

作者头像
首席架构师智库
发布2020-12-08 11:41:32
1.6K0
发布2020-12-08 11:41:32
举报
文章被收录于专栏:超级架构师超级架构师

在架构规划期间选择正确的消息传递系统始终是一个挑战,但这是需要确定的最重要的考虑因素之一。作为一名开发人员,我每天都要编写需要服务大量用户并实时处理大量数据的应用程序。

通常,我将Java与Spring框架(Spring Boot、Spring数据、Spring云、Spring缓存等)一起使用。Spring Boot是一个框架,它允许我比以前更快更轻松地完成开发过程。它已在我的组织中发挥了关键作用。随着用户数量的快速增长,我们意识到我们显然需要每秒处理1,000,000个事件。

当我们发现Apache Kafka®时,我们发现它满足了我们的需求,可以快速处理数百万条消息。这就是为什么我们决定尝试一下。从那一刻起,卡夫卡就成了我口袋里的重要工具。你会问,我为什么选择它?

Apache Kafka是:

  • 可伸缩的
  • 容错
  • 一个很棒的发布-订阅消息传递系统
  • 与大多数消息传递系统相比,具有更高的吞吐量
  • 高度耐用
  • 高度可靠
  • 高的性能

这就是为什么我决定在我的项目中使用它。根据我的经验,我在这里提供了一个循序渐进的指南,介绍如何在Spring启动应用程序中包含Apache Kafka,以便您也可以开始利用它的优点。

先决条件

  • 本文要求您拥有Confluent平台
  • 手动安装使用ZIP和TAR档案
  • 下载
  • 解压缩它
  • 按照逐步说明,您将在本地环境中启动和运行Kafka

我建议在您的开发中使用Confluent CLI来启动和运行Apache Kafka和流平台的其他组件。

你会从这本指南中得到什么

阅读完本指南后,您将拥有一个Spring Boot应用程序,其中包含一个Kafka生成器,用于向您的Kafka主题发布消息,以及一个Kafka使用者,用于读取这些消息。

好了,让我们开始吧!

内容列表

  1. 步骤1:生成项目
  2. 步骤2:发布/读取来自Kafka主题的消息
  3. 步骤3:通过应用程序配置Kafka。yml配置文件
  4. 步骤4:创建一个生产者
  5. 第五步:创造一个消费者
  6. 步骤6:创建一个REST控制器

步骤1:生成项目

首先,让我们使用Spring Initializr来生成我们的项目。我们的项目将有Spring MVC/web支持和Apache Kafka支持。

一旦你解压缩了这个项目,你将会有一个非常简单的结构。我将在本文的最后向您展示项目的外观,以便您能够轻松地遵循相同的结构。我将使用Intellij IDEA,但是你可以使用任何Java IDE。

步骤2:发布/读取来自Kafka主题的消息

现在,你可以看到它是什么样的。让我们继续讨论来自Kafka主题的发布/阅读消息。

首先创建一个简单的Java类,我们将使用它作为示例:package com.demo.models;

public class User { private String name; private int age; public User(String name, int age) { this.name = name; this.age = age; } }

步骤3:通过应用程序配置Kafka.yml配置文件

接下来,我们需要创建配置文件。我们需要以某种方式配置我们的Kafka生产者和消费者,使他们能够发布和从主题读取消息。我们可以使用任意一个应用程序,而不是创建一个Java类,并用@Configuration注释标记它。属性文件或application.yml。Spring Boot允许我们避免过去编写的所有样板代码,并为我们提供了更智能的配置应用程序的方法,如下所示:

server: port: 9000 spring: kafka: consumer: bootstrap-servers: localhost:9092 group-id: group_id auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer producer: bootstrap-servers: localhost:9092 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer

如果您想了解更多关于Spring引导自动配置的信息,可以阅读这篇简短而有用的文章。有关可用配置属性的完整列表,请参阅官方文档。

步骤4:创建一个生产者

创建生产者将把我们的消息写入主题。

@Service public class Producer { private static final Logger logger = LoggerFactory.getLogger(Producer.class); private static final String TOPIC = "users"; @Autowired private KafkaTemplate<String, String> kafkaTemplate; public void sendMessage(String message) { logger.info(String.format("#### -> Producing message -> %s", message)); this.kafkaTemplate.send(TOPIC, message); } }

我们只是自动连接KafkaTemplate,并将使用此实例发布消息到主题——这就是生产者!

第五步:创造一个消费者

Consumer是负责根据您自己的业务逻辑的需要读取消息并对其进行处理的服务。要设置它,请输入以下内容:

@Service public class Consumer { private final Logger logger = LoggerFactory.getLogger(Producer.class); @KafkaListener(topics = "users", groupId = "group_id") public void consume(String message) throws IOException { logger.info(String.format("#### -> Consumed message -> %s", message)); } }

在这里,我们告诉我们的方法void consumption (String message)订阅用户的主题,并将每个消息发送到应用程序日志。在实际的应用程序中,可以按照业务需要的方式处理消息。

步骤6:创建一个REST控制器

如果我们已经有了一个消费者,那么我们就已经拥有了消费Kafka消息所需的一切。

为了完整地显示我们创建的所有内容是如何工作的,我们需要创建一个具有单个端点的控制器。消息将被发布到这个端点,然后由我们的生产者进行处理。

然后,我们的使用者将以登录到控制台的方式捕获和处理它。

@RestController @RequestMapping(value = "/kafka") public class KafkaController { private final Producer producer; @Autowired KafkaController(Producer producer) { this.producer = producer; } @PostMapping(value = "/publish") public void sendMessageToKafkaTopic(@RequestParam("message") String message) { this.producer.sendMessage(message); } }

让我们用cURL把信息发送给Kafka:

curl -X POST -F 'message=test' http://localhost:9000/kafka/publish

基本上,这是它!在不到10个步骤中,您就了解了将Apache Kafka添加到Spring启动项目是多么容易。如果您遵循了这个指南,您现在就知道如何将Kafka集成到您的Spring Boot项目中,并且您已经准备好使用这个超级工具了!

本文

http://jiagoushi.pro/how-work-apache-kafka-your-spring-boot-application

讨论:请加入知识星球【首席架构师圈】或者加微信小号【jiagoushi_pro】或者加QQ群【11107777】

公众号

【jiagoushipro】【首席架构师智库】精彩图文详解架构方法论,架构实践,技术原理,技术趋势。我们在等你,赶快扫描关注吧。

微信小号

【jiagoushi_pro】50000人社区,讨论:企业架构,云计算,大数据,数据科学,物联网,人工智能,安全,全栈开发,DevOps,数字化.

QQ群

【11107767】深度交流企业架构,业务架构,应用架构,数据架构,技术架构,集成架构,安全架构。以及大数据,云计算,物联网,人工智能等各种新兴技术。加QQ群,有珍贵的报告和干货资料分享。

视频号

【首席架构师智库】1分钟快速了解架构相关的基本概念,模型,方法,经验。每天1分钟,架构心中熟。

知识星球

向大咖提问,近距离接触,或者获得私密资料分享。

知识星球【首席架构师圈】

微信圈子

志趣相投的同好交流。

微信圈子【首席架构师圈】

喜马拉雅

路上或者车上了解最新黑科技资讯,架构心得。

【智能时刻,架构君和你聊黑科技】

知识星球

认识更多朋友,职场和技术闲聊。

知识星球【职场和技术】

谢谢大家关注,转发,点赞和点在看。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-12-04,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 首席架构师智库 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 你会从这本指南中得到什么
相关产品与服务
日志服务
日志服务(Cloud Log Service,CLS)是腾讯云提供的一站式日志服务平台,提供了从日志采集、日志存储到日志检索,图表分析、监控告警、日志投递等多项服务,协助用户通过日志来解决业务运维、服务监控、日志审计等场景问题。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档