前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【RocketMq实战第五篇】-不同类型生产者(DefaultMQProducer)

【RocketMq实战第五篇】-不同类型生产者(DefaultMQProducer)

作者头像
胖虎
发布2019-06-26 16:59:46
1.6K0
发布2019-06-26 16:59:46
举报
文章被收录于专栏:晏霖晏霖

前言 本文来介绍RocketMQ生产者发送消息默认使用的DefaultMQProducer类。

生产者 向消息队列里写入消息,不 同的业务场景需要生产者采用不同的写入策略 。 比如同步发送、异步发送、 延迟发送、 发送事务消息等。

正文 我们结合代码来了解一下,

代码位置在package org.apache.rocketmq.example.simple;

代码语言:javascript
复制
public class AsyncProducer {
    public static void main(
        String[] args) throws MQClientException, InterruptedException, UnsupportedEncodingException {

        DefaultMQProducer producer = new DefaultMQProducer("Jodie_Daily_test");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        producer.setRetryTimesWhenSendAsyncFailed(0);

        for (int i = 0; i < 10000000; i++) {
            try {
                final int index = i;
                Message msg = new Message("Jodie_topic_1023",
                    "TagA",
                    "OrderID188",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                producer.send(msg, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
                    }

                    @Override
                    public void onException(Throwable e) {
                        System.out.printf("%-10d Exception %s %n", index, e);
                        e.printStackTrace();
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        producer.shutdown();
    }
}

代码讲解

发送消息要经过几个步骤 :

(1 )设置 Producer 的 GroupName。

(2 )设置 lnstanceName,当一个 Jvm 需要启动多个 Producer 的时候,通过设置不同的 InstanceName来区分,不设置的话系统使用默认名称“DEFAULT”。(本例没有写)

( 3 )设置发送失败重试次数,当网络出现异常的时候,这个次数影响消息的重复投递次数。想保证不丢消息,可以设置多重试几次 。

(4 )设置 NameServer 地址 。 (5 )组装消息并发送 。

消息的发送有同步和异步两种方式,上面的代码使用的是异步方式 。消息发送的返回状态有如下四种 : FLUSH_DISK_TIMEOUT 、 FLUSH_SLAVE_TIMEOUT 、SLAVE_NOT_AVAILABLE 、SEND_OK,不同状态在不同的刷盘策略和同步策略的配置下含义是不同的 。

FLUSH_DISK_TIMEOUT : 表示没有在规定时间内完成刷盘(需要 Broker 的刷盘策被设置成 SYNC_FLUSH 才会报这个错误) 。 FLUSH_SLAVE_TIMEOUT :表示在主备方式下,并且 Broker被设 置 成 SYNC_MASTER 方式,没有在设定时间内完成 主从同步 。 SLAVE_NOT_AVAILABLE : 这个状态 产生的场景和 FLUSH_SLAVE_TIMEOUT 类似, 表示在主备 方式下,并且 Broker被设置成 SYNC_MASTER,但是没有找到被配置成 Slave 的 Broker。 SEN_ OK :表示发送成功,发送成功的具体含义,比如消息是否已经 被存储到融盘?消息是否被同步到了 Slave上?消息在 Slave上是否被 写人磁盘?需要结合所配置的刷盘策略、主从策略来定 。 这个状态还可 以简单理解为,没有发生上面列出的 三个问题状态就是 SEND OK。

发送延迟消息

Broker收到这类消息后 ,延迟一段时间再处理, 使消息在规定的一段时间后生效。

使用方法:在创建 Message对象时,调用 setDelayTimeLevel ( int level) 方法设置延迟时间, 然后再把这个消息发送 出去。 目前延迟的时间不支 持任意设置,仅支持预设值的时间长度 ( 1s/5s/1Os/30s/Im/2m/3m/4m/5m/6m/ 7m/8m/9m/1Om/20m/30m/1h/2h)。 比如 setDelayTimeLevel(3)表示延迟 10s。

自定义消息发送规则

一个 Topic会有多个 Message Queue,如果使用 Producer的默认配置,这 个 Producer 会轮流向各个 Message Queue 发 送 消息 。 Consumer 在消费消息的 时候,会根据负载均衡策略,消费被分配到的 Message Queue,如果不经过特 定的设置,某条消息被发往哪个 Message Queue,被哪个 Consumer 消费是未 知的。

如果业务 需 要我们把消息 发 送到指定的 Message Queue 里,比如把同 一 类型 的消息都发 往 相同的 Message Queue,可以用 Message­ QueueSelector。

发送消息的时候,把 MessageQueueSelector 的对象作为参数,使用 public SendResult send ( Message msg, MessageQueueSelector selector, Object arg)函 数发送消 息即可 。 在 MessageQueueSelector 的实现中,根据传人的 Object参 数,或者根据 Message 消息内容确定把消息发往那个 Message Queue,返回被 选中的 Message Queue。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-02-08,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 晏霖 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
消息队列 CMQ 版
消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档