前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >消息中间件

消息中间件

作者头像
用户8447427
发布2023-10-18 15:11:53
1690
发布2023-10-18 15:11:53
举报
文章被收录于专栏:userlyz学习记录userlyz学习记录

消息中间件

RabbitMq ActiveMQ RocketMQ kafka

安装RabbitMQ

(33条消息) RabbitMQ安装教程(超详细)_凡尘-追梦者的博客-CSDN博客

为什么要使用消息队列

为什么使用消息队列? (gitee.io)

解耦 ​ 异步 削峰

缺点

系统可用性降低 系统复杂度提高 一致性问题

RabbitMQ demo

1、rabbitmq创建vhost

在 RabbitMQ 中,可以通过管理界面或者命令行方式创建 vhost(虚拟主机)。下面分别介绍两种方式:

通过 RabbitMQ 管理界面创建 vhost:

  • 打开浏览器,访问 RabbitMQ 管理界面(默认地址:http://localhost:15672/)。
  • 使用管理员账号登录(默认用户名:guest,密码:guest)。
  • 在左侧导航栏中选择 “Admin”(管理员菜单)。
  • 在 “Add a new virtual host”(添加新的虚拟主机)部分,输入要创建的 vhost 名称,然后点击 “Add virtual host” 按钮即可。

通过命令行方式创建 vhost:

使用 RabbitMQ 的 rabbitmqctl 命令可以进行管理操作。在命令行中运行以下命令来创建 vhost:

代码语言:javascript
复制
bashCopy code# 以管理员权限登录 RabbitMQ
sudo rabbitmqctl add_vhost <vhost_name>

注意将 <vhost_name> 替换为你要创建的 vhost 名称。

无论哪种方式创建 vhost,都可以在 RabbitMQ 中为不同应用程序或服务创建隔离的消息队列环境。确保为每个应用程序分配不同的 vhost,以便更好地管理和维护消息队列。

2、生产者和消费者demo

在参数设置的时候虚拟机是不需要”/“

ProducerHelloworld.java

代码语言:javascript
复制
public static void main(String[] args) throws IOException, TimeoutException {
    // 1、创建一个连接工厂
    ConnectionFactory factory=new ConnectionFactory();
    // 2、设置参数
    factory.setHost("101.200.40.242");//主机ip
    factory.setPort(5672);//端口号
    factory.setVirtualHost("lyz");//设置虚拟机(可以理解为分组)
    factory.setUsername("root");
    factory.setPassword("root");
    //3、根据配置信息 创建连接
    Connection connection = factory.newConnection();
    //4、创建channel
    Channel channel = connection.createChannel();
    //5、创建队列
    /*
     * 参数:
     *             1. queue:队列名称
     *             2. durable:是否持久化,true  当mq重启之后,还在
     *             3. exclusive:
     *                 * 是否独占。只能有一个消费者监听这队列
     *                 * 当Connection关闭时,是否删除队列
     *                 *
     *             4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
     *             5. arguments:参数。
     */
    channel.queueDeclare("queueHelloworld",true,false,false,null);

    // 6、发送消息
    String body="hello 07201111";
    /*
    basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
    参数:
        1. exchange:交换机名称。简单模式下交换机会使用默认的 ""
        2. routingKey:路由名称
        3. props:配置信息
        4. body:发送消息数据
     */
    channel.basicPublish("","queueHelloworld",null,body.getBytes());

    //7.释放资源
    channel.close();
    connection.close();
}

ConsumerHelloworld.java

代码语言:javascript
复制
public static void main(String[] args) throws IOException, TimeoutException {
    //创建连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    // 设置参数
    factory.setHost("101.200.40.242");
    factory.setPort(5672);
    factory.setUsername("root");
    factory.setPassword("root");
    factory.setVirtualHost("lyz");
    // 创建连接 Connection
    Connection connection = factory.newConnection();
    // 创建channel
    Channel channel = connection.createChannel();
    //创建队列
    channel.queueDeclare("queueHelloworld",true,false,false,null);
    //接受消息
    DefaultConsumer consumer = new DefaultConsumer(channel){
        @Override
        public void handleDelivery(String consumerTag,
                                   Envelope envelope,
                                   AMQP.BasicProperties properties,
                                   byte[] body) throws IOException {
            System.out.println("consumerTag = " + consumerTag);
            System.out.println("envelope.getExchange() = " + envelope.getExchange());
            System.out.println("envelope.getRoutingKey() = " + envelope.getRoutingKey());
            System.out.println("properties = " + properties);
            System.out.println("new String(body) = " + new String(body));
        }
    };
    //参数3 使用回调对象的对调函数 展示消息
    channel.basicConsume("queueHelloworld",true,consumer);


}

Springboot整合RabbitMQ

简单测试

配置类

代码语言:javascript
复制
@Configuration
public class RabbitMQConfig {
    public static final String EXCHANGE_NAME="boot_topic_exchange";
    public static final String QUEUE_NAME="boot_queue";
    @Bean
    public Queue queue(){
        return new Queue("queue",true);
    }
}

发送请求

代码语言:javascript
复制
@Service
@Slf4j
public class MQSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(Object message){
        log.info("发送消息:"+message);
        rabbitTemplate.convertAndSend("queue",message);
    }
}

接收消息

代码语言:javascript
复制
@Service
@Slf4j
public class MQReceiver {

    @RabbitListener(queues = "queue")
    public void receive(Object msg){
        log.info("接受消息:"+msg);
    }
}

fanout广播模式

这个模式将消息广播给所有与交换机绑定的队列。路由键在这里不起作用,只需要绑定队列到交换机即可,所有绑定的队列都会收到相同的消息。

配置类

代码语言:javascript
复制
@Configuration
public class RabbitMQConfig {
    private static final  String QUEUE1="queue_fanout01";
    private static final  String QUEUE2="queue_fanout02";
    private static final  String EXCHANGE="fanoutExchange";
    @Bean
    public Queue queue(){
        return new Queue("queue",true);
    }
    @Bean
    public Queue queue1(){
        return new Queue(QUEUE1);
    }
    @Bean
    public Queue queue2(){
        return new Queue(QUEUE2);
    }
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange(EXCHANGE);
    }
    @Bean
    public Binding binding1(){
        return BindingBuilder.bind(queue1()).to(fanoutExchange());
    }
    @Bean
    public Binding binding2(){
        return BindingBuilder.bind(queue2()).to(fanoutExchange());
    }
}

发送信息

代码语言:javascript
复制
@Service
@Slf4j
public class MQSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(Object message){
        log.info("发送消息:"+message);
        rabbitTemplate.convertAndSend("fanoutExchange","",message);
    }
}

接受消息

代码语言:javascript
复制
@Service
@Slf4j
public class MQReceiver {

    @RabbitListener(queues = "queue")
    public void receive(Object msg){
        log.info("接受消息:"+msg);
    }
    @RabbitListener(queues = "queue_fanout01")
    public void receive01(Object msg){
        log.info("QUEUE01_接受消息:"+msg);
    }
    @RabbitListener(queues = "queue_fanout02")
    public void receive02(Object msg){
        log.info("QUEUE_02接受消息:"+msg);
    }
}

测试

direct模式

路由模式 生产者将消息发送到direct交换器,在绑定队列和交换器的时候有一个路由key,生产者发送的消息会指定一个路由key,那么消息只会发送到相应key相同的队列,接着监听该队列的消费者消费消息。 也就是让消费者有选择性的接收消息。 路由模式,是以路由规则为导向,引导消息存入符合规则的队列中。再由队列的消费者进行消费的。

配置类

代码语言:javascript
复制
@Configuration
public class RabbitMQConfig {
    private static final String QUEUE1="queue_direct01";
    private static final String QUEUE2="queue_direct02";
    private static final String EXCHANGE="directExchange";
    private static final String ROUNTINGKEY01="queue.red";
    private static final String ROUNTINGKEY02="queue.green";
    @Bean
    public Queue queue1(){
        return new Queue(QUEUE1);
    }
    @Bean
    public Queue queue2(){
        return new Queue(QUEUE2);
    }
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange(EXCHANGE);
    }
    @Bean
    public Binding binding01(){
        return BindingBuilder.bind(queue1()).to(directExchange()).with(ROUNTINGKEY01);
    }
    @Bean
    public Binding binding02(){
        return BindingBuilder.bind(queue2()).to(directExchange()).with(ROUNTINGKEY02);
    }
}

消息发送者

代码语言:javascript
复制
@Service
@Slf4j
public class MQSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    public void send(Object message){
        log.info("发送消息:"+message);
        rabbitTemplate.convertAndSend("fanoutExchange","",message);
    }

    public void send01(Object message){
        log.info("发送消息:"+message);
        rabbitTemplate.convertAndSend("directExchange","queue.red",message);
    }
    public void send02(Object message){
        log.info("发送消息:"+message);
        rabbitTemplate.convertAndSend("directExchange","queue.green",message);
    }
}

消息接收者

代码语言:javascript
复制
@Service
@Slf4j
public class MQReceiver {
    @RabbitListener(queues = "queue_direct01")
    public void receive01(Object msg){
        log.info("QUEUE01_接受消息:"+msg);
    }
    @RabbitListener(queues = "queue_direct02")
    public void receive02(Object msg){
        log.info("QUEUE_02接受消息:"+msg);
    }
}

测试

Topic模式

上面的路由模式是根据路由key进行完整的匹配(完全相等才发送消息),这里的通配符模式通俗的来讲就是模糊匹配。 符号“#”表示匹配一个或多个词,符号“*”表示匹配一个词。   与路由模式相似,但是,主题模式是一种模糊的匹配方式

配置类

代码语言:javascript
复制
@Configuration
public class RabbitMQConfig {
	private static final String QUEUE01="queue_topic01";
    private static final String QUEUE02="queue_topic02";
    private static final String EXCHANGE="topicExchange";
    private static final String ROUNTINGKEY01="#.queue.#";
    private static final String ROUNTINGKEY02="*.queue.#";
    @Bean
    public Queue queue1(){
        return new Queue(QUEUE01);
    }
    @Bean
    public Queue queue2(){
        return new Queue(QUEUE02);
    }
    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange(EXCHANGE);
    }
    @Bean
    public Binding binding01(){
        return BindingBuilder.bind(queue1()).to(topicExchange()).with(ROUNTINGKEY01);
    }
    @Bean
    public Binding binding02(){
        return BindingBuilder.bind(queue2()).to(topicExchange()).with(ROUNTINGKEY02);
    }
}

生产者

代码语言:javascript
复制
@Service
@Slf4j
public class MQSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    public void send03(Object msg){
        log.info("发送消息queue01接受:"+msg);
        rabbitTemplate.convertAndSend("topicExchange","queue.red.message",msg);
    }
    public void send04(Object msg){
        log.info("发送消息queue01和queue02接受:"+msg);
        rabbitTemplate.convertAndSend("topicExchange","message.queue.green",msg);
    }
}

消费者

代码语言:javascript
复制
@Service
@Slf4j
public class MQReceiver {
    @RabbitListener(queues="queue_topic01")
    public void receive05(Object msg){
        log.info("queue01接受消息"+msg);
    }
    @RabbitListener(queues="queue_topic02")
    public void receive06(Object msg){
        log.info("queue02接受消息"+msg);
    }
}

测试 @Test public void testTopic(){ mqSender.send03("发消息喽03"); } @Test public void testTopic02(){ mqSender.send04("发消息喽04"); }

测试2 @Test public void testTopic(){ mqSender.send03("发消息喽03"); mqSender.send04("发消息喽04"); }

发现再不同时发送的时候,两个接受队列都可以接收到消息04;

但是为什么在同一个方法中,两个接受队列不能同时接受消息04;

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2023-07-20,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 安装RabbitMQ
  • 为什么要使用消息队列
  • RabbitMQ demo
  • Springboot整合RabbitMQ
    • 简单测试
      • fanout广播模式
        • direct模式
          • Topic模式
          相关产品与服务
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档