RocketMQ在4.4.0版本开始支持ACL。ACL是access control list的简称,俗称访问控制列表。访问控制,基本上会涉及到用户、资源、权限、角色等概念,那在RocketMQ中上述会对应哪些对象呢?
另外,RocketMQ还支持按照客户端IP进行白名单设置。
在讲解如何使用ACL之前,我们先简单看一下RocketMQ ACL的请求流程:
对于上述具体的实现,将在后续文章中重点讲解,本文的目的只是希望给读者一个大概的了解。
acl默认的配置文件名:plain_acl.yml,需要放在{ROCKETMQ_HOME}/store/config目录下。下面对其配置项一一介绍。
全局白名单,其类型为数组,即支持多个配置。其支持的配置格式如下:
配置用户信息,该类型为数组类型。拥有accessKey、secretKey、whiteRemoteAddress、admin、defaultTopicPerm、defaultGroupPerm、topicPerms、groupPerms子元素。
登录用户名,长度必须大于6个字符。
登录密码。长度必须大于6个字符。
用户级别的IP地址白名单。其类型为一个字符串,其配置规则与globalWhiteRemoteAddresses,但只能配置一条规则。
boolean类型,设置是否是admin。如下权限只有admin=true时才有权限执行。
默认topic权限。该值默认为DENY(拒绝)。
默认消费组权限,该值默认为DENY(拒绝),建议值为SUB。
设置topic的权限。其类型为数组,其可选择值在下节介绍。
设置消费组的权限。其类型为数组,其可选择值在下节介绍。可以为每一消费组配置不一样的权限。
上面定义了全局白名单、用户级别的白名单,用户级别的权限,为了更好的配置ACL权限规则,下面给出权限匹配逻辑。
首先,需要在broker.conf文件中,增加参数aclEnable=true。并拷贝distribution/conf/plain_acl.yml文件到${ROCKETMQ_HOME}/conf目录。
broker.conf的配置文件如下:
1 brokerClusterName = DefaultCluster
2 brokerName = broker-b
3 brokerId = 0
4 deleteWhen = 04
5 fileReservedTime = 48
6 brokerRole = ASYNC_MASTER
7 flushDiskType = ASYNC_FLUSH
8 listenPort=10915
9 storePathRootDir=E:/SH2019/tmp/rocketmq_home/rocketmq4.5MB/store
10 storePathCommitLog=E:/SH2019/tmp/rocketmq_home/rocketmq4.5MB/store/commitlog
11 namesrvAddr=127.0.0.1:9876
12 autoCreateTopicEnable=false
13 aclEnable=true
plain_acl.yml文件内容如下:
1 globalWhiteRemoteAddresses:
2
3 accounts:
4 - accessKey: RocketMQ
5 secretKey: 12345678
6 whiteRemoteAddress:
7 admin: false
8 defaultTopicPerm: DENY
9 defaultGroupPerm: SUB
10 topicPerms:
11 - TopicTest=PUB
12 groupPerms:
13 # the group should convert to retry topic
14 - oms_consumer_group=DENY
15
16- accessKey: admin
17 secretKey: 12345678
18 whiteRemoteAddress:
19 # if it is admin, it could access all resources
20 admin: true
从上面的配置可知,用户RocketMQ只能发送TopicTest的消息,其他topic无权限发送;拒绝oms_consumer_group消费组的消息消费,其他消费组默认可消费。
1 public class AclProducer {
2 public static void main(String[] args) throws MQClientException, InterruptedException {
3 DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name", getAclRPCHook());
4 producer.setNamesrvAddr("127.0.0.1:9876");
5 producer.start();
6 for (int i = 0; i < 1; i++) {
7 try {
8 Message msg = new Message("TopicTest3" ,"TagA" , ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
9 SendResult sendResult = producer.send(msg);
10 System.out.printf("%s%n", sendResult);
11 } catch (Exception e) {
12 e.printStackTrace();
13 Thread.sleep(1000);
14 }
15 }
16 producer.shutdown();
17 }
18
19 static RPCHook getAclRPCHook() {
20 return new AclClientRPCHook(new SessionCredentials("rocketmq","12345678"));
21 }
22 }
运行效果如图所示:
1 public class AclConsumer {
2
3 public static void main(String[] args) throws InterruptedException, MQClientException {
4 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("oms_consumer_group", getAclRPCHook(),new AllocateMessageQueueAveragely());
5 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
6 consumer.subscribe("TopicTest", "*");
7 consumer.setNamesrvAddr("127.0.0.1:9876");
8 consumer.registerMessageListener(new MessageListenerConcurrently() {
9 @Override
10 public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
11 ConsumeConcurrentlyContext context) {
12 System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
13 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
14 }
15 });
16 consumer.start();
17 System.out.printf("Consumer Started.%n");
18 }
19
20 static RPCHook getAclRPCHook() {
21 return new AclClientRPCHook(new SessionCredentials("rocketmq","12345678"));
22 }
23}
发现并不没有消费消息,符合预期。
关于RocketMQ ACL的使用就介绍到这里了,下一篇将介绍RocketMQ ACL实现原理。