前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RocketMQ 问题定位方法

RocketMQ 问题定位方法

原创
作者头像
派大星在吗
发布2021-12-18 11:09:18
7920
发布2021-12-18 11:09:18
举报
文章被收录于专栏:我的技术专刊我的技术专刊

问题分析

首先联想到的是,是否是 消费线程卡住了呢线程卡住一般因为

  1. 发生了 Stop-the-wolrd:
  2. GC 导致
  3. 其他 safepoint 原因导致(例如 jstack,定时进入 safepoint 等等,参考我的这篇文章JVM相关 - SafePoint 与 Stop The World 全解
  4. 线程处理消息时间过长,可能有锁获取不到,可能卡在某些 IO

采集当时的 JFR (关于

JFR,请参考我的另一系列JFR全解),发现:

  1. 在这个时间段并没有发生 停滞时间很长 的 GC 以及其他 Stop-the-world 的 safepoint 事件:

image

image

  1. 在这段时间,线程是 park 的,并且堆栈显示是 消费线程并没有消息可以消费

image

既然应用并没有什么问题,我们来看看 RocketMQ 是否有什么问题。一般的 RocketMQ Broker 的日志我们关心:

  1. 消息持久化的时间消耗统计,如果这里发生异常,我们需要调优 Java MMAP 相关的参数,请参考:
  2. 消息持久化异常,查看 storeerr.log
  3. 锁异常,查看 lock.log

那究竟应该去看哪一个 broker 呢?之前提到了,发送到这个 Topic 是指定了 hashKey 的,通过消息的 hashKey 我们可以定位到是哪个

broker:

代码语言:txt
复制
int hashCode = "我们的hashKey".hashCode();
代码语言:txt
复制
log.info("{}", Math.abs(hashCode % 24));

我们找到了消息的 hashKey,通过上面的代码,结果是 20,也就是队列 20,通过前面的描述, **我们知道每个 broker 是 8 个队列,20

对应的就是 broker-2 上面的队列 ,也就是 broker-2 queueId = 5 这个队列** 。我们来查看 broker-2

上面的日志定位问题。

我们发现 lock.log 里面有异常,如下所示,类似的有很多条,并且持续了 54s 左右,和线程 park 时间比较吻合,也和消息延迟比较吻合:

代码语言:txt
复制
2021-07-01 07:11:47 WARN AdminBrokerThread_10 - tryLockBatch, message queue locked by other client. Group: 消费group OtherClientId: 10.238.18.6@29 NewClientId: 10.238.18.122@29 MessageQueue [topic=消息topic, brokerName=broker-2, queueId=5]

这个日志的意思是,10.238.18.122@29 这个实例尝试锁住 queueId = 5 失败,因为 10.238.18.6@29

正在持有这个锁。那么为什么会发生这种情况呢?

RocketMQ 多队列顺序消费的原理

RocketMQ 想要实现多队列顺序消费,首先需要指定 hashKey,通过 hashKey

消息会被放入特定的队列,消费者消费这个队列的时候,如果指定了顺序消费,是 单线程消费 的,这样就保证了同一队列内有序。

那么是如何保证每个队列是单线程消费的呢?每个 Broker 维护一个:

代码语言:txt
复制
private final ConcurrentMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>> mqLockTable =
代码语言:txt
复制
        new ConcurrentHashMap<String, ConcurrentHashMap<MessageQueue, LockEntry>>(1024);

他是一个 ConcurrentMap<消费组名称, ConcurrentHashMap<消息队列, 锁对象>>。锁对象 LockEntry 包括:

RebalanceLockManager.java:

代码语言:txt
复制
//读取 rocketmq.broker.rebalance.lockMaxLiveTime 这个环境变量,默认 60s
代码语言:txt
复制
private final static long REBALANCE_LOCK_MAX_LIVE_TIME = Long.parseLong(System.getProperty(
代码语言:txt
复制
        "rocketmq.broker.rebalance.lockMaxLiveTime", "60000"));
代码语言:txt
复制
static class LockEntry {
代码语言:txt
复制
    //RocketMQ 客户端唯一 id
代码语言:txt
复制
    private String clientId;
代码语言:txt
复制
    private volatile long lastUpdateTimestamp = System.currentTimeMillis();
代码语言:txt
复制
    //省略getter setter
代码语言:txt
复制
    public boolean isLocked(final String clientId) {
代码语言:txt
复制
        boolean eq = this.clientId.equals(clientId);
代码语言:txt
复制
        return eq && !this.isExpired();
代码语言:txt
复制
    }
代码语言:txt
复制
    public boolean isExpired() {
代码语言:txt
复制
        // 在 REBALANCE_LOCK_MAX_LIVE_TIME 这么长时间后过期
代码语言:txt
复制
        boolean expired =
代码语言:txt
复制
            (System.currentTimeMillis() - this.lastUpdateTimestamp) > REBALANCE_LOCK_MAX_LIVE_TIME;
代码语言:txt
复制
        return expired;
代码语言:txt
复制
    }
代码语言:txt
复制
}

RocketMQ 客户端发送 LOCK_BATCH_MQ 请求到 Broker 上面,Broker 会将客户端请求封装成为 LockEntry

并尝试更新这个 Map,如果更新成功就是获取到了锁,如果失败则没有获取这个锁。Broker 的详细更新逻辑是(

感兴趣可以查看,也可以直接跳过,不影响理解,后面有便于理解的图片 ):

代码语言:txt
复制
public boolean tryLock(final String group, final MessageQueue mq, final String clientId) {
代码语言:txt
复制
    //判断没有已经锁住
代码语言:txt
复制
    if (!this.isLocked(group, mq, clientId)) {
代码语言:txt
复制
        try {
代码语言:txt
复制
            //获取锁,这个锁是实例内的,因为每个 broker 维护自己的队列锁表,并不共享
代码语言:txt
复制
            this.lock.lockInterruptibly();
代码语言:txt
复制
            try {
代码语言:txt
复制
                //尝试获取,判断是否存在,存在就判断是否过期
代码语言:txt
复制
                ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
代码语言:txt
复制
                if (null == groupValue) {
代码语言:txt
复制
                    groupValue = new ConcurrentHashMap<>(32);
代码语言:txt
复制
                    this.mqLockTable.put(group, groupValue);
代码语言:txt
复制
                }
代码语言:txt
复制
                LockEntry lockEntry = groupValue.get(mq);
代码语言:txt
复制
                if (null == lockEntry) {
代码语言:txt
复制
                    lockEntry = new LockEntry();
代码语言:txt
复制
                    lockEntry.setClientId(clientId);
代码语言:txt
复制
                    groupValue.put(mq, lockEntry);
代码语言:txt
复制
                    log.info("tryLock, message queue not locked, I got it. Group: {} NewClientId: {} {}",
代码语言:txt
复制
                        group,
代码语言:txt
复制
                        clientId,
代码语言:txt
复制
                        mq);
代码语言:txt
复制
                }
代码语言:txt
复制
                if (lockEntry.isLocked(clientId)) {
代码语言:txt
复制
                    lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
代码语言:txt
复制
                    return true;
代码语言:txt
复制
                }
代码语言:txt
复制
                String oldClientId = lockEntry.getClientId();
代码语言:txt
复制
                if (lockEntry.isExpired()) {
代码语言:txt
复制
                    lockEntry.setClientId(clientId);
代码语言:txt
复制
                    lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
代码语言:txt
复制
                    log.warn(
代码语言:txt
复制
                        "tryLock, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}",
代码语言:txt
复制
                        group,
代码语言:txt
复制
                        oldClientId,
代码语言:txt
复制
                        clientId,
代码语言:txt
复制
                        mq);
代码语言:txt
复制
                    return true;
代码语言:txt
复制
                }
代码语言:txt
复制
                //这里就是我们刚刚看到的日志
代码语言:txt
复制
                log.warn(
代码语言:txt
复制
                    "tryLock, message queue locked by other client. Group: {} OtherClientId: {} NewClientId: {} {}",
代码语言:txt
复制
                    group,
代码语言:txt
复制
                    oldClientId,
代码语言:txt
复制
                    clientId,
代码语言:txt
复制
                    mq);
代码语言:txt
复制
                return false;
代码语言:txt
复制
            } finally {
代码语言:txt
复制
                this.lock.unlock();
代码语言:txt
复制
            }
代码语言:txt
复制
        } catch (InterruptedException e) {
代码语言:txt
复制
            log.error("putMessage exception", e);
代码语言:txt
复制
        }
代码语言:txt
复制
    } else {
代码语言:txt
复制
    }
代码语言:txt
复制
    return true;
代码语言:txt
复制
}
代码语言:txt
复制
//判断是否是已经锁住了
代码语言:txt
复制
private boolean isLocked(final String group, final MessageQueue mq, final String clientId) {
代码语言:txt
复制
    //通过消费组名称获取
代码语言:txt
复制
    ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
代码语言:txt
复制
    //如果不为 null
代码语言:txt
复制
    if (groupValue != null) {
代码语言:txt
复制
        //尝试获取 lockEntry,看是否存在
代码语言:txt
复制
        LockEntry lockEntry = groupValue.get(mq);
代码语言:txt
复制
        if (lockEntry != null) {
代码语言:txt
复制
            //如果存在,判断是否过期
代码语言:txt
复制
            boolean locked = lockEntry.isLocked(clientId);
代码语言:txt
复制
            if (locked) {
代码语言:txt
复制
                lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
代码语言:txt
复制
            }
代码语言:txt
复制
            return locked;
代码语言:txt
复制
        }
代码语言:txt
复制
    }
代码语言:txt
复制
    return false;
代码语言:txt
复制
}

每个 MQ 客户端,会定时发送 LOCK_BATCH_MQ 请求,并且在本地维护获取到锁的所有队列:

ProcessQueue.java:

代码语言:txt
复制
//定时发送 **LOCK_BATCH_MQ** 间隔
代码语言:txt
复制
public final static long REBALANCE_LOCK_INTERVAL = Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockInterval", "20000"));

ConsumeMessageOrderlyService.java:

代码语言:txt
复制
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
代码语言:txt
复制
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
代码语言:txt
复制
        @Override
代码语言:txt
复制
        public void run() {
代码语言:txt
复制
            ConsumeMessageOrderlyService.this.lockMQPeriodically();
代码语言:txt
复制
        }
代码语言:txt
复制
    }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
代码语言:txt
复制
}

流程图如下所示

图片上传失败...(image-637be5-1625137977763)

ConsumeMessageOrderlyService 在关闭的时候,会 unlock 所有的队列:

代码语言:txt
复制
public void shutdown() {
代码语言:txt
复制
    this.stopped = true;
代码语言:txt
复制
    this.scheduledExecutorService.shutdown();
代码语言:txt
复制
    this.consumeExecutor.shutdown();
代码语言:txt
复制
    if (MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
代码语言:txt
复制
        this.unlockAllMQ();
代码语言:txt
复制
    }
代码语言:txt
复制
}

问题出现原因

我们这里客户端定时发送 LOCK_BATCH_MQ 间隔是默认的 20s, Broker 端锁过期的时间也是默认的 60s。

我们的集群容器编排使用了 k8s,并且有实例迁移的功能。在集群压力大的时候,自动扩容新的 Node

(可以理解为虚拟机)并将创建新的服务实例部署上去。集群某些服务压力小的时候,某些服务实例会缩容下去,这时候就不需要那么多 Node 了,就会回收一部分

Node,但是被回收的 Node 上面还有不能缩容的服务实例, 这时候就需要将这些服务实例迁移到其他 Node 上面

。这里我们的业务实例就是发生了这个情况。

在问题出现的时候,发生了 迁移 ,老的实例被关闭,但是没有等待 ConsumeMessageOrderlyService#shutdown

的执行, 导致锁没有被主动释放,而是等待 60s 的锁过期时间后,新的实例才拿到队列锁开始消费

问题解决

  1. 在下个版本,加入针对 RocketMQ 客户端的优雅关闭逻辑
  2. 所有服务实例(RocketMQ 客户端)配置 rocketmq.client.rebalance.lockInterval 缩短心跳时间(5s),RocketMQ Broker 配置 rocketmq.broker.rebalance.lockMaxLiveTime 缩短过期时间(例如 15s),但是保持过期时间是心跳时间的 3 倍(集群中的 3 倍设计公理)

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 问题分析
  • RocketMQ 多队列顺序消费的原理
  • 问题出现原因
  • 问题解决
相关产品与服务
消息队列 CMQ
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档