前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spring集成RabbitMQ简单实现RPC

Spring集成RabbitMQ简单实现RPC

作者头像
java进阶架构师
发布2020-12-03 10:26:22
6750
发布2020-12-03 10:26:22
举报
文章被收录于专栏:Java进阶架构师

2020年Java原创面试题库连载中

【000期】Java最全面试题库思维导图

【020期】JavaSE系列面试题汇总(共18篇)

【028期】JavaWeb系列面试题汇总(共10篇)

【042期】JavaEE系列面试题汇总(共13篇)

【049期】数据库系列面试题汇总(共6篇)

【053期】中间件系列面试题汇总(共3篇)

【065期】数据结构与算法面试题汇总(共11篇)

【076期】分布式面试题汇总(共10篇)

【077期】综合面试题系列(一)

【078期】综合面试题系列(二)

【079期】综合面试题系列(三)

【080期】综合面试题系列(四)

【081期】综合面试题系列(五)

【082期】综合面试题系列(六)

【083期】综合面试题系列(七)

【084期】综合面试题系列(八)

【085期】综合面试题系列(九)

【086期】综合面试题系列(十)

【087期】综合面试题系列(十一)

【088期】综合面试题系列(十二)

【089期】综合面试题系列(十三)

更多内容,点击上面蓝字查看

代码语言:javascript
复制
public Object convertSendAndReceive(final String routingKey, final Object message) throws AmqpException {
return this.convertSendAndReceive(this.exchange, routingKey, message, null);
}

spring整合Rabbit MQ提供了Reply来实现RPC,AMQP协议定义了14中消息的属性,其中两项,一项是Replyto,表示返回消息的队列,一个是correlationId 用来表示发送消息和返回消息的标志,来区分是否是一个调用

下面一步步来实现RPC

首先贴出spring配置文件代码

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx"
  xmlns:aop="http://www.springframework.org/schema/aop" xmlns:p="http://www.springframework.org/schema/p"
  xmlns:context="http://www.springframework.org/schema/context"
  xmlns:rabbit="http://www.springframework.org/schema/rabbit"
  xmlns:prpc="http://www.pinnettech.com/schema/rpc"
  xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
    http://www.springframework.org/schema/tx
    http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context-4.0.xsd
    http://www.springframework.org/schema/aop
    http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
    http://www.pinnettech.com/schema/rpc
    http://www.pinnettech.com/schema/springtag.xsd
    http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">
    <context:component-scan
            base-package="com.temp.rabbit">
       </context:component-scan>
       
       <!-- rabbit消息发送方 -->
       <!-- 连接服务配置 如果MQ服务器在远程服务器上,请新建用户用新建的用户名密码  guest默认不允许远程登录-->
       <rabbit:connection-factory id="rabbitConnectionFactory" host="localhost" username="dengwei" password="dengwei"
         port="5672" virtual-host="/" channel-cache-size="5"/>
       <rabbit:admin connection-factory="rabbitConnectionFactory"/>
       <!-- 发送消息可以带*,绑定关系需全单词 -->
       <rabbit:direct-exchange name="rpc.bao.direct.goods" durable="true" auto-delete="false">
         <rabbit:bindings>
           <rabbit:binding queue="rpc.bao.goods" key="dengwei.goods"/>
         </rabbit:bindings>
       </rabbit:direct-exchange>
       
       <!-- durable是否持久化  exclusive:是否排外的-->
       <rabbit:queue durable="true" auto-delete="false" exclusive="false" name="rpc.bao.goods"/>
       
       <!-- 消息转换器 -->
       <bean id="byteMessageConverter" class="com.temp.rabbit.BytesMessageConverter"/>
       <!-- 发送消息模板 -->
       <rabbit:template id="amqTemplate" exchange="rpc.bao.direct.goods"
         connection-factory="rabbitConnectionFactory" message-converter="byteMessageConverter" />
       <!-- 消息发送方 end -->
       
       
       <!-- 消息接受方处理器 -->
       <bean id="msgHandler" class="com.temp.rabbit.receive.MessageHandler"/>
       <!-- 消息消费者 -->
       <bean id="msgLisenerAdapter" class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter">
         <constructor-arg name="delegate" ref="msgHandler"/>
         <constructor-arg name="messageConverter" ref="byteMessageConverter"/>
       </bean>
    <!-- 消费者容器 -->
       <rabbit:listener-container connection-factory="rabbitConnectionFactory" acknowledge="auto">
       
         <rabbit:listener queues="rpc.bao.goods" ref="msgLisenerAdapter"/>
       </rabbit:listener-container>
       
       <!-- 消息接收方 end -->
       <!--  RPC 配置 -->
       <!-- 消息服务提供接口实现 -->
       <bean id="service1" class="com.temp.rabbit.bean.TempServiceImp"/>
       <!-- 代理类 -->
       <bean id="service1Proxy" class="com.temp.rabbit.receive.proxy.ServiceProxy">
         <property name="t" ref="service1"></property>
       </bean>
       <!-- 代理对象 -->
       <bean id="proxyService" factory-bean="service1Proxy" factory-method="getProxy"/>
       
</beans>

其中消息转换器类

代码语言:javascript
复制
package com.temp.rabbit;


import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.AbstractJsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.util.SerializationUtils;


public class BytesMessageConverter extends AbstractJsonMessageConverter{


@Override
protected Message createMessage(Object msg, MessageProperties msgPro) {
byte[] data = SerializationUtils.serialize(msg);
msgPro.setContentLength(data.length);
System.out.println("create message "+msg.getClass());
return new Message(data , msgPro);
}


@Override
public Object fromMessage(Message msg) throws MessageConversionException {
byte[] data = msg.getBody() ;
Object result = SerializationUtils.deserialize(data);
System.out.println("create obj "+result.getClass());
return result;
}


}

消费者处理handler

代码语言:javascript
复制
package com.temp.rabbit.receive;


import java.lang.reflect.Method;


import com.temp.rabbit.bean.RpcRequest;
import com.temp.rabbit.bean.RpcResponse;
import com.temp.rabbit.bean.TempServiceImp;


public class MessageHandler {


//没有设置默认的处理方法的时候,方法名是handleMessage
public RpcResponse handleMessage(RpcRequest message){
Class<?> clazz = message.getClassName() ;
RpcResponse response = new RpcResponse();
Method method;
try {
method = clazz.getMethod(message.getMethodName(), message.getParamType());
Object result = method.invoke(new TempServiceImp(), message.getParams());
response.setResult(result);
} catch (Exception e) {
e.printStackTrace();
}
return response ;
}
}

服务提供:

代码语言:javascript
复制
package com.temp.rabbit.bean;


public class TempServiceImp implements TempService {


public String sayHello(){
return "TempServiceImp hello ... " ;
}

}

代理类:

代码语言:javascript
复制
package com.temp.rabbit.receive.proxy;


import java.io.Serializable;
import java.lang.reflect.Method;


import org.springframework.beans.factory.annotation.Autowired;


import com.temp.rabbit.bean.RpcRequest;
import com.temp.rabbit.bean.RpcResponse;
import com.temp.rabbit.send.SendRabbitMsgImp;


import net.sf.cglib.proxy.Enhancer;
import net.sf.cglib.proxy.MethodInterceptor;
import net.sf.cglib.proxy.MethodProxy;


public class ServiceProxy<T> implements MethodInterceptor{


private Enhancer enhancer = new Enhancer();

private T t ;

public void setT(T t){
this.t = t ;
}
@Autowired
private SendRabbitMsgImp rabbitMsg ;

public Object getProxy(){
enhancer.setSuperclass(t.getClass());
enhancer.setCallback(this);
return enhancer.create();
}


@Override
public Object intercept(Object obj, Method method, Object[] param, MethodProxy proxy) throws Throwable {
RpcRequest request = new RpcRequest();
request.setMethodName(method.getName());
request.setClassName(t.getClass());
Class<?>[] paramType = new Class<?>[param.length];
Serializable[] para = new Serializable[param.length];
for(int i = 0 ; i < param.length ; i ++){
paramType[i] = param[i].getClass();
para[i] = (Serializable)param[i];
}
request.setParams(para);
request.setParamType(paramType);
RpcResponse result = (RpcResponse)rabbitMsg.sendAdcReceive("dengwei.goods", request) ;
return result.getResult();
}


}

主程序

代码语言:javascript
复制
package com.temp;


import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;


import com.temp.rabbit.bean.TempService;


public class Main {


public static void main(String[] args) {
ApplicationContext app = new ClassPathXmlApplicationContext("classpath:spring.xml");
TempService proxy = (TempService)app.getBean("proxyService");
System.out.println("main result " + proxy.sayHello()) ;
}

}  

消息发送实现:

代码语言:javascript
复制
package com.temp.rabbit.send;


import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;


@Component("sendMsg")
public class SendRabbitMsgImp implements SendRabbitMsg{


@Autowired
private RabbitTemplate template ;

@Override
public void sendData2Queue(String queueKey, Object msg) {
try {
template.convertAndSend(queueKey, msg);
} catch (Exception e) {
e.printStackTrace();
System.out.println("send data 2 msg erro ");
}
System.out.println("消息已发送");
}

@Override
public Object sendAdcReceive(String queueKey , Object msg){
try {
return template.convertSendAndReceive(queueKey, msg);
} catch (Exception e) {
e.printStackTrace();
System.out.println("send data 2 msg erro ");
}
System.out.println("消息已发送");
return null ;
}
}

这里面的RpcRequest和RpcResponse就不贴代码了

这里讲一下原理实现,我们可以跟着源码看一下

首先调用的是RabbitTemplate的

代码语言:javascript
复制
public Object convertSendAndReceive(final String routingKey, final Object message) throws AmqpException {
return this.convertSendAndReceive(this.exchange, routingKey, message, null);
}

然后一路走下去

代码语言:javascript
复制
@Override
public Object convertSendAndReceive(final String exchange, final String routingKey, final Object message,
final MessagePostProcessor messagePostProcessor) throws AmqpException {
Message requestMessage = convertMessageIfNecessary(message);
if (messagePostProcessor != null) {
requestMessage = messagePostProcessor.postProcessMessage(requestMessage);
}
Message replyMessage = this.doSendAndReceive(exchange, routingKey, requestMessage);
if (replyMessage == null) {
return null;
}
return this.getRequiredMessageConverter().fromMessage(replyMessage);
}


protected Message doSendAndReceive(final String exchange, final String routingKey, final Message message) {
if (this.replyQueue == null) {
return doSendAndReceiveWithTemporary(exchange, routingKey, message);
}
else {
return doSendAndReceiveWithFixed(exchange, routingKey, message);
}
}

到这里我们会看到,有一个分支如果replyqueue不为空则是走另外的一个方法,因为之前没有设置replyqueue所以,这里会

走第一步方法,也就是doSendAndReceiveWithTemporary

来看一下这个方法源码

代码语言:javascript
复制
protected Message doSendAndReceiveWithTemporary(final String exchange, final String routingKey, final Message message) {
return this.execute(new ChannelCallback<Message>() {


@Override
public Message doInRabbit(Channel channel) throws Exception {
final ArrayBlockingQueue<Message> replyHandoff = new ArrayBlockingQueue<Message>(1);


Assert.isNull(message.getMessageProperties().getReplyTo(),
"Send-and-receive methods can only be used if the Message does not already have a replyTo property.");
DeclareOk queueDeclaration = channel.queueDeclare();
String replyTo = queueDeclaration.getQueue();
message.getMessageProperties().setReplyTo(replyTo);


String consumerTag = UUID.randomUUID().toString();
DefaultConsumer consumer = new DefaultConsumer(channel) {


@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
MessageProperties messageProperties = messagePropertiesConverter.toMessageProperties(
properties, envelope, encoding);
Message reply = new Message(body, messageProperties);
if (logger.isTraceEnabled()) {
logger.trace("Message received " + reply);
}
try {
replyHandoff.put(reply);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
};
channel.basicConsume(replyTo, true, consumerTag, true, true, null, consumer);
doSend(channel, exchange, routingKey, message, null);
Message reply = (replyTimeout < 0) ? replyHandoff.take() : replyHandoff.poll(replyTimeout,
TimeUnit.MILLISECONDS);
channel.basicCancel(consumerTag);
return reply;
}
});
}

这里流程就是申明一个大小为1的临时队列,然后发送消息,然后监听返回的消息,放到临时队列,然后取出返回消息。

那么因为每次都会创建临时队列,所以对性能是个考验那么有第二种方式,在rabbitmq中申明一个返回队列,用来存放该服务的返回消息。

那么需要在spring配置文件中配置一个reply队列

代码语言:javascript
复制
<rabbit:queue durable="true" auto-delete="false" exclusive="false" name="reply"/>

然后在消息监听容器中再配置一个发送消息的模板template为消费者

代码语言:javascript
复制

最后再发送消息的实现中即SendRabbitMsgImp类中注入队列

代码语言:javascript
复制
@Autowired
@Qualifier("reply")
private Queue reply ; 

然后设置template的replyqueue为reply ;template.setReplyQueue(reply);

这个设置代码可以再初始化方法中,也可以再发送消息之前,其实最好的实在spring中设置

那么该说原理了,我们可以看最开始发送消息的第二个方法

代码语言:javascript
复制
代码语言:javascript
复制
protected Message doSendAndReceive(final String exchange, final String routingKey, final Message message) {
if (this.replyQueue == null) {
return doSendAndReceiveWithTemporary(exchange, routingKey, message);
}
else {
return doSendAndReceiveWithFixed(exchange, routingKey, message);
}
}
 
protected Message doSendAndReceiveWithFixed(final String exchange, final String routingKey, final Message message) {
return this.execute(new ChannelCallback<Message>() {


@Override
public Message doInRabbit(Channel channel) throws Exception {
final PendingReply pendingReply = new PendingReply();
String messageTag = UUID.randomUUID().toString();
RabbitTemplate.this.replyHolder.put(messageTag, pendingReply);
// Save any existing replyTo and correlation data
String savedReplyTo = message.getMessageProperties().getReplyTo();
pendingReply.setSavedReplyTo(savedReplyTo);
if (StringUtils.hasLength(savedReplyTo) && logger.isDebugEnabled()) {
logger.debug("Replacing replyTo header:" + savedReplyTo
+ " in favor of template's configured reply-queue:"
+ RabbitTemplate.this.replyQueue.getName());
}
message.getMessageProperties().setReplyTo(RabbitTemplate.this.replyQueue.getName());
String savedCorrelation = null;
if (RabbitTemplate.this.correlationKey == null) { // using standard correlationId property
byte[] correlationId = message.getMessageProperties().getCorrelationId();
if (correlationId != null) {
savedCorrelation = new String(correlationId,
RabbitTemplate.this.encoding);
}
}
else {
savedCorrelation = (String) message.getMessageProperties()
.getHeaders().get(RabbitTemplate.this.correlationKey);
}
pendingReply.setSavedCorrelation(savedCorrelation);
if (RabbitTemplate.this.correlationKey == null) { // using standard correlationId property
message.getMessageProperties().setCorrelationId(messageTag
.getBytes(RabbitTemplate.this.encoding));
}
else {
message.getMessageProperties().setHeader(
RabbitTemplate.this.correlationKey, messageTag);
}


if (logger.isDebugEnabled()) {
logger.debug("Sending message with tag " + messageTag);
}
doSend(channel, exchange, routingKey, message, null);
LinkedBlockingQueue<Message> replyHandoff = pendingReply.getQueue();
Message reply = (replyTimeout < 0) ? replyHandoff.take() : replyHandoff.poll(replyTimeout,
TimeUnit.MILLISECONDS);
RabbitTemplate.this.replyHolder.remove(messageTag);
return reply;
}
});
}
代码语言:javascript
复制

这个方法并没有申请临时队列,发送消息后直接再pendingReply中的队列中取,那么怎么放到pendingReply的队列中区的呢,可以看到,RabbitTemplate是实现了MessageLIstener,那么看他实现的onMessage方法

代码语言:javascript
复制
public void onMessage(Message message) {
try {
String messageTag;
if (this.correlationKey == null) { // using standard correlationId property
messageTag = new String(message.getMessageProperties().getCorrelationId(), this.encoding);
}
else {
messageTag = (String) message.getMessageProperties()
.getHeaders().get(this.correlationKey);
}
if (messageTag == null) {
logger.error("No correlation header in reply");
return;
}


PendingReply pendingReply = this.replyHolder.get(messageTag);
if (pendingReply == null) {
if (logger.isWarnEnabled()) {
logger.warn("Reply received after timeout for " + messageTag);
}
}
else {
// Restore the inbound correlation data
String savedCorrelation = pendingReply.getSavedCorrelation();
if (this.correlationKey == null) {
if (savedCorrelation == null) {
message.getMessageProperties().setCorrelationId(null);
}
else {
message.getMessageProperties().setCorrelationId(
savedCorrelation.getBytes(this.encoding));
}
}
else {
if (savedCorrelation != null) {
message.getMessageProperties().setHeader(this.correlationKey,
savedCorrelation);
}
else {
message.getMessageProperties().getHeaders().remove(this.correlationKey);
}
}
// Restore any inbound replyTo
String savedReplyTo = pendingReply.getSavedReplyTo();
message.getMessageProperties().setReplyTo(savedReplyTo);
LinkedBlockingQueue<Message> queue = pendingReply.getQueue();
queue.add(message);
if (logger.isDebugEnabled()) {
logger.debug("Reply received for " + messageTag);
if (savedReplyTo != null) {
logger.debug("Restored replyTo to " + savedReplyTo);
}
}
}
}
catch (UnsupportedEncodingException e) {
throw new AmqpIllegalStateException("Invalid Character Set:" + this.encoding, e);
}
}

这里就明白了,根据唯一id也就是前面说的correlationId找到消息的pendingReply,然后将返回的消息放到pendingReply的队列中,这样就实现了RPC的调用,

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-11-25,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 java进阶架构师 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档