说说 MQ之RocketMQ(二)

来源:Valleylord ,

valleylord.github.io/post/201607-mq-rocketmq/

RocketMQ 的 Java API

RocketMQ 是用 Java 语言开发的,因此,其 Java API 相对是比较丰富的,当然也有部分原因是 RocketMQ 本身提供的功能就比较多。RocketMQ API 提供的功能包括,

广播消费,这个在之前已经提到过;

消息过滤,支持简单的 Message Tag 过滤,也支持按 Message Header、body 过滤;

顺序消费和乱序消费,之前也提到过,这里的顺序消费应该指的是普通顺序性,这一点与 Kafka 相同;

Pull 模式消费,这个是相对 Push 模式来说的,Kafka 就是 Pull 模式消费;

事务消息,这个好像没有开源,但是 example 代码中有示例,总之,不推荐用;

Tag,RocketMQ 在 Topic 下面又分了一层 Tag,用于表示消息类别,可以用来过滤,但是顺序性还是以 Topic 来看;

单看功能的话,即使不算事务消息,也不算 Tag,RocketMQ 也远超 Kafka,Kafka 应该只实现了 Pull 模式消费 + 顺序消费这2个功能。RocketMQ 的代码示例在 rocketmq-example 中,注意,代码是不能直接运行的,因为所有的代码都少了设置 name server 的部分,需要自己手动加上,例如,producer.setNamesrvAddr("192.168.232.23:9876");。

先来看一下生产者的 API,比较简单,只有一种,如下,

import java.util.List;

public class Producer {

public static void main(String[] args) throws MQClientException, InterruptedException {

DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");

producer.setNamesrvAddr("192.168.232.23:9876");

producer.start();

for (int i = 0; i

try {

{

Message msg = new Message("TopicTest1",// topic

"TagA",// tag

"OrderID188",// key

("RocketMQ "+String.format("%05d", i)).getBytes());// body

SendResult sendResult = producer.send(msg, new MessageQueueSelector() {

@Override

public MessageQueue select(List mqs, Message msg, Object arg) {

Integer id = (Integer) arg;

int index = id % mqs.size();

return mqs.get(index);

}

}, i));

}

}

catch (Exception e) {

e.printStackTrace();

}

producer.shutdown();

}

}

可以发现,相比 Kafka 的 API,只多了 Tag,但实际上行为有很大不同。Kafka 的生产者客户端,有同步和异步两种模式,但都是阻塞模式,send 方法返回发送状态的 Future,可以通过 Future 的 get 方法阻塞获得发送状态。而 RocketMQ 采用的是同步非阻塞模式,发送之后立刻返回发送状态(而不是 Future)。正常情况下,两者使用上差别不大,但是在高可用场景中发生主备切换的时候,Kafka 的同步可以等待切换完成并重连,最后返回;而 RocketMQ 只能立刻报错,由生产者选择是否重发。所以,在生产者的 API 上,其实 Kafka 是要强一些的。

另外,RocketMQ 可以通过指定 MessageQueueSelector 类的实现来指定将消息发送到哪个分区去,Kafka 是通过指定生产者的 partitioner.class 参数来实现的,灵活性上 RocketMQ 略胜一筹。

再来看消费者的API,由于 RocketMQ 的功能比较多,我们先看 Pull 模式消费的API,如下,

import java.util.HashMap;

import java.util.Map;

import java.util.Set;

import com.alibaba.rocketmq.client.consumer.store.OffsetStore;

import com.alibaba.rocketmq.client.consumer.store.RemoteBrokerOffsetStore;

public class PullConsumer {

private static final Map offseTable = new HashMap();

public static void main(String[] args) throws MQClientException {

DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");

consumer.setNamesrvAddr("192.168.232.23:9876");

consumer.start();

Set mqs = consumer.fetchSubscribeMessageQueues("TopicTest1");

for (MessageQueue mq : mqs) {

SINGLE_MQ: while (true) {

try {

long offset = consumer.fetchConsumeOffset(mq, true);

PullResult pullResult =

consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);

if (null != pullResult.getMsgFoundList()) {

for (MessageExt messageExt : pullResult.getMsgFoundList()) {

}

}

putMessageQueueOffset(mq, pullResult.getNextBeginOffset());

switch (pullResult.getPullStatus()) {

case FOUND:

// TODO

break;

case NO_MATCHED_MSG:

break;

case NO_NEW_MSG:

break SINGLE_MQ;

case OFFSET_ILLEGAL:

break;

default:

break;

}

}

catch (Exception e) {

e.printStackTrace();

}

}

}

consumer.shutdown();

}

private static void putMessageQueueOffset(MessageQueue mq, long offset) {

offseTable.put(mq, offset);

}

private static long getMessageQueueOffset(MessageQueue mq) {

Long offset = offseTable.get(mq);

if (offset != null)

return offset;

return 0;

}

}

这部分的 API 其实是和 Kafka 很相似的,唯一不同的是,RocketMQ 需要手工管理 offset 和指定分区,而 Kafka 可以自动管理(当然也可以手动管理),并且不需要指定分区(分区是在 Kafka 订阅的时候指定的)。例子中,RocketMQ 使用 HashMap 自行管理,也可以用 OffsetStore 接口,提供了两种管理方式,本地文件和远程 Broker。这部分感觉两者差不多。

下面再看看 Push 模式顺序消费,代码如下,

import java.util.List;

public class Consumer {

public static void main(String[] args) throws MQClientException {

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");

consumer.setNamesrvAddr("192.168.232.23:9876");

consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

consumer.subscribe("TopicTest1", "TagA || TagC || TagD");

consumer.registerMessageListener(new MessageListenerOrderly() {

AtomicLong consumeTimes = new AtomicLong(0);

@Override

public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) {

context.setAutoCommit(false);

this.consumeTimes.incrementAndGet();

if ((this.consumeTimes.get() % 2) == 0) {

return ConsumeOrderlyStatus.SUCCESS;

}

else if ((this.consumeTimes.get() % 3) == 0) {

return ConsumeOrderlyStatus.ROLLBACK;

}

else if ((this.consumeTimes.get() % 4) == 0) {

return ConsumeOrderlyStatus.COMMIT;

}

else if ((this.consumeTimes.get() % 5) == 0) {

context.setSuspendCurrentQueueTimeMillis(3000);

return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;

}

return ConsumeOrderlyStatus.SUCCESS;

}

});

consumer.start();

}

}

虽然提供了 Push 模式,RocketMQ 内部实际上还是 Pull 模式的 MQ,Push 模式的实现应该采用的是长轮询,这点与 Kafka 一样。使用该方式有几个注意的地方,

接收消息的监听类要使用 MessageListenerOrderly;

ConsumeFromWhere 有几个参数,表示从头开始消费,从尾开始消费,还是从某个 TimeStamp 开始消费;

可以控制 offset 的提交,应该就是 context.setAutoCommit(false); 的作用;

控制 offset 提交这个特性非常有用,某种程度上扩展一下,就可以当做事务来用了,看代码 ConsumeMessageOrderlyService 的实现,其实并没有那么复杂,在不启用 AutoCommit 的时候,只有返回 COMMIT 才 commit offset;启用 AutoCommit 的时候,返回 COMMIT、ROLLBACK(这个比较扯)、SUCCESS 的时候,都 commit offset。

后来发现,commit offset 功能在 Kafka 里面也有提供,使用新的 API,调用 consumer.commitSync。

再看一个 Push 模式乱序消费 + 消息过滤的例子,消费者的代码如下,

import java.util.List;

public class Consumer {

public static void main(String[] args) throws InterruptedException, MQClientException {

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupNamecc4");

consumer.setNamesrvAddr("192.168.232.23:9876");

consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override

public ConsumeConcurrentlyStatus consumeMessage(List msgs,

ConsumeConcurrentlyContext context) {

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

}

});

consumer.start();

}

}

这个例子与之前顺序消费不同的地方在于,

接收消息的监听类使用的是 MessageListenerConcurrently;

回调方法中,使用的是自动 offset commit;

订阅的时候增加了消息过滤类 MessageFilterImpl;

消息过滤类 MessageFilterImpl 的代码如下,

public class MessageFilterImpl implements MessageFilter {

@Override

public boolean match(MessageExt msg) {

String property = msg.getUserProperty("SequenceId");

if (property != null) {

int id = Integer.parseInt(property);

if ((id % 3) == 0 && (id > 10)) {

return true;

}

}

return false;

}

}

RocketMQ 执行过滤是在 Broker 端,Broker 所在的机器会启动多个 FilterServer 过滤进程;Consumer 启动后,会向 FilterServer 上传一个过滤的 Java 类;Consumer 从 FilterServer 拉消息,FilterServer 将请求转发给 Broker,FilterServer 从 Broker 收到消息后,按照 Consumer 上传的 Java 过滤程序做过滤,过滤完成后返回给 Consumer。这种过滤方法可以节省网络流量,但是增加了 Broker 的负担。可惜我没有实验出来使用过滤的效果,即使是用 github wiki 上的例子8也没成功,不纠结了。RocketMQ 的按 Tag 过滤的功能也是在 Broker 上做的过滤,能用,是个很方便的功能。

还有一种广播消费模式,比较简单,可以去看代码,不再列出。

总之,RocketMQ 提供的功能比较多,比 Kafka 多很多易用的 API。

【关于投稿】

如果大家有原创好文投稿,请直接给公号发送留言。

① 留言格式:

【投稿】+《 文章标题》+ 文章链接

② 示例:

【投稿】《不要自称是程序员,我十多年的 IT 职场总结》:http://blog.jobbole.com/94148/

③ 最后请附上您的个人简介哈~

看完本文有收获?请转发分享给更多人

关注「ImportNew」,提升Java技能

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20181014B11J8400?refer=cp_1026
  • 腾讯「云+社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。

扫码关注云+社区

领取腾讯云代金券