我们需要把RocketMQ运行起来,它包含两个组件,NameServer和Broker,把这两个组件运行起来就可以了,可以二进制运行,或者把源码拉下来运行,大家参考一下官方文档就可以运行起来了,这里我讲解一下拉取源码的方式运行。 先将distribution/conf目录,复制到源码的目录
运行成功输出
broker.conf添加配置:namesrvAddr=127.0.0.1:9876
运行成功输出
单机版的RocketMQ集群就搭建完成了,在本机运行可以方便我们以后调试。
maven需要依赖rocketmq-client
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.5.0</version>
</dependency>
public class Producer {
public static void main(String[] args) throws MQClientException{
DefaultMQProducer producer = new DefaultMQProducer("demoProducerGroup");
//设置nameserver地址
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
for (int i = 0; i < 1; i++) {
try {
//生成mq消息
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("关注java面试教程 学习更多知识").getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//同步发送mq消息到broker,获取到结果,可以知道是否发送成功
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
}
}
producer.shutdown();
}
}
运行结果:发送成功,并返回了消息id
public class Consumer {
public static void main(String[] args) throws MQClientException {
//设置消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demoConsumerGroup");
//设置nameserver地址
consumer.setNamesrvAddr("127.0.0.1:9876");
//第一次消费从那开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//消费模式
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.subscribe("TopicTest", "*");
//mq消息的回调函数
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
输出刚刚发送到mq的消息:
今天我们学习了怎么运行RocketMQ, 以及使用它来生产消息和消费消息,下篇我们来学习一下RocketMQ的生产者和消费者的核心概念。