前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊rocketmq5的PushConsumer

聊聊rocketmq5的PushConsumer

作者头像
code4it
发布2024-08-12 14:54:12
1100
发布2024-08-12 14:54:12
举报
文章被收录于专栏:码匠的流水账

本文主要研究一下rocketmq5的PushConsumer的消费逻辑

PushConsumerImpl

org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java

代码语言:javascript
复制
class PushConsumerImpl extends ConsumerImpl implements PushConsumer {

	//......

    protected void startUp() throws Exception {
        try {
            log.info("Begin to start the rocketmq push consumer, clientId={}", clientId);
            GaugeObserver gaugeObserver = new ProcessQueueGaugeObserver(processQueueTable, clientId, consumerGroup);
            this.clientMeterManager.setGaugeObserver(gaugeObserver);
            super.startUp();
            final ScheduledExecutorService scheduler = this.getClientManager().getScheduler();
            this.consumeService = createConsumeService();
            // Scan assignments periodically.
            scanAssignmentsFuture = scheduler.scheduleWithFixedDelay(() -> {
                try {
                    scanAssignments();
                } catch (Throwable t) {
                    log.error("Exception raised while scanning the load assignments, clientId={}", clientId, t);
                }
            }, 1, 5, TimeUnit.SECONDS);
            log.info("The rocketmq push consumer starts successfully, clientId={}", clientId);
        } catch (Throwable t) {
            log.error("Exception raised while starting the rocketmq push consumer, clientId={}", clientId, t);
            shutDown();
            throw t;
        }
    }

	//......

}

PushConsumerImpl实现了AbstractIdleService的startUp方法,该方法会每隔5秒定时调度执行scanAssignments

scanAssignments

代码语言:javascript
复制
    @VisibleForTesting
    void scanAssignments() {
        try {
            log.debug("Start to scan assignments periodically, clientId={}", clientId);
            for (Map.Entry<String, FilterExpression> entry : subscriptionExpressions.entrySet()) {
                final String topic = entry.getKey();
                final FilterExpression filterExpression = entry.getValue();
                final Assignments existed = cacheAssignments.get(topic);
                final ListenableFuture<Assignments> future = queryAssignment(topic);
                Futures.addCallback(future, new FutureCallback<Assignments>() {
                    @Override
                    public void onSuccess(Assignments latest) {
                        if (latest.getAssignmentList().isEmpty()) {
                            if (null == existed || existed.getAssignmentList().isEmpty()) {
                                log.info("Acquired empty assignments from remote, would scan later, topic={}, "
                                    + "clientId={}", topic, clientId);
                                return;
                            }
                            log.info("Attention!!! acquired empty assignments from remote, but existed assignments"
                                + " is not empty, topic={}, clientId={}", topic, clientId);
                        }

                        if (!latest.equals(existed)) {
                            log.info("Assignments of topic={} has changed, {} => {}, clientId={}", topic, existed,
                                latest, clientId);
                            syncProcessQueue(topic, latest, filterExpression);
                            cacheAssignments.put(topic, latest);
                            return;
                        }
                        log.debug("Assignments of topic={} remains the same, assignments={}, clientId={}", topic,
                            existed, clientId);
                        // Process queue may be dropped, need to be synchronized anyway.
                        syncProcessQueue(topic, latest, filterExpression);
                    }

                    @Override
                    public void onFailure(Throwable t) {
                        log.error("Exception raised while scanning the assignments, topic={}, clientId={}", topic,
                            clientId, t);
                    }
                }, MoreExecutors.directExecutor());
            }
        } catch (Throwable t) {
            log.error("Exception raised while scanning the assignments for all topics, clientId={}", clientId, t);
        }
    }

scanAssignments通过queryAssignment(topic)查询Assignments,然后执行syncProcessQueue

syncProcessQueue

代码语言:javascript
复制
    void syncProcessQueue(String topic, Assignments assignments, FilterExpression filterExpression) {
        Set<MessageQueueImpl> latest = new HashSet<>();

        final List<Assignment> assignmentList = assignments.getAssignmentList();
        for (Assignment assignment : assignmentList) {
            latest.add(assignment.getMessageQueue());
        }

        Set<MessageQueueImpl> activeMqs = new HashSet<>();

        for (Map.Entry<MessageQueueImpl, ProcessQueue> entry : processQueueTable.entrySet()) {
            final MessageQueueImpl mq = entry.getKey();
            final ProcessQueue pq = entry.getValue();
            if (!topic.equals(mq.getTopic())) {
                continue;
            }

            if (!latest.contains(mq)) {
                log.info("Drop message queue according to the latest assignmentList, mq={}, clientId={}", mq,
                    clientId);
                dropProcessQueue(mq);
                continue;
            }

            if (pq.expired()) {
                log.warn("Drop message queue because it is expired, mq={}, clientId={}", mq, clientId);
                dropProcessQueue(mq);
                continue;
            }
            activeMqs.add(mq);
        }

        for (MessageQueueImpl mq : latest) {
            if (activeMqs.contains(mq)) {
                continue;
            }
            final Optional<ProcessQueue> optionalProcessQueue = createProcessQueue(mq, filterExpression);
            if (optionalProcessQueue.isPresent()) {
                log.info("Start to fetch message from remote, mq={}, clientId={}", mq, clientId);
                optionalProcessQueue.get().fetchMessageImmediately();
            }
        }
    }

syncProcessQueue方法获取维护最新的以及已有的MessageQueueImpl,然后进行遍历,对于新的执行createProcessQueue及其fetchMessageImmediately方法

createProcessQueue

代码语言:javascript
复制
    protected Optional<ProcessQueue> createProcessQueue(MessageQueueImpl mq, final FilterExpression filterExpression) {
        final ProcessQueueImpl processQueue = new ProcessQueueImpl(this, mq, filterExpression);
        final ProcessQueue previous = processQueueTable.putIfAbsent(mq, processQueue);
        if (null != previous) {
            return Optional.empty();
        }
        return Optional.of(processQueue);
    }

createProcessQueue会创建ProcessQueueImpl,并维护到processQueueTable中

ProcessQueueImpl

org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java

代码语言:javascript
复制
    public void fetchMessageImmediately() {
        receiveMessageImmediately();
    }

    private void receiveMessageImmediately() {
        receiveMessageImmediately(this.generateAttemptId());
    }

    private void receiveMessageImmediately(String attemptId) {
        final ClientId clientId = consumer.getClientId();
        if (!consumer.isRunning()) {
            log.info("Stop to receive message because consumer is not running, mq={}, clientId={}", mq, clientId);
            return;
        }
        try {
            final Endpoints endpoints = mq.getBroker().getEndpoints();
            final int batchSize = this.getReceptionBatchSize();
            final Duration longPollingTimeout = consumer.getPushConsumerSettings().getLongPollingTimeout();
            final ReceiveMessageRequest request = consumer.wrapReceiveMessageRequest(batchSize, mq, filterExpression,
                longPollingTimeout, attemptId);
            activityNanoTime = System.nanoTime();

            // Intercept before message reception.
            final MessageInterceptorContextImpl context = new MessageInterceptorContextImpl(MessageHookPoints.RECEIVE);
            consumer.doBefore(context, Collections.emptyList());

            final ListenableFuture<ReceiveMessageResult> future = consumer.receiveMessage(request, mq,
                longPollingTimeout);
            Futures.addCallback(future, new FutureCallback<ReceiveMessageResult>() {
                @Override
                public void onSuccess(ReceiveMessageResult result) {
                    // Intercept after message reception.
                    final List<GeneralMessage> generalMessages = result.getMessageViewImpls().stream()
                        .map((Function<MessageView, GeneralMessage>) GeneralMessageImpl::new)
                        .collect(Collectors.toList());
                    final MessageInterceptorContextImpl context0 =
                        new MessageInterceptorContextImpl(context, MessageHookPointsStatus.OK);
                    consumer.doAfter(context0, generalMessages);

                    try {
                        onReceiveMessageResult(result);
                    } catch (Throwable t) {
                        // Should never reach here.
                        log.error("[Bug] Exception raised while handling receive result, mq={}, endpoints={}, "
                            + "clientId={}", mq, endpoints, clientId, t);
                        onReceiveMessageException(t, attemptId);
                    }
                }

                @Override
                public void onFailure(Throwable t) {
                    String nextAttemptId = null;
                    if (t instanceof StatusRuntimeException) {
                        StatusRuntimeException exception = (StatusRuntimeException) t;
                        if (org.apache.rocketmq.shaded.io.grpc.Status.DEADLINE_EXCEEDED.getCode() == exception.getStatus().getCode()) {
                            nextAttemptId = request.getAttemptId();
                        }
                    }
                    // Intercept after message reception.
                    final MessageInterceptorContextImpl context0 =
                        new MessageInterceptorContextImpl(context, MessageHookPointsStatus.ERROR);
                    consumer.doAfter(context0, Collections.emptyList());

                    log.error("Exception raised during message reception, mq={}, endpoints={}, attemptId={}, " +
                            "nextAttemptId={}, clientId={}", mq, endpoints, request.getAttemptId(), nextAttemptId,
                        clientId, t);
                    onReceiveMessageException(t, nextAttemptId);
                }
            }, MoreExecutors.directExecutor());
            receptionTimes.getAndIncrement();
            consumer.getReceptionTimes().getAndIncrement();
        } catch (Throwable t) {
            log.error("Exception raised during message reception, mq={}, clientId={}", mq, clientId, t);
            onReceiveMessageException(t, attemptId);
        }
    }        

receiveMessageImmediately方法执行consumer.receiveMessage,成功时执行onReceiveMessageResult,异常时执行onReceiveMessageException

onReceiveMessageResult

代码语言:javascript
复制
    private void onReceiveMessageResult(ReceiveMessageResult result) {
        final List<MessageViewImpl> messages = result.getMessageViewImpls();
        if (!messages.isEmpty()) {
            cacheMessages(messages);
            receivedMessagesQuantity.getAndAdd(messages.size());
            consumer.getReceivedMessagesQuantity().getAndAdd(messages.size());
            consumer.getConsumeService().consume(this, messages);
        }
        receiveMessage();
    }

    public void receiveMessage() {
        receiveMessage(this.generateAttemptId());
    }

    public void receiveMessage(String attemptId) {
        final ClientId clientId = consumer.getClientId();
        if (dropped) {
            log.info("Process queue has been dropped, no longer receive message, mq={}, clientId={}", mq, clientId);
            return;
        }
        if (this.isCacheFull()) {
            log.warn("Process queue cache is full, would receive message later, mq={}, clientId={}", mq, clientId);
            receiveMessageLater(RECEIVING_BACKOFF_DELAY_WHEN_CACHE_IS_FULL, attemptId);
            return;
        }
        receiveMessageImmediately(attemptId);
    }    

onReceiveMessageResult执行consumer.getConsumeService().consume(this, messages),最后再执行receiveMessage,它会判断isCacheFull,为true则返回,否则再次出发receiveMessageImmediately

onReceiveMessageException

代码语言:javascript
复制
    public void onReceiveMessageException(Throwable t, String attemptId) {
        Duration delay = t instanceof TooManyRequestsException ? RECEIVING_FLOW_CONTROL_BACKOFF_DELAY :
            RECEIVING_FAILURE_BACKOFF_DELAY;
        receiveMessageLater(delay, attemptId);
    }

    private void receiveMessageLater(Duration delay, String attemptId) {
        final ClientId clientId = consumer.getClientId();
        final ScheduledExecutorService scheduler = consumer.getScheduler();
        try {
            log.info("Try to receive message later, mq={}, delay={}, clientId={}", mq, delay, clientId);
            scheduler.schedule(() -> receiveMessage(attemptId), delay.toNanos(), TimeUnit.NANOSECONDS);
        } catch (Throwable t) {
            if (scheduler.isShutdown()) {
                return;
            }
            // Should never reach here.
            log.error("[Bug] Failed to schedule message receiving request, mq={}, clientId={}", mq, clientId, t);
            onReceiveMessageException(t, attemptId);
        }
    }    

onReceiveMessageException会判断是不是TooManyRequestsException异常,是则delay取RECEIVING_FLOW_CONTROL_BACKOFF_DELAY(Duration.ofMillis(20)),否则取RECEIVING_FAILURE_BACKOFF_DELAY(Duration.ofSeconds(1)),最后执行receiveMessageLater,它会延时调度执行receiveMessage(attemptId)

ConsumeService

org/apache/rocketmq/client/java/impl/consumer/ConsumeService.java

代码语言:javascript
复制
public abstract class ConsumeService {
    private static final Logger log = LoggerFactory.getLogger(ConsumeService.class);

    protected final ClientId clientId;
    private final MessageListener messageListener;
    private final ThreadPoolExecutor consumptionExecutor;
    private final MessageInterceptor messageInterceptor;
    private final ScheduledExecutorService scheduler;

    public ConsumeService(ClientId clientId, MessageListener messageListener, ThreadPoolExecutor consumptionExecutor,
        MessageInterceptor messageInterceptor, ScheduledExecutorService scheduler) {
        this.clientId = clientId;
        this.messageListener = messageListener;
        this.consumptionExecutor = consumptionExecutor;
        this.messageInterceptor = messageInterceptor;
        this.scheduler = scheduler;
    }

    public abstract void consume(ProcessQueue pq, List<MessageViewImpl> messageViews);

    public ListenableFuture<ConsumeResult> consume(MessageViewImpl messageView) {
        return consume(messageView, Duration.ZERO);
    }

    public ListenableFuture<ConsumeResult> consume(MessageViewImpl messageView, Duration delay) {
        final ListeningExecutorService executorService = MoreExecutors.listeningDecorator(consumptionExecutor);
        final ConsumeTask task = new ConsumeTask(clientId, messageListener, messageView, messageInterceptor);
        // Consume message with no delay.
        if (Duration.ZERO.compareTo(delay) >= 0) {
            return executorService.submit(task);
        }
        final SettableFuture<ConsumeResult> future0 = SettableFuture.create();
        scheduler.schedule(() -> {
            final ListenableFuture<ConsumeResult> future = executorService.submit(task);
            Futures.addCallback(future, new FutureCallback<ConsumeResult>() {
                @Override
                public void onSuccess(ConsumeResult consumeResult) {
                    future0.set(consumeResult);
                }

                @Override
                public void onFailure(Throwable t) {
                    // Should never reach here.
                    log.error("[Bug] Exception raised while submitting scheduled consumption task, clientId={}",
                        clientId, t);
                }
            }, MoreExecutors.directExecutor());
        }, delay.toNanos(), TimeUnit.NANOSECONDS);
        return future0;
    }
}

ConsumeService是个抽象类,它定义了consume方法接收ProcessQueue及批量messageViews,同时它还内置了consume单个messageView的方法,支持delay参数,该方法主要就是创建ConsumeTask,然后往executorService提交ConsumeTask

ConsumeTask

org/apache/rocketmq/client/java/impl/consumer/ConsumeTask.java

代码语言:javascript
复制
public class ConsumeTask implements Callable<ConsumeResult> {
    private static final Logger log = LoggerFactory.getLogger(ConsumeTask.class);

    private final ClientId clientId;
    private final MessageListener messageListener;
    private final MessageViewImpl messageView;
    private final MessageInterceptor messageInterceptor;

    public ConsumeTask(ClientId clientId, MessageListener messageListener, MessageViewImpl messageView,
        MessageInterceptor messageInterceptor) {
        this.clientId = clientId;
        this.messageListener = messageListener;
        this.messageView = messageView;
        this.messageInterceptor = messageInterceptor;
    }

    /**
     * Invoke {@link MessageListener} to consumer message.
     *
     * @return message(s) which is consumed successfully.
     */
    @Override
    public ConsumeResult call() {
        ConsumeResult consumeResult;
        final List<GeneralMessage> generalMessages = Collections.singletonList(new GeneralMessageImpl(messageView));
        MessageInterceptorContextImpl context = new MessageInterceptorContextImpl(MessageHookPoints.CONSUME);
        messageInterceptor.doBefore(context, generalMessages);
        try {
            consumeResult = messageListener.consume(messageView);
        } catch (Throwable t) {
            log.error("Message listener raised an exception while consuming messages, clientId={}, mq={}, " +
                "messageId={}", clientId, messageView.getMessageQueue(), messageView.getMessageId(), t);
            // If exception was thrown during the period of message consumption, mark it as failure.
            consumeResult = ConsumeResult.FAILURE;
        }
        MessageHookPointsStatus status = ConsumeResult.SUCCESS.equals(consumeResult) ? MessageHookPointsStatus.OK :
            MessageHookPointsStatus.ERROR;
        context = new MessageInterceptorContextImpl(context, status);
        messageInterceptor.doAfter(context, generalMessages);
        // Make sure that the return value is the subset of messageViews.
        return consumeResult;
    }
}

ConsumeTask实现了Callable接口,其call方法主要是执行messageListener.consume(messageView),同时处理messageInterceptor的doBefore及doAfter

StandardConsumeService

org/apache/rocketmq/client/java/impl/consumer/StandardConsumeService.java

代码语言:javascript
复制
public class StandardConsumeService extends ConsumeService {
    private static final Logger log = LoggerFactory.getLogger(StandardConsumeService.class);

    public StandardConsumeService(ClientId clientId, MessageListener messageListener,
        ThreadPoolExecutor consumptionExecutor, MessageInterceptor messageInterceptor,
        ScheduledExecutorService scheduler) {
        super(clientId, messageListener, consumptionExecutor, messageInterceptor, scheduler);
    }

    @Override
    public void consume(ProcessQueue pq, List<MessageViewImpl> messageViews) {
        for (MessageViewImpl messageView : messageViews) {
            // Discard corrupted message.
            if (messageView.isCorrupted()) {
                log.error("Message is corrupted for standard consumption, prepare to discard it, mq={}, "
                    + "messageId={}, clientId={}", pq.getMessageQueue(), messageView.getMessageId(), clientId);
                pq.discardMessage(messageView);
                continue;
            }
            final ListenableFuture<ConsumeResult> future = consume(messageView);
            Futures.addCallback(future, new FutureCallback<ConsumeResult>() {
                @Override
                public void onSuccess(ConsumeResult consumeResult) {
                    pq.eraseMessage(messageView, consumeResult);
                }

                @Override
                public void onFailure(Throwable t) {
                    // Should never reach here.
                    log.error("[Bug] Exception raised in consumption callback, clientId={}", clientId, t);
                }
            }, MoreExecutors.directExecutor());
        }
    }
}

StandardConsumeService继承了ConsumeService,其consume方法遍历messageViews,挨个执行父类的consume方法(创建ConsumeTask,然后提交到executorService执行),之后针对该future注册FutureCallback,onSuccess的时候执行pq.eraseMessage(messageView, consumeResult)

FifoConsumeService

org/apache/rocketmq/client/java/impl/consumer/FifoConsumeService.java

代码语言:javascript
复制
class FifoConsumeService extends ConsumeService {
    private static final Logger log = LoggerFactory.getLogger(FifoConsumeService.class);

    public FifoConsumeService(ClientId clientId, MessageListener messageListener,
        ThreadPoolExecutor consumptionExecutor, MessageInterceptor messageInterceptor,
        ScheduledExecutorService scheduler) {
        super(clientId, messageListener, consumptionExecutor, messageInterceptor, scheduler);
    }

    @Override
    public void consume(ProcessQueue pq, List<MessageViewImpl> messageViews) {
        consumeIteratively(pq, messageViews.iterator());
    }

    public void consumeIteratively(ProcessQueue pq, Iterator<MessageViewImpl> iterator) {
        if (!iterator.hasNext()) {
            return;
        }
        final MessageViewImpl messageView = iterator.next();
        if (messageView.isCorrupted()) {
            // Discard corrupted message.
            log.error("Message is corrupted for FIFO consumption, prepare to discard it, mq={}, messageId={}, "
                + "clientId={}", pq.getMessageQueue(), messageView.getMessageId(), clientId);
            pq.discardFifoMessage(messageView);
            consumeIteratively(pq, iterator);
            return;
        }
        final ListenableFuture<ConsumeResult> future0 = consume(messageView);
        ListenableFuture<Void> future = Futures.transformAsync(future0, result -> pq.eraseFifoMessage(messageView,
            result), MoreExecutors.directExecutor());
        future.addListener(() -> consumeIteratively(pq, iterator), MoreExecutors.directExecutor());
    }
}

FifoConsumeService继承了ConsumeService,其consume方法获取messageViews的iterator,然后执行consumeIteratively,该方法通过iterator.next()获取一条记录,然后调用父类的consume方法(创建ConsumeTask,然后提交到executorService执行),之后针对该future0注册了一个function执行pq.eraseMessage(messageView, consumeResult),同时它对这个新future增加了listener在其执行完毕之后继续执行consumeIteratively处理下一个消息

小结

  • rocketmq5的PushConsumerImpl实现了AbstractIdleService的startUp方法,该方法会每隔5秒定时调度执行scanAssignments;scanAssignments通过queryAssignment(topic)查询Assignments,然后执行syncProcessQueue;syncProcessQueue方法获取维护最新的以及已有的MessageQueueImpl,然后进行遍历,对于新的执行createProcessQueue及其fetchMessageImmediately方法
  • receiveMessageImmediately方法执行consumer.receiveMessage,成功时执行onReceiveMessageResult,异常时执行onReceiveMessageException;onReceiveMessageResult执行consumer.getConsumeService().consume(this, messages),最后再执行receiveMessage,它会判断isCacheFull,为true则返回,否则再次出发receiveMessageImmediately;onReceiveMessageException会判断是不是TooManyRequestsException异常,是则delay取RECEIVING_FLOW_CONTROL_BACKOFF_DELAY(Duration.ofMillis(20)),否则取RECEIVING_FAILURE_BACKOFF_DELAY(Duration.ofSeconds(1)),最后执行receiveMessageLater,它会延时调度执行receiveMessage(attemptId)
  • ConsumeService是个抽象类,它定义了consume方法接收ProcessQueue及批量messageViews,同时它还内置了consume单个messageView的方法,支持delay参数,该方法主要就是创建ConsumeTask,然后往executorService提交ConsumeTask;ConsumeTask实现了Callable接口,其call方法主要是执行messageListener.consume(messageView),同时处理messageInterceptor的doBefore及doAfter;StandardConsumeService是遍历messageViews,挨个执行父类的consume方法,同时对该future注册FutureCallback,onSuccess的时候执行pq.eraseMessage(messageView, consumeResult);FifoConsumeService则是利用iterator以及listener通过递归调用consumeIteratively实现消息的顺序消费
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2024-08-10,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码匠的流水账 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • PushConsumerImpl
    • scanAssignments
      • syncProcessQueue
        • createProcessQueue
        • ProcessQueueImpl
          • onReceiveMessageResult
            • onReceiveMessageException
            • ConsumeService
              • ConsumeTask
                • StandardConsumeService
                  • FifoConsumeService
                  • 小结
                  领券
                  问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档