交易系统使用storm,在消息高可靠情况下,如何避免消息重复

概要:在使用storm分布式计算框架进行数据处理时,如何保证进入storm的消息的一定会被处理,且不会被重复处理。这个时候仅仅开启storm的ack机制并不能解决上述问题。那么该如何设计出一个好的方案来解决上述问题?

现有架构背景:本人所在项目组的实时系统负责为XXX的实时产生的交易记录进行处理,根据处理的结果向用户推送不同的信息。实时系统平时接入量每秒1000条,双十一的时候,最大几十万条。

原文和作者一起讨论:http://www.cnblogs.com/intsmaze/p/6219878.html

新浪微博:intsmaze刘洋洋哥

架构设计:

  storm设置的超时时间为3分钟;kafkaspout的pending的长度为2000;storm开启ack机制,拓扑程序中如果出现异常则调用ack方法,向spout发出ack消息;每一个交易数据会有一个全局唯一性di。

  处理流程:

  交易数据会发送到kafka,然后拓扑A去kafka取数据进行处理,拓扑A中的OnceBolt会先对从kafka取出的消息进行一个唯一性过滤(根据该消息的全局id判断该消息是否存储在redis中,如果有,则说明拓扑A已经对该消息处理过了,则不会把该消息发送该下游的calculateBolt,直接向spout发送ack响应;如果没有,则把该消息发送该下游的calculateBolt。),calculateBolt对接收到来自上游的数据进行规则的匹配,根据该消息所符合的规则推送到不同的kafka通知主题中。

  拓扑B则是不同的通知拓扑,去kafka读取对应通知的主题,然后把该消息推送到不同的客户端(微信客户端,支付宝客户端等)。

架构设计的意义:

  通过借用redis,来保证消息不会被重复处理,对异常的消息,我们不让该消息重发。

  因为系统只是对交易成功后的数据通过配置的规则进行区分来向用户推送不同的活动信息,从业务上看,系统并不需要保证所有交易的用户都一定要收到活动信息,只需要保证交易的用户不会收到重复的数据即可。

 但是在线上运行半年后,还是发现了消息重复处理的问题,某些用户还是会收到两条甚至多条重复信息。

  通过对现有架构的查看,我们发现问题出在拓扑B中(各个不同的通知拓扑),原因是拓扑B没有添加唯一性过滤bolt,虽然上游的拓扑对消息进行唯一性过滤了(保证了外部系统向kafka生产消息出现重复下,拓扑A不进行重复处理),但是回看拓扑B,我们可以知道消息重发绝对不是kafka主题中存在重复的两条消息,且拓扑B消息重复不是系统异常导致的(我们队异常进行ack应答),那么导致消息重复处理的原因就一定是消息超时导致的。ps:消息在storm中被处理,没有发生异常,而是由于集群硬件资源的争抢或者下游接口瓶颈无法快速处理拓扑B推送出去的消息,导致一条消息在3分钟内没有处理完,spout就认为该消息fail,而重新发该消息,但是超时的那一条消息并不是说不会处理,当他获得资源了,仍然会处理结束的。

 解决方案:在拓扑B中添加唯一性过滤bolt即可解决。

个人推测:当时实时系统架构设计时,设计唯一性过滤bolt时,可能仅仅是考虑到外部系统向kafka推送数据可能会存在相同的消息,并没有想到storm本身tuple超时导致的消息重复处理。

该系统改进:虽然从业务的角度来说,并不需要保证每一个交易用户都一定要收到活动信息,但是我们完全可以做到每一个用户都收到活动信息,且收到的消息不重复。

我们可以做到对程序的异常进行控制,但是超时导致的fail我们无法控制。

  我们对消息处理异常控制,当发生异常信息,我们在发送fail应答前,把该异常的消息存储到redis中,这样唯一性过滤的bolt就会对收到的每一条消息进行判断,如果在redis中,我们就知道该消息是异常导致的失败,就让该消息继续处理,如果该消息不在redis中,我们就知道该消息是超时导致的fail,那么我们就过滤掉该消息,不进行下一步处理。

这样我们就做到了消息的可靠处理且不会重复处理。

博主解决的是90%的问题,主要是因为:

1,彻头彻尾的异常是不会给你写redis的机会的,只能说绝大多数时候是OK的。
2,超时的任务最终也可能运行成功,这也会导致你做了2次。

我的看法:
既然是交易系统,最重要的就是业务本身满足幂等性和可重入,架构上容错导致的重试和重入,都不应该导致业务错乱。

所以,我认为在架构上能做的,是要保障at least once,博主判断redis不存在就认为是超时重发,殊不知超时的bolt可能很久之后异常退出,这样消息就没有人处理了。

不过具体场景具体分析,看业务需求取舍既可。
    超时的任务最终也可能运行成功,这也会导致你做了2次。(ps:这个不会,我们认为超时的任务最终会处理成功,所以再次发送,我们会在唯一性过滤bolt中把该消息过滤掉)
   超时的bolt可能很久之后异常退出,这样消息就没有人处理了(ps:这个我要研究下,就是超时后,再异常向spout发送fial响应是否还会重发消息,如果还会重发,那么就可以保证该异常消息可以再一次被处理)

  彻头彻尾的异常是不会给你写redis的机会的,只能说绝大多数时候是OK的。(ps:正确,但是是不可控的吧,就像kafka把offset存储在zookeeper中,如果zookeeper挂掉就没有办法,确实绝大部分是ok
的,解决办法不知道有没有。)
  最重要的就是业务本身满足幂等性和可重入,架构上容错导致的重试和重入,都不应该导致业务错乱(ps:我不是很明白,我这里并不要求一条消息具备事务的特性和幂等性有什么关系)
以上是我对该朋友对本系统架构找出的问题的个人思考。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏崔庆才的专栏

付费代理的使用

1.2K40
来自专栏做全栈攻城狮

程序员,一起玩转GitHub版本控制,超简单入门教程 干货2

本GitHub教程旨在能够帮助大家快速入门学习使用GitHub,进行版本控制。帮助大家摆脱命令行工具,简单快速的使用GitHub。

12230
来自专栏Golang语言社区

Golang协程与通道整理

协程goroutine 不由OS调度,而是用户层自行释放CPU,从而在执行体之间切换。Go在底层进行协助实现 涉及系统调用的地方由Go标准库...

27970
来自专栏知晓程序

【好文回顾】小程序想要「任性推送」模板消息?这个办法可以一试!

「模板消息」能力,几乎是小程序唯一可以向用户主动推送消息的渠道。有了它,小程序就可以向用户发送重要的消息通知。

14820
来自专栏北京马哥教育

Puppet,Ansible,Saltstack 有哪些区别和联系

目前主流的自动化运维工具有puppet、ansible、saltstack,实际上每一个工具都基本上能够完成你的运维任务,也都是久经考验的。都有NB的地方,也有...

18820
来自专栏沈唁志

API接口开发简述示例

26820
来自专栏BeJavaGod

SSO - 我们为何需要单点登录系统

SSO,Single Sign On,也就是单点登录,保证一个账户在多个系统上实现单一用户的登录 现在随着网站的壮大,很多服务会进行拆分,会做SOA服务,会使用...

35350
来自专栏Linyb极客之路

分布式事务的实现原理

事务是数据库系统中非常有趣也非常重要的概念,它是数据库管理系统执行过程中的一个逻辑单元,它能够保证一个事务中的所有操作要么全部执行,要么全不执行;在 SOA 与...

17230
来自专栏做全栈攻城狮

GitHub这么火,程序员你不学学吗? 超简单入门教程干货2

本GitHub教程旨在能够帮助大家快速入门学习使用GitHub,进行版本控制。帮助大家摆脱命令行工具,简单快速的使用GitHub。

8620
来自专栏FreeBuf

企业安全漏洞通告引擎

? 背景 ? 如今大多数企业都在用漏洞扫描+漏洞通告,存在如下两个问题: 1、漏扫存在“扫描周期长、扫描库更新不及时”等情况,同时扫描报告中有无数干扰项,导致...

23950

扫码关注云+社区

领取腾讯云代金券