
Apache Pulsar 是一款开源的分布式消息系统,以其多租户、高性能、存储与计算分离等特性在现代消息队列领域占据一席之地。作为消息队列的重要组成部分,Consumer(消费者)承担了从 Broker 获取消息并处理的关键角色,其设计直接关系到系统的吞吐量、延迟以及可靠性。理解 Pulsar Consumer 的源码实现,不仅能帮助我们更好地掌握消息消费的流程和机制,还能为性能优化、故障排查及二次开发提供指导。
在本文中,我们将深入分析 Pulsar Consumer 的源码结构与实现细节,从创建流程、消息拉取、消息确认到负载均衡等方面全面剖析其工作原理,揭示其背后的设计思想与工程实践。无论是初学者还是有经验的开发者,相信都能通过这篇文章获得关于 Pulsar Consumer 的新认知。
老规矩,我们先以Consumer消费消息,来跟进Consumer的相关源码流程。
/**
* @author: 微信公众号【老周聊架构】
*/
publicclass PulsarClientTest {
public static void main(String[] args) throws PulsarClientException {
PulsarClient client = PulsarClient.builder()
.listenerThreads(1)
.ioThreads(1)
.serviceUrl("pulsar://127.0.0.1:6650")
.build();
Consumer<byte[]> consumer = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-sub")
.subscribe();
Message<byte[]> message = consumer.receive();
System.out.println("receive message" + message.getData());
}
}
从上面的代码可以看出 Pulsar 为用户提供了非常简洁方便的 API,在使用时,只需要如下两步:
3.1 消费者的创建

创建 PulsarClient 时实际上是创建了 PulsarClientImpl 对象,其中 newConsumer 方法是创建一个 builder 用于链式调用:
其中 builder 的方法就不仔细阅读了,主要是对参数进行必要的验证后设置相应字段,比如必要的是主题名和订阅名,都保存在 ClientConfigurationData conf 中。
主要跟到 org.apache.pulsar.client.impl.ConsumerBuilderImpl#subscribeAsync

上述代码小结:
这段代码的主要功能是通过一系列配置验证和处理步骤来确保消费者配置的正确性,并根据配置动态生成重试和死信队列的主题名称。最终,它会创建并返回一个 Consumer实例。具体来说:
继续跟进 org.apache.pulsar.client.impl.PulsarClientImpl#subscribeAsync(org.apache.pulsar.client.impl.conf.ConsumerConfigurationData, org.apache.pulsar.client.api.Schema, org.apache.pulsar.client.impl.ConsumerInterceptors)

简单起见,我们这只看单主题订阅的情况:
private <T> CompletableFuture<Consumer<T>> singleTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
// 调用 preProcessSchemaBeforeSubscribe 方法预处理模式(schema)
return preProcessSchemaBeforeSubscribe(this, schema, conf.getSingleTopic())
// 使用 thenCompose 将返回的 CompletableFuture 进行链式调用
.thenCompose(schemaClone -> doSingleTopicSubscribeAsync(conf, schemaClone, interceptors));
}
preProcessSchemaBeforeSubscribe方法:

doSingleTopicSubscribeAsync方法:

多分区订阅和多主题订阅本质上是一样的,都是使用 MultiTopicsConsumerImpl 管理多个主题(因为 Pulsar 中分区只不过是一个包含后缀 -partition-的主题)。
这里我们还是来关注单分区订阅,ConsumerImpl.newConsumerImpl(...) 只是将参数原封不动传给其构造方法,构造方法包括了 consumer 内部一些字段的初始化。


我们还是只关注重点,那就是 Consumer 和 Broker 如何交互的?实际上这部分逻辑在构造方法的最后面,调用的 grabCnx 方法:
void grabCnx() {
this.connectionHandler.grabCnx();
}
3.2 Consumer 与 Broker 连接的建立
我们先来看下 connectionHandler 的构造:
// 创建一个新的 ConnectionHandler 实例,并将其赋值给当前类的 connectionHandler 属性
this.connectionHandler = new ConnectionHandler(
this, // 当前类的实例,作为上下文或回调传递给 ConnectionHandler
// 使用 BackoffBuilder 构建一个回退策略对象
new BackoffBuilder()
.setInitialTime(
client.getConfiguration().getInitialBackoffIntervalNanos(), // 从客户端配置中获取初始回退时间间隔(纳秒)
TimeUnit.NANOSECONDS // 指定时间单位为纳秒
)
.setMax(
client.getConfiguration().getMaxBackoffIntervalNanos(), // 从客户端配置中获取最大回退时间间隔(纳秒)
TimeUnit.NANOSECONDS // 指定时间单位为纳秒
)
.setMandatoryStop(0, TimeUnit.MILLISECONDS) // 设置强制停止时间为 0 毫秒,意味着没有强制停止时间
.create(), // 根据上述配置创建具体的回退策略对象
this// 再次传递当前类的实例,可能是为了在 ConnectionHandler 中使用当前类的某些方法或属性
);
可以看出,当前代码片段,只体现了固定时间间隔的退避策略,我发现其它很多中间件喜欢用指数退避逻辑。那么问题来了,如果这里要实现指数退避该怎么操作呢?
可以考虑修改 BackoffBuilder 的配置或使用支持指数退避的库。例如,可以添加类似 .setMultiplier(2) 的方法来指定每次重试时等待时间的增长因子。

可以看出 ConsumerImpl 与 ProducerImpl 一样去构造connectionHandler的实例,意思就是connectionHandler连接的组件在生产者和消费者中都持有,connectionHandler相当于两者的通信门面。

这段grabCnx()代码同样也适用于生产者以及多主题消费者,它们对应的类都实现了 Connection 接口,只需要实现各自的回调即可。

3.3 连接成功的回调

可以看到连接成功后,主要是发送 CommandSubscribe(订阅命令),Broker 处理成功后,consumer 就处于 Ready 状态,并且会发送 Flow 请求携带 permits 为接收队列大小的一半再发送给Broker,从这里看到Flow指令消息也只有consumer_id、messagePermits,消费者只给了Broker消费者id和消息许可的最大消息数量,从这里也可以看出,Pulsar的Consumer是push模式,消费者只告诉Broker许可的最大消息数量,剩下的交给Broker去推。


3.4 Broker 处理 Consumer 指令消息
Broker 对 TCP 协议的处理代码位于 org.apache.pulsar.broker.service.ServerCnx
对于 Consumer 而言,比较独有的就是 CommandSubscribe 和 CommandFlow。
3.4.1 CommandSubscribe 处理
@Override
protected void handleSubscribe(final CommandSubscribe subscribe) {
/**
* 1.参数验证与初始化
* 2.权限检查
* 3.提取订阅参数
* 这三步的代码省略
*/
// 检查是否允许操作主题
CompletableFuture<Boolean> isAuthorizedFuture = isTopicOperationAllowed(
topicName,
subscriptionName,
TopicOperation.CONSUME
);
isAuthorizedFuture.thenApply(isAuthorized -> {
if (isAuthorized) {
if (log.isDebugEnabled()) {
log.debug("[{}] Client is authorized to subscribe with role {}",
remoteAddress, getPrincipal());
}
log.info("[{}] Subscribing on topic {} / {}", remoteAddress, topicName, subscriptionName);
try {
// 元数据验证
Metadata.validateMetadata(metadata);
} catch (IllegalArgumentException iae) {
final String msg = iae.getMessage();
commandSender.sendErrorResponse(requestId, ServerError.MetadataError, msg);
returnnull;
}
CompletableFuture<Consumer> consumerFuture = new CompletableFuture<>();
// 检查是否已有相同ID的消费者
CompletableFuture<Consumer> existingConsumerFuture = consumers.putIfAbsent(consumerId,
consumerFuture);
// 已经创建过 broker consumer
if (existingConsumerFuture != null) {
if (existingConsumerFuture.isDone() && !existingConsumerFuture.isCompletedExceptionally()) {
Consumer consumer = existingConsumerFuture.getNow(null);
log.info("[{}] Consumer with the same id is already created:"
+ " consumerId={}, consumer={}",
remoteAddress, consumerId, consumer);
// 创建完成则直接发送成功的响应
commandSender.sendSuccessResponse(requestId);
returnnull;
} else {
// There was an early request to create a consumer with same consumerId. This can happen
// when
// client timeout is lower the broker timeouts. We need to wait until the previous
// consumer
// creation request either complete or fails.
log.warn("[{}][{}][{}] Consumer with id is already present on the connection,"
+ " consumerId={}", remoteAddress, topicName, subscriptionName, consumerId);
// 之前已经有相同 consumerId 的 subscribe 请求,这是因为 client timeout 小于 broker timeout,
// 因此 client 发生重试,此时需要等待之前的 consumer future 完成。
ServerError error = null;
if (!existingConsumerFuture.isDone()) {
// 前一个 subscribe 请求还未完成,直接返回 ServiceNotReady。
error = ServerError.ServiceNotReady;
} else {
// 前一个 subscribe 请求异常完成,则返回同样的错误码并将其移除 cache 避免下次重新进入此分支。
error = getErrorCode(existingConsumerFuture);
consumers.remove(consumerId, existingConsumerFuture);
}
commandSender.sendErrorResponse(requestId, error,
"Consumer is already present on the connection");
returnnull;
}
}
// 判断是否自动创建 topic,forceTopicCreation 为 client 填充的字段(proto字段定义默认为true)
// service 对象则是通过配置或者 system topic 的配置来判断是否允许自动创建 topic
boolean createTopicIfDoesNotExist = forceTopicCreation
&& service.isAllowAutoTopicCreation(topicName.toString());
// 当前 broker 获取或创建 Topic 对象
service.getTopic(topicName.toString(), createTopicIfDoesNotExist)
.thenCompose(optTopic -> {
if (!optTopic.isPresent()) {
return FutureUtil
.failedFuture(new TopicNotFoundException(
"Topic " + topicName + " does not exist"));
}
Topic topic = optTopic.get();
// 对于 durable cursor 而言,如果该订阅不存在且不允许订阅自动创建,subscribe 会失败
boolean rejectSubscriptionIfDoesNotExist = isDurable
&& !service.isAllowAutoSubscriptionCreation(topicName.toString())
&& !topic.getSubscriptions().containsKey(subscriptionName);
if (rejectSubscriptionIfDoesNotExist) {
return FutureUtil
.failedFuture(
new SubscriptionNotFoundException(
"Subscription does not exist"));
}
// 若带有 schema 则先检查 schema 兼容性,最后都会调用 Topic#subscribe
if (schema != null) {
return topic.addSchemaIfIdleOrCheckCompatible(schema)
.thenCompose(v -> topic.subscribe(
ServerCnx.this, subscriptionName, consumerId,
subType, priorityLevel, consumerName, isDurable,
startMessageId, metadata,
readCompacted, initialPosition, startMessageRollbackDurationSec,
isReplicated, keySharedMeta));
} else {
return topic.subscribe(ServerCnx.this, subscriptionName, consumerId,
subType, priorityLevel, consumerName, isDurable,
startMessageId, metadata, readCompacted, initialPosition,
startMessageRollbackDurationSec, isReplicated, keySharedMeta);
}
})
.thenAccept(consumer -> {
if (consumerFuture.complete(consumer)) {
log.info("[{}] Created subscription on topic {} / {}",
remoteAddress, topicName, subscriptionName);
commandSender.sendSuccessResponse(requestId);
} else {
// The consumer future was completed before by a close command
// 如果 consumerFuture 已经完成,则当前 consumer 是 client timeout 重新创建的 consumer
// 此时需要关闭 consumer 并移除这个 future
try {
consumer.close();
log.info("[{}] Cleared consumer created after timeout on client side {}",
remoteAddress, consumer);
} catch (BrokerServiceException e) {
log.warn(
"[{}] Error closing consumer created"
+ " after timeout on client side {}: {}",
remoteAddress, consumer, e.getMessage());
}
consumers.remove(consumerId, consumerFuture);
}
})
.exceptionally(exception -> {
if (exception.getCause() instanceof ConsumerBusyException) {
if (log.isDebugEnabled()) {
log.debug(
"[{}][{}][{}] Failed to create consumer because exclusive consumer"
+ " is already connected: {}",
remoteAddress, topicName, subscriptionName,
exception.getCause().getMessage());
}
} elseif (exception.getCause() instanceof BrokerServiceException) {
log.warn("[{}][{}][{}] Failed to create consumer: consumerId={}, {}",
remoteAddress, topicName, subscriptionName,
consumerId, exception.getCause().getMessage());
} else {
log.warn("[{}][{}][{}] Failed to create consumer: consumerId={}, {}",
remoteAddress, topicName, subscriptionName,
consumerId, exception.getCause().getMessage(), exception);
}
// If client timed out, the future would have been completed by subsequent close.
// Send error
// back to client, only if not completed already.
if (consumerFuture.completeExceptionally(exception)) {
commandSender.sendErrorResponse(requestId,
BrokerServiceException.getClientErrorCode(exception),
exception.getCause().getMessage());
}
consumers.remove(consumerId, consumerFuture);
returnnull;
});
} else {
String msg = "Client is not authorized to subscribe";
log.warn("[{}] {} with role {}", remoteAddress, msg, getPrincipal());
ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg));
}
returnnull;
}).exceptionally(ex -> {
logAuthException(remoteAddress, "subscribe", getPrincipal(), Optional.of(topicName), ex);
commandSender.sendErrorResponse(requestId, ServerError.AuthorizationError, ex.getMessage());
returnnull;
});
}
小结:
处理 subscribe 请求核心调用是:
其中 Topic 和 Consumer 是 Broker 端对主题和消费者的抽象,负责管理对应的资源。均位于 org.apache.pulsar.broker.service 包下。

可以看出,这里有持久化主题和非持久化主题,这里我们重点看 PersistentTopic#subscribe。
/**
* 订阅主题并创建消费者
*
* @param cnx 客户端连接对象
* @param subscriptionName 订阅名称
* @param consumerId 消费者ID
* @param subType 订阅类型(如 Failover, Exclusive 等)
* @param priorityLevel 消费者的优先级
* @param consumerName 消费者名称
* @param isDurable 是否为持久化订阅
* @param startMessageId 开始消费的消息ID
* @param metadata 元数据
* @param readCompacted 是否读取压缩后的消息
* @param initialPosition 初始位置(最早或最晚)
* @param startMessageRollbackDurationSec 消息回滚时间
* @param replicatedSubscriptionStateArg 是否启用复制订阅状态
* @param keySharedMeta Key-Shared 元数据
* @return 返回一个 CompletableFuture<Consumer>,表示订阅操作的结果
*/
@Override
public CompletableFuture<Consumer> subscribe(final TransportCnx cnx, String subscriptionName, long consumerId,
SubType subType, int priorityLevel, String consumerName,
boolean isDurable, MessageId startMessageId,
Map<String, String> metadata, boolean readCompacted,
InitialPosition initialPosition,
long startMessageRollbackDurationSec,
boolean replicatedSubscriptionStateArg,
KeySharedMeta keySharedMeta) {
// 如果启用了读取压缩消息,但订阅类型不是 Failover 或 Exclusive,则返回失败
if (readCompacted && !(subType == SubType.Failover || subType == SubType.Exclusive)) {
return FutureUtil.failedFuture(new NotAllowedException(
"readCompacted only allowed on failover or exclusive subscriptions"));
}
// 检查主题的命名空间所有权
return brokerService.checkTopicNsOwnership(getName()).thenCompose(__ -> {
/**
* 1.如果启用了复制订阅状态,但代理配置中未启用,则禁用复制订阅状态
* 2.如果订阅类型是 Key_Shared,但代理配置中未启用,则返回失败
* 3.检查主题是否支持指定的订阅类型
* 4.检查订阅名称是否为空
* 5.检查消费者是否支持批量消息
* 6.检查订阅名称是否为保留名称
* 以上检查代码省略
*/
// 检查订阅速率限制
// 用连接地址作为 key,检查 consumer 对应的 RateLimiter 是否存在,限制重连次数,因为重连的连接地址是相同的。
// 参考 https://github.com/apache/pulsar/pull/2977
if (cnx.clientAddress() != null && cnx.clientAddress().toString().contains(":")) {
SubscribeRateLimiter.ConsumerIdentifier consumer = new SubscribeRateLimiter.ConsumerIdentifier(
cnx.clientAddress().toString().split(":")[0], consumerName, consumerId);
if (subscribeRateLimiter.isPresent() && (!subscribeRateLimiter.get().subscribeAvailable(consumer)
|| !subscribeRateLimiter.get().tryAcquire(consumer))) {
log.warn("[{}] Failed to create subscription for {} {} limited by {}, available {}",
topic, subscriptionName, consumer, subscribeRateLimiter.get().getSubscribeRate(),
subscribeRateLimiter.get().getAvailableSubscribeRateLimit(consumer));
return FutureUtil.failedFuture(
new NotAllowedException("Subscribe limited by subscribe rate limit per consumer."));
}
}
lock.readLock().lock();
try {
// 当 topic 被删除或关闭,或者 producer 写消息失败时都会标记为 fence 状态,此时禁止订阅。
if (isFenced) {
log.warn("[{}] Attempting to subscribe to a fenced topic", topic);
return FutureUtil.failedFuture(new TopicFencedException("Topic is temporarily unavailable"));
}
// 实际上是增加 usageCount(连接的 producer 和 consumer 总数),传参数是为了打印 debug 日志
handleConsumerAdded(subscriptionName, consumerName);
} finally {
lock.readLock().unlock();
}
// 根据是否持久化订阅,获取相应的订阅对象
CompletableFuture<? extends Subscription> subscriptionFuture = isDurable ? //
getDurableSubscription(subscriptionName, initialPosition, startMessageRollbackDurationSec,
replicatedSubscriptionState)
: getNonDurableSubscription(subscriptionName, startMessageId, initialPosition,
startMessageRollbackDurationSec);
// 对于持久化订阅,可以获取最大的未确认的消息
int maxUnackedMessages = isDurable
? getMaxUnackedMessagesOnConsumer()
: 0;
CompletableFuture<Consumer> future = subscriptionFuture.thenCompose(subscription -> {
// 创建 Consumer 对象并加入到对应的订阅中
Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel,
consumerName, maxUnackedMessages, cnx, cnx.getAuthRole(), metadata,
readCompacted, initialPosition, keySharedMeta, startMessageId);
return addConsumerToSubscription(subscription, consumer).thenCompose(v -> {
checkBackloggedCursors();
if (!cnx.isActive()) { // 连接已经断开,则需要关闭 consumer
try {
consumer.close();
} catch (BrokerServiceException e) {
if (e instanceof ConsumerBusyException) {
log.warn("[{}][{}] Consumer {} {} already connected",
topic, subscriptionName, consumerId, consumerName);
} elseif (e instanceof SubscriptionBusyException) {
log.warn("[{}][{}] {}", topic, subscriptionName, e.getMessage());
}
// 减少使用计数
decrementUsageCount();
return FutureUtil.failedFuture(e);
}
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] [{}] Subscribe failed -- count: {}", topic, subscriptionName,
consumer.consumerName(), currentUsageCount());
}
decrementUsageCount();
return FutureUtil.failedFuture(
new BrokerServiceException("Connection was closed while the opening the cursor "));
} else {
// 连接仍存活,则继续检查复制订阅的状态,至此完成整个订阅。
checkReplicatedSubscriptionControllerState();
log.info("[{}][{}] Created new subscription for {}", topic, subscriptionName, consumerId);
return CompletableFuture.completedFuture(consumer);
}
});
});
future.exceptionally(ex -> {
decrementUsageCount();
if (ex.getCause() instanceof ConsumerBusyException) {
log.warn("[{}][{}] Consumer {} {} already connected", topic, subscriptionName, consumerId,
consumerName);
} elseif (ex.getCause() instanceof SubscriptionBusyException) {
log.warn("[{}][{}] {}", topic, subscriptionName, ex.getMessage());
} else {
log.error("[{}] Failed to create subscription: {}", topic, subscriptionName, ex);
}
returnnull;
});
return future;
});
}
核心流程:
小结:
1. 持久化与非持久化订阅的区别
2.消费者加入订阅的过程
当消费者加入订阅时,系统会根据订阅类型创建相应的分发器(Dispatcher)。以默认的 Exclusive 订阅为例:
注:https://github.com/apache/pulsar/pull/9056 引入了 streaming dispatcher。

将 consumer 加入 dispatcher:

订阅里实际负责消息分发的就是 dispatcher。
3.4.2 CommandFlow 处理
直接定位到Netty读取消息的位置: org.apache.pulsar.common.protocol.PulsarDecoder#channelRead

指定handleFlow处理器处理:org.apache.pulsar.broker.service.ServerCnx#handleFlow

然后递交给 Consumer:

可见,更新完 Consumer 内部的 messagePermits 后,交由对应的 Subscription 对象处理。这里仅看 PersistentSubscription 的实现:

仅仅是传递给 dispatcher,因此实际的处理是由 dispatcher 完成的。

5.1 Pulsar 消费者订阅 Topic 的流程
5.1.1 Client 端
subscribe 命令给 Broker,注册自己为该 Topic 的消费者。Flow 请求,携带 permits 参数。permits 的值通常是内部缓冲区大小的一半(对于无缓冲区的零队列消费者例外)。这个请求告知 Broker 客户端可以接收的消息数量。5.1.2 Broker 端
subscribe 命令后,在对应的 Topic 中创建或查找已有的 Subscription。Exclusive、Failover、Shared 等),Broker 会创建相应的 Dispatcher 实例。Dispatcher 负责管理和分发消息给消费者。cursor;对于非持久化订阅(NonPersistentSubscription),则在内存中管理消费进度。Flow 请求时,它会根据 permits 的值调整 Dispatcher 的行为,确保按照客户端的需求发送消息。5.2 Push vs Pull 模型
5.2.1 Kafka 的 Pull 模型
FETCH 请求给 Broker 来拉取消息。FETCH 响应中返回读取的消息。5.2.2 Pulsar 的 Push 模型
Flow 请求,告知 Broker 自己可以缓存多少条消息。Flow 请求中的 permits 参数灵活定制 Dispatcher 的行为,并主动推送消息给客户端。5.2.3 Kafka与Pulsar消费者模型对比
FETCH 请求拉取消息。Flow 请求告知 Broker 缓存能力,Broker 主动推送消息。这种设计使得 Pulsar 的 Push 模型能够更好地适应高吞吐量和低延迟的场景,因为服务端可以根据客户端的反馈灵活调整消息推送策略。
欢迎大家关注我的公众号【老周聊架构】,AI、大数据、云原生、物联网等相关领域的技术知识分享。