前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >消息队列可以有的样子

消息队列可以有的样子

作者头像
ImportSource
发布2018-04-03 14:24:30
1.4K0
发布2018-04-03 14:24:30
举报
文章被收录于专栏:ImportSourceImportSourceImportSource
铺垫

无论是什么类型的消息队列,恐怕都离不开三个东东:读取端 、消息存储平台、写入端,无论你给这三者起了什么样子的名字。也就是写入、存储、读取。

写入端通常被叫做“生产者”,producer。读取通常被叫做“消费者”,consumer。存储端则名字各有不同,比如nsq中叫nsqd;kafka叫做broker,broker也是消息中间件们比较爱使用的名字。

消息结构设计

消息体设计其实都是通用的。一个消息体总是由两部分组成:header和body。

这基本上是最主要的两个部分。header负责封装那些与除了消息内容本身之外的那些元数据,而body则很明显存储的是业务消息本身。

如果你学习过http servlet的设计,那么设计一个mq的消息体对你来说是一个比较容易的事情。

body和header在两端(producer、consumer)所呈现的接口又提供了类似getParameter的方式,也就是说header和body分别被封装成两个map。然后开发者通过get来获取。

当然了,如果你不想设计最外面的map。也可以直接抛给客户端一个byte数组,把消息格式交给开发者自己去定义,这样就可以支持任何用户自定义的序列化格式或者其它已有的格式如Apache Avro、protobuf等。

当然了,如果你了解过JMS的规范的话就知道JMS定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性。

· StreamMessage -- Java原始值的数据流

· MapMessage--一套名称-值对

· TextMessage--一个字符串对象

· ObjectMessage--一个序列化的 Java对象

· BytesMessage--一个未解释字节的数据流

消息体的大小

说到大小,基本上只要涉及到比较大的消息体,基本上性能都不好看。mq本身一般是不会限制消息体的大小,但实际使用的过程中,消息的体积还是尽量越小越好,其实你大部分时候处理的消息可能只有1k-10k左右的样子,总之不要太大。

消息队列可以具备的特性

  • 高吞吐量、低延迟:每秒可以处理几十万条消息,延迟最低只有几毫秒 当然这事很多事情取决于集群的扩展能力,后面会详细说到。
  • 扩展性:扩展性分为热扩展和冷扩展。作为一个如今的集群,也许支持热扩展是标配了,如果你的mq消息是一个基于集群来存储的话。
  • 可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失
  • 容错性:允许集群中节点失败(比如副本数量为n,则允许n-1个节点失败或者其他规则)
  • 安全性:为了确保消息在传输的过程的安全,你需要搞一些安全传输协议类似TLS或者其他的安全传输协议来为消息保驾护航。
  • 高并发:这是个老生常谈的问题,支持数千个客户端同时读写是常有的事情。
  • 支持多种消息格式。

三个标准动作

1.写入

1、支持批量写。

当人们觉得一个一个的写入比较浪费时,几乎总是想到“批量”二字。一个能力强大的消息队列,自然也会考虑到支持批量push。

批量又有多种做法:

(1)、按数据大小。这种做法也是比较常用的。就是当本地内存的消息堆砌到指定大小后就发送一次。

(2)、按时间。本地内存的消息一直堆砌,定期发送。

(3)、按消息量。本地内存中的消息数量达到配置数量后批量发送。

2、压缩支持。

当涉及到传输的时候,如果你要想进一步提高传输速度和节省流量,几乎总会想起使用压缩算法来减少消息体的体积。有人会担心cpu的消耗,但事实上常常是传输所带来的瓶颈更多一点,cpu一般还是蛮宽裕的。压缩常用的算法有gzip、snappy、LZO等等。

3、同步写和异步写。

消息队列本身就是一种异步解决方案。但这还不够,为了进一步的提高效率,你可以把写入,也就是生产者producer的动作,也设计成同时支持同步写入和异步写入。

4、ack。

一个消息被发送到存储地后,存储地需要response。但这个response可以有两种做法,一种做法是二话不说,立马返回。另外一种做法是等待消息被成功的发送到指定的分片,确保存储成功后再携带状态码返回给发送者。后面会在可靠性那里会具体说到。

5、负载均衡。

涉及到集群几乎总是要提到负载均衡。这里说的负载均衡主要是指数据以怎样的方式被均衡地分布到集群的每个节点。

2.存储

1、分布式存储。

凡是涉及到存储。涉及到大量存储,最后总是要去搞分布式存储。消息的存储也是一样的。

2、热扩展。

扩展分为冷扩展和热扩展。现在几乎所有的那些“集群”组件都是天然热扩展。

3、持久化。

消息队列主要解决两件事情,一件事情是缓解同步压力,另外一件事情就是以极快的速度把请求处理掉。为了写入速度快,有的消息中间件喜欢把消息存到内存中,然后再异步定时刷入到磁盘。也有的消息中间件直接不支持持久化。一个可靠的消息队列组件总是应该支持持久化。也有的消息中间件直接为了提高速度,直接把消息写入磁盘,你听到这里是不是感觉怪怪的,其实有时候磁盘的读写要比内存读写速度更快,只要你操作的姿势足够风骚。有关磁盘读写的速度问题后面会专门说到。

4、消息的有效期限。

涉及到存储,自然就要有成本,内存和磁盘的无限扩大总会让你感到“不够用”的感觉。于是你会想到为那些并不需要必须被消费的消息设置一个过期时间,到了时间点,消息就无情的从内存或磁盘中删除掉了。

5、数据复制。

数据复制主要分为主从复制和对等复制。而主从复制是非常主要的一种做法。

主从复制(replication)过程就是把数据从主节点同步到从节点的过程。

NoSQL-Master-Slave Replication 主从复制

NoSQL Peer-to-Peer Replication 对等复制

NoSQL- 分片和复制双剑合璧

可能你都懒得去看链接中的内容,这里还是放两张图给个直观印象:

主从复制(上图)

对等复制(上图)

6、数据分片。

分片是为了解决数据的横向扩展能力。比如kafka就是通过为每个topic增加parition的方式来进行横向扩展。redis是通过增加master和slave的方式达到了横向扩展的能力。当然slave起到的扩展了读取以及数据复制的能力。

下面这个图就是kafka的做法:

一个topic下通过多个partition来扩展存储能力。每个partition内部又有多个slave和一个master(kafka内部叫leader,都一样)

这样多个partition就组成了一个真正的集群,多个master,每个master又对应有多个slave。

7、消息状态管理。

关于消息状态管理,无非就是两种做法。一种是把消息状态交给存储层来管理。另外一种做法就是把消息状态交给读取端管理,也就是消费者去管理。有的消息会把状态管理交给存储地。这在一定程度上统一了对消息的处理,但也在一定程度上限制了客户端对于消息的控制。现在很多的消息中间件会倾向把状态管理交给读取端来处理。也就是消费者。这样的话,消费者就可以自由的处理消息了,可以把消费的消息再重新消费等等。总之就是持有一个offset而已。

3.读取

1、状态维护。

你可以把状态维护的事情交给消费者来做。这样你可以自由的控制消息。说到消息状态,主要是指消息的offset。状态维护上面已经说过,如下:

关于消息状态管理,无非就是两种做法。一种是把消息状态交给存储层来管理。另外一种做法就是把消息状态交给读取端管理,也就是消费者去管理。有的消息会把状态管理交给存储地。这在一定程度上统一了对消息的处理,但也在一定程度上限制了客户端对于消息的控制。现在很多的消息中间件会倾向把状态管理交给读取端来处理。也就是消费者。这样的话,消费者就可以自由的处理消息了,可以把消费的消息再重新消费等等。总之就是持有一个offset而已。

2、消费能力横向扩展。

有关消费能力的横向扩展后面会详细讲到。

3、解压。

对应于上面写入的“压缩”,不说了。

一个标准概念

Topic

一种mq中可以有多个topic。一个topic代表一个应用,你可以这样理解。或者说叫某一种业务的抽象表现。比如用户模块、交易模块等等。

一个topic逻辑上就是一个queue,你可以这样理解。相信你肯定编写过有关linkedlist的代码,甚至自己去实现过一个链表。消息队列的结构非常类似linkedlist。好了,topic不多说了。

消息队列使用场景

你可能第一个想到的就是日志。没错。这也是一个非常典型的使用场景。其实只要是涉及到“同步压力”的,我们都可以使用消息队列做缓冲。

以下是一些典型的使用场景:

  • 日志收集:你可以用消息队列收集各种服务的log,然后以统一接口的方式开放给各种consumer,例如hadoop、Hbase、Solr等。
  • 消息系统:解耦和生产者和消费者、缓存消息等。
  • 用户活动跟踪:记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到mq的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。
  • 运营指标:记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
  • 流式处理:比如spark streaming和storm
  • 事件源
  • 其它

关于消息队列标准的一些思考

笔者一直认为,在将来的某一天会有人设计出针对mq的数据库,或者现在已经有了。而且这种数据库的设计应该像sql语句那样成为一个标准。如果有这样的举措的话,那么未来的mq圈也算是有了统一的mq模型。也许你不同意我的观点。

消息可靠性问题

检验和支持

加个checksum可以让你知道你的数据是否被篡改,虽然这样做只能发现问题,但不能解决问题。加上还是好的。

加密

加密上面已经提到。

为了确保消息在传输的过程的安全,你需要搞一些安全传输协议类似TLS或者其他的安全传输协议来为消息保驾护航。

消费者commit

就像下面将要说到的生产者一样。在消费者接收到消息后,你可以先处理消息,再commit;先commit,再处理消息;或者不做commit。

下面是kafka的做法:

  • Consumer在从broker读取消息后,可以选择commit,该操作会在Zookeeper中保存该Consumer在该Partition中读取的消息的offset。该Consumer下一次再读该Partition时会从下一条开始读取。如未commit,下一次读取的开始位置会跟上一次commit之后的开始位置相同。当然可以将Consumer设置为autocommit,即Consumer一旦读到数据立即自动commit。如果只讨论这一读取消息的过程,那Kafka是确保了Exactly once。但实际使用中应用程序并非在Consumer读取完数据就结束了,而是要进行进一步处理,而数据处理与commit的顺序在很大程度上决定了消息从broker和consumer的delivery guarantee semantic。
  • 读完消息先commit再处理消息。这种模式下,如果Consumer在commit后还没来得及处理消息就crash了,下次重新开始工作后就无法读到刚刚已提交而未处理的消息,这就对应于At most once
  • 读完消息先处理再commit。这种模式下,如果在处理完消息之后commit之前Consumer crash了,下次重新开始工作时还会处理刚刚未commit的消息,实际上该消息已经被处理过了。这就对应于At least once。在很多使用场景下,消息都有一个主键,所以消息的处理往往具有幂等性,即多次处理这一条消息跟只处理一次是等效的,那就可以认为是Exactly once。(笔者认为这种说法比较牵强,毕竟它不是Kafka本身提供的机制,主键本身也并不能完全保证操作的幂等性。而且实际上我们说delivery guarantee 语义是讨论被处理多少次,而非处理结果怎样,因为处理方式多种多样,我们不应该把处理过程的特性——如是否幂等性,当成Kafka本身的Feature)
  • 如果一定要做到Exactly once,就需要协调offset和实际操作的输出。精典的做法是引入两阶段提交。如果能让offset和操作输入存在同一个地方,会更简洁和通用。这种方式可能更好,因为许多输出系统可能不支持两阶段提交。比如,Consumer拿到数据后可能把数据放到HDFS,如果把最新的offset和数据本身一起写到HDFS,那就可以保证数据的输出和offset的更新要么都完成,要么都不完成,间接实现Exactly once。(目前就high level API而言,offset是存于Zookeeper中的,无法存于HDFS,而low level API的offset是由自己去维护的,可以将之存于HDFS中)

生产者 ack

ack可以做成几种深度。

第一种,发送了就不管了。

第二种,成功发送到master后,然后返回确认状态码。

第三种,发送到master并且成功的复制到了每个副本中,然后返回确认状态码。

消费者端预保存

很简单,拿到消息先在本地做个保存。如果有问题,就可以追溯。但这样做的性能会有所损耗,你可以通过一些算法来优化一点。

总之,要想做到可靠,你只能是多保存几份。就是冗余。冗余又分为备份、重试、逻辑确认等。总之就是“多存几份,多试几次,多确认几下,搞事搞事搞事!”

其它那些说的再怎么天花乱坠都是离不开冗余!

关于扩展能力

凡是涉及到计算和存储的地方都可以加扩展。这里的扩展是指的计算能力的扩展和存储能力的扩展。在mq中,上面已经说了,有三个标准动作:写入、存储、读取。那么这三个动作理论上应该都加上扩展支持。

先来说写入。写入端的扩展能力取决于具体业务应用的处理能力。

然后是存储。存储上面也说了。

最后是读取。如果某个topic下的消息过多,这时候就需要消费者端具有更强的处理能力,这也算是为存储端缓解存储压力。怎么做呢?可以为消费者分组。这个分组的概念也很简单。就是我一个人干不完的事情,我找一帮人来搞。那一帮人去搞一个topic下的所有消息,自然需要每个人彼此都要配合协调好,比如你消费掉的我就不会再去消费了之类的。

下面这张图是kafka的读写能力扩展:

消息队列的集群设计

消息队列的集群应该以topic为单位进行集群的搭建。也就说从逻辑上,每个topic应该拥有一个单独的集群,至少是一个大集群下的“小集群”,这样也算是一种分而治之的原则。

就像redis集群一样,你总是以应用为单位,为每个应用搭建一个单独的集群。这样可以确保存储压力的分散,也算是一种去中心化的体现。当然在实践中,我们也遇到了一些比较另类缓存的实践,比如所有的应用都使用同一个缓存集群,个人不推荐这种做法,在我看来,这样的做法想想都觉得可怕。

磁盘快还是内存快?

基本上大部分时候我们都认为磁盘的速度是慢于内存的。但特殊情况下,磁盘会比内存还要快。这听起来不可思议。但真实的情况就是:磁盘线性写的速度要远远大于随机写的速度。下面有人测试的数据:

在一个6 7200rpm SATA RAID-5 的磁盘阵列上线性写的速度大概是600M/秒,但是随机写的速度只有100K/秒,两者相差将近6000倍。

线性读写在大多数的应用场景下还是可行的。而且操作系统利用read-ahead和write-behind技术来从大的数据块中预先取一部分数据,或者将多个逻辑上的写操作组合成一个大的物理写操作中。好吧,总之就是对磁盘的线性读在有些情况下可以比内存的随机访问要快一些

此外,如果我们是在JVM的基础上构建的,熟悉Java内存应用管理的人应该清楚以下两件事情:

  1. 一个对象的内存消耗是非常高的,经常是所存数据的两倍或者更多。
  2. 随着堆内数据的增多,Java的垃圾回收会变得非常昂贵。

上面说了这么多,其实告诉了我们一个新的思路,那就是除了“先内存后刷磁盘”,我们其实可以拿到数据立刻将它写到一个持久化的日志中。

其实从表面上看,我们是直接存入了磁盘,其实我们是将数据先传输到了内核页缓存中,稍后才会被刷新。我们可以增加一个配置项以让系统的用户来控制数据在什么时候被刷新到物理硬盘上。

好吧,就写这么多!蓦然回首,这人鲁班门前耍大刀!

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2017-02-23,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 ImportSource 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档