前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >ActiveMQ源码分析——消费消息

ActiveMQ源码分析——消费消息

作者头像
歪歪梯
发布2020-06-19 16:19:57
1.8K0
发布2020-06-19 16:19:57
举报
文章被收录于专栏:歪歪梯Club

分析结果

请先查看上一篇分析生产消息源码的博客之后再查看本篇 先看看本博客把consumer端分析后完整的activemq流程图

activemq完整流程

程序代码

前面分析了一篇博客关于producer如何生产消息:activemq源码笔记(一),最终还是没有找到与ack相关的内容,因为ack的提交逻辑主要在消费者。本篇博客继续跟踪消费者消费消息的源码。 先看看代码

代码语言:javascript
复制
         //1、创建工厂连接对象,需要制定ip和端口号
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616");
        //2、使用连接工厂创建一个连接对象
        Connection connection = connectionFactory.createConnection();
        //3、开启连接
        connection.start();
        //4、使用连接对象创建会话(session)对象
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)
        Queue queue = session.createQueue("test-queue");
        //使用session创建到达queue的consumer
        MessageConsumer consumer = session.createConsumer(queue);
                //为consumer添加消息处理方法---异步等待服务器推
        consumer.setMessageListener((message)->{
            TextMessage textMessage = (TextMessage) message;
            try {
                System.out.println(textMessage.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        });
        Message msg = consumer.receive();//同步,消费者主动拉
        session.close();
        connection.close();

前面关于到创建queue的都跟踪过来,接下来看看createConsumer方法,其使用session的id与内部的consume序列号生成器(long类型自增)生成一个ConsumerId对象

createConsumer

代码语言:javascript
复制
    public MessageConsumer createConsumer(Destination destination) throws JMSException {
        return this.createConsumer(destination, (String)null);
    }
    public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
        return this.createConsumer(destination, messageSelector, false);
    }
    public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException {
        return this.createConsumer(destination, messageSelector, noLocal, (MessageListener)null);
    }
    public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal, MessageListener messageListener) throws JMSException {
        this.checkClosed();
        if (destination instanceof CustomDestination) {
            CustomDestination customDestination = (CustomDestination)destination;
            return customDestination.createConsumer(this, messageSelector, noLocal);
        } else {
            ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
            int prefetch = false;
            int prefetch;
            if (destination instanceof Topic) {
                prefetch = prefetchPolicy.getTopicPrefetch();
            } else {
                prefetch = prefetchPolicy.getQueuePrefetch();
            }

            ActiveMQDestination activemqDestination = ActiveMQMessageTransformation.transformDestination(destination);
            return new ActiveMQMessageConsumer(this, this.getNextConsumerId(), activemqDestination, (String)null, messageSelector, prefetch, prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, this.isAsyncDispatch(), messageListener);
        }
    }

    protected ConsumerId getNextConsumerId() {
        return new ConsumerId(this.info.getSessionId(), this.consumerIdGenerator.getNextSequenceId());
    }

最终构造一个ActiveMQMessageConsumer的对象

代码语言:javascript
复制
    //ActiveMQMessageConsumer.class
     public ActiveMQMessageConsumer(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination dest, String name, String selector, int prefetch, int maximumPendingMessageCount, boolean noLocal, boolean browser, boolean dispatchAsync, MessageListener messageListener) throws JMSException {
            //省略了一些异常代码

            this.session = session;
            this.redeliveryPolicy = session.connection.getRedeliveryPolicyMap().getEntryFor(dest);
            if (this.redeliveryPolicy == null) {
                this.redeliveryPolicy = new RedeliveryPolicy();
            }
            //省略部分此处不需要关注代码
            this.info = new ConsumerInfo(consumerId);
            this.info.setClientId(this.session.connection.getClientID());
            this.info.setDestination(dest);
            this.optimizeAcknowledge = session.connection.isOptimizeAcknowledge() && session.isAutoAcknowledge() && !this.info.isBrowser();
            this.info.setOptimizedAcknowledge(this.optimizeAcknowledge);
            if (messageListener != null) {
                this.setMessageListener(messageListener);
            }
            this.session.addConsumer(this);
            this.session.syncSendPacket(this.info);
            if (session.connection.isStarted()) {
                this.start();
            }
    }
代码语言:javascript
复制
    //ActiveMQSession.class
    protected void addConsumer(ActiveMQMessageConsumer consumer) throws JMSException {
        this.consumers.add(consumer);
        if (consumer.isDurableSubscriber()) {
            this.stats.onCreateDurableSubscriber();
        }

        this.connection.addDispatcher(consumer.getConsumerId(), this);
    }

可以看到主要是一些参数的绑定,将ActiveMQSession聚合到了这个ActiveMQMessageConsumer里面,然后又反过来把这个Consumer添加到ActiveMQSession的Consumer集合里(继续吐槽,又开始整循环引用),此处添加Consumer方法中还将这个consumer的id与当前session的对应关系作为一个Dispatcher加入ActiveMQConnection对象的Dispatcher集合,也就是说ActiveMQSession也是一个ActiveMQDispatcher

代码语言:javascript
复制
    //ActiveMQConnection.class
    public void addDispatcher(ConsumerId consumerId, ActiveMQDispatcher dispatcher) {
        this.dispatchers.put(consumerId, dispatcher);
    }

最终调用ActiveMQMessageConsumer的start方法

ActiveMQMessageConsumer.start

代码语言:javascript
复制
    public void start() throws JMSException {
        if (!this.unconsumedMessages.isClosed()) {
            this.started.set(true);
            this.unconsumedMessages.start();
            this.session.executor.wakeup();
        }
    }

在start方法里,先是调用了unconsumedMessages的start再调用session的executor的wakeup,unconsumedMessages中没有开启线程先看看executor

代码语言:javascript
复制
    public void wakeup() {
        if (!this.dispatchedBySessionPool) {
            if (this.session.isSessionAsyncDispatch()) {
                try {
                    TaskRunner taskRunner = this.taskRunner;
                    if (taskRunner == null) {
                        synchronized(this) {
                            if (this.taskRunner == null) {
                                if (!this.isRunning()) {
                                    return;
                                }

                                this.taskRunner = this.session.connection.getSessionTaskRunner().createTaskRunner(this, "ActiveMQ Session: " + this.session.getSessionId());
                            }

                            taskRunner = this.taskRunner;
                        }
                    }

                    taskRunner.wakeup();
                } catch (InterruptedException var5) {
                    Thread.currentThread().interrupt();
                }
            } else {
                while(true) {
                    if (this.iterate()) {
                        continue;
                    }
                }
            }
        }

    }

executor的方法中判断当前是否以及有创建TaskRunner,没有会去创建一个TaskRunner和调用其wakeup方法,再来看看 TaskRunner

代码语言:javascript
复制
    public TaskRunner createTaskRunner(Task task, String name) {
        this.init();
        ExecutorService executor = (ExecutorService)this.executorRef.get();
        return (TaskRunner)(executor != null ? new PooledTaskRunner(executor, task, this.maxIterationsPerRun) : new DedicatedTaskRunner(task, name, this.priority, this.daemon));
    }

最终使用传进来的ActiveMQSessionExecutor对象作为Task去创建一个TaskRunner,

代码语言:javascript
复制
    //DedicatedTaskRunner.class
    public DedicatedTaskRunner(final Task task, String name, int priority, boolean daemon) {
        this.task = task;
        this.thread = new Thread(name) {
            public void run() {
                try {
                    DedicatedTaskRunner.this.runTask();
                } finally {
                    DedicatedTaskRunner.LOG.trace("Run task done: {}", task);
                }

            }
        };
        this.thread.setDaemon(daemon);
        this.thread.setName(name);
        this.thread.setPriority(priority);
        this.thread.start();
    }

task对象有一个线程成员,在构造时开启了运行,需要留意runTask及wakeup方法

代码语言:javascript
复制
    public void wakeup() throws InterruptedException {
        synchronized(this.mutex) {
            if (!this.shutdown) {
                this.pending = true;
                this.mutex.notifyAll();
            }
        }
    }
    final void runTask() {
            //为了方便省略部分处理代码
            while(true) {
                synchronized(this.mutex) {
                   this.pending = false;
                   if (this.shutdown) {
                       break;
                   }
                }
                if (this.task.iterate()) {
                    continue;
                }
                synchronized(this.mutex) {
                   if (!this.shutdown) {
                      while(true) {
                         if (this.pending) {
                              continue;
                         }
                         this.mutex.wait();
                      }
                    }
                }
         }
    }

ActiveMQSessionExecutor.iterate

主要逻辑就是,循环去调用task的iterate方法,如果成功就继续,失败就进入阻塞等待直到其pengind标志被取消(被调用了wakeup方法),继续进入下一轮的循环调用task的iterate方法,所以主要的业务处理应该就在iterate中,前面讲到task是ActiveMQSessionExecutor,查看其源码

代码语言:javascript
复制
    //ActiveMQSessionExecutor.class
    public boolean iterate() {
        Iterator var1 = this.session.consumers.iterator();
        ActiveMQMessageConsumer consumer;
        do {
            if (!var1.hasNext()) {
                MessageDispatch message = this.messageQueue.dequeueNoWait();
                if (message == null) {
                    return false;
                }
                this.dispatch(message);
                return !this.messageQueue.isEmpty();
            }
            consumer = (ActiveMQMessageConsumer)var1.next();
        } while(!consumer.iterate());
        return true;
    }
    void dispatch(MessageDispatch message) {
        Iterator var2 = this.session.consumers.iterator();
        while(var2.hasNext()) {
            ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer)var2.next();
            ConsumerId consumerId = message.getConsumerId();
            if (consumerId.equals(consumer.getConsumerId())) {
                consumer.dispatch(message);
                break;
            }
        }
代码语言:javascript
复制
    //ActiveMQMessageConsumer.class
    public boolean iterate() {
        MessageListener listener = (MessageListener)this.messageListener.get();
        if (listener != null) {
            MessageDispatch md = this.unconsumedMessages.dequeueNoWait();
            if (md != null) {
                this.dispatch(md);
                return true;
            }
        }
        return false;
    }

可以看到逻辑是,先判断当前session是否有注册消费者,有注册消费者则迭代判断每个消费者是否有注册Listener(异步等待消息),如果有注册Listener并且当前刚好取得到消息,就调用consumer的dispatch由消费者主动去转发消息。如果没有,就dequeue,如果刚好有消息就调用executor的dispatch去转发消息(最终是去迭代是否有注册消费者使用消费者来转发消息),没有则继续挂起等待有人继续调用wakeup修改pending再继续循环。Consumer拿到MessageDispatch调用自己的disptach方法进行消费,这个我们后面再讲,先不展开。 前面这里讲了这么久都是对于已经有消息在队列,而直接dequeue的,那么消息是什么时候入队的呢? 这里得回忆前面那篇博客,我们讲到了session启动后,会开启tcpTransport的线程接收消息,最终回调是到ActiveMQConnection的onCommand方法

onCommand

代码语言:javascript
复制
    //ActiveMQConnection.class
    public void onCommand(Object o) {
        final Command command = (Command)o;
        if (!this.closed.get() && command != null) {
            try {
                command.visit(new CommandVisitorAdapter() {
                    public Response processMessageDispatch(MessageDispatch md) throws Exception {
                        ActiveMQConnection.this.waitForTransportInterruptionProcessingToComplete();
                        ActiveMQDispatcher dispatcher = (ActiveMQDispatcher)ActiveMQConnection.this.dispatchers.get(md.getConsumerId());
                        if (dispatcher != null) {
                            Message msg = md.getMessage();
                            if (msg != null) {
                                msg = msg.copy();
                                msg.setReadOnlyBody(true);
                                msg.setReadOnlyProperties(true);
                                msg.setRedeliveryCounter(md.getRedeliveryCounter());
                                msg.setConnection(ActiveMQConnection.this);
                                msg.setMemoryUsage((MemoryUsage)null);
                                md.setMessage(msg);
                            }

                            dispatcher.dispatch(md);
                        } else {
                            ActiveMQConnection.LOG.debug("{} no dispatcher for {} in {}", new Object[]{this, md, ActiveMQConnection.this.dispatchers});
                        }

                        return null;
                    }
                    //省略部分代码
                });
            } catch (Exception var5) {
                this.onClientInternalException(var5);
            }
        }

        Iterator iter = this.transportListeners.iterator();

        while(iter.hasNext()) {
            TransportListener listener = (TransportListener)iter.next();
            listener.onCommand(command);
        }

    }

补充上次的逻辑流程图

activeMQ发送流程

MessageDispatch

查看Command实现类有一个MessageDispatch,代表要转发的消息(被订阅的queue或者topic产生的消息),查看其visit方法果然是调用CommandVisitor的processMessageDispatch方法处理消息给Consumer

代码语言:javascript
复制
    //MessageDispatch.class
    public Response visit(CommandVisitor visitor) throws Exception {
        return visitor.processMessageDispatch(this);
    }

根据前面代码,在处理好等到传输完成,封装好消息对象后,会从Session获取该消息的订阅者Dispatch进行转发

代码语言:javascript
复制
     ActiveMQDispatcher dispatcher = (ActiveMQDispatcher)ActiveMQConnection.this.dispatchers.get(md.getConsumerId());
    Message msg = md.getMessage();
    if (msg != null) {
        msg = msg.copy();
        msg.setReadOnlyBody(true);
        msg.setReadOnlyProperties(true);
        msg.setRedeliveryCounter(md.getRedeliveryCounter());
        msg.setConnection(ActiveMQConnection.this);
        msg.setMemoryUsage((MemoryUsage)null);
        md.setMessage(msg);
    }
    dispatcher.dispatch(md);
 ```
 前面我们已经讲过,消息对应的ActiveMQDispatcher 就是与他相关的ActiveMQSession对象,所以查看其转发源码
代码语言:javascript
复制
//ActiveMQSession.class
 public void dispatch(MessageDispatch messageDispatch) {
        try {
            this.executor.execute(messageDispatch);
        } catch (InterruptedException var3) {
            Thread.currentThread().interrupt();
            this.connection.onClientInternalException(var3);
        }
    }

ActiveMQSession会调用自己的executor的execute方法去处理这个消息 //ActiveMQSessionExecutor.class void execute(MessageDispatch message) throws InterruptedException { //省略部分处理连接未启动的异常代码 if (!this.session.isSessionAsyncDispatch() && !this.dispatchedBySessionPool) { this.dispatch(message); } else { this.messageQueue.enqueue(message); this.wakeup(); } } }

代码语言:javascript
复制
此处根据是配置了是异步处理消息还是同步处理,同步的情况下会直接调用dispatch,异步是把消息先入队,调用wakeup唤醒,前面讲过调用wakeup后会改变pending进而继续运行调用iterate方法去从本地队列取出消息后,再调用dispatch方法去处理消息。
在executor的dispatch方法中,回去从session里拿到Consumer,调用consumer自己的dispatch方法去处理
## ActiveMQMessageConsumer.dispatch
代码语言:javascript
复制
//ActiveMQMessageConsumer.class
public void dispatch(MessageDispatch md) {
    MessageListener listener = (MessageListener)this.messageListener.get();
    try {
        this.clearMessagesInProgress();
        this.clearDeliveredList();
        synchronized(this.unconsumedMessages.getMutex()) {
            if (!this.unconsumedMessages.isClosed()) {
                if (!this.info.isBrowser() && this.session.connection.isDuplicate(this, md.getMessage())) {
                    if (this.redeliveryExpectedInCurrentTransaction(md, true)) {
                        LOG.debug("{} tracking transacted redelivery {}", this.getConsumerId(), md.getMessage());
                        if (this.transactedIndividualAck) {
                            this.immediateIndividualTransactedAck(md);
                        } else {
                            this.session.sendAck(new MessageAck(md, (byte)0, 1));
                        }
                    } else {
                        ConsumerId consumerWithPendingTransaction;
                        if ((consumerWithPendingTransaction = this.redeliveryPendingInCompetingTransaction(md)) != null) {
                            LOG.warn("{} delivering duplicate {}, pending transaction completion on {} will rollback", new Object[]{this.getConsumerId(), md.getMessage(), consumerWithPendingTransaction});
                            this.session.getConnection().rollbackDuplicate(this, md.getMessage());
                            this.dispatch(md);
                        } else {
                            LOG.warn("{} suppressing duplicate delivery on connection, poison acking: {}", this.getConsumerId(), md);
                            this.posionAck(md, "Suppressing duplicate delivery on connection, consumer " + this.getConsumerId());
                        }
                    }
                } else if (listener != null && this.unconsumedMessages.isRunning()) {
                    if (this.redeliveryExceeded(md)) {
                        this.posionAck(md, "listener dispatch[" + md.getRedeliveryCounter() + "] to " + this.getConsumerId() + " exceeds redelivery policy limit:" + this.redeliveryPolicy);
                        return;
                    }

                    ActiveMQMessage message = this.createActiveMQMessage(md);
                    this.beforeMessageIsConsumed(md);

                    try {
                        boolean expired = this.isConsumerExpiryCheckEnabled() && message.isExpired();
                        if (!expired) {
                            listener.onMessage(message);
                        }

                        this.afterMessageIsConsumed(md, expired);
                    } catch (RuntimeException var7) {
                        LOG.error("{} Exception while processing message: {}", new Object[]{this.getConsumerId(), md.getMessage().getMessageId(), var7});
                        md.setRollbackCause(var7);
                        if (!this.isAutoAcknowledgeBatch() && !this.isAutoAcknowledgeEach() && !this.session.isIndividualAcknowledge()) {
                            this.afterMessageIsConsumed(md, false);
                        } else {
                            this.rollback();
                        }
                    }
                } else {
                    if (!this.unconsumedMessages.isRunning()) {
                        this.session.connection.rollbackDuplicate(this, md.getMessage());
                    }

                    if (md.getMessage() == null) {
                        this.unconsumedMessages.enqueue(md);
                    } else if (!this.consumeExpiredMessage(md)) {
                        this.unconsumedMessages.enqueue(md);
                        if (this.availableListener != null) {
                            this.availableListener.onMessageAvailable(this);
                        }
                    } else {
                        this.beforeMessageIsConsumed(md);
                        this.afterMessageIsConsumed(md, true);
                        if (this.info.getCurrentPrefetchSize() == 0) {
                            this.unconsumedMessages.enqueue((MessageDispatch)null);
                        }
                    }
                }
            }
        }

        if (++this.dispatchedCount % 1000 == 0) {
            this.dispatchedCount = 0;
            Thread.yield();
        }
    } catch (Exception var9) {
        this.session.connection.onClientInternalException(var9);
    }

}
代码语言:javascript
复制
## 幂等性保证
前面使用this.session.connection.isDuplicate(this, md.getMessage())判断是否是已经被消费了的消息重复发送,如果是就根据当前是否运行在事务模式下选择只发送ack还是还要调起事务回滚,判断重复主要是内部维护了一个生产者隔离的BitArrayBin来存储已经消费的消息的producerSequeueId
代码语言:javascript
复制
//ActiveMQConnection.class
protected boolean isDuplicate(ActiveMQDispatcher dispatcher, Message message) {
    return this.checkForDuplicates && this.connectionAudit.isDuplicate(dispatcher, message);
}
代码语言:javascript
复制
```
    //ConnectionAudit.class
     synchronized boolean isDuplicate(ActiveMQDispatcher dispatcher, Message message) {
        if (this.checkForDuplicates && message != null) {
            ActiveMQDestination destination = message.getDestination();
            if (destination != null) {
                ActiveMQMessageAudit audit;
                boolean result;
                if (destination.isQueue()) {
                    audit = (ActiveMQMessageAudit)this.destinations.get(destination);
                    if (audit == null) {
                        audit = new ActiveMQMessageAudit(this.auditDepth, this.auditMaximumProducerNumber);
                        this.destinations.put(destination, audit);
                    }

                    result = audit.isDuplicate(message);
                    return result;
                }

                audit = (ActiveMQMessageAudit)this.dispatchers.get(dispatcher);
                if (audit == null) {
                    audit = new ActiveMQMessageAudit(this.auditDepth, this.auditMaximumProducerNumber);
                    this.dispatchers.put(dispatcher, audit);
                }

                result = audit.isDuplicate(message);
                return result;
            }
        }

        return false;
    }
代码语言:javascript
复制
    //ActiveMQMessageAuditNotSync.class
    public boolean isDuplicate(MessageReference message) {
        MessageId id = message.getMessageId();
        return this.isDuplicate(id);
    }
    public boolean isDuplicate(MessageId id) {
        boolean answer = false;
        if (id != null) {
            ProducerId pid = id.getProducerId();
            if (pid != null) {
                BitArrayBin bab = (BitArrayBin)this.map.get(pid.toString());
                if (bab == null) {
                    bab = new BitArrayBin(this.auditDepth);
                    this.map.put(pid.toString(), bab);
                    this.modified = true;
                }

                answer = bab.setBit(id.getProducerSequenceId(), true);
            }
        }

        return answer;
    }

消息处理

如果不是重复消息,接下来判断是否本地注册了MessageListener,是进入下面代码

代码语言:javascript
复制
    this.beforeMessageIsConsumed(md);
    boolean expired = this.isConsumerExpiryCheckEnabled() && message.isExpired();
    if (!expired) {
        listener.onMessage(message);
    }
    this.afterMessageIsConsumed(md, expired);

第一步先调用beforeMessageIsConsumed做消息前置处理,主要是如果ack设置的不是自动提交,就将消息加入deliveryMessages(已处理待提交告知服务端队列)

代码语言:javascript
复制
    private void beforeMessageIsConsumed(MessageDispatch md) throws JMSException {
        md.setDeliverySequenceId(this.session.getNextDeliveryId());
        this.lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId();
        if (!this.isAutoAcknowledgeBatch()) {
            synchronized(this.deliveredMessages) {
                this.deliveredMessages.addFirst(md);
            }

            if (this.session.getTransacted()) {
                if (this.transactedIndividualAck) {
                    this.immediateIndividualTransactedAck(md);
                } else {
                    this.ackLater(md, (byte)0);
                }
            }
        }

    }

接着判断是否有设定超时并且消息是否超时了,没有超时检测或者没有超时就调用注册的MessageListener的onMessage方法,接着进行后置处理,迭代deliveryMessages,告知服务器已经消费,并发送ack

代码语言:javascript
复制
    private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException {
        if (!this.unconsumedMessages.isClosed()) {
            if (messageExpired) {
                this.acknowledge(md, (byte)6);
                this.stats.getExpiredMessageCount().increment();
            } else {
                this.stats.onMessage();
                if (!this.session.getTransacted()) {
                    if (this.isAutoAcknowledgeEach()) {
                        if (this.deliveryingAcknowledgements.compareAndSet(false, true)) {
                            synchronized(this.deliveredMessages) {
                                if (!this.deliveredMessages.isEmpty()) {
                                    MessageAck ack;
                                    if (this.optimizeAcknowledge) {
                                        ++this.ackCounter;
                                        if ((double)(this.ackCounter + this.deliveredCounter) >= (double)this.info.getPrefetchSize() * 0.65D || this.optimizeAcknowledgeTimeOut > 0L && System.currentTimeMillis() >= this.optimizeAckTimestamp + this.optimizeAcknowledgeTimeOut) {
                                            ack = this.makeAckForAllDeliveredMessages((byte)2);
                                            if (ack != null) {
                                                this.deliveredMessages.clear();
                                                this.ackCounter = 0;
                                                this.session.sendAck(ack);
                                                this.optimizeAckTimestamp = System.currentTimeMillis();
                                            }

                                            if (this.pendingAck != null && this.deliveredCounter > 0) {
                                                this.session.sendAck(this.pendingAck);
                                                this.pendingAck = null;
                                                this.deliveredCounter = 0;
                                            }
                                        }
                                    } else {
                                        ack = this.makeAckForAllDeliveredMessages((byte)2);
                                        if (ack != null) {
                                            this.deliveredMessages.clear();
                                            this.session.sendAck(ack);
                                        }
                                    }
                                }
                            }

                            this.deliveryingAcknowledgements.set(false);
                        }
                    } else if (this.isAutoAcknowledgeBatch()) {
                        this.ackLater(md, (byte)2);
                    } else {
                        if (!this.session.isClientAcknowledge() && !this.session.isIndividualAcknowledge()) {
                            throw new IllegalStateException("Invalid session state.");
                        }

                        boolean messageUnackedByConsumer = false;
                        synchronized(this.deliveredMessages) {
                            messageUnackedByConsumer = this.deliveredMessages.contains(md);
                        }

                        if (messageUnackedByConsumer) {
                            this.ackLater(md, (byte)0);
                        }
                    }
                }
            }

        }
    }
    private void ackLater(MessageDispatch md, byte ackType) throws JMSException {
        if (this.session.getTransacted()) {
            this.registerSync();
        }

        ++this.deliveredCounter;
        synchronized(this.deliveredMessages) {
            MessageAck oldPendingAck = this.pendingAck;
            this.pendingAck = new MessageAck(md, ackType, this.deliveredCounter);
            this.pendingAck.setTransactionId(this.session.getTransactionContext().getTransactionId());
            if (oldPendingAck == null) {
                this.pendingAck.setFirstMessageId(this.pendingAck.getLastMessageId());
            } else if (oldPendingAck.getAckType() == this.pendingAck.getAckType()) {
                this.pendingAck.setFirstMessageId(oldPendingAck.getFirstMessageId());
            } else if (!oldPendingAck.isDeliveredAck()) {
                LOG.debug("Sending old pending ack {}, new pending: {}", oldPendingAck, this.pendingAck);
                this.session.sendAck(oldPendingAck);
            } else {
                LOG.debug("dropping old pending ack {}, new pending: {}", oldPendingAck, this.pendingAck);
            }

            if (0.5D * (double)this.info.getPrefetchSize() <= (double)(this.deliveredCounter + this.ackCounter - this.additionalWindowSize)) {
                LOG.debug("ackLater: sending: {}", this.pendingAck);
                this.session.sendAck(this.pendingAck);
                this.pendingAck = null;
                this.deliveredCounter = 0;
                this.additionalWindowSize = 0;
            }
        }
    }

首先判断不是事务模式(因为事务模式要手动去提交),然后判断isAutoAcknowledgeEach(自动提交,一次ack提交所有消息),发送对应的ack,清空deliveryMessages,如果是isAutoAcknowledgeBatch(批量自动提交),则调用ackLater方法延迟发送,具体实现为,使用pendingAck参数,因为前面代码里有,在isAutoAcknowledgeEach中,如果pendingAck不为空,最后是会发送的,同时当满足

代码语言:javascript
复制
0.5D * (double)this.info.getPrefetchSize() <= (double)(this.deliveredCounter + this.ackCounter - this.additionalWindowSize)

即已经消费待提交的消息数量与ackCounter减去配置的额外窗口大小 >= prefetchSize的一半,ackLater方法也会发送pendingAck,将累计的已消费消息都提交。而这里的ackCounter是由参数optimizeAcknowledge决定的,如果不开启一直是0,条件变为

代码语言:javascript
复制
已经消费待提交的消息数量 >= prefetchSize的一半

这个参数代表根据ack累计优化,实际在前面创建consumer时根据connection的isOptimizeAcknowledge和session的isAutoAcknowledge(因为要自动提交模式的才会有需要对ack优化)和browser(默认是false)参数配置了

代码语言:javascript
复制
this.optimizeAcknowledge = session.connection.isOptimizeAcknowledge() && session.isAutoAcknowledge() && !this.info.isBrowser();

拉取消息

前面就讲了异步地等待服务器推的处理模式,接下来简单讲解recieve方法这种拉的工作模式

代码语言:javascript
复制
    //ActiveMQConsumer.class
    protected void sendPullCommand(long timeout) throws JMSException {
        this.clearDeliveredList();
        if (this.info.getCurrentPrefetchSize() == 0 && this.unconsumedMessages.isEmpty()) {
            MessagePull messagePull = new MessagePull();
            messagePull.configure(this.info);
            messagePull.setTimeout(timeout);
            this.session.asyncSendPacket(messagePull);
        }
    }
    public Message receive() throws JMSException {
        this.checkClosed();
        this.checkMessageListener();
        this.sendPullCommand(0L);
        MessageDispatch md = this.dequeue(-1L);
        if (md == null) {
            return null;
        } else {
            this.beforeMessageIsConsumed(md);
            this.afterMessageIsConsumed(md, false);
            return this.createActiveMQMessage(md);
        }
    }

主要逻辑就是异步发送一个pull的命令,然后调用dequeue方法去阻塞(没有设定超时时)直到获得一个消息。 所以,在前一篇博客的基础上,实际是由ActiveMQConnection的onCommand方法与consumer端的处理衔接上了,最终流程图如下

activemq最终流程图

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-05-28,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 歪歪梯Club 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 分析结果
  • 程序代码
  • createConsumer
  • ActiveMQMessageConsumer.start
  • ActiveMQSessionExecutor.iterate
  • onCommand
  • MessageDispatch
  • 消息处理
  • 拉取消息
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档