前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >死信队列监听补充

死信队列监听补充

作者头像
疯狂的KK
发布2020-09-14 15:46:34
5080
发布2020-09-14 15:46:34
举报
文章被收录于专栏:Java项目实战Java项目实战

死信队列监听一开始的逻辑是正确的,但关于监听的内容以及动态判断有了新的思路,不断发现不断改善。

监听新思路

1.不必去破坏生产者消费者的关系,去创建死信队列的对应消费者,如果不同队列去创建对应的死信队列监听,没什么意义,复用刚开始的思路进行更改。mq配置更改指定监听死信队列,死信队列的名称是可以配置指定的

代码语言:javascript
复制
@Bean
    public ActiveMQConnectionFactory connectionFactory() throws JMSException {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerurl);
        factory.setTrustAllPackages(true);//信任所有包下的序列化对象,解决无法发送对象消息
        javax.jms.Connection connection = factory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue("ActiveMQ.DLQ");
        MessageConsumer messageConsumer = session.createConsumer(queue);
        messageConsumer.setMessageListener(new QueueListener());
        factory.setUserName(userName);
        factory.setPassword(password);
        return factory;
    }

新建监听类

代码语言:javascript
复制
/**
 * @author zhaokkstart
 * @create 2020-09-11 11:18
 */
public class QueueListener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        System.out.println("****消费者的消息:"+message);
    }
}

看下能从message获取到什么东西

代码语言:javascript
复制
public interface Message {
    static final int DEFAULT_DELIVERY_MODE = DeliveryMode.PERSISTENT;
    static final int DEFAULT_PRIORITY = 4;
    static final long DEFAULT_TIME_TO_LIVE = 0;
    String getJMSMessageID() throws JMSException;
    void setJMSMessageID(String id) throws JMSException;
    long getJMSTimestamp() throws JMSException;
    void setJMSTimestamp(long timestamp) throws JMSException;
    byte[] getJMSCorrelationIDAsBytes() throws JMSException;
    void setJMSCorrelationIDAsBytes(byte[] correlationID) throws JMSException;
    void setJMSCorrelationID(String correlationID) throws JMSException;
    String getJMSCorrelationID() throws JMSException;
    Destination getJMSReplyTo() throws JMSException;
    void setJMSReplyTo(Destination replyTo) throws JMSException;
    Destination getJMSDestination() throws JMSException;
    void setJMSDestination(Destination destination) throws JMSException;
    int getJMSDeliveryMode() throws JMSException;
    void setJMSDeliveryMode(int deliveryMode) throws JMSException;
    boolean getJMSRedelivered() throws JMSException;
    void setJMSRedelivered(boolean redelivered) throws JMSException;
    String getJMSType() throws JMSException;
    void setJMSType(String type) throws JMSException;
    long getJMSExpiration() throws JMSException;
    void setJMSExpiration(long expiration) throws JMSException;
    int getJMSPriority() throws JMSException;
    void setJMSPriority(int priority) throws JMSException;
    void clearProperties() throws JMSException;
    boolean propertyExists(String name) throws JMSException;
    boolean getBooleanProperty(String name) throws JMSException;
    byte getByteProperty(String name) throws JMSException;
    short getShortProperty(String name) throws JMSException;
    int getIntProperty(String name) throws JMSException;
    long getLongProperty(String name) throws JMSException;
    float getFloatProperty(String name) throws JMSException;
    double getDoubleProperty(String name) throws JMSException;
    String getStringProperty(String name) throws JMSException;
    Object getObjectProperty(String name) throws JMSException;
    Enumeration getPropertyNames() throws JMSException;
    void setBooleanProperty(String name, boolean value) throws JMSException;
    void setByteProperty(String name, byte value) throws JMSException;
    void setShortProperty(String name, short value) throws JMSException;
    void setIntProperty(String name, int value) throws JMSException;
    void setLongProperty(String name, long value) throws JMSException;
    void setFloatProperty(String name, float value) throws JMSException;
    void setDoubleProperty(String name, double value) throws JMSException;
    void setStringProperty(String name, String value) throws JMSException;
    void setObjectProperty(String name, Object value) throws JMSException;
    void acknowledge() throws JMSException;
    void clearBody() throws JMSException;}

还是对象里的那点东西,到有个新思路

代码语言:javascript
复制
commandId = 5,
    responseRequired = true,
    messageId = ID: kk - 59648 - 1599635155556 - 1: 239: 1: 1: 1, 
    originalDestination = null, originalTransactionId = null, 
    producerId = ID: kk - 59648 - 1599635155556 - 1: 239: 1: 1, 
    destination = queue: //add_xxxxxx, 
    transactionId = null, expiration = 0, timestamp = 1599636301936, arrival = 0, 
    brokerInTime = 1599636301937, brokerOutTime = 1599636302110, 
    correlationId = null, replyTo = null, persistent = true, type = null, 
    priority = 4, groupID = null, groupSequence = 0,
    targetConsumerId = null, compressed = false, userID = null, 
    content = org.apache.activemq.util.ByteSequence@54eae153, 
    marshalledProperties = org.apache.activemq.util.ByteSequence@1318dd4d,
    dataStructure = null, redeliveryCounter = 0, size = 0, properties = {timestamp=1599636300958}, 
   readOnlyProperties = true, readOnlyBody = true, droppable = false, 
   jmsXGroupFirstForConsumer = false}

不必去指定type维护队列的信息关系,获取

destination = queue: //add_xxxxxx,

即获取originalDestination属性,判断此消息的入队队列是哪个,然后获取该队列的消费者入队消息进行转换

这个方法是肯定能监听到死信队列的,亲测有效。

(1) 死信队列的配置(一般采用默认)

1. sharedDeadLetterStrategy

不管是queue还是topic,失败的消息都放到这个队列中。下面修改activemq.xml的配置,可以达到修改队列的名字。

2. individualDeadLetterStrategy

可以为queue和topic单独指定两个死信队列。还可以为某个话题,单独指定一个死信队列。

代码语言:javascript
复制
activemq的API文档 http://activemq.apache.org/maven/apidocs/index.html
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-09-11,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 赵KK日常技术记录 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档