ActiveMQ Source Code Analysis: Producing Messages

歪歪梯
Published 2020-06-19 16:20:17
Part of the column: 歪歪梯Club

Analysis Results

Let's start with the overall flow diagram that this analysis ends up producing.

Figure: ActiveMQ flow

Creating the Connection

        // 1. Create the connection factory; the broker IP and port must be specified
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616");
        // 2. Use the connection factory to create a connection
        Connection connection = connectionFactory.createConnection();
        // 3. Start the connection
        connection.start();
        // 4. Use the connection to create a session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

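When creating a Session, the first argument says whether the session is transacted, and the second selects the acknowledgement mode the session uses for consumed messages. Now for how the source handles this. A producer ID is built from the current session ID plus an ID produced by the session's internal sequence generator (an auto-incrementing long).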

//ActiveMQConnection.class
public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
        this.checkClosedOrFailed();
        this.ensureConnectionInfoSent();
        if (!transacted) {
            if (acknowledgeMode == 0) {
                throw new JMSException("acknowledgeMode SESSION_TRANSACTED cannot be used for an non-transacted Session");
            }

            if (acknowledgeMode < 0 || acknowledgeMode > 4) {
                throw new JMSException("... (long exception message omitted)");
            }
        }
        return new ActiveMQSession(this, this.getNextSessionId(), transacted ? 0 : acknowledgeMode, this.isDispatchAsync(), this.isAlwaysSessionAsync());
    }
    //ActiveMQSession.class
    protected ProducerId getNextProducerId() {
        return new ProducerId(this.info.getSessionId(), this.producerIdGenerator.getNextSequenceId());
    }

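In the end the ActiveMQSession constructor is called: new ActiveMQSession(this, this.getNextSessionId(), transacted ? 0 : acknowledgeMode, this.isDispatchAsync(), this.isAlwaysSessionAsync()). In other words, if the session is transacted it is created with ack mode 0; otherwise the ack argument passed in is used. Each ack mode selects a different acknowledgement strategy: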

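AUTO_ACKNOWLEDGE = 1: automatic acknowledgement
CLIENT_ACKNOWLEDGE = 2: manual acknowledgement by the client
DUPS_OK_ACKNOWLEDGE = 3: automatic batched acknowledgement
SESSION_TRANSACTED = 0: acknowledged when the transaction commits
INDIVIDUAL_ACKNOWLEDGE = 4: acknowledgement of individual messages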
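To make the rule concrete, here is a minimal sketch of the same decision as a standalone method. The class name AckModeSketch, the method effectiveAckMode and the use of IllegalArgumentException are assumptions for illustration; the real createSession throws JMSException and builds an ActiveMQSession.

```java
// Hypothetical, self-contained sketch of the ack-mode rule in createSession.
final class AckModeSketch {
    static final int SESSION_TRANSACTED = 0;
    static final int AUTO_ACKNOWLEDGE = 1;
    static final int CLIENT_ACKNOWLEDGE = 2;
    static final int DUPS_OK_ACKNOWLEDGE = 3;
    static final int INDIVIDUAL_ACKNOWLEDGE = 4;

    /** Returns the mode the session ends up with, or throws for an invalid combination. */
    static int effectiveAckMode(boolean transacted, int requested) {
        if (transacted) {
            return SESSION_TRANSACTED; // the requested mode is ignored for transacted sessions
        }
        if (requested == SESSION_TRANSACTED) {
            throw new IllegalArgumentException("SESSION_TRANSACTED needs a transacted session");
        }
        if (requested < SESSION_TRANSACTED || requested > INDIVIDUAL_ACKNOWLEDGE) {
            throw new IllegalArgumentException("unknown acknowledge mode: " + requested);
        }
        return requested;
    }

    public static void main(String[] args) {
        System.out.println(effectiveAckMode(false, AUTO_ACKNOWLEDGE));  // prints 1
        System.out.println(effectiveAckMode(true, CLIENT_ACKNOWLEDGE)); // prints 0
    }
}
```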

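Digging through the session turned up nothing about sending, so I decided to start from the connection and the connectionFactory instead. The factory has no send-related setup either, so on to the connection source. Connection connection = connectionFactory.createConnection(); eventually creates a Transport object and calls its start method (now we're getting somewhere). Let's continue.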

protected ActiveMQConnection createActiveMQConnection(String userName, String password) throws JMSException {
        if (this.brokerURL == null) {
            throw new ConfigurationException("brokerURL not set.");
        } else {
            ActiveMQConnection connection = null;

            try {
                Transport transport = this.createTransport();
                connection = this.createActiveMQConnection(transport, this.factoryStats);
                connection.setUserName(userName);
                connection.setPassword(password);
                this.configureConnection(connection);
                transport.start();
                if (this.clientID != null) {
                    connection.setDefaultClientID(this.clientID);
                }

                return connection;
            } catch (JMSException var8) {
                try {
                    connection.close();
                } catch (Throwable var6) {
                }

                throw var8;
            } catch (Exception var9) {
                try {
                    connection.close();
                } catch (Throwable var7) {
                }

                throw JMSExceptionSupport.create("Could not connect to broker URL: " + this.brokerURL + ". Reason: " + var9, var9);
            }
        }
    }
    protected Transport createTransport() throws JMSException {
        try {
            URI connectBrokerUL = this.brokerURL;
            String scheme = this.brokerURL.getScheme();
            if (scheme == null) {
                throw new IOException("Transport not scheme specified: [" + this.brokerURL + "]");
            } else {
                if (scheme.equals("auto")) {
                    connectBrokerUL = new URI(this.brokerURL.toString().replace("auto", "tcp"));
                } else if (scheme.equals("auto+ssl")) {
                    connectBrokerUL = new URI(this.brokerURL.toString().replace("auto+ssl", "ssl"));
                } else if (scheme.equals("auto+nio")) {
                    connectBrokerUL = new URI(this.brokerURL.toString().replace("auto+nio", "nio"));
                } else if (scheme.equals("auto+nio+ssl")) {
                    connectBrokerUL = new URI(this.brokerURL.toString().replace("auto+nio+ssl", "nio+ssl"));
                }

                return TransportFactory.connect(connectBrokerUL);
            }
        } catch (Exception var3) {
            throw JMSExceptionSupport.create("Could not create Transport. Reason: " + var3, var3);
        }
    }

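First the broker URL that was passed in is rewritten into a new URL (the auto schemes are mapped to concrete ones). Then TransportFactory.connect(connectBrokerUL) is called. Inside it, a wireFormat object is derived from the URL (this is the protocol used to parse the bytes on the stream; for example there is a wireformat for MQTT and one for AMQP), and the wireformat together with the broker URL is used to create a Transport.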
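The scheme rewriting step can be sketched on its own. This is a minimal illustration, not the ActiveMQ code: the class AutoSchemeSketch and the method rewrite are hypothetical, and instead of calling String.replace on the whole URL (as the decompiled code does) it swaps only the scheme prefix, so an "auto" elsewhere in the URL is left untouched.

```java
import java.net.URI;
import java.util.Map;

// Hypothetical sketch: map the "auto" schemes onto concrete transport schemes.
final class AutoSchemeSketch {
    private static final Map<String, String> AUTO_TO_CONCRETE = Map.of(
            "auto", "tcp",
            "auto+ssl", "ssl",
            "auto+nio", "nio",
            "auto+nio+ssl", "nio+ssl");

    static URI rewrite(URI brokerUrl) {
        String scheme = brokerUrl.getScheme();
        if (scheme == null) {
            throw new IllegalArgumentException("no scheme specified: [" + brokerUrl + "]");
        }
        String concrete = AUTO_TO_CONCRETE.get(scheme);
        if (concrete == null) {
            return brokerUrl; // already a concrete scheme such as tcp
        }
        // Replace only the leading scheme, keep the rest of the URL as it is.
        return URI.create(concrete + brokerUrl.toString().substring(scheme.length()));
    }

    public static void main(String[] args) {
        System.out.println(rewrite(URI.create("auto+nio://192.168.156.44:61616")));
    }
}
```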

Transport Wrapping

public static Transport connect(URI location) throws Exception {
        TransportFactory tf = findTransportFactory(location);
        return tf.doConnect(location);
    }
public Transport doConnect(URI location) throws Exception {
        try {
            Map<String, String> options = new HashMap(URISupport.parseParameters(location));
            if (!options.containsKey("wireFormat.host")) {
                options.put("wireFormat.host", location.getHost());
            }

            WireFormat wf = this.createWireFormat(options);
            Transport transport = this.createTransport(location, wf);
            Transport rc = this.configure(transport, wf, options);
            IntrospectionSupport.extractProperties(options, "auto.");
            if (!options.isEmpty()) {
                throw new IllegalArgumentException("Invalid connect parameters: " + options);
            } else {
                return rc;
            }
        } catch (URISyntaxException var6) {
            throw IOExceptionSupport.create(var6);
        }
    }

The source uses the decorator pattern here. First, a Transport is created from the WireFormat:

//TcpTransportFactory.class
protected Transport createTransport(URI location, WireFormat wf) throws UnknownHostException, IOException {
        URI localLocation = null;
        String path = location.getPath();
        if (path != null && path.length() > 0) {
            int localPortIndex = path.indexOf(58);

            try {
                Integer.parseInt(path.substring(localPortIndex + 1, path.length()));
                String localString = location.getScheme() + ":/" + path;
                localLocation = new URI(localString);
            } catch (Exception var7) {
                LOG.warn("path isn't a valid local location for TcpTransport to use", var7.getMessage());
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Failure detail", var7);
                }
            }
        }

        SocketFactory socketFactory = this.createSocketFactory();
        return this.createTcpTransport(wf, socketFactory, location, localLocation);
    }
    protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
        return new TcpTransport(wf, socketFactory, location, localLocation);
    }

The end result is a TcpTransport, whose constructor creates a socket (found you). The SocketFactory passed in here is created in TcpTransportFactory and is a DefaultSocketFactory.

 public TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
        this.connectionTimeout = 30000;
        this.socketBufferSize = 65536;
        this.ioBufferSize = 8192;
        this.closeAsync = true;
        this.buffOut = null;
        this.trafficClass = 0;
        this.trafficClassSet = false;
        this.diffServChosen = false;
        this.typeOfServiceChosen = false;
        this.trace = false;
        this.logWriterName = TransportLoggerSupport.defaultLogWriterName;
        this.dynamicManagement = false;
        this.startLogging = true;
        this.jmxPort = 1099;
        this.useLocalHost = false;
        this.stoppedLatch = new AtomicReference();
        this.soLinger = -2147483648;
        this.wireFormat = wireFormat;
        this.socketFactory = socketFactory;

        try {
            this.socket = socketFactory.createSocket();
        } catch (SocketException var6) {
            this.socket = null;
        }

        this.remoteLocation = remoteLocation;
        this.localLocation = localLocation;
        this.initBuffer = null;
        this.setDaemon(false);
    }

In DefaultSocketFactory, the create method does nothing more than instantiate a socket:

    DefaultSocketFactory() {
    }

    public Socket createSocket() {
        return new Socket();
    }

Back in TransportFactory's doConnect method, we continue from where we left off:

public Transport doConnect(URI location) throws Exception {
        try {
            Map<String, String> options = new HashMap(URISupport.parseParameters(location));
            if (!options.containsKey("wireFormat.host")) {
                options.put("wireFormat.host", location.getHost());
            }

            WireFormat wf = this.createWireFormat(options);
            Transport transport = this.createTransport(location, wf);
            Transport rc = this.configure(transport, wf, options);
            IntrospectionSupport.extractProperties(options, "auto.");
            if (!options.isEmpty()) {
                throw new IllegalArgumentException("Invalid connect parameters: " + options);
            } else {
                return rc;
            }
        } catch (URISyntaxException var6) {
            throw IOExceptionSupport.create(var6);
        }
    }

Execution continues with this.configure(transport, wf, options);

public Transport configure(Transport transport, WireFormat wf, Map options) throws Exception {
        transport = this.compositeConfigure(transport, wf, options);
        transport = new MutexTransport(transport);
        transport = new ResponseCorrelator(transport);
        return transport;
    }

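It turns out the freshly created TcpTransport is wrapped again, up to three times. The first layer, added when the soWriteTimeout option is present, is a filter that controls the write timeout of TcpTransport: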

public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
        if (options.containsKey("soWriteTimeout")) {
            transport = new WriteTimeoutFilter((Transport)transport);
            String soWriteTimeout = (String)options.remove("soWriteTimeout");
            if (soWriteTimeout != null) {
                ((WriteTimeoutFilter)transport).setWriteTimeout(Long.parseLong(soWriteTimeout));
            }
        }

        IntrospectionSupport.setProperties(transport, options);
        return (Transport)transport;
    }

Look at the WriteTimeoutFilter source. It has a static member whose initialization creates a TimeoutThread; that class is a Thread subclass and starts itself in its constructor.

    public class WriteTimeoutFilter extends TransportFilter {
        protected static ConcurrentLinkedQueue<WriteTimeoutFilter> writers = new ConcurrentLinkedQueue();
        protected static WriteTimeoutFilter.TimeoutThread timeoutThread = new WriteTimeoutFilter.TimeoutThread();
        public WriteTimeoutFilter(Transport next) {
            super(next);
        }
        protected static class TimeoutThread extends Thread {
        static AtomicInteger instance = new AtomicInteger(0);
        boolean run = true;

        public TimeoutThread() {
            this.setName("WriteTimeoutFilter-Timeout-" + instance.incrementAndGet());
            this.setDaemon(true);
            this.setPriority(1);
            this.start();
        }

        public void run() {
            while(this.run) {
                boolean error = false;

                try {
                    if (!interrupted()) {
                        Iterator filters = WriteTimeoutFilter.writers.iterator();

                        label48:
                        while(true) {
                            WriteTimeoutFilter filter;
                            do {
                                if (!this.run || !filters.hasNext()) {
                                    break label48;
                                }

                                filter = (WriteTimeoutFilter)filters.next();
                            } while(filter.getWriteTimeout() <= 0L);

                            long writeStart = filter.getWriter().getWriteTimestamp();
                            long delta = filter.getWriter().isWriting() && writeStart > 0L ? System.currentTimeMillis() - writeStart : -1L;
                            if (delta > filter.getWriteTimeout()) {
                                WriteTimeoutFilter.deRegisterWrite(filter, true, (IOException)null);
                            }
                        }
                    }

                    try {
                        Thread.sleep(WriteTimeoutFilter.getSleep());
                        error = false;
                    } catch (InterruptedException var8) {
                    }
                } catch (Throwable var9) {
                    if (!error) {
                        WriteTimeoutFilter.LOG.error("WriteTimeout thread unable validate existing sockets.", var9);
                        error = true;
                    }
                }
            }

        }
    }
    }

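As you can see, WriteTimeoutFilter keeps an internal ConcurrentLinkedQueue that holds the registered WriteTimeoutFilter objects. The TimeoutThread loops over them and checks whether the output stream of each WriteTimeoutFilter has timed out. It does this through next.narrow(TimeStampStream.class), that is, the narrow method in TcpTransport, which returns a TimeStampStream object from which the time the write started is read.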

    protected TimeStampStream getWriter() {
        return (TimeStampStream)this.next.narrow(TimeStampStream.class);
    }
    //TcpTransport.class
    public <T> T narrow(Class<T> target) {
        if (target == Socket.class) {
            return target.cast(this.socket);
        } else {
            return target == TimeStampStream.class ? target.cast(this.buffOut) : super.narrow(target);
        }
    }

If the write has timed out, deRegisterWrite is called. It removes the WriteTimeoutFilter from the internal collection and closes its socket.

    protected Socket getSocket() {
        return (Socket)this.next.narrow(Socket.class);
    }
    protected static boolean deRegisterWrite(WriteTimeoutFilter filter, boolean fail, IOException iox) {
        boolean result = writers.remove(filter);
        if (result && fail) {
            String message = "Forced write timeout for:" + filter.getNext().getRemoteAddress();
            LOG.warn(message);
            Socket sock = filter.getSocket();
            if (sock == null) {
                LOG.error("Destination socket is null, unable to close socket.(" + message + ")");
            } else {
                try {
                    sock.close();
                } catch (IOException var7) {
                }
            }
        }

        return result;
    }
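The scan-and-evict logic above can be reduced to a small, single-pass sketch. Everything here is a hypothetical stand-in: WriteWatchdogSketch, TrackedWriter and scan are illustration names, the clock is passed in as a parameter so the behavior is deterministic, and "closing the socket" is modeled as setting a flag.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch of one pass of the write-timeout check.
final class WriteWatchdogSketch {
    /** Stand-in for a filter whose in-flight write is being timed. */
    static final class TrackedWriter {
        final long timeoutMillis;
        volatile long writeStartMillis = -1L; // -1 means no write in progress
        volatile boolean closed;              // stands in for socket.close()

        TrackedWriter(long timeoutMillis) {
            this.timeoutMillis = timeoutMillis;
        }
    }

    private final ConcurrentLinkedQueue<TrackedWriter> writers = new ConcurrentLinkedQueue<>();

    void register(TrackedWriter writer) {
        writers.add(writer);
    }

    /** Checks every registered writer once and returns those that were force-closed. */
    List<TrackedWriter> scan(long nowMillis) {
        List<TrackedWriter> expired = new ArrayList<>();
        for (TrackedWriter writer : writers) {
            if (writer.timeoutMillis <= 0L) {
                continue; // timeout disabled for this writer
            }
            long start = writer.writeStartMillis;
            long delta = start > 0L ? nowMillis - start : -1L;
            // Only the caller that actually removes the writer closes it.
            if (delta > writer.timeoutMillis && writers.remove(writer)) {
                writer.closed = true;
                expired.add(writer);
            }
        }
        return expired;
    }
}
```

Removing an element while iterating is safe here because the iterator of ConcurrentLinkedQueue is weakly consistent.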

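The socket is ultimately obtained through TcpTransport's narrow method as well. The method mainly performs a type check and, if all is well, returns the real socket object (the one created in the constructor at the start).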

    public <T> T narrow(Class<T> target) {
        if (target == Socket.class) {
            return target.cast(this.socket);
        } else {
            return target == TimeStampStream.class ? target.cast(this.buffOut) : super.narrow(target);
        }
    }
    //java.lang.Class (JDK)
    @HotSpotIntrinsicCandidate
    public T cast(Object obj) {
        if (obj != null && !this.isInstance(obj)) {
            throw new ClassCastException(this.cannotCastMsg(obj));
        } else {
            return (T) obj;
        }
    }

Next, notice that the constructor calls super(next). Here is the code of its parent class, TransportFilter:

public class TransportFilter implements TransportListener, Transport {
    protected final Transport next;
    protected TransportListener transportListener;

    public TransportFilter(Transport next) {
        this.next = next;
    }

    public TransportListener getTransportListener() {
        return this.transportListener;
    }

    public void setTransportListener(TransportListener channelListener) {
        this.transportListener = channelListener;
        if (channelListener == null) {
            this.next.setTransportListener((TransportListener)null);
        } else {
            this.next.setTransportListener(this);
        }

    }

    public void start() throws Exception {
        if (this.next == null) {
            throw new IOException("The next channel has not been set.");
        } else if (this.transportListener == null) {
            throw new IOException("The command listener has not been set.");
        } else {
            this.next.start();
        }
    }

    public void onCommand(Object command) {
        this.transportListener.onCommand(command);
    }

    public void oneway(Object command) throws IOException {
        this.next.oneway(command);
    }
    .....
}

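TransportFilter is really a parent class that supplies template code. It implements the Transport interface and holds another Transport as its next (which fits the Filter name: like a filter chain, anything called Filter carries a next). TransportFilter implements each interface method by calling the same method on next (because what it holds is the next Transport), so the real behavior depends on which concrete TransportFilter subclass is in play. For now, just note the order of the Transport chain; which method runs first comes later. Unsurprisingly, the other two wrappers, MutexTransport and ResponseCorrelator, are also TransportFilter subclasses (only TcpTransport is the odd one out and does not extend TransportFilter: the transport layer is the last link in the chain, so it needs no next).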

    public class MutexTransport extends TransportFilter {
        private final ReentrantLock writeLock = new ReentrantLock();
        private boolean syncOnCommand;

        public MutexTransport(Transport next) {
            super(next);
            this.syncOnCommand = false;
        }
    }
    public class ResponseCorrelator extends TransportFilter {
        private final Map<Integer, FutureResponse> requestMap;
        private IntSequenceGenerator sequenceGenerator;
        private IOException error;
        public ResponseCorrelator(Transport next) {
            this(next, new IntSequenceGenerator());
        }
    }

The fully wrapped Transport ends up as shown in this figure:

Figure: Transport wrapping order
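Since the figure does not survive in text form, the wrapping order can also be sketched in code. This is a toy model under stated assumptions: Channel, Filter and Terminal are hypothetical stand-ins for Transport, TransportFilter and TcpTransport, and each layer only records its name before delegating to next. The WriteTimeoutFilter layer is included as if soWriteTimeout had been set.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the decorator chain built by configure().
final class FilterChainSketch {
    interface Channel {
        void oneway(String command, List<String> trace);
    }

    /** Delegating wrapper, the role TransportFilter plays. */
    static class Filter implements Channel {
        private final String name;
        protected final Channel next;

        Filter(String name, Channel next) {
            this.name = name;
            this.next = next;
        }

        @Override
        public void oneway(String command, List<String> trace) {
            trace.add(name);             // a real filter would do its own work here
            next.oneway(command, trace); // then hand over to the wrapped transport
        }
    }

    /** End of the chain, the role TcpTransport plays: no next. */
    static final class Terminal implements Channel {
        @Override
        public void oneway(String command, List<String> trace) {
            trace.add("TcpTransport:" + command);
        }
    }

    static Channel build() {
        Channel transport = new Terminal();
        transport = new Filter("WriteTimeoutFilter", transport);
        transport = new Filter("MutexTransport", transport);
        transport = new Filter("ResponseCorrelator", transport);
        return transport; // outermost wrapper is what the connection holds
    }

    public static void main(String[] args) {
        List<String> trace = new ArrayList<>();
        build().oneway("ping", trace);
        System.out.println(trace);
    }
}
```

A call on the outermost object passes through ResponseCorrelator, MutexTransport and WriteTimeoutFilter, in that order, before reaching the terminal transport.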

transport.start

Going back to where the connection was created, the next thing executed is the transport's start method:

protected ActiveMQConnection createActiveMQConnection(String userName, String password) throws JMSException {
        if (this.brokerURL == null) {
            throw new ConfigurationException("brokerURL not set.");
        } else {
            ActiveMQConnection connection = null;
             Transport transport = this.createTransport();
             connection = this.createActiveMQConnection(transport, this.factoryStats);
             connection.setUserName(userName);
             connection.setPassword(password);
             this.configureConnection(connection);
             transport.start();
             if (this.clientID != null) {
                 connection.setDefaultClientID(this.clientID);
             }
             return connection;
       }
    }

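There is a big pitfall here. createActiveMQConnection first creates the connection, which holds the transport. Now look at what the connection actually does: in its constructor it binds itself (this) as the transport's transportListener, a circular reference that cost me a long time hunting for the transportListener. On top of that, setTransportListener in TransportFilter subclasses is not a plain setter: it also binds the filter itself as the TransportListener of its next (this code really is hard to follow).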

    //ActiveMQConnectionFactory.class
    protected ActiveMQConnection createActiveMQConnection(Transport transport, JMSStatsImpl stats) throws Exception {
        ActiveMQConnection connection = new ActiveMQConnection(transport, this.getClientIdGenerator(), this.getConnectionIdGenerator(), stats);
        return connection;
    }
    //ActiveMQConnection.class
    protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator, IdGenerator connectionIdGenerator, JMSStatsImpl factoryStats) throws Exception {
        this.maxThreadPoolSize = DEFAULT_THREAD_POOL_SIZE;
        this.rejectedTaskHandler = null;
        this.trustedPackages = new ArrayList();
        this.trustAllPackages = false;
        this.transport = transport;
        this.clientIdGenerator = clientIdGenerator;
        this.factoryStats = factoryStats;
        this.executor = new ThreadPoolExecutor(1, 1, 5L, TimeUnit.SECONDS, new LinkedBlockingQueue(), new ThreadFactory() {
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r, "ActiveMQ Connection Executor: " + transport);
                return thread;
            }
        });
        String uniqueId = connectionIdGenerator.generateId();
        this.info = new ConnectionInfo(new ConnectionId(uniqueId));
        this.info.setManageable(true);
        this.info.setFaultTolerant(transport.isFaultTolerant());
        this.connectionSessionId = new SessionId(this.info.getConnectionId(), -1L);
        this.transport.setTransportListener(this);
        this.stats = new JMSConnectionStatsImpl(this.sessions, this instanceof XAConnection);
        this.factoryStats.addConnection(this);
        this.timeCreated = System.currentTimeMillis();
        this.connectionAudit.setCheckForDuplicates(transport.isFaultTolerant());
    }
    //TransportFilter.class
    public void setTransportListener(TransportListener channelListener) {
        this.transportListener = channelListener;
        if (channelListener == null) {
            this.next.setTransportListener((TransportListener)null);
        } else {
            this.next.setTransportListener(this);
        }

    }
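The listener wiring is easier to see in a stripped-down model. The sketch below is an assumption-laden illustration, not ActiveMQ code: Listener, Channel, Filter and Bottom are hypothetical names, and a received command is just a string. It shows that setting a listener on the outermost filter makes every filter the listener of the layer beneath it, so an incoming command bubbles upward.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of how setTransportListener chains listeners downward.
final class ListenerChainSketch {
    interface Listener {
        void onCommand(String command);
    }

    interface Channel {
        void setListener(Listener listener);
    }

    /** Plays the role of TransportFilter: both a channel and a listener. */
    static final class Filter implements Channel, Listener {
        final String name;
        final Channel next;
        final List<String> trace;
        Listener listener;

        Filter(String name, Channel next, List<String> trace) {
            this.name = name;
            this.next = next;
            this.trace = trace;
        }

        @Override
        public void setListener(Listener listener) {
            this.listener = listener;
            // Not a plain setter: the filter registers itself on the layer below.
            next.setListener(listener == null ? null : this);
        }

        @Override
        public void onCommand(String command) {
            trace.add(name);
            listener.onCommand(command); // pass the command up the chain
        }
    }

    /** Plays the role of the bottom transport that reads from the wire. */
    static final class Bottom implements Channel {
        Listener listener;

        @Override
        public void setListener(Listener listener) {
            this.listener = listener;
        }

        void receive(String command) {
            listener.onCommand(command);
        }
    }

    static List<String> demo() {
        List<String> trace = new ArrayList<>();
        Bottom bottom = new Bottom();
        Filter inner = new Filter("inner", bottom, trace);
        Filter outer = new Filter("outer", inner, trace);
        outer.setListener(command -> trace.add("connection:" + command));
        bottom.receive("msg");
        return trace;
    }
}
```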

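So the structure now looks like this:

Figure: Transport structure

Following the wrapping order, the transport at this point is the ResponseCorrelator. That class does not define start, which means this TransportFilter needs no extra work on start: the parent class's start runs and simply calls start on next.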

    //TransportFilter.class
    public void start() throws Exception {
        if (this.next == null) {
            throw new IOException("The next channel has not been set.");
        } else if (this.transportListener == null) {
            throw new IOException("The command listener has not been set.");
        } else {
            this.next.start();
        }
    }

That brings us to MutexTransport, which has no start either, so the parent TransportFilter's start runs again and moves on to next, which is the WriteTimeoutFilter:

    public void start() throws Exception {
        super.start();
    }

    public void stop() throws Exception {
        super.stop();
    }

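??? (confused face) There is nothing to do here either; it goes straight to the parent TransportFilter's method. I don't know why these two methods are written out rather than omitted when all they do is call super (showing off?). That finally brings us to TcpTransport. TcpTransport extends TransportThreadSupport, which extends TransportSupport, which in turn extends ServiceSupport. Of these classes, the only one that implements start is ServiceSupport, so that is the source to read: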

    public void start() throws Exception {
        if (this.started.compareAndSet(false, true)) {
            boolean success = false;
            this.stopped.set(false);

            try {
                this.preStart();
                this.doStart();
                success = true;
            } finally {
                this.started.set(success);
            }

            Iterator var2 = this.serviceListeners.iterator();

            while(var2.hasNext()) {
                ServiceListener l = (ServiceListener)var2.next();
                l.started(this);
            }
        }

    }

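So start first calls preStart (an empty implementation) and then doStart. Here that is TcpTransport's doStart, which connects to the broker's remote address and port (the one passed from the ConnectionFactory through the TransportFactory into TcpTransport) and initializes the socket and its streams.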

    //TcpTransport.class
    protected void doStart() throws Exception {
        this.connect();
        this.stoppedLatch.set(new CountDownLatch(1));
        super.doStart();
    }
    protected void connect() throws Exception {
        if (this.socket == null && this.socketFactory == null) {
            throw new IllegalStateException("Cannot connect if the socket or socketFactory have not been set");
        } else {
            InetSocketAddress localAddress = null;
            InetSocketAddress remoteAddress = null;
            if (this.localLocation != null) {
                localAddress = new InetSocketAddress(InetAddress.getByName(this.localLocation.getHost()), this.localLocation.getPort());
            }

            if (this.remoteLocation != null) {
                String host = this.resolveHostName(this.remoteLocation.getHost());
                remoteAddress = new InetSocketAddress(host, this.remoteLocation.getPort());
            }

            this.trafficClassSet = this.setTrafficClass(this.socket);
            if (this.socket != null) {
                if (localAddress != null) {
                    this.socket.bind(localAddress);
                }

                if (remoteAddress != null) {
                    if (this.connectionTimeout >= 0) {
                        this.socket.connect(remoteAddress, this.connectionTimeout);
                    } else {
                        this.socket.connect(remoteAddress);
                    }
                }
            } else if (localAddress != null) {
                this.socket = this.socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort(), localAddress.getAddress(), localAddress.getPort());
            } else {
                this.socket = this.socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort());
            }

            this.initialiseSocket(this.socket);
            this.initializeStreams();
        }
    }
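The pattern in connect(), an unconnected socket created early and connected later with a timeout, can be tried in isolation. The sketch below is a hedged illustration using only the JDK: DeferredConnectSketch and its methods are hypothetical names, and a throwaway ServerSocket on the loopback address stands in for the broker.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

// Hypothetical sketch: create the socket first, connect it later with a timeout.
final class DeferredConnectSketch {
    static void connect(Socket socket, InetSocketAddress remote, int timeoutMillis) throws IOException {
        if (timeoutMillis >= 0) {
            socket.connect(remote, timeoutMillis); // 0 means wait indefinitely
        } else {
            socket.connect(remote);
        }
    }

    /** Returns true if the socket was unconnected before and connected after. */
    static boolean demo() {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        // Port 0 lets the OS pick a free port; this server stands in for the broker.
        try (ServerSocket server = new ServerSocket(0, 1, loopback);
             Socket socket = new Socket()) {
            boolean connectedBefore = socket.isConnected();
            connect(socket, new InetSocketAddress(loopback, server.getLocalPort()), 30000);
            return !connectedBefore && socket.isConnected();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```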

Next super.doStart is called, which is the method in TransportThreadSupport. It starts a thread, passing this as the Runnable:

    protected void doStart() throws Exception {
        this.runner = new Thread((ThreadGroup)null, this, "ActiveMQ Transport: " + this.toString(), this.stackSize);
        this.runner.setDaemon(this.daemon);
        this.runner.start();
    }

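Because this is a TcpTransport, the thread runs TcpTransport's run method, which calls doRun in a loop. doRun reads each incoming command using the wire format (wireFormat) passed in at the start and hands it to doConsume, which processes the command received from the server.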

    public void run() {
        LOG.trace("TCP consumer thread for " + this + " starting");
        this.runnerThread = Thread.currentThread();

        try {
            while(!this.isStopped()) {
                this.doRun();
            }
        } catch (IOException var7) {
            ((CountDownLatch)this.stoppedLatch.get()).countDown();
            this.onException(var7);
        } catch (Throwable var8) {
            ((CountDownLatch)this.stoppedLatch.get()).countDown();
            IOException ioe = new IOException("Unexpected error occurred: " + var8);
            ioe.initCause(var8);
            this.onException(ioe);
        } finally {
            ((CountDownLatch)this.stoppedLatch.get()).countDown();
        }

    }
    protected void doRun() throws IOException {
        try {
            Object command = this.readCommand();
            this.doConsume(command);
        } catch (SocketTimeoutException var2) {
        } catch (InterruptedIOException var3) {
        }

    }
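The shape of this read loop can be sketched without any sockets. In the hypothetical model below, a BlockingQueue stands in for the socket input stream, a timed poll that returns null stands in for the swallowed SocketTimeoutException, and a Consumer stands in for doConsume. None of these names come from ActiveMQ.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch of a transport reader thread.
final class ReaderLoopSketch implements Runnable {
    private final BlockingQueue<String> wire;  // stand-in for the socket input stream
    private final Consumer<String> listener;   // stand-in for doConsume
    private volatile boolean stopped;

    ReaderLoopSketch(BlockingQueue<String> wire, Consumer<String> listener) {
        this.wire = wire;
        this.listener = listener;
    }

    void stop() {
        stopped = true;
    }

    @Override
    public void run() {
        while (!stopped) {
            try {
                String command = wire.poll(20, TimeUnit.MILLISECONDS);
                if (command != null) {
                    listener.accept(command);
                }
                // null means the read timed out; just loop and check the stop flag again
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    static List<String> demo() {
        BlockingQueue<String> wire = new LinkedBlockingQueue<>();
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(2);
        ReaderLoopSketch loop = new ReaderLoopSketch(wire, command -> {
            received.add(command);
            done.countDown();
        });
        Thread runner = new Thread(loop, "reader-sketch");
        runner.setDaemon(true);
        runner.start();
        wire.add("A");
        wire.add("B");
        try {
            done.await(2, TimeUnit.SECONDS);
            loop.stop();
            runner.join(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }
}
```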

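In other words, a thread is started that keeps reading commands from the server and handling them. Since these are messages going from the server to the client, we'll come back to this after covering the client's send logic. Next up is: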

    // 6. Use the session to create a producer
    MessageProducer producer = session.createProducer(queue);
    // 7. Use the session to create a message
    TextMessage textMessage = session.createTextMessage("hello!test-queue");
    // 8. Send the message
    producer.send(textMessage);

Sending the Message

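The first two calls only create objects, so there is little to say about them; the main point is that they hold a reference to the session. Now for the send source: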

    //ActiveMQMessageProducerSupport.class
    private MemoryUsage producerWindow;
    public void send(Message message) throws JMSException {
        this.send(this.getDestination(), message, this.defaultDeliveryMode, this.defaultPriority, this.defaultTimeToLive);
    }
    public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, AsyncCallback onComplete) throws JMSException {
        this.checkClosed();
        if (destination == null) {
            if (this.info.getDestination() == null) {
                throw new UnsupportedOperationException("A destination must be specified.");
            } else {
                throw new InvalidDestinationException("Don't understand null destinations");
            }
        } else {
            ActiveMQDestination dest;
            if (destination.equals(this.info.getDestination())) {
                dest = (ActiveMQDestination)destination;
            } else {
                if (this.info.getDestination() != null) {
                    throw new UnsupportedOperationException("This producer can only send messages to: " + this.info.getDestination().getPhysicalName());
                }

                dest = ActiveMQDestination.transform(destination);
            }

            if (dest == null) {
                throw new JMSException("No destination specified");
            } else {
                if (this.transformer != null) {
                    Message transformedMessage = this.transformer.producerTransform(this.session, this, message);
                    if (transformedMessage != null) {
                        message = transformedMessage;
                    }
                }

                if (this.producerWindow != null) {
                    try {
                        this.producerWindow.waitForSpace();
                    } catch (InterruptedException var10) {
                        throw new JMSException("Send aborted due to thread interrupt.");
                    }
                }

                this.session.send(this, dest, message, deliveryMode, priority, timeToLive, this.producerWindow, this.sendTimeout, onComplete);
                this.stats.onMessage();
            }
        }
    }

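First checkClosed is called to check the state and make sure nothing is sent after close. The first real step then converts whatever Destination you are using (queue or topic) into the general ActiveMQDestination. If a transformer has been specified, the message also goes through producerTransform; there is none here, so that is skipped. The core code that follows is: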

        if (this.producerWindow != null) {
                  try {
                      this.producerWindow.waitForSpace();
                  } catch (InterruptedException var10) {
                      throw new JMSException("Send aborted due to thread interrupt.");
                  }
              }

              this.session.send(this, dest, message, deliveryMode, priority, timeToLive, this.producerWindow, this.sendTimeout, onComplete);
              this.stats.onMessage();

producerWindow is a MemoryUsage object. It tracks how much memory the backlog of messages currently occupies and is used to limit memory. Here is part of its code:

    public class MemoryUsage extends Usage<MemoryUsage> {
    private long usage;

    public void waitForSpace() throws InterruptedException {
        if (this.parent != null) {
            ((MemoryUsage)this.parent).waitForSpace();
        }

        this.usageLock.readLock().lock();

        try {
            if (this.percentUsage >= 100 && this.isStarted()) {
                this.usageLock.readLock().unlock();
                this.usageLock.writeLock().lock();

                try {
                    while(this.percentUsage >= 100 && this.isStarted()) {
                        this.waitForSpaceCondition.await();
                    }
                } finally {
                    this.usageLock.writeLock().unlock();
                    this.usageLock.readLock().lock();
                }
            }

            if (this.percentUsage >= 100 && !this.isStarted()) {
                throw new InterruptedException("waitForSpace stopped during wait.");
            }
        } finally {
            this.usageLock.readLock().unlock();
        }

    }
   }
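The waiting pattern in the method above can be sketched with a single lock and condition. This is a simplified, hypothetical model: ProducerWindowSketch is an illustration name, it uses one ReentrantLock instead of the read/write lock pair, it has no parent usage, and the limit is a plain byte count rather than a percentage.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of a producer window that blocks senders while it is full.
final class ProducerWindowSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition spaceAvailable = lock.newCondition();
    private final long limitBytes;
    private long usageBytes;

    ProducerWindowSketch(long limitBytes) {
        this.limitBytes = limitBytes;
    }

    void waitForSpace() throws InterruptedException {
        lock.lock();
        try {
            while (usageBytes >= limitBytes) { // >= because usage can overshoot the limit
                spaceAvailable.await();
            }
        } finally {
            lock.unlock();
        }
    }

    void increaseUsage(long bytes) {
        lock.lock();
        try {
            usageBytes += bytes;
        } finally {
            lock.unlock();
        }
    }

    void decreaseUsage(long bytes) {
        lock.lock();
        try {
            usageBytes -= bytes;
            if (usageBytes < limitBytes) {
                spaceAvailable.signalAll(); // wake any sender blocked in waitForSpace
            }
        } finally {
            lock.unlock();
        }
    }

    /** Returns true if a sender blocked while full and resumed once space was freed. */
    static boolean demo() {
        ProducerWindowSketch window = new ProducerWindowSketch(100);
        window.increaseUsage(120); // overshoot: one large message pushed usage past the limit
        CountDownLatch released = new CountDownLatch(1);
        Thread sender = new Thread(() -> {
            try {
                window.waitForSpace();
                released.countDown();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        sender.setDaemon(true);
        sender.start();
        try {
            boolean releasedWhileFull = released.await(100, TimeUnit.MILLISECONDS);
            window.decreaseUsage(50);
            boolean releasedAfterFree = released.await(2, TimeUnit.SECONDS);
            return !releasedWhileFull && releasedAfterFree;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```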

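Internally it keeps a long field holding the number of bytes of memory in use. waitForSpace takes its own read lock and checks whether the memory in use has reached 100% of the limit. If so, it releases the read lock, takes the write lock, and waits until usage drops below 100% (there is free space again). The check is >= rather than == because usage will not land exactly on 100%: it may be at 99% when one more message arrives whose size is more than 1% of the memory. In short, waitForSpace waits for free memory so that messages do not pile up without bound. After that comes the actual send: this.session.send(this, dest, message, deliveryMode, priority, timeToLive, this.producerWindow, this.sendTimeout, onComplete);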

    //ActiveMQSession.class
     protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive, MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) throws JMSException {
        this.checkClosed();
        if (destination.isTemporary() && this.connection.isDeleted(destination)) {
            throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination);
        } else {
            synchronized(this.sendMutex) {
                this.doStartTransaction();
                TransactionId txid = this.transactionContext.getTransactionId();
                long sequenceNumber = producer.getMessageSequence();
                message.setJMSDeliveryMode(deliveryMode);
                long expiration = 0L;
                if (!producer.getDisableMessageTimestamp()) {
                    long timeStamp = System.currentTimeMillis();
                    message.setJMSTimestamp(timeStamp);
                    if (timeToLive > 0L) {
                        expiration = timeToLive + timeStamp;
                    }
                }

                message.setJMSExpiration(expiration);
                message.setJMSPriority(priority);
                message.setJMSRedelivered(false);
                ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, this.connection);
                msg.setDestination(destination);
                msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber));
                if (msg != message) {
                    message.setJMSMessageID(msg.getMessageId().toString());
                    message.setJMSDestination(destination);
                }

                msg.setBrokerPath((BrokerId[])null);
                msg.setTransactionId(txid);
                if (this.connection.isCopyMessageOnSend()) {
                    msg = (ActiveMQMessage)msg.copy();
                }

                msg.setConnection(this.connection);
                msg.onSend();// marks the message as read-only
                msg.setProducerId(msg.getMessageId().getProducerId());
                if (LOG.isTraceEnabled()) {
                    LOG.trace(this.getSessionId() + " sending message: " + msg);
                }

                if (onComplete != null || sendTimeout > 0 || msg.isResponseRequired() || this.connection.isAlwaysSyncSend() || msg.isPersistent() && !this.connection.isUseAsyncSend() && txid == null) {
                    if (sendTimeout > 0 && onComplete == null) {
                        this.connection.syncSendPacket(msg, sendTimeout);
                    } else {
                        this.connection.syncSendPacket(msg, onComplete);
                    }
                } else {
                    this.connection.asyncSendPacket(msg);
                    if (producerWindow != null) {
                        int size = msg.getSize();
                        producerWindow.increaseUsage((long)size);
                    }
                }

            }
        }
    }
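
The long boolean condition near the end of send is easy to misread, because `&&` binds tighter than `||`. The sketch below restates it as a standalone function with the grouping written out, together with the expiration computation. The class `SendModeSketch` and its parameter names are made up for illustration; only the logic is taken from the listing above.

```java
// Hypothetical helper class; restates two pieces of logic from ActiveMQSession.send.
class SendModeSketch {
    // True when the listing above would call syncSendPacket, false for asyncSendPacket.
    static boolean useSyncSend(boolean hasCallback, int sendTimeout, boolean responseRequired,
                               boolean alwaysSyncSend, boolean persistent,
                               boolean useAsyncSend, boolean inTransaction) {
        return hasCallback
                || sendTimeout > 0
                || responseRequired
                || alwaysSyncSend
                // The last three terms form one group: persistent messages go out
                // synchronously only if async send is off and no transaction is open.
                || (persistent && !useAsyncSend && !inTransaction);
    }

    // Expiration as computed in send; 0 means the message never expires.
    static long expiration(long timeToLive, long timestamp, boolean timestampDisabled) {
        if (timestampDisabled || timeToLive <= 0L) {
            return 0L;
        }
        return timeToLive + timestamp;
    }

    public static void main(String[] args) {
        // Persistent, no transaction, async send not enabled: synchronous.
        System.out.println(useSyncSend(false, 0, false, false, true, false, false)); // prints true
        // Same message inside a transaction: asynchronous.
        System.out.println(useSyncSend(false, 0, false, false, true, false, true)); // prints false
        // Non-persistent message with default settings: asynchronous.
        System.out.println(useSyncSend(false, 0, false, false, false, false, false)); // prints false
        System.out.println(expiration(5000L, 1000L, false)); // prints 6000
    }
}
```

One consequence worth noting: a persistent message sent inside a transaction is not sent synchronously unless one of the other conditions holds.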

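As ActiveMQSession.send shows, every send takes a lock on the session's sendMutex object, so sends within a single session are serialized. Depending on configuration, the method then picks either syncSendPacket (synchronous send) or asyncSendPacket (asynchronous send). If the send is asynchronous and a producer window limits the size of outstanding messages, the session also updates the memory used in producerWindow, that is, the total size of messages whose send has not yet completed. Let's look at the asynchronous send first.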

    //ActiveMQConnection.class
    public void asyncSendPacket(Command command) throws JMSException {
        if (this.isClosed()) {
            throw new ConnectionClosedException();
        } else {
            this.doAsyncSendPacket(command);
        }
    }
    private void doAsyncSendPacket(Command command) throws JMSException {
        try {
            this.transport.oneway(command);
        } catch (IOException var3) {
            throw JMSExceptionSupport.create(var3);
        }
    }

As mentioned earlier, the outermost transport should be ResponseCorrelator, so look at its oneway method:

    public void oneway(Object o) throws IOException {
        Command command = (Command)o;
        command.setCommandId(this.sequenceGenerator.getNextSequenceId());
        command.setResponseRequired(false);
        this.next.oneway(command);
    }

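It marks the command as not requiring a response and then calls the next oneway, which belongs to MutexTransport. MutexTransport mainly wraps the call in its own writeLock, which serializes writes on the same connection: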

 public void oneway(Object command) throws IOException {
        this.writeLock.lock();

        try {
            this.next.oneway(command);
        } finally {
            this.writeLock.unlock();
        }

    }
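
The two oneway methods seen so far follow the same decorator shape: do a little work, then delegate to `next`. A minimal sketch of that chaining is shown below. Every type in it (`Cmd`, `CorrelatorLayer`, `MutexLayer`, `RecordingTransport`) is a hypothetical stand-in, not an ActiveMQ class, and the last layer records what it would write instead of touching a socket.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

class OnewayChainSketch {
    // Hypothetical minimal command: an id, a response flag and a payload.
    static final class Cmd {
        int commandId;
        boolean responseRequired = true;
        final String payload;

        Cmd(String payload) {
            this.payload = payload;
        }
    }

    interface Transport {
        void oneway(Cmd command);
    }

    // Outer layer: stamps a sequence id and marks the command fire-and-forget.
    static final class CorrelatorLayer implements Transport {
        private final AtomicInteger sequence = new AtomicInteger();
        private final Transport next;

        CorrelatorLayer(Transport next) {
            this.next = next;
        }

        @Override
        public void oneway(Cmd command) {
            command.commandId = sequence.incrementAndGet();
            command.responseRequired = false;
            next.oneway(command);
        }
    }

    // Middle layer: serializes writers with a lock around the delegate call.
    static final class MutexLayer implements Transport {
        private final ReentrantLock writeLock = new ReentrantLock();
        private final Transport next;

        MutexLayer(Transport next) {
            this.next = next;
        }

        @Override
        public void oneway(Cmd command) {
            writeLock.lock();
            try {
                next.oneway(command);
            } finally {
                writeLock.unlock();
            }
        }
    }

    // Innermost layer: records what would be written to the wire.
    static final class RecordingTransport implements Transport {
        final List<String> written = new ArrayList<>();

        @Override
        public void oneway(Cmd command) {
            written.add(command.commandId + ":" + command.payload);
        }
    }

    static List<String> sendAll(String... payloads) {
        RecordingTransport wire = new RecordingTransport();
        Transport chain = new CorrelatorLayer(new MutexLayer(wire));
        for (String payload : payloads) {
            chain.oneway(new Cmd(payload));
        }
        return wire.written;
    }

    public static void main(String[] args) {
        System.out.println(sendAll("a", "b")); // prints [1:a, 2:b]
    }
}
```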

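Next comes WriteTimeoutFilter. As described earlier, it maintains an internal collection and starts a thread that periodically checks whether any WriteTimeoutFilter in that collection has timed out. Here it registers the current filter (this) in the collection, calls the next oneway (the super implementation), and deregisters itself in the finally block: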

    public void oneway(Object command) throws IOException {
        try {
            registerWrite(this);
            super.oneway(command);
        } catch (IOException var6) {
            throw var6;
        } finally {
            deRegisterWrite(this, false, (IOException)null);
        }

    }
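
The register/deregister bracket around the write can be pictured as a small registry of in-flight writes that a background checker scans. The sketch below is only a guess at the general shape: `WriteTimeoutSketch`, its string keys, and the explicit `nowMillis` parameter (used so the example is deterministic) are all assumptions, and the real filter's bookkeeping may differ.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

// Hypothetical registry of in-flight writes; not the actual WriteTimeoutFilter.
class WriteTimeoutSketch {
    private final Map<String, Long> writeStart = new ConcurrentHashMap<>();
    private final long timeoutMillis;

    WriteTimeoutSketch(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Called just before a write begins.
    void registerWrite(String writer, long nowMillis) {
        writeStart.put(writer, nowMillis);
    }

    // Called in a finally block once the write has returned or failed.
    void deRegisterWrite(String writer) {
        writeStart.remove(writer);
    }

    // What a periodic checker thread would call: writers stuck longer than the timeout.
    List<String> findTimedOut(long nowMillis) {
        return writeStart.entrySet().stream()
                .filter(e -> nowMillis - e.getValue() > timeoutMillis)
                .map(Map.Entry::getKey)
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        WriteTimeoutSketch registry = new WriteTimeoutSketch(100);
        registry.registerWrite("filter-a", 0);
        registry.registerWrite("filter-b", 50);
        System.out.println(registry.findTimedOut(120)); // prints [filter-a]
        registry.deRegisterWrite("filter-a");
        System.out.println(registry.findTimedOut(120)); // prints []
    }
}
```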

This brings us to TcpTransport, which performs the actual data transfer:

    public void oneway(Object command) throws IOException {
        this.checkStarted();
        this.wireFormat.marshal(command, this.dataOut);
        this.dataOut.flush();
    }
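
To make the marshal-then-flush step concrete, here is a small sketch that writes a command into a buffered stream and flushes it. The wire format used (a 4-byte length followed by UTF-8 bytes) is invented for this example and is not meant to describe OpenWire; `FrameSketch` and its methods are likewise hypothetical, and a `ByteArrayOutputStream` stands in for the socket.

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class FrameSketch {
    // Hypothetical wire format: 4-byte big-endian length, then the UTF-8 body.
    static void marshal(String command, DataOutputStream out) throws IOException {
        byte[] body = command.getBytes(StandardCharsets.UTF_8);
        out.writeInt(body.length);
        out.write(body);
    }

    // Marshals the command, flushes, and returns the bytes that reached the "socket".
    static byte[] oneway(String command) {
        ByteArrayOutputStream socket = new ByteArrayOutputStream();
        DataOutputStream dataOut = new DataOutputStream(new BufferedOutputStream(socket));
        try {
            marshal(command, dataOut);
            // Without this flush the bytes would still sit in the buffer.
            dataOut.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return socket.toByteArray();
    }

    public static void main(String[] args) {
        byte[] frame = oneway("hello");
        System.out.println(frame.length); // prints 9 (4-byte length + 5-byte body)
    }
}
```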

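Because no response is required, the method returns as soon as the command has been marshalled and flushed; it does not wait for a reply from the broker. Now look at the synchronous send, which goes through the request method instead of oneway: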

    public Response syncSendPacket(Command command, int timeout) throws JMSException {
        if (this.isClosed()) {
            throw new ConnectionClosedException();
        } else {
            try {
                Response response = (Response)((Response)(timeout > 0 ? this.transport.request(command, timeout) : this.transport.request(command)));
                if (response.isException()) {
                    ExceptionResponse er = (ExceptionResponse)response;
                    if (er.getException() instanceof JMSException) {
                        throw (JMSException)er.getException();
                    }

                    if (this.isClosed() || this.closing.get()) {
                        LOG.debug("Received an exception but connection is closing");
                    }

                    JMSException jmsEx = null;

                    try {
                        jmsEx = JMSExceptionSupport.create(er.getException());
                    } catch (Throwable var8) {
                        LOG.error("Caught an exception trying to create a JMSException for " + er.getException(), var8);
                    }

                    if (er.getException() instanceof SecurityException && command instanceof ConnectionInfo) {
                        try {
                            this.forceCloseOnSecurityException(er.getException());
                        } catch (Throwable var7) {
                        }
                    }

                    if (jmsEx != null) {
                        throw jmsEx;
                    }
                }

                return response;
            } catch (IOException var9) {
                throw JMSExceptionSupport.create(var9);
            }
        }
    }

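Entering ResponseCorrelator's request method, we find that it also sends through asyncRequest, which in the end calls oneway as well. The difference is that it puts a FutureResponse into an internal map, keyed by command id, and uses the Future pattern to wait for the result: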

    public Object request(Object command, int timeout) throws IOException {
        FutureResponse response = this.asyncRequest(command, (ResponseCallback)null);
        return response.getResult(timeout);
    }
    public FutureResponse asyncRequest(Object o, ResponseCallback responseCallback) throws IOException {
        Command command = (Command)o;
        command.setCommandId(this.sequenceGenerator.getNextSequenceId());
        command.setResponseRequired(true);
        FutureResponse future = new FutureResponse(responseCallback, this);
        IOException priorError = null;
        synchronized(this.requestMap) {
            priorError = this.error;
            if (priorError == null) {
                this.requestMap.put(new Integer(command.getCommandId()), future);
            }
        }

        if (priorError != null) {
            future.set(new ExceptionResponse(priorError));
            throw priorError;
        } else {
            this.next.oneway(command);
            return future;
        }
    }
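
The request/response correlation above can be sketched with the JDK's `CompletableFuture` in place of ActiveMQ's `FutureResponse`. This is an illustration under that substitution, not the library's code: `CorrelatorSketch`, `Pending`, `onResponse` and the `wire` list are hypothetical names, and responses are plain strings.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class CorrelatorSketch {
    // A request that is waiting for its response.
    static final class Pending {
        final int commandId;
        final CompletableFuture<String> future = new CompletableFuture<>();

        Pending(int commandId) {
            this.commandId = commandId;
        }
    }

    private final AtomicInteger sequence = new AtomicInteger();
    private final Map<Integer, CompletableFuture<String>> requestMap = new ConcurrentHashMap<>();
    private final List<String> wire = new CopyOnWriteArrayList<>(); // stands in for next.oneway(...)

    // Registers the future first, then "sends" the command without waiting.
    Pending asyncRequest(String payload) {
        Pending pending = new Pending(sequence.incrementAndGet());
        requestMap.put(pending.commandId, pending.future);
        wire.add(pending.commandId + ":" + payload);
        return pending;
    }

    // Called by the reader thread when a response arrives.
    void onResponse(int correlationId, String response) {
        CompletableFuture<String> future = requestMap.remove(correlationId);
        if (future != null) {
            future.complete(response); // releases whoever is blocked on the future
        }
    }

    int pendingCount() {
        return requestMap.size();
    }

    public static void main(String[] args) throws Exception {
        CorrelatorSketch correlator = new CorrelatorSketch();
        Pending pending = correlator.asyncRequest("send message");
        // Simulate the broker's reply arriving on another thread.
        new Thread(() -> correlator.onResponse(pending.commandId, "ack")).start();
        System.out.println(pending.future.get(1, TimeUnit.SECONDS)); // prints ack
    }
}
```

Registering the future before writing the command matters: if the order were reversed, a fast reply could arrive before there is anything in the map to complete.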

Callback handling

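How the Future receives its result brings us back to the onCommand method discussed earlier. As described before, after start is called on the connection, the outer Transport layers do nothing until TcpTransport's start method runs. That method establishes the connection and starts a thread that keeps calling doRun. doRun calls readCommand to read a command sent by the broker and then calls doConsume, which is implemented in the parent class TransportSupport and hands the command to the transportListener: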

    //TransportSupport.class
    public void doConsume(Object command) {
        if (command != null) {
            if (this.transportListener != null) {
                this.transportListener.onCommand(command);
            } else {
                LOG.error("No transportListener available to process inbound command: " + command);
            }
        }
    }

At this point TcpTransport's transportListener is the WriteTimeoutFilter. WriteTimeoutFilter does not override onCommand, so the call goes straight to the implementation in TransportFilter, which calls onCommand on its own transportListener:

    //TransportFilter.class
    public void onCommand(Object command) {
        this.transportListener.onCommand(command);
    }

That leads to the onCommand method of MutexTransport:

    public void onCommand(Object command) {
        if (this.syncOnCommand) {
            this.writeLock.lock();

            try {
                this.transportListener.onCommand(command);
            } finally {
                this.writeLock.unlock();
            }
        } else {
            this.transportListener.onCommand(command);
        }
    }

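It first checks whether synchronized command handling is configured (syncOnCommand); if so, it acquires writeLock before delegating. Either way, the call ends up in the onCommand method of MutexTransport's transportListener, which is ResponseCorrelator, the outermost Transport layer: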

    //ResponseCorrelator.class
    //key : commandId
    private final Map<Integer, FutureResponse> requestMap;
    public void onCommand(Object o) {
        Command command = null;
        if (o instanceof Command) {
            command = (Command)o;
            if (command.isResponse()) {
                Response response = (Response)command;
                FutureResponse future = null;
                synchronized(this.requestMap) {
                    future = (FutureResponse)this.requestMap.remove(response.getCorrelationId());
                }

                if (future != null) {
                    future.set(response);
                } else if (this.debug) {
                    LOG.debug("Received unexpected response: {" + command + "}for command id: " + response.getCorrelationId());
                }
            } else {
                this.getTransportListener().onCommand(command);
            }

        } else {
            throw new ClassCastException("Object cannot be converted to a Command,  Object: " + o);
        }
    }

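ResponseCorrelator keeps a map of every request that is still waiting for a response. When a command from the broker reaches onCommand and is a response, the correlator looks up the matching FutureResponse by correlation id and sets the result on it, which releases the thread waiting on that future. Commands that are not responses are passed on to the transport listener, which is ActiveMQConnection, whose onCommand method looks like this: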

    public void onCommand(Object o) {
        final Command command = (Command)o;
        if (!this.closed.get() && command != null) {
            try {
                command.visit(new CommandVisitorAdapter() {
                    public Response processMessageDispatch(MessageDispatch md) throws Exception {
                        ActiveMQConnection.this.waitForTransportInterruptionProcessingToComplete();
                        ActiveMQDispatcher dispatcher = (ActiveMQDispatcher)ActiveMQConnection.this.dispatchers.get(md.getConsumerId());
                        if (dispatcher != null) {
                            Message msg = md.getMessage();
                            if (msg != null) {
                                msg = msg.copy();
                                msg.setReadOnlyBody(true);
                                msg.setReadOnlyProperties(true);
                                msg.setRedeliveryCounter(md.getRedeliveryCounter());
                                msg.setConnection(ActiveMQConnection.this);
                                msg.setMemoryUsage((MemoryUsage)null);
                                md.setMessage(msg);
                            }

                            dispatcher.dispatch(md);
                        } else {
                            ActiveMQConnection.LOG.debug("{} no dispatcher for {} in {}", new Object[]{this, md, ActiveMQConnection.this.dispatchers});
                        }

                        return null;
                    }

                    public Response processProducerAck(ProducerAck pa) throws Exception {
                        if (pa != null && pa.getProducerId() != null) {
                            ActiveMQMessageProducer producer = (ActiveMQMessageProducer)ActiveMQConnection.this.producers.get(pa.getProducerId());
                            if (producer != null) {
                                producer.onProducerAck(pa);
                            }
                        }

                        return null;
                    }

                    public Response processBrokerInfo(BrokerInfo info) throws Exception {
                        ActiveMQConnection.this.brokerInfo = info;
                        ActiveMQConnection.this.brokerInfoReceived.countDown();
                        ActiveMQConnection.this.optimizeAcknowledge = ActiveMQConnection.this.optimizeAcknowledge & !ActiveMQConnection.this.brokerInfo.isFaultTolerantConfiguration();
                        ActiveMQConnection.this.getBlobTransferPolicy().setBrokerUploadUrl(info.getBrokerUploadUrl());
                        return null;
                    }

                    public Response processConnectionError(final ConnectionError error) throws Exception {
                        ActiveMQConnection.this.executor.execute(new Runnable() {
                            public void run() {
                                ActiveMQConnection.this.onAsyncException(error.getException());
                            }
                        });
                        return null;
                    }

                    public Response processControlCommand(ControlCommand commandx) throws Exception {
                        return null;
                    }

                    public Response processConnectionControl(ConnectionControl control) throws Exception {
                        ActiveMQConnection.this.onConnectionControl((ConnectionControl)command);
                        return null;
                    }

                    public Response processConsumerControl(ConsumerControl control) throws Exception {
                        ActiveMQConnection.this.onConsumerControl((ConsumerControl)command);
                        return null;
                    }

                    public Response processWireFormat(WireFormatInfo info) throws Exception {
                        ActiveMQConnection.this.onWireFormatInfo((WireFormatInfo)command);
                        return null;
                    }
                });
            } catch (Exception var5) {
                this.onClientInternalException(var5);
            }
        }

        Iterator iter = this.transportListeners.iterator();

        while(iter.hasNext()) {
            TransportListener listener = (TransportListener)iter.next();
            listener.onCommand(command);
        }

    }
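
The command.visit(...) call above relies on double dispatch: each concrete command calls back the process method that matches its own type, and the adapter supplies do-nothing defaults. A minimal sketch of that mechanism follows. The nested types borrow ActiveMQ's names only for readability; they are simplified stand-ins that return strings, and `handle` is a hypothetical helper.

```java
class VisitorSketch {
    interface Command {
        String visit(CommandVisitor visitor);
    }

    interface CommandVisitor {
        String processMessageDispatch(MessageDispatch md);
        String processProducerAck(ProducerAck pa);
        String processBrokerInfo(BrokerInfo info);
    }

    // Adapter with no-op defaults, so callers override only what they handle.
    static class CommandVisitorAdapter implements CommandVisitor {
        @Override
        public String processMessageDispatch(MessageDispatch md) {
            return null;
        }

        @Override
        public String processProducerAck(ProducerAck pa) {
            return null;
        }

        @Override
        public String processBrokerInfo(BrokerInfo info) {
            return null;
        }
    }

    // Each command type picks the matching process method for itself.
    static final class MessageDispatch implements Command {
        @Override
        public String visit(CommandVisitor visitor) {
            return visitor.processMessageDispatch(this);
        }
    }

    static final class ProducerAck implements Command {
        @Override
        public String visit(CommandVisitor visitor) {
            return visitor.processProducerAck(this);
        }
    }

    static final class BrokerInfo implements Command {
        @Override
        public String visit(CommandVisitor visitor) {
            return visitor.processBrokerInfo(this);
        }
    }

    // Handles two command types and silently ignores the rest.
    static String handle(Command command) {
        String result = command.visit(new CommandVisitorAdapter() {
            @Override
            public String processMessageDispatch(MessageDispatch md) {
                return "dispatched to consumer";
            }

            @Override
            public String processProducerAck(ProducerAck pa) {
                return "producer ack handled";
            }
        });
        return result == null ? "ignored" : result;
    }

    public static void main(String[] args) {
        System.out.println(handle(new ProducerAck())); // prints producer ack handled
        System.out.println(handle(new BrokerInfo()));  // prints ignored
    }
}
```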

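This code uses the visitor pattern, with CommandVisitorAdapter supplying empty default implementations. A CommandVisitor is passed in, and the visit method of each concrete Command implementation decides which process method handles it. (A Command is the object deserialized from data received over TCP, and it can be one of many message types, such as broker connection, topic deletion, or ack handling.) The loop at the end runs the onCommand method of any additional TransportListener instances that were registered, which serves as an extension point. The complete flow for sending a message in ActiveMQ is therefore as follows: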

ActiveMQ flow diagram

Originally published on 2020-05-28 on the 歪歪梯Club WeChat official account.
