image.png 一般采用“回调网关+MQ”方案来解耦: a、调用方直接跨公网调用微信接口 b、微信返回调用成功,此时并不代表返回成功 c、微信执行完成后,回调统一网关 d、网关将返回结果通知MQ e、请求方收到结果通知
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
//声明并初始化一个producer
//需要一个producer group名字作为构造方法的参数,这里为producer1
DefaultMQProducer producer = new DefaultMQProducer("producer1");
//设置NameServer地址,此处应改为实际NameServer地址,多个地址之间用;分隔
//NameServer的地址必须有,但是也可以通过环境变量的方式设置,不一定非得写死在代码里
producer.setNamesrvAddr("10.1.54.121:9876;10.1.54.122:9876");
//调用start()方法启动一个producer实例
producer.start();
//发送10条消息到Topic为TopicTest,tag为TagA,消息内容为“Hello RocketMQ”拼接上i的值
for (int i = 0; i < 10; i++) {
try {
Message msg = new Message("TopicTest",// topic
"TagA",// tag
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)// body
);
//调用producer的send()方法发送消息
//这里调用的是同步的方式,所以会有返回结果
SendResult sendResult = producer.send(msg);
//打印返回结果,可以看到消息发送的状态以及一些相关信息
System.out.println(sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
//发送完消息之后,调用shutdown()方法关闭producer
producer.shutdown();
}
}
在开发过程中,如果想测试生产者是否发出了mq,可以编写一个消费者进行测试
@Test
public void testMqConsumer() throws Exception {
String rocketmqAddress="10.113.41.2:9876;10.113.41.4:9876";
int threadNum = 5;
String topics = "WechatUnionCoreTemplateNotifyTopic";
String instanceName = "TemplateComsumer";
String groupName = "wechatUnionTemplateNotifyConsumer";
DefaultMQPushConsumer consumer = null;
consumer = new DefaultMQPushConsumer(groupName);
consumer.setNamesrvAddr(rocketmqAddress);//MQ地址
consumer.setClientCallbackExecutorThreads(threadNum);//消费现场数量
consumer.setInstanceName(instanceName);//实例名称
consumer.subscribe(topics, "*");
//注册监听
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (int i = 0; i < msgs.size(); i++) {
MessageExt msgExt = msgs.get(i);
String msgId = msgExt.getMsgId();
Integer flag = msgExt.getFlag();
TemplateNotifyItem templateNotifyItem = ProtoBufSerialize.fromProto(msgExt.getBody(), TemplateNotifyItem.class);
logger.info("receive new Msg: " + " msgId=" + msgId + " flag=" + flag + " templateNotifyItem=" + templateNotifyItem);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
logger.info("监听执行中");
Thread.sleep(1000000);
}
参考: http://blog.csdn.net/manzhizhen/article/details/52606733 https://www.jianshu.com/p/824066d70da8 架构师之路-mq系列