前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RocketMQ详解(6)——Producer详解

RocketMQ详解(6)——Producer详解

作者头像
张申傲
发布2020-09-03 10:50:38
9850
发布2020-09-03 10:50:38
举报
文章被收录于专栏:漫漫架构路

RocketMQ详解(6)——Producer详解

一. Producer的特性

  1. 消息过滤 对于Producer,可以对单个主题发送消息,也可以对多个主题发送消息,这种设计非常灵活。而且,可以通过Tag定义一些简单的过滤,通常已经可以满足我们90%的需求了。对于一些更复杂的过滤场景,可以使用Filter实现。
  2. Producer的模式 RocketMQ提供了三种不同模式的Producer:
    1. 普通模式:NormalProducer 这种模式自不必说,使用传统的send()方法发送消息即可。这种模式下不能保证消息的顺序一致性。
    2. 顺序模式:OrderProducer 这种模式可以保证消息的严格顺序消费。如果想要实现全局顺序,可以将消息发往同一个Queue;如果要保证局部顺序,则可以发往多个Queue。
    3. 事务模式:TransactionProducer 支持以事务的方式对消息进行提交处理,在RocketMQ中事务消息分为两个阶段:
      1. 第一个阶段将消息预发送给Broker,此时消息已经在队列中了,但是消费端不可见。
      2. 第二个阶段为本地消息回调处理,如果事务处理成功,返回COMMIT_MESSAGE,将消息正式发送且消费端可见;处理失败则返回ROLLBACK_MESSAGE,此消息直接丢弃。

二. DefaultMQProducer——普通生产者

DefaultMQProducer是一个默认的消息生产者,可以支持发送普通消息和顺序消息。

DefaultMQProducer中定义了一些发送消息相关的属性,还提供了发送消息的相关方法,可以支持同步发送和异步发送,可以发往Broker,由Broker决定具体发往的Queue,也可以指定发往的Queue。

下面简单结合源码,对其API进行介绍:

代码语言:javascript
复制
//生产者组
private String producerGroup;

//创建Topic时的topicKey,在测试时可指定Broker自增模式
private String createTopicKey = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;

//每个Topic中默认有4个Queue来存储消息
private volatile int defaultTopicQueueNums = 4;

//默认发送超时3000ms
private int sendMsgTimeout = 3000;

//默认情况下,当消息体字节数超过4k时,消息会被压缩(Consumer收到消息会自动解压缩)
private int compressMsgBodyOverHowmuch = 1024 * 4;

//同步发送消息时,消息发送失败后的最大重试次数。RocketMQ在消息重试机制上有很好的支持,但是重试可能会引起重复消息的问题,这需要在逻辑上进行幂等处理。
private int retryTimesWhenSendFailed = 2;

//异步发送时的最大重试次数,类似retryTimesWhenSendFailed
private int retryTimesWhenSendAsyncFailed = 2;

//如果消息发送成功,但是返回SendResult != SendStatus.SEND_OK,是否尝试发往别的Broker
private boolean retryAnotherBrokerWhenNotStoreOK = false;

//默认最大消息长度:4M,当消息长度超过限制时,RocketMQ会自动抛出异常
private int maxMessageSize = 1024 * 1024 * 4; 

public DefaultMQProducer() {
    this(MixAll.DEFAULT_PRODUCER_GROUP, null);
}

public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) {
    this.producerGroup = producerGroup;
    defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
}

public DefaultMQProducer(final String producerGroup) {
    this(producerGroup, null);
}

public DefaultMQProducer(RPCHook rpcHook) {
    this(MixAll.DEFAULT_PRODUCER_GROUP, rpcHook);
}

//启动方法,发送消息前必须调用
@Override
public void start() throws MQClientException {
    this.defaultMQProducerImpl.start();
}

//关闭方法,释放资源
@Override
public void shutdown() {
    this.defaultMQProducerImpl.shutdown();
}


//同步发送消息,该方法会阻塞直到消息发送成功
@Override
public SendResult send(
    Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return this.defaultMQProducerImpl.send(msg);
}

//同步发送消息,指定超时时间
@Override
public SendResult send(Message msg,
                       long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return this.defaultMQProducerImpl.send(msg, timeout);
}

//异步发送消息,并注册发送回调。该方法会立即返回,当发送完成后,Broker会回调SendCallback通知发送结果
@Override
public void send(Message msg,
                 SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
    this.defaultMQProducerImpl.send(msg, sendCallback);
}

//异步发送消息,并指定超时时间
@Override
public void send(Message msg, SendCallback sendCallback, long timeout)
    throws MQClientException, RemotingException, InterruptedException {
    this.defaultMQProducerImpl.send(msg, sendCallback, timeout);
}


//同步发送消息,发往指定队列
@Override
public SendResult send(Message msg, MessageQueue mq)
    throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return this.defaultMQProducerImpl.send(msg, mq);
}

//异步发送消息到指定队列
@Override
public void send(Message msg, MessageQueue mq, SendCallback sendCallback)
    throws MQClientException, RemotingException, InterruptedException {
    this.defaultMQProducerImpl.send(msg, mq, sendCallback);
}

//创建一个Topic。RocketMQ可支持Topic的自动创建,也可手动调用createTopic方法创建。创建Topic时需指定Topic的key,测试时可使用Broker自增key的方式,但是实际生产情况下应使用具有业务意义的key。创建时还可以指定Topic的name、Topic中Queue的数量等。
@Override
public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
    createTopic(key, newTopic, queueNum, 0);
}


}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2018/09/11 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • RocketMQ详解(6)——Producer详解
    • 一. Producer的特性
      • 二. DefaultMQProducer——普通生产者
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档