前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊rocketmq的SequenceProducerImpl

聊聊rocketmq的SequenceProducerImpl

作者头像
code4it
发布2018-09-17 17:12:56
4690
发布2018-09-17 17:12:56
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下rocketmq的SequenceProducerImpl

SequenceProducerImpl

io/openmessaging/rocketmq/producer/SequenceProducerImpl.java

代码语言:javascript
复制
public class SequenceProducerImpl extends AbstractOMSProducer implements SequenceProducer {

    private BlockingQueue<Message> msgCacheQueue;

    public SequenceProducerImpl(final KeyValue properties) {
        super(properties);
        this.msgCacheQueue = new LinkedBlockingQueue<>();
    }

    @Override
    public KeyValue properties() {
        return properties;
    }

    @Override
    public void send(final Message message) {
        checkMessageType(message);
        org.apache.rocketmq.common.message.Message rmqMessage = OMSUtil.msgConvert((BytesMessage) message);
        try {
            Validators.checkMessage(rmqMessage, this.rocketmqProducer);
        } catch (MQClientException e) {
            throw checkProducerException(rmqMessage.getTopic(), message.headers().getString(MessageHeader.MESSAGE_ID), e);
        }
        msgCacheQueue.add(message);
    }

    @Override
    public void send(final Message message, final KeyValue properties) {
        send(message);
    }

    @Override
    public synchronized void commit() {
        List<Message> messages = new ArrayList<>();
        msgCacheQueue.drainTo(messages);

        List<org.apache.rocketmq.common.message.Message> rmqMessages = new ArrayList<>();

        for (Message message : messages) {
            rmqMessages.add(OMSUtil.msgConvert((BytesMessage) message));
        }

        if (rmqMessages.size() == 0) {
            return;
        }

        try {
            SendResult sendResult = this.rocketmqProducer.send(rmqMessages);
            String[] msgIdArray = sendResult.getMsgId().split(",");
            for (int i = 0; i < messages.size(); i++) {
                Message message = messages.get(i);
                message.headers().put(MessageHeader.MESSAGE_ID, msgIdArray[i]);
            }
        } catch (Exception e) {
            throw checkProducerException("", "", e);
        }
    }

    @Override
    public synchronized void rollback() {
        msgCacheQueue.clear();
    }
}
  • 采用的是LinkedBlockingQueue,send方法实际调用的是添加到队列
  • 另外提供了commit以及rollback方法,都加了synchronized保证对LinkedBlockingQueue操作的线程安全
  • commit的时候,将queue的数据drainTo到list,然后批量发送;rollback的时候清空整个LinkedBlockingQueue

小结

rocketmq的SequenceProducerImpl在send方法的时候不是真正方法,而是添加到队列,只有在commit的时候才批量发送,rollback的时候清空队列。这里的send方法语义不是太好,可以改为pending之类的名称。

doc

  • SequenceProducerImpl
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2018-07-30,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码匠的流水账 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • SequenceProducerImpl
  • 小结
  • doc
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档