需要你的帮助来找到解决方案。目前的实施细节:
SolConnectionFactory connectionFactory = SolJmsUtility.createConnectionFactory(); // for create connection factory using host , vpn,trust store,keystore with auth scheme AUTHENTICATION_SCHEME_CLIENT_CERTIFICATE
this.connection = connectionFactory.createConnection(); // connection creation
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); // session creation using client acknowledge
使用MessageListener侦听队列
public class MyListener implements MessageListener
public void onMessage(Message message) // receive the message and process
{
/* in this method validating the message (poison message)
* and publish to kafka, once receive the success message
* from kafka acknowledge the message
*/
message.acknowledge();
}
问题:如果卡夫卡经纪人倒下了,该怎么办重试?我们和内部安全小组进行了讨论。
我们假设,如果客户端没有调用message.acknowledge()
Solace,那么将执行重试,但是内部Solace团队澄清了它使用的是窗口机制,所以如果后续消息被确认,那么前面的消息也会被确认和删除。
他们的建议是发送失败消息的NACK (否定确认),然后Solace可以重试该消息。如何使用Java发送NACK?
Max_Un_Ack_Message
和Max_Redeliver_Count
的建议值是多少?
Maven对Solace的依赖:
<dependency>
<groupId>com.solacesystems</groupId>
<artifactId>sol-jms</artifactId>
<version>10.0.0</version>
</dependency>
发布于 2021-09-28 13:30:29
如果您正在使用JMS,并且希望触发重发,那么您有几个选项。
您可以使用事务会话。当处理消息成功时,您可以确认消息并commit()
javax.jms.Session
。处理失败时,可以在rollback()
上调用javax.jms.Session
。
或者,您可以使用javax.jms.Session.recover()
。JavaDoc是这么说的:
在此会话中停止邮件传递,并使用最古老的未确认邮件重新启动邮件传递。 所有消费者都按顺序传递消息。确认接收到的消息会自动确认已传递给客户端的所有消息。 重新启动会话将导致它采取以下操作:
当然,这假定Solace JMS客户机实际上在这里实现了指定的行为。
发布于 2021-09-28 13:57:45
根据讨论这里,message.acknowledge()
与窗口无关。由于您在示例中使用的是异步解决方案,如果不调用stop()
,则传输窗口将关闭,消息将不被确认。
message.acknowledge()
只用于确认消息已被接收和使用,在此消息已经从Solace上的持久存储中删除。请注意,未确认的消息仅在要连接的下一个使用者上重新传递。
https://stackoverflow.com/questions/69361900
复制相似问题