Title: 编写企业微信应用 - 回调URL验证笔记
2021-08-11 09:36:32
写的,有点乱,优化下Token
和EncodingAESKey
Token
和EncodingAESKey
Token
在校验消息体签名时会用到EncodingAESKey
在解密接收到的消息内容时会用到python 在 Windows下使用AES时要安装的是pycryptodome 模块:pip install pycryptodome
python 在 Linux下使用AES时要安装的是pycrypto模块:pip install pycrypto
假设企业的接收消息的URL设置为http://api.3dept.com。
企业管理员在保存回调配置信息时,企业微信会发送一条验证消息到填写的URL,请求内容如下:
请求方式:GET
请求地址:http://api.3dept.com/?msg_signature=ASDFQWEXZCVAQFASDFASDFSS×tamp=13500001234&nonce=123412323&echostr=ENCRYPT_STR
参数 | 类型 | 说明 |
---|---|---|
msg_signature | String | 企业微信加密签名,msg_signature计算结合了企业填写的token、请求中的timestamp、nonce、加密的消息体。签名计算方法参考 消息体签名检验 |
timestamp | Integer | 时间戳。与nonce结合使用,用于防止请求重放攻击。 |
nonce | String | 随机数。与timestamp结合使用,用于防止请求重放攻击。 |
echostr | String | 加密的字符串。需要解密得到消息内容明文,解密后有random、msg_len、msg、receiveid四个字段,其中msg即为消息内容明文 |
msg_signature
(消息签名)参数验证消息正确性echostr
参数的内容,获得消息明文token(添加回调URL时从企业微信处获得的)
、timestamp
、nonce
、encrypt
参数按字母字典从小到大排序(用sort)msg_signature
相同,则通过# 消息体签名校验
def check_msg_signature(msg_signature,token,timestamp,nonce,echostr):
# 使用sort()从小到大排序[].sort()是在原地址改值的,所以如果使用li_s = li.sort(),li_s是空的,li的值变为排序后的值]
li = [token,timestamp,nonce,echostr]
li.sort()
# 将排序结果拼接
li_str = li[0]+li[1]+li[2]+li[3]
# 计算SHA-1值
sha1 = hashlib.sha1()
# update()要指定加密字符串字符代码,不然要报错:
# "Unicode-objects must be encoded before hashing"
sha1.update(li_str.encode("utf8"))
sha1_result = sha1.hexdigest()
# 比较并返回比较结果
if sha1_result == msg_signature:
return True
else:
return False
msg_len
]+[msg_len
字节消息正文]+[N字节receiveid
]msg_len
存放的是正文消息字节数,用的是16进制存储。msg_len
为000A
则正文消息共有10字节,第21到31字节为正文消息。EncodingAESKey
AES-CBC
解密encrypt
encrypt
中获得msg_len
msg_len
得到正文字节数,并从解密的encrypt
中截取消息明文import base64
from Crypto.Cipher import AES
# 检查base64编码后数据位数是否正确
def check_base64_len(base64_str):
len_remainder = 4 - (len(base64_str) % 4)
if len_remainder == 0:
return base64_str
else:
for temp in range(0,len_remainder):
base64_str = base64_str + "="
return base64_str
ciphertext = "rgF0ehjjxx4fdkdwZyeJ5qxJUAsfGczgK2VQDORoML4K7ou+TGFKNicYDgdpPTU0/AZEgOFQAh5bU3MmX2pOlw=="
key = "eeg80S7mUubAJwsPuIEg3bfRfghCbN4zC864e7PV928"
# 处理密文、密钥和iv
ciphertext_bytes = base64.b64decode(check_base64_len(ciphertext))
key_bytes = base64.b64decode(check_base64_len(key))
iv_bytes = key_bytes[:16]
# 解密
decr = AES.new(key_bytes,AES.MODE_CBC,iv_bytes)
plaintext_bytes = decr.decrypt(ciphertext_bytes)
# 截取数据,判断消息正文字节数
msg_len_bytes = plaintext_bytes[16:20]
msg_len = int.from_bytes(msg_len_bytes,byteorder='big', signed=False)
# 根据消息正文字节数截取消息正文,并转为字符串格式
msg_bytes = plaintext_bytes[20:20+msg_len]
msg = str(msg_bytes,encoding='utf-8')
# 打印消息正文
print(msg)
EncodingAESKey
和token
变量值需要到企业微信管理后台获取from flask import Flask
from flask.globals import request
from Crypto.Cipher import AES
import base64,hashlib,xmltodict,json
app = Flask(__name__)
# 检查base64编码后数据位数是否正确
def check_base64_len(base64_str):
len_remainder = 4 - (len(base64_str) % 4)
if len_remainder == 0:
return base64_str
else:
for temp in range(0,len_remainder):
base64_str = base64_str + "="
return base64_str
# 解密并提取消息正文
def msg_base64_decrypt(ciphertext_base64,key_base64):
# 处理密文、密钥和iv
ciphertext_bytes = base64.b64decode(check_base64_len(ciphertext_base64))
key_bytes = base64.b64decode(check_base64_len(key_base64))
iv_bytes = key_bytes[:16]
# 解密
decr = AES.new(key_bytes,AES.MODE_CBC,iv_bytes)
plaintext_bytes = decr.decrypt(ciphertext_bytes)
# 截取数据,判断消息正文字节数
msg_len_bytes = plaintext_bytes[16:20]
msg_len = int.from_bytes(msg_len_bytes,byteorder='big', signed=False)
# 根据消息正文字节数截取消息正文,并转为字符串格式
msg_bytes = plaintext_bytes[20:20+msg_len]
msg = str(msg_bytes,encoding='utf-8')
return msg
# 消息体签名校验
def check_msg_signature(msg_signature,token,timestamp,nonce,echostr):
# 使用sort()从小到大排序[].sort()是在原地址改值的,所以如果使用li_s = li.sort(),li_s是空的,li的值变为排序后的值]
li = [token,timestamp,nonce,echostr]
li.sort()
# 将排序结果拼接
li_str = li[0]+li[1]+li[2]+li[3]
# 计算SHA-1值
sha1 = hashlib.sha1()
# update()要指定加密字符串字符代码,不然要报错:
# "Unicode-objects must be encoded before hashing"
sha1.update(li_str.encode("utf8"))
sha1_result = sha1.hexdigest()
# 比较并返回比较结果
if sha1_result == msg_signature:
return True
else:
return False
@app.route("/",methods=["get"])
def wx_check_api():
EncodingAESKey = ""
token = ""
msg_signature = request.args.to_dict().get("msg_signature")
timestamp = request.args.to_dict().get("timestamp")
nonce = request.args.to_dict().get("nonce")
echostr = request.args.to_dict().get("echostr")
# 获取消息体签名校验结果
check_result = check_msg_signature(msg_signature,token,timestamp,nonce,echostr)
if check_result:
decrypt_result = msg_base64_decrypt(echostr,EncodingAESKey)
print("通过")
return decrypt_result
else:
return ""
if __name__ == "__main__":
app.run(debug=False,host='0.0.0.0',port=6363)
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。