专栏首页歪歪梯ClubActiveMQ源码分析——生产消息

ActiveMQ源码分析——生产消息

分析结果

先看看最终由本博客分析后绘制出来的总体流程结构图

activeMQ流程

创建连接

         //1、创建工厂连接对象,需要制定ip和端口号
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616");
        //2、使用连接工厂创建一个连接对象
        Connection connection = connectionFactory.createConnection();
        //3、开启连接
        connection.start();
        //4、使用连接对象创建会话(session)对象
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

创建Session时,第一个传入是否开启事务,第二个传入session提交消费消息的方式 接下来看源码处理,生产者id对象由当前sessionID加上使用内部session序列号生成器(long类型自增)生成的id组合构成

public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
        this.checkClosedOrFailed();
        this.ensureConnectionInfoSent();
        if (!transacted) {
            if (acknowledgeMode == 0) {
                throw new JMSException("acknowledgeMode SESSION_TRANSACTED cannot be used for an non-transacted Session");
            }

            if (acknowledgeMode < 0 || acknowledgeMode > 4) {
                throw new JMSException("...异常信息很长,省略了");
            }
        }
        return new ActiveMQSession(this, this.getNextSessionId(), transacted ? 0 : acknowledgeMode, this.isDispatchAsync(), this.isAlwaysSessionAsync());
    }
    protected ProducerId getNextProducerId() {
        return new ProducerId(this.info.getSessionId(), this.producerIdGenerator.getNextSequenceId());
    }

最终调用ActiveMQSession的构造方法 new ActiveMQSession(this, this.getNextSessionId(), transacted ? 0 : acknowledgeMode, this.isDispatchAsync(), this.isAlwaysSessionAsync()) 也就是说,如果传入了开启事务,使用ack为0的模式创建session,否则使用给进来的ack参数,根据不同ack,使用不同的提交确认策略

AUTO_ACKNOWLEDGE = 1 自动确认 CLIENT_ACKNOWLEDGE = 2 客户端手动确认 DUPS_OK_ACKNOWLEDGE = 3 自动批量确认 SESSION_TRANSACTED = 0 事务提交并确认 INDIVIDUAL_ACKNOWLEDGE = 4 单条消息确认

翻阅了session里面无果,决定先从connection和connectionFactory入手,factory中也没开启发送相关的参数,进入connection源码查看 Connection connection = connectionFactory.createConnection(); 最终会create一个Transport 对象,并调用他的start方法(有内味了),继续

protected ActiveMQConnection createActiveMQConnection(String userName, String password) throws JMSException {
        if (this.brokerURL == null) {
            throw new ConfigurationException("brokerURL not set.");
        } else {
            ActiveMQConnection connection = null;

            try {
                Transport transport = this.createTransport();
                connection = this.createActiveMQConnection(transport, this.factoryStats);
                connection.setUserName(userName);
                connection.setPassword(password);
                this.configureConnection(connection);
                transport.start();
                if (this.clientID != null) {
                    connection.setDefaultClientID(this.clientID);
                }

                return connection;
            } catch (JMSException var8) {
                try {
                    connection.close();
                } catch (Throwable var6) {
                }

                throw var8;
            } catch (Exception var9) {
                try {
                    connection.close();
                } catch (Throwable var7) {
                }

                throw JMSExceptionSupport.create("Could not connect to broker URL: " + this.brokerURL + ". Reason: " + var9, var9);
            }
        }
    }
    protected Transport createTransport() throws JMSException {
        try {
            URI connectBrokerUL = this.brokerURL;
            String scheme = this.brokerURL.getScheme();
            if (scheme == null) {
                throw new IOException("Transport not scheme specified: [" + this.brokerURL + "]");
            } else {
                if (scheme.equals("auto")) {
                    connectBrokerUL = new URI(this.brokerURL.toString().replace("auto", "tcp"));
                } else if (scheme.equals("auto+ssl")) {
                    connectBrokerUL = new URI(this.brokerURL.toString().replace("auto+ssl", "ssl"));
                } else if (scheme.equals("auto+nio")) {
                    connectBrokerUL = new URI(this.brokerURL.toString().replace("auto+nio", "nio"));
                } else if (scheme.equals("auto+nio+ssl")) {
                    connectBrokerUL = new URI(this.brokerURL.toString().replace("auto+nio+ssl", "nio+ssl"));
                }

                return TransportFactory.connect(connectBrokerUL);
            }
        } catch (Exception var3) {
            throw JMSExceptionSupport.create("Could not create Transport. Reason: " + var3, var3);
        }
    }

先根据传进来的broker的url解析成新的url,接着调用TransportFactory.connect(connectBrokerUL),在里面根据url解析得到一个wireFormat对象(主要是对流的字节数据的解析协议,比如有mqtt协议的wireformat,有amqp协议的wireformat),接着使用wireformat和broker的url信息创建一个Transport

Transport包装

public static Transport connect(URI location) throws Exception {
        TransportFactory tf = findTransportFactory(location);
        return tf.doConnect(location);
    }
public Transport doConnect(URI location) throws Exception {
        try {
            Map<String, String> options = new HashMap(URISupport.parseParameters(location));
            if (!options.containsKey("wireFormat.host")) {
                options.put("wireFormat.host", location.getHost());
            }

            WireFormat wf = this.createWireFormat(options);
            Transport transport = this.createTransport(location, wf);
            Transport rc = this.configure(transport, wf, options);
            IntrospectionSupport.extractProperties(options, "auto.");
            if (!options.isEmpty()) {
                throw new IllegalArgumentException("Invalid connect parameters: " + options);
            } else {
                return rc;
            }
        } catch (URISyntaxException var6) {
            throw IOExceptionSupport.create(var6);
        }
    }

源码中使用了装饰器模式,首先,使用WireFormat创建一个Transport

//TcpTransportFactory.class
protected Transport createTransport(URI location, WireFormat wf) throws UnknownHostException, IOException {
        URI localLocation = null;
        String path = location.getPath();
        if (path != null && path.length() > 0) {
            int localPortIndex = path.indexOf(58);

            try {
                Integer.parseInt(path.substring(localPortIndex + 1, path.length()));
                String localString = location.getScheme() + ":/" + path;
                localLocation = new URI(localString);
            } catch (Exception var7) {
                LOG.warn("path isn't a valid local location for TcpTransport to use", var7.getMessage());
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Failure detail", var7);
                }
            }
        }

        SocketFactory socketFactory = this.createSocketFactory();
        return this.createTcpTransport(wf, socketFactory, location, localLocation);
    }
    protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
        return new TcpTransport(wf, socketFactory, location, localLocation);
    }

最终构建了一个TcpTransport,TcpTransport的构造方法中创建了一个socket(哪里跑),此处传进来的SocketFactory是在TcpTransportFactory中创建的,使用的是DefaultSocketFactory

 public TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
        this.connectionTimeout = 30000;
        this.socketBufferSize = 65536;
        this.ioBufferSize = 8192;
        this.closeAsync = true;
        this.buffOut = null;
        this.trafficClass = 0;
        this.trafficClassSet = false;
        this.diffServChosen = false;
        this.typeOfServiceChosen = false;
        this.trace = false;
        this.logWriterName = TransportLoggerSupport.defaultLogWriterName;
        this.dynamicManagement = false;
        this.startLogging = true;
        this.jmxPort = 1099;
        this.useLocalHost = false;
        this.stoppedLatch = new AtomicReference();
        this.soLinger = -2147483648;
        this.wireFormat = wireFormat;
        this.socketFactory = socketFactory;

        try {
            this.socket = socketFactory.createSocket();
        } catch (SocketException var6) {
            this.socket = null;
        }

        this.remoteLocation = remoteLocation;
        this.localLocation = localLocation;
        this.initBuffer = null;
        this.setDaemon(false);
    }

在DefaultSocketFactory中,创建方法就是单纯实例化一个socket

    DefaultSocketFactory() {
    }

    public Socket createSocket() {
        return new Socket();
    }

回到TransportFactory的方法Connect,这时往下

public Transport doConnect(URI location) throws Exception {
        try {
            Map<String, String> options = new HashMap(URISupport.parseParameters(location));
            if (!options.containsKey("wireFormat.host")) {
                options.put("wireFormat.host", location.getHost());
            }

            WireFormat wf = this.createWireFormat(options);
            Transport transport = this.createTransport(location, wf);
            Transport rc = this.configure(transport, wf, options);
            IntrospectionSupport.extractProperties(options, "auto.");
            if (!options.isEmpty()) {
                throw new IllegalArgumentException("Invalid connect parameters: " + options);
            } else {
                return rc;
            }
        } catch (URISyntaxException var6) {
            throw IOExceptionSupport.create(var6);
        }
    }

继续执行this.configure(transport, wf, options);

public Transport configure(Transport transport, WireFormat wf, Map options) throws Exception {
        transport = this.compositeConfigure(transport, wf, options);
        transport = new MutexTransport(transport);
        transport = new ResponseCorrelator(transport);
        return transport;
    }

发现由对刚刚生成的TcpTransport进行三次包装 第一层包装加入TcpTransport发送数据超时的控制过滤器

public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
        if (options.containsKey("soWriteTimeout")) {
            transport = new WriteTimeoutFilter((Transport)transport);
            String soWriteTimeout = (String)options.remove("soWriteTimeout");
            if (soWriteTimeout != null) {
                ((WriteTimeoutFilter)transport).setWriteTimeout(Long.parseLong(soWriteTimeout));
            }
        }

        IntrospectionSupport.setProperties(transport, options);
        return (Transport)transport;
    }

看看WriteTimeoutFilter源码,其存在一个静态成员,完成了加载一个TimeoutThread,并且该类时线程子类,并在构造中完成了对自己的启动

    public class WriteTimeoutFilter extends TransportFilter {
        protected static ConcurrentLinkedQueue<WriteTimeoutFilter> writers = new ConcurrentLinkedQueue();
        protected static WriteTimeoutFilter.TimeoutThread timeoutThread = new WriteTimeoutFilter.TimeoutThread();
        public WriteTimeoutFilter(Transport next) {
            super(next);
        }
        protected static class TimeoutThread extends Thread {
        static AtomicInteger instance = new AtomicInteger(0);
        boolean run = true;

        public TimeoutThread() {
            this.setName("WriteTimeoutFilter-Timeout-" + instance.incrementAndGet());
            this.setDaemon(true);
            this.setPriority(1);
            this.start();
        }

        public void run() {
            while(this.run) {
                boolean error = false;

                try {
                    if (!interrupted()) {
                        Iterator filters = WriteTimeoutFilter.writers.iterator();

                        label48:
                        while(true) {
                            WriteTimeoutFilter filter;
                            do {
                                if (!this.run || !filters.hasNext()) {
                                    break label48;
                                }

                                filter = (WriteTimeoutFilter)filters.next();
                            } while(filter.getWriteTimeout() <= 0L);

                            long writeStart = filter.getWriter().getWriteTimestamp();
                            long delta = filter.getWriter().isWriting() && writeStart > 0L ? System.currentTimeMillis() - writeStart : -1L;
                            if (delta > filter.getWriteTimeout()) {
                                WriteTimeoutFilter.deRegisterWrite(filter, true, (IOException)null);
                            }
                        }
                    }

                    try {
                        Thread.sleep(WriteTimeoutFilter.getSleep());
                        error = false;
                    } catch (InterruptedException var8) {
                    }
                } catch (Throwable var9) {
                    if (!error) {
                        WriteTimeoutFilter.LOG.error("WriteTimeout thread unable validate existing sockets.", var9);
                        error = true;
                    }
                }
            }

        }
    }
    }

可以看到WriteTimeoutFilter内部维护了一个ConcurrentLinkedQueue保存所有构造的WriteTimeoutFilter对象,在启动的TimeoutThread中,循环迭代检测每个WriteTimeoutFilter对象的输出流是否已经超时,其时通过next.narrow(TimeStampStream.class),也就是TcpTransport中给方法获得一个TimeStampStream对象,从而获得其开始输出的时间

    protected TimeStampStream getWriter() {
        return (TimeStampStream)this.next.narrow(TimeStampStream.class);
    }
    //TcpTransport.class
    public <T> T narrow(Class<T> target) {
        if (target == Socket.class) {
            return target.cast(this.socket);
        } else {
            return target == TimeStampStream.class ? target.cast(this.buffOut) : super.narrow(target);
        }
    }

如果超时了就会调用deRegisterWrite方法,将WriteTimeoutFilter的socker关闭并从维护的内部集合中删除

    protected Socket getSocket() {
        return (Socket)this.next.narrow(Socket.class);
    }
    protected static boolean deRegisterWrite(WriteTimeoutFilter filter, boolean fail, IOException iox) {
        boolean result = writers.remove(filter);
        if (result && fail) {
            String message = "Forced write timeout for:" + filter.getNext().getRemoteAddress();
            LOG.warn(message);
            Socket sock = filter.getSocket();
            if (sock == null) {
                LOG.error("Destination socket is null, unable to close socket.(" + message + ")");
            } else {
                try {
                    sock.close();
                } catch (IOException var7) {
                }
            }
        }

        return result;
    }

最终获取到的socket也就是使用TcpTransport中的narrow方法,方法中主要时做一下校验,没问题就返回真实的socket对象(也就是一开始构造时创建的)

    public <T> T narrow(Class<T> target) {
        if (target == Socket.class) {
            return target.cast(this.socket);
        } else {
            return target == TimeStampStream.class ? target.cast(this.buffOut) : super.narrow(target);
        }
    }
    @HotSpotIntrinsicCandidate
    public T cast(Object obj) {
        if (obj != null && !this.isInstance(obj)) {
            throw new ClassCastException(this.cannotCastMsg(obj));
        } else {
            return obj;
        }
    }

接下来看到构造函数中调用了super(next),看看其父类TransportFilter的代码

public class TransportFilter implements TransportListener, Transport {
    protected final Transport next;
    protected TransportListener transportListener;

    public TransportFilter(Transport next) {
        this.next = next;
    }

    public TransportListener getTransportListener() {
        return this.transportListener;
    }

    public void setTransportListener(TransportListener channelListener) {
        this.transportListener = channelListener;
        if (channelListener == null) {
            this.next.setTransportListener((TransportListener)null);
        } else {
            this.next.setTransportListener(this);
        }

    }

    public void start() throws Exception {
        if (this.next == null) {
            throw new IOException("The next channel has not been set.");
        } else if (this.transportListener == null) {
            throw new IOException("The command listener has not been set.");
        } else {
            this.next.start();
        }
    }

    public void onCommand(Object command) {
        this.transportListener.onCommand(command);
    }

    public void oneway(Object command) throws IOException {
        this.next.oneway(command);
    }
    .....
}

TransportFilter 其实是一个提供模板代码的父类,实现了Transport接口,其中聚合了一个Transport对象作为自己的next(这也符合其Filter的名称,类似过滤器链,带Filter就是带next),TransportFilter 对于接口的实现都是调用next对象的对应方法(因为TransportFilter 聚合的是下一个Transport ),所以真正的实现得看当前具体是TransportFilter的哪个实现,因此我们先记一下Transport 链条的顺序,待会再看是先执行什么方法,毫无疑问的,剩下两个包装类MutexTransport 和ResponseCorrelator 也是TransportFilter的子类(呜呜呜,只有TcpTransport是亲儿子,不用继承TransportFilter,因为传输层是最后的一个呀,不需要next了O(∩_∩)O)

    public class MutexTransport extends TransportFilter {
        private final ReentrantLock writeLock = new ReentrantLock();
        private boolean syncOnCommand;

        public MutexTransport(Transport next) {
            super(next);
            this.syncOnCommand = false;
        }
    }
    public class ResponseCorrelator extends TransportFilter {
        private final Map<Integer, FutureResponse> requestMap;
        private IntSequenceGenerator sequenceGenerator;
        private IOException error;
        public ResponseCorrelator(Transport next) {
            this(next, new IntSequenceGenerator());
        }
    }

最终包装下来的Transport就是如下图

Transport包装顺序

transport.start

回到一开始创建connection那里,发现接下来执行的就是transport的start方法

protected ActiveMQConnection createActiveMQConnection(String userName, String password) throws JMSException {
        if (this.brokerURL == null) {
            throw new ConfigurationException("brokerURL not set.");
        } else {
            ActiveMQConnection connection = null;
             Transport transport = this.createTransport();
             connection = this.createActiveMQConnection(transport, this.factoryStats);
             connection.setUserName(userName);
             connection.setPassword(password);
             this.configureConnection(connection);
             transport.start();
             if (this.clientID != null) {
                 connection.setDefaultClientID(this.clientID);
             }
             return connection;
       }

这里有个大坑,先是通过createActiveMQConnection创建connecttion,其需要聚合transport,然后我们看connection的实际情况(居然丧心病狂的在构造里将connection的this作为transport的transportListener绑定,这波循环引用,害我找了好久的transportListener),同时TransportFilter的子类对象的setTransportListener方法部署简单的setter,同时会将自己作为next的TransportListener进行绑定(者代码实在难懂)

    //ActiveMQConnectionFactory.class
    protected ActiveMQConnection createActiveMQConnection(Transport transport, JMSStatsImpl stats) throws Exception {
        ActiveMQConnection connection = new ActiveMQConnection(transport, this.getClientIdGenerator(), this.getConnectionIdGenerator(), stats);
        return connection;
    }
    //ActiveMQConnection.class
    protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator, IdGenerator connectionIdGenerator, JMSStatsImpl factoryStats) throws Exception {
        this.maxThreadPoolSize = DEFAULT_THREAD_POOL_SIZE;
        this.rejectedTaskHandler = null;
        this.trustedPackages = new ArrayList();
        this.trustAllPackages = false;
        this.transport = transport;
        this.clientIdGenerator = clientIdGenerator;
        this.factoryStats = factoryStats;
        this.executor = new ThreadPoolExecutor(1, 1, 5L, TimeUnit.SECONDS, new LinkedBlockingQueue(), new ThreadFactory() {
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r, "ActiveMQ Connection Executor: " + transport);
                return thread;
            }
        });
        String uniqueId = connectionIdGenerator.generateId();
        this.info = new ConnectionInfo(new ConnectionId(uniqueId));
        this.info.setManageable(true);
        this.info.setFaultTolerant(transport.isFaultTolerant());
        this.connectionSessionId = new SessionId(this.info.getConnectionId(), -1L);
        this.transport.setTransportListener(this);
        this.stats = new JMSConnectionStatsImpl(this.sessions, this instanceof XAConnection);
        this.factoryStats.addConnection(this);
        this.timeCreated = System.currentTimeMillis();
        this.connectionAudit.setCheckForDuplicates(transport.isFaultTolerant());
    }
    //TransportFilter.class
    public void setTransportListener(TransportListener channelListener) {
        this.transportListener = channelListener;
        if (channelListener == null) {
            this.next.setTransportListener((TransportListener)null);
        } else {
            this.next.setTransportListener(this);
        }

    }

所以这时结构如下

Transport结构 按照包装顺序,此时的transport应该是ResponseCorrelator ,该类中没有该方法,代表这个TransportFilter在start时不需要额外操作,执行父类的start方法,直接调用next的start

    //TransportFilter.class
    public void start() throws Exception {
        if (this.next == null) {
            throw new IOException("The next channel has not been set.");
        } else if (this.transportListener == null) {
            throw new IOException("The command listener has not been set.");
        } else {
            this.next.start();
        }
    }

此时就到了MutexTransport,发现里面也没有start,也就是再次走父类TransportFilter的start,继续到next,也就是WriteTimeoutFilter

    public void start() throws Exception {
        super.start();
    }

    public void stop() throws Exception {
        super.stop();
    }

???黑人问号,这里也是没事情做,直接走父类TransportFilter的方法,就是不知道为啥在这里不省略这两个方法,反而是实现,结果还是调用super(秀技?),结果这时就来到TcpTransport了,因为TcpTransport继承了TransportThreadSupport,TransportThreadSupport又继承了TransportSupport,TransportSupport又继承了ServiceTransport,这几个类中唯一实现了start方法的是ServiceTransport,于是进入里面源码

    public void start() throws Exception {
        if (this.started.compareAndSet(false, true)) {
            boolean success = false;
            this.stopped.set(false);

            try {
                this.preStart();
                this.doStart();
                success = true;
            } finally {
                this.started.set(success);
            }

            Iterator var2 = this.serviceListeners.iterator();

            while(var2.hasNext()) {
                ServiceListener l = (ServiceListener)var2.next();
                l.started(this);
            }
        }

    }

也就是会先调用preStart方法(空实现),然后调用doStart方法,此时执行TcpTransport的doStart方法,完成连接到一开始经过ConnectionFactory传递到TransportFactory再到TcpTransport里的mqtt远程的地址和端口,完成对端口和流的初始化

    //TcpTransport.class
    protected void doStart() throws Exception {
        this.connect();
        this.stoppedLatch.set(new CountDownLatch(1));
        super.doStart();
    }
    protected void connect() throws Exception {
        if (this.socket == null && this.socketFactory == null) {
            throw new IllegalStateException("Cannot connect if the socket or socketFactory have not been set");
        } else {
            InetSocketAddress localAddress = null;
            InetSocketAddress remoteAddress = null;
            if (this.localLocation != null) {
                localAddress = new InetSocketAddress(InetAddress.getByName(this.localLocation.getHost()), this.localLocation.getPort());
            }

            if (this.remoteLocation != null) {
                String host = this.resolveHostName(this.remoteLocation.getHost());
                remoteAddress = new InetSocketAddress(host, this.remoteLocation.getPort());
            }

            this.trafficClassSet = this.setTrafficClass(this.socket);
            if (this.socket != null) {
                if (localAddress != null) {
                    this.socket.bind(localAddress);
                }

                if (remoteAddress != null) {
                    if (this.connectionTimeout >= 0) {
                        this.socket.connect(remoteAddress, this.connectionTimeout);
                    } else {
                        this.socket.connect(remoteAddress);
                    }
                }
            } else if (localAddress != null) {
                this.socket = this.socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort(), localAddress.getAddress(), localAddress.getPort());
            } else {
                this.socket = this.socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort());
            }

            this.initialiseSocket(this.socket);
            this.initializeStreams();
        }
    }

接着再调用super.doStart方法也就是TransportThreadSupport中的方法,开启一个线程,将this作为runnable传递并运行

    protected void doStart() throws Exception {
        this.runner = new Thread((ThreadGroup)null, this, "ActiveMQ Transport: " + this.toString(), this.stackSize);
        this.runner.setDaemon(this.daemon);
        this.runner.start();
    }

因为当前是TcpTransport,所以线程的运行方法是TcpTransport中的run方法,在里面会循环调用doRun方法,根据一开始我们传递进来的协议格式(wireFormat)读取接收到的命令,调用doConsume方法去处理收到的服务端命令消息

    public void run() {
        LOG.trace("TCP consumer thread for " + this + " starting");
        this.runnerThread = Thread.currentThread();

        try {
            while(!this.isStopped()) {
                this.doRun();
            }
        } catch (IOException var7) {
            ((CountDownLatch)this.stoppedLatch.get()).countDown();
            this.onException(var7);
        } catch (Throwable var8) {
            ((CountDownLatch)this.stoppedLatch.get()).countDown();
            IOException ioe = new IOException("Unexpected error occurred: " + var8);
            ioe.initCause(var8);
            this.onException(ioe);
        } finally {
            ((CountDownLatch)this.stoppedLatch.get()).countDown();
        }

    }
    protected void doRun() throws IOException {
        try {
            Object command = this.readCommand();
            this.doConsume(command);
        } catch (SocketTimeoutException var2) {
        } catch (InterruptedIOException var3) {
        }

    }

也就是说,此时会开启一个线程,持续读取服务端的命令消息,并处理该消息,因为这时服务端给到客户端的,先了解完客户端发送的逻辑再回来看这个,接下来就到了

    //6、使用会话对象创建生产者对象
    MessageProducer producer = session.createProducer(queue);
    //7、使用会话对象创建一个消息对象
    TextMessage textMessage = session.createTextMessage("hello!test-queue");
    //8、发送消息
    producer.send(textMessage);

发送消息

前面两个都是创建对象,没啥好说的,主要吧session聚合上了,看看发送消息源码

    //ActiveMQMessageProducerSupport.class
    private MemoryUsage producerWindow;
    public void send(Message message) throws JMSException {
        this.send(this.getDestination(), message, this.defaultDeliveryMode, this.defaultPriority, this.defaultTimeToLive);
    }
    public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, AsyncCallback onComplete) throws JMSException {
        this.checkClosed();
        if (destination == null) {
            if (this.info.getDestination() == null) {
                throw new UnsupportedOperationException("A destination must be specified.");
            } else {
                throw new InvalidDestinationException("Don't understand null destinations");
            }
        } else {
            ActiveMQDestination dest;
            if (destination.equals(this.info.getDestination())) {
                dest = (ActiveMQDestination)destination;
            } else {
                if (this.info.getDestination() != null) {
                    throw new UnsupportedOperationException("This producer can only send messages to: " + this.info.getDestination().getPhysicalName());
                }

                dest = ActiveMQDestination.transform(destination);
            }

            if (dest == null) {
                throw new JMSException("No destination specified");
            } else {
                if (this.transformer != null) {
                    Message transformedMessage = this.transformer.producerTransform(this.session, this, message);
                    if (transformedMessage != null) {
                        message = transformedMessage;
                    }
                }

                if (this.producerWindow != null) {
                    try {
                        this.producerWindow.waitForSpace();
                    } catch (InterruptedException var10) {
                        throw new JMSException("Send aborted due to thread interrupt.");
                    }
                }

                this.session.send(this, dest, message, deliveryMode, priority, timeToLive, this.producerWindow, this.sendTimeout, onComplete);
                this.stats.onMessage();
            }
        }
    }

首先调用checkClose检查会话状态,确保不会被关闭了,接着第一步主要是根据你用的Destination是什么?(queue\topic)进行了一下转化为通用的ActiveMQDestination,如果有指定特定的transformer则再进行一下消息的producerTransform,这里没有跳过,接下来核心代码就是

        if (this.producerWindow != null) {
                  try {
                      this.producerWindow.waitForSpace();
                  } catch (InterruptedException var10) {
                      throw new JMSException("Send aborted due to thread interrupt.");
                  }
              }

              this.session.send(this, dest, message, deliveryMode, priority, timeToLive, this.producerWindow, this.sendTimeout, onComplete);
              this.stats.onMessage();

producerWindow是一个MemoryUsage的对象,主要是记录当前消息堆积了多少内存大小的空间,进行内存的控制,我们看看其部分代码

    public class MemoryUsage extends Usage<MemoryUsage> {
    private long usage;

    public void waitForSpace() throws InterruptedException {
        if (this.parent != null) {
            ((MemoryUsage)this.parent).waitForSpace();
        }

        this.usageLock.readLock().lock();

        try {
            if (this.percentUsage >= 100 && this.isStarted()) {
                this.usageLock.readLock().unlock();
                this.usageLock.writeLock().lock();

                try {
                    while(this.percentUsage >= 100 && this.isStarted()) {
                        this.waitForSpaceCondition.await();
                    }
                } finally {
                    this.usageLock.writeLock().unlock();
                    this.usageLock.readLock().lock();
                }
            }

            if (this.percentUsage >= 100 && !this.isStarted()) {
                throw new InterruptedException("waitForSpace stopped during wait.");
            }
        } finally {
            this.usageLock.readLock().unlock();
        }

    }
   }

其内部维护一个long类型的成员代表占用内存的字节大小,waitForSpace方法获得字节的读锁进行加锁,判断如果当前使用的内存超过限定内存的100%则改为使用写锁,释放读锁获取写锁,一直等待直到使用的内存小于100%(有空闲可用),此处之所以是>=是因为,没办法刚好处理到100%,可能到了99%,多加一条消息,消息大小超过1%的内存,总而言之,waitForSpace是在等待有空闲的内存,防止消息堆积过多,接下来就进入发送了this.session.send(this, dest, message, deliveryMode, priority, timeToLive, this.producerWindow, this.sendTimeout, onComplete);

    //ActiveMQSession.class
     protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive, MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) throws JMSException {
        this.checkClosed();
        if (destination.isTemporary() && this.connection.isDeleted(destination)) {
            throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination);
        } else {
            synchronized(this.sendMutex) {
                this.doStartTransaction();
                TransactionId txid = this.transactionContext.getTransactionId();
                long sequenceNumber = producer.getMessageSequence();
                message.setJMSDeliveryMode(deliveryMode);
                long expiration = 0L;
                if (!producer.getDisableMessageTimestamp()) {
                    long timeStamp = System.currentTimeMillis();
                    message.setJMSTimestamp(timeStamp);
                    if (timeToLive > 0L) {
                        expiration = timeToLive + timeStamp;
                    }
                }

                message.setJMSExpiration(expiration);
                message.setJMSPriority(priority);
                message.setJMSRedelivered(false);
                ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, this.connection);
                msg.setDestination(destination);
                msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber));
                if (msg != message) {
                    message.setJMSMessageID(msg.getMessageId().toString());
                    message.setJMSDestination(destination);
                }

                msg.setBrokerPath((BrokerId[])null);
                msg.setTransactionId(txid);
                if (this.connection.isCopyMessageOnSend()) {
                    msg = (ActiveMQMessage)msg.copy();
                }

                msg.setConnection(this.connection);
                msg.onSend();//将消息变为只读
                msg.setProducerId(msg.getMessageId().getProducerId());
                if (LOG.isTraceEnabled()) {
                    LOG.trace(this.getSessionId() + " sending message: " + msg);
                }

                if (onComplete != null || sendTimeout > 0 || msg.isResponseRequired() || this.connection.isAlwaysSyncSend() || msg.isPersistent() && !this.connection.isUseAsyncSend() && txid == null) {
                    if (sendTimeout > 0 && onComplete == null) {
                        this.connection.syncSendPacket(msg, sendTimeout);
                    } else {
                        this.connection.syncSendPacket(msg, onComplete);
                    }
                } else {
                    this.connection.asyncSendPacket(msg);
                    if (producerWindow != null) {
                        int size = msg.getSize();
                        producerWindow.increaseUsage((long)size);
                    }
                }

            }
        }
    }

可以看到,发送都是会加锁,锁是session里面的sendMutex对象,也就是session是同步处理消息的,然后根据一些配置,选择syncSendPacket(同步发送)或者asyncSendPacket(异步发送),如果使用了异步发送并且有限制堆积的消息大小,此时会维护producerWindow已经使用的内存大小(即堆积的待发送完成消息大小),先看看异步发送吧

    //ActiveMQConnection.class
    public void asyncSendPacket(Command command) throws JMSException {
        if (this.isClosed()) {
            throw new ConnectionClosedException();
        } else {
            this.doAsyncSendPacket(command);
        }
    }
    private void doAsyncSendPacket(Command command) throws JMSException {
        try {
            this.transport.oneway(command);
        } catch (IOException var3) {
            throw JMSExceptionSupport.create(var3);
        }
    }

前面讲过,最外层第一个transport应该是ResponseCorrelator,查看其oneway方法

    public void oneway(Object o) throws IOException {
        Command command = (Command)o;
        command.setCommandId(this.sequenceGenerator.getNextSequenceId());
        command.setResponseRequired(false);
        this.next.oneway(command);
    }

标记了不需要回调,然后进入下一个oneway,也就是MutexTransport,MutexTransport中主要是对其加了一个MutexTransport的写锁,也就是保证了同个connection下消息的同步控制

 public void oneway(Object command) throws IOException {
        this.writeLock.lock();

        try {
            this.next.oneway(command);
        } finally {
            this.writeLock.unlock();
        }

    }

接下来到了WriteTimeoutFilter中,前面讲过其维护一个内部集合,并开启一个线程,定期检查该集合中的WriteTimeoutFilter是否超时,此时就是将当前消息加入该集合,再调用下一个oneway(super的实现)

    public void oneway(Object command) throws IOException {
        try {
            registerWrite(this);
            super.oneway(command);
        } catch (IOException var6) {
            throw var6;
        } finally {
            deRegisterWrite(this, false, (IOException)null);
        }

    }

这时就来的TcpTransport了,进行数据的传输

    public void oneway(Object command) throws IOException {
        this.checkStarted();
        this.wireFormat.marshal(command, this.dataOut);
        this.dataOut.flush();
    }

因为不需要回调,所以在这里直接就返回了,异步去IO。接下来看看同步发送消息的,可以看到走的是request方法而不是oneway

    public Response syncSendPacket(Command command, int timeout) throws JMSException {
        if (this.isClosed()) {
            throw new ConnectionClosedException();
        } else {
            try {
                Response response = (Response)((Response)(timeout > 0 ? this.transport.request(command, timeout) : this.transport.request(command)));
                if (response.isException()) {
                    ExceptionResponse er = (ExceptionResponse)response;
                    if (er.getException() instanceof JMSException) {
                        throw (JMSException)er.getException();
                    }

                    if (this.isClosed() || this.closing.get()) {
                        LOG.debug("Received an exception but connection is closing");
                    }

                    JMSException jmsEx = null;

                    try {
                        jmsEx = JMSExceptionSupport.create(er.getException());
                    } catch (Throwable var8) {
                        LOG.error("Caught an exception trying to create a JMSException for " + er.getException(), var8);
                    }

                    if (er.getException() instanceof SecurityException && command instanceof ConnectionInfo) {
                        try {
                            this.forceCloseOnSecurityException(er.getException());
                        } catch (Throwable var7) {
                        }
                    }

                    if (jmsEx != null) {
                        throw jmsEx;
                    }
                }

                return response;
            } catch (IOException var9) {
                throw JMSExceptionSupport.create(var9);
            }
        }
    }

那么进入ResponseCorrelator的request方法,发现实际也是使用asyncRequest异步IO发送,将这个future加入一个内部map,利用Future模型等待结果

    public Object request(Object command, int timeout) throws IOException {
        FutureResponse response = this.asyncRequest(command, (ResponseCallback)null);
        return response.getResult(timeout);
    }
    public FutureResponse asyncRequest(Object o, ResponseCallback responseCallback) throws IOException {
        Command command = (Command)o;
        command.setCommandId(this.sequenceGenerator.getNextSequenceId());
        command.setResponseRequired(true);
        FutureResponse future = new FutureResponse(responseCallback, this);
        IOException priorError = null;
        synchronized(this.requestMap) {
            priorError = this.error;
            if (priorError == null) {
                this.requestMap.put(new Integer(command.getCommandId()), future);
            }
        }

        if (priorError != null) {
            future.set(new ExceptionResponse(priorError));
            throw priorError;
        } else {
            this.next.oneway(command);
            return future;
        }
    }

回调处理

Future模型收到回调的逻辑就得回到前面我们讲到的OnCommand方法了,前面讲到,在connection调用start后,外面几个Transport都没干事情,直到TcpTransport的start方法,该start方法会完成连接,并开启一个线程持续调用doRun方法,doRun方法调用readCommand读取收到的服务端发来的命令消息,然后调用doConsume方法,doConsume方法在父类TransportSupport中实现,通过transportListener去处理命令

    //TransportSupport.class
    public void doConsume(Object command) {
        if (command != null) {
            if (this.transportListener != null) {
                this.transportListener.onCommand(command);
            } else {
                LOG.error("No transportListener available to process inbound command: " + command);
            }
        }
    }

因为这时TcpTransport的transportListener也就是WriteTimeoutFilter,因为WriteTimeoutFilter没有重写,所以直接走TransportFilter中的实现,继续调用自己的transportListener的onCommand方法

    //TransportFilter.class
    public void onCommand(Object command) {
        this.transportListener.onCommand(command);
    }

这时就到了MutexTransport中的onCommand方法

    public void onCommand(Object command) {
        if (this.syncOnCommand) {
            this.writeLock.lock();

            try {
                this.transportListener.onCommand(command);
            } finally {
                this.writeLock.unlock();
            }
        } else {
            this.transportListener.onCommand(command);
        }
    }

先是判断是否配置了同步的处理回调,如果是就要先拿写锁的锁,最终进入MutexTransport的transportListener的onCommand方法,此时就来到了Transport最外层的ResponseCorrelator的onCommand方法

    //ResponseCorrelator.class
    //key : commandId
    private final Map<Integer, FutureResponse> requestMap;
    public void onCommand(Object o) {
        Command command = null;
        if (o instanceof Command) {
            command = (Command)o;
            if (command.isResponse()) {
                Response response = (Response)command;
                FutureResponse future = null;
                synchronized(this.requestMap) {
                    future = (FutureResponse)this.requestMap.remove(response.getCorrelationId());
                }

                if (future != null) {
                    future.set(response);
                } else if (this.debug) {
                    LOG.debug("Received unexpected response: {" + command + "}for command id: " + response.getCorrelationId());
                }
            } else {
                this.getTransportListener().onCommand(command);
            }

        } else {
            throw new ClassCastException("Object cannot be converted to a Command,  Object: " + o);
        }
    }

ResponseCorrelator内部维护了一个存储每个待响应request的集合,再收到服务端消息进入onCommand方法后,根据id获得对应command的future,对该Future进行返回值设置和解锁处理,然后再进入ActiveMQConnection的onCommand方法

    public void onCommand(Object o) {
        final Command command = (Command)o;
        if (!this.closed.get() && command != null) {
            try {
                command.visit(new CommandVisitorAdapter() {
                    public Response processMessageDispatch(MessageDispatch md) throws Exception {
                        ActiveMQConnection.this.waitForTransportInterruptionProcessingToComplete();
                        ActiveMQDispatcher dispatcher = (ActiveMQDispatcher)ActiveMQConnection.this.dispatchers.get(md.getConsumerId());
                        if (dispatcher != null) {
                            Message msg = md.getMessage();
                            if (msg != null) {
                                msg = msg.copy();
                                msg.setReadOnlyBody(true);
                                msg.setReadOnlyProperties(true);
                                msg.setRedeliveryCounter(md.getRedeliveryCounter());
                                msg.setConnection(ActiveMQConnection.this);
                                msg.setMemoryUsage((MemoryUsage)null);
                                md.setMessage(msg);
                            }

                            dispatcher.dispatch(md);
                        } else {
                            ActiveMQConnection.LOG.debug("{} no dispatcher for {} in {}", new Object[]{this, md, ActiveMQConnection.this.dispatchers});
                        }

                        return null;
                    }

                    public Response processProducerAck(ProducerAck pa) throws Exception {
                        if (pa != null && pa.getProducerId() != null) {
                            ActiveMQMessageProducer producer = (ActiveMQMessageProducer)ActiveMQConnection.this.producers.get(pa.getProducerId());
                            if (producer != null) {
                                producer.onProducerAck(pa);
                            }
                        }

                        return null;
                    }

                    public Response processBrokerInfo(BrokerInfo info) throws Exception {
                        ActiveMQConnection.this.brokerInfo = info;
                        ActiveMQConnection.this.brokerInfoReceived.countDown();
                        ActiveMQConnection.this.optimizeAcknowledge = ActiveMQConnection.this.optimizeAcknowledge & !ActiveMQConnection.this.brokerInfo.isFaultTolerantConfiguration();
                        ActiveMQConnection.this.getBlobTransferPolicy().setBrokerUploadUrl(info.getBrokerUploadUrl());
                        return null;
                    }

                    public Response processConnectionError(final ConnectionError error) throws Exception {
                        ActiveMQConnection.this.executor.execute(new Runnable() {
                            public void run() {
                                ActiveMQConnection.this.onAsyncException(error.getException());
                            }
                        });
                        return null;
                    }

                    public Response processControlCommand(ControlCommand commandx) throws Exception {
                        return null;
                    }

                    public Response processConnectionControl(ConnectionControl control) throws Exception {
                        ActiveMQConnection.this.onConnectionControl((ConnectionControl)command);
                        return null;
                    }

                    public Response processConsumerControl(ConsumerControl control) throws Exception {
                        ActiveMQConnection.this.onConsumerControl((ConsumerControl)command);
                        return null;
                    }

                    public Response processWireFormat(WireFormatInfo info) throws Exception {
                        ActiveMQConnection.this.onWireFormatInfo((WireFormatInfo)command);
                        return null;
                    }
                });
            } catch (Exception var5) {
                this.onClientInternalException(var5);
            }
        }

        Iterator iter = this.transportListeners.iterator();

        while(iter.hasNext()) {
            TransportListener listener = (TransportListener)iter.next();
            listener.onCommand(command);
        }

    }

这里使用了适配器模式,传入一个CommandVisitor,visit方法在根据Command是什么具体实现类使用不同实现去决定调用什么方法来处理命令(Command是tcp传输回来消息数据反序列化包装出来的对象,可能是各种类型的消息(包括broker连接,主题删除等,ack处理)),最后的循环主要是如果还设置了一些别的处理TransportListener,则会继续执行对应的onCommand方法(扩展接口) 所以ActiveMQ的完整发送消息的流程如下:

activeMQ流程

本文分享自微信公众号 - 歪歪梯Club(bcbkmgzh),作者:歪歪梯

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2020-05-28

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • ActiveMQ源码分析——消费消息

    请先查看上一篇分析生产消息源码的博客之后再查看本篇 先看看本博客把consumer端分析后完整的activemq流程图

    歪歪梯
  • 从majority voting解读Raft

    Raft是一个分布式一致性协调算法,采用状态机复制方法和majority voting选举算法。

    歪歪梯
  • 并发基石-Markword与锁升级

    synchronized关键字是java提供的互斥锁关键字,我们常说的互斥锁一般都是非自旋锁,即竞争不到锁的线程会进入阻塞状态知道被唤醒 今天我们来讲讲java...

    歪歪梯
  • DedeCMS后台500错误一种原因是不支持PHP5.3、5.4及以上版本

      我们在迁移网站的时候,可能会出现DedeCMS后台500错误,有可能是因为dedecms不支持PHP5.3、5.4及以上版本,这时我们要改动一些设置才能修复...

    ytkah
  • Flutter第4天--基础控件(下)+Flex布局详解

    张风捷特烈
  • Flutter开发:TextField常用属性的使用

    在flutter开发过程中,掌握常用组件的使用是必备技能,flutter常用的组件和App开发时候常用的控件基本一模一样,只是使用的方式不一样罢了。

    三掌柜
  • TRTC学习之旅(三)-- 使用vue+ts集成互动直播

    上次我们已经用vue+ts实现了多人会议室的搭建,这次我们继续在上次项目的基础上,实现互动直播功能。

    黑眼圈云豆
  • 冬天到了,分享两款雪花特效代码

    小小鱼儿小小林
  • ActiveMQ源码分析——消费消息

    请先查看上一篇分析生产消息源码的博客之后再查看本篇 先看看本博客把consumer端分析后完整的activemq流程图

    歪歪梯
  • JavaScript this的指向

    在 JavaScript 中 this 取什么值,是在函数真正被调用执行的时候确定的,函数定义的时候确定不了,因为 this 的取值是执行上下文环境的一部分,每...

    Nian糕

扫码关注云+社区

领取腾讯云代金券