RabbitMQ在微服务中

RabbitMQ,也称为开源消息代理,支持多种消息协议,可以部署在分布式系统上。它非常轻巧,可以轻松部署应用程序。它主要作为一个队列,首先可以对输入的消息进行操作。RabbitMQ可在许多操作系统和云环境中运行,并为大多数流行语言提供各种开发人员工具。它是生产者 - 消费者风格模式,生产者发送消息,消费者使用它。RabbitMQ的主要功能如下:

  1. 异步消息
  2. 分布式部署
  3. 管理和监督
  4. 企业级和云就绪型

安装

对于RabbitMQ,您首先需要在系统中安装ErLang,因为RabbitMQ程序是用ErLang编程语言编写的。在ErLang之后,您可以按照其中的说明从其主页下载最新版本的RabbitMQ

在微服务中使用RabbitMQ

RabbitMQ是在微服务架构中实现消息队列的最简单的免费选项之一。这些队列模式可以通过在各种微服务之间进行通信来帮助扩展应用程序。我们可以将这些队列用于各种目的,例如核心微服务之间的交互,微服务的分离,实现故障转移机制以及通过消息代理发送电子邮件通知。

在两个或多个核心模块需要相互通信的地方,我们不应该进行直接的HTTP调用,因为它们可以使核心层紧密耦合,并且当每个核心模块有更多实例时很难管理。此外,每当服务关闭时,HTTP调用模式将失败,因为重新启动后,无法跟踪旧的HTTP请求调用。这导致需要RabbitMQ。

在微服务中设置RabbitMQ

在微服务架构中,对于此演示,我们将使用通过各种核心微服务发送电子邮件通知的示例模式。在这种模式中,我们将有一个生产者,任何核心微服务,它将生成电子邮件内容并将其传递给队列。然后,这个电子邮件内容由消费者使用,消费者总是在队列中收听新消息。

请注意,我们使用Spring Boot作为我们的微服务,因此我们将为Spring提供配置。

1)生产者: 该层负责生成电子邮件内容并将此内容传递给RabbitMQ中的消息代理。

a)在属性文件中,我们需要提及队列名称和交换类型以及安装RabbitMQ服务器的主机和端口。

queue.name=messagequeue
fanout.exchange=messagequeue-exchange
spring.rabbitmq.host: localhost
spring.rabbitmq.port: 5672
spring.rabbitmq.username: guest
spring.rabbitmq.password: guest    

b)我们需要创建一个配置类,它将使用队列名称和交换类型将队列绑定到微服务模块。

@Configuration
public class RabbitConfiguration {
 @Value("${fanout.exchange}")
 private String fanoutExchange;
 @Value("${queue.name}")
 private String queueName;
 @Bean
 Queue queue() {
  return new Queue(queueName, true);
 }
 @Bean
 FanoutExchange exchange() {
  return new FanoutExchange(fanoutExchange);
 }
 @Bean
 Binding binding(Queue queue, FanoutExchange exchange) {
  return BindingBuilder.bind(queue).to(exchange);
 }
}

c)最后,我们需要一个util类,它将用于使用Spring框架提供的RabbitTemplate将实际的电子邮件内容发送到队列。

@Component
public class QueueProducer {
 protected Logger logger = LoggerFactory.getLogger(getClass());
 @Value("${fanout.exchange}")
 private String fanoutExchange;
 private final RabbitTemplate rabbitTemplate;
 @Autowired
 public QueueProducer(RabbitTemplate rabbitTemplate) {
  super();
  this.rabbitTemplate = rabbitTemplate;
 }
 public void produce(NotificationRequestDTO notificationDTO) throws Exception {
  logger.info("Storing notification...");
  rabbitTemplate.setExchange(fanoutExchange);
  rabbitTemplate.convertAndSend(new ObjectMapper().writeValueAsString(notificationDTO));
  logger.info("Notification stored in queue sucessfully");
 }
}

d)然后,您可以从模块中的任何位置调用Produ方法。

{
  queueProducer.produce(notificationDTO);
}

2)消费者: 该层负责使用FIFO方法从RabbitMQ消息代理消费消息,然后执行与电子邮件相关的操作。

a)在属性文件中,我们需要提到队列名称和交换类型,以及安装RabbitMQ服务器的主机和端口。

queue.name=messagequeue
fanout.exchange=messagequeue-exchange
spring.rabbitmq.host: localhost
spring.rabbitmq.port: 5672
spring.rabbitmq.username: guest
spring.rabbitmq.password: guest

b)我们需要创建一个配置类,它将使用队列名称和交换类型将队列绑定到微服务模块。此外,在消费者的RabbitMQ配置中,我们需要创建一个 MessageListenerAdapter bean,它将使其充当使用者并始终在队列管道中侦听传入消息。这个MessageListenerAdapter 将有一个带有Consumer util类和defaultListenerMethod的参数化构造函数 ,我们可以在其中指定与电子邮件相关的操作。

@Configuration
public class RabbitConfiguration {
 private static final String LISTENER_METHOD = "receiveMessage";
 @Value("${queue.name}")
 private String queueName;
 @Value("${fanout.exchange}")
 private String fanoutExchange;
 @Bean
 Queue queue() {
  return new Queue(queueName, true);
 }
 @Bean
 FanoutExchange exchange() {
  return new FanoutExchange(fanoutExchange);
 }
 @Bean
 Binding binding(Queue queue, FanoutExchange exchange) {
  return BindingBuilder.bind(queue).to(exchange);
 }
 @Bean
 SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
  MessageListenerAdapter listenerAdapter) {
  SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
  container.setConnectionFactory(connectionFactory);
  container.setQueueNames(queueName);
  container.setMessageListener(listenerAdapter);
  return container;
 }
 @Bean
 MessageListenerAdapter listenerAdapter(QueueConsumer consumer) {
  return new MessageListenerAdapter(consumer, LISTENER_METHOD);
 }
}

c)然后,我们需要创建一个具有指定消息监听器方法的类QueueConsumer ,我们可以在其中进行实际的电子邮件发送操作。

@Component
public class QueueConsumer {
 @Autowired
 MailServiceImpl mailServiceImpl;
 protected Logger logger = LoggerFactory.getLogger(getClass());
 public void receiveMessage(String message) {
  logger.info("Received (String) " + message);
  processMessage(message);
 }
 public void receiveMessage(byte[] message) {
  String strMessage = new String(message);
  logger.info("Received (No String) " + strMessage);
  processMessage(strMessage);
 }
 private void processMessage(String message) {
  try {
   MailDTO mailDTO = new ObjectMapper().readValue(message, MailDTO.class);
   ValidationUtil.validateMailDTO(mailDTO);
   mailServiceImpl.sendMail(mailDTO, null);
  } catch (JsonParseException e) {
   logger.warn("Bad JSON in message: " + message);
  } catch (JsonMappingException e) {
   logger.warn("cannot map JSON to NotificationRequest: " + message);
  } catch (Exception e) {
   logger.error(e.getMessage());
  }
 }
}

结论

使用RabbitMQ,您可以避免服务之间的直接HTTP调用,并消除核心微服务的紧密耦合。这将帮助您在更高级别扩展微服务,并在微服务之间添加故障转移机制。

原文标题《RabbitMQ in Microservices》

作者:Akash Bhingole

译者:February

不代表云加社区观点,更多详情请查看原文链接

原文链接:https://dzone.com/articles/rabbitmq-in-microservices

原文作者:Akash Bhingole

编辑于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏大闲人柴毛毛

手把手0基础项目实战(三)——教你开发一套电商平台的安全框架

写在最前 本文是《手把手项目实战系列》的第三篇文章,预告一下,整个系列会介绍如下内容: 《手把手0基础项目实战(一)——教你搭建一套可自动化构建的微服务框架(S...

4556
来自专栏码生

react-navigation 监听页面显隐(viewDidAppear viewDidDisappear)

我们经常遇到的需求就是,当某个界面出现的时候,就刷新一下此界面的数据 保证用户的数据处于一种相对同步的情况

2734
来自专栏java工会

推荐几个自己写的Java后端相关的范例项目

2505
来自专栏芋道源码1024

IntelliJ IDEA JDK 8 性能调优

IntelliJ IDEA 问题描述问题原因解决方法调优后观察为什么要选择用户`idea.vmoptions`文件

1682
来自专栏用户2442861的专栏

使用IntelliJ IDEA开发SpringMVC网站(四)用户管理

转载请注明出处:Gaussic(一个致力于AI研究却不得不兼顾项目的研究生) 。

2291
来自专栏java、Spring、技术分享

java 日志处理

  common-logging是 apache提供的一个通用的日志接口。用户可以自由选择第三方的日志组件作为具体实现,像log4j,或者jdk自带的loggi...

3243
来自专栏微信公众号:Java团长

Java Web现代化开发:Spring Boot + Mybatis + Redis二级缓存

Spring-Boot因其提供了各种开箱即用的插件,使得它成为了当今最为主流的Java Web开发框架之一。Mybatis是一个十分轻量好用的ORM框架。Red...

2472
来自专栏Gaussic

使用IntelliJ IDEA开发SpringMVC网站(四)用户管理 顶

访问GitHub下载最新源码:https://github.com/gaussic/SpringMVCDemo

1872
来自专栏kangvcar

Cobbler 快速入门指南

1573
来自专栏Java帮帮-微信公众号-技术文章全总结

02.WebService_使用三要素

02.WebService_使用三要素 一、Java中WebService规范 JAVA 中共有三种WebService 规范,分别是JAX-WS、J...

3686

扫码关注云+社区

领取腾讯云代金券