通过企业微信自建应用接入
说明:
1. 创建应用
进入企业微信管理平台后,点击菜单栏中的应用管理,并选择自建应用中的创建应用。可参考如下截图:

说明:
1. “未命名企业”为本篇用作示例的企业名称,其涉及的公司 logo 、应用 logo 为 AI 生成的 logo 图片。如有侵权,请联系删除。
2. “TencentLKE 小助手”为本篇用作示例所创建的应用,后续图片展示都将以“TencentLKE 小助手”为例。
进入创建应用的界面后,需要先配置好自建应用的 logo 、名称、可见范围,此三项为必填项。

设置好之后,就会看到如下的界面:

2. 设置API接收
自建应用创建完成后,需要配置接收自建应用消息的服务器 IP 地址或域名。在应用配置界面的功能栏中找到接收消息,并点击设置 API 接收。可参考如下截图:

注意:
1. 这里需要严格遵守企业微信提供的配置指引,填写的是接收用户消息的企业微信应用对应的服务器的 IP 地址或者域名,服务器需要自行开发符合企业微信要求的 API。
2. 接收用户消息的服务器 IP 地址或域名 ≠ 大模型知识引擎智能应用的体验链接。

企业微信提供了加解密 SDK,这里主要应用到了 SDK 中提供的
VerifyURL
函数。除了上图示例中展示的 Token
和 EncodingAESKey
两个参数外,还需要 CorpID
,即企业 ID 。企业 ID 的获取方式为:点击菜单栏中的我的企业,可以在界面最下方找到企业 ID 。如下图所示:
在 HTTP 或 HTTPS 服务器按照企业微信的要求配置好后,点击保存即可。企业微信会发送一个验证请求到服务器上,服务器正确返回就能够保存成功。
3. 企业微信自建应用与大模型知识引擎联动
3.1. 创建大模型知识引擎智能应用

进入应用,获取 AppKey 以后,就可以以 API 的方式通过对话端接口与大模型知识引擎进行交互。参考下图:

3.2. 动态获取企业微信 AccessToken
在和企业微信的消息收发联动之前,需要注意的是:企业微信自建应用需要 AccessToken 来调用企业微信的各个API接口,如回复消息等。需要在先前配置的API服务器中实现
AccessToken
的动态获取。具体接口信息请参考企业微信官方文档。获取 AccessToken
时,还需要获取应用的 Secret
,需要前往应用的配置界面查看,如下图所示:
代码示例
调用逻辑依赖的常量和结构体
package cronimport ("sync""time")const (WxTokenURL = "https://qyapi.weixin.qq.com/cgi-bin/gettoken")var (accessToken stringtokenMutex sync.RWMutextimer *time.Timer)type AccessTokenResponse struct {ErrCode int `json:"errcode"`ErrMsg string `json:"errmsg"`AccessToken string `json:"access_token"`ExpiresIn int `json:"expires_in"`}
调用逻辑
package cronimport ("encoding/json""fmt""io""log""net/http""time")// GetAccessToken 返回缓存的access_tokenfunc GetAccessToken() string {tokenMutex.RLock()defer tokenMutex.RUnlock()return accessToken}// StartTokenRefresher 启动access_token刷新定时器func StartTokenRefresher(corpID string, secret string) {refreshToken(corpID, secret)}func refreshToken(corpID string, secret string) {url := fmt.Sprintf("%s?corpid=%s&corpsecret=%s", WxTokenURL, corpID, secret)resp, err := http.Get(url)if err != nil {log.Printf("Failed to get access token: %v", err)return}defer resp.Body.Close()body, err := io.ReadAll(resp.Body)if err != nil {log.Printf("Failed to read response body: %v", err)return}var tokenResp AccessTokenResponseif err := json.Unmarshal(body, &tokenResp); err != nil {log.Printf("Failed to unmarshal response: %v", err)return}if tokenResp.ErrCode != 0 {log.Printf("Error getting access token: %s", tokenResp.ErrMsg)return}tokenMutex.Lock()accessToken = tokenResp.AccessTokentokenMutex.Unlock()// 在access_token过期前10秒再次刷新refreshTime := time.Duration(tokenResp.ExpiresIn-10) * time.Secondif timer != nil {timer.Stop()}timer = time.AfterFunc(refreshTime, func() {refreshToken(corpID, secret)})log.Printf("Access token refreshed, will refresh again in %v", refreshTime)}
3.3. 接收用户消息
代码示例
调用逻辑依赖的常量和结构体
package entity// 定义与 XML 数据结构对应的结构体type WxBizMsg struct {ToUserName string `xml:"ToUserName"`FromUserName string `xml:"FromUserName"`CreateTime int64 `xml:"CreateTime"`MsgType string `xml:"MsgType"`Content string `xml:"Content"`MsgId int64 `xml:"MsgId"`AgentID int64 `xml:"AgentID"`}
调用逻辑
package mainimport ("encoding/xml""log""example.com/play/wecom/api"wecomEntity "example.com/play/wecom/entity""example.com/play/wecom/wxbizmsgcrypt")const (WxToken = ""WxEncodingAESKey = ""WxCorpID = ""WxAppSecret = "")func ReceiveMessageHandler(w http.ResponseWriter, p *wecomEntity.WxBizURLParam, msgBodyStr []byte) {// 解密用户消息wxcpt := wxbizmsgcrypt.NewWXBizMsgCrypt(WxToken, WxEncodingAESKey, WxCorpID, wxbizmsgcrypt.XmlType)msgStr, cryptErr := wxcpt.DecryptMsg(p.MsgSignature, p.Timestamp, p.Nonce, msgBodyStr)if cryptErr != nil {http.Error(w, "DecryptMsg process failed", http.StatusUnauthorized)log.Println("DecryptMsg process failed", cryptErr)}var msg wecomEntity.WxBizMsgerr := xml.Unmarshal(msgStr, &msg)if err != nil {http.Error(w, "ParseMsg process failed", http.StatusInternalServerError)log.Println("ParseMsg process failed, err:", err)}log.Printf("ParseMsg process success, msg: %+v", msg)go func(wecomParam *wecomEntity.WxBizURLParam, wecomMsg *wecomEntity.WxBizMsg) {// 将用户的消息传入腾讯云大模型知识引擎appReply := CallTencentLKEApp(wecomMsg.FromUserName, wecomMsg.Content)log.Printf("Call TencentLKEApp done, msgID: %d, reply: %s", wecomMsg.MsgId, appReply)// 发送应用消息给用户// wecomResp, wecomErr := api.SendTextMessage(int(wecomMsg.AgentID), appReply, wecomMsg.FromUserName)wecomResp, wecomErr := api.SendMarkdownMessage(int(wecomMsg.AgentID), appReply, wecomMsg.FromUserName)if wecomErr != nil {log.Printf("SendBackMessage failed, msgID: %d, err: %v", wecomMsg.MsgId, wecomErr)return}log.Printf("SendBackMessage success, msgId: %d, resp: %v", wecomMsg.MsgId, *wecomResp)}(p, &msg)w.Write(nil)}
3.4. 接入对话接口
在解析出用户消息后,可以利用大模型知识引擎提供的对话接口把用户的消息发送给智能应用。由于企业微信无法流式返回响应,这里需要获取到完整的应用响应后再发回给用户。采用 Web Socket 或者 HTTP SSE 的方式均可。
代码示例
示例中采用的是 HTTP SSE 的方式与大模型知识引擎对接:
调用逻辑依赖的常量和结构体
package entityimport "encoding/json"const (TencentLKESSEUrl = "https://wss.lke.cloud.tencent.com/v1/qbot/chat/sse"AppKey = "")// SseSendEvent SSE 发送事件type SseSendEvent struct {ReqID string `json:"req_id"`Content string `json:"content"`BotAppKey string `json:"bot_app_key"`VisitorBizID string `json:"visitor_biz_id"`SessionID string `json:"session_id"`StreamingThrottle int `json:"streaming_throttle"`Timeout int64 `json:"timeout"`SystemRole string `json:"system_role"`IsEvaluateTest bool `json:"is_evaluate_test"` // 是否来自应用评测}// ChatResponse 回复事件type ChatResponse struct {ReqID string `json:"reqID"`Type string `json:"type,omitempty"`Payload ReplyEvent `json:"payload"`Error json.RawMessage `json:"error"`MessageID string `json:"message_id,omitempty"`}// ReplyEvent 回复/确认事件消息体type ReplyEvent struct {RequestID string `json:"request_id"`SessionID string `json:"session_id"`Content string `json:"content"`FromName string `json:"from_name"`FromAvatar string `json:"from_avatar"`RecordID string `json:"record_id"`RelatedRecordID string `json:"related_record_id"`Timestamp int64 `json:"timestamp"`IsFinal bool `json:"is_final"`IsFromSelf bool `json:"is_from_self"`CanRating bool `json:"can_rating"`IsEvil bool `json:"is_evil"`IsLLMGenerated bool `json:"is_llm_generated"`Knowledge []ReplyKnowledge `json:"knowledge"`ReplyMethod ReplyMethod `json:"reply_method"`TraceId string `json:"trace_id"`}// ReplyKnowledge 回复事件中的知识type ReplyKnowledge struct {ID string `json:"id"`Type uint32 `json:"type"`}// ReplyMethod 回复方式type ReplyMethod uint8// 回复方式const (ReplyMethodModel ReplyMethod = 1 // 大模型直接回复ReplyMethodBare ReplyMethod = 2 // 保守回复, 未知问题回复ReplyMethodRejected ReplyMethod = 3 // 拒答问题回复ReplyMethodEvil ReplyMethod = 4 // 敏感回复ReplyMethodPriorityQA ReplyMethod = 5 // 问答对直接回复, 已采纳问答对优先回复ReplyMethodGreeting ReplyMethod = 6 // 欢迎语回复ReplyMethodBusy ReplyMethod = 7 // 并发超限回复ReplyGlobalKnowledge ReplyMethod = 8 // 全局干预知识ReplyMethodTaskFlow ReplyMethod = 9 // 任务流程过程回复, 当历史记录中 task_flow.type = 0 时, 为大模型回复ReplyMethodTaskAnswer ReplyMethod = 10 // 任务流程答案回复ReplyMethodSearch ReplyMethod = 11 // 搜索引擎回复ReplyMethodDecorator ReplyMethod = 12 // 知识润色后回复ReplyMethodImage ReplyMethod = 13 // 图片理解回复ReplyMethodFile ReplyMethod = 14 // 实时文档回复ReplyMethodClarifyConfirm ReplyMethod = 15 // 澄清确认回复ReplyMethodWorkflow ReplyMethod = 16 // 工作流回复)
调用逻辑
package sseimport ("bufio""bytes""encoding/json""net/http""strings""time""log""example.com/play/tencentlke/entity")func CallTencentLKEApp(userName string, userInput string) string {// 调用SSE客户端sessionID := session.GetSessionID()event := &eventEntity.SseSendEvent{Content: userInput,BotAppKey: entity.AppKey,VisitorBizID: userName,SessionID: sessionID,StreamingThrottle: 1, // 节流控制,选填}log.Printf("Send SSE message, sessionID: %s, userInput: %s", sessionID, userInput)appOutput := sse.SendEvent(entity.TencentLKESSEUrl, event)log.Printf("Recv SSE message, sessionID: %s, appOutput: %s", sessionID, appOutput)return appOutput}func SendEvent(url string, event *entity.SseSendEvent) string {client := &http.Client{Timeout: 30 * time.Second,}payloadBytes, err := json.Marshal(&event)if err != nil {log.Println("JsonMarshal failed, err:", err)}req, err := http.NewRequest("POST", url, bytes.NewBuffer(payloadBytes))if err != nil {log.Println("HttpNewRequest failed, err:", err)return ""}resp, err := client.Do(req)if err != nil {log.Println("DoHttpRequest failed, err:", err)return ""}defer func() {err = resp.Body.Close()if err != nil {log.Println("HttpCloseRespBody failed, err:", err)}}()if resp.StatusCode != http.StatusOK {log.Println("Get http response failed, statusCode:", resp.StatusCode)return ""}// 读取数据scanner := bufio.NewScanner(resp.Body)for scanner.Scan() {line := scanner.Text()if strings.HasPrefix(line, "event:") {event := strings.TrimPrefix(line, "event:")if event != "reply" && event != "thought" && event != "token_stat" {log.Println("Inspect event:", line)}} else if strings.HasPrefix(line, "data:") {data := strings.TrimPrefix(line, "data:")result := &entity.ChatResponse{}err = json.Unmarshal([]byte(data), result)if err != nil {log.Println("JsonUnmarshal response failed, err:", err)return ""}if result.Type == "token_stat" {continue}if result.Type == "error" {log.Println("Get error from response:", data)continue}if result.Type == "reply" && result.Payload.IsFromSelf {log.Printf("Get streaming reply: %s, traceID: %s", result.Payload.Content, result.Payload.TraceId)continue}if result.Type == "reply" && result.Payload.IsFinal {log.Printf("Get final Reply: %s, traceID: %s", result.Payload.Content, result.Payload.TraceId)return strings.TrimSpace(result.Payload.Content)}} else if line == "" {continue} else {log.Println("Unknown data:", line)}}return ""}
3.5. 回复用户消息
在获取到智能应用的响应后,需要参考企业微信 官方文档 把响应发回给用户。需要注意的是,发送到企业微信的所在服务器需要在应用管理界面配置白名单。方法如下:在应用管理界面的开发者接口一栏,选择企业可信 IP 选项卡,点击配置,把服务器 IP 填入后,就可以正常调用企业微信的接口了。

代码示例
调用逻辑依赖的常量和结构体
package apiconst (WxMessageSendURL = "https://qyapi.weixin.qq.com/cgi-bin/message/send")// TextMessage represents a text message to be senttype TextMessage struct {ToUser string `json:"touser,omitempty"`ToParty string `json:"toparty,omitempty"`ToTag string `json:"totag,omitempty"`MsgType string `json:"msgtype"`AgentID int `json:"agentid"`Text TextBody `json:"text"`Safe int `json:"safe"`EnableIDTrans int `json:"enable_id_trans"`EnableDuplicateCheck int `json:"enable_duplicate_check"`DuplicateCheckInterval int `json:"duplicate_check_interval"`}// MarkdownMessage represents a markdown message to be senttype MarkdownMessage struct {ToUser string `json:"touser,omitempty"`ToParty string `json:"toparty,omitempty"`ToTag string `json:"totag,omitempty"`MsgType string `json:"msgtype"`AgentID int `json:"agentid"`Markdown TextBody `json:"markdown"`EnableDuplicateCheck int `json:"enable_duplicate_check"`DuplicateCheckInterval int `json:"duplicate_check_interval"`}// TextBody represents the content of a text messagetype TextBody struct {Content string `json:"content"`}// MessageResponse represents the response from the message sending APItype MessageResponse struct {ErrCode int `json:"errcode"`ErrMsg string `json:"errmsg"`InvalidUser string `json:"invaliduser"`InvalidParty string `json:"invalidparty"`InvalidTag string `json:"invalidtag"`UnlicensedUser string `json:"unlicenseduser"`MsgID string `json:"msgid"`ResponseCode string `json:"response_code"`}
调用逻辑
package apiimport ("bytes""encoding/json""fmt""io""log""net/http""example.com/play/wecom/cron")func SendTextMessage(agentID int, content string, userID string) (*MessageResponse, error) {msg := TextMessage{ToUser: userID,MsgType: "text",AgentID: agentID,Text: TextBody{Content: content},Safe: 0,EnableIDTrans: 0,EnableDuplicateCheck: 0,DuplicateCheckInterval: 1800,}payloadBytes, err := json.Marshal(msg)if err != nil {return nil, fmt.Errorf("failed to marshal message: %v", err)}return doRequest(payloadBytes)}func SendMarkdownMessage(agentID int, content string, userID string) (*MessageResponse, error) {msg := MarkdownMessage{ToUser: userID,MsgType: "markdown",AgentID: agentID,Markdown: TextBody{Content: content},EnableDuplicateCheck: 0,DuplicateCheckInterval: 1800,}payloadBytes, err := json.Marshal(msg)if err != nil {return nil, fmt.Errorf("failed to marshal message: %v", err)}return doRequest(payloadBytes)}func doRequest(payloadBytes []byte) (*MessageResponse, error) {accessToken := cron.GetAccessToken()url := fmt.Sprintf("%s?access_token=%s", WxMessageSendURL, accessToken)resp, err := http.Post(url, "application/json", bytes.NewBuffer(payloadBytes))if err != nil {return nil, fmt.Errorf("failed to send message: %v", err)}defer resp.Body.Close()body, err := io.ReadAll(resp.Body)if err != nil {return nil, fmt.Errorf("failed to read response body: %v", err)}var result MessageResponseif err := json.Unmarshal(body, &result); err != nil {return nil, fmt.Errorf("failed to unmarshal response: %v", err)}if result.ErrCode != 0 {log.Printf("Message sending failed with error: %s", result.ErrMsg)return &result, fmt.Errorf("API error: %s", result.ErrMsg)}return &result, nil}
4. 完整示例
目前完整代码示例支持的语言为 Golang 和 Python3,可直接基于代码示例进行二次开发:
Golang 示例(已附带编译后的二进制结果,可直接运行;如需编译,请参考内附 README 指引)
Python3 示例(建议 Python 版本 >= 3.10,请参考内附 README 指引)
通过企业微信群机器人接入
目前企业微信群机器人仅支持推送消息,这里与大模型知识引擎联动需要群机器人能够接收消息,因此大模型知识引擎暂不支持通过群机器人接入企业微信。企业微信补齐能力后将同步更新,敬请期待。