Loading [MathJax]/jax/input/TeX/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >专栏 >RabbitMQ入门案例

RabbitMQ入门案例

作者头像
张小驰出没
发布于 2021-12-06 08:32:57
发布于 2021-12-06 08:32:57
44100
代码可运行
举报
运行总次数:0
代码可运行

RabbitMQ入门案例

Rabbit 模式

https://www.rabbitmq.com/getstarted.html

实现步骤

  • 构建一个 maven工程
  • 导入 rabbitmq的依赖
  • 启动 rabbitmq-server服务
  • 定义生产者
  • 定义消费者
  • 观察消息的在 rabbitmq-server服务中的进程

初步实现

前期准备

1.构建项目
2.导入依赖
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.10.0</version>
</dependency>

简单模型

在上图的模型中,有以下概念:

  1. 生产者,也就是要发送消息的程序
  2. 消费者:消息的接受者,会一直等待消息到来。
  3. 消息队列:图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。

所有的中间件技术都是基于tcp/ip协议基础之上构建新型的协议规范,只不过rabbitmq遵循的是amqp

实现步骤:

  1. 创建连接工程
  2. 创建连接 connection
  3. 通过连接获取通道 Channel
  4. 通过通道创建交换机,声明队列,绑定关系,路由key,发送消息,和接收消息
  5. 准备消息内容
  6. 发送消息给队列 queue
  7. 关闭连接
  8. 关闭通道

生产者

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class Producer {
    public static void main(String[] args) {
        //1. 创建连接工程
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //这里要使用自己的IP地址
        connectionFactory.setHost("192.168.57.129");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        try {
            //2. 创建连接 connection
            connection = connectionFactory.newConnection("生产者");
            //3. 通过连接获取通道 Channel
            channel = connection.createChannel();
            //4. 通过通道创建交换机,声明队列,绑定关系,路由key,发送消息
            String quequeName = "queuel";
            /**
             * @params1 队列的名称
             * @params2 是否要持久化 durable-false
             * @params3 排他性,是否是独占独立
             * @params4 是否自动删除,随着最后一个消费者消息完毕以后是否把队列自动删除
             * @params5 携带的附属参数
             */
            channel.queueDeclare(quequeName,false,false,false,null);
            //5. 准备消息内容
            String message = "Hello,Consumer";
            //6. 发送消息给队列 queue
            channel.basicPublish("",quequeName,null,message.getBytes());
            System.out.println("消息发送成功");
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            //7. 关闭连接
            if (channel != null && channel.isOpen()){
                try {
                    channel.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            //8. 关闭通道
            if (connection != null && connection.isOpen()){
                try {
                    connection.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

消费者

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public class Consumer {

    public static void main(String[] args) {
        //1. 创建连接工程
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //这里要使用自己的IP地址
        connectionFactory.setHost("192.168.57.129");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;
        try {
            //2. 创建连接 connection
            connection = connectionFactory.newConnection("消费者");
            //3. 通过连接获取通道 Channel
            channel = connection.createChannel();
            //4. 通过通道创建交换机,声明队列,绑定关系,路由key,发送消息,和接收消息
            String quequeName = "queue1";
            channel.queueDeclare(quequeName,false,false,false,null);
            //5.监听消息
            DefaultConsumer consumer = new DefaultConsumer(channel){
                /*
                consumerTag:消息者标签,channel.basicConsume可以指定
                envelope:消息包内容,可从中获取消息id,消息routing key,交换机,消息和重装标记(收到消息失败后是否需要重新发送)
                properties:消息属性
                body;消息
                */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //路由key
                    System.out.println("路由key为:"+ envelope.getRoutingKey());
                    //交换机
                    System.out.println("交换机为:"+ envelope.getExchange());
                    //消息id
                    System.out.println("消息id为:"+ envelope.getDeliveryTag());
                    //收到的消息
                    System.out.println("接收到的消息:"+ new String(body,"UTF-8"));
                    System.out.println("");
                    System.out.println("======================================================");
                    System.out.println("");
                }
            };
            channel.basicConsume("queue1", true, consumer);
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            //6. 不关闭资源,一直监听
        }
    }
}

AMQP

概念介绍

AMQP 一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。 AMQP是一个二进制协议,拥有一些现代化特点:多信道协商式异步安全扩平台中立高效RabbitMQ 是 AMQP协议 的 Erlang的实现。

概念

说明

连接 Connection

一个网络连接,例如:TCP/IP套接字连接。

会话 Session

端点之间的命名对话。在一个会话上下文中,保证“恰好传递一次”。

信道 Channel

多路复用连接中的一条独立的双向数据流通道。为会话提供物理传输介质。

客户端 Client

AMQP连接或者会话的发起者。AMQP是非对称的,客户端生产和消费消息,服务器存储和路由这些消息。

服务节点Broker

消息中间件的服务节点。一般情况下可以将一个RabbitMQ Broker看作一台RabbitMQ 服务器。

端点

AMQP对话的任意一方。一个AMQP连接包括两个端点(一个是客户端,一个是服务器)。

消费者 Consumer

一个从消息队列里请求消息的客户端程序。

生产者 Producer

一个向交换机发布消息的客户端应用程序。

RabbitMQ运转流程

入门案例 为例

生产者发送消息
  1. 生产者创建连接(Connection),开启一个信道(Channel),连接到RabbitMQ Broker;
  2. 声明队列、设置属性;如是否排它,是否持久化,是否自动删除;
  3. 将路由键(空字符串)与队列绑定起来;
  4. 发送消息至RabbitMQ Broker;
  5. 关闭信道;
  6. 关闭连接;
消费者接收消息
  1. 消费者创建连接(Connection),开启一个信道(Channel),连接到RabbitMQ Broker
  2. 向Broker 请求消费相应队列中的消息,设置相应的回调函数;
  3. 等待Broker回应闭关投递响应队列中的消息,消费者接收消息;
  4. 确认(ack,自动确认)接收到的消息;
  5. RabbitMQ从队列中删除相应已经被确认的消息;
  6. 关闭信道;
  7. 关闭连接;

生产者流转过程解析

  1. 客户端与代理服务器Broker建立连接。调用 newConnection() 方法 , 会进一步封装 Protocol Header 0-9-1 的报文头发送给 Broker ,以此通知Broker 本次交互采用的是 AMQP 0-9-1 协议,紧接着 Broker 返回 Connection.Start 来建立连接,在连接的过程中涉及 Connection.Start/.Start-OKConnection.Tune/.Tune-OkConnection.Open/ .Open-Ok 这6 个命令的交互。
  2. 客户端调用 connection.createChannel 方法。此方法开启信道,其包装的 channel.open 命令发送给 Broker , 等待 channel.basicPublish 方法,对应的AMQP命令为 Basic.Publish , 这个命令包含了content Headercontent Body() 。content Header 包含了消息体的属性,例如:投递模式,优先级等,content Body 包含了消息体本身。
  3. 客户端发送完消息需要关闭资源时,涉及到Channel.Close和Channl.Close-OkConnetion.Close和Connection.Close-Ok的命令交互。

消费者流转过程解析

  1. 消费者客户端与代理服务器Broker建立连接。会调用 newConnection() 方法,这个方法会进一步封装 Protocol Header 0-9-1 的报文头发送给Broker ,以此通知Broker 本次交互采用的是 AMQP 0-9-1 协议,紧接着Broker 返回Connection.Start 来建立连接,在连接的过程中涉及Connection.Start/.Start-OK 、Connection.Tune/.Tune-Ok ,Connection.Open/ .Open-Ok 这6 个命令的交互。
  2. 消费者客户端调用connection.createChannel方法。和生产者客户端一样,协议涉及Channel . Open/Open-Ok命令。
  3. 在真正消费之前,消费者客户端需要向Broker 发送Basic.Consume 命令(即调用channel.basicConsume 方法〉将Channel 置为接收模式,之后Broker 回执 Basic . Consume - Ok 以告诉消费者客户端准备好消费消息。
  4. Broker 向消费者客户端推送(Push) 消息,即 Basic.Deliver 命令,这个命令和 Basic.Publish 命令一样会携带 Content Header 和Content Body。
  5. 消费者接收到消息并正确消费之后,向Broker 发送确认,即 Basic.Ack 命令。
  6. 客户端发送完消息需要关闭资源时,涉及到 Channel.Close和Channl.Close-Ok 与Connetion.Close和Connection.Close-Ok 的命令交互。

个人博客为: MoYu’s HomePage

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021/08/13 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
RabbitMQ发布订阅模式
在开发过程中,有一些消息需要不同消费者进行不同的处理,如电商网站的同一条促销信息需要短信发送、邮件发送、站内信发送等。此时可以使用发布订阅模式(Publish/Subscribe)
会洗碗的CV工程师
2024/04/27
2640
RabbitMQ发布订阅模式
学习RabbitMQ这篇就够了快速入门上手(超详细)
而且只关心消息的发送和接收,没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦。
陶然同学
2023/02/27
1.1K0
学习RabbitMQ这篇就够了快速入门上手(超详细)
RabbitMQ通配符模式
RabbitMQ通配符模式,也被称为主题模式(Topic Pattern),是一种消息传递模式,它允许消息生产者将消息发送到一个交换机(exchange),并使用通配符形式的路由键来描述消息的特性。消费者则可以使用通配符匹配来订阅感兴趣的消息。
会洗碗的CV工程师
2024/04/29
3960
RabbitMQ通配符模式
RabbitMQ简单模式
RabbitMQ共有六种工作模式:简单模式(Simple)、工作队列模式(Work Queue)、发布订阅模(Publish/Subscribe)、路由模式(Routing)、通配符模式(Topics)、远程调用模式(RPC,不常用,课程不对此模式进行讲解)
会洗碗的CV工程师
2024/04/25
1430
RabbitMQ简单模式
【云原生进阶之PaaS中间件】第四章RabbitMQ-2-AMQP协议
AMQP,即Advanced Message Queuing Protocol(高级消息队列协议),一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件传递消息,不受客户端/中间件不同产品、不同开发语言等条件的限制。该协议是一种二进制协议,提供客户端应用于消息中间件之间异步、安全、高效的交互。相对于我们常见的REST API,AMQP更容易实现,可以降低开销,同时灵活性高,可以轻松的添加负载平衡和高可用性的功能,并保证消息传递,在性能上AMQP协议也相对更好一些。
江中散人_Jun
2024/02/19
4120
【云原生进阶之PaaS中间件】第四章RabbitMQ-2-AMQP协议
RabbitMQ详解解答【面试+工作】
如果安装rabbitMQ首先安装基于erlang语言支持的OTP软件,然后在下载rabbitMQ软件进行安装(安装过程都是下一步,在此不在说了)
Java帮帮
2018/09/29
1.5K0
RabbitMQ详解解答【面试+工作】
RabbitMQ初识以及简单模式初步
消息队列这种技术主要用在分布式设计当中,其实可以说是一种设计模式。是相对同步系统而言的。同步系统是什么呢?
兰舟千帆
2022/07/18
2290
RabbitMQ初识以及简单模式初步
RabbitMQ路由模式
RabbitMQ的路由模式是一种消息传递模式,它允许消息生产者将消息发送到一个或多个特定的消息队列。在路由模式中,消息生产者将消息标记为具有特定的路由键,然后消息代理(RabbitMQ)将根据路由键将消息路由到与之匹配的队列。
会洗碗的CV工程师
2024/04/28
1930
RabbitMQ路由模式
RabbitMQ 简介
RabbitMQ是一个在AMQP(Advanced Message Queuing Protocol )基础上实现的,可复用的企业消息系统。它可以用于大型软件系统各个模块之间的高效通信,支持高并发,支持可扩展。
橘子君丶
2023/03/20
2870
RabbitMQ 简介
Rabbitmq基础
通过 BasicQos 方法设置prefetchCount = 1。这样RabbitMQ就会使得每个Consumer在同一个时间点最多处理1个Message。换句话说,在接收到该Consumer的ack前,他它不会将新的Message分发给它。相反,它会将其分派给不是仍然忙碌的下一个Consumer。 prefetchCount在手动ack的情况下才生效,自动ack不生效。
冬天vs不冷
2025/01/20
610
Rabbitmq基础
快速入门RabbitMQ核心概念
我们知道RabbitMQ是基于Erlang编写的,所以在安装RabbitMQ之前需要确保安装了Erlang环境。RabbitMQ与Erlang是有版本对应关系的,可以参考官方列举的版本对应关系:
端碗吹水
2020/11/24
5210
RabbitMQ系列(一)Hello World
消息中间件:简单的说就是用来传输消息的中间载体,就是将你的信息发送到接受方,它并不关心发送的数据是什么。RabbitMQ就是一个消息中间件。
Jensen_97
2023/07/20
1500
RabbitMQ系列(一)Hello World
026. RabbitMQ 入门及消息分发机制
1. RabbitMQ 简介 ---- RabbitMQ 是一个开源的 AMQP 实现,服务器端用 Erlang 语言编写,支持多种客户端。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。 2. RabbitMQ 安装运行 ---- 1. 安装依赖环境 2. 安装 RabbitMQ 3. 启动和关闭 4. 开启 Web 管理插件 见后面的笔记。 5. 设置防火墙 3. RabbitMQ 基本配置 ---- RabbitMQ 端口 RabbitMQ 会绑定一些端口,安装完后,需要将这
山海散人
2021/03/03
6030
026. RabbitMQ 入门及消息分发机制
RabbitMQ 学习(二)---- HelloWorld 简单模型
之前我们使用rabbitMq 网页客户端 开放了 15672 的端口,要想是的 java客户端访问服务器成功,需要开放 5672 的端口号。在服务器安全组设置
RAIN7
2022/09/29
2950
RabbitMQ 学习(二)---- HelloWorld 简单模型
RabbitMQ之入门案例
​ RabbitMQ 是一个消息中间件:它接受并转发消息。你可以把它当做一个快递站点,当你要发送一个包裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑 RabbitMQ 是 一个快递站,一个快递员帮你传递快件。RabbitMQ 与快递站的主要区别在于,它不处理快件而是接收,存储和转发消息数据
shaoshaossm
2022/12/27
3290
RabbitMQ之入门案例
什么是RabbitMQ?它的主要功能是什么?
RabbitMQ是一个开源的消息代理软件,它实现了高级消息队列协议(AMQP),用于在分布式系统之间进行可靠的异步通信。它可以在不同的应用程序、服务和系统之间传递消息,并确保消息的可靠性和顺序性。
GeekLiHua
2025/01/21
890
快速学习-RabbitMQ快速入门
RabbitMQ由Erlang语言开发,Erlang语言用于并发及分布式系统的开发,在电信领域应用广泛,OTP(Open Telecom Platform)作为Erlang语言的一部分,包含了很多基于Erlang开发的中间件及工具库,安装RabbitMQ需 要安装Erlang/OTP,并保持版本匹配,如下图:
cwl_java
2020/01/14
1.1K0
快速学习-RabbitMQ快速入门
RabbitMQ实战代码
RabbitMQ 支持多种语言访问,以 Java 为例看下一般使用 RabbitMQ 的步骤。
码客说
2019/10/21
4800
RabbitMQ 的第一个程序
Work queues,也被称为(Task queues),任务模型。当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用work 模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。
后端码匠
2021/02/26
4070
RabbitMQ 的第一个程序
RabbitMQ系列2 RabbitMQ安装与基础入门
AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ等。
一只胡说八道的猴子
2021/03/04
4720
RabbitMQ系列2  RabbitMQ安装与基础入门
相关推荐
RabbitMQ发布订阅模式
更多 >
领券
社区富文本编辑器全新改版!诚邀体验~
全新交互,全新视觉,新增快捷键、悬浮工具栏、高亮块等功能并同时优化现有功能,全面提升创作效率和体验
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
查看详情【社区公告】 技术创作特训营有奖征文
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验