Java 小记 — RabbitMQ 的实践与思考

前言

本篇随笔将汇总一些我对消息队列 RabbitMQ 的认识,顺便谈谈其在高并发和秒杀系统中的具体应用。

1. 预备示例

想了下,还是先抛出一个简单示例,随后再根据其具体应用场景进行扩展,我觉得这样表述条理更清晰些。

RabbitConfig:

@Configuration
public class RabbitConfig {

    @Bean
    public Queue callQueue() {
        return new Queue(MQConstant.CALL);
    }
}

Client:

@Component
public class Client {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendCall(String content) {
        for (int i = 0; i < 10000; i++) {
            String message = i + "-" + content;
            System.out.println(String.format("Sender: %s", message));
            rabbitTemplate.convertAndSend(MQConstant.CALL, message);
        }
    }
}

Server:

@Component
public class Server {

    @RabbitHandler
    @RabbitListener(queues = MQConstant.CALL)
    public void callProcess(String message) throws InterruptedException {
        Thread.sleep(1000);
        System.out.println(String.format("Receiver: reply(\"%s\") Yes, I just saw your message!", message));
    }

}

Result:

Sender: Hello, are you there!
Receiver: reply("Hello, are you there!") Yes, I just saw your message!

以上示例会在 rabbitmq 中创建一条队列 CALL, 消息在其中等待消费:

在此基础上的简单扩展我就不再写案例了,比如领域模块完成了其核心业务规则之后可能需要更新缓存、写个邮件、记个复杂日志、做个统计报表等等,这些不需要及时反馈或者耗时的附属业务都可以通过异步队列分发,以此来提升核心业务的响应速度,同时如此处理能让领域边界更加清晰,代码的可维护性和持续拓展的能力也会有所提升。

2. 削峰

上个示例中我提到的应用场景是解耦和通知,再接着扩展,因其具备良好的缓冲性质,所以还有一个非常适合的应用场景那就是削峰。对于突如其来的极高并发请求,我们可以先瞬速地将其加入队列并回复用户一个友好提示,然后服务器可在其能承受的范围内慢慢处理,以此来防止突发的 CPU 和内存 “爆表”。

改造之后对于发送方来说当然是比较爽的,他只是将请求加入消息队列而已,处理压力都归到了消费端。接着思考,这样处理有没有副作用?如果这个请求刚好是线程阻塞的,那还要加入队列慢慢排队处理,那不是完蛋了,用户要猴年马月才能得到反馈?所以针对此,我觉得应该将消费端的方法改为异步调用(即多线程)以提升吞吐量,在 Spring Boot 中的写法也非常简单:

@Component
public class Server {

    @Async
    @RabbitHandler
    @RabbitListener(queues = MQConstant.CALL)
    public void callProcess(String message) throws InterruptedException {
        Thread.sleep(100);
        System.out.println(String.format("Receiver: reply(\"%s\") Yes, I just saw your message!", message));
    }

}

参照示例一的方法,我发布了 10000 条消息加入队列,且消费端的调用每次阻塞一秒,那可有意思了,什么时候能处理完?但如果开几百个线程同时处理的话,那几十秒就够了,当然具体多少合适还应根据具体的业务场景和服务器配置酌情考虑。另外,别忘了配线程池:

@Configuration
public class AsyncConfig {

    @Bean
    public Executor asyncExecutor(){
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(500);
        executor.setQueueCapacity(10);

        executor.setThreadNamePrefix("MyExecutor-");

        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }
}

3. Exchange

RabbitMQ 可能为 N 个应用同时提供服务,要是你和你的蓝颜知己突然心有灵犀,在不同的业务上使用了同一个 routingKey,想想就刺激。因此,队列多了自然要进行分组管理,限定好 Exchange 的规则,接下来就可以独自玩耍了。

MQConstant:

public class MQConstant {

    public static final String EXCHANGE = "YOUCLK-MESSAGE-EXCHANGE";

    public static final String CALL = MQConstant.EXCHANGE + ".CALL";

    public static final String ALL = MQConstant.EXCHANGE + ".#";
}

RabbitConfig:

@Configuration
public class RabbitConfig {

    @Bean
    public Queue callQueue() {
        return new Queue(MQConstant.CALL);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange(MQConstant.EXCHANGE);
    }

    @Bean
    Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessage).to(exchange).with(MQConstant.ALL);
    }
}

此时我们再去查队列 CALL,可以看到已经绑定了Exchange:

当然 Exchange 的作用远不止如此,以上示例为 Topic 模式,除此之外还有 Direct、Headers 和 Fanout 模式,写法都差不多,感兴趣的童鞋可以去查看 “官方文档” 进行更深入了解。

4. 延时队列

延时任务的场景相信小伙伴们都接触过,特别是抢购的时候,在规定时间内未付款订单就被回收了。微信支付的 API 里面也有一个支付完成后的延时再确认消息推送,实现原理应该都差不多。

利用 RabbitMQ 实现该功能首先要了解他的两个特性,分别是 Time-To-Live Extensions 和 Dead Letter Exchanges,字面意思上就能理解个大概,一个是生存时间,一个是死信。整个过程也很容易理解,TTL 相当于一个缓冲队列,等待其过期之后消息会由 DLX 转发到实际消费队列,如此便实现了他的延时过程。

MQConstant:

public class MQConstant {

    public static final String PER_DELAY_EXCHANGE = "PER_DELAY_EXCHANGE";

    public static final String DELAY_EXCHANGE = "DELAY_EXCHANGE";

    public static final String DELAY_CALL_TTL = "DELAY_CALL_TTL";

    public static final String CALL = "CALL";

}

ExpirationMessagePostProcessor:

public class ExpirationMessagePostProcessor implements MessagePostProcessor {
    private final Long ttl;

    public ExpirationMessagePostProcessor(Long ttl) {
        this.ttl = ttl;
    }

    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
        message.getMessageProperties()
                .setExpiration(ttl.toString());
        return message;
    }
}

Client:

@Component
public class Client {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendCall(String content) {
        for (int i = 1; i <= 3; i++) {
            long expiration = i * 5000;
            String message = i + "-" + content;
            System.out.println(String.format("Sender: %s", message));
            rabbitTemplate.convertAndSend(MQConstant.DELAY_CALL_TTL, (Object) message, new ExpirationMessagePostProcessor(expiration));

        }
    }
}

Server:

@Component
public class Server {

    @Async
    @RabbitHandler
    @RabbitListener(queues = MQConstant.CALL)
    public void callProcess(String message) throws InterruptedException {
        String date = (new SimpleDateFormat("HH:mm:ss")).format(new Date());
        System.out.println(String.format("Receiver: reply(\"%s\") Yes, I just saw your message!- %s", message, date));
    }

}

Result:

Sender: 1-Hello, are you there!
Sender: 2-Hello, are you there!
Sender: 3-Hello, are you there!
Receiver: reply("1-Hello, are you there!") Yes, I just saw your message!- 23:04:12
Receiver: reply("2-Hello, are you there!") Yes, I just saw your message!- 23:04:17
Receiver: reply("3-Hello, are you there!") Yes, I just saw your message!- 23:04:22

结果一目了然,分别在队列中延迟了 5秒,10秒,15秒,当然,以上只是我的简单示例,童鞋们可翻阅官方文档(“ ttl ” && “ dlx ”)进一步深入学习。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏编舟记

一步步编写SonarQube Plugin

插件确实不好写,因为插件是插入庞大的系统当中工作的,那也就意味着写插件需要具备一定的领域知识,包括系统架构、扩展点、业务共性及差异、API及其业务模型对应、安装...

7943
来自专栏逸鹏说道

我这么玩Web Api(一)

帮助页面或用户手册(Microsoft and Swashbuckle Help Page) 前言   你需要为客户编写Api调用手册?你需要测试你的Api接口...

3135
来自专栏Golang语言社区

Go语言开发RESTFul JSON API

也许我们之前有使用过各种各样的API, 当我们遇到设计很糟糕的API的时候,简直感觉崩溃至极。希望通过本文之后,能对设计良好的RESTful API有一个初步认...

1.3K3
来自专栏coolblog.xyz技术专栏

Spring IOC 容器源码分析系列文章导读

Spring 是一个轻量级的企业级应用开发框架,于 2004 年由 Rod Johnson 发布了 1.0 版本。经过十几年的迭代,现在的 Spring 框架已...

29510
来自专栏WindCoder

微信小程序踩坑记-Java基于SSM下的post请求

最近在持续踩微信小程序的坑,canvas和WebSocket的暂时还没找到相关的解决方案,暂时先将post请求无法获取data参数的坑填上。直接附上解决方案,已...

1.1K1
来自专栏Java成神之路

分布式_事务_02_2PC框架raincat源码解析

上一节已经将raincat demo工程运行起来了,这一节来分析下raincat的源码

2271
来自专栏张戈的专栏

移动搜索SEO:网站移动适配之Meta标注、移动跳转终结篇

这些天,在给博客的标签页(tag)添加跳转和 META 动态申明时,居然让我醍醐灌顶,发现之前的动态适配的做法是多么的苦逼和小白! 总结前,先来回顾下小白张戈在...

5136
来自专栏向治洪

android获取设备唯一标示

概述 有时需要对用户设备进行标识,所以希望能够得到一个稳定可靠并且唯一的识别码。虽然Android系统中提供了这样设备识别码,但是由于Android系统版本、...

1.8K7
来自专栏老码农专栏

原 荐 一场版本升级引发的性能血案 - 之数

2363
来自专栏FreeBuf

开源BUG跟踪平台JIRA目录遍历漏洞分析

作者 Taskiller 最近,一则新发布的公告报告了一个影响Jira 5.0.11和6.0.3版本的目录遍历漏洞,该漏洞在去年7月份被验证,并在接下来的几个月...

4326

扫码关注云+社区

领取腾讯云代金券