RabbitMQ详解解答【面试+工作】

RabbitMQ详解解答【面试+工作】

RabbitMQ安装与配置

在Windows下进行rabbitMQ的安装

第一步:软件安装

如果安装rabbitMQ首先安装基于erlang语言支持的OTP软件,然后在下载rabbitMQ软件进行安装(安装过程都是下一步,在此不在说了)

第二步:环境变量配置

如果上面完成安装以后要进行环境变量的配置,首先配置ERLANG_HOME如下图(变量值就是你按照otp软件的路径)

然后在配置RABBITMQ_SERVER如下图(变量值是rabbitMQ的安装路径)

最后进行path的配置如下图(path的值为;%ERLANG_HOME%\bin;%RABBITMQ_SERVER%\sbin;注意是追加)

第三步:启动监控管理器

找到你安装rabbitMQ的路径,然后切换到sbin的文件夹

输入rabbitmq-plugins enable rabbitmq_management命令来启动监控管理器

然后在浏览器输入http:localhost:15672 用户名和密码默认都为guest。

这样一来我们进安装好了。

第四步:rabbitMQ常用的命令

启动监控管理器:rabbitmq-plugins enable rabbitmq_management

关闭监控管理器:rabbitmq-plugins disable rabbitmq_management

启动rabbitmq:rabbitmq-service start

关闭rabbitmq:rabbitmq-service stop

查看所有的队列:rabbitmqctl list_queues

清除所有的队列:rabbitmqctl reset

关闭应用:rabbitmqctl stop_app

启动应用:rabbitmqctl start_app

用户和权限设置(后面用处)

添加用户:rabbitmqctl add_user username password

分配角色:rabbitmqctl set_user_tags username administrator

新增虚拟主机:rabbitmqctl add_vhost vhost_name

将新虚拟主机授权给新用户:rabbitmqctl set_permissions -p vhost_name username \'.*\' \'.*\' \'.*\'

角色说明

none 最小权限角色

management 管理员角色

policymaker 决策者

monitoring 监控

administrator 超级管理员


Java简单实现RabbitMQ

前言:在这里我将用java来简单的实现rabbitMQ。下面我们带着下面问题来一步步的了解和学习rabbitMQ。

1:如果消费者连接中断,这期间我们应该怎么办

2:如何做到负载均衡

3:如何有效的将数据发送到相关的接收者?就是怎么样过滤

4:如何保证消费者收到完整正确的数据

5:如何让优先级高的接收者先收到数据

一:"Hello RabbitMQ"

下面有一幅图,其中P表示生产者,C表示消费者,红色部分为消息队列

二:项目开始

2.1:首先引入rabbitMQ jar包

 <dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>3.6.5</version>
 </dependency>

2.2:创建消费者Producer

/**

* 消息生成者

*/

public class Producer {

public final static String QUEUE_NAME="rabbitMQ.test";

public static void main(String[] args) throws IOException, TimeoutException {

//创建连接工厂

ConnectionFactory factory = new ConnectionFactory();

//设置RabbitMQ相关信息

factory.setHost("localhost");

//factory.setUsername("lp");

//factory.setPassword("");

// factory.setPort(2088);

//创建一个新的连接

Connection connection = factory.newConnection();

//创建一个通道

Channel channel = connection.createChannel();

// 声明一个队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null);

String message = "Hello RabbitMQ";

//发送消息到队列中

channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));

System.out.println("Producer Send +\'" + message + "\'");

//关闭通道和连接

channel.close();

connection.close();

}

}

注1:queueDeclare第一个参数表示队列名称、第二个参数为是否持久化(true表示是,队列将在服务器重启时生存)、第三个参数为是否是独占队列(创建者可以使用的私有队列,断开后自动删除)、第四个参数为当所有消费者客户端连接断开时是否自动删除队列、第五个参数为队列的其他参数

注2:basicPublish第一个参数为交换机名称、第二个参数为队列映射的路由key、第三个参数为消息的其他属性、第四个参数为发送信息的主体

2.3:创建消费者

public class Customer {

private final static String QUEUE_NAME = "rabbitMQ.test";

public static void main(String[] args) throws IOException, TimeoutException {

// 创建连接工厂

ConnectionFactory factory = new ConnectionFactory();

//设置RabbitMQ地址

factory.setHost("localhost");

//创建一个新的连接

Connection connection = factory.newConnection();

//创建一个通道

Channel channel = connection.createChannel();

//声明要关注的队列

channel.queueDeclare(QUEUE_NAME, false, false, true, null);

System.out.println("Customer Waiting Received messages");

//DefaultConsumer类实现了Consumer接口,通过传入一个频道,

// 告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery

Consumer consumer = new DefaultConsumer(channel) {

@Override

public void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body)throws IOException {

String message = new String(body, "UTF-8");

System.out.println("Customer Received \'" + message + "\'");

}

};

//自动回复队列应答 -- RabbitMQ中的消息确认机制

channel.basicConsume(QUEUE_NAME, true, consumer);

}

前面代码我们可以看出和生成者一样的,后面的是获取生产者发送的信息,其中envelope主要存放生产者相关信息(比如交换机、路由key等)body是消息实体。

2.4:运行结果

生产者:

消费者:

三:实现任务分发

工作队列

一个队列的优点就是很容易处理并行化的工作能力,但是如果我们积累了大量的工作,我们就需要更多的工作者来处理,这里就要采用分布机制了。

我们新创建一个生产者NewTask

public class NewTask {

private static final String TASK_QUEUE_NAME="task_queue";

public static void main(String[] args) throws IOException, TimeoutException {

ConnectionFactory factory=new ConnectionFactory();

factory.setHost("localhost");

Connection connection=factory.newConnection();

Channel channel=connection.createChannel();

channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);

//分发信息

for (int i=0;i<10;i++){

String message="Hello RabbitMQ"+i;

channel.basicPublish("",TASK_QUEUE_NAME,

MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());

System.out.println("NewTask send \'"+message+"\'");

}

channel.close();

connection.close();

}

}

然后创建2个工作者Work1和Work2代码一样

public class Work1 {

private static final String TASK_QUEUE_NAME = "task_queue";

public static void main(String[] args) throws IOException, TimeoutException {

final ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

Connection connection = factory.newConnection();

final Channel channel = connection.createChannel();

channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

System.out.println("Worker1 Waiting for messages");

//每次从队列获取的数量

channel.basicQos(1);

final Consumer consumer = new DefaultConsumer(channel) {

@Override

public void handleDelivery(String consumerTag,

Envelope envelope,

AMQP.BasicProperties properties,

byte[] body) throws IOException {

String message = new String(body, "UTF-8");

System.out.println("Worker1 Received \'" + message + "\'");

try {

throw new Exception();

//doWork(message);

}catch (Exception e){

channel.abort();

}finally {

System.out.println("Worker1 Done");

channel.basicAck(envelope.getDeliveryTag(),false);

}

}

};

boolean autoAck=false;

//消息消费完成确认

channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);

}

private static void doWork(String task) {

try {

Thread.sleep(1000); // 暂停1秒钟

} catch (InterruptedException _ignored) {

Thread.currentThread().interrupt();

}

}

}

注:channel.basicQos(1);保证一次只分发一个 。autoAck是否自动回复,如果为true的话,每次生产者只要发送信息就会从内存中删除,那么如果消费者程序异常退出,那么就无法获取数据,我们当然是不希望出现这样的情况,所以才去手动回复,每当消费者收到并处理信息然后在通知生成者。最后从队列中删除这条信息。如果消费者异常退出,如果还有其他消费者,那么就会把队列中的消息发送给其他消费者,如果没有,等消费者启动时候再次发送。


RabbitMQ采用不同的交互机制

在上一篇我们都是采用发送信息到队列然后队列把信息在发送到消费者,其实实际情况并非如此,rabbitMQ其实真正的思想是生产者不发送任何信息到队列,甚至不知道信息将发送到哪个队列。相反生产者只能发送信息到交换机,交换机接收到生产者的信息,然后按照规则把它推送到对列中,交换机是如何做处理他接收到的信息,并怎么样发送到特定的队列,那么这一篇主要是讲解交换机的规则。

一:发布/订阅

在上一篇说到的队列都指定了名称,但是现在我们不需要这么做,我们需要所有的日志信息,而不只是其中的一个。如果要做这样的队列,我们需要2件事,一个就是获取一个新的空的队列,这样我就需要创建一个随机名称的队列,最好让服务器帮我们做出选择,第一个就是我们断开用户的队列,应该自动进行删除。ok下面是一副工作图。

信息发送端代码

public class EmitLog {

private static final String EXCHANGE_NAME = "logs";

public static void main(String[] args) throws IOException, TimeoutException {

ConnectionFactory factory=new ConnectionFactory();

factory.setHost("localhost");

Connection connection=factory.newConnection();

Channel channel=connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME,"fanout");//fanout表示分发,所有的消费者得到同样的队列信息

//分发信息

for (int i=0;i<5;i++){

String message="Hello World"+i;

channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes());

System.out.println("EmitLog Sent \'" + message + "\'");

}

channel.close();

connection.close();

}

消费者代码

public class ReceiveLogs1 {

private static final String EXCHANGE_NAME = "logs";

public static void main(String[] args) throws IOException, TimeoutException {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

//产生一个随机的队列名称

String queueName = channel.queueDeclare().getQueue();

channel.queueBind(queueName, EXCHANGE_NAME, "");//对队列进行绑定

System.out.println("ReceiveLogs1 Waiting for messages");

Consumer consumer = new DefaultConsumer(channel) {

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

String message = new String(body, "UTF-8");

System.out.println("ReceiveLogs1 Received \'" + message + "\'");

}

};

channel.basicConsume(queueName, true, consumer);//队列会自动删除

}

}

上面就完成了一个发布/订阅模式的消息队列 看看结果

二:Routing

上面我用采用了广播的模式进行消息的发送,现在我们采用路由的方式对不同的消息进行过滤

发送端代码

public class RoutingSendDirect {

private static final String EXCHANGE_NAME = "direct_logs";

// 路由关键字

private static final String[] routingKeys = new String[]{"info" ,"warning", "error"};

public static void main(String[] args) throws IOException, TimeoutException {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

//声明交换机

channel.exchangeDeclare(EXCHANGE_NAME,"direct");//注意是direct

//发送信息

for (String routingKey:routingKeys){

String message = "RoutingSendDirect Send the message level:" + routingKey;

channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes());

System.out.println("RoutingSendDirect Send"+routingKey +"\':\'" + message);

}

channel.close();

connection.close();

}

}

ReceiveLogsDirect1 消费者代码

public class ReceiveLogsDirect1 {

// 交换器名称

private static final String EXCHANGE_NAME = "direct_logs";

// 路由关键字

private static final String[] routingKeys = new String[]{"info" ,"warning"};

public static void main(String[] args) throws IOException, TimeoutException {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

//声明交换器

channel.exchangeDeclare(EXCHANGE_NAME, "direct");

//获取匿名队列名称

String queueName=channel.queueDeclare().getQueue();

//根据路由关键字进行绑定

for (String routingKey:routingKeys){

channel.queueBind(queueName,EXCHANGE_NAME,routingKey);

System.out.println("ReceiveLogsDirect1 exchange:"+EXCHANGE_NAME+"," +

" queue:"+queueName+", BindRoutingKey:" + routingKey);

}

System.out.println("ReceiveLogsDirect1 Waiting for messages");

Consumer consumer = new DefaultConsumer(channel) {

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

String message = new String(body, "UTF-8");

System.out.println("ReceiveLogsDirect1 Received \'" + envelope.getRoutingKey() + "\':\'" + message + "\'");

}

};

channel.basicConsume(queueName, true, consumer);

}

ReceiveLogsDirect2消费者代码

public class ReceiveLogsDirect2 {

// 交换器名称

private static final String EXCHANGE_NAME = "direct_logs";

// 路由关键字

private static final String[] routingKeys = new String[]{"error"};

public static void main(String[] argv) throws Exception {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

//声明交换器

channel.exchangeDeclare(EXCHANGE_NAME, "direct");

//获取匿名队列名称

String queueName = channel.queueDeclare().getQueue();

//根据路由关键字进行多重绑定

for (String severity : routingKeys) {

channel.queueBind(queueName, EXCHANGE_NAME, severity);

System.out.println("ReceiveLogsDirect2 exchange:"+EXCHANGE_NAME+", queue:"+queueName+", BindRoutingKey:" + severity);

}

System.out.println("ReceiveLogsDirect2 Waiting for messages");

Consumer consumer = new DefaultConsumer(channel) {

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws UnsupportedEncodingException {

String message = new String(body, "UTF-8");

System.out.println("ReceiveLogsDirect2 Received \'" + envelope.getRoutingKey() + "\':\'" + message + "\'");

}

};

channel.basicConsume(queueName, true, consumer);

}

}

上面代码可以看出这里是通过路由来找个这个对列的。我们看下结果

三:Topics

这种应该属于模糊匹配

* :可以替代一个词

#:可以替代0或者更多的词

现在我们继续看看代码来理解

发送端

public class TopicSend {

private static final String EXCHANGE_NAME = "topic_logs";

public static void main(String[] args) throws IOException, TimeoutException {

Connection connection = null;

Channel channel = null;

try{

ConnectionFactory factory=new ConnectionFactory();

factory.setHost("localhost");

connection=factory.newConnection();

channel=connection.createChannel();

//声明一个匹配模式的交换机

channel.exchangeDeclare(EXCHANGE_NAME,"topic");

//待发送的消息

String[] routingKeys=new String[]{

"quick.orange.rabbit",

"lazy.orange.elephant",

"quick.orange.fox",

"lazy.brown.fox",

"quick.brown.fox",

"quick.orange.male.rabbit",

"lazy.orange.male.rabbit"

};

//发送消息

for(String severity :routingKeys){

String message = "From "+severity+" routingKey\' s message!";

channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());

System.out.println("TopicSend Sent \'" + severity + "\':\'" + message + "\'");

}

}catch (Exception e){

e.printStackTrace();

if (connection!=null){

channel.close();

connection.close();

}

}finally {

if (connection!=null){

channel.close();

connection.close();

}

}

}

}

消费者1:

public class ReceiveLogsTopic1 {

private static final String EXCHANGE_NAME = "topic_logs";

public static void main(String[] args) throws IOException, TimeoutException {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

//声明一个匹配模式的交换机

channel.exchangeDeclare(EXCHANGE_NAME, "topic");

String queueName = channel.queueDeclare().getQueue();

//路由关键字

String[] routingKeys = new String[]{"*.orange.*"};

//绑定路由

for (String routingKey : routingKeys) {

channel.queueBind(queueName, EXCHANGE_NAME, routingKey);

System.out.println("ReceiveLogsTopic1 exchange:" + EXCHANGE_NAME + ", queue:" + queueName + ", BindRoutingKey:" + routingKey);

}

System.out.println("ReceiveLogsTopic1 Waiting for messages");

Consumer consumer = new DefaultConsumer(channel) {

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

String message = new String(body, "UTF-8");

System.out.println("ReceiveLogsTopic1 Received \'" + envelope.getRoutingKey() + "\':\'" + message + "\'");

}

};

channel.basicConsume(queueName, true, consumer);

}

}

消费者2:

ublic class ReceiveLogsTopic2 {

private static final String EXCHANGE_NAME = "topic_logs";

public static void main(String[] argv) throws IOException, TimeoutException {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

// 声明一个匹配模式的交换器

channel.exchangeDeclare(EXCHANGE_NAME, "topic");

String queueName = channel.queueDeclare().getQueue();

// 路由关键字

String[] routingKeys = new String[]{"*.*.rabbit", "lazy.#"};

// 绑定路由关键字

for (String bindingKey : routingKeys) {

channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);

System.out.println("ReceiveLogsTopic2 exchange:"+EXCHANGE_NAME+", queue:"+queueName+", BindRoutingKey:" + bindingKey);

}

System.out.println("ReceiveLogsTopic2 Waiting for messages");

Consumer consumer = new DefaultConsumer(channel) {

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws UnsupportedEncodingException {

String message = new String(body, "UTF-8");

System.out.println("ReceiveLogsTopic2 Received \'" + envelope.getRoutingKey() + "\':\'" + message + "\'");

}

};

channel.basicConsume(queueName, true, consumer);

}

}

运行后结果


RabbitMQ远程调用

前言:前面我们讲解的都是本地服务器,现在如果需要远程计算机上运行一个函数,等待结果。这就是一个不同的故事了,这种模式通常被称为远程过程调用或者RPC。

本章教程我们使用RabbitMQ搭建一个RPC系统,一个客户端和一个可扩展的RPC服务器,现在我们开始吧。

Callback queue

一般做rpc在RabbitMQ是比较容易的,一个客户端发送一个请求信息和一个响应信息的服务器回复,为了得到一个响应,我们需要发送一个回调队列地址请求。如下

Message属性:

AMQP协议一共预定义了14个属性,但是大多数属性很少使用,下面几个可能用的比较多

deliveryMode:有2个值,一个是持久,另一个表示短暂(第二篇说过)

contentType:内容类型:用来描述编码的MIME类型。例如,经常使用JSON编码是将此属性设置为一个很好的做法:application/json。

replyTo:经常使用的是回调队列的名字

correlationid:RPC响应请求的相关应用

Correlation Id

在队列上接收到一个响应,但它并不清楚响应属于哪一个,当我们使用CorrelationId属性的时候,我们就可以将它设置为每个请求的唯一值,稍后当我们在回调队列中接收消息的时候,我们会看到这个属性,如果我们看到一个未知的CorrelationId,我们就可以安全地忽略信息-它不属于我们的请求。为什么我们应该忽略未知的消息在回调队列中,而不是失败的错误?这是由于服务器端的一个竞争条件的可能性。比如还未发送了一个确认信息给请求,但是此时RPC服务器挂了。如果这种情况发生,将再次重启RPC服务器处理请求。这就是为什么在客户端必须处理重复的反应。

需求

我们的rpc工作方式如下:

1:当客户端启动时,它创建一个匿名的独占回调队列。

2:对于rpc请求,客户端发送2个属性,一个是replyTo设置回调队列,另一是correlationId为每个队列设置唯一值

3:请求被发送到一个rpc_queue队列中

4:rpc服务器是等待队列的请求,当收到一个请求的时候,他就把消息返回的结果返回给客户端,使请求结束。

5:客户端等待回调队列上的数据,当消息出现的时候,他检查correlationId,如果它和从请求返回的值匹配,就进行响应。

编码

RPCServer.Java

public class RPCServer {

private static final String RPC_QUEUE_NAME = "rpc_queue";

private static int fib(int n) {

if (n == 0) {

return 0;

}

if (n == 1) {

return 1;

}

return fib(n - 1) + fib(n - 1);

}

public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);

channel.basicQos(1);

QueueingConsumer consumer = new QueueingConsumer(channel);

channel.basicConsume(RPC_QUEUE_NAME, false, consumer);

System.out.println("RPCServer Awating RPC request");

while (true) {

QueueingConsumer.Delivery delivery = consumer.nextDelivery();

BasicProperties props = delivery.getProperties();

BasicProperties replyProps = new AMQP.BasicProperties.Builder().

correlationId(props.getCorrelationId()).build();

String message = new String(delivery.getBody(), "UTF-8");

int n = Integer.parseInt(message);

System.out.println("RPCServer fib(" + message + ")");

String response = "" + fib(n);

channel.basicPublish( "", props.getReplyTo(), replyProps, response.getBytes());

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

}

}

}

服务器代码比较简单

1:建立连接,通道,队列

2:我们可能运行多个服务器进程,为了分散负载服务器压力,我们设置channel.basicQos(1);

3:我们用basicconsume访问队列。然后进入循环,在其中我们等待请求消息并处理消息然后发送响应。

RPCClient.java

public class RPCClient {

private Connection connection;

private Channel channel;

private String requestQueueName = "rpc_queue";

private String replyQueueName;

private QueueingConsumer consumer;

public RPCClient() throws IOException, TimeoutException {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

connection = factory.newConnection();

channel = connection.createChannel();

replyQueueName = channel.queueDeclare().getQueue();

consumer = new QueueingConsumer(channel);

channel.basicConsume(replyQueueName, true, consumer);

}

public String call(String message) throws IOException, InterruptedException {

String response;

String corrID = UUID.randomUUID().toString();

AMQP.BasicProperties props = new AMQP.BasicProperties().builder()

.correlationId(corrID).replyTo(replyQueueName).build();

channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));

while (true) {

QueueingConsumer.Delivery delivery = consumer.nextDelivery();

if (delivery.getProperties().getCorrelationId().equals(corrID)) {

response = new String(delivery.getBody(), "UTF-8");

break;

}

}

return response;

}

public void close() throws Exception {

connection.close();

}

public static void main(String[] args) throws Exception {

RPCClient rpcClient = null;

String response;

try {

rpcClient = new RPCClient();

System.out.println("RPCClient Requesting fib(20)");

response = rpcClient.call("20");

System.out.println("RPCClient Got \'" + response + "\'");

} catch (Exception e) {

e.printStackTrace();

} finally {

if (rpcClient != null) {

rpcClient.close();

}

}

}

}

客户端代码解读

1:建立一个连接和通道,并声明了一个唯一的“回调”队列的答复

2:我们订阅回调队列,这样就可以得到RPC的响应

3:定义一个call方法用于发送当前的回调请求

4:生成一个唯一的correlationid,然后通过while循环来捕获合适的回应

5:我们请求信息,发送2个属性,replyTo 和correlationId

6:然后就是等待直到有合适的回应到达

7:while循环是做一个非常简单的工作,对于每一个响应消息,它检查是否有correlationid然后进行匹配。然后是就进行响应。

8:最后把响应返回到客户端。


spring集成RabbitMQ

前面几篇讲解了如何使用rabbitMq,这一篇主要讲解spring集成rabbitmq。

首先引入配置文件org.springframework.amqp,如下

  <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
        <version>1.6.0.RELEASE</version>
  </dependency>

一:配置消费者和生成者公共部分

<rabbit:connection-factory id="connectionFactory" host="${rabbit.hosts}"

port="${rabbit.port}" username="${rabbit.username}" password="${rabbit.password}" virtual-host="${rabbit.virtualHost}"

channel-cache-size="50"/>

<rabbit:admin connection-factory="connectionFactory"/>

<!--定义消息队列-->

<rabbit:queue name="spittle.alert.queue.1" durable="true" auto-delete="false"/>

<rabbit:queue name="spittle.alert.queue.2" durable="true" auto-delete="false"/>

<rabbit:queue name="spittle.alert.queue.3" durable="true" auto-delete="false"/>

<!--绑定队列-->

<rabbit:fanout-exchange id="spittle.fanout" name="spittle.fanout" durable="true">

<rabbit:bindings>

<rabbit:binding queue="spittle.alert.queue.1"></rabbit:binding>

<rabbit:binding queue="spittle.alert.queue.2"></rabbit:binding>

<rabbit:binding queue="spittle.alert.queue.3"></rabbit:binding>

</rabbit:bindings>

</rabbit:fanout-exchange>

二:配置生成者

<import resource="amqp-share.xml"/>

<!--创建消息队列模板-->

<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"

exchange="spittle.fanout" message-converter="jsonMessageConverter">

</rabbit:template>

<bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.JsonMessageConverter"></bean>

三:生产者程序

public class Spittle implements Serializable {

private Long id;

private Spitter spitter;

private String message;

private Date postedTime;

public Spittle(Long id, Spitter spitter, String message, Date postedTime) {

this.id = id;

this.spitter = spitter;

this.message = message;

this.postedTime = postedTime;

}

public Long getId() {

return this.id;

}

public String getMessage() {

return this.message;

}

public Date getPostedTime() {

return this.postedTime;

}

public Spitter getSpitter() {

return this.spitter;

}

}

public class ProducerMain {

public static void main(String[] args) throws Exception {

ApplicationContext context = new ClassPathXmlApplicationContext("amqp/amqp-producer.xml");

AmqpTemplate template = (AmqpTemplate) context.getBean("rabbitTemplate");

for (int i = 0; i < 20; i++) {

System.out.println("Sending message #" + i);

Spittle spittle = new Spittle((long) i, null, "Hello world (" + i + ")", new Date());

template.convertAndSend(spittle);

Thread.sleep(5000);

}

System.out.println("Done!");

}

}

其中convertAndSend方法默认第一个参数是交换机名称,第二个参数是路由名称,第三个才是我们发送的数据,现在我们启动程序,效果如下

第四个:消费者程序

首先编写一个用于监听生产者发送信息的代码

/**

* Created by Administrator on 2016/11/18.

*/

public class SpittleAlertHandler implements MessageListener {

@Override

public void onMessage(Message message) {

try {

String body=new String(message.getBody(),"UTF-8");

System.out.println(body);

} catch (UnsupportedEncodingException e) {

e.printStackTrace();

}

}

}

一定要注意实现MessageListener,我们只需要获取message的body即可,通过json来转换我们需要的程序(比如我们可以发送一个map,map存放方法和实体,这样我们可以通过反射来调用不同的程序来运行)。

下面我们配置消费者

<import resource="amqp-share.xml"/>

<rabbit:listener-container connection-factory="connectionFactory">

<rabbit:listener ref="spittleListener" method="onMessage" queues="spittle.alert.queue.1,spittle.alert.queue.3,spittle.alert.queue.2"/>

</rabbit:listener-container>

<bean id="spittleListener" class="com.lp.summary.rabbitmq.impl.SpittleAlertHandler"/>

其中spittleListener是监听的程序,method是执行的方法,queues是我们监听的队列,多个队列可以逗号隔开(因为我们采用的是分发,所以三个队列获取的消息是相同的,这里为了简便我放在一个监听程序中了,其实我们可以写三个消费者,每个消费者监听一个队列)

现在只需要启动程序即可运行

public class ConsumerMain {    public static void main(String[] args) {
        ApplicationContext context = new ClassPathXmlApplicationContext("amqp/amqp-consumer.xml");
    }
}

当然direct跟上面的情况差不多,只不过这个是根据路由匹配,先把数据发送到交换机,然后绑定路由和队列,通过交换机id和路由来找到队列,下面是一些主要的配置

<rabbit:queue id="spring-test-queue1" durable="true" auto-delete="false" exclusive="false" name="spring-test-queue1"></rabbit:queue>

<rabbit:queue name="spring-test-queue2" durable="true" auto-delete="false" exclusive="false"></rabbit:queue>

<!--交换机定义-->

<!--rabbit:direct-exchange:定义exchange模式为direct,

意思就是消息与一个特定的路由键完全匹配,才会转发。

rabbit:binding:设置消息queue匹配的key-->

<rabbit:direct-exchange name="${rabbit.exchange.direct}" durable="true" auto-delete="false" id="${rabbit.exchange.direct}">

<rabbit:bindings>

<rabbit:binding queue="spring-test-queue1" key="spring.test.queueKey1"/>

<rabbit:binding queue="spring-test-queue2" key="spring.test.queueKey2"/>

</rabbit:bindings>

</rabbit:direct-exchange>

<!--spring template声明-->

<rabbit:template exchange="${rabbit.exchange.direct}" id="rabbitTemplate" connection-factory="connectionFactory"

message-converter="jsonMessageConverter"></rabbit:template>

<!--消息对象转成成json-->

<bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.JsonMessageConverter"></bean>

下面是消费者监听配置

<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">

<rabbit:listener queues="spring-test-queue1" method="onMessage" ref="queueListenter"></rabbit:listener>

</rabbit:listener-container>

<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">

<rabbit:listener queues="spring-test-queue2" method="onMessage" ref="queueListenter"></rabbit:listener>

</rabbit:listener-container>

下面是程序

public static void main(String[] args) {

ApplicationContext context=new ClassPathXmlApplicationContext("applicationContext-rabbitmq-producer.xml");

MQProducer mqProducer=(MQProducer) context.getBean("mqProducer");

mqProducer.sendDateToQueue("spring.test.queueKey1","Hello World spring.test.queueKey1");

mqProducer.sendDateToQueue("spring.test.queueKey2","Hello World spring.test.queueKey2");

}

实际情况可能需要我们去分离消费者和生成者的程序。当然spring还有负载均衡的配置,这里就不多介绍了。

原文发布于微信公众号 - Java帮帮(javahelp)

原文发表时间:2018-08-21

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏步履前行

Spring Retry

  在我们的业务场景中,经常要调用其他的API来获取信息,比如我们的业务场景需要依赖个人信息来处理,这个时候调用个人信息服务的API,但是由于可能同一时段多方在...

3263
来自专栏Java帮帮-微信公众号-技术文章全总结

Python常见面试题【悟空教程】

1.MySQL 数据库总结 MySQL 可以建多少个数据库,理论上是没有限制的,每一个数据库可以有上亿的对象,但是一般基于硬件要求、效率问题一般不超过64个, ...

1692
来自专栏北京马哥教育

27个Linux文档编辑命令

在许多UNIX说明文件里,都有RLF控制字符。当我们运用shell特殊字符">"和">>",把说明文件的内容输出成纯文本文件时,控制字符会变成乱码,col指令则...

1755
来自专栏JadePeng的技术博客

axios介绍与使用说明 axios中文文档

本周在做一个使用vuejs的前端项目,访问后端服务使用axios库,这里对照官方文档,简单记录下,也方便大家参考。 Axios 是一个基于 Promise 的 ...

2.5K9
来自专栏后台全栈之路

基于汇编的 C/C++ 协程 - 实现

将 libco 和 libevent 两者的功能糅合起来,所以我把我的工程,命名为 libcoevent,意为 “基于 libevent 的同步协程服务器编程框...

5453
来自专栏Java架构

前沪江高级架构师学习笔记分享:分布式框架设计与实现

2006
来自专栏程序员互动联盟

【专业技术】linux下socket编程

1. 网络中进程之间如何通信 进程通信的概念最初来源于单机系统。由于每个进程都在自己的地址范围内运行,为保证两个相互通信的进程之间既互不干扰又协调一致工作,操作...

3196
来自专栏Java3y

Hibernate面试题大全

Hibernate常见面试题 Hibernate工作原理及为什么要用? Hibernate工作原理及为什么要用? 读取并解析配置文件 读取并解析映射信息,创建...

3935
来自专栏和蔼的张星的图像处理专栏

1.Win10+VsCode的C/CPP编译环境搭建

我是从开始学C++的时候就一直用的是visual studio,毕竟宇宙第一IDE,写和调试都是超级方便快捷,唯一的缺点可能就是启动慢一点。 之前电脑没有换固...

7836
来自专栏蓝天

Linux下共享库(SO)有关的几个环境变量

Linux支持共享库已经有悠久的历史了,不再是什么新概念了。大家都知道如何编译、连接以及动态加载(dlopen/dlsym/dlclose) 共享库。但是,...

1411

扫码关注云+社区

领取腾讯云代金券