
RocketMQ system busy

疯狂的KK
Published 2022-11-11 09:38:45

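Our production MQ has recently lost messages several times. The MQ brokers were being upgraded as part of a disk-capacity expansion. Today we received a report that an order had not been settled, and no exception had been raised anywhere. We started investigating as soon as the report came in.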

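We started with the consumption records and found none for the order. We then checked the order's status, which showed a normal node state. Next we checked the node that sends the message. The send conditions were met, and the message had been sent normally after the transaction committed. Finally we checked the send log and found this error: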

org.apache.rocketmq.client.exception.MQBrokerException: CODE: 2  DESC: [REJECTREQUEST]system busy, start flow control for a while
For more information, please visit the url, http://rocketmq.apache.org/docs/faq/
  at org.apache.rocketmq.client.impl.MQClientAPIImpl.processSendResponse(MQClientAPIImpl.java:556) ~[rocketmq-client-4.5.0.jar!/:4.5.0]
  at org.apache.rocketmq.client.impl.MQClientAPIImpl.access$000(MQClientAPIImpl.java:155) ~[rocketmq-client-4.5.0.jar!/:4.5.0]
  at org.apache.rocketmq.client.impl.MQClientAPIImpl$1.operationComplete(MQClientAPIImpl.java:396) ~[rocketmq-client-4.5.0.jar!/:4.5.0]
  at org.apache.rocketmq.remoting.netty.ResponseFuture.executeInvokeCallback(ResponseFuture.java:54) ~[rocketmq-remoting-4.5.0.jar!/:4.5.0]
  at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract$2.run(NettyRemotingAbstract.java:309) ~[rocketmq-remoting-4.5.0.jar!/:4.5.0]
  at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
  at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
  at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
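The frames ResponseFuture.executeInvokeCallback and MQClientAPIImpl$1.operationComplete show that the send was asynchronous. The broker's rejection arrived on a callback thread, which would explain why no exception surfaced in the business flow. The sketch below models this with CompletableFuture. BrokerBusyException, sendAsync, sendOrder and failedOrders are hypothetical names, not RocketMQ API. The point is that the callback should record the failure for compensation rather than only log it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical stand-in for MQBrokerException: carries the broker's response code.
class BrokerBusyException extends RuntimeException {
    final int code;

    BrokerBusyException(int code, String desc) {
        super("CODE: " + code + "  DESC: " + desc);
        this.code = code;
    }
}

class AsyncSendDemo {
    // Orders whose message never reached the broker: input for a re-push job.
    static final List<String> failedOrders = Collections.synchronizedList(new ArrayList<>());

    // Hypothetical async send that fails the way the broker did in our log.
    static CompletableFuture<String> sendAsync(String orderId, boolean brokerBusy) {
        return CompletableFuture.supplyAsync(() -> {
            if (brokerBusy) {
                throw new BrokerBusyException(2, "[REJECTREQUEST]system busy, start flow control for a while");
            }
            return "SEND_OK:" + orderId;
        });
    }

    // The caller's flow continues immediately; a rejection is only visible inside this callback.
    static CompletableFuture<Void> sendOrder(String orderId, boolean brokerBusy) {
        return sendAsync(orderId, brokerBusy).handle((result, ex) -> {
            if (ex != null) {
                // supplyAsync wraps the thrown exception in a CompletionException
                Throwable cause = ex instanceof CompletionException ? ex.getCause() : ex;
                System.err.println(orderId + " not sent: " + cause.getMessage());
                failedOrders.add(orderId); // persist for compensation instead of only logging
            }
            return null;
        });
    }

    public static void main(String[] args) {
        CompletableFuture.allOf(sendOrder("A1", false), sendOrder("A2", true)).join();
        System.out.println("orders to re-push: " + failedOrders); // orders to re-push: [A2]
    }
}
```

In a real producer the equivalent place is the onException method of the send callback.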

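We first re-pushed the affected orders to fix the business impact, then looked for the cause. Searching the source for the error message turned up the following: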

// In processRequestCommand: ask the matched processor whether to reject
// before the request is handed to its executor
if (pair.getObject1().rejectRequest()) {
    final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
        "[REJECTREQUEST]system busy, start flow control for a while");
    response.setOpaque(opaque);
    ctx.writeAndFlush(response);
    return;
}

Location in the source:

org.apache.rocketmq.remoting.netty.NettyRemotingAbstract
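The check above runs before the request is queued, so a busy broker answers at once. In RocketMQ 4.x, as far as we can tell, the send processor's rejectRequest() returns true when the message store reports the OS page cache as busy or the transient store pool as short. The following is a simplified model, not RocketMQ code. SendProcessor, dispatch and Response are hypothetical. Only the constants 0 and 2 match RemotingSysResponseCode.SUCCESS and SYSTEM_BUSY.

```java
import java.util.function.BooleanSupplier;

class RejectCheckDemo {
    static final int SUCCESS = 0;     // value of RemotingSysResponseCode.SUCCESS
    static final int SYSTEM_BUSY = 2; // value of RemotingSysResponseCode.SYSTEM_BUSY ("CODE: 2" in our log)

    static final class Response {
        final int code;
        final String remark;

        Response(int code, String remark) {
            this.code = code;
            this.remark = remark;
        }

        @Override
        public String toString() {
            return "CODE: " + code + "  DESC: " + remark;
        }
    }

    // Hypothetical send processor: rejectRequest() just asks a "page cache busy" probe.
    static final class SendProcessor {
        private final BooleanSupplier pageCacheBusy;

        SendProcessor(BooleanSupplier pageCacheBusy) {
            this.pageCacheBusy = pageCacheBusy;
        }

        boolean rejectRequest() {
            return pageCacheBusy.getAsBoolean();
        }

        Response process(String body) {
            return new Response(SUCCESS, "stored " + body.length() + " bytes");
        }
    }

    // Checked before any work is queued, so a busy broker replies immediately with code 2.
    static Response dispatch(SendProcessor processor, String body) {
        if (processor.rejectRequest()) {
            return new Response(SYSTEM_BUSY, "[REJECTREQUEST]system busy, start flow control for a while");
        }
        return processor.process(body);
    }

    public static void main(String[] args) {
        System.out.println(dispatch(new SendProcessor(() -> false), "order-1")); // CODE: 0  DESC: stored 7 bytes
        System.out.println(dispatch(new SendProcessor(() -> true), "order-2"));
    }
}
```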

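Tracing up the call chain: every inbound command on a Netty channel goes through processMessageReceived. On the broker, a send arrives as a REQUEST_COMMAND, which is where the check above runs. On the client, the rejection comes back as a RESPONSE_COMMAND through the client handler: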

public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
    final RemotingCommand cmd = msg;
    if (cmd != null) {
        switch (cmd.getType()) {
            case REQUEST_COMMAND:
                processRequestCommand(ctx, cmd);
                break;
            case RESPONSE_COMMAND:
                processResponseCommand(ctx, cmd);
                break;
            default:
                break;
        }
    }
}

class NettyClientHandler extends SimpleChannelInboundHandler<RemotingCommand> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        processMessageReceived(ctx, msg);
    }
}
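The same entry point serves both ends of the connection. The sketch below assumes a single in-memory "connection". The broker turns a REQUEST_COMMAND into a rejection that carries the same opaque, and the client uses that opaque to complete its pending request. All names are hypothetical. responseTable only mirrors the idea of RocketMQ keeping in-flight requests keyed by opaque, which is why the rejection snippet calls response.setOpaque(opaque).

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

class DispatchDemo {
    enum Type { REQUEST_COMMAND, RESPONSE_COMMAND }

    static final class Command {
        final Type type;
        final int opaque; // request id; the broker copies it into its response
        final int code;
        final String remark;

        Command(Type type, int opaque, int code, String remark) {
            this.type = type;
            this.opaque = opaque;
            this.code = code;
            this.remark = remark;
        }
    }

    // Client side: in-flight requests waiting for a response, keyed by opaque.
    static final Map<Integer, CompletableFuture<Command>> responseTable = new ConcurrentHashMap<>();

    // One entry point for both ends; the command type selects the path.
    static Command processMessageReceived(Command cmd) {
        switch (cmd.type) {
            case REQUEST_COMMAND:
                return processRequestCommand(cmd);
            case RESPONSE_COMMAND:
                processResponseCommand(cmd);
                return null;
            default:
                return null;
        }
    }

    // Broker side, simplified: behaves as if rejectRequest() returned true.
    static Command processRequestCommand(Command request) {
        return new Command(Type.RESPONSE_COMMAND, request.opaque, 2,
            "[REJECTREQUEST]system busy, start flow control for a while");
    }

    // Client side: complete whichever caller is waiting on this opaque.
    static void processResponseCommand(Command response) {
        CompletableFuture<Command> future = responseTable.remove(response.opaque);
        if (future != null) {
            future.complete(response);
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Command> pending = new CompletableFuture<>();
        responseTable.put(42, pending);
        // Client -> broker: a send request (the request code value is illustrative).
        Command reply = processMessageReceived(new Command(Type.REQUEST_COMMAND, 42, 10, "send"));
        // Broker -> client: the rejection travels back as a RESPONSE_COMMAND.
        processMessageReceived(reply);
        Command got = pending.join();
        System.out.println("opaque " + got.opaque + " -> CODE: " + got.code + "  DESC: " + got.remark);
    }
}
```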

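The brokers had recently gone through frequent memory-parameter tuning and disk removal. With that in mind, we did some reading and found the following broker code: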

org.apache.rocketmq.broker.latency.BrokerFastFailure#cleanExpiredRequest

private void cleanExpiredRequest() {
    while (this.brokerController.getMessageStore().isOSPageCacheBusy()) {
        try {
            if (!this.brokerController.getSendThreadPoolQueue().isEmpty()) {
                final Runnable runnable = this.brokerController.getSendThreadPoolQueue().poll(0, TimeUnit.SECONDS);
                if (null == runnable) {
                    break;
                }

                final RequestTask rt = castRunnable(runnable);
                rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[PCBUSY_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", System.currentTimeMillis() - rt.getCreateTimestamp(), this.brokerController.getSendThreadPoolQueue().size()));
            } else {
                break;
            }
        } catch (Throwable ignored) {
        }
    }

    cleanExpiredRequestInQueue(this.brokerController.getSendThreadPoolQueue(),
        this.brokerController.getBrokerConfig().getWaitTimeMillsInSendQueue());

    cleanExpiredRequestInQueue(this.brokerController.getPullThreadPoolQueue(),
        this.brokerController.getBrokerConfig().getWaitTimeMillsInPullQueue());

    cleanExpiredRequestInQueue(this.brokerController.getHeartbeatThreadPoolQueue(),
        this.brokerController.getBrokerConfig().getWaitTimeMillsInHeartbeatQueue());

    cleanExpiredRequestInQueue(this.brokerController.getEndTransactionThreadPoolQueue(),
        this.brokerController.getBrokerConfig().getWaitTimeMillsInTransactionQueue());
}
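The sketch below is a stdlib-only model of the two fail-fast rules, using hypothetical names. isOSPageCacheBusy follows the shape of DefaultMessageStore#isOSPageCacheBusy in 4.x as we understand it. The store counts as busy when the current commit-log write has held the lock longer than a timeout. An upper bound filters out a start time of 0, which means no write is in progress. cleanExpired mirrors what cleanExpiredRequestInQueue does to send requests that have waited too long. The 1000 ms and 200 ms values used below are, to our knowledge, the 4.x defaults of osPageCacheBusyTimeOutMills and waitTimeMillsInSendQueue. Check them against your broker version.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class FastFailureDemo {
    // A queued send request; createTimestamp plays the role of RequestTask#getCreateTimestamp.
    static final class QueuedRequest {
        final String id;
        final long createTimestamp;
        String rejection; // remark of the SYSTEM_BUSY (code 2) response, once rejected

        QueuedRequest(String id, long createTimestamp) {
            this.id = id;
            this.createTimestamp = createTimestamp;
        }
    }

    // Busy when the current commit-log write has held the lock longer than busyTimeoutMillis.
    // The upper bound filters out a begin time of 0, i.e. no write in progress.
    static boolean isOSPageCacheBusy(long lockBeginMillis, long nowMillis, long busyTimeoutMillis) {
        long diff = nowMillis - lockBeginMillis;
        return diff < 10_000_000 && diff > busyTimeoutMillis;
    }

    // Fail fast every request that has waited at least maxWaitMillis. The queue is FIFO,
    // so we can stop at the first request that is still young enough.
    static List<QueuedRequest> cleanExpired(BlockingQueue<QueuedRequest> queue, long nowMillis, long maxWaitMillis) {
        List<QueuedRequest> rejected = new ArrayList<>();
        QueuedRequest head;
        while ((head = queue.peek()) != null) {
            long behind = nowMillis - head.createTimestamp;
            if (behind < maxWaitMillis) {
                break;
            }
            if (queue.remove(head)) {
                head.rejection = String.format(
                    "[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d",
                    behind, queue.size());
                rejected.add(head);
            }
        }
        return rejected;
    }

    public static void main(String[] args) {
        long now = 10_000;
        System.out.println(isOSPageCacheBusy(now - 1_500, now, 1_000)); // true
        System.out.println(isOSPageCacheBusy(now - 300, now, 1_000));   // false

        BlockingQueue<QueuedRequest> queue = new LinkedBlockingQueue<>();
        queue.add(new QueuedRequest("r1", now - 500));
        queue.add(new QueuedRequest("r2", now - 250));
        queue.add(new QueuedRequest("r3", now - 50));
        for (QueuedRequest r : cleanExpired(queue, now, 200)) {
            System.out.println(r.id + " -> " + r.rejection);
        }
        System.out.println("still queued: " + queue.size()); // still queued: 1
    }
}
```

Both rules answer with code 2, so under a write backlog the producer sees "system busy" whether the request was refused on arrival or evicted from the queue.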

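In short, the send request exceeded the broker's default wait time. The available remedies are to raise that wait time (waitTimeMillsInSendQueue) or to increase the number of send-message threads (sendMessageThreadPoolNums, which defaults to 1 in RocketMQ 4.x). The client should also be configured to retry failed sends. The main cause, though, was the MQ message backlog. It slowed writes to memory (the OS page cache) until they exceeded the timeout, and that is the condition the [REJECTREQUEST] check reacts to. We hope the ongoing cluster expansion will resolve it. Note that a message rejected this way is lost unless the sender retries or re-pushes it.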
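On the client side, DefaultMQProducer exposes setRetryTimesWhenSendFailed and setRetryTimesWhenSendAsyncFailed. However, which response codes trigger the built-in retry depends on the client version, so an application-level retry is a reasonable extra guard. The sketch below is minimal and uses hypothetical Sender and SendFailedException types, not RocketMQ classes. It retries only on code 2 (SYSTEM_BUSY), with exponential backoff. It rethrows anything else, and the last busy error, so the caller can persist the message for a later re-push.

```java
import java.util.concurrent.atomic.AtomicInteger;

class RetryDemo {
    static final int SYSTEM_BUSY = 2;

    // Hypothetical exception carrying the broker response code.
    static final class SendFailedException extends Exception {
        final int code;

        SendFailedException(int code, String message) {
            super(message);
            this.code = code;
        }
    }

    // Hypothetical send operation (e.g. a wrapper around a synchronous producer send).
    @FunctionalInterface
    interface Sender {
        String send() throws SendFailedException;
    }

    // Retry only on SYSTEM_BUSY, doubling the pause each time to give the broker room to drain.
    static String sendWithRetry(Sender sender, int maxAttempts, long initialBackoffMillis)
            throws SendFailedException, InterruptedException {
        long backoff = initialBackoffMillis;
        for (int attempt = 1; ; attempt++) {
            try {
                return sender.send();
            } catch (SendFailedException e) {
                if (e.code != SYSTEM_BUSY || attempt >= maxAttempts) {
                    throw e; // caller should persist the message for re-push
                }
                Thread.sleep(backoff);
                backoff *= 2;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger calls = new AtomicInteger();
        // Fails twice with SYSTEM_BUSY, then succeeds.
        Sender flaky = () -> {
            if (calls.incrementAndGet() <= 2) {
                throw new SendFailedException(SYSTEM_BUSY, "[REJECTREQUEST]system busy");
            }
            return "SEND_OK";
        };
        System.out.println(sendWithRetry(flaky, 3, 10) + " after " + calls.get() + " calls"); // SEND_OK after 3 calls
    }
}
```

Backoff matters here: retrying immediately would add load to a broker that is already refusing writes.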

This article is part of the Tencent Cloud self-media sharing program and was shared from a WeChat official account.
Originally published: 2022-10-27. In case of infringement, contact cloudcommunity@tencent.com for removal.

Shared from the WeChat official account 赵KK日常技术记录.
