前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【应用进阶】Kafka的部署和案例

【应用进阶】Kafka的部署和案例

作者头像
用户5640963
发布2021-06-29 11:15:01
4160
发布2021-06-29 11:15:01
举报
文章被收录于专栏:卯金刀GG卯金刀GG卯金刀GG

这两天学习MQ在项目中的使用,就自己搭建了一个测试环境,在笔记本电脑搭建,使用的win10系统。不废话,开撸。

最先安装JDK,凡是java开发基本都会有,不再赘述;

一、安装zookeeper

1、下载安装文件,地址: http://zookeeper.apache.org/releases.html

2、解压。 打开zookeeper-3.4.13\conf,把zoo_sample.cfg重命名成zoo.cf

3、修改 dataDir= C:/tmp/zookeeper/data ,改成自己的路径

4、添加系统变量

ZOOKEEPER_HOME: C:\hake\soft\apache-zookeeper-3.7.0 (zookeeper目录)

Path: 在现有的值后面添加 ";%ZOOKEEPER_HOME%\bin;"

5、启动

6、测试,可下载 可视化的工具:https://link.zhihu.com/?target=https%3A//issues.apache.org/jira/secure/attachment/12436620/ZooInspector.zip

二、下载安装 Kafka

1、下载地址: http://kafka.apache.org/downloads.html

2、解压文件。 打开kafka_2.12-2.8.0\config

3、编辑server.properties,修改 log.dirs=C:/tmp/kafka-logs配置

4、 进入 C:\hake\soft\kafka_2.12-2.8.0\bin\windows(kafka目录)

5、执行命令:kafka-server-start.bat C:\hake\soft\kafka_2.12-2.8.0\config\server.properties

三、Springboot2 集成 Kafka - 生产者

1、生产者,创建项目 kafka-producter

2、添加pom关键引用

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

3、yml配置

server:
  servlet:
    context-path: /kafka-producter
  port: 9004
#### kafka配置生产者 begin ####
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      # 写入失败时,重试次数。当leader节点失效,一个repli节点会替代成为leader节点,此时可能出现写入失败,
      # 当retris为0时,produce不会重复。retirs重发,此时repli节点完全成为leader节点,不会产生消息丢失。
      retries: 0
      # 每次批量发送消息的数量,produce积累到一定数据,一次发送
      batch-size: 16384
      # produce积累数据一次发送,缓存大小达到buffer.memory就发送数据
      buffer-memory: 33554432
      #procedure要求leader在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化,其值可以为如下:
      #acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。
      #acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果leader在确认记录后立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。
      #acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,这相当于acks = -1的设置。
      #可以设置的值为:all, -1, 0, 1
      acks: 1
      # 指定消息key和消息体的序列化编解码方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
#### kafka配置生产者 end ####

4、发送代码

/**
 * @Author: Liu Yue
 * @Descripition:
 * @Date; Create in 2021/6/24 14:07
 **/
@RestController
@RequestMapping
@Slf4j
public class KafkaController {

    @Autowired
    private KafkaTemplate  kafkaTemplate;

    @PostMapping("/message/send")
    public boolean send(){
        String msg = "kafka测试数据";
        kafkaTemplate.send("topic_A",msg);
        return true;
    }
}

四、Springboot2 集成 Kafka - 消费者

1、pom配置同上

2、yml配置消费者

server:
  servlet:
    context-path: /kafka-consumer
  port: 9003
#### kafka配置生产者 begin ####
spring:
  kafka:
#### kafka配置消费者 begin ####
    consumer:
      # 指定默认消费者group id --> 由于在kafka中,同一组中的consumer不会读取到同一个消息,依靠groud.id设置组名
      group-id: user-log-group
      # smallest和largest才有效,如果smallest重新0开始读取,如果是largest从logfile的offset读取。一般情况下我们都是设置smallest
      auto-offset-reset: earliest
      # enable.auto.commit:true --> 设置自动提交offset
      enable-auto-commit: true
      auto-commit-interval: 100
      # 指定消息key和消息体的序列化编解码方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
#### kafka配置消费者 end ####

3、消费者代码

/**
 * @Author: Liu Yue
 * @Descripition:
 * @Date; Create in 2021/6/24 14:19
 **/
@Component
@Slf4j
public class ConsumerListener {

    @KafkaListener(topics = "topic_A")
    public void onMessage(String message){
        //这里为插入数据库代码,业务逻辑等等
        log.info("收到消息内容:{}",message);
    }

}

至此,一个完整的生产者/消费者模型完成。

作用:削峰填谷、解耦、异步执行等等

每天进步一点点!

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档