基于akka的分布式实时消息系统

写在前面

ArchSummit 2016 全球架构师峰会已经于前几天落幕,我司CTO张斌在大会上做了专题演讲,和大家分享了基于 akka 的分布式实时消息系统,同时在会上表示已将整个解决方案开源

项目源码:https://github.com/goodrain/realtime-message-system

在我们的云市里,基于akka的分布式实时消息系统由三个独立的云应用组成,分别是:RTMWebsocket、RTMManager、RTMSharding。按照引导简单点击部署这三个云应用,一个可用的、完备的实时消息系统就诞生了。

云上一键部署:http://app.goodrain.com/category/0/?order_by=download&key_word=RTM

演讲实录

主持人:接下来把时间交给今天的第一位分享嘉宾,来自好雨科技 CTO 张斌先生,他今天将分享的主题是“基于 akka 的分布式实时消息系统”,掌声有请张先生!

张斌:首先欢迎大家来参加这个会场,很高兴和大家相聚在美丽的深圳。我今天给大家带来的演讲主题是“基于akka的分布式实时消息系统”这个系统是我们长期对移动互联网公司做技术支持总结的经验,今天把这个经验分享给大家。

今天演讲的目的有三个:一是介绍akka在分布式应用;二是分享基于akka的实时消息平台,这个平台能做什么呢?这个平台所有涉及到实时消息的,包括推送什么,都可以用这个平台轻松搞定。比如我们要实现直播室或者聊天室或者SIS的一个系统,甚至我要实现基于类似微信的服务端,都可以用这个平台轻松搞定;三是我们如何利用容器技术让该平台部署更加简单、更高效。我今天介绍akka的特点和它分布式的设计模式,以及我们如何通过这个设计模式设计一个消息推送平台,在这个平台有一定的特点,后面我会逐一介绍。

我先讲一下分布式实时系统,分布式系统是建在网络之上的软件系统,它的特点是为了解决单一系统资源不足,另外它的计算正确性不仅取决于它的程序逻辑的正确性,还取决于这个计算的时间,分布式实时系统还是强调实时的概念。这里面有两个点:一是它解决单一系统资源不足,二是时间性,因为我们设计一个分布式系统的时候,需要秒级得到响应,但是程序跑起来需要数分钟或者数小时才能满足结果,这样不符合我们的目标,也达不到我们的要求。我如何找到一款能够高效的,而且还能够同时比较及时的符合系统的设计框架。

分布式实时系统的特点,一是可扩展性,根据业务的变化进行扩容和缩容,它当前的量比较小,随着量增加,如何对系统进行扩容?当我的量变小的时候,如何进行缩容。二是时间性,每个任务执行一定要有时间约束,不能说放着任务随便执行。三是预测性,能够对任务执行的时间进行判断。四是稳定性,整个实时系统重要指标,比如说系统某一个节点挂了,或者一台机器宕机,系统有没有影响。你的部署,是滚动的还是停掉集群然后再升级,升级完之后再起来。五是交互性,这是比较重要的一点,一个分布实时系统一定要和外部进行通讯,否则就是闭环,只有积极响应外部请求,才能和外部打通,形成一个通路。

使用的场景有:实时计算、在线聊天、消息推送、在线游戏、实时推荐等等,还有很多其他场景,我相信大家在设计的时候会遇到。

分布式实时系统的难点,在其中事件如何处理,怎么保证数据不会出现不一致数据。集群内部的复杂网络,整个系统中网络节点如何通讯、如何感知,以及节点之间如何通讯,通讯的延时和失败,因为在分布式系统里面比较重要的就是网络,网络不仅取决于本地网络,还取决于外界的情况。如果外界网络不稳定或者出现瞬断,它能不能自恢复或者有没有影响。还有部署的复杂度,比如现在有几十个节点的集群,我要升级一个集群的应用程序是怎么处理?先滚动升级,滚动升级是一个节点的升级、关闭都不会对系统有影响,还是先把集群停掉之后再把程序升级完再开,这个有没有顺序。整个系统中的消息传递的过程,因为到底我们采用什么样的传送协议,消息是否需要序列化。分布式系统的这些特点和难点,我们看看akka是如何处理的。

sacla是一门多范式的编程语言,sacla编写的语言是跑在java虚拟机上的,akka是sacla开发的一个库,用于编写可容错、高可伸缩的模型。akka模型很早提出来,有很多公司使用它,比如twitter、亚马逊等等,我知道国内也有很多公司也是基于akka做大量的数据交易等等。

在akka里面我们所有事物都可以扮演actors的角色,就是演员的角色。所有事物都可以定位为演员,面向函数的变化中,所有元素都可以定位为函数。actor是独立的,它有标识和当前行为描述。消息传递是非阻塞和异步的,保存在邮件队列,按顺序处理。所有消息发送都是并行的,接收消息所采取的动作都是并行的。这里面是说发送消息的actor都是采用并行处理,收到信息的actor是并行的,而且非阻塞和异步的。actor之间是独立的,不会进行通讯,他们通讯的唯一方式就是通过消息队列进行处理。actor内部是有状态的,它里面不用分享状态,这决定我们可以用akka构件一个分布式有状态服务,我想在座的各位如果要设计分布式、有状态的服务还是比较困难的,因为可以通过其他的系统协同的组件等去做,但是akka在这方面非常方便,本来每个actor中间有状态,不用分享状态,唯一通信就是消息队列进行通信,你给我发消息、我给你发消息,actor接收消息再进行处理。actor都是运行在自己的线程中,基于反映的DDD、CQRS事件溯源架构。actor内部是可容错的。

前面讲了actor的特点,下面讲actor的常用设计模式。这个地方顺便过一下,每个actor有一个行为,每个actor里面有状态,它里面是通过一个东西进行交易通讯,里面有自己的运行线程。

它的设计模式有单一模式、路由分发等,下面我们简单过一些线程模式。

单一模式在分布式实时系统中,要实行单一模式,就说整个系统在某一个时刻只有一个实例进行运行,这个模式确保有且只有一个实例,比如我们做统一流水号生成的服务,它是单一服务,而不是分布式服务,还有定单号。所以我们加载配置文件都可以用这种模式。

还有路由分发,消息通过高效路由整个集群来提升。这个地方就是通过router把整个路由消息分布式集群中,充分利用集群的网络能力和计算能力,比如说我现在有一个消息要给一百万人发,单一机器会循环100次或者通过多线程,或者把人分成不同的组进行分发提高效率。但是akka里面只要有集群,就可以把这个分散到集群上面去,然后发出去。如果是千万的人发送,不需要改集群,只需要增加节点数,充分使用整个集群的网络能力和计算能力。

下面是Pub、Sub,我要把一条消息发送给集群中的另一个对象,我们称之为actor,但是我不知道接受方在哪个节点,就可以用这种模式进行处理。还有我要把一个消息发送已注册改进的所有actor上,比如现在有一个热门的话题,大家都在订阅,当有一条消息发送,就可以有一个新的事件或者新的消息时,我就把消息发送给所有感应人。

下面是Cluster client,这样可以让集群和外部能够通讯。client可以是另外一个集群,我们一定要响应外部的请求,分布式系统是一个集群,但是集群怎么外部通讯就可以用这个。

下面是sharding,目的是解决单一机器资源不足的压力,前面讲的actor,一切事物都可以看成actor,我们把它根据一定的规则把它sharding到整个集群上去,即使我这里面事物再多,actor再多,随着集群节点的扩容,我可以轻松满足所有需要的东西。这是actor的设计模式。

今天进入我们的重点,我们是如何基于akka进行分布式实时系统的设计呢?

先看一个它的使用场景,当节点挂掉之后它可以迁移过来,实时消息的推动场景,现在就是这两者之间,当客户端发起异步端服务请求时,等待服务端响应,它不会立即反映,而是hold链接,该处理时一定是非阻塞。当服务端有数据的时候,主动推送数据到客户端,客户端接到数据进行数据渲染,显示、处理。举一个例子,比如说现在是订阅聊天室,加入聊天室就相当于建立链接,聊天室的服务器会把链接保持住,有新消息的时候会发送给你,因为客户端再进行数据的渲染和显示。

我们这块地方基于akka的消息推送的结构图,我抽象说三层,第一层是协议层,协议层我们是可以支持多协议,比如websocket,还有其他协议。中间一层是topic sharding cluster,因为这里面都可以扮演成actor,我们现在称之为主题,主题的sharding集群,我可以把它sharding到不同集群上,这个规则是比较灵活,自己可以根据自己的业务场景去进行编写。下一层是持久化层,这个地方用于消息化的持久化,当消息系统化,重启之后可以恢复,保持消息的持久性。总共就是这三层。

整个系统的架构,ETCD,相信在座的各位在做微服务架构的时候,这些用于服务发现的属性大家都有发现。这个地方我整合成了ETCD,中间就是主题的sharding集群,ETCD用于sharding集群的服务发现。后面是协议层,协议层可以定义websocket等,后面还有mongoDB,基于事件溯源的消息载体。还有topic manager,是用于定义主题,这个地方可以定义在多租户下定义主题,包括主题消息的模板,消息如果渲染等等这些功能。还有消息是否持久化,都可以通过主题的manager来管理它。下面讲一下他们的功能。

ETCD是服务发现的组件,它提供Seed节点的发现,大家使用akka的时候,seed节点的时候是需要显示申明,我的集群中seed节点是什么,这里用这个方法,整个集群不用显示申明我的seed节点。集群中seed节点变化之后,我也不用关心显示的再配置seed节点在哪儿。二是主题的sharding集群,每个主题在集群中有且只有一个,前面讲我们每个topic都是内部有状态的,这个就是有状态的分布式系统,每个actor自己维护自己的内部状态。但是每个topic,根据业务逻辑可以定义为子topic。现在一个有状态的主题,主题的订阅人数很多,有200万的在线订阅数。当有一条消息过来的时候,actor要把消息发送给200万订阅的时候,是需要很长时间。但是我根据订阅值进行分区,比如每20万订阅值,基于topic创建一个子topic,相当于200万人分了10个子topic,如果集群里面有好几十个节点,那它就是均分到各个集群的节点上去。如果有消息过来,我可以用集群的网络能力和计算能力,原来一个topic要发送200万订阅,现在变成每条消息给它子topic的20万发送,整个也是很快,充分利用集群的网络能力和计算能力。提供多种的推送协议,不同协议都可以多协议支持。web manager主题管理者,一个消息过来接成串,现在定义一个ATTI的模板,我可以把消息进行渲染之后比较好看的,你期望的显示。还有mongoDB,消息的持久化。

讲一下它的工作流程,用户订阅一个主题的时候,根据他使用的协议到协议层找到对应的协议的服务去进行订阅请求,协议层会把定义请求转化成actor,然后你在主题上定义。当你定义主题的actor有一个消息过来的时候,你订阅主题的actor会把这条消息发给所有订阅它的actor,这个actor是协议层根据客户端链接,不同链接、不同协议过来之后,通过转化的actor,当协议层的actor收到消息的时候,他会把这个消息推送给客户端,客户端进行处理。这里面有两点,客户端不会和感兴趣的主题直接订阅,而是通过协议层转换,转换成actor,让actor进行转换,相当于每个订阅者就会生成一个actor,每个actor再订阅他所感兴趣的主题。第二点是actor,当你订阅主题actor有消息过来,这个消息不会直接发送给客户端,而是发给订阅自己的actor上,这个actor就是刚才协议层产生的actor,协议层收到这个消息的时候,他再根据你的请求推送给客户端。这和我们做的web socket有差别,以前是客户端和主题进行关联,这是通过协议层把你建立这个web socket转化为actor,让actor和你的主题产生关系。导致整个系统可以采用任何不同的协议客户端,都可以用我们这个系统,而且这个系统最终产生订阅关系是你通过协议层产生的后端actor订阅,这端是不变的。只是actor里面有不同协议的链接而已。

举个例子,比如web socket场景,用户发起这个请求,当到web socket的请求,它到后端整个主题集群中产生订阅关系,当后端产生集群关系的时候,有一条消息过来的时候,这个消息会发送订阅的actor,而不会发到客户端上。当协议层的web socket上某个actor收到消息之后,因为它和用户是绑定的,用户使用协议是绑定的,通过这个协议发送给客户端。一个好处酒席客户端跟着协议层,可以通过不同的协议进行关联,协议层真正定义的时候,就用actor自身的性质去做。我们在前面讲actor是通过actor标志去确定一个actor。

下面是系统的特性,协议层支持多协议,节点无限扩容,我可以支持协议层面web socket协议、socket协议等等。当我们现在有10万个在线用户订阅的时候,我一个web socket协议可以服务,如果一百万人,就要很多点,我们需要加点,点随便加,因为这不影响,客户可以任何连到任何web socket服务器上。解决的痛点是链接数过多。一百万人、两百万人都可以轻松搞定。

主题的sharding集群层支持主题动态扩容,解决主题过多、资源不够。如果有10个主题,那一两个节点可以搞定,如果有10万、20万个主题,怎么办?可以每个主题都把它看作actor,根据一个sharding规则,可以到集群上去,只要加集群的节点就可以。订阅主题过多,支持订阅者分区,也就是前面讲为什么分区,充分利用网络的计算能力和网络能力,把消息通过第一时间发送给用户。现在一百万人,根据订阅者不同来进行分区,把整个topic分成多个子topic,然后再分到集群上。进行事件溯源和容灾恢复,事件溯源的本质就是通过日志或者事件恢复到任意状态。就是说akka刚好基于事件溯源的架构方式,它能够在机器宕掉之后或者节点失掉之后进行恢复,恢复到任意状态。再就是根据日常的业务情况设定,到底恢复到哪个状态。这是具体的逻辑,和业务非常相关。支持自动休眠和唤醒,这个概念在其他地方也有,分布式实时系统里面怎么进行休眠和唤醒,整个系统中主题使用是有周期性的,这段时间内某些主题比较活跃,某些主题不活跃,我们把不活跃的主题进行冷数据的传输,让它在系统集群中消失,当它进行活跃的时候,我再从持久化的存储上进行加载,让它恢复到系统里面去。这个地方的好处是我解决系统会从开始到结束之后,系统不同的容量增加,减少了整个系统的负担。还有每个主题维护订阅自己的客户,这个地方从图里面看,订阅者不会和关注的主题进行直接发送关系,而是通过一个转化,转化成actor和后面的主题发送关系。这就解决链接状态的问题。我前面协议层无状态,随便扩容、接连,最终有状态的都是在后面的集群主题的sharding集群里面。还有基于actor模型的消息推送,这个在actor模型里面,在本机测试是能够每秒400-500万的消息推送量,分布式里面就是节点与节点、机器与机器的远程推送在十几二十万的推送。

下面就是整个平台的应用场景,发布订阅,比如现在要做pub、sub的订阅系统,这个平台可以搞定。点对点,比如我和好友之间聊天,也可以实现,因为我们也可以把好友模拟成actor,actor直接通讯。还有一个Restful,我们后面会到基于restful我们的使用经验。如果有一个系统数据要求比较实时,我们可以用这个系统,前面通过restful进行封装,这个系统就可以做基于它的分布式实时系统。还有定向通知,比如发短信、邮件、微信等,我要一次性给所有邮件发通知或者邮件或者微信等。

一个好的系统是优化出来的,我们这块地方做优化花了很长时间,把这个优化分享给大家。actor游戏存放在消息队列里面,消息队列并不是一次性全部执行完,而是根据吞吐量进行设置,如果你有效应就等待,没有就不等待。执行有两种方式。

akka remote配置优化,remote actor是基于netty进行通讯的。

基于虚拟化,akka默认的序列化是java序列化,它会产生瓶颈。我们采用的是Kryo和fst对象序列化,它快速实现热数据加载、冷数据销毁,我们当时一个50M的actor,存的数据,冷却不到1秒,加载不到1秒。还有序列化,这个速度大概是JDK的4-10倍,大小是JDK的1/3,分布式里面消息越小通讯越快。

这是一个性能测试,现在我们前提是在一个节点集合里面,有一个主题是模拟一百万用户去链接,这是长链接。当我们把主题分成4个子主题,相当于25万个长连接。第一个人发送到最后一个人发送完需要6秒时间。当我们把主题分成两个,就是两个子主题,相当于每个子主题承担的是50万的链接数。来一条消息从开始到发送结束需要12秒,我们不需要把子主题拆分,相当于一个主题上有100万的订阅数,我从第一条发送到最后完需要23秒。

这是我们一个客户里面真实的数据,这个集群的规模是60个节点,基础数据是700G,冷热数据比是6:4或者7:3之间,每天的数据是每天要实时处理500万条,但不是均衡的,而是有高峰和低谷的差别。每个集群处理的时间是6毫秒。当时我们的测试是1.5万QPS,这里面还有当一个请求去的时候,需要其他集群异步取完数据,然后聚合,聚合完之后转换给用户。这和普通的测试不一样,普通测试是一个请求过来就加载数据,加载完之后再反馈给用户,通过一个请求过来之后会产生异步的请求,你请求到会聚完之后再进行聚合,聚合再返回给自己。这不是简单的调用。请求是根据请求日志统计,每天1.2亿。当时峰值压力是每天能够达到7-8亿的接口请求。

我们现在把这个系统整理了一下,里面包含了整个结构的如何架构,包括它的优化,因为优化我们花了很长时间。还有一些微服务的服务发现,我们通过ETCD做的。基于这个地方,大家有兴趣的可以上去看一看,大家共同交流。

基于这块地方,我们是做基于容器技术的应用管理平台,我们也把它封装成一个服务,放在我们的服务市场里面。也是相当于线程的产品,是基于web socket推送的,相当于定义一个主题。大家有兴趣可以上去看看,几秒钟就可以拥有一个自己的分布式实时系统。我们把它放在这三个里面,在部署完之后,根据我们的使用情况,比如我们现在的业务量进行水平扩容和垂直扩容,现在有一个sharding比较小,就需要一两个节点,量大的时候需要几个节点,可以秒级进行水平扩容和垂直扩容。

我今天的演讲到此结束。欢迎大家下来交流,共同学习、共同分享。谢谢大家!

精彩问答

主持人:感谢张老师的精彩分享,真的是干货满满。接下来是提问环节。

提问:从这个图看,我们一个集群20个节点,客户连接处25万、50万和100万。张斌:客户端连接数是100万,把它分到不同的子节点。

提问:20万节点、100万链接,平均一台5万链接?张斌:不能这么平均,一个主题分成4个子主题,每个字主题25万的链接。

提问:20个集群的节点有多少接入节点,一个集群里面包括数据库?张斌:一个主题的sharding节点,不包括协议层的节点和数据库的。

提问:100链接数有多少个接入服务器?就是客户端连接服务端的服务器。张斌:有4台。

提问:耗时,我不太了解是一条消息拆分成100万份?张斌:不是,从第一个接收到消息和最后一个接收到消息的时间差。

提问:你们为什么选择akka作为你们的服务推动器,和它对标或者相似的产品有哪些?张斌:我从2010年开始接触sacla语言,akka我从1.7版本一直跟到现在2.48,这块我还是比较熟的。

提问:akka可不可以用于做多个服务器之间传递消息或者传递数据,我想从这个传到另外一个服务器上?张斌:可以,akka不仅用于应用层,还可以用于网络层,有一个开源软件,大家可以查一下就是用akka实现的。

主持人:再次感谢大家的提问,因为时间有限,想要和张老师进一步沟通交流可以去马德里7交流室,大家可以私下进一步沟通交流。

原文发布于微信公众号 - 好雨云(goodrain-cloud)

原文发表时间:2016-07-28

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Java架构师学习

深入理解大型网站架构的核心——了解性能

1683
来自专栏Eternally运维

FinalShell – SSH终端,同屏SFTP,同步目录切换一体化服务器管理

简介 FinalShell是一体化的的服务器,网络管理软件,功能强大的开发,运维工具,充分满足开发,运维需求.

2161
来自专栏.NET技术

关于分布式事务、两阶段提交协议、三阶提交协议

随着大型网站的各种高并发访问、海量数据处理等场景越来越多,如何实现网站的高可用、易伸缩、可扩展、安全等目标就显得越来越重要。

4182
来自专栏JAVA技术zhai

大话微服务架构的故障隔离及容错处理机制

8、限流器和负载开关(Rate Limiters and Load Shedders)

2782
来自专栏纯洁的微笑

[高级]关于分布式事务、两阶段提交协议、三阶提交协议

在分布式系统中,为了保证数据的高可用,通常,我们会将数据保留多个副本(replica),这些副本会放置在不同的物理的机器上。为了对用户提供正确的增\删\改\差等...

1243
来自专栏VMCloud

【腾讯云的1001种玩法】在腾讯云上创建您的SQL Server 故障转移集群(2)

在上一篇文章中我们介绍了如何在QCloud标准化的搭建一套域环境,并介绍了如何在生产过程中如何避免一些坑,今天,我们来介绍此次demo中真正需要注意的一些细节及...

5430
来自专栏企鹅号快讯

Java开发的几个注意点

将一些需要变动的配置写在属性文件中 比如,没有把一些需要并发执行时使用的线程数设置成可在属性文件中配置。那么你的程序无论在DEV环境中,还是TEST环境中,都可...

1826
来自专栏散尽浮华

网站系统架构梳理-解决高负载高并发

随着互联网业务的不断丰富,网站系统架构已经细分到方方面面,尤其对于大型网站来说,所采用的技术更是涉及面非常广,从硬件到软件、编程语言、数据库、WebServer...

47211
来自专栏互联网高可用架构

分布式服务化系统一致性的“最佳实干”

2325
来自专栏進无尽的文章

聊聊工程级别的组件化、插件化 以及 模块化

我们经常会听到组件化、插件化、模块化这三个概念,可是我们真的对这三个概念了解吗?明白它们三者之前的关系和区别吗?本文就我个人的理解做一下简单的总结,如有错误之处...

5594

扫码关注云+社区

领取腾讯云代金券