前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊chronos的cancelMessage

聊聊chronos的cancelMessage

原创
作者头像
code4it
修改2020-01-07 09:53:21
4160
修改2020-01-07 09:53:21
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下chronos的cancelMessage

MqPullService

DDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/services/MqPullService.java

代码语言:javascript
复制
public class MqPullService implements Runnable {
    private static final Logger LOGGER = LoggerFactory.getLogger(MqPullService.class);
​
    private static final PullConfig PULL_CONFIG = ConfigManager.getConfig().getPullConfig();
    private static final Batcher BATCHER = Batcher.getInstance();
    private volatile boolean shouldStop = false;
    private CountDownLatch cdl;
​
    private final List<Long> succOffsets = new ArrayList<>();
    private final List<Long> failOffsets = new ArrayList<>();
    private SimpleCarreraConsumer carreraConsumer;
    private String mqPullServiceName;
​
    private final int INTERNAL_PAIR_COUNT = 5000;
    private final BlockingQueue<InternalPair> blockingQueue = new ArrayBlockingQueue<>(INTERNAL_PAIR_COUNT);
​
    //......
​
    private void cancelMessage(final InternalKey internalKey, final String topic, final int action) {
        InternalKey tombStoneInternalKey = internalKey.cloneTombstoneInternalKey();
​
        if (internalKey.getType() == MsgTypes.DELAY.getValue()) {
            MetricService.incPullQps(topic, MetricMsgAction.CANCEL, MetricMsgType.DELAY);
​
            BATCHER.putToDefaultCF(tombStoneInternalKey.genUniqDelayMsgId(),
                    new CancelWrap(internalKey.genUniqDelayMsgId(), topic).toJsonString(), topic, tombStoneInternalKey, action);
        } else if (internalKey.getType() == MsgTypes.LOOP_DELAY.getValue()) {
            MetricService.incPullQps(topic, MetricMsgAction.CANCEL, MetricMsgType.LOOP_DELAY);
​
            BATCHER.putLoopTombstoneKey(tombStoneInternalKey, internalKey, topic, action);
        } else if (internalKey.getType() == MsgTypes.LOOP_EXPONENT_DELAY.getValue()) {
            MetricService.incPullQps(topic, MetricMsgAction.CANCEL, MetricMsgType.LOOP_EXPONENT_DELAY);
​
            BATCHER.putLoopTombstoneKey(tombStoneInternalKey, internalKey, topic, action);
        } else {
            MetricService.incPullQps(topic, MetricMsgAction.CANCEL, MetricMsgType.UNKNOWN);
​
            LOGGER.error("should not go here, invalid message type: {}, internalKey: {}", internalKey.getType(),
                    internalKey.genUniqDelayMsgId());
        }
    }
​
    //......
}
  • cancelMessage方法首先通过internalKey.cloneTombstoneInternalKey()构造tombStoneInternalKey,之后对于MsgTypes.DELAY类型的执行BATCHER.putToDefaultCF(tombStoneInternalKey.genUniqDelayMsgId(), new CancelWrap(internalKey.genUniqDelayMsgId(), topic).toJsonString(), topic, tombStoneInternalKey, action);对于MsgTypes.LOOP_DELAY及MsgTypes.LOOP_EXPONENT_DELAY的执行BATCHER.putLoopTombstoneKey(tombStoneInternalKey, internalKey, topic, action)

InternalKey

DDMQ/carrera-common/src/main/java/com/xiaojukeji/carrera/chronos/model/InternalKey.java

代码语言:javascript
复制
public class InternalKey {
    private static final String SEPARATOR = "-";
    private static final int LEN_UUID = 36;
    private static final long ONE_DAY_SECONDS = 24 * 60 * 60;
​
    private long timestamp;
    private int type;
    private long expire;
    private long times;
    private long timed;
    private long interval;
    private int innerTopicSeq;
    private String uuid;
    private int segmentNum;
    private int segmentIndex;
​
    //......
​
    public InternalKey cloneTombstoneInternalKey() {
        InternalKey tombstoneInternalKey = new InternalKey(this);
        tombstoneInternalKey.setType(MsgTypes.TOMBSTONE.getValue());
        return tombstoneInternalKey;
    }
​
    //......
}
  • cloneTombstoneInternalKey方法设置type为MsgTypes.TOMBSTONE.getValue()

CancelWrap

DDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/model/CancelWrap.java

代码语言:javascript
复制
public class CancelWrap {
    private String uniqDelayMsgId;
    private String topic;
​
    public CancelWrap() {
    }
​
    public CancelWrap(String uniqDelayMsgId, String topic) {
        this.uniqDelayMsgId = uniqDelayMsgId;
        this.topic = topic;
    }
​
    public String getUniqDelayMsgId() {
        return uniqDelayMsgId;
    }
​
    public void setUniqDelayMsgId(String uniqDelayMsgId) {
        this.uniqDelayMsgId = uniqDelayMsgId;
    }
​
    public String getTopic() {
        return topic;
    }
​
    public void setTopic(String topic) {
        this.topic = topic;
    }
​
    public String toJsonString() {
        return JsonUtils.toJsonString(this);
    }
​
    @Override
    public String toString() {
        return "CancelWrap{" +
                "uniqDelayMsgId='" + uniqDelayMsgId + '\'' +
                ", topic='" + topic + '\'' +
                '}';
    }
}
  • CancelWrap定义了uniqDelayMsgId及topic两个属性

Batcher

DDMQ/carrera-chronos/src/main/java/com/xiaojukeji/chronos/autobatcher/Batcher.java

代码语言:javascript
复制
public class Batcher {
    private static final Logger LOGGER = LoggerFactory.getLogger(Batcher.class);
​
    private static final int PULL_BATCH_ITEM_NUM = ConfigManager.getConfig().getPullConfig().getPullBatchItemNum();
    private static final int MSG_BYTE_BASE_LEN = ConfigManager.getConfig().getPullConfig().getMsgByteBaseLen();
​
    private WriteBatch wb = new WriteBatch();
    private volatile int itemNum = 0;
    private static volatile Batcher instance = null;
​
    public static volatile ReentrantLock lock = new ReentrantLock();
​
    //......
​
    public void putLoopTombstoneKey(final InternalKey tombstoneInternalKey, InternalKey internalKey, final String topic, final int action) {
        lock.lock();
        try {
            // 指数循环
            // 1536811267-4-1536911267-3-0-300-0-9e7952e0-b709-11e8-a709-aafb4bfc0bc5
            // 1536811567-4-1536911267-3-1-300-0-9e7952e0-b709-11e8-a709-aafb4bfc0bc5
            // 1536897967-4-1536911267-3-2-300-0-9e7952e0-b709-11e8-a709-aafb4bfc0bc5
​
            // 普通循环
            // 1536811267-3-1536911267-3-0-10-0-9e7952e0-b709-11e8-a709-aafb4bfc0bc5
            while (!KeyUtils.afterSeekTimestamp(internalKey.getTimestamp())) {
                internalKey = internalKey.nextUniqDelayMsgId();
            }
​
            tombstoneInternalKey.setTimestamp(internalKey.getTimestamp());
            tombstoneInternalKey.setTimes(internalKey.getTimed() + 2);
            tombstoneInternalKey.setTimed(internalKey.getTimed());
​
            if (!KeyUtils.isInvalidMsg(tombstoneInternalKey)) {
                putToDefaultCF(tombstoneInternalKey.genUniqDelayMsgId(),
                        new CancelWrap(internalKey.genUniqDelayMsgId(), topic).toJsonString(), topic, internalKey, action);
            }
        } finally {
            lock.unlock();
        }
    }
​
    //......
}
  • putLoopTombstoneKey方法通过KeyUtils.afterSeekTimestamp(internalKey.getTimestamp())寻找internalKey,然后通过putToDefaultCF添加一条CancelWrap记录

小结

cancelMessage方法首先通过internalKey.cloneTombstoneInternalKey()构造tombStoneInternalKey,之后对于MsgTypes.DELAY类型的执行BATCHER.putToDefaultCF(tombStoneInternalKey.genUniqDelayMsgId(), new CancelWrap(internalKey.genUniqDelayMsgId(), topic).toJsonString(), topic, tombStoneInternalKey, action);对于MsgTypes.LOOP_DELAY及MsgTypes.LOOP_EXPONENT_DELAY的执行BATCHER.putLoopTombstoneKey(tombStoneInternalKey, internalKey, topic, action)

doc

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • MqPullService
  • InternalKey
  • CancelWrap
  • Batcher
  • 小结
  • doc
相关产品与服务
批量计算
批量计算(BatchCompute,Batch)是为有大数据计算业务的企业、科研单位等提供高性价比且易用的计算服务。批量计算 Batch 可以根据用户提供的批处理规模,智能地管理作业和调动其所需的最佳资源。有了 Batch 的帮助,您可以将精力集中在如何分析和处理数据结果上。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档