专栏首页Java架构沉思录MQ·将多消息合并为一条消息的发送、消费的设计与实现

MQ·将多消息合并为一条消息的发送、消费的设计与实现

以下文章来源于Java艺术,作者wujiuye

优质文章,及时送达

这是笔者最近处理一个叫异步大点击的业务问题所思考出来的方案。由于mq使用的是亚马逊的sqs服务,而sqs是按请求数消费的原因,所以才有的将多消息合并为一条消息发送的想法。

这个想法从sqs的消息批量发送以及阿里限流中间件的qps统计、netty的EventLoopGroup设计中得到启发。本篇将介绍如何将多个消息合并成一个消息发送而不影响服务的并发性能,以及由于合并后产生的大消息消费出现的消息堆积现象,开的消费者越多反而消息堆积越多的bug。

为什么要将多消息合并为一个消息发送?

前面也说了,为了节约成本。以每分钟50w的广告点击数来算,一个月将产生50*60*24*31w的点击消息,再乘以3就是每个月的sqs请求数,3代表的是发送消息、拉取消息、删除消息,按每100w请求0.4美刀的价格计算大概一个月要26784美刀。

由于sqs限制单条消息的大小最大为256k,根据业务场景估算每点击消息也不可能达到1k,,所以我将256个请求合并为一个消息发送,或者1s内未达到256个消息也合并为一个消息发送,这样每月的费用可以直接除以256,这不是一个小数目。

什么样的业务场景下才适合这么干?

将大量消息合并为一个消息后会导致消息消费失去原子性。你无法保证原本是256个消息的合并为一个消息后,这256个消息能全部消费成功或者全部消费失败,因此要求业务必须允许消息消费失败直接丢弃的情况。无论多少个成功多少个失败,都需要将整条消息从mq中删除。笔者考虑过这个问题才决定是否要这样做的,也考虑过失败重试的问题,但我觉得没必要为这种概率买单,因为一个点击在非异步的情况下,失败就是失败了。

如何将大量消息合并为一条消息发送而不影响服务的高并发性能呢?

其实不影响是不存在的,只是让影响变得微弱。经过长时间的观察,我了解该高并发服务对内存的消耗并不高,最大qps下也就消耗1.5g左右的堆内存,而netty使用的直接内存大概在2g这样,对于2核8g的机器,有足够多的内存来实现队列缓存消息。

我借签Dubbo的客户端与服务端配置多个连接时使用轮询方式使用连接,同时也借签了netty的EventLoop的设计,实现消息合并发送。我定义一个MesaageLoopGroup,一个MesaageLoopGroup可以配置有多少个MesaageLooper,而每个MesaageLooper就是一个线程,且维护一个阻塞队列,默认队列大小是102400,这个数字是我配置单个进程所能打开的最大文件句柄数。

当往MesaageLoopGroup push一个点击消息时,先用原子类自增1与MesaageLooper数组的长度取余,选出一个MesaageLooper。然后再将消息push到这个MesaageLooper的阻塞队列。

每个MesaageLooper的run方法实现的就是一个死循环,从阻塞队列中拿消息,当消息等于256时,或者阻塞超过1s就将拿到的消息合并成一个消息发送到mq。如果阻塞队列满,那么push会直接将消息发送到mq。因此,服务重启时如果使用kill 9强行结束进程,至多只会有1s的数据丢失。设置1s还有一个原因就是控制消息的实时性。

灰度上线测试一天后也证明此方案对服务的影响并不大,无论是gc还是内存占用,都看不出加了这么一层逻辑。1s的平均请求按50w计算,四台机器分担,每个服务的每秒请求数平均是2000。

为何用golang实现消费者?

然而消息的消费并不顺利。一个是因为消息消费我用了golang实现,我也是刚入门,写起代码来还感觉别扭,二是一个消息是由原本256个消息组合而成的问题。

使用golang其实是有原因的。原本计划是让消费者占用较小的内存,以实现将消费者寄生在其它服务所在的机器上,充分利用其它耗内存而cpu利用率低的服务所在的机器。同时利用docker实现快速部署,让docker 的镜像更小,不需要安装jdk什么的。还有就是利用go的协程并发处理能力吧,让消费者消费消息的速度能赶上消息的产生速度。

为入门golang买单

为了便于理解,我还是以java的线程池来说明。假设我配置的线程池线程数量是512。寄生在其它服务的机器上需要给主人点面子,不能把人家的cpu全部吃完,导致主服务不可用,所以线程的数量结合消息的消费情况综合考虑,不能超过一半的cpu使用率,而选择512这个数量。

Sqs支持一次拉取多条消息,并且有一个可见性超时的特性,当消息被消费者拉取到之后,在多长时间内未删除,下次可能还会被拉取到,或者其它消费者还能拉取到。最初我设置的可见性超时是60s。

一开始我开启5个线程拉取消息,每次最多拉取10条消息。那么很可能同一时间内会拉取到50条消息。由于一条消息是由原本256条消息合并而成的,所以512个线程同一时间段至多只能消费2条消息,而一条消息(合并后的)的消费平均耗时是10s,也就是说一分钟内最多消费12条消息,其它38条消息在一分钟后会被其它消费者拉取到,所以就会出现大量消息重复消费的情况,久而久之,消息越积累越多。

我用golang的channel实现生产者与消费者,channel的大小可设置,当channel满时,拉取到的消息是放不进channel的,因此会将拉取线程阻塞住,只有消费者从 channel取数据才能继续放入。但阻塞的那段时间要小于消息的可见性超时,因为消息只有在开始消费时我才会将其从mq中删除。

后面的改进就是根据消费能力去调整消息的拉取线程数,以及每次拉取的消息数。还有一点要注意,为保证时刻有消息准备就绪开始消费,最好不要让消息消费完再从mq中拉取。但这也会导致另一个问题,一些消息拉取到本地后,由于channel已满,放不进,而其它空闲消费节点又拉不到,导致消息被消费到的时间延长。这就需要作出取舍。

祝大家在2020年工作顺路,家庭幸福,合家团圆

本文分享自微信公众号 - Java架构沉思录(code-thinker)

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2020-02-09

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 如果面试官再问你消息队列,就把这篇甩给他!

    A 系统产生了一个比较关键的数据,很多系统需要 A 系统将数据发过来,强耦合(B,C,D,E 系统可能参数不一样、一会需要一会不需要数据,A 系统要不断修改代码...

    黄泽杰
  • 分布式系统事务一致性

    现今互联网界,分布式系统和微服务架构盛行。业界著名的CAP理论也告诉我们,在设计和实现一个分布式系统时,需要将数据一致性、系统可用性和分区容忍性放在一起考虑。

    黄泽杰
  • 算法篇之BitMap原理与改造,利与弊的取舍

    内存不足,一直都是一件令人头疼的事情,在有限的资源下,时间与空间的取舍是我们平时开发中思考最多的问题。无论是操作数据库还是Redis缓存,都没有直接使用内存缓存...

    黄泽杰
  • RabbitMQ消息的100%投递 顶

    上图中BIZ DB为我们的业务库,比方说保存订单;MSG DB为消息库,保存我们发送到MQ消息。如果在Step 3的时候,网络出现故障,Confirm机制没有收...

    算法之名
  • 跟我学RocketMQ之消息幂等

    链接:http://rocketmq.cloud/zh-cn/blog/tocloud-catalog.html

    好好学java
  • Redis高级特性之Pub/Sub与Stream

    在Stream之前,Redis PUB/SUB亦可可实现消息的传递及广播,但消息不支持持久化,不记录消费端状态,并且“Fire and Forgot”,可靠性无...

    vitofliu
  • 【PAT乙级】挖掘机技术哪家强

    版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。 ...

    喜欢ctrl的cxk
  • Iptables防火墙(SNAT和DNAT)

    L宝宝聊IT
  • 详解FIX协议的原理、消息格式及配置开发

    FIX协议是由国际FIX协会组织提供的一个开放式协议,目的是推动国际贸易电子化的进程,在各类参与者之间,包括投资经理、经纪人,买方、卖方建立起实时的电子化通讯协...

    宜信技术学院
  • 跑鞋的春天?阿迪达斯发布Futurecraft 4D款3D打印跑鞋

    镁客网

扫码关注云+社区

领取腾讯云代金券