
Detecting reconnection in an IBM MQ client application that uses automatic reconnect

Stack Overflow user
Asked 2020-01-27 21:20:38
4 answers, 4K views, 0 followers, 5 votes

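I am using the IBM MQ classes for JMS (IBM MQ version 8.0.0.4) with automatic reconnect configured. According to the documentation, reconnection happens implicitly. I would like to emit a simple log statement whenever a reconnect occurs, so I need to be notified when it happens.

On the Application Recovery page of the IBM MQ documentation I came across the section "Detecting failover", which says:

Reconnection aware: Register an MQCBT_EVENT_HANDLER event handler with the queue manager. The event handler is posted with MQRC_RECONNECTING when the client starts trying to reconnect to the server, and with MQRC_RECONNECTED after a successful reconnection. You can then run a routine to reestablish a predictable state so that the client application can continue processing.

Unfortunately, I could not find a Java/JMS code example that shows how and where to register such an event handler. I don't know whether this is even supported in my case. Can anyone point me in the right direction or even provide a code example? Thank you very much.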
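
For orientation, the reason codes named in the quoted passage can be mapped to loggable events with a few lines of plain Java. This is only a sketch: `ReconnectEvent` and `classify` are hypothetical names, not part of the IBM MQ API. The numeric values 2544 and 2009 appear later in this thread; 2545 for MQRC_RECONNECTED is taken from the IBM MQ reason code list as I remember it and should be checked against your MQ version. How the reason code reaches the application (event handler or exception listener) is outside the sketch.

```java
import java.util.Optional;

/**
 * Sketch only: maps IBM MQ reason codes related to automatic reconnect
 * to a coarse event type that an application could log.
 * ReconnectEvent and classify are hypothetical names, not IBM MQ API.
 */
enum ReconnectEvent {
    RECONNECTING(2544),      // MQRC_RECONNECTING
    RECONNECTED(2545),       // MQRC_RECONNECTED (assumed value, verify for your version)
    CONNECTION_BROKEN(2009); // MQRC_CONNECTION_BROKEN

    private final int reasonCode;

    ReconnectEvent(int reasonCode) {
        this.reasonCode = reasonCode;
    }

    int reasonCode() {
        return reasonCode;
    }

    /** Returns the matching event, or empty for any other reason code. */
    static Optional<ReconnectEvent> classify(int reasonCode) {
        for (ReconnectEvent event : values()) {
            if (event.reasonCode == reasonCode) {
                return Optional.of(event);
            }
        }
        return Optional.empty();
    }
}
```

A caller could then write `ReconnectEvent.classify(code).ifPresent(e -> LOGGER.info("MQ event: {}", e))` and ignore every other reason code.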

Question update from February 5, 2020:

After Sashi's initial answer from January 27, 2020, I added the following code sample.

public static void main(String[] args) {
    Connection connection = null;
    Session session = null;
    Object destination = null;
    MessageProducer producer = null;

    try {
        JmsFactoryFactory jmsFactoryFactory = JmsFactoryFactory.getInstance(WMQConstants.WMQ_PROVIDER);
        JmsConnectionFactory cf = jmsFactoryFactory.createConnectionFactory();

        cf.setStringProperty(WMQConstants.WMQ_HOST_NAME, HOST);
        cf.setIntProperty(WMQConstants.WMQ_PORT, PORT);
        cf.setStringProperty(WMQConstants.WMQ_CHANNEL, CHANNEL);
        cf.setIntProperty(WMQConstants.WMQ_CONNECTION_MODE, WMQConstants.WMQ_CM_CLIENT);
        cf.setStringProperty(WMQConstants.WMQ_QUEUE_MANAGER, QM_NAME);
        cf.setIntProperty(WMQConstants.WMQ_CLIENT_RECONNECT_OPTIONS, WMQConstants.WMQ_CLIENT_RECONNECT);
        cf.setIntProperty(WMQConstants.WMQ_CLIENT_RECONNECT_TIMEOUT, RECONNECT_TIMEOUT);

        connection = cf.createConnection();
        connection.setExceptionListener(new MQExceptionListener());
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        destination = session.createQueue(QUEUE);
        producer = session.createProducer((Destination)destination);
        connection.start();
    } catch (JMSException ex) {
        LOGGER.error(ex.toString());
    }
}

public class MQExceptionListener implements ExceptionListener {
    public void onException(JMSException e) {
        System.out.println(e);
        if(e.getLinkedException() != null)
            System.out.println(e.getLinkedException());
    }
}

This is what I get in the log:

ERROR [Main.main:57] (main) com.ibm.msg.client.jms.DetailedIllegalStateException: JMSWMQ0018: Failed to connect to queue manager '<hostname>' with connection mode 'Client' and host name '<hostname>(<port>)'.
Check the queue manager is started and if running in client mode, check there is a listener running. Please see the linked exception for more information.
ERROR [Main.main:61] (main) Inner exceptions:
ERROR [Main.main:65] (main) com.ibm.mq.MQException: JMSCMQ0001: IBM MQ call failed with compcode '2' ('MQCC_FAILED') reason '2538' ('MQRC_HOST_NOT_AVAILABLE').
ERROR [Main.main:65] (main) com.ibm.mq.jmqi.JmqiException: CC=2;RC=2538;AMQ9204: Connection to host '<hostname>(<port>)' rejected. [1=com.ibm.mq.jmqi.JmqiException[CC=2;RC=2538;AMQ9204: Connection to host '<hostname>/<ip>:<port>' rejected. [1=java.net.ConnectException[Connection refused: connect],3=<hostname>/<ip>:<port>,4=TCP,5=Socket.connect]],3=<hostname>(<port>),5=RemoteTCPConnection.bindAndConnectSocket]
ERROR [Main.main:65] (main) com.ibm.mq.jmqi.JmqiException: CC=2;RC=2538;AMQ9204: Connection to host '<hostname>/<ip>:<port>' rejected. [1=java.net.ConnectException[Connection refused: connect],3=<hostname>/<ip>:<port>,4=TCP,5=Socket.connect]
ERROR [Main.main:65] (main) java.net.ConnectException: Connection refused: connect

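Question update from February 11, 2020:

I added the following based on the feedback Sashi gave on February 5, 2020.

I tried to build a minimal application that connects to an IBM MQ instance. Here is the code: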

Application.java

public class Application {
    private static final Logger LOGGER = LoggerFactory.getLogger(Application.class);

    public static void main(String[] args) {
        new Application().run();
    }

    private void run() {
        MQWriter writer = new MQWriter();
        int i = 1;
        while (true) {
            String message = "Hello Testing " + i;
            LOGGER.info("Sending message {} to MQ server...", message);
            writer.write(message);
            i++;
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

MQConnectionDetails.java

public class MQConnectionDetails {
    public static final String HOST = "XXX.XXX.XXX.XXX";
    public static final int PORT = 1414;
    public static final String QM_NAME = "QM1";
    public static final String CHANNEL = "DEV.APP.SVRCONN";
    public static final String QUEUE = "DEV.QUEUE.1";
    public static final int RECONNECT_TIMEOUT = 60; // 1 minute
}

MQWriter.java

public class MQWriter {

    private static final Logger LOGGER = LoggerFactory.getLogger(MQWriter.class);

    private Connection connection = null;
    private Session session = null;
    private Object destination = null;
    private MessageProducer producer = null;

    public MQWriter() {
        try {
            JmsFactoryFactory jff = JmsFactoryFactory.getInstance(WMQConstants.WMQ_PROVIDER);
            JmsConnectionFactory jcf = jff.createConnectionFactory();
            jcf.setStringProperty(WMQConstants.WMQ_HOST_NAME, MQConnectionDetails.HOST);
            jcf.setIntProperty(WMQConstants.WMQ_PORT, MQConnectionDetails.PORT);
            jcf.setStringProperty(WMQConstants.WMQ_CHANNEL, MQConnectionDetails.CHANNEL);
            jcf.setIntProperty(WMQConstants.WMQ_CONNECTION_MODE, WMQConstants.WMQ_CM_CLIENT);
            jcf.setStringProperty(WMQConstants.WMQ_QUEUE_MANAGER, MQConnectionDetails.QM_NAME);
            jcf.setIntProperty(WMQConstants.WMQ_CLIENT_RECONNECT_OPTIONS, WMQConstants.WMQ_CLIENT_RECONNECT);
            jcf.setIntProperty(WMQConstants.WMQ_CLIENT_RECONNECT_TIMEOUT, MQConnectionDetails.RECONNECT_TIMEOUT);

            LOGGER.info("Initializing connection to write queue {} on {}:{}...",
                    MQConnectionDetails.QUEUE,
                    MQConnectionDetails.HOST,
                    MQConnectionDetails.PORT);
            connection = jcf.createConnection();
            connection.setExceptionListener(new MQExceptionListener());
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            destination = session.createQueue(MQConnectionDetails.QUEUE);
            producer = session.createProducer((Destination)destination);
            connection.start();
        } catch (JMSException ex) {
            LOGGER.error("Error initializing connection to write queue", ex);
        }
    }

    public void write(String message) {
        try {
            TextMessage textMessage = session.createTextMessage(message);
            producer.send(textMessage);
        } catch (Exception ex) {
            LOGGER.error("Error sending message to write queue", ex);
        }
    }
}

MQExceptionListener.java

public class MQExceptionListener implements ExceptionListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(MQExceptionListener.class);

    public void onException(JMSException ex) {
        LOGGER.error("=====");
        LOGGER.error(ex.toString());
        if (ex.getLinkedException() != null) {
            LOGGER.error(ex.getLinkedException().toString());
        }
        LOGGER.error("=====");
    }
}

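The test scenario I ran is as follows:

  1. Make sure IBM MQ is available on TCP port 1414 (an IBM MQ container running on Amazon EC2).
  2. Run the application above (Application.java) and make sure it sends messages to the queue.
  3. Change the firewall configuration of the Amazon EC2 security group by changing the port from 1414 to 1415, which makes IBM MQ unavailable to the client.

This is what I observed:

  • The client only starts throwing exceptions after 90 seconds of inactivity. I don't understand this, because my RECONNECT_TIMEOUT is set to 60 seconds, so it is off by 30 seconds.
  • MQExceptionListener is called only once (the first time).
  • Reason code 2544 (MQRC_RECONNECTING) never appears, only 2009 (MQRC_CONNECTION_BROKEN). Why is that?

Here is a summary of the exceptions thrown:

Exceptions on the console: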

2020-02-11 09:50:16,155 INFO [Application.run:21] (main) Sending message Hello Testing 13 to MQ server...
2020-02-11 09:50:17,285 INFO [Application.run:21] (main) Sending message Hello Testing 14 to MQ server...
2020-02-11 09:50:18,413 INFO [Application.run:21] (main) Sending message Hello Testing 15 to MQ server...
2020-02-11 09:50:19,555 INFO [Application.run:21] (main) Sending message Hello Testing 16 to MQ server...
2020-02-11 09:51:45,966 ERROR [MQExceptionListener.onException:14] (JMSCCThreadPoolWorker-6) =====
2020-02-11 09:51:45,966 ERROR [MQExceptionListener.onException:15] (JMSCCThreadPoolWorker-6) com.ibm.msg.client.jms.DetailedJMSException: JMSWMQ1107: A problem with this connection has occurred.
An error has occurred with the IBM MQ JMS connection.
Use the linked exception to determine the cause of this error.
2020-02-11 09:51:45,966 ERROR [MQExceptionListener.onException:17] (JMSCCThreadPoolWorker-6) com.ibm.mq.MQException: MQ delivered an asynchronous event with completion code '2', and reason '2009'.
2020-02-11 09:51:45,966 ERROR [MQExceptionListener.onException:19] (JMSCCThreadPoolWorker-6) =====
2020-02-11 09:51:45,967 ERROR [MQWriter.write:52] (main) Error sending message to write queue
com.ibm.msg.client.jms.DetailedJMSException: JMSWMQ2007: Failed to send a message to destination 'DEV.QUEUE.1'.
JMS attempted to perform an MQPUT or MQPUT1; however IBM MQ reported an error.
Use the linked exception to determine the cause of this error.
    at com.ibm.msg.client.wmq.common.internal.Reason.reasonToException(Reason.java:595)
    at com.ibm.msg.client.wmq.common.internal.Reason.createException(Reason.java:215)
    at com.ibm.msg.client.wmq.internal.WMQMessageProducer.checkJmqiCallSuccess(WMQMessageProducer.java:1288)
    at com.ibm.msg.client.wmq.internal.WMQMessageProducer.checkJmqiCallSuccess(WMQMessageProducer.java:1245)
    at com.ibm.msg.client.wmq.internal.WMQMessageProducer.access$800(WMQMessageProducer.java:76)
    at com.ibm.msg.client.wmq.internal.WMQMessageProducer$SpiIdentifiedProducerShadow.sendInternal(WMQMessageProducer.java:906)
    at com.ibm.msg.client.wmq.internal.WMQMessageProducer$ProducerShadow.send(WMQMessageProducer.java:566)
    at com.ibm.msg.client.wmq.internal.WMQMessageProducer.send(WMQMessageProducer.java:1428)
    at com.ibm.msg.client.jms.internal.JmsMessageProducerImpl.sendMessage(JmsMessageProducerImpl.java:855)
    at com.ibm.msg.client.jms.internal.JmsMessageProducerImpl.synchronousSendInternal(JmsMessageProducerImpl.java:2055)
    at com.ibm.msg.client.jms.internal.JmsMessageProducerImpl.sendInternal(JmsMessageProducerImpl.java:1993)
    at com.ibm.msg.client.jms.internal.JmsMessageProducerImpl.send(JmsMessageProducerImpl.java:1486)
    at com.ibm.mq.jms.MQMessageProducer.send(MQMessageProducer.java:293)
    at org.example.MQWriter.write(MQWriter.java:50)
    at org.example.Application.run(Application.java:22)
    at org.example.Application.main(Application.java:13)
Caused by: com.ibm.mq.MQException: JMSCMQ0001: IBM MQ call failed with compcode '2' ('MQCC_FAILED') reason '2009' ('MQRC_CONNECTION_BROKEN').
    at com.ibm.msg.client.wmq.common.internal.Reason.createException(Reason.java:203)
    ... 14 more
Caused by: com.ibm.mq.jmqi.JmqiException: CC=2;RC=2009
    at com.ibm.mq.jmqi.remote.api.RemoteHconn$ReconnectionState.recordFailure(RemoteHconn.java:4931)
    at com.ibm.mq.jmqi.remote.api.RemoteHconn.setReconnectionFailureInner(RemoteHconn.java:2650)
    at com.ibm.mq.jmqi.remote.api.RemoteParentHconn.setReconnectionFailure(RemoteParentHconn.java:152)
    at com.ibm.mq.jmqi.remote.impl.RemoteReconnectThread.bestHconn(RemoteReconnectThread.java:265)
    at com.ibm.mq.jmqi.remote.impl.RemoteReconnectThread.run(RemoteReconnectThread.java:115)
    at com.ibm.msg.client.commonservices.workqueue.WorkQueueItem.runTask(WorkQueueItem.java:319)
    at com.ibm.msg.client.commonservices.workqueue.SimpleWorkQueueItem.runItem(SimpleWorkQueueItem.java:99)
    at com.ibm.msg.client.commonservices.workqueue.WorkQueueItem.run(WorkQueueItem.java:343)
    at com.ibm.msg.client.commonservices.workqueue.WorkQueueManager.runWorkQueueItem(WorkQueueManager.java:312)
    at com.ibm.msg.client.commonservices.j2se.workqueue.WorkQueueManagerImplementation$ThreadPoolWorker.run(WorkQueueManagerImplementation.java:1227)
2020-02-11 09:51:46,969 INFO [Application.run:21] (main) Sending message Hello Testing 17 to MQ server...
2020-02-11 09:51:46,972 ERROR [MQWriter.write:52] (main) Error sending message to write queue
com.ibm.msg.client.jms.DetailedJMSException: JMSWMQ2007: Failed to send a message to destination 'DEV.QUEUE.1'.
JMS attempted to perform an MQPUT or MQPUT1; however IBM MQ reported an error.
Use the linked exception to determine the cause of this error.
    at com.ibm.msg.client.wmq.common.internal.Reason.reasonToException(Reason.java:595)
    at com.ibm.msg.client.wmq.common.internal.Reason.createException(Reason.java:215)
    at com.ibm.msg.client.wmq.internal.WMQMessageProducer.checkJmqiCallSuccess(WMQMessageProducer.java:1288)
    at com.ibm.msg.client.wmq.internal.WMQMessageProducer.checkJmqiCallSuccess(WMQMessageProducer.java:1245)
    at com.ibm.msg.client.wmq.internal.WMQMessageProducer.access$800(WMQMessageProducer.java:76)
    at com.ibm.msg.client.wmq.internal.WMQMessageProducer$SpiIdentifiedProducerShadow.sendInternal(WMQMessageProducer.java:906)
    at com.ibm.msg.client.wmq.internal.WMQMessageProducer$ProducerShadow.send(WMQMessageProducer.java:566)
    at com.ibm.msg.client.wmq.internal.WMQMessageProducer.send(WMQMessageProducer.java:1428)
    at com.ibm.msg.client.jms.internal.JmsMessageProducerImpl.sendMessage(JmsMessageProducerImpl.java:855)
    at com.ibm.msg.client.jms.internal.JmsMessageProducerImpl.synchronousSendInternal(JmsMessageProducerImpl.java:2055)
    at com.ibm.msg.client.jms.internal.JmsMessageProducerImpl.sendInternal(JmsMessageProducerImpl.java:1993)
    at com.ibm.msg.client.jms.internal.JmsMessageProducerImpl.send(JmsMessageProducerImpl.java:1486)
    at com.ibm.mq.jms.MQMessageProducer.send(MQMessageProducer.java:293)
    at org.example.MQWriter.write(MQWriter.java:50)
    at org.example.Application.run(Application.java:22)
    at org.example.Application.main(Application.java:13)
Caused by: com.ibm.mq.MQException: JMSCMQ0001: IBM MQ call failed with compcode '2' ('MQCC_FAILED') reason '2009' ('MQRC_CONNECTION_BROKEN').
    at com.ibm.msg.client.wmq.common.internal.Reason.createException(Reason.java:203)
    ... 14 more
Caused by: com.ibm.mq.jmqi.JmqiException: CC=2;RC=2009
    at com.ibm.mq.jmqi.remote.api.RemoteHconn$ReconnectionState.recordFailure(RemoteHconn.java:4931)
    at com.ibm.mq.jmqi.remote.api.RemoteHconn.setReconnectionFailureInner(RemoteHconn.java:2650)
    at com.ibm.mq.jmqi.remote.api.RemoteParentHconn.setReconnectionFailure(RemoteParentHconn.java:152)
    at com.ibm.mq.jmqi.remote.impl.RemoteReconnectThread.bestHconn(RemoteReconnectThread.java:265)
    at com.ibm.mq.jmqi.remote.impl.RemoteReconnectThread.run(RemoteReconnectThread.java:115)
    at com.ibm.msg.client.commonservices.workqueue.WorkQueueItem.runTask(WorkQueueItem.java:319)
    at com.ibm.msg.client.commonservices.workqueue.SimpleWorkQueueItem.runItem(SimpleWorkQueueItem.java:99)
    at com.ibm.msg.client.commonservices.workqueue.WorkQueueItem.run(WorkQueueItem.java:343)
    at com.ibm.msg.client.commonservices.workqueue.WorkQueueManager.runWorkQueueItem(WorkQueueManager.java:312)
    at com.ibm.msg.client.commonservices.j2se.workqueue.WorkQueueManagerImplementation$ThreadPoolWorker.run(WorkQueueManagerImplementation.java:1227)
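
When scanning console output like the above for reconnect-related reason codes, a small parser can pull the number out of the message text. The sketch below is plain Java and only handles the `reason '<digits>'` form that appears in these messages; `ReasonCodeParser` is a hypothetical helper name. Inside the application itself it would be more robust to read the reason code from the linked exception object than to parse its text.

```java
import java.util.OptionalInt;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Sketch only: extracts an MQ reason code from a log or exception message. */
final class ReasonCodeParser {
    // Matches the "reason '2009'" form seen in the messages above.
    private static final Pattern REASON = Pattern.compile("reason '(\\d{1,9})'");

    private ReasonCodeParser() {
    }

    /** Returns the first reason code found in the message, or empty if none. */
    static OptionalInt parse(String message) {
        if (message == null) {
            return OptionalInt.empty();
        }
        Matcher matcher = REASON.matcher(message);
        if (!matcher.find()) {
            return OptionalInt.empty();
        }
        return OptionalInt.of(Integer.parseInt(matcher.group(1)));
    }
}
```

The short `CC=2;RC=2009` form is deliberately not matched here; extending the pattern to cover it would be a separate decision.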

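Question update from February 12, 2020

I added this sample and these findings based on JoshMc's answer from February 11, 2020. My comments on the sample:

  • I now use the MQ* classes and set the reconnect options as suggested.
  • However, reconnection still does not happen.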

MQWriter2.java

public class MQWriter2 {

    private static final Logger LOGGER = LoggerFactory.getLogger(MQWriter2.class);

    private Connection connection = null;
    private Session session = null;
    private Queue destination = null;
    private MessageProducer producer = null;

    public MQWriter2() {
        try {
            MQConnectionFactory factory = new MQConnectionFactory();
            factory.setTransportType(WMQConstants.WMQ_CM_CLIENT);
            factory.setConnectionNameList("XXX.XXX.XXX.XXX(1414)");
            factory.setQueueManager(MQConnectionDetails.QM_NAME);
            factory.setChannel(MQConnectionDetails.CHANNEL);
            factory.setClientReconnectOptions(WMQConstants.WMQ_CLIENT_RECONNECT);
            factory.setClientReconnectTimeout(MQConnectionDetails.RECONNECT_TIMEOUT);

            LOGGER.info("Initializing connection to write queue {} on {}:{}...",
                    MQConnectionDetails.QUEUE,
                    MQConnectionDetails.HOST,
                    MQConnectionDetails.PORT);
            connection = factory.createConnection();
            connection.setExceptionListener(new MQExceptionListener());
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            destination = session.createQueue(MQConnectionDetails.QUEUE);
            producer = session.createProducer(destination);
            connection.start();
        } catch (JMSException ex) {
            LOGGER.error("Error initializing connection to write queue", ex);
        }
    }

    public void write(String message) {
        try {
            TextMessage textMessage = session.createTextMessage(message);
            producer.send(textMessage);
        } catch (Exception ex) {
            LOGGER.error("Error sending message to write queue", ex);
        }
    }
}

Console output

2020-02-12 08:39:11,628 INFO [MQWriter2.<init>:29] (main) Initializing connection to write queue DEV.QUEUE.1 on 54.161.121.207:1414...
2020-02-12 08:39:14,552 INFO [Application.run:19] (main) Sending message Hello Testing 1 to MQ server...
2020-02-12 08:39:15,710 INFO [Application.run:19] (main) Sending message Hello Testing 2 to MQ server...
2020-02-12 08:39:16,841 INFO [Application.run:19] (main) Sending message Hello Testing 3 to MQ server...
...
2020-02-12 08:39:41,973 INFO [Application.run:19] (main) Sending message Hello Testing 25 to MQ server...
2020-02-12 08:41:27,314 ERROR [MQExceptionListener.onException:14] (JMSCCThreadPoolWorker-10) =====
2020-02-12 08:41:27,314 ERROR [MQExceptionListener.onException:15] (JMSCCThreadPoolWorker-10) com.ibm.msg.client.jms.DetailedJMSException: JMSWMQ1107: A problem with this connection has occurred.
An error has occurred with the IBM MQ JMS connection.
Use the linked exception to determine the cause of this error.
2020-02-12 08:41:27,314 ERROR [MQWriter2.write:49] (main) Error sending message to write queue
com.ibm.msg.client.jms.DetailedJMSException: JMSWMQ2007: Failed to send a message to destination 'DEV.QUEUE.1'.
JMS attempted to perform an MQPUT or MQPUT1; however IBM MQ reported an error.
Use the linked exception to determine the cause of this error.
    at com.ibm.msg.client.wmq.common.internal.Reason.reasonToException(Reason.java:595)
    at com.ibm.msg.client.wmq.common.internal.Reason.createException(Reason.java:215)
    at com.ibm.msg.client.wmq.internal.WMQMessageProducer.checkJmqiCallSuccess(WMQMessageProducer.java:1288)
    at com.ibm.msg.client.wmq.internal.WMQMessageProducer.checkJmqiCallSuccess(WMQMessageProducer.java:1245)
    at com.ibm.msg.client.wmq.internal.WMQMessageProducer.access$800(WMQMessageProducer.java:76)
    at com.ibm.msg.client.wmq.internal.WMQMessageProducer$SpiIdentifiedProducerShadow.sendInternal(WMQMessageProducer.java:906)
    at com.ibm.msg.client.wmq.internal.WMQMessageProducer$ProducerShadow.send(WMQMessageProducer.java:566)
    at com.ibm.msg.client.wmq.internal.WMQMessageProducer.send(WMQMessageProducer.java:1428)
    at com.ibm.msg.client.jms.internal.JmsMessageProducerImpl.sendMessage(JmsMessageProducerImpl.java:855)
    at com.ibm.msg.client.jms.internal.JmsMessageProducerImpl.synchronousSendInternal(JmsMessageProducerImpl.java:2055)
    at com.ibm.msg.client.jms.internal.JmsMessageProducerImpl.sendInternal(JmsMessageProducerImpl.java:1993)
    at com.ibm.msg.client.jms.internal.JmsMessageProducerImpl.send(JmsMessageProducerImpl.java:1486)
    at com.ibm.mq.jms.MQMessageProducer.send(MQMessageProducer.java:293)
    at org.example.MQWriter2.write(MQWriter2.java:47)
    at org.example.Application.run(Application.java:20)
    at org.example.Application.main(Application.java:11)
Caused by: com.ibm.mq.MQException: JMSCMQ0001: IBM MQ call failed with compcode '2' ('MQCC_FAILED') reason '2009' ('MQRC_CONNECTION_BROKEN').
    at com.ibm.msg.client.wmq.common.internal.Reason.createException(Reason.java:203)
    ... 14 more
Caused by: com.ibm.mq.jmqi.JmqiException: CC=2;RC=2009
    at com.ibm.mq.jmqi.remote.api.RemoteHconn$ReconnectionState.recordFailure(RemoteHconn.java:4931)
    at com.ibm.mq.jmqi.remote.api.RemoteHconn.setReconnectionFailureInner(RemoteHconn.java:2650)
    at com.ibm.mq.jmqi.remote.api.RemoteParentHconn.setReconnectionFailure(RemoteParentHconn.java:152)
    at com.ibm.mq.jmqi.remote.impl.RemoteReconnectThread.bestHconn(RemoteReconnectThread.java:265)
    at com.ibm.mq.jmqi.remote.impl.RemoteReconnectThread.run(RemoteReconnectThread.java:115)
    at com.ibm.msg.client.commonservices.workqueue.WorkQueueItem.runTask(WorkQueueItem.java:319)
    at com.ibm.msg.client.commonservices.workqueue.SimpleWorkQueueItem.runItem(SimpleWorkQueueItem.java:99)
    at com.ibm.msg.client.commonservices.workqueue.WorkQueueItem.run(WorkQueueItem.java:343)
    at com.ibm.msg.client.commonservices.workqueue.WorkQueueManager.runWorkQueueItem(WorkQueueManager.java:312)
    at com.ibm.msg.client.commonservices.j2se.workqueue.WorkQueueManagerImplementation$ThreadPoolWorker.run(WorkQueueManagerImplementation.java:1227)

4 Answers

Stack Overflow user

Accepted answer

Posted 2021-03-15 15:14:32

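I faced the same problem as the topic starter. After spending hours sifting through information on the internet, talking to colleagues, pulling my hair out and trying to get reconnection to work, I gave up and decided to work around the problem by emulating the incomprehensible reconnection functionality. I hope it helps others who are struggling with IBM MQ. The class I wrote basically does two things:

  1. Makes repeated attempts to connect to IBM MQ, with an increasing interval between attempts.
  2. After connecting, sets an error handler that IBM MQ triggers when something happens to the connection (it uses the same logic to reconnect).

First, here is the class itself: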

package com.raiks.mqclient;

import javax.jms.Destination;
import javax.jms.JMSConsumer;
import javax.jms.JMSContext;
import javax.jms.JMSException;

import com.ibm.msg.client.jms.JmsConnectionFactory;
import com.ibm.msg.client.wmq.WMQConstants;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.raiks.mqclient.IbmMqMessageListener;

/**
 * This class implements the reconnection logic for JMS brokers that don't support it
 * In particular, it does it for IBM MQ with its incomprehensible reconnection algorithm
 * It's expected that each connection manager receives a separate connection factory
 * and a message listener - it's not guaranteed for those to be thread safe
 */
public final class IbmMqJmsConnectionManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(IbmMqJmsConnectionManager.class);
    private static final int INITIAL_RECONNECTION_DELAY_MS = 6000;
    private static final int MAX_RECONNECTION_DELAY_MS = 60000;
    private static final String QUEUE_PREIX = "queue:///";

    private final String connectorName;
    private final JmsConnectionFactory connectionFactory;
    private final String queueName;

    private final IbmMqMessageListener messageListener;

    private final int initialReconnectionDelayMs;
    private final int maxReconnectionDelayMs;

    public IbmMqJmsConnectionManager(
        String connectorName,
        JmsConnectionFactory connectionFactory,
        String queueName,
        IbmMqMessageListener messageListener,
        int initialReconnectionDelayMs,
        int maxReconnectionDelayMs
    ) {
        this.connectorName = connectorName;
        this.connectionFactory = connectionFactory;
        this.queueName = queueName;
        this.messageListener = messageListener;
        this.initialReconnectionDelayMs = initialReconnectionDelayMs;
        this.maxReconnectionDelayMs = maxReconnectionDelayMs;
    }

    /**
     * Attempts to connect to a JMS broker and makes continuous retries with an increasing interval if fails
     * When the maximum interval is reached, issues an error and keeps on trying
     * Sets the exception listener (a callback) in the created JMSContext which calls this method when the
     * connection with the broker goes down due to network issue or intentional connection termination
     */
    public void connectToBrokerWithRetries() {
        String connectionDetails = formatConnectionDetails();
        LOGGER.info("Attempting to connect to JMS broker '{}'. Connection details = {}", connectorName, connectionDetails);

        JMSContext context = null;
        // Use the delay passed to the constructor rather than the class-level default
        int sleepingTimeMs = initialReconnectionDelayMs;
        int accumulatedSleepingTimeMs = 0;

        // Try to reconnect until we succeed. IMPORTANT! This is a blocking loop that never ends so it must be run in a separate thread
        while (context == null) {
            try {
                context = connectionFactory.createContext(JMSContext.AUTO_ACKNOWLEDGE);
                LOGGER.info("Successfully connected to the JMS broker '{}'. Connection details = {}", connectorName, connectionDetails);

                boolean hadUnsuccessfulConnectionAttempts = accumulatedSleepingTimeMs > 0;
                if (hadUnsuccessfulConnectionAttempts) {
                    LOGGER.warn(
                        "Before this successful attempt, I spent {} ms repeatedly trying to connect to '{}'. Please check the broker's health. Connection details = {}",
                        accumulatedSleepingTimeMs, connectorName, connectionDetails
                    );
                }

                Destination destination = context.createQueue(QUEUE_PREIX + queueName);
                JMSConsumer jmsConsumer = context.createConsumer(destination);
                jmsConsumer.setMessageListener(messageListener);
                LOGGER.info("Successfully connected to the queue '{}' at '{}'. Connection details = {}", queueName, connectorName, connectionDetails);

                // Sets a callback that will be invoked when something happens with a connection to a broker
                context.setExceptionListener(
                    jmsException -> {
                        LOGGER.warn("Something bad happened to JMS broker connection to '{}'. I will try to reconnect. Connection details = {}", connectorName, connectionDetails);
                        connectToBrokerWithRetries();
                    }
                );
            } catch (Exception e) {
                LOGGER.warn(
                    "Failed to create a JMS context for '{}'. I will wait for {} ms and then make a reconnection attempt. Connection details = {}",
                    connectorName, sleepingTimeMs, connectionDetails, e
                );
                context = null;
                try {
                    Thread.sleep(sleepingTimeMs);
                    accumulatedSleepingTimeMs += sleepingTimeMs;
                    int doubledSleepingTime = sleepingTimeMs * 2;
                    // We double the sleeping time on each subsequent attempt until we hit the limit
                    // Then we just keep on reconnecting forever using the limit value
                    // Compare against the limit passed to the constructor rather than the class-level default
                    boolean nextReconnectionDelayWillExceedMaxDelay = doubledSleepingTime >= maxReconnectionDelayMs;
                    if (nextReconnectionDelayWillExceedMaxDelay) {
                        sleepingTimeMs = maxReconnectionDelayMs;
                        LOGGER.error(
                            "Repeatedly failed to create a JMS context for {} ms. I will keep on trying every {} ms but please check the broker availability. Connection details = {}",
                            accumulatedSleepingTimeMs, sleepingTimeMs, connectionDetails
                        );
                    } else {
                        sleepingTimeMs = doubledSleepingTime;
                    }
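                } catch (InterruptedException ex) {
                    // Restore the interrupt flag so callers can still see the interruption
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(ex);
                }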
            }
        }
    }

    private String formatConnectionDetails() {
        String connectionDetails = "[]";
        try {
            connectionDetails = String.format(
                "[ host = %s, port = %d, queueManager = %s, channel = %s, user = %s ]",
                connectionFactory.getStringProperty(WMQConstants.WMQ_HOST_NAME),
                connectionFactory.getIntProperty(WMQConstants.WMQ_PORT),
                connectionFactory.getStringProperty(WMQConstants.WMQ_QUEUE_MANAGER),
                connectionFactory.getStringProperty(WMQConstants.WMQ_CHANNEL),
                connectionFactory.getStringProperty(WMQConstants.USERID)
            );
        } catch (Exception e) {
            LOGGER.warn("Failed to get the connection details. This is not critical, but the details will be unavailable");
        }
        return connectionDetails;
    }
}
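
The delay schedule used in the catch block above (double the wait after each failure, then stay at the maximum) can be isolated into a small pure function, which makes it easy to check without a broker. This is a sketch in plain Java; `BackoffSchedule`, `nextDelayMs` and `firstDelays` are hypothetical names, and the values 6000 and 60000 are the defaults from the class above.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch only: the capped doubling rule from the reconnect loop, as pure functions. */
final class BackoffSchedule {
    private BackoffSchedule() {
    }

    /** Doubles the current delay and caps the result at maxDelayMs. */
    static int nextDelayMs(int currentDelayMs, int maxDelayMs) {
        long doubled = 2L * currentDelayMs; // long arithmetic avoids int overflow
        return (int) Math.min(doubled, (long) maxDelayMs);
    }

    /** Lists the first `attempts` delays, starting from initialDelayMs. */
    static List<Integer> firstDelays(int initialDelayMs, int maxDelayMs, int attempts) {
        List<Integer> delays = new ArrayList<>();
        int delay = initialDelayMs;
        for (int i = 0; i < attempts; i++) {
            delays.add(delay);
            delay = nextDelayMs(delay, maxDelayMs);
        }
        return delays;
    }
}
```

With the defaults, `BackoffSchedule.firstDelays(6000, 60000, 6)` yields 6000, 12000, 24000, 48000, 60000, 60000, so the loop reaches the one-minute ceiling on the fifth attempt.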

Here is how you use it:

LOGGER.info("Starting the initial connection thread");
Thread cftInitialConnectionThread = new Thread(cftConnectionManager::connectToBrokerWithRetries);
cftInitialConnectionThread.start();
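
One possible refinement, which is not part of the class above: because the exception listener calls the blocking retry loop directly, the loop runs on whatever thread the JMS provider uses for callbacks, and several callbacks could start overlapping loops. A sketch of a guard that hands the loop to a dedicated worker thread and allows only one reconnect at a time is shown below. It uses only the JDK; `ReconnectScheduler` and `requestReconnect` are hypothetical names.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

/** Sketch only: runs a blocking reconnect loop on its own thread, one at a time. */
final class ReconnectScheduler {
    private final ExecutorService executor = Executors.newSingleThreadExecutor(runnable -> {
        Thread thread = new Thread(runnable, "reconnect-worker");
        thread.setDaemon(true); // do not keep the JVM alive just for reconnecting
        return thread;
    });
    private final AtomicBoolean inProgress = new AtomicBoolean(false);
    private final Runnable reconnectLoop;

    ReconnectScheduler(Runnable reconnectLoop) {
        this.reconnectLoop = reconnectLoop;
    }

    /** Returns true if a reconnect was scheduled, false if one is already running. */
    boolean requestReconnect() {
        if (!inProgress.compareAndSet(false, true)) {
            return false;
        }
        executor.execute(() -> {
            try {
                reconnectLoop.run();
            } finally {
                inProgress.set(false); // allow the next request once this loop ends
            }
        });
        return true;
    }

    /** Interrupts a running loop and stops the worker thread. */
    void shutdown() {
        executor.shutdownNow();
    }
}
```

Under this design the exception listener would call `scheduler.requestReconnect()` instead of invoking the retry method itself, with the scheduler constructed as `new ReconnectScheduler(cftConnectionManager::connectToBrokerWithRetries)`.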
Votes: 2

Stack Overflow user

Posted 2020-01-27 23:54:39

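After creating the connection, you can set an ExceptionListener on the connection object. The onException method of the ExceptionListener is called when a reconnection attempt is made. Here is an example: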

    ExceptionListener exceptionListener = new ExceptionListener(){
        @Override
        public void onException(JMSException e) {
            System.out.println(e);
            if(e.getLinkedException() != null)
                System.out.println(e.getLinkedException());
        }
    };
    MQQueueConnection connection = (MQQueueConnection) cf.createQueueConnection();
    connection.setExceptionListener(exceptionListener);
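
The listener above prints the exception and one linked exception, while the stack traces in the question show several nested causes. A sketch of walking the whole chain is shown below. It is plain Java and follows `Throwable.getCause()`; whether a JMSException's linked exception is also reachable through `getCause()` depends on the provider and is an assumption here, so with the JMS API on the classpath you may need to follow `getLinkedException()` instead. `CauseChain` is a hypothetical helper name.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch only: collects a throwable and its nested causes for logging. */
final class CauseChain {
    private static final int MAX_DEPTH = 16; // guards against cyclic cause chains

    private CauseChain() {
    }

    /** Returns toString() of the throwable and each nested cause, outermost first. */
    static List<String> describe(Throwable error) {
        List<String> lines = new ArrayList<>();
        Throwable current = error;
        for (int depth = 0; current != null && depth < MAX_DEPTH; depth++) {
            lines.add(current.toString());
            current = current.getCause();
        }
        return lines;
    }
}
```

Inside `onException`, each entry of `CauseChain.describe(e)` could be written to the log so that the innermost cause, such as a `java.net.ConnectException`, is not lost.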
Votes: 5

Stack Overflow user

Posted 2021-09-02 05:23:52

Please check this code. I am using IBM MQ 9.2.3 with a 3-node IBM MQ multi-instance setup and Pacemaker on CentOS 8.

package com.ibm.mq.samples.jms;

import javax.jms.Destination;
import javax.jms.JMSConsumer;
import javax.jms.JMSContext;
import javax.jms.JMSException;
import javax.jms.JMSProducer;
import javax.jms.TextMessage;

import com.ibm.msg.client.jms.JmsConnectionFactory;
import com.ibm.msg.client.jms.JmsFactoryFactory;
import com.ibm.msg.client.wmq.WMQConstants;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class JmsPutGet {

    // System exit status value (assume unset value to be 1)
    private static int status = 1;

    // Create variables for the connection to MQ
    private static final String HOST = "192.168.49.140"; // Host name or IP address
    private static final int PORT = 10200; // Listener port for your queue manager
    private static final String CHANNEL = "CHANNEL1"; // Channel name
    private static final String QMGR = "HAQM1"; // Queue manager name
    private static final String APP_USER = ""; // User name that application uses to connect to MQ
    private static final String APP_PASSWORD = ""; // Password that the application uses to connect to MQ
    private static final String QUEUE_NAME = "SOURCE"; // Queue that the application uses to put and get messages to and from
    private static final int RECONNECT_TIMEOUT = 60; // 1 minute
    private static JMSContext context = null;
    private static  Destination destination = null;

    public static void main(String[] args) {

        // Variables
        JMSProducer producer = null;
        JMSConsumer consumer = null;        
        LocalDateTime now = null;

        try {
            
            setupResources();

            long uniqueNumber = System.currentTimeMillis() % 1000;
            TextMessage message = context.createTextMessage("Your lucky number today is " + uniqueNumber);
            DateTimeFormatter dtf = DateTimeFormatter.ofPattern("uuuu/MM/dd HH:mm:ss");
            
            // Create the producer once; it can be reused for every send
            producer = context.createProducer();
            for(int i=0; i>=0; i++){
                producer.send(destination, message);
                //System.out.println("Sent message:\n " + i + " " + message);
                System.out.println("\nMessage sent:\n " + i );
                now = LocalDateTime.now();
                System.out.println(dtf.format(now));
                consumer = context.createConsumer(destination);
                String receivedMessage = consumer.receiveBody(String.class, 15000); // wait up to 15 seconds (value is in ms)
                consumer.close(); // close the consumer so that one is not leaked per iteration
                //System.out.println("\nReceived message:\n " + i + " " + receivedMessage);
                System.out.println("\nMessage received:\n " + i );
                now = LocalDateTime.now();
                System.out.println(dtf.format(now));
                Thread.sleep(1000);
            }
            context.close();

            recordSuccess();
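        } catch (Exception ex) {
            recordFailure(ex);
            System.out.println("DETECTING ERROR... RECONNECTING");
            // Note: this only re-creates the context and destination;
            // the send/receive loop above is not resumed afterwards.
            setupResources();
        }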

    } // end main()
        
    private static void setupResources() { 

        boolean connected = false; 
        while (!connected) { 
            try { 
                // Create a connection factory
                JmsFactoryFactory ff = JmsFactoryFactory.getInstance(WMQConstants.WMQ_PROVIDER);
                JmsConnectionFactory cf = ff.createConnectionFactory();             
                
                // Set the properties
                cf.setStringProperty(WMQConstants.WMQ_HOST_NAME, HOST);
                cf.setIntProperty(WMQConstants.WMQ_PORT, PORT);             
                //cf.setStringProperty(WMQConstants.WMQ_CONNECTION_NAME_LIST, "192.168.49.140(10200),192.168.49.131(10200),192.168.49.132(10200)");
                cf.setStringProperty(WMQConstants.WMQ_CHANNEL, CHANNEL);
                cf.setIntProperty(WMQConstants.WMQ_CONNECTION_MODE, WMQConstants.WMQ_CM_CLIENT);
                cf.setStringProperty(WMQConstants.WMQ_QUEUE_MANAGER, QMGR);
                cf.setStringProperty(WMQConstants.WMQ_APPLICATIONNAME, "JmsPutGet (JMS)");
                cf.setBooleanProperty(WMQConstants.USER_AUTHENTICATION_MQCSP, true);
                cf.setStringProperty(WMQConstants.USERID, APP_USER);
                cf.setStringProperty(WMQConstants.PASSWORD, APP_PASSWORD);
                cf.setIntProperty(WMQConstants.WMQ_CLIENT_RECONNECT_TIMEOUT, RECONNECT_TIMEOUT);
                cf.setIntProperty(WMQConstants.WMQ_CLIENT_RECONNECT_OPTIONS, WMQConstants.WMQ_CLIENT_RECONNECT);
                //cf.setStringProperty(WMQConstants.WMQ_SSL_CIPHER_SUITE, "*TLS12");                

                // Create JMS objects
                context = cf.createContext();
                destination = context.createQueue("queue:///" + QUEUE_NAME);
                // no exception? then we connected ok 
                connected = true; 
                System.out.println("CONNECTED");
            } 
            catch (JMSException je) { 
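                // sleep 30 seconds, then make another attempt
                System.out.println("RECONNECTING");
                try {
                    Thread.sleep(30 * 1000);
                } catch (InterruptedException ie) {
                    // restore the interrupt flag and stop retrying
                    Thread.currentThread().interrupt();
                    return;
                }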
            } 
        } 
    }

    private static void recordSuccess() {
        System.out.println("SUCCESS");
        status = 0;
        return;
    }

    private static void recordFailure(Exception ex) {
        if (ex != null) {
            if (ex instanceof JMSException) {
                processJMSException((JMSException) ex);
            } else {
                System.out.println(ex);
            }
        }
        System.out.println("FAILURE");
        status = -1;
        return;
    }

    private static void processJMSException(JMSException jmsex) {
        System.out.println(jmsex);
        Throwable innerException = jmsex.getLinkedException();
        if (innerException != null) {
            System.out.println("Inner exception(s):");
        }
        while (innerException != null) {
            System.out.println(innerException);
            innerException = innerException.getCause();
        }
        return;
    }
}
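
The retry loop in setupResources() above can be looked at on its own. What follows is only a minimal sketch of that pattern, assuming a bounded number of attempts and a fixed delay. `RetrySketch`, `retryUntilConnected`, `maxAttempts` and `delayMillis` are hypothetical names made up for illustration; they are not part of JMS or IBM MQ. The "connect" step is simulated with a lambda so that the example runs without a queue manager.

```java
import java.util.concurrent.Callable;

// Hypothetical helper, not part of JMS or IBM MQ: retries an action until it succeeds.
final class RetrySketch {

    // Calls connect until it returns without throwing, sleeping between attempts.
    // Rethrows the last failure once maxAttempts attempts have been made.
    static <T> T retryUntilConnected(Callable<T> connect, int maxAttempts, long delayMillis)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return connect.call();
            } catch (Exception e) {
                last = e;
                System.out.println("Attempt " + attempt + " failed: " + e.getMessage());
                if (attempt < maxAttempts) {
                    // An InterruptedException here propagates to the caller
                    // instead of being swallowed.
                    Thread.sleep(delayMillis);
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Simulated connect step that fails twice before succeeding.
        String result = retryUntilConnected(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("broker unavailable");
            }
            return "CONNECTED";
        }, 5, 10L);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

In the answer's code, the body of the lambda would be the factory setup plus `cf.createContext()`. Bounding the attempts is a design choice of this sketch; the original loop retries indefinitely.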

I have added a link to the GitHub repository for reference: https://github.com/fintecheando/IBMMQSample

Votes: 1

Original page content provided by Stack Overflow; translation support provided by Tencent Cloud's Xiaowei IT-domain engine.
Original link: https://stackoverflow.com/questions/59939068
