专栏首页万物皆可Serverless【玩转腾讯云】万物皆可Serverless之使用SCF+COS免费运营微信公众号
原创

【玩转腾讯云】万物皆可Serverless之使用SCF+COS免费运营微信公众号

万物皆可Serverless系列文章

  1. 万物皆可Serverless之免费搭建自己的不限速大容量云盘(5TB)
  2. 万物皆可Serverless之使用云函数Timer触发器实现每天自动定时打卡
  3. 万物皆可Serverless之使用SCF+COS快速开发全栈应用
  4. 万物皆可Serverless之使用SCF+COS免费运营微信公众号
  5. 万物皆可Serverless之使用SCF快速部署验证码识别接口
  6. 万物皆可Serverless之Kaggle+SCF端到端验证码识别从训练到部署
  7. 万物皆可Serverless之借助微信公众号简单管理用户激活码
  8. 万物皆可Serverless之使用SCF+COS给未来写封信
  9. 万物皆可Serverless之在Flutter中快速接入腾讯云开发
  10. 万物皆可Serverless之在Flutter中写一个Dart原生腾讯云对象存储插件
  11. 万物皆可Serverless之我的Serverless之路

一、本文介绍

是的,你没听错,这一次我来带大家直接上手运营微信公众号。

而且像我这种基本没人关注的微信公众号运营起来是100%免费的。

震惊,Awesome,哼,我才不信捏,所谓无图无真相 ~

废话少说,上图 ?

最终效果1
最终效果2

更多的体验,可以关注我的微信公众号: 乂乂又又 (仅供测试,不要乱搞哈~)

嗯,这次我信了,快点教一下我吧,嘤嘤嘤~

二、操作步骤

在上一篇《万物皆可Serverless之使用SCF+COS快速开发全栈应用》教程中,

我们用腾讯云无服务器函数和对象存储实现了一个后端云函数,这个云函数可以根据我们的请求返回对应的结果

现在我们将尝试在这个云函数的基础上解析微信XML消息,实现公众号消息的自动回复,关键词回复,文字菜单等功能

第一步:添加相关依赖

为了快速完成开发,这里我们选择python第三方开源库wechatpy来接入微信公众平台。

wechatpy

wechatpy支持以下功能

  1. 普通公众平台被动响应和主动调用 API
  2. 企业微信 API
  3. 微信支付 API
  4. 第三方平台代公众号调用接口 API
  5. 小程序云开发 API

可见功能是十分完整的,不仅支持普通公众平台主被动调用,企业微信和微信支付,

甚至还支持第三方平台代公众号调用接口,拿来运营微信公众号是十分绰绰有余的~

完整项目依赖

由于腾讯云函数的运行环境中缺少第三方库,需要我们自己手动上传添加依赖,

这里我们需要添加的第三方依赖有:wechatpy、otionaldict、xmltodict以及timeout_decorator

其中wechatpy需要依赖otionaldict、xmltodict,timeout_decorator是用来限制函数运行时长的

具体的依赖文件可以自行pip安装后copy到云函数项目根目录,如上图

第二步:接入微信公众号

微信公众号开发者后台

这里需要记下自己的AppID、Token和EncodingAESKey,消息加密方式建议选为安全模式

这个页面先不要关,一会我们上线发布好云函数还需要过来再次修改配置

第三步:编写云函数解析并回复微信公众号消息

这一步可以直接参考wechatpy的官方文档,地址 http://docs.wechatpy.org/zh_CN/master/quickstart.html#id2

wechat文档

Life is short, show me the code.

这里我就直接上代码了(原始业务代码已略去,可以按照自己的需求开发)

import json
import timeout_decorator
from wechatpy.replies import ArticlesReply
from wechatpy.utils import check_signature
from wechatpy.crypto import WeChatCrypto
from wechatpy import parse_message, create_reply
from wechatpy.exceptions import InvalidSignatureException, InvalidAppIdException

# 是否开启本地debug模式
debug = False

# 腾讯云对象存储依赖
if debug:
    from qcloud_cos import CosConfig
    from qcloud_cos import CosS3Client
    from qcloud_cos import CosServiceError
    from qcloud_cos import CosClientError
else:
    from qcloud_cos_v5 import CosConfig
    from qcloud_cos_v5 import CosS3Client
    from qcloud_cos_v5 import CosServiceError
    from qcloud_cos_v5 import CosClientError
    
# 配置存储桶
appid = '66666666666'
secret_id = u'xxxxxxxxxxxxxxx'
secret_key = u'xxxxxxxxxxxxxxx'
region = u'ap-chongqing'
bucket = 'name'+'-'+appid

# 对象存储实例
config = CosConfig(Secret_id=secret_id, Secret_key=secret_key, Region=region)
client = CosS3Client(config)

# cos 文件读写
def cosRead(key):
    try:
        response = client.get_object(Bucket=bucket, Key=key)
        txtBytes = response['Body'].get_raw_stream()
        return txtBytes.read().decode()
    except CosServiceError as e:
        return ""

def cosWrite(key, txt):
    try:
        response = client.put_object(
            Bucket=bucket,
            Body=txt.encode(encoding="utf-8"),
            Key=key,
        )
        return True
    except CosServiceError as e:
        return False

def getReplys():
    replyMap = {}
    replyTxt = cosRead('Replys.txt')  # 读取数据
    if len(replyTxt) > 0:
        replyMap = json.loads(replyTxt)
    return replyMap

def addReplys(reply):
    replyMap = getReplys()
    if len(replyMap) > 0:
        replyMap[reply]='我是黑名单'
    return cosWrite('Replys.txt', json.dumps(replyMap, ensure_ascii=False)) if len(replyMap) > 0 else False


def delReplys(reply):
    replyMap = getReplys()
    if len(replyMap) > 0:
        replyMap.pop(reply)
    return cosWrite('Replys.txt', json.dumps(replyMap, ensure_ascii=False)) if len(replyMap) > 0 else False


# 微信公众号对接
wecaht_id = 'xxxxxxxxxxxxxxx'
WECHAT_TOKEN = 'xxxxxxxxxxxxxxxxxxx'
encoding_aes_key = 'xxxxxxxxxxxxxxxxxxxxxx'

crypto = WeChatCrypto(WECHAT_TOKEN, encoding_aes_key, wecaht_id)

# api网关响应集成
def apiReply(reply, txt=False, content_type='application/json', code=200):
    return {
        "isBase64Encoded": False,
        "statusCode": code,
        "headers": {'Content-Type': content_type},
        "body": json.dumps(reply, ensure_ascii=False) if not txt else str(reply)
    }

def replyMessage(msg):
    txt = msg.content
    ip = msg.source
    print('请求信息--->'+ip+'%'+txt)  # 用来在腾讯云控制台打印请求日志
    replysTxtMap = getReplys() # 获取回复关键词
    if '@' in txt:
        keys = txt.split('@')
        if keys[0] == '电影': #do something
            return
        if keys[0] == '音乐': #do something
            return
        if keys[0] == '下架': #do something
            return
        if keys[0] == '上架': #do something
            return
        if keys[0] == '回复': #do something
            return
        if keys[0] == '删除': #do something
            return
    elif txt in replysTxtMap.keys(): # 如果消息在回复关键词内则自动回复
        return create_reply(replysTxtMap[txt], msg)
    return create_reply("喵呜 ฅ'ω'ฅ", msg)

def wechat(httpMethod, requestParameters, body=''):
    if httpMethod == 'GET':
        signature = requestParameters['signature']
        timestamp = requestParameters['timestamp']
        nonce = requestParameters['nonce']
        echo_str = requestParameters['echostr']
        try:
            check_signature(WECHAT_TOKEN, signature, timestamp, nonce)
        except InvalidSignatureException:
            echo_str = 'error'
        return apiReply(echo_str, txt=True, content_type="text/plain")
    elif httpMethod == 'POST':
        msg_signature = requestParameters['msg_signature']
        timestamp = requestParameters['timestamp']
        nonce = requestParameters['nonce']
        try:
            decrypted_xml = crypto.decrypt_message(
                body,
                msg_signature,
                timestamp,
                nonce
            )
        except (InvalidAppIdException, InvalidSignatureException):
            return
        msg = parse_message(decrypted_xml)
        if msg.type == 'text':
            reply = replyMessage(msg)
        elif msg.type == 'image':
            reply = create_reply('哈◔ ‸◔?\n好端端的,给我发图片干啥~', msg)
        elif msg.type == 'voice':
            reply = create_reply('哈◔ ‸◔?\n好端端的,给我发语音干啥~', msg)
        else:
            reply = create_reply('哈◔ ‸◔?\n搞不明白你给我发了啥~', msg)
        reply = reply.render()
        print('返回结果--->'+str(reply))  # 用来在腾讯云控制台打印请求日志
        reply = crypto.encrypt_message(reply, nonce, timestamp)
        return apiReply(reply, txt=True, content_type="application/xml")
    else:
        msg = parse_message(body)
        reply = create_reply("喵呜 ฅ'ω'ฅ", msg)
        reply = reply.render()
        print('返回结果--->'+str(reply))  # 用来在腾讯云控制台打印请求日志
        reply = crypto.encrypt_message(reply, nonce, timestamp)
        return apiReply(reply, txt=True, content_type="application/xml")


@timeout_decorator.timeout(4, timeout_exception=StopIteration)
def myMain(httpMethod, requestParameters, body=''):
    return wechat(httpMethod, requestParameters, body=body)


def timeOutReply(httpMethod, requestParameters, body=''):
    msg_signature = requestParameters['msg_signature']
    timestamp = requestParameters['timestamp']
    nonce = requestParameters['nonce']
    try:
        decrypted_xml = crypto.decrypt_message(
            body,
            msg_signature,
            timestamp,
            nonce
        )
    except (InvalidAppIdException, InvalidSignatureException):
        return
    msg = parse_message(decrypted_xml)
    reply = create_reply("出了点小问题,请稍后再试", msg).render()
    print('返回结果--->'+str(reply))  # 用来在腾讯云控制台打印请求日志
    reply = crypto.encrypt_message(reply, nonce, timestamp)
    return apiReply(reply, txt=True, content_type="application/xml")


def main_handler(event, context):
    body = ''
    httpMethod = event["httpMethod"]
    requestParameters = event['queryString']
    if 'body' in event.keys():
        body = event['body']
    try:
        response = myMain(httpMethod, requestParameters, body=body)
    except:
        response = timeOutReply(httpMethod, requestParameters, body=body)
    return response

请求参数解析和COS读写部分可参考我上一篇《万物皆可Serverless之使用SCF+COS快速开发全栈应用》教程

下面我来捋一下整个云函数的思路

def main_handler(event, context):
    body = ''
    httpMethod = event["httpMethod"]
    requestParameters = event['queryString']
    if 'body' in event.keys():
        body = event['body']
    try:
        response = myMain(httpMethod, requestParameters, body=body)
    except:
        response = timeOutReply(httpMethod, requestParameters, body=body)
    return response

我们先从main_handler入手,

这里我们通过aip网关触发云函数在event里拿到了微信公众号请求的方法、头部和请求体

然后传给myMain函数做处理,需要注意的是myMain是通过timeout_decorator包装的限时运行函数,

@timeout_decorator.timeout(4, timeout_exception=StopIteration)
def myMain(httpMethod, requestParameters, body=''):
    return wechat(httpMethod, requestParameters, body=body)

当myMain函数运行市场超过设定的4秒后,就会抛出异常,

然后我们可以通过设置一个timeOutReply函数来处理超时后的微信公众号消息回复

可是为什么要这么做呢?

函数运行超时后

可以看到,当云函数运行超时后,微信这边就会显示“该公众号提供的服务器出现故障,请稍后再试”

这对用户体验是极不友好的,所以我们需要一个函数超时后的回复来兜底。

微信消息请求频次

那么对于一次微信公众号后台消息请求多长时间算是超时呢?

答案是5秒左右,从云函数后台的调用日志我们可以得到这个结果。

不过需要注意的是对于用户的一次消息请求,微信可能会每隔1秒左右重拨一次请求,直到收到服务器第一次响应

另外,超过3次应该就不会再重拨了,并且在5秒超时后即使云函数调用成功并返回了数据,用户也不会再接收到消息了~

所以我们就很有必要将自己的云函数的运行时长限制在5秒之内了!

当然只通过配置云函数超时时长得方式来处理是不正确的,因为这样做云函数超时后就被系统停掉了,并不会向微信返回消息

所以从一开始我就导入了timeout_decorator库来限制主函数的运行时长,并用一个超时后回复函数来兜底。

另外值得一提的是,在我原始的业务代码中是有一些爬虫,

这些爬虫本来我是单线程顺序执行的,考虑到超时问题,我在微信云函数版这里全部改成了多线程运行来压缩时间

所以如果你也有一些比较耗时的小任务话,也可以尝试通过多线程的方式来压缩云函数的运行时长

OK,一不小心扯太多了,我们接着向下看

def wechat(httpMethod, requestParameters, body=''):
    if httpMethod == 'GET':
        signature = requestParameters['signature']
        timestamp = requestParameters['timestamp']
        nonce = requestParameters['nonce']
        echo_str = requestParameters['echostr']
        try:
            check_signature(WECHAT_TOKEN, signature, timestamp, nonce)
        except InvalidSignatureException:
            echo_str = 'error'
        return apiReply(echo_str, txt=True, content_type="text/plain")
    elif httpMethod == 'POST':
        msg_signature = requestParameters['msg_signature']
        timestamp = requestParameters['timestamp']
        nonce = requestParameters['nonce']
        try:
            decrypted_xml = crypto.decrypt_message(
                body,
                msg_signature,
                timestamp,
                nonce
            )
        except (InvalidAppIdException, InvalidSignatureException):
            return
        msg = parse_message(decrypted_xml)
        if msg.type == 'text':
            reply = replyMessage(msg)
        elif msg.type == 'image':
            reply = create_reply('哈◔ ‸◔?\n好端端的,给我发图片干啥~', msg)
        elif msg.type == 'voice':
            reply = create_reply('哈◔ ‸◔?\n好端端的,给我发语音干啥~', msg)
        else:
            reply = create_reply('哈◔ ‸◔?\n搞不明白你给我发了啥~', msg)
        reply = reply.render()
        print('返回结果--->'+str(reply))  # 用来在腾讯云控制台打印请求日志
        reply = crypto.encrypt_message(reply, nonce, timestamp)
        return apiReply(reply, txt=True, content_type="application/xml")
    else:
        msg = parse_message(body)
        reply = create_reply("喵呜 ฅ'ω'ฅ", msg)
        reply = reply.render()
        print('返回结果--->'+str(reply))  # 用来在腾讯云控制台打印请求日志
        reply = crypto.encrypt_message(reply, nonce, timestamp)
        return apiReply(reply, txt=True, content_type="application/xml")

这里的wechat函数就是整个微信消息的解析过程

首先判断请求方法是GET还是POST,GET方法只在第一次绑定微信后台时会用到,

这时我们会从微信服务器推送的请求参数中拿到 signature, timestamp, echostrnonce 参数,

check_signature(WECHAT_TOKEN, signature, timestamp, nonce)

我们只需根据自己的公众号 token 和来生成签名与微信服务器传过来的 signature 对比看是否一致,

若一致就说明我们的消息加解密验证是OK的,然后再将 echostr 原样返回即可接入微信公众号后台。

接入好微信公众号后,如果有用户在后台给我们发送消息,这里云函数收到的就是POST方法,

elif httpMethod == 'POST':
        msg_signature = requestParameters['msg_signature']
        timestamp = requestParameters['timestamp']
        nonce = requestParameters['nonce']
        try:
            decrypted_xml = crypto.decrypt_message(
                body,
                msg_signature,
                timestamp,
                nonce
            )
        except (InvalidAppIdException, InvalidSignatureException):
            return
        msg = parse_message(decrypted_xml)
        if msg.type == 'text':
            reply = replyMessage(msg)
        elif msg.type == 'image':
            reply = create_reply('哈◔ ‸◔?\n好端端的,给我发图片干啥~', msg)
        elif msg.type == 'voice':
            reply = create_reply('哈◔ ‸◔?\n好端端的,给我发语音干啥~', msg)
        else:
            reply = create_reply('哈◔ ‸◔?\n搞不明白你给我发了啥~', msg)
        reply = reply.render()
        print('返回结果--->'+str(reply))  # 用来在腾讯云控制台打印请求日志
        reply = crypto.encrypt_message(reply, nonce, timestamp)
        return apiReply(reply, txt=True, content_type="application/xml")

然后我们根据前面在微信公众号后台拿到的id,

token和aes加密key来初始化消息加解密实例并解密还原用户发送的消息

# 微信公众号对接
wecaht_id = 'xxxxxxxxxxxxxxx'
WECHAT_TOKEN = 'xxxxxxxxxxxxxxxxxxx'
encoding_aes_key = 'xxxxxxxxxxxxxxxxxxxxxx'

crypto = WeChatCrypto(WECHAT_TOKEN, encoding_aes_key, wecaht_id)

接着判断一下消息类型,不同类型的消息可自行处理

        msg = parse_message(decrypted_xml)
        if msg.type == 'text':
            reply = replyMessage(msg)
        elif msg.type == 'image':
            reply = create_reply('哈◔ ‸◔? 好端端的,给我发图片干啥~', msg)
        elif msg.type == 'voice':
            reply = create_reply('哈◔ ‸◔? 好端端的,给我发语音干啥~', msg)
        else:
            reply = create_reply('哈◔ ‸◔? 搞不明白你给我发了啥~', msg)

需要注意的是当一个用户新关注自己的公众号时,我们收到的是一个其他类型的消息,

也就是上面的最后一个判断项,这里你可以自己设置新关注用户的欢迎语

        reply = create_reply('哈◔ ‸◔?\n搞不明白你给我发了啥~', msg)
        reply = reply.render()
        print('返回结果--->'+str(reply))  # 用来在腾讯云控制台打印请求日志
        reply = crypto.encrypt_message(reply, nonce, timestamp)
        return apiReply(reply, txt=True, content_type="application/xml")        

之后我们通过create_reply来快速创建一个文本回复,并通过render()来生成xml回复消息文本

因为我之前在后台设置的是安全模式,所以还需要把xml重新通过crypto.encrypt_message方法加密,

然后才能把加密后的回复消息返回给微信服务器

上一篇文章我有提到我们不能直接返回消息,需要按照特定的格式返回数据(api网关需要开启响应集成)

# api网关响应集成
def apiReply(reply, txt=False, content_type='application/json', code=200):
    return {
        "isBase64Encoded": False,
        "statusCode": code,
        "headers": {'Content-Type': content_type},
        "body": json.dumps(reply, ensure_ascii=False) if not txt else str(reply)
    }

这里我再拿小竹竿给大家敲一下黑板,注意一定要给api网关开启响应集成哈。

第四步:上线发布云函数、添加Api网关触发器、启用响应集成

参考我上一篇教程 《万物皆可Serverless之使用SCF+COS快速开发全栈应用》

第五步:修改微信公众号后台服务器配置

终于到最后一步了,如果你已经上线发布了好自己的云函数,

那么快去微信公众号后台绑定一下自己的后台服务器配置吧~

后台服务器配置

三、文章最后

如果你还有啥问题或者遇到啥问题的话,

可以直接贴到评论里大家一起来解决下

呼~ 终于又写完了一篇

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 【玩转腾讯云】万物皆可Serverless之借助微信公众号简单管理用户激活码

    就可以添加并回复一个指定有效期的会员激活码,实现了在微信公众号简单管理用户激活码的需求

    乂乂又又
  • 【玩转腾讯云】万物皆可Serverless之Kaggle+SCF端到端验证码识别从训练到部署

    近些年来人工智能迅速发展,尤其是在深度学习神经网络这一块生态尤为繁荣,各种算法和模型层出不穷。

    乂乂又又
  • 【玩转腾讯云】万物皆可Serverless之在Flutter中写一个Dart原生腾讯云对象存储插件

    比如将用户头像上传存储到自己的对象存储桶中,然后返回文件下载链接保存到本地数据库中,

    乂乂又又
  • Python 微信机器人:属于自己的微信机器人制作,简单易懂。图灵机器人接口api调用。

    首先你需要安装itchat库。 进入cmd,先直接pip install itchat就好了。

    小蓝枣
  • Innodb存储引擎的几个小知识点

    在Innodb存储引擎中,采用LRU算法来来对热数据进行管理的。关于LRU算法,可以在之前的文章中进行了解:

    AsiaYe
  • 【5min+】帮我排个队,谢谢。await Task.Yield()

    【五分钟的dotnet】是一个利用您的碎片化时间来学习和丰富.net知识的博文系列。它所包含了.net体系中可能会涉及到的方方面面,比如C#的小细节,Aspne...

    句幽
  • 软件推荐(方片收集) -- 浏览器收藏网页必备

    今天是软件专场的倒数第93场,跟大家分享的是收集利器--方片。下面我们把舞台交给方片,大家掌声欢迎。

    丰臣正一
  • 云拜年攻略!用Python自动回复拜年消息,还能“抗”七姑八姨的灵魂问题

    从年三十到今天,手机上的拜年消息就没停过,大多还是群发,不回复显得很没有礼貌,一一回复又累心劳神。

    大数据文摘
  • lumen for sae

    码农二狗
  • R语言系列第一期:R语言背景、下载安装及功能介绍

    之前的文章中我们总体上为大家介绍了R软件的强大功能及其便利性,那么我们就利用这个专题为大家分享一下这款科学绘图和计算的计算机程序的使用方法。作为这个系列的开始,...

    微点

扫码关注云+社区

领取腾讯云代金券