前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RocketMQ 生产者 Producer 发送消息

RocketMQ 生产者 Producer 发送消息

作者头像
java404
发布2018-12-25 14:56:58
2.1K0
发布2018-12-25 14:56:58
举报
文章被收录于专栏:java 成神之路java 成神之路

概述

Producer 发送消息,RocketMQ 提供了三种模式。

  • 同步发送
  • 异步发送
  • OneWay 发送

示例代码如下:

代码语言:javascript
复制
// 1、同步发送
SendResult sendResult = producer.send(msg);

//2、异步发送
producer.send(msg, new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
    }
    @Override
    public void onException(Throwable e) {
    }
});

//3、 Oneway发送
producer.sendOneway(msg);
  • 1、同步发送 Producer 向 broker 发送消息,阻塞当前线程等待 broker 响应 发送结果。
  • 2、异步发送 Producer 首先构建一个向 broker 发送消息的任务,把该任务提交给线程池,等执行完该任务时,回调用户自定义的回调函数,执行处理结果。
  • 3、Oneway 发送 Oneway 方式只负责发送请求,不等待应答,Producer 只负责把请求发出去,而不处理响应结果。

一、同步发送

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl

代码语言:javascript
复制
private SendResult sendDefaultImpl(Message msg,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback, final long timeout
  ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    //省略代码
    ......

    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        //省略代码
        ......
        // 1、计算重发次数
        int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
        int times = 0;
        String[] brokersSent = new String[timesTotal];

        // 2、循环执行发送消息
        for (; times < timesTotal; times++) {
            String lastBrokerName = null == mq ? null : mq.getBrokerName();
            // 3、选择要发送的 MessageQueue
            MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
            if (mqSelected != null) {
                mq = mqSelected;
                brokersSent[times] = mq.getBrokerName();
                try {
                    beginTimestampPrev = System.currentTimeMillis();
                    long costTime = beginTimestampPrev - beginTimestampFirst;
                    if (timeout < costTime) {
                        callTimeout = true;
                        break;
                    }
                    // 4、调用 sendKernelImpl 方法进行发送消息
                    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    switch (communicationMode) {
                        case ASYNC:
                            return null;
                        case ONEWAY:
                            return null;
                        case SYNC:
                            // 5、如果发送失败,则continue,继续循环发送,发送成功则直接 return 返回
                            if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                    continue;
                                }
                            }
                            
                            return sendResult;
                        default:
                            break;
                    }
                } 
                //省略代码
                ......
            } else {
                break;
            }
        }

        if (sendResult != null) {
            return sendResult;
        }
        //省略代码
        ......
}
1、计算消息最多发送的次数。

同步发送:org.apache.rocketmq.client.producer.DefaultMQProducer#retryTimesWhenSendFailed + 1,默认retryTimesWhenSendFailed是2,所以除了正常调用一次外,发送消息如果失败了会重试2次。总共会发送3次,如果3次都失败则返回发送失败的消息。 异步发送:不会重试(调用总次数等于1)

2、循环执行发送消息

如果发送的消息未成功发送,则循环继续发送,直到发送的次数达到 timesTotal 。

3、选择要发送的 MessageQueue

选择消息要发送的 MessageQueue。

4、调用 sendKernelImpl 方法进行发送消息
5、如果发送失败,则continue,继续循环发送,发送成功则直接 return 返回

同步发送原理

RocketMQ 通讯是使用 Netty 进行发送的,Netty 通讯默认都是异步的,那么同步是怎么实现的呢?

代码语言:javascript
复制
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
    final long timeoutMillis)
    throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
    final int opaque = request.getOpaque();

    try {
        // 1、定义一个 ResponseFuture 来处理响应结果
        final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
        this.responseTable.put(opaque, responseFuture);
        final SocketAddress addr = channel.remoteAddress();
        channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) throws Exception {
                if (f.isSuccess()) {
                    // 2、通过 Netty 执行完成后回调处理请求的结果
                    responseFuture.setSendRequestOK(true);
                    return;
                } else {
                    responseFuture.setSendRequestOK(false);
                }

                responseTable.remove(opaque);
                responseFuture.setCause(f.cause());
                // 唤醒阻塞的线程
                responseFuture.putResponse(null);
                log.warn("send a request command to channel <" + addr + "> failed.");
            }
        });
        // 3、请求结果默认等待请求3秒钟,如果超过3秒则抛出异常。
        RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
        if (null == responseCommand) {
            if (responseFuture.isSendRequestOK()) {
                throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                    responseFuture.getCause());
            } else {
                throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
            }
        }

        return responseCommand;
    } finally {
        this.responseTable.remove(opaque);
    }
}
1、定义一个 ResponseFuture 来处理响应结果
代码语言:javascript
复制
public class ResponseFuture {
    //省略其它代码
    ......
    private final CountDownLatch countDownLatch = new CountDownLatch(1);

    public RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException {
        this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        return this.responseCommand;
    }

    public void putResponse(final RemotingCommand responseCommand) {
        this.responseCommand = responseCommand;
        this.countDownLatch.countDown();
    }
}

ResponseFuture 内部使用了 CountDownLatch 来实现的。当调用waitResponse() 方法时阻塞当前线程,当返回结果时调用 putResponse() 方法存放结果,然后执行this.countDownLatch.countDown() 唤醒阻塞的线程。

2、通过 Netty 执行完成后回调处理请求的结果

使用 Netty 进行发送消息,当 Netty 收到结果后会执行自定义的 ChannelFutureListener.operationComplete()方法。 如果执行完成,调用responseFuture.putResponse(null);立即唤醒阻塞的线程,处理请求结果。

3、默认最长等待请求3秒钟,如果超过3秒则抛出异常。

调用 responseFuture.waitResponse(timeoutMillis)方法阻塞等待 Netty 返回结果。默认最长等待时间为3秒,如果超过3秒则认为调用超时,抛出 RemotingSendRequestException 异常信息。

二、异步发送

代码语言:javascript
复制
public void send(final Message msg, final SendCallback sendCallback, final long timeout)
    throws MQClientException, RemotingException, InterruptedException {
    final long beginStartTime = System.currentTimeMillis();
    //获取执行异步请求的线程池
    ExecutorService executor = this.getCallbackExecutor();
    try {
        executor.submit(new Runnable() {
            @Override
            public void run() {
                long costTime = System.currentTimeMillis() - beginStartTime;
                if (timeout > costTime) {
                    try {
                        // 执行发送消息
                        sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout - costTime);
                    } catch (Exception e) {
                        sendCallback.onException(e);
                    }
                } else {
                    sendCallback.onException(
                        new RemotingTooMuchRequestException("DEFAULT ASYNC send call timeout"));
                }
            }

        });
    } catch (RejectedExecutionException e) {
        throw new MQClientException("executor rejected ", e);
    }

}

异步发送的消息是把发送的消息提交给线程池来进行调度执行的。因为不需要同步返回结果。

异步发送原理

代码语言:javascript
复制
private void sendMessageAsync(
    final String addr,
    final String brokerName,
    final Message msg,
    final long timeoutMillis,
    final RemotingCommand request,
    final SendCallback sendCallback,
    final TopicPublishInfo topicPublishInfo,
    final MQClientInstance instance,
    final int retryTimesWhenSendFailed,
    final AtomicInteger times,
    final SendMessageContext context,
    final DefaultMQProducerImpl producer
        ) throws InterruptedException, RemotingException {
    this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
        @Override
        public void operationComplete(ResponseFuture responseFuture) {
            RemotingCommand response = responseFuture.getResponseCommand();
            if (null == sendCallback && response != null) {

                try {
                    SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);
                    if (context != null && sendResult != null) {
                        context.setSendResult(sendResult);
                        context.getProducer().executeSendMessageHookAfter(context);
                    }
                } catch (Throwable e) {
                }

                producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
                return;
            }

            if (response != null) {
                try {
                    SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);
                    assert sendResult != null;
                    if (context != null) {
                        context.setSendResult(sendResult);
                        context.getProducer().executeSendMessageHookAfter(context);
                    }

                    try {
                        sendCallback.onSuccess(sendResult);
                    } catch (Throwable e) {
                    }

                    producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
                } catch (Exception e) {
                    producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
                    onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
                        retryTimesWhenSendFailed, times, e, context, false, producer);
                }
            } else {
                producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
                if (!responseFuture.isSendRequestOK()) {
                    MQClientException ex = new MQClientException("send request failed", responseFuture.getCause());
                    onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
                        retryTimesWhenSendFailed, times, ex, context, true, producer);
                } else if (responseFuture.isTimeout()) {
                    MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms",
                        responseFuture.getCause());
                    onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
                        retryTimesWhenSendFailed, times, ex, context, true, producer);
                } else {
                    MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause());
                    onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
                        retryTimesWhenSendFailed, times, ex, context, true, producer);
                }
            }
        }
    });
}

``

未完待续 ...

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018.12.11 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 概述
  • 一、同步发送
  • 同步发送原理
  • 二、异步发送
    • 异步发送原理
    • 未完待续 ...
    相关产品与服务
    消息队列 CMQ 版
    消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档