前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spring Boot Kafka 生产者/消费者示例

Spring Boot Kafka 生产者/消费者示例

作者头像
用户1418987
发布2023-10-26 14:09:14
6160
发布2023-10-26 14:09:14
举报
文章被收录于专栏:coder
Spring Boot Kafka 生产者/消费者示例_kafka
Spring Boot Kafka 生产者/消费者示例_kafka

Spring Boot Kafka 生产者示例

Spring Boot 是最流行和最常用的 Java 编程语言框架之一。它是一个基于微服务的框架,使用 Spring Boot 制作一个可用于生产的应用程序只需很少的时间。Spring Boot 可以轻松创建独立的、生产级的基于 Spring 的应用程序,您可以“直接运行”。下面列出了 Spring boot 的一些主要特性。

  • 创建独立的 Spring 应用程序
  • 直接嵌入 Tomcat、Jetty 或 Undertow。
  • 提供“入门”依赖项以简化构建配置。
  • 尽可能自动配置 Spring 和第 3 方库。
  • 提供生产就绪的功能,例如运行状况检查、指标和外部化配置。
  • 几乎不需要生成代码,也不需要 XML 配置。

Apache Kafka 是一个发布-订阅消息系统。消息传递系统允许您在进程、应用程序和服务器之间发送消息。从广义上讲,Apache Kafka 是一个可以定义并进一步处理主题(主题可能是一个类别)的软件。应用程序可以连接到该系统并将消息传输到该主题。消息可以包含来自您个人博客上的任何事件的任何类型的信息,也可以是会触发任何其他事件的非常简单的文本消息。

Spring Boot Kafka 生产者/消费者示例_Apache_02
Spring Boot Kafka 生产者/消费者示例_Apache_02

例子:

先决条件

确保您已在本地计算机上安装 Apache Kafka。

步骤 1:

转到此链接https://start.spring.io/并创建一个 Spring Boot 项目。将以下依赖项添加到您的 Spring Boot 项目中。 

  • Apache Kafka 的 Spring
Spring Boot Kafka 生产者/消费者示例_应用程序_03
Spring Boot Kafka 生产者/消费者示例_应用程序_03

步骤 2:

现在让我们创建一个名为DemoController的控制器类。

DemoController.java
代码语言:javascript
复制
// Java Program to Illustrate Controller Class

package com.amiya.kafka.apachekafkaproducer;

// Importing required classes
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.*;

// Annotation
@RestController

// Class
public class DemoController {

	// Autowiring Kafka Template
	@Autowired KafkaTemplate<String, String> kafkaTemplate;

	private static final String TOPIC = "NewTopic";

	// Publish messages using the GetMapping
	@GetMapping("/publish/{message}")
	public String publishMessage(@PathVariable("message")
								final String message)
	{

		// Sending the message
		kafkaTemplate.send(TOPIC, message);

		return "Published Successfully";
	}
}

第3步:

现在我们必须执行以下操作才能使用 Spring Boot 将消息发布到 Kafka 主题

  1. 运行 Apache Zookeeper 服务器
  2. 运行 Apache Kafka 服务器
  3. 监听来自新主题的消息

C:\kafka>.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

同样,使用此命令运行 Apache Kafka 服务器

代码语言:javascript
复制
C:\kafka>.\bin\windows\kafka-server-start.bat .\config\server.properties

运行以下命令来监听来自新主题的消息 

代码语言:javascript
复制
C:\kafka>.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic NewTopic --from-beginning

第4步:

现在运行您的 Spring Boot 应用程序。确保您已更改application.properties文件中的端口号

代码语言:javascript
复制
server.port=8081

让我们在 ApacheKafkaProducerApplication 文件中运行 Spring boot 应用程序

Spring Boot Kafka 生产者/消费者示例_应用程序_04
Spring Boot Kafka 生产者/消费者示例_应用程序_04

第 5 步:

浏览此 URL 并在 /publish/ 后传递您的消息。

代码语言:javascript
复制
http://localhost:8081/publish/demo00

您可以看到我们得到了“Published Successed”作为回应。并且实时您可以看到该消息也已发布到服务器上。消息流是实时的。 

Spring Boot Kafka 生产者/消费者示例_kafka_05
Spring Boot Kafka 生产者/消费者示例_kafka_05

同样,如果我们在此处传递了Hello World,您可以看到我们得到了“发布成功”作为回报。并且实时您可以看到该消息也已发布到服务器上。

Spring Boot Kafka 生产者/消费者示例_Apache_06
Spring Boot Kafka 生产者/消费者示例_Apache_06

Spring Boot Kafka 消费者示例

第 1 步:

创建一个 Spring Boot 项目。将“ Spring for Apache Kafka ”依赖项添加到您的 Spring Boot 项目中。 

Spring Boot Kafka 生产者/消费者示例_Apache_07
Spring Boot Kafka 生产者/消费者示例_Apache_07

第 2 步:

创建一个名为KafkaConfig的配置文件。以下是KafkaConfig.java文件的代码。

KafkaConfig.java
代码语言:javascript
复制
// Java Program to Illustrate Kafka Configuration 

package com.amiya.kafka.apachekafkaconsumer.config; 

// Importing required classes 
import java.util.HashMap; 
import java.util.Map; 
import org.apache.kafka.clients.consumer.ConsumerConfig; 
import org.apache.kafka.common.serialization.StringDeserializer; 
import org.springframework.context.annotation.Bean; 
import org.springframework.context.annotation.Configuration; 
import org.springframework.kafka.annotation.EnableKafka; 
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; 
import org.springframework.kafka.core.ConsumerFactory; 
import org.springframework.kafka.core.DefaultKafkaConsumerFactory; 

// Annotations 
@EnableKafka
@Configuration

// Class 
public class KafkaConfig { 

	@Bean
	public ConsumerFactory<String, String> consumerFactory() 
	{ 

		// Creating a Map of string-object pairs 
		Map<String, Object> config = new HashMap<>(); 

		// Adding the Configuration 
		config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
				"127.0.0.1:9092"); 
		config.put(ConsumerConfig.GROUP_ID_CONFIG, 
				"group_id"); 
		config.put( 
			ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
			StringDeserializer.class); 
		config.put( 
			ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
			StringDeserializer.class); 

		return new DefaultKafkaConsumerFactory<>(config); 
	} 

	// Creating a Listener 
	public ConcurrentKafkaListenerContainerFactory 
	concurrentKafkaListenerContainerFactory() 
	{ 
		ConcurrentKafkaListenerContainerFactory< 
			String, String> factory 
			= new ConcurrentKafkaListenerContainerFactory<>(); 
		factory.setConsumerFactory(consumerFactory()); 
		return factory; 
	} 
}
第 3 步:

创建一个名为KafkaConsumer的Consumer文件

KafkaConsumer.java
代码语言:javascript
复制
// Java Program to Illustrate Kafka Consumer 

package com.amiya.kafka.apachekafkaconsumer.consumer; 

// Importing required classes 
import org.springframework.kafka.annotation.KafkaListener; 
import org.springframework.stereotype.Component; 

@Component

// Class 
public class KafkaConsumer { 

	@KafkaListener(topics = "NewTopic", 
				groupId = "group_id") 

	// Method 
	public void
	consume(String message) 
	{ 
		// Print statement 
		System.out.println("message = " + message); 
	} 
}

第 4 步:

现在我们必须执行以下操作才能使用 Spring Boot 消费来自 Kafka 主题的消息

  • 运行 Apache Zookeeper 服务器
  • 运行 Apache Kafka 服务器
  • 从 Kafka 主题发送消息

使用此命令运行 Apache Zookeeper 服务器

代码语言:javascript
复制
C:\kafka>.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

同样,使用此命令运行 Apache Kafka 服务器

代码语言:javascript
复制
C:\kafka>.\bin\windows\kafka-server-start.bat .\config\server.properties

运行以下命令从 Kafka Topics 发送消息

代码语言:javascript
复制
C:\kafka>.\bin\windows\kafka-console- Producer.bat --broker-list localhost:9092 --topic NewTopic

第 5 步:

现在运行您的 Spring Boot 应用程序。确保您已更改application.properties文件中的端口号

代码语言:javascript
复制
server.port=8081

让我们在 ApacheKafkaConsumerApplication 文件中运行 Spring boot 应用程序

Spring Boot Kafka 生产者/消费者示例_Apache_08
Spring Boot Kafka 生产者/消费者示例_Apache_08

输出:在输出中,您可以看到当您从 Kafka Topics 发送消息时,它会实时显示在控制台上。 

Spring Boot Kafka 生产者/消费者示例_应用程序_09
Spring Boot Kafka 生产者/消费者示例_应用程序_09
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2023-10-24,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Spring Boot Kafka 生产者示例
    • 例子:
      • 先决条件
    • 步骤 1:
      • 步骤 2:
        • DemoController.java
      • 第3步:
        • 第4步:
          • 第 5 步:
          • Spring Boot Kafka 消费者示例
            • 第 1 步:
              • 第 2 步:
                • KafkaConfig.java
                • 第 3 步:
                • KafkaConsumer.java
              • 第 4 步:
                • 第 5 步:
                领券
                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档