使用一机一密认证

最近更新时间:2025-08-29 14:24:12

我的收藏

使用场景

在部分安全性要求较高的业务场景下, 业务流程要求每一台设备具有独一无二的认证凭证,从而确保每个设备连接的合法性和身份唯一性。如果此时设备本身算力有限, 则无法使用基于 PKI 证书的“一机一证”认证方式,因此需要另外一种保证高安全性的同时,对设备端的 CPU 和内存资源消耗相对较小的认证方式。
为此,腾讯云消息队列 MQTT 版推出了“一机一密”的认证方式,每个设备使用独立的 Key,通过共享密钥的方式生成临时令牌(MAC Token)完成校验和服务端连接。

实现原理


如上图所示,客户侧仅需要实现客户端和 Token 服务的对接。腾讯云消息队列 MQTT 版提供 设备身份注册表,用于保证每台设备使用的 Key 的唯一性和 Token 的正确性。“一机一密” 的整体的步骤如下:
1. 客户实现的 Token 服务通过创建设备标识云 API 在 MQTT 的 设备身份注册表中创建设备身份记录,其中包含了设备的 PrimaryKey 和 SecondaryKey。后续如果对应的签名 Key 需要进行查询、修改和删除等操作,可以继续调用对应的云 API 进行相关操作;
2. Token 服务使用指定的算法,如下文 Demo 中的共享密钥签名算法(SAS Token, Shared Access Signature Token),对返回的 PrimaryKey/SecondaryKey 签署 Token,也可以按照实际需求将生成后的 Token 进行一些自由组装;
3. 客户端(设备)在连接 MQTT 服务端前,首先访问 Token 服务获取生成的 Token;
4. 客户端(设备)将生成的 Token 传入 Password 字段,用于连接 MQTT 服务器。

Token (共享密钥签名算法)服务实现

以下为腾讯云消息队列 MQTT 版提供的一个 Token 服务实现的简单示例。在示例中,Token 服务做了以下设定:
1. 规定了 Token 的时效性,即使 Token 被截获,攻击者也只在1小时内有效,极大降低了风险;
2. 使用 HMAC-SHA256 算法计算出加密后的签名;
3. 将上一步的签名按照指定的格式组装,得到最终的 Token。
以下仅是以共享密钥签名算法(SAS Token, Shared Access Signature Token)为例实现的 Token 服务,您也可以根据业务需求,实现自己的 Token 服务。
Java
Python
Node.js
/**
* @clientId 客户端标识符
* @key 设备身份注册表中 PrimaryKey 或 SecondaryKey
*/
public static String generateSasToken(String clientId, String key) throws Exception {
// Token 有效期 3600 秒
var expiry = Instant.now().getEpochSecond() + 3600;

String stringToSign = URLEncoder.encode(clientId, StandardCharsets.UTF_8) + "\\n" + expiry;
byte[] decodedKey = Base64.getDecoder().decode(key);

Mac sha256HMAC = Mac.getInstance("HmacSHA256");
SecretKeySpec secretKey = new SecretKeySpec(decodedKey, "HmacSHA256");
sha256HMAC.init(secretKey);
Base64.Encoder encoder = Base64.getEncoder();

String signature = new String(encoder.encode(
sha256HMAC.doFinal(stringToSign.getBytes(StandardCharsets.UTF_8))), StandardCharsets.UTF_8);

String token = "SharedAccessSignature sr=" + URLEncoder.encode(clientId, StandardCharsets.UTF_8)
+ "&sig=" + URLEncoder.encode(signature, StandardCharsets.UTF_8.name()) + "&se=" + expiry;
return token;
}

from base64 import b64encode, b64decode
from hashlib import sha256
from time import time
from urllib import parse
from hmac import HMAC

def generate_sas_token(clientId, key, expiry=3600):
ttl = time() + expiry
sign_key = "%s\\n%d" % ((parse.quote_plus(clientId)), int(ttl))
signature = b64encode(HMAC(b64decode(key), sign_key.encode('utf-8'), sha256).digest())

rawtoken = {
'sr' : clientId,
'sig': signature,
'se' : str(int(ttl))
}
return 'SharedAccessSignature ' + parse.urlencode(rawtoken)
var generateSasToken = function(clientId, key) {
resourceUri = encodeURIComponent(clientId);
// 设置 Token 有效期 3600 秒
var expires = (Date.now() / 1000) + 3600;
expires = Math.ceil(expires);
var toSign = resourceUri + '\\n' + expires;

// Use crypto
var hmac = crypto.createHmac('sha256', Buffer.from(key, 'base64'));
hmac.update(toSign);
var base64UriEncoded = encodeURIComponent(hmac.digest('base64'));

// Construct authorization string
var token = "SharedAccessSignature sr=" + resourceUri + "&sig="
+ base64UriEncoded + "&se=" + expires;
return token;
};