Apache Pulsar 是 Apache 软件基金会顶级项目,自称是下一代云原生分布式消息流平台,集消息、存储、轻量化函数式计算为一体,采用计算与存储分离架构设计,支持多租户、持久化存储、多机房跨区域数据复制,具有强一致性、高吞吐、低延时及高可扩展性等流数据存储特性。
Pulsar 是一个用于服务器到服务器的消息系统,具有多租户、高性能等优势。Pulsar 最初由 Yahoo 开发,目前由 Apache 软件基金会管理。
目前的Pulsar社区发展十分迅速,Pulasr的版本也在不断的更新迭代,目前大版本的迭代已经到了2.8,本文是结合我个人在Github和Pulsar社区中对Pulsar的探索过程中总结的,云原生的时代,Pulsar能否大放异彩,值得我们期待。
本文是一个引子,后续我会对Pulsar进行非常详细的介绍。
这部分你可能会看到非常多的概念跟Kafka相关,我们一起来看一下:
是不是很熟悉,在Kafka中也有Topic的概念。
Topic 是一个消息目录或者说存放消息的命名空间,也就是消息发布(生产)的位置。一个 topic 可以有一个或多个 producer 和/或 consumer。Producer 向 topic 写入消息,consumer 从 topic 消费消息。图 1 展示了三者之间如何协同工作。
Apache Pulsar 使用 Apache BookKeeper 作为存储层。Apache BookKeeper 针对实时工作负载进行优化,是一项可扩展、可容错、低延迟的存储服务。客户端发布的消息存储在 BookKeeper 的服务器实例中,即 bookie。
Ledger 是 BookKeeper 中的基本存储单元。一系列的 entry 组成一个 ledger,entry 被顺序写入 ledger。
Journal 文件包含 BookKeeper 中的消息写入日志。在更新 ledger 前,bookie 确保已经将更新的交易(交易日志 entry)写入非易失存储。在 bookie 第一次运行或旧的 journal 文件大小达到指定阈值时,会创建新的 journal 文件。
Entry log 文件用于管理 BookKeeper 客户端写入的 entry。来自不同 ledger 的 entry 会被依次写入一个或多个 entry log 中,而偏移量则作为指针保存在 ledger 缓存中,以进行快速查找。
和Kafka一样,在 Pulsar 中,broker 是一个无状态服务器,用于协助读写数据。一个 topic 不能同时被多个 broker 管理,但是 topic 可以存储在多个 bookie 服务中。
Entry是存储到bookkeeper中的一条记录
可以认为ledger是用来存储Entry的,多个Entry序列组成一个ledger
元数据存储,是用于存储bookie相关的元数据,比如bookie上有哪些ledger,bookkeeper目前使用的是zk存储,所在在部署bookkeeper前,要先有zk集群
其实就是bookkeeper的WAL(write ahead log),用于存bookkeeper的事务日志,journal文件有一个最大大小,达到这个大小后会新起一个journal文件
存储entry的文件,我理解ledger是一个逻辑上的概念,不同ledger中的entry会先按ledger聚合,然后写入entry log文件中。同样,entry log会有一个最大大小,达到最大大小后会新起一个新的entry log文件
ledger的索引文件,ledger中的entry被写入到了entry log文件中,索引文件用于对entry log文件中每一个ledger做索引,记录每个ledger在entry log中的存储位置以及数据在entry log文件中的长度
用于缓存索引文件的,加快查找效率
内存中会存储一个LastLogMark,其中包含txnLogId(journal文件的id)和txnLogPos(journal文件中的位置),entry log文件和index文件都会先在内存中被缓存,当内存达到一定值或者离上一次刷盘过期了一段时间(定时线程)后,会触发entry log文件和index文件的刷盘,之后再将LastLogMark持久化,当lastLogMark被持久化后,表示在lastLogMark之前的entry和索引都已经写到了磁盘上,这个时候可以将lastLogMark之前的journal文件清掉,如果LastLogMark在持久化前出现了宕机,可以通过journal文件做恢复,保证了数据不丢
数据的合并,有点类似于hbase的compact过程。在bookie上,虽然entry log在刷盘前会按ledger做聚合,但是因数数据会不断的新增,每个leadger的数据会交差存储在entry log文件中,而bookie上有一个用于做垃圾回收的线程,该线程会将没有关联任何ledger的entry文件进行删除,以便回收磁盘空间,而compaction的目的则是为了避免entry log中只有少数的记录是有关联的ledger的情况,不能让这样的entry log文件一直占用磁盘空间,所以垃圾收集线程会将这样的entry log中有关联ledger的entry复制到一个新的entry log文件中(同时修改索引),然后将老的entry log文件删除。与hbase类似,bookkeeper的compaction也分为两种:
当entry log中有效的entry只占20#以下时做compaction
当entry log中有效的占到80%以下时就可开始做compaction
关键特性
一个Pulsar实例由一个或多个Pulsar集群组成。实例中的集群可以在它们之间复制数据。一个Pulsar cluster由三部分组成:
特别需要注意的是:集群间可以通过跨地域复制(Geo-Replication)进行消息同步。
ZooKeeper负责存储元数据,集群配置,协调:其中local zk负责Pulsar Cluster内部的配置,global zk则用于Pulsar Cluster之间的数据复制。
Bookie负责存储;Broker负责负载均衡和消息的读取、写入等;Global replicators负责集群间的数据复制。
Apache BookKeeper
在这里我们赵中介绍一下Apache BookKeeper。Pulsar用 Apache BookKeeper作为持久化存储,BookKeeper有以下几个特性:
Ledgers
Ledger是一个只追加(append-only)的数据结构,并且只有一个写入器,这个写入器负责多个BookKeeper存储节点(就是Bookies)的写入。Ledger的条目(entries)会被复制到多个bookies。Ledgers具有以下特性:
Pulsar geo-replication
下图说明了 Pulsar 在不同集群之间跨地域复制的过程:
在上图中,每当P1,P2和P3生产者将消息分别发布到Cluster-A,Cluster-B和Cluster-C群集上的T1主题时,这些消息就会立即在群集之间复制。复制消息后,C1和C2使用者可以使用它们各自群集中的消息。没有geo-replication,C1和C2使用者将无法使用P3产生者发布的消息。
分层存储
通过使用分层存储(Tiered Storage),在 backlog 中的旧消息可以从 BookKeeper 转移到更廉价的存储中,不出其他问题,客户端将仍然可以访问 backlog,降低了存储成本。
Pulsar 当前支持 S3, Google Cloud Storage (GCS) 和文件系统(filesystem)来做长期存储(long term store)。可以将数据卸载(Offloading)到长期存储中。
我们直接饮用Pulsar官方博客中的总结:Pulsar Broker是无状态的,没有不能丢失的持久化状态,与存储层分开。
Bookeeper集群本身并不执行复制,每个Bookies只是一个跟随者被领导者人知做什么,领导人是Pulsar Broker。每个Topic都由一个Pulsar Broker拥有,该Broker提供Topic的所有读写操作。Pulsar通过让领导人(Pulsar Broker)没有状态,BookKeeper的fencing特性可以很好的处理脑裂问题。没有脑裂,没有分歧,没有数据丢失。
此外,当在Bookie上写入数据时,首先将该消息写入日志文件,这是一个预写日志(WAL),它可以帮助BookKeeper在发生故障时避免数据丢失。它与关系型数据库持久化保证的机制相同。
Pulsar的顺序保证只在特定的模式下才能得到保证。BookKeeper容许将磁盘IO做读写分离。写入都按顺序写入日志文件可以存储在专用的磁盘上,并且可以批量刷盘以获得搞得吞吐量。除此之外从写入操作来看没有其他的同步磁盘IO操作,数据都是写入到内存的缓存区。
关于读写延迟测试你可以参考官方给出的文档:
https://blog.csdn.net/zhaijia03/article/details/111602732
这个文档包含了 Pulsar 和 Kafka 支持的各持久性级别,以及在同一持久性级别下两者的吞吐量和延迟的对比。