前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RabbitMQ实战代码

RabbitMQ实战代码

作者头像
码客说
发布2019-10-21 17:23:06
4670
发布2019-10-21 17:23:06
举报
文章被收录于专栏:码客

Maven依赖

RabbitMQ 支持多种语言访问,以 Java 为例看下一般使用 RabbitMQ 的步骤。

maven工程的pom文件中添加依赖

代码语言:javascript
复制
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>4.1.0</version>
</dependency>

工具类

代码语言:javascript
复制
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class ConnectionUtils {
    public static Connection getConnection() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("psvmc");
        factory.setPassword("psvmc");
        factory.setHost("mq.psvmc.cn");
        //建立到代理服务器到连接
        Connection conn = factory.newConnection();
        return conn;
    }
}

QUEUE

我这里暂且把当前的这种方式定义为队列模式

队列模式的特点

  • 先打开生产者发送消息消息不会丢失
  • 多个消费者不会收到同一个消息 由服务器去分配
  • 生产者把消息直接放在队列中 队列由生产者创建
  • 发布消息是交换机的名字填空字符串
  • RabbitMQ内置一个名称为空字符串的默认交换机,它根据Routing key将消息路由到与队列名与Routing key完全相等的队列中

消息生产者

代码语言:javascript
复制
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class MyProducer {
    public final static String QUEUE_NAME = "rabbitMQ.work1";

    public static void main(String[] args) throws Exception {
        //创建一个新的连接
        Connection connection = ConnectionUtils.getConnection();
        //创建一个通道
        Channel channel = connection.createChannel();
        //  声明一个队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        int message = 1;
        while (message < 1000) {
            //发送消息到队列中
            channel.basicPublish(
                    "",
                    QUEUE_NAME,
                    null,
                    ("" + message).getBytes("UTF-8")
            );
            System.out.println("发送消息:" + message);
            Thread.sleep(2000);
            message += 1;
        }

        //关闭通道和连接
        channel.close();
        connection.close();
    }
}

消息消费者

代码语言:javascript
复制
import com.rabbitmq.client.*;

import java.io.IOException;

public class MyCustomer {
    private final static String QUEUE_NAME = "rabbitMQ.work1";

    public static void main(String[] args) throws Exception {

        //创建一个新的连接
        Connection connection = ConnectionUtils.getConnection();
        //创建一个通道
        final Channel channel = connection.createChannel();
        //每次从队列获取的数量
        channel.basicQos(1);
        //声明要关注的队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //DefaultConsumer类实现了Consumer接口,通过传入一个频道,
        //告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("接收消息:" + message);
                long deliveryTag = envelope.getDeliveryTag();
                //确认消息
                try {
                    Thread.sleep(3000);
                    channel.basicAck(deliveryTag, false);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        };
        //自动回复队列应答 -- RabbitMQ中的消息确认机制
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }
}

注1:queueDeclare第一个参数表示队列名称、第二个参数为是否持久化(true表示是,队列将在服务器重启时生存)、第三个参数为是否是独占队列(创建者可以使用的私有队列,断开后自动删除)、第四个参数为当所有消费者客户端连接断开时是否自动删除队列、第五个参数为队列的其他参数 注2:basicPublish第一个参数为交换机名称、第二个参数为队列映射的路由key、第三个参数为消息的其他属性、第四个参数为发送信息的主体

EXCHANGE

我这里暂且把当前的这种方式定义为路由模式

路由模式的特点

  • 先打开生产者发送消息,消息会丢失
  • 多个消费者会收到同一个消息 由服务器根据规则去分配
  • 需要队列和路由进行绑定
  • 队列可以多次和路由绑定 只要routingKey不同即可
  • 交换机类型:fanout(发布订阅模式),direct(精准匹配模式), topic(通配符模式), headers(头匹配模式)

fanout(发布订阅模式)

这种模式的特点

  • routingKey 为空字符串
  • 只要订阅后都能收到消息

消息生产者

代码语言:javascript
复制
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class MyProducer {

    public final static String EXCHANGE_NAME = "myexchange";

    public static void main(String[] args) throws Exception {
        //创建一个新的连接
        Connection connection = ConnectionUtils.getConnection();
        //创建一个通道
        Channel channel = connection.createChannel();
        //声明一个交换机 发布订阅模式
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");

        int message = 1;
        while (message < 1000) {
            //发送消息到队列中
            channel.basicPublish(
                    EXCHANGE_NAME,
                    "",
                    null,
                    ("" + message).getBytes("UTF-8")
            );
            System.out.println("发送消息:" + message);
            Thread.sleep(2000);
            message += 1;
        }

        //关闭通道和连接
        channel.close();
        connection.close();
    }
}

消息消费者

代码语言:javascript
复制
import com.rabbitmq.client.*;
import java.io.IOException;

public class MyCustomer {

    public final static String EXCHANGE_NAME = "myexchange";
    private final static String QUEUE_NAME = "rabbitMQ.queue1";

    public static void main(String[] args) throws Exception {
        //创建一个新的连接
        Connection connection = ConnectionUtils.getConnection();
        //创建一个通道
        final Channel channel = connection.createChannel();
        //每次从队列获取的数量
        channel.basicQos(1);
        //声明定义队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
        //DefaultConsumer类实现了Consumer接口,通过传入一个频道,
        //告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("接收消息:" + message);
                long deliveryTag = envelope.getDeliveryTag();
                //确认消息
                try {
                    Thread.sleep(3000);
                    channel.basicAck(deliveryTag, false);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        //自动回复队列应答 -- RabbitMQ中的消息确认机制
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }
}

direct(精准匹配模式)

特点

  • 根据routingKey精准匹配消息

topic(通配符模式)

特点

  • 根据routingKey模糊匹配消息
  • routingKey为aa.bb形式
  • 可以用*#进行匹配 a.*可以匹配 a.a、a.b 不能匹配a.b.c a.#既可以匹配 a.a、a.b 也能匹配a.b.c

headers(头匹配模式)

  • x-match = all :表示所有的键值对都匹配才能接受到消息
  • x-match = any :表示只要有键值对匹配就能接受到消息

消费者

代码语言:javascript
复制
//声明路由名字和类型
channel.exchangeDeclare(EXCHANGE_NAME, "headers", false, true, null);
//创建队列
channel.queueDeclare(QUEUE_NAME, false, false, true, null);
//设置消息头键值对信息
Map<String, Object> headers = new Hashtable<String,Object>();
//这里x-match有两种类型
//all:表示所有的键值对都匹配才能接受到消息
//any:表示只要有键值对匹配就能接受到消息
headers.put("x-match", "any");
headers.put("name", "jack");
headers.put("age" , 31);

//把队列绑定到路由上并指定headers
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "", headers);

生产者

代码语言:javascript
复制
//声明路由名字和类型
channel.exchangeDeclare(EXCHANGE_NAME, "headers", false, true, null);

//设置消息头键值对信息
Map<String, Object> headers = new Hashtable<String, Object>();
headers.put("name", "jack");
headers.put("age", 30);
Builder builder = new Builder();
builder.headers(headers);

channel.basicPublish(EXCHANGE_NAME, "", builder.build(), message.getBytes());

上面的例子中name的值都为jack 匹配上了一个 就能收到消息

Spring集成

这里的示例是用的QUEUE的方式

注意下面的这行配置

代码语言:javascript
复制
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" exchange="" routing-key="myqueue"/>

一定要配置的是routing-key="myqueue" 不要配成queue="myqueue" 我就是在这里折腾了好久。

下面是具体的配置:

1) 添加依赖

代码语言:javascript
复制
<dependencies>
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.11</version>
    <scope>test</scope>
  </dependency>

  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-nop</artifactId>
    <version>1.7.2</version>
  </dependency>

  <dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>4.1.0</version>
  </dependency>

  <dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>1.7.6.RELEASE</version>
  </dependency>

</dependencies>

2) 在resources中添加文件rabbitmq.xml 内容如下

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/rabbit
    http://www.springframework.org/schema/rabbit/spring-rabbit-1.7.xsd
    http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-4.3.xsd">

    <bean id="connectionFactory"
          class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
        <constructor-arg value="mq.psvmc.cn"/>
        <property name="username" value="psvmc"/>
        <property name="password" value="psvmc"/>
        <property name="port" value="5672"/>
        <property name="channelCacheSize" value="5"/>
    </bean>

    <!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 -->
    <rabbit:admin connection-factory="connectionFactory"/>

    <!--定义queue -->
    <rabbit:queue name="myqueue" auto-declare="true" durable="false" auto-delete="false" exclusive="false"/>


    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" exchange="" routing-key="myqueue"/>

    <!-- 监听生产者发送的消息开始 -->
    <!-- 消息接收者 -->
    <bean id="messageReceiver" class="cn.psvmc.spring.MessageConsumer"></bean>

    <!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象 -->
    <!-- acknowledeg = "manual":意为表示该消费者的ack方式为手动 ;acknowledge="auto"表示自动-->
    <!-- prefetch=1设置预取消息数目为1 -->
    <rabbit:listener-container
            prefetch="1"
            connection-factory="connectionFactory"
            auto-declare="true"
            acknowledge="manual">
        <rabbit:listener queue-names="myqueue" ref="messageReceiver" method="onMessage"/>
    </rabbit:listener-container>
    <!-- 监听生产者发送的消息结束 -->
</beans>

3) 消息的生产者

代码语言:javascript
复制
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class MessageProducer {
    public static void main(String[] args) throws Exception {
        ApplicationContext context = new ClassPathXmlApplicationContext("classpath:rabbitmq.xml");
        RabbitTemplate template = context.getBean(RabbitTemplate.class);
        int message = 1;
        while (message < 100) {
            //发送消息到队列中
            template.convertAndSend("" + message);
            System.out.println("+ 发送消息:" + message);
            Thread.sleep(2000);
            message += 1;
        }
        ((ClassPathXmlApplicationContext) context).destroy();
    }
}

4) 消息的消费者

代码语言:javascript
复制
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.*;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;

public class MessageConsumer implements ChannelAwareMessageListener {

    /**
     * 处理收到的rabbit消息的回调方法。
     *
     * @param message AMQP封装消息对象
     * @param channel 信道对象,可以进行确认回复
     * @throws Exception Any.
     */
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        System.out.println("- 收到消息:" + new String(message.getBody(), "UTF-8"));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2019-04-04,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Maven依赖
  • 工具类
  • QUEUE
  • EXCHANGE
    • fanout(发布订阅模式)
      • direct(精准匹配模式)
        • topic(通配符模式)
          • headers(头匹配模式)
          • Spring集成
          相关产品与服务
          云服务器
          云服务器(Cloud Virtual Machine,CVM)提供安全可靠的弹性计算服务。 您可以实时扩展或缩减计算资源,适应变化的业务需求,并只需按实际使用的资源计费。使用 CVM 可以极大降低您的软硬件采购成本,简化 IT 运维工作。
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档