Apache Pulsar:核心概念、架构原理与实践
Apache Pulsar 是 Apache 顶级项目之一,凭借其“计算与存储分离”的云原生架构,集消息、存储、轻量化计算于一体,成为下一代分布式消息流平台的首选。它不仅支持多租户、跨地域复制等企业级特性,还兼容 Kafka、RabbitMQ 等生态,完美适配现代云原生应用的弹性伸缩与高可用需求。本文将结合官方课程文档,从核心概念、架构设计、关键特性到实战价值,全面拆解 Pulsar 的技术精髓。
思维导图
一、Pulsar 核心定位:不止是消息队列
Apache Pulsar 是下一代云原生分布式消息流平台,并非传统意义上的消息队列。它的核心定位是“三合一”:
- 消息队列:支持 Queue/Stream 双模型,兼容 RabbitMQ、Kafka 核心能力。这意味着 Pulsar 能够满足不同业务场景下对消息队列的需求,无论是传统的队列模式还是流式处理模式,都能游刃有余。
- 数据存储:通过 BookKeeper 实现高可靠持久化,支持无限消息堆积。在数据存储方面,Pulsar 利用 BookKeeper 的分布式存储机制,确保消息的持久化和可靠性,即使在面对海量消息的情况下,也能够保证数据的完整性和持久性。
- 轻量化计算:内置 Pulsar Functions 框架,实现流数据实时处理。Pulsar 不仅提供消息传输和存储功能,还具备轻量级的计算能力,通过 Pulsar Functions 框架,用户可以快速实现对流数据的实时处理,如数据过滤、转换、聚合等操作,无需额外部署复杂的流处理引擎。
其设计初衷是解决传统消息系统的架构瓶颈(如 Kafka 计算存储耦合、RabbitMQ 扩展性不足),适配云原生环境下“弹性伸缩、多租户共享、跨地域协同”的核心需求。目前已被 Yahoo、腾讯、中国电信、VIPKID 等国内外企业广泛应用于金融、物联网、在线教育等多个行业。
二、核心概念
2.1 基础核心概念补充
2.1.1 统一消息存储模型
Pulsar 采用统一消息存储模型,同时支持 Queue 和 Stream 两种语义,核心特点如下:
- 无界流存储:以流的方式永久保存原始数据,无数据过期限制。这意味着数据可以长期存储,不会因为时间的推移而丢失,为数据的长期分析和挖掘提供了基础。
- 分区容量无上限:突破单节点存储限制,通过分片扩展存储能力。Pulsar 的分区机制允许数据在多个节点之间进行分布存储,从而突破了单个节点的存储容量限制,实现了存储能力的水平扩展。
- 存储灵活:可利用云存储或廉价存储(如 HDFS),降低长期存储成本。Pulsar 支持多种存储介质,用户可以根据自己的需求和成本考虑,选择合适的存储方案,如将热数据存储在高性能的存储介质中,而将冷数据存储在廉价的云存储或 HDFS 中,从而降低存储成本。
- 透明化存储:客户端无需关心数据存储位置,由 Pulsar 统一管理。Pulsar 对客户端隐藏了数据存储的细节,客户端只需关注消息的生产和消费,而无需关心数据存储的具体位置和方式,这大大简化了客户端的开发和使用。
2.1.2 多租户隔离机制
Pulsar 原生支持多租户,通过“Tenant(租户)→Namespace(命名空间)→Topic(主题)”三级结构实现彻底隔离:
- 租户(Tenant):跨集群分布,拥有独立认证授权机制和资源配额(存储/带宽限制)。租户是 Pulsar 中的最高级别隔离单位,不同租户之间在资源分配、认证授权等方面是相互独立的,从而保证了不同租户之间的隔离性。
- 命名空间(Namespace):租户的管理单元,同一命名空间下的 Topic 共享配置(消息 TTL、存储配额等)。命名空间是租户内部的管理单元,用于对 Topic 进行分组和管理,同一命名空间下的 Topic 可以共享一些配置信息,如消息的 TTL(生存时间)、存储配额等。
- 主题(Topic):消息传输最小单元,命名格式为
persistent://tenant/namespace/topic,明确归属关系。Topic 是 Pulsar 中消息传输的最小单元,通过其命名格式可以明确地表示出该 Topic 所属的租户和命名空间,从而实现了消息的层次化管理和隔离。
2.2 四种订阅模式:原理、图解与实践
2.2.1 独占模式(Exclusive):单消费者严格顺序消费
核心原理
同一 Topic 仅允许一个消费者订阅并消费,若多个消费者尝试订阅,直接抛出异常。消息按生产者发送顺序依次投递,确保全量消息的严格顺序性,是 Pulsar 的默认订阅模式。
架构图解
流程说明:
- Producer1 和 Producer2 向 Topic 发送消息(顺序:message0 → message1 → message2 → message3);
- Consumer A-1 独占订阅该 Topic,所有消息按发送顺序依次投递至 A-1;
- 若 Consumer A-0 尝试订阅同一 Topic,Broker 直接返回报错,拒绝连接;
- Consumer 消费完成后发送 ACK,Broker 标记消息为已处理,后续不再重复投递。
代码配置(Java 客户端)
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic("persistent://public/default/order-topic")
.subscriptionName("exclusive-sub")
.subscriptionType(SubscriptionType.Exclusive) // 指定独占模式
.ackTimeout(10, TimeUnit.SECONDS)
.subscribe();
适用场景与优缺点
- 适用场景:需要严格顺序的核心业务(如订单状态流转、金融交易日志、数据同步);
- 优点:消息顺序绝对保证,无重复消费风险,配置简单;
- 缺点:单消费者存在性能瓶颈,消费者故障会导致消费停滞(需手动重启或切换)。
2.2.2 灾备模式(Failover):多消费者故障自动切换
核心原理
同一 Topic 可注册多个消费者,Broker 按消费者连接顺序排序,仅让“活跃消费者”处理消息,其他消费者作为备用。当活跃消费者故障(如网络断开、进程崩溃),Broker 自动将备用消费者升级为活跃状态,继续消费,保证服务连续性。
架构图解
流程说明:
- Producer1 和 Producer2 向 Topic 发送消息,初始状态下 Consumer B-2 为活跃消费者,接收所有消息;
- 当 Consumer B-2 故障(如服务器宕机),Broker 检测到连接断开后,自动将排序第二的 Consumer B-1 升级为活跃消费者;
- Consumer B-1 从 B-2 未消费完成的消息(如 message3、message4)开始继续处理,无消息丢失;
- 支持动态添加备用消费者,提升高可用等级。
代码配置(Java 客户端)
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic("persistent://public/default/payment-topic")
.subscriptionName("failover-sub")
.subscriptionType(SubscriptionType.Failover) // 指定灾备模式
.negativeAckRedeliveryDelay(60, TimeUnit.SECONDS) // 故障重试延迟
.subscribe();
适用场景与优缺点
- 适用场景:高可用核心业务(如支付通知、风控告警、实时消息推送);
- 优点:故障自动切换,无人工干预,保证消息顺序,服务可用性高;
- 缺点:同一时间仅一个消费者工作,吞吐量受限于单个消费者性能。
2.2.3 共享模式(Shared):多消费者轮询并行消费
核心原理
同一 Topic 可注册多个消费者,Broker 采用 Round-Robin 轮询机制分发消息,每条消息仅投递至一个消费者。支持动态增减消费者节点,吞吐量随消费者数量线性提升,但不保证消息顺序。
架构图解
流程说明:
- Producer1 和 Producer2 向 Topic 发送消息(message0 → message1 → message2 → message3 → message4);
- Broker 通过轮询机制分发消息:message0 投递给 Consumer C-1,message1 投递给 C-2,message2 投递给 C-3,message3 再次投递给 C-1;
- 若 Consumer C-2 断开连接,其未消费的消息(如 message4)会重新分配给其他存活的消费者(C-1 或 C-3);
- 消费者独立处理消息并发送 ACK,互不影响,支持高并发处理。
代码配置(Java 客户端)
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic("persistent://public/default/log-topic")
.subscriptionName("shared-sub")
.subscriptionType(SubscriptionType.Shared) // 指定共享模式
.maxTotalReceiverQueueSizeAcrossPartitions(1000) // 增大接收队列,提升吞吐量
.subscribe();
适用场景与优缺点
- 适用场景:高吞吐无序场景(如日志采集、监控数据上报、任务分发);
- 优点:支持水平扩展,吞吐量高,消费者故障不影响整体服务;
- 缺点:不保证消息顺序,不支持累积确认(仅支持单条确认),可能存在重复消费(如消费者故障后消息重投)。
2.2.4 键共享模式(Key_Shared):同 Key 顺序+多消费者并行
核心原理
消息与消费者均绑定 Key,Broker 按消息 Key 的哈希值定向分发,同一 Key 的消息始终投递至同一个消费者,保证同 Key 消息的顺序性;不同 Key 的消息可分发至不同消费者,实现并行处理,兼顾顺序与吞吐量。
架构图解
流程说明:
- Producer1 和 Producer2 发送带 Key 的消息:<k1.v1>、<k1.v2>(Key=k1),<k2.v3>(Key=k2),<k3.v1>、<k3.v2>(Key=k3);
- Broker 按 Key 哈希分发:k1 消息投递给 Consumer D-1,k2 消息投递给 D-2,k3 消息投递给 D-3;
- 同 Key 消息按发送顺序处理(如 <k1.v1> 先于 <k1.v2> 投递),不同 Key 消息并行处理;
- 新增消费者时,Broker 会重新分配部分 Key 的消息,确保负载均衡;消费者故障后,其负责的 Key 会迁移至其他消费者。
代码配置(Java 客户端)
// 注意:Key_Shared 模式需配合 KEY_BASED 批处理构建器
Producer<byte[]> producer = pulsarClient.newProducer()
.topic("persistent://public/default/user-topic")
.batcherBuilder(BatcherBuilder.KEY_BASED) // 按 Key 分组批处理
.create();
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic("persistent://public/default/user-topic")
.subscriptionName("key-shared-sub")
.subscriptionType(SubscriptionType.Key_Shared) // 指定键共享模式
.subscribe();
// 发送带 Key 的消息
producer.newMessage().key("user-1001").value("user behavior data".getBytes()).send();
适用场景与优缺点
- 适用场景:按分组顺序消费的高吞吐场景(如用户行为分析、会话跟踪、订单履约);
- 优点:兼顾同 Key 顺序与整体吞吐量,支持动态扩缩容,负载均衡;
- 缺点:配置稍复杂(需启用 KEY_BASED 批处理),Key 分布不均可能导致部分消费者负载过高。
2.2.5 四种订阅模式对比总结
三、云原生架构设计:计算与存储分离的精髓
Pulsar 最核心的架构创新是计算与存储分离,彻底解决了传统消息系统“扩容难、故障影响大”的痛点。
3.1 架构组成三大部分
(引用自 Apache Pulsar 官网)
- Broker 集群:无状态计算节点
- 核心职责:消息路由、负载均衡、分发消费、跨集群复制。Broker 负责接收生产者发送的消息,并将消息路由到相应的存储节点(BookKeeper),同时负责将消息分发给消费者,并处理跨集群的消息复制工作。
- 关键特性:无状态设计,扩容/缩容/替换不影响数据,支持 K8s 弹性调度。由于 Broker 是无状态的,因此在进行扩容、缩容或替换操作时,不会对数据产生影响,这使得 Broker 非常适合在 Kubernetes 等容器编排平台上进行弹性调度。
- 内部组件:HTTP 服务器(管理接口)、TCP 调度器(数据传输)。Broker 内部包含 HTTP 服务器,用于提供管理接口,方便用户对集群进行管理和监控;同时,它还包含 TCP 调度器,用于处理数据的传输工作。
- BookKeeper 集群:持久化存储节点(Bookie)
- 核心职责:消息持久化存储,支持多副本容错。BookKeeper 负责将消息持久化存储到磁盘上,并通过多副本机制来保证数据的可靠性,即使部分存储节点出现故障,也不会导致数据丢失。
- 关键特性:
- 分片存储:消息按 Ledger→Segment→Entry 分层存储,独立扩容无瓶颈。BookKeeper 采用分片存储机制,将消息存储在多个 Ledger 中,每个 Ledger 又分为多个 Segment,每个 Segment 包含多个 Entry,这种分层存储结构使得存储容量可以独立进行扩容,不存在瓶颈问题。
- IO 隔离:日志与数据分盘存储,读写互不干扰,提升并发性能。BookKeeper 将日志和数据分别存储在不同的磁盘上,从而实现了读写操作的隔离,避免了读写操作之间的相互干扰,提高了并发性能。
- 高并发:单 Bookie 支持数千 Ledger 并发读写,适配高吞吐场景。每个 Bookie 节点都能够支持数千个 Ledger 的并发读写操作,这使得 BookKeeper 能够很好地适配高吞吐量的场景。
- ZooKeeper 集群:元数据与协调中心
- 本地 ZooKeeper:存储集群内部配置(Broker 负载、BookKeeper 元数据)。本地 ZooKeeper 用于存储 Pulsar 集群内部的配置信息,如 Broker 的负载情况、BookKeeper 的元数据等,这些信息对于集群的正常运行和管理至关重要。
- 全局 ZooKeeper:存储跨集群配置(租户信息、命名空间策略、跨地域复制规则)。全局 ZooKeeper 则用于存储跨集群的配置信息,如租户信息、命名空间策略、跨地域复制规则等,这些信息对于实现多集群之间的协同工作和跨地域的数据复制非常重要。
- 核心作用:集群协调、主从选举、元数据一致性保障。ZooKeeper 在 Pulsar 集群中起到了协调中心的作用,负责处理集群内部的主从选举、元数据的一致性保障等工作,确保集群的正常运行和数据的可靠性。
3.2 核心数据流转流程
- 生产者发送消息到 Broker,支持同步/异步发送,批量发送时可累积消息提升效率。生产者可以通过同步或异步的方式将消息发送到 Broker,为了提高发送效率,还可以采用批量发送的方式,将多条消息累积在一起后一起发送。
- Broker 将消息写入本地缓存以降低延迟,同时异步同步到 BookKeeper 集群(多副本存储确保可靠性)。Broker 接收到消息后,会先将消息写入本地缓存,以降低消息发送的延迟,然后异步地将消息同步到 BookKeeper 集群中,并采用多副本存储的方式,确保消息的可靠性。
- BookKeeper 以 Ledger 为单位存储消息,每个 Ledger 包含多个 Segment,实现数据分片管理。BookKeeper 接收到消息后,会以 Ledger 为单位将消息存储起来,每个 Ledger 包含多个 Segment,通过这种方式实现数据的分片管理,提高了存储的灵活性和扩展性。
- 消费者通过订阅从 Broker 拉取消息,支持同步接收或异步回调。消费者可以通过订阅的方式从 Broker 拉取消息,支持同步接收或异步回调的方式,根据具体的应用场景选择合适的接收方式。
- 消费者消费完成后发送 ACK 确认,Broker 收到 ACK 后标记消息状态,按需清理或保留(遵循 Namespace 级别的保留策略)。消费者在消费完消息后,会发送 ACK 确认消息给 Broker,Broker 收到 ACK 后会标记消息的状态,然后根据 Namespace 级别的保留策略,决定是否清理或保留该消息。
3.3 架构优势对比传统 MQ
3.4 与 Kafka 核心差异补充
| | |
|---|
| | 基于 Ledger→Segment→Entry 分层存储 |
| | |
| | |
| | |
| | |
四、核心特性:为什么选择 Pulsar?
4.1 架构层面:天生适配云原生
- 弹性伸缩:Broker 无状态,可基于流量动态扩缩容;BookKeeper 支持横向扩展,存储容量无上限。Pulsar 的架构设计使其能够很好地适配云原生环境,Broker 的无状态特性使得它可以基于流量的变化动态地进行扩缩容操作,而 BookKeeper 的横向扩展能力则保证了存储容量可以无限制地增长,满足了云原生应用对弹性和可扩展性的要求。
- 分层存储:热数据存 BookKeeper 保证低延迟,冷数据自动迁移到 S3/GCS/HDFS 等廉价存储,降低 70%+ 存储成本。Pulsar 支持分层存储机制,将热数据存储在 BookKeeper 中,以保证低延迟的访问性能,而将冷数据自动迁移到 S3、GCS、HDFS 等廉价存储介质中,从而降低了存储成本,提高了存储资源的利用效率。
- 分片流(Segmented Streams):无界数据拆分为分片存储,用户无需手动迁移数据,实现“无限存储”体验。Pulsar 将无界数据拆分为多个分片进行存储,用户无需手动进行数据迁移操作,就可以实现“无限存储”的体验,这大大简化了数据存储和管理的复杂性。
4.2 功能层面:一站式解决消息+存储+计算
- 统一消息模型:同一份数据可通过不同订阅模式实现队列(如 Exclusive/Failover 模式)或流(如 Shared/Key_Shared 模式)消费,无需重复存储。Pulsar 的统一消息模型允许同一份数据通过不同的订阅模式进行消费,既可以实现队列模式的消费,也可以实现流模式的消费,而无需对数据进行重复存储,提高了数据的利用效率和系统的灵活性。
- 跨地域复制:集群间实时同步消息,支持复制订阅模式(Replicated-Subscriptions),某集群失效后可在其他集群恢复消费状态,实现异地灾备。Pulsar 支持跨地域复制功能,可以在不同的集群之间实时同步消息,并且支持复制订阅模式,当某个集群失效后,可以在其他集群中恢复消费状态,从而实现了异地灾备,提高了系统的可靠性和可用性。
- 消息治理能力:
- 去重:基于 Sequence ID 自动去重,Broker 端维护去重索引,确保 Exactly-Once 语义。Pulsar 具备消息去重能力,通过基于 Sequence ID 的自动去重机制,以及在 Broker 端维护去重索引,确保了消息的 Exactly-Once 语义,避免了消息的重复处理。
- 延迟传递:支持
deliverAfter(延迟时长)/deliverAt(指定时间)两种模式,延迟精度可通过 delayedDeliveryTickTimeMillis 配置(默认 1s)。Pulsar 支持延迟传递功能,提供了 deliverAfter 和 deliverAt 两种模式,用户可以根据需要配置延迟传递的时间,延迟精度可以通过 delayedDeliveryTickTimeMillis 参数进行配置,默认值为 1 秒。 - 保留/过期:Namespace 级别配置消息保留时长(如保留 7 天)或 TTL(如 24 小时过期),自动清理过期消息。Pulsar 允许在 Namespace 级别配置消息的保留时长或 TTL(Time-To-Live),系统会自动清理过期的消息,从而节省存储空间并保证数据的时效性。
4.3 生态层面:兼容现有系统,降低迁移成本
- 协议兼容:通过 KoP(Kafka-on-Pulsar)组件,无需修改 Kafka 客户端代码即可迁移至 Pulsar。Pulsar 提供了 KoP 组件,使得用户可以在不修改 Kafka 客户端代码的情况下,将现有的 Kafka 应用迁移到 Pulsar 上,大大降低了迁移成本,提高了系统的兼容性。
- 连接器生态:Pulsar IO 支持对接 HDFS、Spark、Flink、ES、HBase 等大数据组件,实现数据无缝流转。Pulsar 的连接器生态非常丰富,通过 Pulsar IO,可以方便地对接 HDFS、Spark、Flink、ES、HBase 等大数据组件,实现数据在不同系统之间的无缝流转,满足了大数据处理和分析的需求。
- 轻量化计算:Pulsar Functions 框架支持多语言(Java/Go/Python),可快速实现消息过滤、转换、聚合,无需部署复杂流处理引擎。Pulsar 提供了轻量级的计算框架 Pulsar Functions,支持多种编程语言(如 Java、Go、Python),用户可以快速实现对消息的过滤、转换、聚合等操作,而无需部署复杂的流处理引擎,提高了开发效率和系统的灵活性。
五、实战关键配置与典型场景
5.1 核心配置示例(Java 客户端+Spring Boot 集成)
1. 生产者配置(开启批处理+压缩+分块)
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic("persistent://tenant/ns/topic")
.producerName("my-producer")
.enableBatching(true) // 开启批处理(仅异步发送生效)
.batchingMaxMessages(1000) // 批处理最大消息数
.batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS) // 批处理最大延迟
.compressionType(CompressionType.LZ4) // 启用 LZ4 压缩,降低 3/4 带宽
.enableChunking(true) // 开启大消息分块(需禁用批处理或单独配置)
.chunkMaxMessageSize(1024 * 1024) // 分块大小 1MB
.sendTimeout(30, TimeUnit.SECONDS)
.blockIfQueueFull(true) // 队列满时阻塞,避免报错
.build();
2. 消费者配置(死信主题+重试+指定消费位置)
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic("persistent://tenant/ns/topic")
.subscriptionName("my-sub")
.subscriptionType(SubscriptionType.Shared)
.enableRetry(true) // 启用自动重试
.retryLetterTopic("persistent://tenant/ns/retry-topic") // 自定义重试主题
.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(5) // 最大重试 5 次
.deadLetterTopic("persistent://tenant/ns/dlq-topic") // 死信主题
.build())
.ackTimeout(10, TimeUnit.SECONDS)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) // 从最早消息开始消费
.negativeAckRedeliveryDelay(60, TimeUnit.SECONDS) // 消费失败 60s 后重投
.build();
3. Spring Boot 集成核心配置(application.yml)
pulsar:
url: pulsar://192.168.8.144:6650,192.168.8.145:6650,192.168.8.146:6650
topic: topic1,topic2 # 多个 Topic 以逗号分隔
subscription: topicGroup # 消费者组名称
5.2 典型应用场景
- 高吞吐日志/监控数据采集:支持百万级 QPS,兼容 Kafka 生态,结合分层存储降低长期存储成本,可替代 Kafka 作为日志收集中枢。Pulsar 的高吞吐量和兼容 Kafka 生态的特性使其非常适合用于日志和监控数据的采集场景,同时结合分层存储机制可以有效降低长期存储成本,是 Kafka 的理想替代品。
- 多租户 SaaS 平台:原生多租户隔离,支持资源配额、权限管控,适合为不同客户提供独立消息服务(如电商 SaaS、营销平台)。Pulsar 的原生多租户隔离特性以及对资源配额和权限管控的支持,使其非常适合用于多租户 SaaS 平台,能够为不同客户提供独立的消息服务,满足电商 SaaS、营销平台等应用场景的需求。
- 跨地域业务同步:金融、电商等核心业务,通过跨地域复制实现异地灾备,保障业务连续性(如支付订单同步、用户数据备份)。在金融、电商等核心业务中,Pulsar 的跨地域复制功能可以实现异地灾备,保障业务的连续性,例如支付订单的同步和用户数据的备份等场景。
- 流计算实时处理:结合 Pulsar Functions 或 Flink 连接器,实现消息实时转换、过滤、聚合(如实时风控、实时报表、用户行为分析)。Pulsar 可以与 Pulsar Functions 或 Flink 连接器相结合,实现对消息的实时转换、过滤和聚合操作,满足实时风控、实时报表、用户行为分析等流计算实时处理场景的需求。
六、总结
Apache Pulsar 以“计算与存储分离”的云原生架构为核心,整合了消息队列、持久化存储、轻量化计算的能力,其统一消费模型、多租户隔离、跨地域复制等特性,完美适配现代云原生应用的多元化需求。无论是大型互联网公司的高吞吐场景,中小型企业的轻量化部署,还是 SaaS 平台的多租户需求,Pulsar 都能提供一站式解决方案。随着云原生技术的普及,Pulsar 正逐渐成为分布式消息流领域的首选平台,值得技术团队深入学习和实践。
官方资源参考:
- 官网:https://pulsar.apache.org/
- 源码:https://github.com/apache/pulsar
- 文档:https://pulsar.apache.org/docs/
- 连接器下载:https://pulsar.apache.org/download/