前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >kafka 学习笔记 3 - Java 使用 kafka 收发消息

kafka 学习笔记 3 - Java 使用 kafka 收发消息

作者头像
张云飞Vir
发布2021-07-20 11:26:26
6600
发布2021-07-20 11:26:26
举报
文章被收录于专栏:写代码和思考

1. 背景

本文简述 kafka 的相关内容。

2.知识

更多基础知识见:https://cloud.tencent.com/developer/article/1619554

如何安装 kafka 见:https://cloud.tencent.com/developer/article/1849678

3. 示例

3.1 配置一个“生产者”

1、添加依赖

新建一个项目,并添加依赖:

代码语言:javascript
复制
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

2、配置kafka的服务地址

在配置文件 application.yml 中配置。

代码语言:javascript
复制
server:
  port: 8081

spring:
  application:
    name: "producer"
  kafka:
    bootstrap-servers: "localhost:9092"

3、创建topic

我使用 java 来创建 topic ,注入一个 NewTopic 对象即可。

代码语言:javascript
复制
@Component
public class KafkaConfig {
    private static final String TOPIC_NAME = "topic2";

    // 创建一个主题 topic
    @Bean
    public NewTopic topic1() {
        return TopicBuilder.name(TOPIC_NAME)
                .partitions(1)
                .replicas(1)
                .compact()
                .build();
    }

}

4、发送消息

首先需要注入一个 kafkaTemplate 对象。这个是个 kafka 基础操作的模板方法类。springboot 框架已经帮忙配置好了,直接注入即可。

代码语言:javascript
复制
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

然后直接 send 发送消息

代码语言:javascript
复制
private static final String TOPIC_NAME = "topic2";

kafkaTemplate.send(TOPIC_NAME, data);

就是这么简单省事。

3.2 配置一个“消费者 ”

1、添加依赖

新建一个项目,并添加依赖同上。

代码语言:javascript
复制
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

2、配置kafka服务器地址

修改 application.yml ,示例:

代码语言:javascript
复制
spring:
  kafka:
    bootstrap-servers: "localhost:9092"
    consumer:
      group-id: "myGroup1"
      client-id: "myGroup1"

3、监听消息

KafkaListener 这个注解 指示到一个方法上即可。

格式:

代码语言:javascript
复制
@KafkaListener(topics = TOPIC_NAME)
public void someOne(String content){
  ....
}

我的示例:

代码语言:javascript
复制
@Component
public class MyKafkaConsumer {
    private static final String TOPIC_NAME = "topic2";

    @KafkaListener(topics = TOPIC_NAME)
    public void processMessage(ConsumerRecord<String, String> record) {
        System.out.println(String.format("# record: %s", record));
        System.out.println(String.format("\t\t# 收到消息: %s", record.value()));
    }

}

4. 扩展

Spring-kafka 的文件值得一下看:https://docs.spring.io/spring-kafka/docs/current/reference/html/#configuring-topics

我的代码示例见:https://github.com/vir56k/java_demo/tree/master/kafka_demo1

5. 参考

Springboot 官网文档介绍

https://docs.spring.io/spring-boot/docs/current/reference/html/features.html#features.spring-application

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 背景
  • 2.知识
  • 3. 示例
    • 3.1 配置一个“生产者”
      • 3.2 配置一个“消费者 ”
      • 4. 扩展
      • 5. 参考
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档