A Look at RocketMQ's ProducerImpl

code4it
Published 2018-09-17 17:13:36
Part of the column: 码匠的流水账

This article takes a look at RocketMQ's ProducerImpl.

ProducerImpl

io/openmessaging/rocketmq/producer/ProducerImpl.java

public class ProducerImpl extends AbstractOMSProducer implements Producer {

    public ProducerImpl(final KeyValue properties) {
        super(properties);
    }

    @Override
    public KeyValue properties() {
        return properties;
    }

    @Override
    public SendResult send(final Message message) {
        return send(message, this.rocketmqProducer.getSendMsgTimeout());
    }

    @Override
    public SendResult send(final Message message, final KeyValue properties) {
        long timeout = properties.containsKey(PropertyKeys.OPERATION_TIMEOUT)
            ? properties.getInt(PropertyKeys.OPERATION_TIMEOUT) : this.rocketmqProducer.getSendMsgTimeout();
        return send(message, timeout);
    }

    private SendResult send(final Message message, long timeout) {
        checkMessageType(message);
        org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message);
        try {
            org.apache.rocketmq.client.producer.SendResult rmqResult = this.rocketmqProducer.send(rmqMessage, timeout);
            if (!rmqResult.getSendStatus().equals(SendStatus.SEND_OK)) {
                log.error(String.format("Send message to RocketMQ failed, %s", message));
                throw new OMSRuntimeException("-1", "Send message to RocketMQ broker failed.");
            }
            message.headers().put(MessageHeader.MESSAGE_ID, rmqResult.getMsgId());
            return OMSUtil.sendResultConvert(rmqResult);
        } catch (Exception e) {
            log.error(String.format("Send message to RocketMQ failed, %s", message), e);
            throw checkProducerException(rmqMessage.getTopic(), message.headers().getString(MessageHeader.MESSAGE_ID), e);
        }
    }

    @Override
    public Promise<SendResult> sendAsync(final Message message) {
        return sendAsync(message, this.rocketmqProducer.getSendMsgTimeout());
    }

    @Override
    public Promise<SendResult> sendAsync(final Message message, final KeyValue properties) {
        long timeout = properties.containsKey(PropertyKeys.OPERATION_TIMEOUT)
            ? properties.getInt(PropertyKeys.OPERATION_TIMEOUT) : this.rocketmqProducer.getSendMsgTimeout();
        return sendAsync(message, timeout);
    }

    private Promise<SendResult> sendAsync(final Message message, long timeout) {
        checkMessageType(message);
        org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message);
        final Promise<SendResult> promise = new DefaultPromise<>();
        try {
            this.rocketmqProducer.send(rmqMessage, new SendCallback() {
                @Override
                public void onSuccess(final org.apache.rocketmq.client.producer.SendResult rmqResult) {
                    message.headers().put(MessageHeader.MESSAGE_ID, rmqResult.getMsgId());
                    promise.set(OMSUtil.sendResultConvert(rmqResult));
                }

                @Override
                public void onException(final Throwable e) {
                    promise.setFailure(e);
                }
            }, timeout);
        } catch (Exception e) {
            promise.setFailure(e);
        }
        return promise;
    }

    @Override
    public void sendOneway(final Message message) {
        checkMessageType(message);
        org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message);
        try {
            this.rocketmqProducer.sendOneway(rmqMessage);
        } catch (Exception ignore) { //Ignore the oneway exception.
        }
    }

    @Override
    public void sendOneway(final Message message, final KeyValue properties) {
        sendOneway(message);
    }
}
  • The send methods mainly delegate to rocketmqProducer
  • They also call OMSUtil.msgConvert to convert the API's BytesMessage into an org.apache.rocketmq.common.message.Message
  • Asynchronous sends return a DefaultPromise, which is completed from a SendCallback
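
The last point, turning a callback-style send into a promise the caller can hold, can be sketched without any RocketMQ dependency. This is only an illustration under stated assumptions: `Callback`, `AsyncClient`, and `CallbackBridge` are hypothetical names, the result type is simplified to `String`, and `java.util.concurrent.CompletableFuture` stands in for `DefaultPromise`.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for SendCallback; the result type is simplified to String.
interface Callback {
    void onSuccess(String result);

    void onException(Throwable e);
}

// Hypothetical callback-style client, playing the role of rocketmqProducer.
interface AsyncClient {
    void send(String body, Callback callback) throws Exception;
}

class CallbackBridge {
    // Same shape as ProducerImpl.sendAsync: create the future, complete it from the
    // callback, and also fail it if the send call itself throws.
    static CompletableFuture<String> sendAsync(AsyncClient client, String body) {
        CompletableFuture<String> future = new CompletableFuture<>();
        try {
            client.send(body, new Callback() {
                @Override
                public void onSuccess(String result) {
                    future.complete(result);
                }

                @Override
                public void onException(Throwable e) {
                    future.completeExceptionally(e);
                }
            });
        } catch (Exception e) {
            future.completeExceptionally(e);
        }
        return future;
    }

    public static void main(String[] args) {
        // A fake client that succeeds immediately on the calling thread.
        AsyncClient ok = (body, cb) -> cb.onSuccess("id-for-" + body);
        System.out.println(sendAsync(ok, "hello").join()); // prints "id-for-hello"
    }
}
```

The caller never sees the callback: both failure paths (the callback's `onException` and an exception thrown by `send` itself) end up in the same returned object, which is what the `try`/`catch` around `rocketmqProducer.send` achieves in `ProducerImpl`.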

OMSUtil.msgConvert

io/openmessaging/rocketmq/utils/OMSUtil.java

    public static org.apache.rocketmq.common.message.Message msgConvert(BytesMessage omsMessage) {
        org.apache.rocketmq.common.message.Message rmqMessage = new org.apache.rocketmq.common.message.Message();
        rmqMessage.setBody(omsMessage.getBody());

        KeyValue headers = omsMessage.headers();
        KeyValue properties = omsMessage.properties();

        //All destinations in RocketMQ use Topic
        if (headers.containsKey(MessageHeader.TOPIC)) {
            rmqMessage.setTopic(headers.getString(MessageHeader.TOPIC));
            rmqMessage.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
        } else {
            rmqMessage.setTopic(headers.getString(MessageHeader.QUEUE));
            rmqMessage.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "QUEUE");
        }

        for (String key : properties.keySet()) {
            MessageAccessor.putProperty(rmqMessage, key, properties.getString(key));
        }

        //Headers has a high priority
        for (String key : headers.keySet()) {
            MessageAccessor.putProperty(rmqMessage, key, headers.getString(key));
        }

        return rmqMessage;
    }
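  • This mainly converts the header and topic information: the RocketMQ topic comes from the TOPIC header, falling back to the QUEUE header, and properties are copied before headers so that headers take priority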
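
The two rules in that conversion (destination fallback and header priority) can be sketched with plain maps. This is a simplified illustration, not the library code: `HeaderPrecedence` and the key strings `"TOPIC"` and `"QUEUE"` are assumptions made for the example, and the real constants live in the OpenMessaging API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class HeaderPrecedence {
    // Hypothetical header keys for this sketch only.
    static final String TOPIC = "TOPIC";
    static final String QUEUE = "QUEUE";

    // Picks the destination the way msgConvert does: the topic header if present,
    // otherwise the queue header.
    static String destination(Map<String, String> headers) {
        return headers.containsKey(TOPIC) ? headers.get(TOPIC) : headers.get(QUEUE);
    }

    // Copies user properties first and headers second, so a header overwrites
    // a property that uses the same key.
    static Map<String, String> merge(Map<String, String> properties, Map<String, String> headers) {
        Map<String, String> merged = new LinkedHashMap<>(properties);
        merged.putAll(headers);
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> headers = Map.of(QUEUE, "orders", "traceId", "from-header");
        Map<String, String> properties = Map.of("traceId", "from-property", "color", "red");
        System.out.println(destination(headers));                      // prints "orders"
        System.out.println(merge(properties, headers).get("traceId")); // prints "from-header"
    }
}
```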

SendCallback

org/apache/rocketmq/client/producer/SendCallback.java

public interface SendCallback {
    void onSuccess(final SendResult sendResult);

    void onException(final Throwable e);
}
  • On success the SendResult is passed to the callback; on failure a Throwable is passed

DefaultPromise

io/openmessaging/rocketmq/promise/DefaultPromise.java

public class DefaultPromise<V> implements Promise<V> {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultPromise.class);
    private final Object lock = new Object();
    private volatile FutureState state = FutureState.DOING;
    private V result = null;
    private long timeout;
    private long createTime;
    private Throwable exception = null;
    private List<PromiseListener<V>> promiseListenerList;

    public DefaultPromise() {
        createTime = System.currentTimeMillis();
        promiseListenerList = new ArrayList<>();
        timeout = 5000;
    }

    //......

    @Override
    public boolean set(final V value) {
        if (value == null)
            return false;
        this.result = value;
        return done();
    }

    @Override
    public boolean setFailure(final Throwable cause) {
        if (cause == null)
            return false;
        this.exception = cause;
        return done();
    }

    private boolean done() {
        synchronized (lock) {
            if (!isDoing()) {
                return false;
            }

            state = FutureState.DONE;
            lock.notifyAll();
        }

        notifyListeners();
        return true;
    }

    private void notifyListeners() {
        if (promiseListenerList != null) {
            for (PromiseListener<V> listener : promiseListenerList) {
                notifyListener(listener);
            }
        }
    }

    private void notifyListener(final PromiseListener<V> listener) {
        try {
            if (exception != null)
                listener.operationFailed(this);
            else
                listener.operationCompleted(this);
        } catch (Throwable t) {
            LOG.error("notifyListener {} Error:{}", listener.getClass().getSimpleName(), t);
        }
    }

    //......
}
  • Both set and setFailure call done
  • done calls notifyListeners, which invokes each listener's operationCompleted or operationFailed
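
The complete-once-then-notify flow can be sketched in a few lines. `MiniPromise` and `MiniListener` below are hypothetical, simplified types written for this example, not the library's `DefaultPromise` or `PromiseListener`. The sketch assumes listeners are registered before completion and, as a deliberate simplification, stores the outcome inside the lock.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical listener with the same two callbacks the article mentions.
interface MiniListener<V> {
    void operationCompleted(MiniPromise<V> promise);

    void operationFailed(MiniPromise<V> promise);
}

// Simplified, hypothetical promise for illustration only.
class MiniPromise<V> {
    private final Object lock = new Object();
    private final List<MiniListener<V>> listeners = new ArrayList<>();
    private boolean done = false;
    private V result;
    private Throwable failure;

    void addListener(MiniListener<V> listener) {
        synchronized (lock) {
            listeners.add(listener);
        }
    }

    boolean set(V value) {
        return value != null && complete(value, null);
    }

    boolean setFailure(Throwable cause) {
        return cause != null && complete(null, cause);
    }

    V get() {
        synchronized (lock) {
            return result;
        }
    }

    // Only the first completion wins; later calls return false and notify nobody.
    private boolean complete(V value, Throwable cause) {
        List<MiniListener<V>> toNotify;
        synchronized (lock) {
            if (done) {
                return false;
            }
            result = value;
            failure = cause;
            done = true;
            toNotify = new ArrayList<>(listeners);
        }
        // Listeners run outside the lock; one failing listener does not stop the rest.
        for (MiniListener<V> listener : toNotify) {
            try {
                if (cause != null) {
                    listener.operationFailed(this);
                } else {
                    listener.operationCompleted(this);
                }
            } catch (RuntimeException e) {
                System.err.println("listener failed: " + e);
            }
        }
        return true;
    }

    public static void main(String[] args) {
        MiniPromise<String> promise = new MiniPromise<>();
        promise.addListener(new MiniListener<String>() {
            @Override
            public void operationCompleted(MiniPromise<String> p) {
                System.out.println("completed: " + p.get());
            }

            @Override
            public void operationFailed(MiniPromise<String> p) {
                System.out.println("failed");
            }
        });
        promise.set("msg-1");                          // prints "completed: msg-1"
        promise.setFailure(new RuntimeException("x")); // ignored, already done
    }
}
```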

Summary

  • ProducerImpl mainly adapts RocketMQ's own rocketmqProducer to the OpenMessaging API
  • Asynchronous sends use an anonymous SendCallback that completes a DefaultPromise

doc

  • ProducerImpl
Originally published 2018-07-28 on the 码匠的流水账 WeChat official account.
