作者:小傅哥 博客:https://bugstack.cn
❝沉淀、分享、成长,让自己和他人都能有所收获!😜❞
本文的宗旨在于通过简单干净实践的方式教会读者,使用 Docker 部署 Kafka 以及 Kafka 的管理后台,同时基于 DDD 工程使用 Kafka 消息。这里有一个非常重要的点,就是怎么优雅的在 DDD 工程结构下使用 MQ 消息。
在整个《Java简明教程》已经讲解过 RocketMQ、RabbitMQ 的使用,本文是对 MQ 系列的一个补充,基本大家在选择使用 MQ 组件时,也就这三类。【全文:https://bugstack.cn/md/road-map/road-map.html】
本文设计的工程:
下载 SwitchHost 配置一个映射地址。点击 +
添加一个本地环境,之后配置你的 IP kafka 这样就能找这个地址了。IP 为你本地的IP,如果是云服务器就是公网IP地址。
本案例涉及了 Kafka 的使用,环境的安装脚本已经放到工程下,可以直接点击安装即可。—— 需要前置条件已安装 Docker 环境。
docker-compose -f docker-compose.yml up -d
本节的重点内容在于如何优雅的发送 MQ 消息,让消息聚合到领域层中,并在发送的时候可以不需要让使用方关注过多的细节。【如图】
application-dev.yml
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
# 发生错误后,消息重发的次数。
retries: 1
#当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
batch-size: 16384
# 设置生产者内存缓冲区的大小。
buffer-memory: 33554432
# 键的序列化方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# 值的序列化方式
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
# acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
# acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
acks: 1
...
# 配置主题
kafka:
topic:
group: xfg-group
user: xfg-topic
bootstrap-servers: localhost:9092
源码:cn.bugstack.xfg.dev.tech.infrastructure.event.EventPublisher
@Slf4j
@Component
public class EventPublisher {
@Resource
private KafkaTemplate<String, String> kafkaTemplate;
public void publish(String topic, BaseEvent.EventMessage<?> eventMessage) {
try {
String messageJson = JSON.toJSONString(eventMessage);
kafkaTemplate.send(topic, messageJson);
log.info("发送MQ消息 topic:{} message:{}", topic, messageJson);
} catch (Exception e) {
log.error("发送MQ消息失败 topic:{} message:{}", topic, JSON.toJSONString(eventMessage), e);
throw e;
}
}
}
源码:cn.bugstack.xfg.dev.tech.domain.event.UserMessageEvent
public class UserMessageEvent extends BaseEvent<UserMessageEvent.UserMessage> {
@Value("${kafka.topic.user}")
private String topic;
@Override
public EventMessage<UserMessage> buildEventMessage(UserMessage data) {
return EventMessage.<UserMessage>builder()
.id(RandomStringUtils.randomNumeric(11))
.timestamp(new Date())
.data(data)
.build();
}
@Override
public String topic() {
return topic;
}
/**
* 要推送的事件消息,聚合到当前类下。
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public static class UserMessage {
private String userId;
private String userName;
private String userType;
}
}
源码:cn.bugstack.xfg.dev.tech.infrastructure.repository.UserRepository
@Service
public class UserRepository extends UserMessageEvent implements IUserRepository {
@Resource
private EventPublisher publisher;
@Override
public void doSaveUser(UserEntity userEntity) {
// 推送消息
publisher.publish(this.topic(), this.buildEventMessage(UserMessageEvent.UserMessage.builder()
.userId(userEntity.getUserId())
.userName(userEntity.getUserName())
.userType(userEntity.getUserTypeVO().getDesc())
.build()));
}
}
源码:cn.bugstack.xfg.dev.tech.trigger.listener.KafkaMessageListener
@Slf4j
@Component
public class KafkaMessageListener {
@KafkaListener(topics = "${kafka.topic.user}", groupId = "${kafka.topic.group}", concurrency = "1")
public void topic_test(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
Optional<?> message = Optional.ofNullable(record.value());
if (message.isPresent()) {
Object msg = message.get();
try {
// 逻辑处理
// 确认消息消费完成,如果抛异常消息会进入重试
ack.acknowledge();
log.info("Kafka消费成功! Topic:" + topic + ",Message:" + msg);
} catch (Exception e) {
e.printStackTrace();
log.error("Kafka消费失败!Topic:" + topic + ",Message:" + msg, e);
}
}
}
}
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class UserServiceTest {
@Resource
private IUserService userService;
@Test
public void test_register() throws InterruptedException {
while (true) {
UserEntity userEntity = new UserEntity();
userEntity.setUserId("10001");
userEntity.setUserName("小傅哥");
userEntity.setUserTypeVO(UserTypeVO.T8);
userService.register(userEntity);
Thread.sleep(1500);
}
}
}
24-03-17.12:28:58.308 [main ] INFO EventPublisher - 发送MQ消息 topic:xfg-topic message:{"data":{"userId":"10001","userName":"小傅哥","userType":"架构师"},"id":"45672247343","timestamp":1710649737803}
24-03-17.12:28:59.811 [main ] INFO EventPublisher - 发送MQ消息 topic:xfg-topic message:{"data":{"userId":"10001","userName":"小傅哥","userType":"架构师"},"id":"18572935390","timestamp":1710649739809}
24-03-17.12:29:01.294 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO ConsumerCoordinator - [Consumer clientId=consumer-xfg-group-1, groupId=xfg-group] Successfully joined group with generation Generation{generationId=1, memberId='consumer-xfg-group-1-f1c1ab73-b72d-4296-809b-d951f88a49dd', protocol='range'}
24-03-17.12:29:01.297 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO ConsumerCoordinator - [Consumer clientId=consumer-xfg-group-1, groupId=xfg-group] Finished assignment for group at generation 1: {consumer-xfg-group-1-f1c1ab73-b72d-4296-809b-d951f88a49dd=Assignment(partitions=[xfg-topic-0])}
24-03-17.12:29:01.314 [main ] INFO EventPublisher - 发送MQ消息 topic:xfg-topic message:{"data":{"userId":"10001","userName":"小傅哥","userType":"架构师"},"id":"15051699480","timestamp":1710649741313}
24-03-17.12:29:01.334 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO ConsumerCoordinator - [Consumer clientId=consumer-xfg-group-1, groupId=xfg-group] Successfully synced group in generation Generation{generationId=1, memberId='consumer-xfg-group-1-f1c1ab73-b72d-4296-809b-d951f88a49dd', protocol='range'}
24-03-17.12:29:01.334 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO ConsumerCoordinator - [Consumer clientId=consumer-xfg-group-1, groupId=xfg-group] Notifying assignor about the new Assignment(partitions=[xfg-topic-0])
24-03-17.12:29:01.341 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO ConsumerCoordinator - [Consumer clientId=consumer-xfg-group-1, groupId=xfg-group] Adding newly assigned partitions: xfg-topic-0
24-03-17.12:29:01.354 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO ConsumerCoordinator - [Consumer clientId=consumer-xfg-group-1, groupId=xfg-group] Found no committed offset for partition xfg-topic-0
24-03-17.12:29:01.380 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO SubscriptionState - [Consumer clientId=consumer-xfg-group-1, groupId=xfg-group] Resetting offset for partition xfg-topic-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[kafka:9092 (id: 1 rack: null)], epoch=0}}.
24-03-17.12:29:01.381 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO KafkaMessageListenerContainer - xfg-group: partitions assigned: [xfg-topic-0]
24-03-17.12:29:01.631 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO KafkaMessageListener - Kafka消费成功! Topic:xfg-topic,Message:{"data":{"userId":"10001","userName":"小傅哥","userType":"架构师"},"id":"45672247343","timestamp":1710649737803}
24-03-17.12:29:01.642 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO KafkaMessageListener - Kafka消费成功! Topic:xfg-topic,Message:{"data":{"userId":"10001","userName":"小傅哥","userType":"架构师"},"id":"18572935390","timestamp":1710649739809}
24-03-17.12:29:01.647 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO KafkaMessageListener - Kafka消费成功! Topic:xfg-topic,Message:{"data":{"userId":"10001","userName":"小傅哥","userType":"架构师"},"id":"15051699480","timestamp":1710649741313}
小傅哥是一个大厂的架构师,经常会带着伙伴们,卷这些实际场景中非常有必要的技术。也会带着伙伴实战项目,这些项目也都是来自于互联网大厂中真实的业务场景,所有学习这样的项目无论是实习、校招、社招,都是有非常强的竞争力。别人还在玩玩具,而你已经涨能力!
这样的项目学习在小傅哥星球「码农会锁」有8个,每个都是从0到1开发并提供简历模板和面试题,并且还在继续开发,后续还将有更多!价格嘎嘎实惠,早点加入,早点提升自己。项目地址:https://gaga.plus