前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RabbitMQ 快速入门实战

RabbitMQ 快速入门实战

原创
作者头像
技术路漫漫
修改2020-07-20 10:07:27
7650
修改2020-07-20 10:07:27
举报
文章被收录于专栏:技术路漫漫技术路漫漫

本文基于最新rabbitmq:3.8.5版本,实现了direct、fanout、topic等几种主要消息模式,并基于spring-amqp完整实现了常见消息案例,同时也通过插件方式,实现了延迟消息的处理,帮您快速入门Rabbit消息处理。

内容概括

  • rabbitmq相关环境及插件的安装
  • springboot应用中work、pubish/subscribe、routing、topics、rpc、publisher confirm等模式示例
  • 纯java应用中work、publisher confirm模式的示例
  • 延迟消息队列示例

基础环境搭建

本文基于docker来安装RabbitMQ,通过pull当前最新版本rabbitmq:3.8.5-management即可,之后通过如下的命令即可运行:

代码语言:txt
复制
docker run -d --hostname rabbit-test --name rabbit-test -p 5672:5672 -p 15672:15672 rabbitmq:3.8.5-management

同时,如有需要,也可以通过-e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password来指定相关的默认用户名称和密码。

另外,因为后续会需要实现一个延迟消息的示例(例如常见的网购7天后自动确认收货等),需要用到rabbitmq-delayed-message-exchange插件,具体安装过程如下:

代码语言:txt
复制
# 在官方页面下载插件后,拷贝到容器中
docker cp E:\dev\2try\backends\hiboot\other\rabbitmq\rabbitmq_delayed_message_exchange-3.8.0.ez rabbit-test:/plugins
# 进入容器类
docker exec -it rabbit-test /bin/sh
# enable插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# 确认插件安装成功
rabbitmq-plugins list

SpringBoot-AMQP应用模式

work普通工作模式

该模式通过构建1个queue、2个receiver、1个sender来模拟:

  • WorkSender 是每秒钟发送消息到队列中。
  • WorkReceiver 接收并打印消息内容。
  • WorkMqConfig 配置相关队列、接受者和发送者。
  • 其他基于默认配置进行处理,默认的exchange是direct模式,两个receiver相互竞争模式获取消息。

配置类具体如下:

代码语言:txt
复制
@Configuration
public class WorkMqConfig {

    @Bean
    public Queue workQueue() {
        return new Queue("rabbit-work");
    }

    @Bean
    public WorkReceiver WorkReceiver1() {
        return new WorkReceiver(1);
    }

    @Bean
    public WorkReceiver workReceiver2() {
        return new WorkReceiver(2);
    }

    @Bean
    public WorkSender sender() {
        return new WorkSender();
    }

}

消息发送类如下:

代码语言:txt
复制
@Slf4j
public class WorkSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private Queue queue;

    AtomicInteger dots = new AtomicInteger(0);

    AtomicInteger msgCount = new AtomicInteger(0);

    @Scheduled(fixedDelay = 1000, initialDelay = 500)
    public void send() {
        // 每秒发送消息
        StringBuilder msgBuilder = new StringBuilder("hi");
        if (dots.getAndIncrement() == 4) {
            // 达到4次则重置
            dots.set(0);
        }
        for (int i = 0; i < dots.get(); i++) {
            msgBuilder.append(".");
        }
        msgBuilder.append(msgCount.incrementAndGet());
        rabbitTemplate.convertAndSend(queue.getName(), msgBuilder.toString());
        log.info("send msg:{}", msgBuilder);
    }
}

消息接受者(消费者)具体代码如下:

代码语言:txt
复制
@RabbitListener(queues = "rabbit-work")
@Slf4j
public class WorkReceiver {
    private final int instance;

    public WorkReceiver(int i) {
        this.instance = i;
    }

    @RabbitHandler
    public void receive(String msg) throws InterruptedException {
        StopWatch watch = new StopWatch();
        watch.start();
        log.info("worker[{}] received msg:{}", instance, msg);
        dealMsg(msg);
        watch.stop();
        log.info("worker[{}] done in {}s", instance, watch.getTotalTimeSeconds());
    }

    private void dealMsg(String msg) throws InterruptedException {
        for (char ch : msg.toCharArray()) {
            if (ch == '.') {
                // 消息中每个点休眠1秒
                Thread.sleep(1000);
            }
        }
    }
}

运行效果:(两个worker相关交替消费)

代码语言:txt
复制
worker[1] received msg:hi.1
worker[2] received msg:hi..2
worker[1] received msg:hi...3
worker[2] received msg:hi....4

publish/subscribe 发布/订阅(广播)模式

该模式基于FanoutExchange来实现:

  • 构建一个FanoutExchange,实现对内容的路由
  • 建立两个AnonymousQueue(匿名自动删除)队列,分别绑定到上述exchange,实现对消息的订阅
  • 两个消费者从对应队列获取消息
  • 此模式与 work模式的区别是,此处两个消费者都收到了相同的消息。

相关具体配置如下:

代码语言:txt
复制
@Configuration
public class PubSubConfig {

    private static final String EXCHANGE_NAME = "sample.fanout";

    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(EXCHANGE_NAME);
    }

    @Bean
    public Queue autoDeleteQueue1() {
        // 匿名自动删除队列
        return new AnonymousQueue();
    }

    @Bean
    public Queue autoDeleteQueue2() {
        return new AnonymousQueue();
    }

    @Bean
    public Binding binding1() {
        return BindingBuilder.bind(autoDeleteQueue1()).to(fanoutExchange());
    }

    @Bean
    public Binding binding2() {
        return BindingBuilder.bind(autoDeleteQueue2()).to(fanoutExchange());
    }

    @Bean
    public PubSubSender sender() {
        return new PubSubSender();
    }

    @Bean
    public PubSubReceiver receiver() {
        return new PubSubReceiver();
    }

}

消息发送模式类似:

代码语言:txt
复制
@Slf4j
public class PubSubSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private FanoutExchange fanoutExchange;

    AtomicInteger dots = new AtomicInteger();

    AtomicInteger msgCount = new AtomicInteger();

    @Scheduled(fixedDelay = 1000, initialDelay = 500)
    public void send() {
        StringBuilder msgBuilder = new StringBuilder("hello");
        if (dots.getAndIncrement() == 4) {
            dots.set(0);
        }
        for (int i = 0; i < dots.get(); i++) {
            msgBuilder.append(".");
        }
        msgBuilder.append(msgCount.incrementAndGet());
        // 把消息发送到 exchange,注意此处的写法与 work模式有差异,也即需要指定exchange的name
        rabbitTemplate.convertAndSend(fanoutExchange.getName(), "", msgBuilder.toString());
        log.info("send msg:{}", msgBuilder);
    }
}

消息消费者从队列获取消息:

代码语言:txt
复制
@Slf4j
public class PubSubReceiver {

    @RabbitListener(queues = "#{autoDeleteQueue1.name}")
    public void receiver1(String msg) throws InterruptedException {
        receiveMsg(msg, 1);
    }

    @RabbitListener(queues = "#{autoDeleteQueue2.name}")
    public void receiver2(String msg) throws InterruptedException {
        receiveMsg(msg, 2);
    }

    public void receiveMsg(String msg, int receiver) throws InterruptedException {
        log.info("receiver[{}] receiver msg:{}", receiver, msg);
        dealMsg(msg);
        log.info("receiver[{}] done", receiver);
    }

    private void dealMsg(String msg) throws InterruptedException {
        for (char ch : msg.toCharArray()) {
            if (ch == '.') {
                // 消息中每个点休眠1秒
                Thread.sleep(1000);
            }
        }
    }
}

运行效果:(两个receiver相关均获取到消息)

代码语言:txt
复制
receiver[1] receiver msg:hello.1
receiver[2] receiver msg:hello.1
receiver[1] receiver msg:hello..2
receiver[2] receiver msg:hello..2

routing路由模式

路由模式则更灵活,可自动根据规则将消息投递到对应的队列中:

  • 路由模式的exchange与work一样,都是direct模式。
  • 通过在banding的时候,指定routingkey来实现消息路由,并且一个队列可绑定多个路由模式。
  • 注意此时的routingkey不支持模糊匹配

下面以日志处理示例内容:

  • 将error类消息路由到criticalQueue
  • 将info、warn类消息路由到normalQueue
代码语言:txt
复制
@Configuration
public class MqRouteConfig {

    @Bean
    public DirectExchange logExchange() {
        return new DirectExchange("sample.direct.log");
    }

    @Bean
    public Queue criticalQueue() {
        return new AnonymousQueue();
    }

    @Bean
    public Queue normalQueue() {
        return new AnonymousQueue();
    }

    @Bean
    public Binding bindingError() {
        // 把错误日志绑定到criticalQueue
        return BindingBuilder.bind(criticalQueue()).to(logExchange()).with("error");
    }

    @Bean
    public Binding bindingInfo() {
        // 把info日志绑定到normalQueue
        return BindingBuilder.bind(normalQueue()).to(logExchange()).with("info");
    }

    @Bean
    public Binding bindingWarn() {
        // 把告警日志绑定到normalQueue
        return BindingBuilder.bind(normalQueue()).to(logExchange()).with("warn");
    }

    @Bean
    public LogSender logSender() {
        return new LogSender();
    }

    @Bean
    public LogReceiver logReceiver() {
        return new LogReceiver();
    }

}

消息发送者代码如下:

代码语言:txt
复制
@Slf4j
public class LogSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private DirectExchange logExchange;

    AtomicInteger index = new AtomicInteger(0);

    AtomicInteger msgCount = new AtomicInteger(0);

    private final String[] types = {"error", "warn", "info"};

    @Scheduled(fixedDelay = 1000, initialDelay = 500)
    public void send() {
        StringBuilder msgBuilder = new StringBuilder("log of ");
        if (this.index.incrementAndGet() == types.length) {
            this.index.set(0);
        }
        String type = types[this.index.get()];
        msgBuilder.append(type).append(" ").append(this.msgCount.incrementAndGet());
        for (int i = 0; i < index.get(); i++) {
            msgBuilder.append(".");
        }
        rabbitTemplate.convertAndSend(logExchange.getName(), type, msgBuilder.toString());
        log.info("send msg:{}", msgBuilder.toString());
    }

}

消息接受者功能如下:

  • 分别注册三个RabbitListener,其中criticalLogger对应criticalQueue,normalLogger、otherLogger对应normalQueue
  • criticalLogger消费error消息
  • normalLogger、otherLogger则类似于work模式,均从normalQueue竞争模式获取消息。
代码语言:txt
复制
@Slf4j
public class LogReceiver {

    @RabbitListener(queues = "#{criticalQueue.name}")
    public void criticalLogger(String msg) throws InterruptedException {
        receive(msg, "critical");
    }

    @RabbitListener(queues = "#{normalQueue.name}")
    public void normalLogger(String msg) throws InterruptedException {
        receive(msg, "normal");
    }

    @RabbitListener(queues = "#{normalQueue.name}")
    public void otherLogger(String msg) throws InterruptedException {
        receive(msg, "other");
    }

    public void receive(String msg, String type) throws InterruptedException {
        log.info("logger[{}] get msg:{}", type, msg);
        this.dealMsg(msg);
        log.info("logger[{}] done.", type);
    }

    private void dealMsg(String msg) throws InterruptedException {
        for (char ch : msg.toCharArray()) {
            if (ch == '.') {
                // 消息中每个点休眠1秒
                Thread.sleep(1000);
            }
        }
    }
}

运行效果:(critical队列的消息都是error,normal和other则分别交替获取到warn和info消息)

代码语言:txt
复制
logger[normal] get msg:log of warn 1.
logger[other] get msg:log of info 2..
logger[critical] get msg:log of error 3
logger[normal] get msg:log of warn 4.
logger[other] get msg:log of info 5..
logger[critical] get msg:log of error 6

topic主题匹配模式

topic模式是最灵活的模式:

  • 其exchange是三种类型中的最后一类:TopicExchange
  • 与direct和fanout的区别是,topic支持按*#来模糊匹配,其中*号代表一个单词,#代表0或多个单词(word)

下面以通常的短信和email通知示例:

  • *.reg路由消息代表用户注册,同时发送到emailQueue和smsQueue
  • #.password路由消息代表用户密码变动,仅发送到emailQueue
  • #.captcha路由消息代表验证码内容,仅发送到smsQueue

具体配置信息如下:

代码语言:txt
复制
@Configuration
public class TopicMsgConfig {

    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange("sample.topic.user");
    }

    @Bean
    public TopicMsgReceiver receiver() {
        return new TopicMsgReceiver();
    }

    @Bean
    public Queue emailQueue() {
        return new AnonymousQueue();
    }

    @Bean
    public Queue smsQueue() {
        return new AnonymousQueue();
    }

    @Bean
    public Binding bindingEmailReg() {
        // 注册消息都发邮件
        return BindingBuilder.bind(emailQueue()).to(topicExchange()).with("*.reg");
    }

    @Bean
    public Binding bindingEmailPassword() {
        // 密码信息都发邮件
        return BindingBuilder.bind(emailQueue()).to(topicExchange()).with("#.password");
    }

    @Bean
    public Binding bindingSmsReg() {
        // 用户注册都发短信
        return BindingBuilder.bind(smsQueue()).to(topicExchange()).with("*.reg");
    }

    @Bean
    public Binding bindingSmsCaptcha() {
        // 验证码均发短信
        return BindingBuilder.bind(smsQueue()).to(topicExchange()).with("#.captcha");
    }

    @Bean
    public TopicMsgSender msgSender() {
        return new TopicMsgSender();
    }

}

消息发送示例如下:

代码语言:txt
复制
@Slf4j
public class TopicMsgSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private TopicExchange topicExchange;

    AtomicInteger index = new AtomicInteger();

    AtomicInteger msgCount = new AtomicInteger();

    private final String[] msgKeys = {"user.reg", "user.update.password", "user.reg.captcha"};

    @Scheduled(fixedDelay = 1000, initialDelay = 500)
    public void sendMsg() {
        StringBuilder msgBuilder = new StringBuilder("用户信息变更 ");
        if (this.index.incrementAndGet() == msgKeys.length) {
            this.index.set(0);
        }
        String key = msgKeys[this.index.get()];
        msgBuilder.append(key).append(msgCount.incrementAndGet());
        rabbitTemplate.convertAndSend(topicExchange.getName(), key, msgBuilder.toString());
        log.info("发出消息:{}", msgBuilder);
    }
}

消息接收代码如下:

  • 注册两个RabbitListener,一个对接emailQueue,另一个对接smsQueue
代码语言:txt
复制
@Slf4j
public class TopicMsgReceiver {

    @RabbitListener(queues = "#{emailQueue.name}")
    public void emailReceiver(String msg) throws InterruptedException {
        receive(msg, "email");
    }

    @RabbitListener(queues = "#{smsQueue.name}")
    public void SmsReceiver(String msg) throws InterruptedException {
        receive(msg, "sms");
    }

    public void receive(String msg, String inst) throws InterruptedException {
        log.info("instance[{}] received msg:{}", inst, msg);
        dealMsg(msg);
        log.info("instance[{}] done", inst);
    }

    private void dealMsg(String msg) throws InterruptedException {
        for (char ch : msg.toCharArray()) {
            if (ch == '.') {
                // 消息中每个点休眠1秒
                Thread.sleep(1000);
            }
        }
    }
}

运行效果:

  • email和sms两个实例都获取到了user.reg消息
  • email单独获取到了user.update.password消息
  • sms单独获取到了user.reg.captcha消息
代码语言:txt
复制
instance[email] received msg:用户信息变更 user.update.password1
instance[email] received msg:用户信息变更 user.reg3
instance[email] received msg:用户信息变更 user.update.password4
instance[email] received msg:用户信息变更 user.reg6

instance[sms] received msg:用户信息变更 user.reg.captcha2
instance[sms] received msg:用户信息变更 user.reg3
instance[sms] received msg:用户信息变更 user.reg.captcha5
instance[sms] received msg:用户信息变更 user.reg6

rpc远程过程调用模式

rpc涉及到客户端和服务端交互:

  • exchange依然是基于DirectExchange模式
  • 此处的RPC模式与routing模式的主要区别是,RPC模式下,发送消息是基于convertSendAndReceive方法,而其他模式一般是基于convertAndSend方法
  • 另外,除了常规的基于RabbitTemplate来实现同步模式外,也可以通过AsyncRabbitTemplate来实现异步RPC,也即不需要等待上一条消息的返回,通过回调来接收消息响应信息。

配置类具体代码如下:

代码语言:txt
复制
@Configuration
public class RpcMsgConfig {

    @Bean
    public DirectExchange exchange() {
        return new DirectExchange("sample.rpc");
    }

    @Bean
    public RpcMsgClient client() {
        return new RpcMsgClient();
    }

    @Bean
    public Queue queue() {
        return new Queue("sample.rpc.requests");
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(exchange()).with("rpc");
    }

    @Bean
    public RpcMsgServer server() {
        return new RpcMsgServer();
    }

    @Bean
    public AsyncRabbitTemplate asyncRabbitTemplate(RabbitTemplate rabbitTemplate) {
        return new AsyncRabbitTemplate(rabbitTemplate);
    }
}

服务端实现一个fibnacci数列求和,输入是n,返回的是求和结果

代码语言:txt
复制
@Slf4j
public class RpcMsgServer {

    @RabbitListener(queues = "sample.rpc.requests")
    public int fibnacci(int n) {
        log.info("server received fib of :{}", n);
        int result = fib(n);
        log.info("server returned result:{}", result);
        return result;
    }

    public int fib(int n) {
        return n == 0 ? 0 : (n == 1 ? 1 : (fib(n - 1) + fib(n - 2)));
    }
}

客户端同步模式代码如下:

代码语言:txt
复制
@Scheduled(fixedDelay = 1000, initialDelay = 500)
public void sendMsg() {
    // 此处为默认同步队列方式发送消息,也即上一消息未收到回复之前,不会发送下一条消息。
    // 默认的 超时时间是5秒,可通过setReplyTimeout来修改
    rabbitTemplate.setReplyTimeout(6000L);

    int fib = start;
    log.info("sync client send requesting fib({})", fib);
    Integer response = (Integer) rabbitTemplate.convertSendAndReceive(exchange.getName(), "rpc",
            fib);
    // 超时之后会得到null的返回值
    log.info("sync client got fib({}) response:{}", fib, response);
    start++;
}

同步模式运行效果:

  • 在获取到上一个结果之后,才会发出小一条消息
  • 在计算到46时,客户端就因为超时而无法获取到结果,并且抛出了相关异常
代码语言:txt
复制
sync client send requesting fib(45)
sync client got fib(45) response:1134903170
sync client send requesting fib(46)
sync client got fib(46) response:null
sync client send requesting fib(47)
sync client got fib(47) response:null

客户端异常回调代码如下:

代码语言:txt
复制
@Scheduled(fixedDelay = 1000, initialDelay = 500)
public void asyncSendMsg() {
    int fib = start;
    log.info("async client send fib({})", fib);
    // 异步发送请求
    AsyncRabbitTemplate.RabbitConverterFuture<Integer> future = asyncRabbitTemplate.
            convertSendAndReceive(exchange.getName(), "rpc", fib);
    // 增加回调
    future.addCallback(new ListenableFutureCallback<Integer>() {
        @Override
        public void onFailure(Throwable throwable) {
            log.warn("async client failed", throwable);
        }

        @Override
        public void onSuccess(Integer integer) {
            log.info("async client got fib({}) reponse:{}", fib, integer);
        }
    });
    start++;
}

异常模式运行效果如下:

  • 消息的发送没有被阻塞
  • 异步获取到了相关返回结果
代码语言:txt
复制
async client send fib(44)
async client got fib(43) reponse:433494437
async client send fib(45)
async client got fib(43) reponse:433494437
async client send fib(46)
async client send fib(47)
async client send fib(48)
async client got fib(45) reponse:1134903170
async client send fib(49)
async client send fib(50)

publisher confirm发布者确认模式

官方示例中并没有给出publisher confirm的实现模式,以下示例供参考:

  • 通过在application配置文件中,设定publisher-confirm-typepublisher-returns来设定发布者的确认回调和 返回回调模式。
  • 通常是,如果消息正常发布到了exchange则算是自动确认(ack),如果因为routingkey错误等导致无法被正常路由,如果publisher-returns没有设置为true,则一般会被自动删除,否则会触发return回调。
  • 可通过listener.simple.acknowledge-mode来设置消费者的确认模式,默认是自动,可设置为manual来手工确认。【注意如果设置为manual,如果客户端因为异常等原因没有触发basicAck或basicNack等操作,该消息在消息队列中处于Ready状态,但对于消息发送方来说,依然是属于已确认状态,因为消息发送方的确认是指消息被成功投递到exchange broker】
  • 可通过listener.simple.prefetch来进行消费端限流,尤其是在消费端涉及到数据库操作等情况下。

示例application.yml配置如下:

代码语言:txt
复制
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    publisher-confirm-type: simple
    publisher-returns: true
    listener:
      simple:
        #        手工确认,通常不需要
        acknowledge-mode: manual
        #        限流
        prefetch: 30

配置类代码:

代码语言:txt
复制
@Configuration
public class MsgConfig {

    @Bean
    public TopicExchange exchange() {
        return new TopicExchange("sample.confirm.exchange");
    }

    @Bean
    public Queue queue() {
        return new Queue("sample.confirm.queue");
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(exchange()).with("#.confirm");
    }

    @Bean
    public Snowflake snowflake() {
        return IdUtil.createSnowflake(1, 1);
    }
}

消息发送者代码如下:

代码语言:txt
复制
@Component
@Slf4j
public class MsgSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private TopicExchange exchange;

    @Autowired
    private Snowflake snowflake;

    private static AtomicInteger msgCount = new AtomicInteger();

    private static String[] keys = {"test.confirm", "good.confirm", "error.conf"};

    @Scheduled(fixedDelay = 1000, initialDelay = 1000)
    public void sendMsg() {
        rabbitTemplate.setConfirmCallback((
                (correlationData, ack, cause) -> {
                    // 监听 broker的应答
                    log.info("sender confirm callback:{},{},{}", correlationData, ack, cause);
                    if (!ack) {
                        log.warn("sender 消息未确认");
                    }
                }));

        // 设置强制标识,必须设置了setReturnCallback,true是指broker不自动删除不可达消息,并通过ReturnCallback回调
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback((
                (message, replyCode, replyText, exchange, routingKey) -> {
                    // 监听 不可路由的消息
                    log.info("sender return callback:{},{},{},{},{}", message, replyCode, replyText,
                            exchange, routingKey);
                }));

        CorrelationData data = new CorrelationData(snowflake.nextIdStr());
        // 有意根据数组发送无法路由消息
        String key = keys[msgCount.get() % 3];
        DemoUser user = DemoUser.builder()
                .userId(msgCount.incrementAndGet())
                .userName("用户" + msgCount.get())
                .build();
        String msg = JSON.toJSONString(user);
        MessageProperties messageProperties = new MessageProperties();
        // 设置内容为json模式
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
        messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
        // 设置3秒过期,过期的被自动删除
        messageProperties.setExpiration("3000");
        Message message = new Message(msg.getBytes(), messageProperties);
        rabbitTemplate.convertAndSend(exchange.getName(), key, message, data);
    }
}

消息消费者代码如下:

  • 注意如果basicNack中指定requeue为true,该消息会再次回到消息队列头部,从而很容易造成消息消费的死循环。
代码语言:txt
复制
@Component
@Slf4j
public class MsgConsumer {

    @RabbitListener(queues = "#{queue.name}")
    @RabbitHandler
    public void dealMsg(Message message, Channel channel) throws IOException, InterruptedException {
        String msg = new String(message.getBody());
        log.info("consumer got msg:{}", msg);
        DemoUser user = JSON.parseObject(msg, DemoUser.class);
        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        if (user.getUserId() % 10 == 0) {
            // 有意造成失败,也会返回失败确认,也即 ack,nack,reject都是确认
            // requeue 如果设置为true,很可能会造成消息消费的死循环
            channel.basicNack(deliveryTag, true, false);
            log.info("consumer msg nack:{},{}", deliveryTag, user);
        } else {
            channel.basicAck(deliveryTag, true);
        }
        // 有意减慢处理速度
        Thread.sleep(2000);
    }
}

运行效果:

  • sender每个成功投递的消息,都收到了confirm callback
  • sender路由错误的消息,都收到了return callback
代码语言:txt
复制
sender confirm callback:CorrelationData [id=1284487848413761536],true,null
sender confirm callback:CorrelationData [id=1284487852880695296],true,null
sender return callback:(Body:'{"userId":3,"userName":"用户3"}' MessageProperties [headers={spring_returned_message_correlation=1284487857083387904}, contentType=application/json, contentLength=0, receivedDeliveryMode=PERSISTENT, expiration=3000, priority=0, deliveryTag=0]),312,NO_ROUTE,sample.confirm.exchange,error.conf

consumer got msg:{"userId":1,"userName":"用户1"}
consumer got msg:{"userId":2,"userName":"用户2"}
consumer got msg:{"userId":4,"userName":"用户4"}
consumer got msg:{"userId":5,"userName":"用户5"}
consumer got msg:{"userId":7,"userName":"用户7"}
consumer got msg:{"userId":8,"userName":"用户8"}
consumer got msg:{"userId":10,"userName":"用户10"}
consumer msg nack:7,DemoUser(userId=10, userName=用户10)

Java应用模式

简单work模式

springaqmp自动实现了相关依赖配置,纯java应用则需要自己来进行调用:

  • 通过 ConnectionFactory 来配置连接信息,创建连接connection,并通过connection来创建channel
  • 通过channel.queueDeclare来声明队列信息
  • 通过channel.basicPublish等方法来发送消息

消息发送端代码示例:

代码语言:txt
复制
@Slf4j
public class PureSender {
    private final static String QUEUE_NAME = "sample.java";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String msg = "hello,你好";
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            log.info("client send:{}", msg);
        }
    }
}

消息消费端示例:

  • 在消费端,注意不要用try语法来自动关闭连接,否则就只能运行一次。
代码语言:txt
复制
@Slf4j
public class PureReceiver {
    private final static String QUEUE_NAME = "sample.java";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        log.info("server waiting for msg");
        channel.basicConsume(QUEUE_NAME, true, ((s, delivery) -> {
            String msg = new String(delivery.getBody(), "UTF-8");
            log.info("server received msg:{}", msg);
        }), s -> {
        });
    }
}

运行效果:

  • 主要要先运行PureReceiver,等待消息发来,然后再运行PureSender,向队列发送消息。
  • 客户端成功发出消息,服务端也成功接收到了消息
代码语言:txt
复制
client send:hello,你好

server waiting for msg
server received msg:hello,你好

publisher confirm发布者确认

rabbitmq官网提供了java版本的模式实现:

  • 调用channel的confirmSelect,将模式设定为开启发布者确认。
  • pubMsgIndividually是最简单模式,直接发送消息后通过waitForConfirmsOrDie来等待确认,也是性能最差的一种方式
  • pubMsgInBatch是批量模式,逐条发送消息后,通过计数器来进行批次确认等待确认
  • handleMsgAsync是异步模式,也是性能最高的一种模式,通过channel的addConfirmListener来进行确认结果的监听

消息发送方:

1、创建连接:

代码语言:txt
复制
static Connection createConnection() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    factory.setUsername("guest");
    factory.setPassword("guest");
    return factory.newConnection();
}

2、逐条发送模式:

代码语言:txt
复制
static void pubMsgIndividually() throws IOException, TimeoutException, InterruptedException {
    // 单独发送消息模式
    try (Connection connection = createConnection(); Channel channel = connection.createChannel();) {
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        channel.confirmSelect();

        long start = System.nanoTime();
        for (int i = 0; i < MSG_COUNT; i++) {
            String msg = String.valueOf(i);
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            // 同步等待消息确认结果,5秒后超时
            channel.waitForConfirmsOrDie(5_000);
        }
        long end = System.nanoTime();
        // 打印发出的消息数目,以及用时

        log.info("sended {} msgs individually in {} ms", MSG_COUNT,
                Duration.ofNanos(end - start).toMillis());
    }
}

3、批量模式:

代码语言:txt
复制
static void pubMsgInBatch() throws IOException, TimeoutException, InterruptedException {
    // 批量发送消息
    try (Connection connection = createConnection(); Channel channel = connection.createChannel();) {
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        channel.confirmSelect();

        int batchSize = 100;
        int outstandingMsgCount = 0;

        long start = System.nanoTime();
        for (int i = 0; i < MSG_COUNT; i++) {
            String msg = String.valueOf(i);
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            outstandingMsgCount++;

            if (outstandingMsgCount == batchSize) {
                // 按100条的批次等待消息确认
                channel.waitForConfirmsOrDie(5_000);
                outstandingMsgCount = 0;
            }
        }

        if (outstandingMsgCount > 0) {
            channel.waitForConfirmsOrDie(5_000);
        }

        long end = System.nanoTime();
        log.info("sended {} msgs in bath in {} ms", MSG_COUNT,
                Duration.ofNanos(end - start).toMillis());
    }
}

4、异步模式:

  • 构建ConcurrentSkipListMap来存储待处理消息
  • 通过ackCallback和nackCallback,监听消息确认情况,确认的消息将被清除
代码语言:txt
复制
static void handleMsgAsync() throws IOException, TimeoutException, InterruptedException {
    try (Connection connection = createConnection(); Channel channel = connection.createChannel();) {
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        channel.confirmSelect();

        // ConcurrentSkipListMap是线程安全的高并发有序哈希map
        // key为消息序号,string为消息内容
        // 用该map来记录待处理的消息
        ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();

        ConfirmCallback cleanOutstandingConfirms = (deliveryTag, multiple) -> {
            // 消息被确认的回调
            if (multiple) {
                // 批量确认,小于等于deliveryTag的都会被确认
                // headMap返回的是小于等于给定值的map子集
                ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(
                        deliveryTag, true);
                // 清空所有确认的消息
                confirmed.clear();
            } else {
                outstandingConfirms.remove(deliveryTag);
            }
        };

        channel.addConfirmListener(cleanOutstandingConfirms, (
                (deliveryTag, multiple) -> {
                    // 消息丢失的回调
                    String msg = outstandingConfirms.get(deliveryTag);
                    log.error("msg with body {} nack-ed,deliveryTag:{},multiple:{}", msg,
                            deliveryTag, multiple);
                    // 处理丢失的消息
                    cleanOutstandingConfirms.handle(deliveryTag, multiple);
                }));

        long start = System.nanoTime();
        for (int i = 0; i < MSG_COUNT; i++) {
            String msg = String.valueOf(i);
            outstandingConfirms.put(channel.getNextPublishSeqNo(), msg);
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
        }
        if (!waitUntil(Duration.ofSeconds(60), () -> outstandingConfirms.isEmpty())) {
            throw new IllegalStateException("60秒内无法确认所有消息");
        }

        long end = System.nanoTime();
        log.info("sended {} msgs in handleMsgAsync in {} ms", MSG_COUNT,
                Duration.ofNanos(end - start).toMillis());
    }
}

消息接收端代码如下:

代码语言:txt
复制
@Slf4j
public class MsgReceiver {

    private final static String QUEUE_NAME = "sample.pubConf";

    static AtomicInteger msgCount = new AtomicInteger(0);

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = PublisherConfirms.createConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        log.info("server waiting for msg");
        channel.basicConsume(QUEUE_NAME, true, (
                (c, delivery) -> {
                    String msg = new String(delivery.getBody(), "UTF-8");
                    // 对接收到的消息进行累加
                    if (msgCount.incrementAndGet() % 20000 == 0) {
                        // 每2万条消息打印一次
                        log.info("server got {} messages", msgCount);
                        log.info("current msg:{}", msg);
                    }
                }), s -> {
            log.warn("consumer canceled:{}", s);
        });
    }
}

运行效果:

  • 客户端分三批次发出了2万条消息,第一次用时14秒左右,第二次用时1.6秒,第三次用时1.1秒
  • 服务端总共收到了6万条消息
代码语言:txt
复制
 sended 20000 msgs individually in 14609 ms
 sended 20000 msgs in bath in 1655 ms
 sended 20000 msgs in handleMsgAsync in 1176 ms
 
 server got 20000 messages
 current msg:19999
 server got 40000 messages
 current msg:19999
 server got 60000 messages
 current msg:19999

延迟消息模式

开始延迟消息模式之前,需要先安装rabbitmq_delayed_message_exchange插件,然后:

  • 通过exchange.setDelayed(true);来将broker设置为延迟模式
  • 发送消息时,通过properties.setDelay(3_000);来设定每条消息的延迟时间,单位毫秒

消息配置类:

代码语言:txt
复制
@Configuration
public class DelayMQConfig {
    @Bean
    public TopicExchange exchange() {
        TopicExchange exchange = new TopicExchange("sample.delay");
        exchange.setDelayed(true);
        return exchange;
    }

    @Bean
    public Queue queue() {
        return new Queue("queue.delay");
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(exchange()).with("#.delay");
    }
}

消息发送类:

代码语言:txt
复制
@Component
@Slf4j
public class MsgSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private TopicExchange exchange;

    private static AtomicInteger msgCount = new AtomicInteger();

    @Scheduled(fixedDelay = 2000, initialDelay = 500)
    public void sendMsg() {
        // 循环发送3条消息
        String msg = String.format("你好,msg%s", msgCount.incrementAndGet());
        rabbitTemplate.convertAndSend(exchange.getName(), "msg.delay", msg, (message -> {
            MessageProperties properties = message.getMessageProperties();
            // 设置延迟3秒
            properties.setDelay(3_000);
            properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
            return message;
        }));
        log.info("send msg:{}", msg);
    }
}

消息消费类:

代码语言:txt
复制
@Component
@Slf4j
public class MsgConsumer {

    @RabbitListener(queues = "#{queue.name}")
    @RabbitHandler
    public void dealMsg(Message message) {
        log.info("consumer got msg:{}", new String(message.getBody()));
    }
}

运行效果:

  • 在消息发出3秒钟以后,消费方才接收到消息
代码语言:txt
复制
2020-07-18 22:37:44.851: send msg:你好,msg1
2020-07-18 22:37:46.851: send msg:你好,msg2
2020-07-18 22:37:47.869: consumer got msg:你好,msg1
2020-07-18 22:37:48.854: send msg:你好,msg3
2020-07-18 22:37:49.856: consumer got msg:你好,msg2
2020-07-18 22:37:50.855: send msg:你好,msg4
2020-07-18 22:37:51.861: consumer got msg:你好,msg3
2020-07-18 22:37:52.856: send msg:你好,msg5

源码信息

本案例源码地址:https://gitee.com/coolpine/backends/tree/master/hiboot/src/main/java/pers/techlmm/rabbit

更多参考资料:

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 内容概括
  • 基础环境搭建
  • SpringBoot-AMQP应用模式
    • work普通工作模式
      • publish/subscribe 发布/订阅(广播)模式
        • routing路由模式
          • topic主题匹配模式
            • rpc远程过程调用模式
              • publisher confirm发布者确认模式
              • Java应用模式
                • 简单work模式
                  • publisher confirm发布者确认
                  • 延迟消息模式
                  • 源码信息
                  相关产品与服务
                  容器服务
                  腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
                  领券
                  问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档