首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >聊聊 Pulsar:Consumer 源码解析

聊聊 Pulsar:Consumer 源码解析

作者头像
老周聊架构
发布2025-11-20 10:52:49
发布2025-11-20 10:52:49
880
举报

一、前言

Apache Pulsar 是一款开源的分布式消息系统,以其多租户、高性能、存储与计算分离等特性在现代消息队列领域占据一席之地。作为消息队列的重要组成部分,Consumer(消费者)承担了从 Broker 获取消息并处理的关键角色,其设计直接关系到系统的吞吐量、延迟以及可靠性。理解 Pulsar Consumer 的源码实现,不仅能帮助我们更好地掌握消息消费的流程和机制,还能为性能优化、故障排查及二次开发提供指导。

在本文中,我们将深入分析 Pulsar Consumer 的源码结构与实现细节,从创建流程、消息拉取、消息确认到负载均衡等方面全面剖析其工作原理,揭示其背后的设计思想与工程实践。无论是初学者还是有经验的开发者,相信都能通过这篇文章获得关于 Pulsar Consumer 的新认知。

二、Consumer测试类

老规矩,我们先以Consumer消费消息,来跟进Consumer的相关源码流程。

代码语言:javascript
复制
/**
 * @author: 微信公众号【老周聊架构】
 */
publicclass PulsarClientTest {
    public static void main(String[] args) throws PulsarClientException {
        PulsarClient client = PulsarClient.builder()
                .listenerThreads(1)
                .ioThreads(1)
                .serviceUrl("pulsar://127.0.0.1:6650")
                .build();

        Consumer<byte[]> consumer = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-sub")
            .subscribe();
        Message<byte[]> message = consumer.receive();
        System.out.println("receive message" + message.getData());
    }
}

从上面的代码可以看出 Pulsar 为用户提供了非常简洁方便的 API,在使用时,只需要如下两步:

  • 创建 Pulsar Consumer 实例
  • 调用 receive 接口消费数据

三、Pulsar Consumer 实例化与消费数据

3.1 消费者的创建

在这里插入图片描述
在这里插入图片描述

创建 PulsarClient 时实际上是创建了 PulsarClientImpl 对象,其中 newConsumer 方法是创建一个 builder 用于链式调用:

其中 builder 的方法就不仔细阅读了,主要是对参数进行必要的验证后设置相应字段,比如必要的是主题名和订阅名,都保存在 ClientConfigurationData conf 中。

主要跟到 org.apache.pulsar.client.impl.ConsumerBuilderImpl#subscribeAsync

在这里插入图片描述
在这里插入图片描述

上述代码小结:

这段代码的主要功能是通过一系列配置验证和处理步骤来确保消费者配置的正确性,并根据配置动态生成重试和死信队列的主题名称。最终,它会创建并返回一个 Consumer实例。具体来说:

  • 配置验证:确保主题名称或模式、订阅名称等必要参数已正确设置。
  • 重试和死信队列处理:根据配置生成重试和死信队列的主题名称,并进行兼容性检查。
  • 创建消费者:根据是否有拦截器列表选择不同的创建方式,并返回创建好的消费者实例。

继续跟进 org.apache.pulsar.client.impl.PulsarClientImpl#subscribeAsync(org.apache.pulsar.client.impl.conf.ConsumerConfigurationData, org.apache.pulsar.client.api.Schema, org.apache.pulsar.client.impl.ConsumerInterceptors)

在这里插入图片描述
在这里插入图片描述

简单起见,我们这只看单主题订阅的情况:

代码语言:javascript
复制
private <T> CompletableFuture<Consumer<T>> singleTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
    // 调用 preProcessSchemaBeforeSubscribe 方法预处理模式(schema)
    return preProcessSchemaBeforeSubscribe(this, schema, conf.getSingleTopic())
        // 使用 thenCompose 将返回的 CompletableFuture 进行链式调用
        .thenCompose(schemaClone -> doSingleTopicSubscribeAsync(conf, schemaClone, interceptors));
}

preProcessSchemaBeforeSubscribe方法:

在这里插入图片描述
在这里插入图片描述

doSingleTopicSubscribeAsync方法:

在这里插入图片描述
在这里插入图片描述

多分区订阅和多主题订阅本质上是一样的,都是使用 MultiTopicsConsumerImpl 管理多个主题(因为 Pulsar 中分区只不过是一个包含后缀 -partition-的主题)。

这里我们还是来关注单分区订阅,ConsumerImpl.newConsumerImpl(...) 只是将参数原封不动传给其构造方法,构造方法包括了 consumer 内部一些字段的初始化。

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

我们还是只关注重点,那就是 Consumer 和 Broker 如何交互的?实际上这部分逻辑在构造方法的最后面,调用的 grabCnx 方法:

代码语言:javascript
复制
void grabCnx() {
    this.connectionHandler.grabCnx();
}

3.2 Consumer 与 Broker 连接的建立

我们先来看下 connectionHandler 的构造:

代码语言:javascript
复制
// 创建一个新的 ConnectionHandler 实例,并将其赋值给当前类的 connectionHandler 属性
this.connectionHandler = new ConnectionHandler(
    this, // 当前类的实例,作为上下文或回调传递给 ConnectionHandler
    // 使用 BackoffBuilder 构建一个回退策略对象
    new BackoffBuilder()
        .setInitialTime(
            client.getConfiguration().getInitialBackoffIntervalNanos(), // 从客户端配置中获取初始回退时间间隔(纳秒)
            TimeUnit.NANOSECONDS // 指定时间单位为纳秒
        )
        .setMax(
            client.getConfiguration().getMaxBackoffIntervalNanos(), // 从客户端配置中获取最大回退时间间隔(纳秒)
            TimeUnit.NANOSECONDS // 指定时间单位为纳秒
        )
        .setMandatoryStop(0, TimeUnit.MILLISECONDS) // 设置强制停止时间为 0 毫秒,意味着没有强制停止时间
        .create(), // 根据上述配置创建具体的回退策略对象
    this// 再次传递当前类的实例,可能是为了在 ConnectionHandler 中使用当前类的某些方法或属性
);

可以看出,当前代码片段,只体现了固定时间间隔的退避策略,我发现其它很多中间件喜欢用指数退避逻辑。那么问题来了,如果这里要实现指数退避该怎么操作呢?

可以考虑修改 BackoffBuilder 的配置或使用支持指数退避的库。例如,可以添加类似 .setMultiplier(2) 的方法来指定每次重试时等待时间的增长因子。

在这里插入图片描述
在这里插入图片描述

可以看出 ConsumerImpl 与 ProducerImpl 一样去构造connectionHandler的实例,意思就是connectionHandler连接的组件在生产者和消费者中都持有,connectionHandler相当于两者的通信门面。

在这里插入图片描述
在这里插入图片描述

这段grabCnx()代码同样也适用于生产者以及多主题消费者,它们对应的类都实现了 Connection 接口,只需要实现各自的回调即可。

在这里插入图片描述
在这里插入图片描述

3.3 连接成功的回调

在这里插入图片描述
在这里插入图片描述

可以看到连接成功后,主要是发送 CommandSubscribe(订阅命令),Broker 处理成功后,consumer 就处于 Ready 状态,并且会发送 Flow 请求携带 permits 为接收队列大小的一半再发送给Broker,从这里看到Flow指令消息也只有consumer_id、messagePermits,消费者只给了Broker消费者id和消息许可的最大消息数量,从这里也可以看出,Pulsar的Consumer是push模式,消费者只告诉Broker许可的最大消息数量,剩下的交给Broker去推。

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

3.4 Broker 处理 Consumer 指令消息

Broker 对 TCP 协议的处理代码位于 org.apache.pulsar.broker.service.ServerCnx

对于 Consumer 而言,比较独有的就是 CommandSubscribe 和 CommandFlow。

3.4.1 CommandSubscribe 处理

代码语言:javascript
复制
@Override
protected void handleSubscribe(final CommandSubscribe subscribe) {
    /**
     * 1.参数验证与初始化
     * 2.权限检查
     * 3.提取订阅参数
     * 这三步的代码省略
     */
    // 检查是否允许操作主题
    CompletableFuture<Boolean> isAuthorizedFuture = isTopicOperationAllowed(
            topicName,
            subscriptionName,
            TopicOperation.CONSUME
    );
    isAuthorizedFuture.thenApply(isAuthorized -> {
        if (isAuthorized) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Client is authorized to subscribe with role {}",
                        remoteAddress, getPrincipal());
            }

            log.info("[{}] Subscribing on topic {} / {}", remoteAddress, topicName, subscriptionName);
            try {
                // 元数据验证
                Metadata.validateMetadata(metadata);
            } catch (IllegalArgumentException iae) {
                final String msg = iae.getMessage();
                commandSender.sendErrorResponse(requestId, ServerError.MetadataError, msg);
                returnnull;
            }
            CompletableFuture<Consumer> consumerFuture = new CompletableFuture<>();
            // 检查是否已有相同ID的消费者
            CompletableFuture<Consumer> existingConsumerFuture = consumers.putIfAbsent(consumerId,
                    consumerFuture);
            // 已经创建过 broker consumer
            if (existingConsumerFuture != null) {
                if (existingConsumerFuture.isDone() && !existingConsumerFuture.isCompletedExceptionally()) {
                    Consumer consumer = existingConsumerFuture.getNow(null);
                    log.info("[{}] Consumer with the same id is already created:"
                             + " consumerId={}, consumer={}",
                             remoteAddress, consumerId, consumer);
                    // 创建完成则直接发送成功的响应
                    commandSender.sendSuccessResponse(requestId);
                    returnnull;
                } else {
                    // There was an early request to create a consumer with same consumerId. This can happen
                    // when
                    // client timeout is lower the broker timeouts. We need to wait until the previous
                    // consumer
                    // creation request either complete or fails.
                    log.warn("[{}][{}][{}] Consumer with id is already present on the connection,"
                             + " consumerId={}", remoteAddress, topicName, subscriptionName, consumerId);
                    // 之前已经有相同 consumerId 的 subscribe 请求,这是因为 client timeout 小于 broker timeout,
                    // 因此 client 发生重试,此时需要等待之前的 consumer future 完成。
                    ServerError error = null;
                    if (!existingConsumerFuture.isDone()) {
                        // 前一个 subscribe 请求还未完成,直接返回 ServiceNotReady。
                        error = ServerError.ServiceNotReady;
                    } else {
                        // 前一个 subscribe 请求异常完成,则返回同样的错误码并将其移除 cache 避免下次重新进入此分支。
                        error = getErrorCode(existingConsumerFuture);
                        consumers.remove(consumerId, existingConsumerFuture);
                    }
                    commandSender.sendErrorResponse(requestId, error,
                            "Consumer is already present on the connection");
                    returnnull;
                }
            }

            // 判断是否自动创建 topic,forceTopicCreation 为 client 填充的字段(proto字段定义默认为true)
            // service 对象则是通过配置或者 system topic 的配置来判断是否允许自动创建 topic
            boolean createTopicIfDoesNotExist = forceTopicCreation
                    && service.isAllowAutoTopicCreation(topicName.toString());
            // 当前 broker 获取或创建 Topic 对象
            service.getTopic(topicName.toString(), createTopicIfDoesNotExist)
                    .thenCompose(optTopic -> {
                        if (!optTopic.isPresent()) {
                            return FutureUtil
                                    .failedFuture(new TopicNotFoundException(
                                            "Topic " + topicName + " does not exist"));
                        }

                        Topic topic = optTopic.get();
                        // 对于 durable cursor 而言,如果该订阅不存在且不允许订阅自动创建,subscribe 会失败
                        boolean rejectSubscriptionIfDoesNotExist = isDurable
                            && !service.isAllowAutoSubscriptionCreation(topicName.toString())
                            && !topic.getSubscriptions().containsKey(subscriptionName);

                        if (rejectSubscriptionIfDoesNotExist) {
                            return FutureUtil
                                    .failedFuture(
                                            new SubscriptionNotFoundException(
                                                    "Subscription does not exist"));
                        }
                        // 若带有 schema 则先检查 schema 兼容性,最后都会调用 Topic#subscribe
                        if (schema != null) {
                            return topic.addSchemaIfIdleOrCheckCompatible(schema)
                                    .thenCompose(v -> topic.subscribe(
                                            ServerCnx.this, subscriptionName, consumerId,
                                            subType, priorityLevel, consumerName, isDurable,
                                            startMessageId, metadata,
                                            readCompacted, initialPosition, startMessageRollbackDurationSec,
                                            isReplicated, keySharedMeta));
                        } else {
                            return topic.subscribe(ServerCnx.this, subscriptionName, consumerId,
                                subType, priorityLevel, consumerName, isDurable,
                                startMessageId, metadata, readCompacted, initialPosition,
                                startMessageRollbackDurationSec, isReplicated, keySharedMeta);
                        }
                    })
                    .thenAccept(consumer -> {
                        if (consumerFuture.complete(consumer)) {
                            log.info("[{}] Created subscription on topic {} / {}",
                                    remoteAddress, topicName, subscriptionName);
                            commandSender.sendSuccessResponse(requestId);
                        } else {
                            // The consumer future was completed before by a close command
                            // 如果 consumerFuture 已经完成,则当前 consumer 是 client timeout 重新创建的 consumer
                            // 此时需要关闭 consumer 并移除这个 future
                            try {
                                consumer.close();
                                log.info("[{}] Cleared consumer created after timeout on client side {}",
                                        remoteAddress, consumer);
                            } catch (BrokerServiceException e) {
                                log.warn(
                                        "[{}] Error closing consumer created"
                                                + " after timeout on client side {}: {}",
                                        remoteAddress, consumer, e.getMessage());
                            }
                            consumers.remove(consumerId, consumerFuture);
                        }

                    })
                    .exceptionally(exception -> {
                        if (exception.getCause() instanceof ConsumerBusyException) {
                            if (log.isDebugEnabled()) {
                                log.debug(
                                        "[{}][{}][{}] Failed to create consumer because exclusive consumer"
                                                + " is already connected: {}",
                                        remoteAddress, topicName, subscriptionName,
                                        exception.getCause().getMessage());
                            }
                        } elseif (exception.getCause() instanceof BrokerServiceException) {
                            log.warn("[{}][{}][{}] Failed to create consumer: consumerId={}, {}",
                                     remoteAddress, topicName, subscriptionName,
                                     consumerId,  exception.getCause().getMessage());
                        } else {
                            log.warn("[{}][{}][{}] Failed to create consumer: consumerId={}, {}",
                                     remoteAddress, topicName, subscriptionName,
                                     consumerId, exception.getCause().getMessage(), exception);
                        }

                        // If client timed out, the future would have been completed by subsequent close.
                        // Send error
                        // back to client, only if not completed already.
                        if (consumerFuture.completeExceptionally(exception)) {
                            commandSender.sendErrorResponse(requestId,
                                    BrokerServiceException.getClientErrorCode(exception),
                                    exception.getCause().getMessage());
                        }
                        consumers.remove(consumerId, consumerFuture);

                        returnnull;

                    });
        } else {
            String msg = "Client is not authorized to subscribe";
            log.warn("[{}] {} with role {}", remoteAddress, msg, getPrincipal());
            ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg));
        }
        returnnull;
    }).exceptionally(ex -> {
        logAuthException(remoteAddress, "subscribe", getPrincipal(), Optional.of(topicName), ex);
        commandSender.sendErrorResponse(requestId, ServerError.AuthorizationError, ex.getMessage());
        returnnull;
    });
}

小结:

处理 subscribe 请求核心调用是:

  • BrokerService#getTopic:获取当前 Broker 所拥有(own)的 Topic 对象
  • Topic#subscribe:在 Topic 对象中创建对应的订阅,并得到 Consumer 对象

其中 Topic 和 Consumer 是 Broker 端对主题和消费者的抽象,负责管理对应的资源。均位于 org.apache.pulsar.broker.service 包下。

在这里插入图片描述
在这里插入图片描述

可以看出,这里有持久化主题和非持久化主题,这里我们重点看 PersistentTopic#subscribe

代码语言:javascript
复制
/**
 * 订阅主题并创建消费者
 *
 * @param cnx                     客户端连接对象
 * @param subscriptionName        订阅名称
 * @param consumerId              消费者ID
 * @param subType                 订阅类型(如 Failover, Exclusive 等)
 * @param priorityLevel           消费者的优先级
 * @param consumerName            消费者名称
 * @param isDurable               是否为持久化订阅
 * @param startMessageId          开始消费的消息ID
 * @param metadata                元数据
 * @param readCompacted           是否读取压缩后的消息
 * @param initialPosition         初始位置(最早或最晚)
 * @param startMessageRollbackDurationSec 消息回滚时间
 * @param replicatedSubscriptionStateArg 是否启用复制订阅状态
 * @param keySharedMeta           Key-Shared 元数据
 * @return                        返回一个 CompletableFuture<Consumer>,表示订阅操作的结果
 */
@Override
public CompletableFuture<Consumer> subscribe(final TransportCnx cnx, String subscriptionName, long consumerId,
                                             SubType subType, int priorityLevel, String consumerName,
                                             boolean isDurable, MessageId startMessageId,
                                             Map<String, String> metadata, boolean readCompacted,
                                             InitialPosition initialPosition,
                                             long startMessageRollbackDurationSec,
                                             boolean replicatedSubscriptionStateArg,
                                             KeySharedMeta keySharedMeta) {
    // 如果启用了读取压缩消息,但订阅类型不是 Failover 或 Exclusive,则返回失败
    if (readCompacted && !(subType == SubType.Failover || subType == SubType.Exclusive)) {
        return FutureUtil.failedFuture(new NotAllowedException(
                "readCompacted only allowed on failover or exclusive subscriptions"));
    }
    // 检查主题的命名空间所有权
    return brokerService.checkTopicNsOwnership(getName()).thenCompose(__ -> {
        /**
         * 1.如果启用了复制订阅状态,但代理配置中未启用,则禁用复制订阅状态
         * 2.如果订阅类型是 Key_Shared,但代理配置中未启用,则返回失败
         * 3.检查主题是否支持指定的订阅类型
         * 4.检查订阅名称是否为空
         * 5.检查消费者是否支持批量消息
         * 6.检查订阅名称是否为保留名称
         * 以上检查代码省略
         */
        
        // 检查订阅速率限制
        // 用连接地址作为 key,检查 consumer 对应的 RateLimiter 是否存在,限制重连次数,因为重连的连接地址是相同的。
        // 参考 https://github.com/apache/pulsar/pull/2977
        if (cnx.clientAddress() != null && cnx.clientAddress().toString().contains(":")) {
            SubscribeRateLimiter.ConsumerIdentifier consumer = new SubscribeRateLimiter.ConsumerIdentifier(
                    cnx.clientAddress().toString().split(":")[0], consumerName, consumerId);
            if (subscribeRateLimiter.isPresent() && (!subscribeRateLimiter.get().subscribeAvailable(consumer)
                    || !subscribeRateLimiter.get().tryAcquire(consumer))) {
                log.warn("[{}] Failed to create subscription for {} {} limited by {}, available {}",
                        topic, subscriptionName, consumer, subscribeRateLimiter.get().getSubscribeRate(),
                        subscribeRateLimiter.get().getAvailableSubscribeRateLimit(consumer));
                return FutureUtil.failedFuture(
                        new NotAllowedException("Subscribe limited by subscribe rate limit per consumer."));
            }
        }

        lock.readLock().lock();
        try {
            // 当 topic 被删除或关闭,或者 producer 写消息失败时都会标记为 fence 状态,此时禁止订阅。
            if (isFenced) {
                log.warn("[{}] Attempting to subscribe to a fenced topic", topic);
                return FutureUtil.failedFuture(new TopicFencedException("Topic is temporarily unavailable"));
            }
            // 实际上是增加 usageCount(连接的 producer 和 consumer 总数),传参数是为了打印 debug 日志
            handleConsumerAdded(subscriptionName, consumerName);
        } finally {
            lock.readLock().unlock();
        }
        // 根据是否持久化订阅,获取相应的订阅对象
        CompletableFuture<? extends Subscription> subscriptionFuture = isDurable ? //
                getDurableSubscription(subscriptionName, initialPosition, startMessageRollbackDurationSec,
                        replicatedSubscriptionState)
                : getNonDurableSubscription(subscriptionName, startMessageId, initialPosition,
                startMessageRollbackDurationSec);
        // 对于持久化订阅,可以获取最大的未确认的消息
        int maxUnackedMessages = isDurable
                ? getMaxUnackedMessagesOnConsumer()
                : 0;

        CompletableFuture<Consumer> future = subscriptionFuture.thenCompose(subscription -> {
            // 创建 Consumer 对象并加入到对应的订阅中
            Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel,
                    consumerName, maxUnackedMessages, cnx, cnx.getAuthRole(), metadata,
                    readCompacted, initialPosition, keySharedMeta, startMessageId);
            return addConsumerToSubscription(subscription, consumer).thenCompose(v -> {
                checkBackloggedCursors();
                if (!cnx.isActive()) { // 连接已经断开,则需要关闭 consumer
                    try {
                        consumer.close();
                    } catch (BrokerServiceException e) {
                        if (e instanceof ConsumerBusyException) {
                            log.warn("[{}][{}] Consumer {} {} already connected",
                                    topic, subscriptionName, consumerId, consumerName);
                        } elseif (e instanceof SubscriptionBusyException) {
                            log.warn("[{}][{}] {}", topic, subscriptionName, e.getMessage());
                        }
                        // 减少使用计数
                        decrementUsageCount();
                        return FutureUtil.failedFuture(e);
                    }
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] [{}] [{}] Subscribe failed -- count: {}", topic, subscriptionName,
                                consumer.consumerName(), currentUsageCount());
                    }

                    decrementUsageCount();
                    return FutureUtil.failedFuture(
                            new BrokerServiceException("Connection was closed while the opening the cursor "));
                } else {
                    // 连接仍存活,则继续检查复制订阅的状态,至此完成整个订阅。
                    checkReplicatedSubscriptionControllerState();
                    log.info("[{}][{}] Created new subscription for {}", topic, subscriptionName, consumerId);
                    return CompletableFuture.completedFuture(consumer);
                }
            });
        });

        future.exceptionally(ex -> {
            decrementUsageCount();

            if (ex.getCause() instanceof ConsumerBusyException) {
                log.warn("[{}][{}] Consumer {} {} already connected", topic, subscriptionName, consumerId,
                        consumerName);
            } elseif (ex.getCause() instanceof SubscriptionBusyException) {
                log.warn("[{}][{}] {}", topic, subscriptionName, ex.getMessage());
            } else {
                log.error("[{}] Failed to create subscription: {}", topic, subscriptionName, ex);
            }
            returnnull;
        });
        return future;
    });
}

核心流程:

  • 获取 Subscription(Broker 对订阅的抽象),若不存在则创建。
  • 创建 Consumer(Broker 对消费者的抽象)后加入订阅。

小结:

1. 持久化与非持久化订阅的区别

  • 持久化订阅(PersistentSubscription):
    • Cursor 管理:持久化订阅会打开一个 cursor,该 cursor 对应于 PersistentTopic 内部的 ledger。如果 cursor 不存在,则会创建一个新的 cursor。
    • 消息 ID 处理:在创建持久化订阅时,如果 cursor 已存在,则直接使用现有的 cursor,而忽略掉 CommandSubscribe 中提供的初始消息 ID (MessageId)。这意味着持久化订阅依赖于 cursor 的状态来确定从哪里开始消费消息。
  • 非持久化订阅(NonPersistentSubscription):
    • 内存管理:非持久化订阅不会创建或打开 cursor,而是直接在内存中维护消息 ID (MessageIdImpl)。
    • 消息 ID 处理:非持久化订阅会根据 CommandSubscribe 中提供的初始消息 ID 来确定从哪里开始消费消息。由于没有持久化的 cursor,所有状态都保存在内存中。

2.消费者加入订阅的过程

当消费者加入订阅时,系统会根据订阅类型创建相应的分发器(Dispatcher)。以默认的 Exclusive 订阅为例:

  • 创建 Dispatcher:
    • 根据订阅类型(如 Exclusive、Failover、Shared 等),创建对应的 Dispatcher 实例。Dispatcher 负责管理和分发消息给消费者。
  • 添加 Consumer:
    • 将新创建的 Consumer 添加到订阅的 Dispatcher 中。这一步确保了消费者能够接收到订阅中的消息。
    • 如果订阅已经存在其他消费者,并且订阅类型不允许多个消费者(如 Exclusive),则会拒绝新的消费者加入,并返回相应的错误信息。

注:https://github.com/apache/pulsar/pull/9056 引入了 streaming dispatcher。

在这里插入图片描述
在这里插入图片描述

将 consumer 加入 dispatcher:

在这里插入图片描述
在这里插入图片描述

订阅里实际负责消息分发的就是 dispatcher。

3.4.2 CommandFlow 处理

直接定位到Netty读取消息的位置: org.apache.pulsar.common.protocol.PulsarDecoder#channelRead

在这里插入图片描述
在这里插入图片描述

指定handleFlow处理器处理:org.apache.pulsar.broker.service.ServerCnx#handleFlow

在这里插入图片描述
在这里插入图片描述

然后递交给 Consumer:

在这里插入图片描述
在这里插入图片描述

可见,更新完 Consumer 内部的 messagePermits 后,交由对应的 Subscription 对象处理。这里仅看 PersistentSubscription 的实现:

在这里插入图片描述
在这里插入图片描述

仅仅是传递给 dispatcher,因此实际的处理是由 dispatcher 完成的。

四、Consumer整体流程UML图

在这里插入图片描述
在这里插入图片描述

五、总结

5.1 Pulsar 消费者订阅 Topic 的流程

5.1.1 Client 端

  • 建立连接
    • 客户端(Client)首先与 Broker 建立连接。这一部分和生产者的连接过程是通用的。
  • 发送 Subscribe 命令
    • 连接成功后,在回调中客户端会发送一个 subscribe 命令给 Broker,注册自己为该 Topic 的消费者。
  • 发送 Flow 请求
    • 注册成功后,客户端会发送一个 Flow 请求,携带 permits 参数。permits 的值通常是内部缓冲区大小的一半(对于无缓冲区的零队列消费者例外)。这个请求告知 Broker 客户端可以接收的消息数量。

5.1.2 Broker 端

  • 处理 Subscribe 命令
    • Broker 接收到 subscribe 命令后,在对应的 Topic 中创建或查找已有的 Subscription。
    • 每个 Topic 可以有多个订阅(Subscription),每个订阅可以对应多个消费者(Consumer)。
  • 创建 Dispatcher
    • 根据消费者的订阅类型(如 ExclusiveFailoverShared 等),Broker 会创建相应的 Dispatcher 实例。
    • Dispatcher 负责管理和分发消息给消费者。
  • 维护 Cursor
    • 订阅(Subscription)主要负责维护消费进度(Cursor)。对于持久化订阅(PersistentSubscription),会创建一个持久化的 cursor;对于非持久化订阅(NonPersistentSubscription),则在内存中管理消费进度。
  • 处理 Flow 请求
    • 当 Broker 收到 Flow 请求时,它会根据 permits 的值调整 Dispatcher 的行为,确保按照客户端的需求发送消息。

5.2 Push vs Pull 模型

5.2.1 Kafka 的 Pull 模型

  • FETCH 请求:
    • Kafka 的消费者通过发送 FETCH 请求给 Broker 来拉取消息。
    • Broker 在 FETCH 响应中返回读取的消息。
    • 流量控制由客户端(consumer)发起,即客户端决定何时拉取消息以及拉取多少消息。

5.2.2 Pulsar 的 Push 模型

  • Flow 请求:
    • Pulsar 的消费者发送 Flow 请求,告知 Broker 自己可以缓存多少条消息。
    • Broker 根据 Flow 请求中的 permits 参数灵活定制 Dispatcher 的行为,并主动推送消息给客户端。
    • 流量控制由服务端(Broker)进行,即 Broker 决定何时推送消息以及推送多少消息。

5.2.3 Kafka与Pulsar消费者模型对比

  • Kafka:采用 Pull 模型,流量控制由客户端发起,客户端通过 FETCH 请求拉取消息。
  • Pulsar:采用 Push 模型,流量控制由服务端(Broker)进行,客户端通过 Flow 请求告知 Broker 缓存能力,Broker 主动推送消息。

这种设计使得 Pulsar 的 Push 模型能够更好地适应高吞吐量和低延迟的场景,因为服务端可以根据客户端的反馈灵活调整消息推送策略。


欢迎大家关注我的公众号【老周聊架构】,AI、大数据、云原生、物联网等相关领域的技术知识分享。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2025-06-17,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 老周聊架构 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、前言
  • 二、Consumer测试类
  • 三、Pulsar Consumer 实例化与消费数据
  • 四、Consumer整体流程UML图
  • 五、总结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档