聊聊rocketmq的AclClientRPCHook

本文主要研究一下rocketmq的AclClientRPCHook

RPCHook

rocketmq-remoting-4.5.2-sources.jar!/org/apache/rocketmq/remoting/RPCHook.java

public interface RPCHook {
    void doBeforeRequest(final String remoteAddr, final RemotingCommand request);

    void doAfterResponse(final String remoteAddr, final RemotingCommand request,
        final RemotingCommand response);
}
  • RPCHook定义了doBeforeRequest、doAfterResponse方法

AclClientRPCHook

rocketmq-acl-4.5.2-sources.jar!/org/apache/rocketmq/acl/common/AclClientRPCHook.java

public class AclClientRPCHook implements RPCHook {
    private final SessionCredentials sessionCredentials;
    protected ConcurrentHashMap<Class<? extends CommandCustomHeader>, Field[]> fieldCache =
        new ConcurrentHashMap<Class<? extends CommandCustomHeader>, Field[]>();

    public AclClientRPCHook(SessionCredentials sessionCredentials) {
        this.sessionCredentials = sessionCredentials;
    }

    @Override
    public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
        byte[] total = AclUtils.combineRequestContent(request,
            parseRequestContent(request, sessionCredentials.getAccessKey(), sessionCredentials.getSecurityToken()));
        String signature = AclUtils.calSignature(total, sessionCredentials.getSecretKey());
        request.addExtField(SIGNATURE, signature);
        request.addExtField(ACCESS_KEY, sessionCredentials.getAccessKey());
        
        // The SecurityToken value is unneccessary,user can choose this one.
        if (sessionCredentials.getSecurityToken() != null) {
            request.addExtField(SECURITY_TOKEN, sessionCredentials.getSecurityToken());
        }
    }

    @Override
    public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand response) {

    }

    protected SortedMap<String, String> parseRequestContent(RemotingCommand request, String ak, String securityToken) {
        CommandCustomHeader header = request.readCustomHeader();
        // Sort property
        SortedMap<String, String> map = new TreeMap<String, String>();
        map.put(ACCESS_KEY, ak);
        if (securityToken != null) {
            map.put(SECURITY_TOKEN, securityToken);
        }
        try {
            // Add header properties
            if (null != header) {
                Field[] fields = fieldCache.get(header.getClass());
                if (null == fields) {
                    fields = header.getClass().getDeclaredFields();
                    for (Field field : fields) {
                        field.setAccessible(true);
                    }
                    Field[] tmp = fieldCache.putIfAbsent(header.getClass(), fields);
                    if (null != tmp) {
                        fields = tmp;
                    }
                }

                for (Field field : fields) {
                    Object value = field.get(header);
                    if (null != value && !field.isSynthetic()) {
                        map.put(field.getName(), value.toString());
                    }
                }
            }
            return map;
        } catch (Exception e) {
            throw new RuntimeException("incompatible exception.", e);
        }
    }

    public SessionCredentials getSessionCredentials() {
        return sessionCredentials;
    }
}
  • AclClientRPCHook实现了RPCHook接口,其构造器接收SessionCredentials参数;其doBeforeRequest首先通过parseRequestContent从request读取CommandCustomHeader,将其field连同accessKey、securityToken放到一个SortedMap,再通过AclUtils.combineRequestContent计算要发送的请求内容;然后通过AclUtils.calSignature计算出signature,最后往request的extFields添加SIGNATURE、ACCESS_KEY;若设置securityToken,则会往request的extFields添加SECURITY_TOKEN

SessionCredentials

rocketmq-acl-4.5.2-sources.jar!/org/apache/rocketmq/acl/common/SessionCredentials.java

public class SessionCredentials {
    public static final Charset CHARSET = Charset.forName("UTF-8");
    public static final String ACCESS_KEY = "AccessKey";
    public static final String SECRET_KEY = "SecretKey";
    public static final String SIGNATURE = "Signature";
    public static final String SECURITY_TOKEN = "SecurityToken";

    public static final String KEY_FILE = System.getProperty("rocketmq.client.keyFile",
        System.getProperty("user.home") + File.separator + "key");

    private String accessKey;
    private String secretKey;
    private String securityToken;
    private String signature;

    public SessionCredentials() {
        String keyContent = null;
        try {
            keyContent = MixAll.file2String(KEY_FILE);
        } catch (IOException ignore) {
        }
        if (keyContent != null) {
            Properties prop = MixAll.string2Properties(keyContent);
            if (prop != null) {
                this.updateContent(prop);
            }
        }
    }

    public SessionCredentials(String accessKey, String secretKey) {
        this.accessKey = accessKey;
        this.secretKey = secretKey;
    }

    public SessionCredentials(String accessKey, String secretKey, String securityToken) {
        this(accessKey, secretKey);
        this.securityToken = securityToken;
    }

    public void updateContent(Properties prop) {
        {
            String value = prop.getProperty(ACCESS_KEY);
            if (value != null) {
                this.accessKey = value.trim();
            }
        }
        {
            String value = prop.getProperty(SECRET_KEY);
            if (value != null) {
                this.secretKey = value.trim();
            }
        }
        {
            String value = prop.getProperty(SECURITY_TOKEN);
            if (value != null) {
                this.securityToken = value.trim();
            }
        }
    }

    //......
}
  • SessionCredentials提供了三个构造器,一个无参构造器从KEY_FILE加载keyContent然后解析为Properties再通过updateContent方法给accessKey、secretKey、securityToken赋值;一个是accessKey, secretKey的构造器;还有一个是accessKey, secretKey, securityToken的构造器

AclUtils

rocketmq-acl-4.5.2-sources.jar!/org/apache/rocketmq/acl/common/AclUtils.java

public class AclUtils {

    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);

    public static byte[] combineRequestContent(RemotingCommand request, SortedMap<String, String> fieldsMap) {
        try {
            StringBuilder sb = new StringBuilder("");
            for (Map.Entry<String, String> entry : fieldsMap.entrySet()) {
                if (!SessionCredentials.SIGNATURE.equals(entry.getKey())) {
                    sb.append(entry.getValue());
                }
            }

            return AclUtils.combineBytes(sb.toString().getBytes(CHARSET), request.getBody());
        } catch (Exception e) {
            throw new RuntimeException("Incompatible exception.", e);
        }
    }

    public static byte[] combineBytes(byte[] b1, byte[] b2) {
        int size = (null != b1 ? b1.length : 0) + (null != b2 ? b2.length : 0);
        byte[] total = new byte[size];
        if (null != b1)
            System.arraycopy(b1, 0, total, 0, b1.length);
        if (null != b2)
            System.arraycopy(b2, 0, total, b1.length, b2.length);
        return total;
    }

    public static String calSignature(byte[] data, String secretKey) {
        String signature = AclSigner.calSignature(data, secretKey);
        return signature;
    }

    //......
}
  • combineRequestContent首先将fieldsMap拼接为字符串,然后通过AclUtils.combineBytes将其与request.getBody()结合在一起;calSignature方法内部是委托给AclSigner.calSignature(data, secretKey)来实现

AclSigner

rocketmq-acl-4.5.2-sources.jar!/org/apache/rocketmq/acl/common/AclSigner.java

public class AclSigner {
    public static final Charset DEFAULT_CHARSET = Charset.forName("UTF-8");
    public static final SigningAlgorithm DEFAULT_ALGORITHM = SigningAlgorithm.HmacSHA1;
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.ROCKETMQ_AUTHORIZE_LOGGER_NAME);
    private static final int CAL_SIGNATURE_FAILED = 10015;
    private static final String CAL_SIGNATURE_FAILED_MSG = "[%s:signature-failed] unable to calculate a request signature. error=%s";

    public static String calSignature(String data, String key) throws AclException {
        return calSignature(data, key, DEFAULT_ALGORITHM, DEFAULT_CHARSET);
    }

    public static String calSignature(String data, String key, SigningAlgorithm algorithm,
        Charset charset) throws AclException {
        return signAndBase64Encode(data, key, algorithm, charset);
    }

    private static String signAndBase64Encode(String data, String key, SigningAlgorithm algorithm, Charset charset)
        throws AclException {
        try {
            byte[] signature = sign(data.getBytes(charset), key.getBytes(charset), algorithm);
            return new String(Base64.encodeBase64(signature), DEFAULT_CHARSET);
        } catch (Exception e) {
            String message = String.format(CAL_SIGNATURE_FAILED_MSG, CAL_SIGNATURE_FAILED, e.getMessage());
            log.error(message, e);
            throw new AclException("CAL_SIGNATURE_FAILED", CAL_SIGNATURE_FAILED, message, e);
        }
    }

    private static byte[] sign(byte[] data, byte[] key, SigningAlgorithm algorithm) throws AclException {
        try {
            Mac mac = Mac.getInstance(algorithm.toString());
            mac.init(new SecretKeySpec(key, algorithm.toString()));
            return mac.doFinal(data);
        } catch (Exception e) {
            String message = String.format(CAL_SIGNATURE_FAILED_MSG, CAL_SIGNATURE_FAILED, e.getMessage());
            log.error(message, e);
            throw new AclException("CAL_SIGNATURE_FAILED", CAL_SIGNATURE_FAILED, message, e);
        }
    }

    public static String calSignature(byte[] data, String key) throws AclException {
        return calSignature(data, key, DEFAULT_ALGORITHM, DEFAULT_CHARSET);
    }

    public static String calSignature(byte[] data, String key, SigningAlgorithm algorithm,
        Charset charset) throws AclException {
        return signAndBase64Encode(data, key, algorithm, charset);
    }

    private static String signAndBase64Encode(byte[] data, String key, SigningAlgorithm algorithm, Charset charset)
        throws AclException {
        try {
            byte[] signature = sign(data, key.getBytes(charset), algorithm);
            return new String(Base64.encodeBase64(signature), DEFAULT_CHARSET);
        } catch (Exception e) {
            String message = String.format(CAL_SIGNATURE_FAILED_MSG, CAL_SIGNATURE_FAILED, e.getMessage());
            log.error(message, e);
            throw new AclException("CAL_SIGNATURE_FAILED", CAL_SIGNATURE_FAILED, message, e);
        }
    }

}
  • calSignature默认使用的是SigningAlgorithm.HmacSHA1及Charset.forName("UTF-8")字符集来签名;signAndBase64Encode方法首先通过sign方法签名,然后将其转为Base64的字符串

小结

AclClientRPCHook实现了RPCHook接口,其构造器接收SessionCredentials参数;其doBeforeRequest首先通过parseRequestContent从request读取CommandCustomHeader,将其field连同accessKey、securityToken放到一个SortedMap,再通过AclUtils.combineRequestContent计算要发送的请求内容;然后通过AclUtils.calSignature计算出signature,最后往request的extFields添加SIGNATURE、ACCESS_KEY;若设置securityToken,则会往request的extFields添加SECURITY_TOKEN

doc

  • AclClientRPCHook

本文分享自微信公众号 - 码匠的流水账(geek_luandun)

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-11-09

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Crossin的编程教室

【Python 第75课】可迭代对象和迭代器

for 循环是我们在 Python 里非常常用的一个语法,但你有没有思考过 for 循环是怎样实现的?

10520
来自专栏丑胖侠

面试官,Java8 JVM内存结构变了,永久代到元空间

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。 ...

10660
来自专栏Java3y

慌了!面试官又拿JVM开怼!

对于Java人来说,JVM无疑是进阶时必须迈过的坎。不管初入职场还是跳槽升职,JVM更是面试时的必考题。如果不懂JVM的话,薪酬会非常吃亏(近70%的面试者挂在...

8440
来自专栏码洞

每个阿里程序员都必须搞懂的Maven基础知识

以前我们写代码时,jar包都默认放在一个叫 /lib 的目录下,然后把该目录设置为classpath可以读取到的目录,如下图所示:

9320
来自专栏JAVA杂谈

Java日志Log4j或者Logback的NDC和MDC功能

Java中使用的日志的实现框架有很多种,常用的log4j和logback以及java.util.logging,而log4j是apache实现的一个开源日志组件...

15120
来自专栏北京宏哥

Java自动化测试框架-04 - TestNG之Test Method篇 - 道法自然,法力无边(详细教程)

测试方法是可以带有参数的。每个测试方法都可以带有任意数量的参数,并且可以通过使用TestNG的@Parameters向方法传递正确的参数。

13020
来自专栏业余草

面试再问ThreadLocal,别说你不会

以前面试的时候问到ThreadLocal总是一脸懵逼,只知道有这个哥们,不了解他是用来做什么的,更不清楚他的原理了。表面上看他是和多线程,线程同步有关的一个工具...

6910
来自专栏Java识堂

帮你少写一大半参数校验代码的小技巧

几乎每个web网站都会对用户提交的参数进行校验,前端要做,后端也要做。防止用户直接通过接口调用的方式来请求或保存数据,从而导致产生脏数据等其他严重的后果。

10420
来自专栏搜云库技术团队

常问的15个顶级Java多线程面试题

在任何Java面试当中多线程和并发方面的问题都是必不可少的一部分。如果你想获得更多职位,那么你应该准备很多关于多线程的问题。

10530
来自专栏芋道源码1024

一个 Java 对象到底有多大?

编写Java代码的时候,大多数情况下,我们很少关注一个Java对象究竟有多大(占据多少内存),更多的是关注业务与逻辑。但是殊不知,在我们不经意间,大量的内存被无...

10410

扫码关注云+社区

领取腾讯云代金券

年度创作总结 领取年终奖励