前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊chronos的addMessage

聊聊chronos的addMessage

原创
作者头像
code4it
修改2020-01-03 10:04:59
3200
修改2020-01-03 10:04:59
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下chronos的addMessage

addMessage

DDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/services/MqPullService.java

代码语言:javascript
复制
public class MqPullService implements Runnable {
    private static final Logger LOGGER = LoggerFactory.getLogger(MqPullService.class);
​
    private static final PullConfig PULL_CONFIG = ConfigManager.getConfig().getPullConfig();
    private static final Batcher BATCHER = Batcher.getInstance();
    private volatile boolean shouldStop = false;
    private CountDownLatch cdl;
​
    private final List<Long> succOffsets = new ArrayList<>();
    private final List<Long> failOffsets = new ArrayList<>();
    private SimpleCarreraConsumer carreraConsumer;
    private String mqPullServiceName;
​
    private final int INTERNAL_PAIR_COUNT = 5000;
    private final BlockingQueue<InternalPair> blockingQueue = new ArrayBlockingQueue<>(INTERNAL_PAIR_COUNT);
​
    //......
​
    private void addMessage(final InternalKey internalKey, final InternalValue internalValue, final int action) {
        if (internalKey.getType() == MsgTypes.DELAY.getValue()) {
            MetricService.incPullQps(internalValue.getTopic(), MetricMsgAction.ADD, MetricMsgType.DELAY);
​
            if (BATCHER.checkAndPutToDefaultCF(internalKey, internalValue.toJsonString(), internalValue.getTopic(), action)) {
                MetricService.incWriteQps(internalValue.getTopic(), MetricMsgAction.ADD, MetricMsgType.DELAY, MetricMsgToOrFrom.DB);
​
                return;
            }
​
            MetricService.putMsgSizePercent(internalValue.getTopic(), internalValue.toJsonString().getBytes(Charsets.UTF_8).length);
            MetricService.incWriteQps(internalValue.getTopic(), MetricMsgAction.ADD, MetricMsgType.DELAY, MetricMsgToOrFrom.SEND);
​
            putToBlockingQueue(internalKey, internalValue);
        } else if (internalKey.getType() == MsgTypes.LOOP_DELAY.getValue()
                || internalKey.getType() == MsgTypes.LOOP_EXPONENT_DELAY.getValue()) {
​
            MetricMsgType msgType = MetricMsgType.LOOP_DELAY;
            if (internalKey.getType() == MsgTypes.LOOP_EXPONENT_DELAY.getValue()) {
                msgType = MetricMsgType.LOOP_EXPONENT_DELAY;
            }
​
            MetricService.incPullQps(internalValue.getTopic(), MetricMsgAction.ADD, msgType);
​
            while (true) {
                if (KeyUtils.isInvalidMsg(internalKey)) {
                    return;
                }
​
                // 循环消息只写入rocksdb一次, seek到的时候再进行添加
                if (BATCHER.checkAndPutToDefaultCF(internalKey, internalValue.toJsonString(), internalValue.getTopic(), action)) {
                    MetricService.incWriteQps(internalValue.getTopic(), MetricMsgAction.ADD, msgType, MetricMsgToOrFrom.DB);
                    return;
                }
​
                MetricService.incWriteQps(internalValue.getTopic(), MetricMsgAction.ADD, msgType, MetricMsgToOrFrom.SEND);
​
                putToBlockingQueue(new InternalKey(internalKey), internalValue);
                internalKey.nextUniqDelayMsgId();
            }
        } else {
            MetricService.incPullQps(internalValue.getTopic(), MetricMsgAction.ADD, MetricMsgType.UNKNOWN);
​
            LOGGER.error("should not go here, invalid message type:{}, internalKey:{}", internalKey.getType(),
                    internalKey.genUniqDelayMsgId());
        }
    }
​
    private void putToBlockingQueue(InternalKey internalKey, InternalValue internalValue) {
        try {
            blockingQueue.put(new InternalPair(internalKey, internalValue));
        } catch (InterruptedException e) {
            LOGGER.error("error while put to blockingQueue, dMsgId:{}", internalKey.genUniqDelayMsgId());
        }
    }
​
    //......
}
  • MqPullService实现了Runnable接口,其addMessage方法执行BATCHER.checkAndPutToDefaultCF(internalKey, internalValue.toJsonString(), internalValue.getTopic(), action)

InternalKey

DDMQ/carrera-common/src/main/java/com/xiaojukeji/carrera/chronos/model/InternalKey.java

代码语言:javascript
复制
public class InternalKey {
    private static final String SEPARATOR = "-";
    private static final int LEN_UUID = 36;
    private static final long ONE_DAY_SECONDS = 24 * 60 * 60;
​
    private long timestamp;
    private int type;
    private long expire;
    private long times;
    private long timed;
    private long interval;
    private int innerTopicSeq;
    private String uuid;
    private int segmentNum;
    private int segmentIndex;
​
    //......
}
  • InternalKey定义了timestamp、times、interval等属性

InternalValue

DDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/model/InternalValue.java

代码语言:javascript
复制
public class InternalValue {
    @JSONField(name="a")
    private String topic;
​
    @JSONField(name="b")
    private String body;
​
    @JSONField(name="c")
    private long createTime;
​
    @JSONField(name="d")
    private String tags;
​
    @JSONField(name="e")
    private Map<String, String> properties;
​
    //......
}
  • InternalValue定义了topic、body、createTime、tags、properties属性

checkAndPutToDefaultCF

DDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/autobatcher/Batcher.java

代码语言:javascript
复制
public class Batcher {
    private static final Logger LOGGER = LoggerFactory.getLogger(Batcher.class);
​
    private static final int PULL_BATCH_ITEM_NUM = ConfigManager.getConfig().getPullConfig().getPullBatchItemNum();
    private static final int MSG_BYTE_BASE_LEN = ConfigManager.getConfig().getPullConfig().getMsgByteBaseLen();
​
    private WriteBatch wb = new WriteBatch();
    private volatile int itemNum = 0;
    private static volatile Batcher instance = null;
​
    public static volatile ReentrantLock lock = new ReentrantLock();
​
    //......
​
    public boolean checkAndPutToDefaultCF(final InternalKey internalKey, final String strVal, final String topic, final int action) {
        lock.lock();
        try {
            if (KeyUtils.afterSeekTimestamp(internalKey.getTimestamp())) {
                byte[] bytes = strVal.getBytes(Charsets.UTF_8);
                MetricService.putMsgSizePercent(topic, bytes.length);
                if (bytes.length <= MSG_BYTE_BASE_LEN) {
                    putToDefaultCF(internalKey.genUniqDelayMsgId(), bytes, topic, internalKey, action);
                } else {
                    // 如果字节数据超过一定长度, 则进行字节数组切分, 以便降低io.util
                    List<byte[]> list = ByteUtils.divideArray(bytes, MSG_BYTE_BASE_LEN);
                    final int segmentNum = list.size();
                    for (int segmentIndex = 0; segmentIndex < segmentNum; segmentIndex++) {
                        internalKey.setSegmentNum(segmentNum);
                        internalKey.setSegmentIndex(Constants.SEGMENT_INDEX_BASE + segmentIndex);
                        putToDefaultCF(internalKey.genUniqDelayMsgIdWithSegmentInfoIfHas(), list.get(segmentIndex), topic, internalKey, action);
                        LOGGER.info("segment split, dMsgId:{}, len:{}, value.totalLen:{}", internalKey.genUniqDelayMsgIdWithSegmentInfoIfHas(), list.get(segmentIndex).length, bytes.length);
                    }
                }
                return true;
            }
            return false;
        } finally {
            lock.unlock();
        }
    }
​
    public void putToDefaultCF(final String key, final byte[] value, final String topic, final InternalKey internalKey, final int action) {
        put(CFManager.CFH_DEFAULT, key.getBytes(Charsets.UTF_8), value, topic, internalKey, action);
    }
​
    private void put(final ColumnFamilyHandle cfh, final byte[] key, final byte[] value, final String topic, final InternalKey internalKey, final int action) {
        lock.lock();
        try {
            int len = 0;
            if (value != null) {
                len = value.length;
            }
            wb.put(cfh, key, value);
​
            LOGGER.info("put to cf, dMsgId:{}, len:{}", new String(key), len);
​
            itemNum++;
            checkFrequency();
​
            if (action == Actions.ADD.getValue()) {
                if (internalKey.getType() == MsgTypes.DELAY.getValue()) {
                    MetricService.incWriteQpsAfterSplit(topic, MetricMsgAction.ADD, MetricMsgType.DELAY);
                } else if (internalKey.getType() == MsgTypes.LOOP_DELAY.getValue()) {
                    MetricService.incWriteQpsAfterSplit(topic, MetricMsgAction.ADD, MetricMsgType.LOOP_DELAY);
                } else if (internalKey.getType() == MsgTypes.LOOP_EXPONENT_DELAY.getValue()) {
                    MetricService.incWriteQpsAfterSplit(topic, MetricMsgAction.ADD, MetricMsgType.LOOP_EXPONENT_DELAY);
                }
            } else {
                if (internalKey.getType() == MsgTypes.DELAY.getValue()) {
                    MetricService.incWriteQps(topic, MetricMsgAction.CANCEL, MetricMsgType.DELAY, MetricMsgToOrFrom.DB);
                    MetricService.incWriteQpsAfterSplit(topic, MetricMsgAction.CANCEL, MetricMsgType.DELAY);
                } else if (internalKey.getType() == MsgTypes.LOOP_DELAY.getValue()) {
                    MetricService.incWriteQps(topic, MetricMsgAction.CANCEL, MetricMsgType.LOOP_DELAY, MetricMsgToOrFrom.DB);
                    MetricService.incWriteQpsAfterSplit(topic, MetricMsgAction.CANCEL, MetricMsgType.LOOP_DELAY);
                } else if (internalKey.getType() == MsgTypes.LOOP_EXPONENT_DELAY.getValue()) {
                    MetricService.incWriteQps(topic, MetricMsgAction.CANCEL, MetricMsgType.LOOP_EXPONENT_DELAY, MetricMsgToOrFrom.DB);
                    MetricService.incWriteQpsAfterSplit(topic, MetricMsgAction.CANCEL, MetricMsgType.LOOP_EXPONENT_DELAY);
                }
            }
        } finally {
            lock.unlock();
        }
    }
​
    private void checkFrequency() {
        if (itemNum >= PULL_BATCH_ITEM_NUM) {
            flush();
        }
    }
​
     public void flush() {
        lock.lock();
        try {
            if (itemNum > 0) {
                // make sure write succ
                while (!RDB.writeSync(wb)) {
                    LOGGER.error("error while flush to db!");
                    try {
                        TimeUnit.MILLISECONDS.sleep(200);
                    } catch (InterruptedException e) {
                    }
                }
                wb.clear();
                itemNum = 0;
            }
        } finally {
            lock.unlock();
        }
    }
​
    //......
}    
  • Batcher的checkAndPutToDefaultCF主要是执行putToDefaultCF,而putToDefaultCF主要是执行put方法,该方法会执行wb.put(cfh, key, value),将数据写入到rocksdb的WriteBatch;之后执行checkFrequency在必要的时候进行flush;flush方法主要是执行RDB.writeSync(wb)以及wb.clear()

writeSync

DDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/db/RDB.java

代码语言:javascript
复制
public class RDB {
    private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(RDB.class);
​
    static RocksDB DB;
​
    public static void init(final String dbPath) {
        try {
            final long start = System.currentTimeMillis();
            boolean result = FileIOUtils.createOrExistsDir(new File(dbPath));
            assert(result != false);
​
            result = FileIOUtils.createOrExistsDir(new File(DB_PATH_BACKUP));
            assert(result != false);
​
            result = FileIOUtils.createOrExistsDir(new File(DB_PATH_RESTORE));
            assert(result != false);
​
            DB = RocksDB.open(OptionsConfig.DB_OPTIONS, dbPath, CF_DESCRIPTORS, CF_HANDLES);
            assert (DB != null);
​
            initCFManger(CF_HANDLES);
​
            final long cost = System.currentTimeMillis() - start;
            LOGGER.info("succ open rocksdb, path:{}, cost:{}ms", dbPath, cost);
        } catch (RocksDBException e) {
            LOGGER.error("error while open rocksdb, path:{}, err:{}", dbPath, e.getMessage(), e);
        }
    }
​
    //......
​
    public static boolean writeSync(final WriteBatch writeBatch) {
        return write(OptionsConfig.WRITE_OPTIONS_SYNC, writeBatch);
    }
​
    private static boolean write(final WriteOptions writeOptions, final WriteBatch writeBatch) {
        try {
            DB.write(writeOptions, writeBatch);
            LOGGER.debug("succ write writeBatch, size:{}", writeBatch.count());
        } catch (RocksDBException e) {
            // TODO: 2017/11/8 上报写入失败
            LOGGER.error("error while write batch, err:{}", e.getMessage(), e);
            return false;
        }
        return true;
    }
​
    //......
}
  • writeSync方法主要是使用OptionsConfig.WRITE_OPTIONS_SYNC参数执行RocksDB.write方法

小结

MqPullService实现了Runnable接口,其addMessage方法执行BATCHER.checkAndPutToDefaultCF(internalKey, internalValue.toJsonString(), internalValue.getTopic(), action);Batcher的checkAndPutToDefaultCF主要是执行putToDefaultCF,而putToDefaultCF主要是执行put方法,该方法会执行wb.put(cfh, key, value),将数据写入到rocksdb的WriteBatch;之后执行checkFrequency在必要的时候进行flush;flush方法主要是执行RDB.writeSync(wb)以及wb.clear()

doc

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • addMessage
  • InternalKey
  • InternalValue
  • checkAndPutToDefaultCF
  • writeSync
  • 小结
  • doc
相关产品与服务
批量计算
批量计算(BatchCompute,Batch)是为有大数据计算业务的企业、科研单位等提供高性价比且易用的计算服务。批量计算 Batch 可以根据用户提供的批处理规模,智能地管理作业和调动其所需的最佳资源。有了 Batch 的帮助,您可以将精力集中在如何分析和处理数据结果上。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档