A Look at RocketMQ's ProducerImpl

This article looks at RocketMQ's ProducerImpl, the producer class in the io.openmessaging.rocketmq package.

ProducerImpl

io/openmessaging/rocketmq/producer/ProducerImpl.java

public class ProducerImpl extends AbstractOMSProducer implements Producer {

    public ProducerImpl(final KeyValue properties) {
        super(properties);
    }

    @Override
    public KeyValue properties() {
        return properties;
    }

    @Override
    public SendResult send(final Message message) {
        return send(message, this.rocketmqProducer.getSendMsgTimeout());
    }

    @Override
    public SendResult send(final Message message, final KeyValue properties) {
        long timeout = properties.containsKey(PropertyKeys.OPERATION_TIMEOUT)
            ? properties.getInt(PropertyKeys.OPERATION_TIMEOUT) : this.rocketmqProducer.getSendMsgTimeout();
        return send(message, timeout);
    }

    private SendResult send(final Message message, long timeout) {
        checkMessageType(message);
        org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message);
        try {
            org.apache.rocketmq.client.producer.SendResult rmqResult = this.rocketmqProducer.send(rmqMessage, timeout);
            if (!rmqResult.getSendStatus().equals(SendStatus.SEND_OK)) {
                log.error(String.format("Send message to RocketMQ failed, %s", message));
                throw new OMSRuntimeException("-1", "Send message to RocketMQ broker failed.");
            }
            message.headers().put(MessageHeader.MESSAGE_ID, rmqResult.getMsgId());
            return OMSUtil.sendResultConvert(rmqResult);
        } catch (Exception e) {
            log.error(String.format("Send message to RocketMQ failed, %s", message), e);
            throw checkProducerException(rmqMessage.getTopic(), message.headers().getString(MessageHeader.MESSAGE_ID), e);
        }
    }

    @Override
    public Promise<SendResult> sendAsync(final Message message) {
        return sendAsync(message, this.rocketmqProducer.getSendMsgTimeout());
    }

    @Override
    public Promise<SendResult> sendAsync(final Message message, final KeyValue properties) {
        long timeout = properties.containsKey(PropertyKeys.OPERATION_TIMEOUT)
            ? properties.getInt(PropertyKeys.OPERATION_TIMEOUT) : this.rocketmqProducer.getSendMsgTimeout();
        return sendAsync(message, timeout);
    }

    private Promise<SendResult> sendAsync(final Message message, long timeout) {
        checkMessageType(message);
        org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message);
        final Promise<SendResult> promise = new DefaultPromise<>();
        try {
            this.rocketmqProducer.send(rmqMessage, new SendCallback() {
                @Override
                public void onSuccess(final org.apache.rocketmq.client.producer.SendResult rmqResult) {
                    message.headers().put(MessageHeader.MESSAGE_ID, rmqResult.getMsgId());
                    promise.set(OMSUtil.sendResultConvert(rmqResult));
                }

                @Override
                public void onException(final Throwable e) {
                    promise.setFailure(e);
                }
            }, timeout);
        } catch (Exception e) {
            promise.setFailure(e);
        }
        return promise;
    }

    @Override
    public void sendOneway(final Message message) {
        checkMessageType(message);
        org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message);
        try {
            this.rocketmqProducer.sendOneway(rmqMessage);
        } catch (Exception ignore) { //Ignore the oneway exception.
        }
    }

    @Override
    public void sendOneway(final Message message, final KeyValue properties) {
        sendOneway(message);
    }
}
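  • The send methods mainly delegate to rocketmqProducer.
  • Before sending, OMSUtil.msgConvert converts the API's BytesMessage into an org.apache.rocketmq.common.message.Message.
  • The asynchronous path returns a DefaultPromise, which is completed from a SendCallback.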
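
The two-argument overloads above choose the timeout the same way: a per-call OPERATION_TIMEOUT property wins, otherwise the producer's default send timeout applies. A minimal, self-contained sketch of that rule follows. It uses a plain Map in place of KeyValue, and the class name and key string are hypothetical, not taken from the OpenMessaging API.

```java
import java.util.HashMap;
import java.util.Map;

final class TimeoutResolution {
    // Hypothetical stand-in for PropertyKeys.OPERATION_TIMEOUT; the real constant's value is not shown in the article.
    static final String OPERATION_TIMEOUT = "operation.timeout";

    // Returns the per-call timeout when the caller supplied one, otherwise the default.
    static long resolveTimeout(Map<String, Integer> properties, long defaultTimeoutMillis) {
        return properties.containsKey(OPERATION_TIMEOUT)
            ? properties.get(OPERATION_TIMEOUT)
            : defaultTimeoutMillis;
    }

    public static void main(String[] args) {
        Map<String, Integer> properties = new HashMap<>();
        System.out.println(resolveTimeout(properties, 3000L)); // no override: the default is used
        properties.put(OPERATION_TIMEOUT, 500);
        System.out.println(resolveTimeout(properties, 3000L)); // override present: the property is used
    }
}
```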

OMSUtil.msgConvert

io/openmessaging/rocketmq/utils/OMSUtil.java

    public static org.apache.rocketmq.common.message.Message msgConvert(BytesMessage omsMessage) {
        org.apache.rocketmq.common.message.Message rmqMessage = new org.apache.rocketmq.common.message.Message();
        rmqMessage.setBody(omsMessage.getBody());

        KeyValue headers = omsMessage.headers();
        KeyValue properties = omsMessage.properties();

        //All destinations in RocketMQ use Topic
        if (headers.containsKey(MessageHeader.TOPIC)) {
            rmqMessage.setTopic(headers.getString(MessageHeader.TOPIC));
            rmqMessage.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
        } else {
            rmqMessage.setTopic(headers.getString(MessageHeader.QUEUE));
            rmqMessage.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "QUEUE");
        }

        for (String key : properties.keySet()) {
            MessageAccessor.putProperty(rmqMessage, key, properties.getString(key));
        }

        //Headers has a high priority
        for (String key : headers.keySet()) {
            MessageAccessor.putProperty(rmqMessage, key, headers.getString(key));
        }

        return rmqMessage;
    }
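  • This method copies the body, sets the topic from the TOPIC header (falling back to the QUEUE header), and then copies the properties followed by the headers, so a header overrides a property that uses the same key.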
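
The copy order in msgConvert matters: properties are written first and headers second, so a header replaces a property with the same key. The sketch below illustrates that precedence and the TOPIC/QUEUE fallback with plain Maps. The class name, the key strings, and the sample values are placeholders for illustration only, not the real MessageHeader constants.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

final class HeaderPrecedenceSketch {
    // Copies properties first and headers second, so headers win on a key clash.
    static Map<String, String> merge(Map<String, String> properties, Map<String, String> headers) {
        Map<String, String> merged = new LinkedHashMap<>(properties);
        merged.putAll(headers);
        return merged;
    }

    // Uses the TOPIC entry when present, otherwise falls back to the QUEUE entry.
    static String destination(Map<String, String> headers) {
        return headers.containsKey("TOPIC") ? headers.get("TOPIC") : headers.get("QUEUE");
    }

    public static void main(String[] args) {
        Map<String, String> properties = new HashMap<>();
        properties.put("TRACE_ID", "from-property");
        properties.put("color", "red");

        Map<String, String> headers = new HashMap<>();
        headers.put("TRACE_ID", "from-header");
        headers.put("QUEUE", "orders");

        System.out.println(merge(properties, headers).get("TRACE_ID")); // the header value survives
        System.out.println(destination(headers));                       // no TOPIC, so QUEUE is used
    }
}
```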

SendCallback

org/apache/rocketmq/client/producer/SendCallback.java

public interface SendCallback {
    void onSuccess(final SendResult sendResult);

    void onException(final Throwable e);
}
  • On success the callback receives the SendResult; on failure it receives the Throwable.
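
ProducerImpl.sendAsync bridges this two-method callback to a promise. The same pattern can be sketched with only the JDK, using CompletableFuture as a stand-in for DefaultPromise. The Callback interface and the sendWithCallback method below are hypothetical and exist only so the example runs on its own; they are not RocketMQ APIs.

```java
import java.util.concurrent.CompletableFuture;

final class CallbackToFutureSketch {
    // Hypothetical callback with the same two-method shape as SendCallback.
    interface Callback<T> {
        void onSuccess(T result);

        void onException(Throwable e);
    }

    // Hypothetical callback-style API: reports an id on success, an exception otherwise.
    static void sendWithCallback(String payload, Callback<String> callback) {
        if (payload == null || payload.isEmpty()) {
            callback.onException(new IllegalArgumentException("empty payload"));
        } else {
            callback.onSuccess("id-" + payload.length());
        }
    }

    // Adapter: completes the future from whichever callback method fires.
    static CompletableFuture<String> sendAsync(String payload) {
        CompletableFuture<String> future = new CompletableFuture<>();
        try {
            sendWithCallback(payload, new Callback<String>() {
                @Override
                public void onSuccess(String result) {
                    future.complete(result);
                }

                @Override
                public void onException(Throwable e) {
                    future.completeExceptionally(e);
                }
            });
        } catch (RuntimeException e) {
            // A failure thrown before the callback runs goes through the same channel.
            future.completeExceptionally(e);
        }
        return future;
    }

    public static void main(String[] args) {
        System.out.println(sendAsync("hello").join());                 // prints "id-5"
        System.out.println(sendAsync("").isCompletedExceptionally());  // prints "true"
    }
}
```

Routing both the callback failure and the synchronous failure into the returned object means the caller has a single place to check for errors, which is what the catch block in ProducerImpl.sendAsync does with promise.setFailure.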

DefaultPromise

io/openmessaging/rocketmq/promise/DefaultPromise.java

public class DefaultPromise<V> implements Promise<V> {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultPromise.class);
    private final Object lock = new Object();
    private volatile FutureState state = FutureState.DOING;
    private V result = null;
    private long timeout;
    private long createTime;
    private Throwable exception = null;
    private List<PromiseListener<V>> promiseListenerList;

    public DefaultPromise() {
        createTime = System.currentTimeMillis();
        promiseListenerList = new ArrayList<>();
        timeout = 5000;
    }

    //......

    @Override
    public boolean set(final V value) {
        if (value == null)
            return false;
        this.result = value;
        return done();
    }

    @Override
    public boolean setFailure(final Throwable cause) {
        if (cause == null)
            return false;
        this.exception = cause;
        return done();
    }

    private boolean done() {
        synchronized (lock) {
            if (!isDoing()) {
                return false;
            }

            state = FutureState.DONE;
            lock.notifyAll();
        }

        notifyListeners();
        return true;
    }

    private void notifyListeners() {
        if (promiseListenerList != null) {
            for (PromiseListener<V> listener : promiseListenerList) {
                notifyListener(listener);
            }
        }
    }

    private void notifyListener(final PromiseListener<V> listener) {
        try {
            if (exception != null)
                listener.operationFailed(this);
            else
                listener.operationCompleted(this);
        } catch (Throwable t) {
            LOG.error("notifyListener {} Error:{}", listener.getClass().getSimpleName(), t);
        }
    }

    //......
}
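  • Both set and setFailure call done.
  • If the promise is still in the DOING state, done marks it DONE and calls notifyListeners, which invokes operationFailed on each listener when an exception was recorded and operationCompleted otherwise.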
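
The complete-once behaviour can be illustrated with a much smaller class. MiniPromise below is a hypothetical sketch, not the DefaultPromise source: it uses a Consumer in place of PromiseListener, omits timeouts and cancellation, and records the value inside the synchronized block, which differs from the quoted class where the field is assigned before done is called.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class MiniPromise<V> {
    private final Object lock = new Object();
    private final List<Consumer<MiniPromise<V>>> listeners = new ArrayList<>();
    private boolean done;
    private V result;
    private Throwable failure;

    // In this sketch, listeners registered after completion are never called.
    void addListener(Consumer<MiniPromise<V>> listener) {
        synchronized (lock) {
            listeners.add(listener);
        }
    }

    boolean set(V value) {
        return value != null && complete(value, null);
    }

    boolean setFailure(Throwable cause) {
        return cause != null && complete(null, cause);
    }

    private boolean complete(V value, Throwable cause) {
        List<Consumer<MiniPromise<V>>> toNotify;
        synchronized (lock) {
            if (done) {
                return false; // only the first completion is accepted
            }
            done = true;
            result = value;
            failure = cause;
            lock.notifyAll(); // mirrors done(); nothing waits on the lock in this sketch
            toNotify = new ArrayList<>(listeners);
        }
        // Listeners run outside the lock, once per promise.
        for (Consumer<MiniPromise<V>> listener : toNotify) {
            listener.accept(this);
        }
        return true;
    }

    boolean isFailed() {
        synchronized (lock) {
            return failure != null;
        }
    }

    V get() {
        synchronized (lock) {
            return result;
        }
    }

    public static void main(String[] args) {
        MiniPromise<String> promise = new MiniPromise<>();
        promise.addListener(p -> System.out.println(p.isFailed() ? "failed" : "completed: " + p.get()));
        System.out.println(promise.set("ok"));    // first completion is accepted
        System.out.println(promise.set("again")); // later completions are rejected
    }
}
```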

Summary

  • ProducerImpl mainly adapts RocketMQ's own rocketmqProducer to the OpenMessaging API.
  • Asynchronous sends use an anonymous SendCallback implementation together with DefaultPromise.

doc

  • ProducerImpl

Originally published on the WeChat official account 码匠的流水账 (geek_luandun)

Original publication date: 2018-07-28
