A Look at RocketMQ's SERVICE_NOT_AVAILABLE

Original article by code4it
Last modified 2019-12-19 10:08:53
Published in the column 码匠的流水账

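This article looks at where RocketMQ's SERVICE_NOT_AVAILABLE comes from: where it is defined, which conditions in the message store produce it, and how the broker turns it into a response code.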

SERVICE_NOT_AVAILABLE

rocketmq-all-4.6.0-source-release/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java

public class ResponseCode extends RemotingSysResponseCode {
​
    public static final int FLUSH_DISK_TIMEOUT = 10;
​
    public static final int SLAVE_NOT_AVAILABLE = 11;
​
    public static final int FLUSH_SLAVE_TIMEOUT = 12;
​
    public static final int MESSAGE_ILLEGAL = 13;
​
    public static final int SERVICE_NOT_AVAILABLE = 14;
​
    //......
​
}
  • ResponseCode defines SERVICE_NOT_AVAILABLE with the value 14.

PutMessageStatus

rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/PutMessageStatus.java

public enum PutMessageStatus {
    PUT_OK,
    FLUSH_DISK_TIMEOUT,
    FLUSH_SLAVE_TIMEOUT,
    SLAVE_NOT_AVAILABLE,
    SERVICE_NOT_AVAILABLE,
    CREATE_MAPEDFILE_FAILED,
    MESSAGE_ILLEGAL,
    PROPERTIES_SIZE_EXCEEDED,
    OS_PAGECACHE_BUSY,
    UNKNOWN_ERROR,
}
  • The PutMessageStatus enum includes a SERVICE_NOT_AVAILABLE value.

DefaultMessageStore

rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java

public class DefaultMessageStore implements MessageStore {
​
    //......
​
    public PutMessageResult putMessage(MessageExtBrokerInner msg) {
        if (this.shutdown) {
            log.warn("message store has shutdown, so putMessage is forbidden");
            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
        }
​
        if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
            long value = this.printTimes.getAndIncrement();
            if ((value % 50000) == 0) {
                log.warn("message store is slave mode, so putMessage is forbidden ");
            }
​
            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
        }
​
        if (!this.runningFlags.isWriteable()) {
            long value = this.printTimes.getAndIncrement();
            if ((value % 50000) == 0) {
                log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits());
            }
​
            return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
        } else {
            this.printTimes.set(0);
        }
​
        if (msg.getTopic().length() > Byte.MAX_VALUE) {
            log.warn("putMessage message topic length too long " + msg.getTopic().length());
            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
        }
​
        if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
            log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
            return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
        }
​
        if (this.isOSPageCacheBusy()) {
            return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
        }
​
        long beginTime = this.getSystemClock().now();
        PutMessageResult result = this.commitLog.putMessage(msg);
​
        long elapsedTime = this.getSystemClock().now() - beginTime;
        if (elapsedTime > 500) {
            log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
        }
        this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
​
        if (null == result || !result.isOk()) {
            this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
        }
​
        return result;
    }
​
    //......
}
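  • putMessage returns a PutMessageResult with PutMessageStatus.SERVICE_NOT_AVAILABLE in three cases: shutdown is true, the broker's role is SLAVE, or runningFlags.isWriteable() returns false. All three checks run before the message reaches commitLog.putMessage.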
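
The order of these guards can be sketched in isolation. The following is a minimal model under stated assumptions, not RocketMQ code: `PutGuardSketch`, its `Status` enum, and the three boolean fields are hypothetical stand-ins for `this.shutdown`, the broker role check, and `runningFlags.isWriteable()`. It also mirrors the log throttling from the excerpt, where a warning is printed only when the counter is a multiple of 50000.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical, simplified model of the guards at the top of putMessage; not RocketMQ code. */
final class PutGuardSketch {

    enum Status { PUT_OK, SERVICE_NOT_AVAILABLE }

    // Throttle counter, playing the role of printTimes in the excerpt.
    private final AtomicLong printTimes = new AtomicLong(0);
    private long warningsLogged = 0;

    // Stand-ins for this.shutdown, the broker role check, and runningFlags.isWriteable().
    boolean shutdown = false;
    boolean slave = false;
    boolean writeable = true;

    Status putMessage() {
        if (shutdown) {
            // The real method logs a warning on every call here, without throttling.
            return Status.SERVICE_NOT_AVAILABLE;
        }
        if (slave) {
            warnThrottled("message store is slave mode");
            return Status.SERVICE_NOT_AVAILABLE;
        }
        if (!writeable) {
            warnThrottled("message store is not writeable");
            return Status.SERVICE_NOT_AVAILABLE;
        } else {
            printTimes.set(0); // writes are allowed again, so reset the throttle
        }
        // The real method continues with further validation and commitLog.putMessage(msg).
        return Status.PUT_OK;
    }

    // Prints only when the counter is a multiple of 50000, as in the excerpt.
    private void warnThrottled(String reason) {
        long value = printTimes.getAndIncrement();
        if ((value % 50000) == 0) {
            warningsLogged++;
            System.out.println(reason + ", so putMessage is forbidden");
        }
    }

    long warningsLogged() {
        return warningsLogged;
    }

    public static void main(String[] args) {
        PutGuardSketch store = new PutGuardSketch();
        System.out.println(store.putMessage());

        store.writeable = false;
        for (int i = 0; i < 3; i++) {
            System.out.println(store.putMessage());
        }
        System.out.println("warnings logged: " + store.warningsLogged());
    }
}
```

In this sketch, three rejected writes in a row produce a single warning, which is why a broker that rejects every send may show only sparse log lines for it.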

RunningFlags

rocketmq-all-4.6.0-source-release/store/src/main/java/org/apache/rocketmq/store/RunningFlags.java

public class RunningFlags {
​
    private static final int NOT_READABLE_BIT = 1;
​
    private static final int NOT_WRITEABLE_BIT = 1 << 1;
​
    private static final int WRITE_LOGICS_QUEUE_ERROR_BIT = 1 << 2;
​
    private static final int WRITE_INDEX_FILE_ERROR_BIT = 1 << 3;
​
    private static final int DISK_FULL_BIT = 1 << 4;
​
    private volatile int flagBits = 0;
​
    public RunningFlags() {
    }
​
    //......
​
    public boolean isWriteable() {
        if ((this.flagBits & (NOT_WRITEABLE_BIT | WRITE_LOGICS_QUEUE_ERROR_BIT | DISK_FULL_BIT | WRITE_INDEX_FILE_ERROR_BIT)) == 0) {
            return true;
        }
​
        return false;
    }
​
    public boolean getAndMakeReadable() {
        boolean result = this.isReadable();
        if (!result) {
            this.flagBits &= ~NOT_READABLE_BIT;
        }
        return result;
    }
​
    public boolean getAndMakeNotReadable() {
        boolean result = this.isReadable();
        if (result) {
            this.flagBits |= NOT_READABLE_BIT;
        }
        return result;
    }
​
    public boolean getAndMakeWriteable() {
        boolean result = this.isWriteable();
        if (!result) {
            this.flagBits &= ~NOT_WRITEABLE_BIT;
        }
        return result;
    }
​
    public boolean getAndMakeNotWriteable() {
        boolean result = this.isWriteable();
        if (result) {
            this.flagBits |= NOT_WRITEABLE_BIT;
        }
        return result;
    }
​
    public void makeLogicsQueueError() {
        this.flagBits |= WRITE_LOGICS_QUEUE_ERROR_BIT;
    }
​
    public void makeIndexFileError() {
        this.flagBits |= WRITE_INDEX_FILE_ERROR_BIT;
    }
​
    public boolean getAndMakeDiskFull() {
        boolean result = !((this.flagBits & DISK_FULL_BIT) == DISK_FULL_BIT);
        this.flagBits |= DISK_FULL_BIT;
        return result;
    }
​
    public boolean getAndMakeDiskOK() {
        boolean result = !((this.flagBits & DISK_FULL_BIT) == DISK_FULL_BIT);
        this.flagBits &= ~DISK_FULL_BIT;
        return result;
    }
​
    //......
}
  • RunningFlags provides isWriteable, which returns false if any of NOT_WRITEABLE_BIT, WRITE_LOGICS_QUEUE_ERROR_BIT, DISK_FULL_BIT, or WRITE_INDEX_FILE_ERROR_BIT is set in flagBits. The methods getAndMakeReadable, getAndMakeNotReadable, getAndMakeWriteable, getAndMakeNotWriteable, makeLogicsQueueError, makeIndexFileError, getAndMakeDiskFull, and getAndMakeDiskOK can each change flagBits.
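
The "not writeable" warning in putMessage logs `runningFlags.getFlagBits()` as a plain integer, so decoding that integer can help when tracing a SERVICE_NOT_AVAILABLE response. Below is a minimal sketch that reuses the bit values from the excerpt; the class `FlagBitsSketch` and its `decode` and `isWriteable(int)` helpers are hypothetical and not part of RocketMQ. It also illustrates one consequence of the excerpt: getAndMakeWriteable clears only NOT_WRITEABLE_BIT, so a store whose DISK_FULL_BIT is still set stays non-writeable.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for reading a logged flagBits value; bit values copied from the excerpt. */
final class FlagBitsSketch {

    static final int NOT_READABLE_BIT = 1;
    static final int NOT_WRITEABLE_BIT = 1 << 1;
    static final int WRITE_LOGICS_QUEUE_ERROR_BIT = 1 << 2;
    static final int WRITE_INDEX_FILE_ERROR_BIT = 1 << 3;
    static final int DISK_FULL_BIT = 1 << 4;

    // Any of these bits makes isWriteable() return false.
    static final int WRITE_BLOCKING_MASK =
        NOT_WRITEABLE_BIT | WRITE_LOGICS_QUEUE_ERROR_BIT | DISK_FULL_BIT | WRITE_INDEX_FILE_ERROR_BIT;

    static boolean isWriteable(int flagBits) {
        return (flagBits & WRITE_BLOCKING_MASK) == 0;
    }

    /** Lists the names of the flags set in flagBits, lowest bit first. */
    static List<String> decode(int flagBits) {
        List<String> names = new ArrayList<>();
        if ((flagBits & NOT_READABLE_BIT) != 0) {
            names.add("NOT_READABLE");
        }
        if ((flagBits & NOT_WRITEABLE_BIT) != 0) {
            names.add("NOT_WRITEABLE");
        }
        if ((flagBits & WRITE_LOGICS_QUEUE_ERROR_BIT) != 0) {
            names.add("WRITE_LOGICS_QUEUE_ERROR");
        }
        if ((flagBits & WRITE_INDEX_FILE_ERROR_BIT) != 0) {
            names.add("WRITE_INDEX_FILE_ERROR");
        }
        if ((flagBits & DISK_FULL_BIT) != 0) {
            names.add("DISK_FULL");
        }
        return names;
    }

    public static void main(String[] args) {
        int flagBits = NOT_WRITEABLE_BIT | DISK_FULL_BIT; // would be logged as 18
        System.out.println(flagBits + " -> " + decode(flagBits));
        System.out.println("writeable: " + isWriteable(flagBits));

        // Clearing only NOT_WRITEABLE_BIT, as getAndMakeWriteable does, leaves DISK_FULL_BIT set.
        flagBits &= ~NOT_WRITEABLE_BIT;
        System.out.println(flagBits + " -> " + decode(flagBits));
        System.out.println("writeable: " + isWriteable(flagBits));
    }
}
```

Note that NOT_READABLE_BIT is not part of the mask, so a store can be unreadable and still accept writes.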

SendMessageProcessor

rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java

public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
​
    //......
​
    private RemotingCommand handlePutMessageResult(PutMessageResult putMessageResult, RemotingCommand response,
                                                   RemotingCommand request, MessageExt msg,
                                                   SendMessageResponseHeader responseHeader, SendMessageContext sendMessageContext, ChannelHandlerContext ctx,
                                                   int queueIdInt) {
        if (putMessageResult == null) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("store putMessage return null");
            return response;
        }
        boolean sendOK = false;
​
        switch (putMessageResult.getPutMessageStatus()) {
            // Success
            case PUT_OK:
                sendOK = true;
                response.setCode(ResponseCode.SUCCESS);
                break;
            case FLUSH_DISK_TIMEOUT:
                response.setCode(ResponseCode.FLUSH_DISK_TIMEOUT);
                sendOK = true;
                break;
            case FLUSH_SLAVE_TIMEOUT:
                response.setCode(ResponseCode.FLUSH_SLAVE_TIMEOUT);
                sendOK = true;
                break;
            case SLAVE_NOT_AVAILABLE:
                response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
                sendOK = true;
                break;
​
            // Failed
            case CREATE_MAPEDFILE_FAILED:
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("create mapped file failed, server is busy or broken.");
                break;
            case MESSAGE_ILLEGAL:
            case PROPERTIES_SIZE_EXCEEDED:
                response.setCode(ResponseCode.MESSAGE_ILLEGAL);
                response.setRemark(
                    "the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k.");
                break;
            case SERVICE_NOT_AVAILABLE:
                response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE);
                response.setRemark(
                    "service not available now, maybe disk full, " + diskUtil() + ", maybe your broker machine memory too small.");
                break;
            case OS_PAGECACHE_BUSY:
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("[PC_SYNCHRONIZED]broker busy, start flow control for a while");
                break;
            case UNKNOWN_ERROR:
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("UNKNOWN_ERROR");
                break;
            default:
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("UNKNOWN_ERROR DEFAULT");
                break;
        }
​
        String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
        if (sendOK) {
​
            this.brokerController.getBrokerStatsManager().incTopicPutNums(msg.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);
            this.brokerController.getBrokerStatsManager().incTopicPutSize(msg.getTopic(),
                putMessageResult.getAppendMessageResult().getWroteBytes());
            this.brokerController.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum());
​
            response.setRemark(null);
​
            responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
            responseHeader.setQueueId(queueIdInt);
            responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());
​
            doResponse(ctx, request, response);
​
            if (hasSendMessageHook()) {
                sendMessageContext.setMsgId(responseHeader.getMsgId());
                sendMessageContext.setQueueId(responseHeader.getQueueId());
                sendMessageContext.setQueueOffset(responseHeader.getQueueOffset());
​
                int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
                int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes();
                int incValue = (int)Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount;
​
                sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS);
                sendMessageContext.setCommercialSendTimes(incValue);
                sendMessageContext.setCommercialSendSize(wroteSize);
                sendMessageContext.setCommercialOwner(owner);
            }
            return null;
        } else {
            if (hasSendMessageHook()) {
                int wroteSize = request.getBody().length;
                int incValue = (int)Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);
​
                sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE);
                sendMessageContext.setCommercialSendTimes(incValue);
                sendMessageContext.setCommercialSendSize(wroteSize);
                sendMessageContext.setCommercialOwner(owner);
            }
        }
        return response;
    }
​
    //......
}
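  • handlePutMessageResult maps PutMessageStatus.SERVICE_NOT_AVAILABLE to ResponseCode.SERVICE_NOT_AVAILABLE and sets a remark that names a full disk or too little broker memory as possible causes. Unlike FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, and SLAVE_NOT_AVAILABLE, it leaves sendOK false.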
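
The switch above boils down to a mapping from PutMessageStatus to a response code plus a sendOK flag. A compact sketch of that mapping follows. It assumes the numeric values from the ResponseCode excerpt, and SUCCESS = 0 and SYSTEM_ERROR = 1 from RemotingSysResponseCode, which this article does not quote. `ResponseMappingSketch`, `Mapped`, and `map` are hypothetical names; the nested enum and constants only imitate the real classes.

```java
/** Hypothetical condensation of the switch in handlePutMessageResult; not RocketMQ code. */
final class ResponseMappingSketch {

    enum PutMessageStatus {
        PUT_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE,
        SERVICE_NOT_AVAILABLE, CREATE_MAPEDFILE_FAILED, MESSAGE_ILLEGAL,
        PROPERTIES_SIZE_EXCEEDED, OS_PAGECACHE_BUSY, UNKNOWN_ERROR
    }

    static final class ResponseCode {
        // Assumed from RemotingSysResponseCode, which is not quoted in the article.
        static final int SUCCESS = 0;
        static final int SYSTEM_ERROR = 1;
        // Values from the ResponseCode excerpt.
        static final int FLUSH_DISK_TIMEOUT = 10;
        static final int SLAVE_NOT_AVAILABLE = 11;
        static final int FLUSH_SLAVE_TIMEOUT = 12;
        static final int MESSAGE_ILLEGAL = 13;
        static final int SERVICE_NOT_AVAILABLE = 14;
    }

    /** Response code and whether the broker counts the send as successful. */
    static final class Mapped {
        final int code;
        final boolean sendOK;

        Mapped(int code, boolean sendOK) {
            this.code = code;
            this.sendOK = sendOK;
        }

        @Override
        public String toString() {
            return "code=" + code + ", sendOK=" + sendOK;
        }
    }

    static Mapped map(PutMessageStatus status) {
        if (status == null) {
            return new Mapped(ResponseCode.SYSTEM_ERROR, false);
        }
        switch (status) {
            case PUT_OK:
                return new Mapped(ResponseCode.SUCCESS, true);
            case FLUSH_DISK_TIMEOUT:
                return new Mapped(ResponseCode.FLUSH_DISK_TIMEOUT, true);
            case FLUSH_SLAVE_TIMEOUT:
                return new Mapped(ResponseCode.FLUSH_SLAVE_TIMEOUT, true);
            case SLAVE_NOT_AVAILABLE:
                return new Mapped(ResponseCode.SLAVE_NOT_AVAILABLE, true);
            case MESSAGE_ILLEGAL:
            case PROPERTIES_SIZE_EXCEEDED:
                return new Mapped(ResponseCode.MESSAGE_ILLEGAL, false);
            case SERVICE_NOT_AVAILABLE:
                return new Mapped(ResponseCode.SERVICE_NOT_AVAILABLE, false);
            default:
                // CREATE_MAPEDFILE_FAILED, OS_PAGECACHE_BUSY, UNKNOWN_ERROR and anything else
                return new Mapped(ResponseCode.SYSTEM_ERROR, false);
        }
    }

    public static void main(String[] args) {
        for (PutMessageStatus status : PutMessageStatus.values()) {
            System.out.println(status + " -> " + map(status));
        }
    }
}
```

Of the ten statuses, only SERVICE_NOT_AVAILABLE yields code 14, so a client that sees 14 on a send can trace it back to one of the three guards in DefaultMessageStore.putMessage.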

Summary

  • ResponseCode defines SERVICE_NOT_AVAILABLE, the PutMessageStatus enum has a value of the same name, and handlePutMessageResult maps the enum value to ResponseCode.SERVICE_NOT_AVAILABLE.
  • DefaultMessageStore's putMessage returns a PutMessageResult with PutMessageStatus.SERVICE_NOT_AVAILABLE when shutdown is true, when the broker's role is SLAVE, or when runningFlags.isWriteable() returns false.
  • RunningFlags provides isWriteable, which returns false if any of NOT_WRITEABLE_BIT, WRITE_LOGICS_QUEUE_ERROR_BIT, DISK_FULL_BIT, or WRITE_INDEX_FILE_ERROR_BIT is set. The methods getAndMakeReadable, getAndMakeNotReadable, getAndMakeWriteable, getAndMakeNotWriteable, makeLogicsQueueError, makeIndexFileError, getAndMakeDiskFull, and getAndMakeDiskOK can each change flagBits.

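Original-content notice: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

To report infringement, contact cloudcommunity@tencent.com to request removal.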
