Kafka消息队列学习进阶(二)-项目实战

项目实战

项目介绍

这篇将介绍具体怎么使用Kafka,大前提是由于项目中使用的是SpringBoot,本文将介绍的是Kafka与SpringBoot的整合使用(本文的项目代码将在GitHub开放出来,地址是:https://github.com/zfrHJ/kafka)。环境是:

SpringBoot 2.0 以上。

JDK 8 以上。

编译器是IDEA。

项目相关代码

《Jar包》

本项目采用IDEA快速生成,同时利用Maven,下面是Kafka所需要的Jar包依赖:

org.springframework.boot

spring-boot-starter

org.springframework.kafka

spring-kafka

《yml文件》

本项目将采用yml后缀的配置文件格式,如下:

spring:

application:

name: migrate

kafka:

#服务器地址

bootstrap-servers: 192.168.X.X:9092

producer:

retries: 0

batch-size: 26384

buffer-memory: 53554432

key-serializer: org.apache.kafka.common.serialization.StringSerializer

value-serializer: org.apache.kafka.common.serialization.StringSerializer

consumer:

group-id: test-consumer-group

auto-commit-interval: 1000

auto-offset-reset: latest

enable-auto-commit: false

key-deserializer: org.apache.kafka.common.serialization.StringDeserializer

value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

max-poll-records: 150

《代码》

消费者:

@Component

public class KafkaConsumer {

/**

* 案例

*

* @param record

*/

@KafkaListener(topics = {"testSend"})

public void listen(ConsumerRecord record) {

Optional kafkaMessage = Optional.ofNullable(record.value());

if (kafkaMessage.isPresent()) {

Object message = kafkaMessage.get();

System.out.println("----------------- record =" + record);

System.out.println("------------------ message =" + message);

}

}

}

生成者:

@Component

public class KafkaSender {

@Resource

private KafkaTemplate kafkaTemplate;

private Gson gson = new GsonBuilder().create();

/**

* 发送消息方法

*/

public void send() {

Mesage message = new Mesage();

message.setId(String.valueOf(System.currentTimeMillis()));

message.setMsg(UUID.randomUUID().toString());

message.setSendTime(new Date());

System.out.println("+++++++++++++++++++++ message = "+ gson.toJson(message));

kafkaTemplate.send("testSend",gson.toJson(message));

System.out.println(("发送成功 message = "+ gson.toJson(message)));

}

}

《启动成功标识》

Kafka优化

补充知识点

1.清理消息命令

在测试阶段中,会产生很多垃圾数据,我们应该怎么清理呢?下面是清理消费的命令:

./kafka-topics.sh --delete --zookeeper 192.168.X.X:2181 --topic testSend

2.更改分区的命令

上面优化提到了怎么提高分区数量,下面是更改分区数量的命令:

./kafka-topics.sh --alter --partitions 3 --zookeeper 192.168.X.X:2181 --topic testSend

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20181007G0GE8700?refer=cp_1026
  • 腾讯「云+社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 yunjia_community@tencent.com 删除。

扫码关注云+社区

领取腾讯云代金券