我正在尝试将持久消息放到WebSphere MQ队列中,但是这些消息需要是异步put。我似乎能够使异步工作的唯一方法是消息是非持久的(这里我的意思是putSuccessCount等于放在MQAsyncStatus上的消息的数量,其他时候都是零)。下面的代码概述了我正在尝试做的事情:
import com.ibm.mq.MQAsyncStatus;
import com.ibm.mq.MQDestination;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQPutMessageOptions;
import com.ibm.mq.MQQueueManager;
import com.ibm.mq.constants.MQConstants;
public class MQPutTest extends TestCase{
private static Logger log = Logger.getLogger(MQPutTest.class);
public void testPut() throws Exception{
Hashtable<String, Object> props = new Hashtable<String, Object>();
props.put(MQConstants.CHANNEL_PROPERTY, "my_channel");
props.put(MQConstants.PORT_PROPERTY, 1414);
props.put(MQConstants.HOST_NAME_PROPERTY, "localhost");
String qManager = "my_queue_manager";
MQQueueManager qMgr = new MQQueueManager(qManager, props);
int openOptions = MQConstants.MQOO_OUTPUT | MQConstants.MQOO_INPUT_AS_Q_DEF;
MQDestination queue = qMgr.accessQueue("my_queue", openOptions);
MQPutMessageOptions pmo = new MQPutMessageOptions();
pmo.options = MQConstants.MQPMO_ASYNC_RESPONSE;
MQMessage message = new MQMessage();
message.format = MQConstants.MQFMT_STRING;
message.writeString("test message");
queue.put(message, pmo);
queue.close();
MQAsyncStatus asyncStatus = qMgr.getAsyncStatus();
qMgr.disconnect();
}
}我将我在大量消息中看到的性能提升归因于队列被设置为非持久性,而不是消息被异步放置的事实。我已经在MQ explorer的队列扩展属性中将默认的put响应类型设置为Asynchronous,但这没有任何影响。
任何帮助都将不胜感激。
发布于 2011-12-23 04:06:25
根据问题中的评论,WMQ没有“持久队列”的概念(这就是我更改帖子标题的原因)。持久性的队列属性不会以任何方式改变QMgr处理队列或其中任何消息的方式。它所做的一切就是告诉程序,如果程序员没有显式地设置该值,那么应该使用哪个持久性选项。任何给定的消息是否持久取决于消息第一次放入队列时MQMD是否指定了持久性。任何队列都可以包含持久消息和非持久消息的任意组合。
具体地说,代码片段int he post没有指定使用message.setPersistence()的持久性,因此它将继承队列的默认值。这反过来又取决于从队列属性继承的值。在任何情况下,队列属性设置都不会覆盖应用程序的显式设置。
因此,您看到的性能差异实际上很可能反映了队列的DEFPSIST属性的设置,但这并不意味着异步puts不起作用。请记住,异步put不会以任何方式减少将消息放入队列所需的时间。无论put是否是同步的,QMgr都有相同的代码路径来持久化消息。不同之处在于,您的应用程序不再等待QMgr完成其工作。完全有可能的是,当您调用MQAsyncStatus更新时,WMQ还没有写入任何消息。如果它们都在单个工作单元中,则更有可能发生这种情况,因为在COMMIT处理完成之前,WMQ不会返回所有消息的状态。除非显式调用COMMIT,否则在关闭队列时会发生这种情况。
您可以通过在COMMIT或CLOSE之后的几秒钟内每秒重复qMgr.getAsyncStatus()调用来验证这一点。您应该会看到,第一个返回的消息没有成功放入,但最终您可以解释所有这些消息。
顺便提一下,消息是否持久的问题几乎总是应该在您的代码中处理。消息持久性通常作为在应用程序设计中捕获的业务需求派生而来。因此,确保满足要求是项目经理和开发人员的责任。可靠地确保它得到满足的唯一方法是应用程序显式地调用setPErsistence()。否则,应用程序将使用队列的DEFPSIST属性中的值来隐式调用setPersistence()。那么,为什么要在API中允许这种默认设置呢?因为在一些合法的情况下,持久性需要能够在运行时进行更改。编写应用程序以有意采用默认值,并在每个工作单元完成此操作后重新打开队列。在所有其他情况下,应在程序或托管对象中设置持久性。
最后,如果您将10,000条消息放在单个工作单元下,您可能会得到一个表明没有成功放置消息的响应,另一个原因是缺少日志空间或未设置可调参数来适应负载。例如,如果MAXUMSGS设置为小于10,000,则整个事务将回滚。另一方面,如果MAXUMSGS设置正确,但主日志和辅助日志的大小和数量不足以容纳事务中的数据量,则整个事务将再次回滚。您应该稍微调整一下COMMIT间隔,因为最佳值取决于消息的大小与队列和日志缓冲区的大小的关系。一旦超过可优化为单个写入操作的数据量,额外的消息实际上会降低性能。当人们将10,000条消息放入单个工作单元中时,这几乎不是为了性能,而是因为这是他们适当的恢复单元,而相应的性能影响是次要的恢复要求。
发布于 2011-12-23 00:31:02
如果可以丢失一条或两条消息,您可以将消息添加到内部发送队列,例如
ExecutorService service = Executors.newSingleThreadedExecutor();
// build you message here
// add the message to be sent asynchronously.
service.execute(new Runnable() {
public void run() {
queue.put(message, pmo);
}
});当进程终止时,队列中的所有消息都会丢失。
https://stackoverflow.com/questions/8606306
复制相似问题