大部分聊天软件的机器人自动回复消息流程
文档:QQ 机器人 - 简介
控制台:QQ 开放平台
使用 python SDK,Github - botpy
安装
pip install qq-botpy
示例脚本如下
import botpy
from botpy.message import Message
# Standard-library modules the handlers below rely on.
import logging
import re


class MyClient(botpy.Client):
    """Echo bot: answers 'Hello, <text>' to channel messages, @-mentions and DMs."""

    # A leading "<@id>" / "<@!id>" mention plus any whitespace that follows it.
    _AT_PATTERN = re.compile(r"^<@!?(\d+)>[\s]*")

    # Receive every message in the channel.
    async def on_message_create(self, message: Message):
        logging.info(f'QQ received message from {message.author.username}: {message.content}')
        reply = 'Hello, ' + message.content
        await message.reply(content=reply)

    # Receive messages that @-mention the bot.
    async def on_at_message_create(self, message: Message):
        content = self._AT_PATTERN.sub('', message.content)
        logging.info(f'QQ received at message from {message.author.username}: {content}')
        # Reply with the stripped text; the original computed it but then
        # echoed the raw content, leading mention markup included.
        reply = 'Hello, ' + content
        await message.reply(content=reply)

    # Receive direct (private) messages.
    async def on_direct_message_create(self, message: Message):
        content = self._AT_PATTERN.sub('', message.content)
        # Log label fixed: this handler was logging "at message" as well.
        logging.info(f'QQ received direct message from {message.author.username}: {content}')
        reply = 'Hello, ' + content
        await message.reply(content=reply)
# Event groups the bot subscribes to; each enabled flag is delivered to the
# matching on_XX method implemented on the client.
intents = botpy.Intents(guild_messages=True, public_guild_messages=True, direct_message=True)

# Create the bot with those subscriptions and start it with the app credentials.
# NOTE(review): confirm that `app_secret` is the keyword accepted by the
# installed botpy version's Client.run().
client = MyClient(intents=intents)
client.run(appid=YOUR_APP_ID, app_secret=YOUR_APP_SECRET)
其中 intents 表示要监听的事件;监听事件后,通过实现 client 的 on_XX 方法即可获取并响应对应事件
所有监听事件见文档
申请流程,也可以参考文档 Getting Started
使用 python SDK,discord.py - Quickstart
安装
pip install -U discord.py
示例脚本如下
import discord
# Start from the default intents and additionally enable message content,
# which on_message reads below.
intents = discord.Intents.default()
intents.message_content = True

client = discord.Client(intents=intents)


@client.event
async def on_ready():
    """Print the account the client is logged in as."""
    print(f'We have logged in as {client.user}')


@client.event
async def on_message(message):
    """Reply 'Hello!' to messages starting with '$hello', ignoring the bot's own."""
    sent_by_bot = message.author == client.user
    if not sent_by_bot and message.content.startswith('$hello'):
        await message.channel.send('Hello!')


client.run('your token here')
修改 token 后运行该脚本即可,机器人客户端会自动请求 Discord 服务器并保持心跳,无需配置回调地址或者放开服务端口。
注意:机器人后台最好部署在海外服务器,不然可能请求不通 Discord 的服务器
from fastapi import FastAPI
from pydantic import BaseModel
class LarkRequest(BaseModel):
    """Body of the callback request: a single encrypted string."""

    # Ciphertext carried by the request.
    encrypt: str


# Application object the callback route is registered on.
app = FastAPI()
@app.post('/bot/lark')
def bot_lark(req: LarkRequest):
    """Handle an encrypted callback request.

    Decrypts the request body, answers the `url_verification` handshake by
    echoing the challenge, and otherwise logs the text and sender of the
    received message.

    Args:
        req: Request body carrying the ciphertext in `encrypt`.

    Returns:
        A dict with the challenge for verification requests, otherwise None.
    """
    # Imported locally because the snippet's import header does not provide them.
    import json
    import logging

    # Fix: use the cipher built here on the request's own ciphertext. The
    # original built `cipher` but then called the undefined `bot.cipher`
    # with the undefined name `encrypt`.
    cipher = AESCipher(YOUR_ENCRYPT_KEY)
    decrypted = cipher.decrypt_string(req.encrypt)
    data = json.loads(decrypted)
    logging.info(data)

    if data.get('type') == 'url_verification':
        return {
            "challenge": data.get('challenge', 'challenge_failed')
        }

    event = data.get('event', {})
    message = event.get('message', {})
    # The message content is itself a JSON document stored as a string.
    msg_content = json.loads(message.get('content', '{}'))
    content = msg_content.get('text', '')
    sender = event.get('sender', {})
    user_id = sender.get('sender_id', {}).get('user_id', '')
    logging.info(f"received message `{content}` from user `{user_id}`")
其中解密模块如下
#!pip install pycryptodome
from Crypto.Cipher import AES
import base64
import hashlib
class AESCipher:
    """AES-CBC decryptor keyed by the SHA-256 digest of a passphrase."""

    def __init__(self, key):
        """Derive the AES key from *key* (text or bytes)."""
        self.bs = AES.block_size
        key_bytes = AESCipher.str_to_bytes(key)
        self.key = hashlib.sha256(key_bytes).digest()

    @staticmethod
    def str_to_bytes(data):
        """Return text encoded as UTF-8; pass any other value through unchanged."""
        return data.encode('utf8') if isinstance(data, str) else data

    @staticmethod
    def _unpad(s):
        """Strip trailing padding whose length is given by the final byte."""
        pad_len = ord(s[-1:])
        return s[:-pad_len]

    def decrypt(self, enc):
        """Decrypt raw bytes laid out as IV (one block) followed by ciphertext."""
        iv, body = enc[:AES.block_size], enc[AES.block_size:]
        cipher = AES.new(self.key, AES.MODE_CBC, iv)
        return self._unpad(cipher.decrypt(body))

    def decrypt_string(self, enc):
        """Base64-decode *enc*, decrypt it and return the result as UTF-8 text."""
        raw = base64.b64decode(enc)
        return self.decrypt(raw).decode('utf8')
获取 access token
import httpx
import logging
# Exchange the app credentials for a tenant access token.
url = 'https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal'
headers = {
    'Content-Type': 'application/json; charset=utf-8'
}
data = {
    'app_id': YOUR_APP_ID,
    'app_secret': YOUR_APP_SECRET,
}
with httpx.Client() as client:
    r = client.post(url, headers=headers, json=data)

try:
    token = r.json()['tenant_access_token']
except (KeyError, TypeError, ValueError) as e:
    # Body is not JSON, or it does not contain the token field.
    logging.error(f'update token failed: {e}')
    logging.error(r.text)
else:
    # The token is a credential, so its value is deliberately kept out of the log.
    logging.info('token updated')
发送消息
# `json` is needed for json.dumps below and was never imported by the snippet.
import json

# receive_id_type=chat_id: the `receive_id` in the body is a chat id.
url = 'https://open.feishu.cn/open-apis/im/v1/messages?receive_id_type=chat_id'
headers = {
    'Authorization': 'Bearer ' + YOUR_ACCESS_TOKEN,
    'Content-Type': 'application/json; charset=utf-8',
}
content = {
    "text": 'YOUR_REPLY',
}
data = {
    'receive_id': 'CHAT_ID_FROM_EVENT',
    'msg_type': 'text',
    # The message content is sent as a JSON-encoded string, not a nested object.
    'content': json.dumps(content),
}
with httpx.Client() as client:
    r = client.post(url, headers=headers, json=data)
logging.info(r.text)
企业微信机器人的开发流程与接口与飞书类似,主要有以下几个区别点:
配置回调地址时会发送验证消息,需要将消息解密后返回才能通过验证
from fastapi.responses import PlainTextResponse
from fastapi import FastAPI
import logging
from urllib.parse import unquote
app = FastAPI()


@app.get('/bot', response_class=PlainTextResponse)
def handle_echo(msg_signature: str, timestamp: str, nonce: str, echostr: str) -> str:
    """Answer the callback-URL verification request.

    Passes the query parameters to `bot.wxcpt.VerifyURL` and returns the
    decoded result as plain text, or an empty string when verification fails.
    """
    echostr = unquote(echostr)
    logging.info(f'msg_signature: {msg_signature}, timestamp: {timestamp}, nonce: {nonce}, echostr: {echostr}')
    ret, echostr = bot.wxcpt.VerifyURL(msg_signature, timestamp, nonce, echostr)
    logging.info(f'ret: {ret}, echostr: {echostr}')
    if ret == 0:
        return echostr.decode('utf-8')
    # Any non-zero code means verification failed.
    logging.error("error, VerifyURL ret: " + str(ret))
    return ''
其中解密模块见:Github - weworkapi_python - WXBizMsgCrypt3.py
需要安装 pycryptodome
pip install pycryptodome
接收消息的 handler 与验证消息的 handler 类似,但使用的是 POST 请求
from fastapi import Body
# NOTE(review): this reuses the name `handle_echo` of the GET handler; if both
# live in one module, this definition rebinds the module-level name.
@app.post('/bot', response_class=PlainTextResponse)
def handle_echo(msg_signature: str, timestamp: str, nonce: str, data: str = Body(..., media_type="text/plain")) -> str:
    """Receive a pushed message: decrypt it, parse the XML and log the result.

    Always responds with an empty string, whether or not processing succeeded.
    """
    ret, xml = bot.wxcpt.DecryptMsg(data, msg_signature, timestamp, nonce)
    if ret != 0:
        logging.error("error, DecryptMsg ret: " + str(ret))
        return ''

    msg = _xml.read_xml(xml)
    if msg is not None:
        logging.info(f'receive msg: {msg}')
    else:
        logging.error(f"parse xml error, xml: {xml}")
    return ''
消息解密后是 xml 格式,可以用以下代码解析
import logging
from typing import NamedTuple

try:
    import xml.etree.cElementTree as ET
except ImportError:  # cElementTree was removed in Python 3.9
    import xml.etree.ElementTree as ET
class Message(NamedTuple):
    """A decrypted callback message, flattened from its XML form."""

    user_id: str        # From/UserId
    user_name: str      # From/Name
    user_alias: str     # From/Alias
    webhook_url: str    # WebhookUrl
    chat_id: str        # ChatId
    chat_info_url: str  # GetChatInfoUrl
    msg_id: str         # MsgId
    msg_type: str       # MsgType
    chat_type: str      # ChatType
    content: str        # Text/Content for text messages, otherwise ''


def _required_text(root, path):
    """Return the text of the element at *path*; raise ValueError if it is absent."""
    node = root.find(path)
    if node is None:
        # Name the missing element so the log says what is wrong, instead of
        # "'NoneType' object has no attribute 'text'".
        raise ValueError(f"missing element <{path}>")
    return node.text


def read_xml(xml: "bytes | str") -> "Message | None":
    """Parse a decrypted callback XML document into a Message.

    Args:
        xml: The XML document, as UTF-8 bytes or as already-decoded text.

    Returns:
        The parsed Message, or None (after logging the reason) when the
        input is not well-formed XML or lacks one of the expected elements.
    """
    try:
        text = xml.decode() if isinstance(xml, (bytes, bytearray)) else xml
        root = ET.fromstring(text)
        msg_type = _required_text(root, 'MsgType')
        # Only text messages carry a Text/Content element.
        content = _required_text(root, 'Text/Content') if msg_type == 'text' else ''
        return Message(
            user_id=_required_text(root, 'From/UserId'),
            user_name=_required_text(root, 'From/Name'),
            user_alias=_required_text(root, 'From/Alias'),
            webhook_url=_required_text(root, 'WebhookUrl'),
            chat_id=_required_text(root, 'ChatId'),
            chat_info_url=_required_text(root, 'GetChatInfoUrl'),
            msg_id=_required_text(root, 'MsgId'),
            msg_type=msg_type,
            chat_type=_required_text(root, 'ChatType'),
            content=content,
        )
    except (ET.ParseError, TypeError, ValueError) as e:
        logging.error(f"read xml error: {str(e)}")
        return None
发送文字示例,支持 markdown 格式
import httpx
content = 'YOUR_MESSAGE'
if user_id is not None:
    # Prefix an @-mention of the user. This has to happen before the body is
    # built, and needs an f-string, or the literal text "{user_id}" is sent.
    content = f'<@{user_id}>\n' + content

body = {
    'msgtype': 'markdown',
    'markdown': {
        'content': content
    }
}
if chat_id is not None:
    body['chatid'] = chat_id  # send to the given group chat
r = httpx.post(webhook_url, json=body)
发送图片示例
# Image message: the image is sent inline as base64 together with its MD5.
body = {
    'msgtype': 'image',
    'image': {
        'base64': image_base64,
        'md5': image_md5,
    }
}
# Use the same `webhook_url` as the text example; there is no `self` at script level.
r = httpx.post(webhook_url, json=body)
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。