

Name Server
管理Broker实例的注册,提供心跳检测机制
路由管理: Producer和Conumser通过NameServer可以获取整个Broker集群的路由信息
生产者 Producer
以生产者组的形式出现,一个生产者组可以同时发送多个主题的消息
Broker
存储消息、转发消息
Consumer消费者
以消费组的形式出现
同一类消费者的集合,这类Consumer消费的是同一个Topic类型的消息

生产者可靠性 - 重试策略
如果同步模式发送失败,则轮转到下一个Broker进行重试,重试2次
如果异步模式发送失败,则轮转到当前Broker进行重试,重试2次
Broker 可靠性 - 刷盘与同步机制

消息写入能力水平扩展,RocketMQ 对 Topic进行了分区,这种操作被称为队列(MessageQueue)
ConsumerGroup下的消费者主要有两种负载均衡模式,即 广播模式 ,和 集群模式(一般使用这个)
集群模式下,同一个 ConsumerGroup 中的 Consumer 实例是负载均衡消费
广播模式下,同一个 ConsumerGroup 中的每个 Consumer 实例都处理全部的队列
生产者可靠性 - 重试策略
如果同步模式发送失败,则轮转到下一个Broker进行重试,重试2次
如果异步模式发送失败,则轮转到当前Broker进行重试,重试2次
Broker 可靠性 - 刷盘与同步机制

刷盘机制
| 刷盘方式 | 说明 | 特点 | 
|---|---|---|
| 同步刷盘 | 写PageCache,立即刷盘,刷盘完成,返回成功 | 数据安全,吞吐量不大 | 
| 异步刷盘 | 写PageCache,返回成功 依靠刷盘机制刷盘 PageCache中的消息积累到一定的量 或定时触发一次写磁盘操作 | 吞吐量大,性能高,PageCache可能丢失 | 
同步机制
| 同步机制 | 说明 | 特地 | 
|---|---|---|
| 同步复制(推荐) | 主从,都写入成功后,返回成功 | 易恢复,写入延迟大,降低系统吞吐量 | 
| 异步复制 | 写主成功,就返回成功 | 数据可能丢失,写入性能高,系统吞吐量大 | 
消息者可靠性 - 重试策略
Exactly Once需要依托于本地事务表
首选选定唯一键,msgId,或者业务唯一键,例如订单Id
如果 本地事务表中,没有就插入之后执行消费。
实例- 事务消息,顺序消息,tag过滤
一般使用pull模式消费,一个应用一个topic,多个tags模式
pom
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.1.1</version>
</dependency>配置
#nameserver 的ip:host
rocketmq.name-server = ip:host
#消费者不配置
rocketmq.producer.group= wenlei-producer-group普通消息,带tag,keys
//普通消息,带tag,keys
@Autowired
private RocketMQTemplate rocketMQTemplate;
public SendResult  commonMsg() {
    Message message = MessageBuilder.withPayload("消息体")
       .setHeader("KEYS", "我是Key").build();
  //topic:tag 标记要发送的tag
    SendResult sendResult = rocketMQTemplate
          .syncSend("wenlei-topic:tag1", message);
    log.info("sendResult:{},{},sendStatus{}",
             sendResult.getMsgId(),keys,sendResult.getSendStatus().name());
    return sendResult;
}顺序消息
public SendResult  order() { 
    String shardingKey =  UUID.randomUUID().toString();
    Message message = MessageBuilder
      .withPayload("顺序消息体").setHeader("KEYS", shardingKey).build();
    SendResult sendResult = rocketMQTemplate
      .syncSendOrderly("wenlei-topic:tag1", message,shardingKey);
    log.info("sendResult:{},{},sendStatus{}"
             ,sendResult.getMsgId(),shardingKey
             ,sendResult.getSendStatus().name());
    return sendResult;
}事务消息
一个rocketMQTemplate 只能有一个RocketMQLocalTransactionListener, 下面是做额外的ExtRocketMQTemplate
@ExtRocketMQTemplateConfiguration
@Component("extRocketMQTemplate")
public class ExtRocketMQTemplate extends RocketMQTemplate {
}RocketMQLocalTransactionListener 执行本地事务,查询本地事务的状态。
@Slf4j
// 绑定extRocketMQTemplate
@RocketMQTransactionListener(
  rocketMQTemplateBeanName ="extRocketMQTemplate")
public class TransactionMsgListener 
  implements RocketMQLocalTransactionListener {
    @Override    
    public RocketMQLocalTransactionState
    executeLocalTransaction(Message msg, Object arg) {
        try {
           log.info("本地的业务工作");
            return RocketMQLocalTransactionState.COMMIT;
        }catch (Exception e){
            e.printStackTrace();
            return RocketMQLocalTransactionState.UNKNOWN;
        }
    }
  @Override    
  public RocketMQLocalTransactionState 
    checkLocalTransaction(Message msg) {
        log.info("本地的业务工作的状态");
        if(成功状态){
            return RocketMQLocalTransactionState.COMMIT;
        }else if(失败状态){
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        return RocketMQLocalTransactionState.UNKNOWN;
    }
}// 发送事务消息
public TransactionSendResult  tranction() {
    String transactionId = UUID.randomUUID().toString();
    TransactionSendResult result = this.extRocketMQTemplate
      .sendMessageInTransaction("wenlei-topic:tag2",
            MessageBuilder.withPayload(param)
            .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
            .build(), param);
    return result;
}按tag消费
@Component
@RocketMQMessageListener(
        // topic:消息的发送者使用同一个topic      
          topic = "wenlei-topic",
        //group:在RocketMQ中消费者和发送者组没有关系        
         consumerGroup = "test-group",
        //tag:设置为 * 时,表示全部。       
         selectorExpression = "tag1 || tag2 || tag3",
        //消费模式:默认 CLUSTERING ( CLUSTERING:负载均衡 )
        //( BROADCASTING:广播机制 ) 一般不用     
          messageModel = MessageModel.CLUSTERING  )
@Slf4j
public class MyConsumer implements RocketMQListener<MessageExt> {
    @Override    public void onMessage(MessageExt message) {
        log.info("consumer:{},tag:{},keys:{}",
        new String(message.getBody(), Charset.forName("utf8")),
        message.getTags(),message.getKeys());
    }
}