前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >rocketmq使用DefaultMQPushConsumer创建消费者客户端

rocketmq使用DefaultMQPushConsumer创建消费者客户端

作者头像
路过君
发布2021-10-15 15:15:47
1.7K0
发布2021-10-15 15:15:47
举报
文章被收录于专栏:路过君BLOG from CSDN

样例

代码语言:javascript
复制
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("organization");
consumer.setNamesrvAddr("172.22.0.64:9876"); // NAME_SERVER地址
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // 从哪个位置开始消费消息
consumer.subscribe("my-topic", "*"); // 订阅主题
consumer.registerMessageListener(new MessageListenerOrderly() { // 注册消息监听(顺序方式)
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
        for (MessageExt msg : msgs) {
            System.out.println(new String(msg.getBody()));
        }
        return ConsumeOrderlyStatus.SUCCESS;
    }
});
consumer.start();

主要参数

  • ConsumeFromWhere 控制新的消费者组从哪个位置开始消费

枚举值

效果

CONSUME_FROM_LAST_OFFSET

从最新的消息开始消费

CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST

废弃,效果同上

CONSUME_FROM_MIN_OFFSET

废弃,效果同上

CONSUME_FROM_MAX_OFFSET

废弃 ,效果同上

CONSUME_FROM_FIRST_OFFSET

从最早的消息开始消费

CONSUME_FROM_TIMESTAMP

从指定时间开始消费

注:如果使用CONSUME_FROM_TIMESTAMP ,需设置参数 DefaultMQPushConsumer.setConsumeTimestamp(“20131223171201”) 时间戳字符串格式为yyyyMMddHHmmss

  • DefaultMQPushConsumer.subscribe(String topic, String subExpression) subExpression参数为tag选择表达式 语法:
  1. 不过滤tag:"*" 或者null
  2. 根据多个tag过滤:“tag1 || tag2 || tag3”
  • DefaultMQPushConsumer.registerMessageListener注册消费监听器
  1. 顺序消费监听器MessageListenerOrderly 返回ConsumeOrderlyStatus

枚举值

效果

SUCCESS

成功

ROLLBACK

废弃

COMMIT

废弃

SUSPEND_CURRENT_QUEUE_A_MOMENT

暂停当前队列一会

  1. 并行消费监听器MessageListenerConcurrently 返回ConsumeConcurrentlyStatus

枚举值

效果

CONSUME_SUCCESS

成功

RECONSUME_LATER

失败,稍后重试

源码

  • org.apache.rocketmq.client.impl.consumer.RebalancePushImpl
代码语言:javascript
复制
switch (consumeFromWhere) {
    case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST:
    case CONSUME_FROM_MIN_OFFSET:
    case CONSUME_FROM_MAX_OFFSET:
    case CONSUME_FROM_LAST_OFFSET: {
        long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
        if (lastOffset >= 0) {
            result = lastOffset;
        }
        // First start,no offset
        else if (-1 == lastOffset) {
            if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                result = 0L;
            } else {
                try {
                    result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
                } catch (MQClientException e) {
                    result = -1;
                }
            }
        } else {
            result = -1;
        }
        break;
    }
    case CONSUME_FROM_FIRST_OFFSET: {
        long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
        if (lastOffset >= 0) {
            result = lastOffset;
        } else if (-1 == lastOffset) {
            result = 0L;
        } else {
            result = -1;
        }
        break;
    }
    case CONSUME_FROM_TIMESTAMP: {
        long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
        if (lastOffset >= 0) {
            result = lastOffset;
        } else if (-1 == lastOffset) {
            if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                try {
                    result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
                } catch (MQClientException e) {
                    result = -1;
                }
            } else {
                try {
                    long timestamp = UtilAll.parseDate(this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp(),
                        UtilAll.YYYYMMDDHHMMSS).getTime();
                    result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
                } catch (MQClientException e) {
                    result = -1;
                }
            }
        } else {
            result = -1;
        }
        break;
    }

    default:
        break;
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021/03/15 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 样例
  • 主要参数
  • 源码
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档