应用接入企业微信

最近更新时间:2025-02-25 19:54:52

我的收藏

通过企业微信自建应用接入

说明:
创建企业微信自建应用需要在企业微信管理平台当中配置,且需要准备好服务器的 IP 地址和域名(如有)。

1. 创建应用

进入企业微信管理平台后,点击菜单栏中的应用管理,并选择自建应用中的创建应用。可参考如下截图:
企业微信管理平台界面示例
企业微信管理平台界面示例

说明:
1. “未命名企业”为本篇用作示例的企业名称,其涉及的公司 logo 、应用 logo 为 AI 生成的 logo 图片。如有侵权,请联系删除。
2. “TencentLKE 小助手”为本篇用作示例所创建的应用,后续图片展示都将以“TencentLKE 小助手”为例。
进入创建应用的界面后,需要先配置好自建应用的 logo 、名称、可见范围,此三项为必填项。
创建企业微信自建应用界面示例
创建企业微信自建应用界面示例

设置好之后,就会看到如下的界面:
企业微信自建应用配置界面示例
企业微信自建应用配置界面示例


2. 设置API接收

自建应用创建完成后,需要配置接收自建应用消息的服务器 IP 地址或域名。在应用配置界面的功能栏中找到接收消息,并点击设置 API 接收。可参考如下截图:
设置API接收示例
设置API接收示例

点击后会进入配置 API 接收地址的界面。从这一步开始,就需要服务器根据企业微信的配置指引,正确响应企业微信验证 URL 的请求,如下图所示:
注意:
1. 这里需要严格遵守企业微信提供的配置指引,填写的是接收用户消息的企业微信应用对应的服务器的 IP 地址或者域名,服务器需要自行开发符合企业微信要求的 API。
2. 接收用户消息的服务器 IP 地址或域名 ≠ 大模型知识引擎智能应用的体验链接
接收消息服务器配置示例
接收消息服务器配置示例

企业微信提供了加解密 SDK,这里主要应用到了 SDK 中提供的 VerifyURL 函数。除了上图示例中展示的 TokenEncodingAESKey 两个参数外,还需要 CorpID ,即企业 ID 。企业 ID 的获取方式为:点击菜单栏中的我的企业,可以在界面最下方找到企业 ID 。如下图所示:
企业ID获取示例
企业ID获取示例

在 HTTP 或 HTTPS 服务器按照企业微信的要求配置好后,点击保存即可。企业微信会发送一个验证请求到服务器上,服务器正确返回就能够保存成功。

3. 企业微信自建应用与大模型知识引擎联动

3.1. 创建大模型知识引擎智能应用

大模型知识引擎平台创建应用,完成应用配置、对话测试和发布。参考下图:
大模型知识引擎平台界面示例
大模型知识引擎平台界面示例

进入应用,获取 AppKey 以后,就可以以 API 的方式通过对话端接口与大模型知识引擎进行交互。参考下图:
获取智能应用AppKey示例
获取智能应用AppKey示例


3.2. 动态获取企业微信 AccessToken

在和企业微信的消息收发联动之前,需要注意的是:企业微信自建应用需要 AccessToken 来调用企业微信的各个API接口,如回复消息等。需要在先前配置的API服务器中实现 AccessToken 的动态获取。具体接口信息请参考企业微信官方文档。获取 AccessToken 时,还需要获取应用的 Secret ,需要前往应用的配置界面查看,如下图所示:


代码示例

调用逻辑依赖的常量和结构体
package cron

import (
"sync"
"time"
)

const (
WxTokenURL = "https://qyapi.weixin.qq.com/cgi-bin/gettoken"
)

var (
accessToken string
tokenMutex sync.RWMutex
timer *time.Timer
)

type AccessTokenResponse struct {
ErrCode int `json:"errcode"`
ErrMsg string `json:"errmsg"`
AccessToken string `json:"access_token"`
ExpiresIn int `json:"expires_in"`
}
调用逻辑
package cron

import (
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"time"
)

// GetAccessToken 返回缓存的access_token
func GetAccessToken() string {
tokenMutex.RLock()
defer tokenMutex.RUnlock()
return accessToken
}

// StartTokenRefresher 启动access_token刷新定时器
func StartTokenRefresher(corpID string, secret string) {
refreshToken(corpID, secret)
}

func refreshToken(corpID string, secret string) {
url := fmt.Sprintf("%s?corpid=%s&corpsecret=%s", WxTokenURL, corpID, secret)
resp, err := http.Get(url)
if err != nil {
log.Printf("Failed to get access token: %v", err)
return
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
log.Printf("Failed to read response body: %v", err)
return
}
var tokenResp AccessTokenResponse
if err := json.Unmarshal(body, &tokenResp); err != nil {
log.Printf("Failed to unmarshal response: %v", err)
return
}
if tokenResp.ErrCode != 0 {
log.Printf("Error getting access token: %s", tokenResp.ErrMsg)
return
}
tokenMutex.Lock()
accessToken = tokenResp.AccessToken
tokenMutex.Unlock()
// 在access_token过期前10秒再次刷新
refreshTime := time.Duration(tokenResp.ExpiresIn-10) * time.Second
if timer != nil {
timer.Stop()
}
timer = time.AfterFunc(refreshTime, func() {
refreshToken(corpID, secret)
})
log.Printf("Access token refreshed, will refresh again in %v", refreshTime)
}

3.3. 接收用户消息

接收用户消息的服务器需要与先前配置的 API 服务器相同,接收消息为企业微信向服务器发送 POST 请求传递用户发送给应用的信息。可参考企业微信官方文档,对用户发来的消息进行解密和处理。

代码示例

调用逻辑依赖的常量和结构体
package entity

// 定义与 XML 数据结构对应的结构体
type WxBizMsg struct {
ToUserName string `xml:"ToUserName"`
FromUserName string `xml:"FromUserName"`
CreateTime int64 `xml:"CreateTime"`
MsgType string `xml:"MsgType"`
Content string `xml:"Content"`
MsgId int64 `xml:"MsgId"`
AgentID int64 `xml:"AgentID"`
}
调用逻辑
package main

import (
"encoding/xml"
"log"

"example.com/play/wecom/api"
wecomEntity "example.com/play/wecom/entity"
"example.com/play/wecom/wxbizmsgcrypt"
)

const (
WxToken = ""
WxEncodingAESKey = ""
WxCorpID = ""
WxAppSecret = ""
)

func ReceiveMessageHandler(w http.ResponseWriter, p *wecomEntity.WxBizURLParam, msgBodyStr []byte) {
// 解密用户消息
wxcpt := wxbizmsgcrypt.NewWXBizMsgCrypt(WxToken, WxEncodingAESKey, WxCorpID, wxbizmsgcrypt.XmlType)
msgStr, cryptErr := wxcpt.DecryptMsg(p.MsgSignature, p.Timestamp, p.Nonce, msgBodyStr)
if cryptErr != nil {
http.Error(w, "DecryptMsg process failed", http.StatusUnauthorized)
log.Println("DecryptMsg process failed", cryptErr)
}
var msg wecomEntity.WxBizMsg
err := xml.Unmarshal(msgStr, &msg)
if err != nil {
http.Error(w, "ParseMsg process failed", http.StatusInternalServerError)
log.Println("ParseMsg process failed, err:", err)
}
log.Printf("ParseMsg process success, msg: %+v", msg)
go func(wecomParam *wecomEntity.WxBizURLParam, wecomMsg *wecomEntity.WxBizMsg) {
// 将用户的消息传入腾讯云大模型知识引擎
appReply := CallTencentLKEApp(wecomMsg.FromUserName, wecomMsg.Content)
log.Printf("Call TencentLKEApp done, msgID: %d, reply: %s", wecomMsg.MsgId, appReply)
// 发送应用消息给用户
// wecomResp, wecomErr := api.SendTextMessage(int(wecomMsg.AgentID), appReply, wecomMsg.FromUserName)
wecomResp, wecomErr := api.SendMarkdownMessage(int(wecomMsg.AgentID), appReply, wecomMsg.FromUserName)
if wecomErr != nil {
log.Printf("SendBackMessage failed, msgID: %d, err: %v", wecomMsg.MsgId, wecomErr)
return
}
log.Printf("SendBackMessage success, msgId: %d, resp: %v", wecomMsg.MsgId, *wecomResp)
}(p, &msg)
w.Write(nil)
}

3.4. 接入对话接口

在解析出用户消息后,可以利用大模型知识引擎提供的对话接口把用户的消息发送给智能应用。由于企业微信无法流式返回响应,这里需要获取到完整的应用响应后再发回给用户。采用 Web Socket 或者 HTTP SSE 的方式均可。

代码示例

示例中采用的是 HTTP SSE 的方式与大模型知识引擎对接:
调用逻辑依赖的常量和结构体
package entity

import "encoding/json"

const (
TencentLKESSEUrl = "https://wss.lke.cloud.tencent.com/v1/qbot/chat/sse"
AppKey = ""
)

// SseSendEvent SSE 发送事件
type SseSendEvent struct {
ReqID string `json:"req_id"`
Content string `json:"content"`
BotAppKey string `json:"bot_app_key"`
VisitorBizID string `json:"visitor_biz_id"`
SessionID string `json:"session_id"`
StreamingThrottle int `json:"streaming_throttle"`
Timeout int64 `json:"timeout"`
SystemRole string `json:"system_role"`
IsEvaluateTest bool `json:"is_evaluate_test"` // 是否来自应用评测
}

// ChatResponse 回复事件
type ChatResponse struct {
ReqID string `json:"reqID"`
Type string `json:"type,omitempty"`
Payload ReplyEvent `json:"payload"`
Error json.RawMessage `json:"error"`
MessageID string `json:"message_id,omitempty"`
}

// ReplyEvent 回复/确认事件消息体
type ReplyEvent struct {
RequestID string `json:"request_id"`
SessionID string `json:"session_id"`
Content string `json:"content"`
FromName string `json:"from_name"`
FromAvatar string `json:"from_avatar"`
RecordID string `json:"record_id"`
RelatedRecordID string `json:"related_record_id"`
Timestamp int64 `json:"timestamp"`
IsFinal bool `json:"is_final"`
IsFromSelf bool `json:"is_from_self"`
CanRating bool `json:"can_rating"`
IsEvil bool `json:"is_evil"`
IsLLMGenerated bool `json:"is_llm_generated"`
Knowledge []ReplyKnowledge `json:"knowledge"`
ReplyMethod ReplyMethod `json:"reply_method"`
TraceId string `json:"trace_id"`
}

// ReplyKnowledge 回复事件中的知识
type ReplyKnowledge struct {
ID string `json:"id"`
Type uint32 `json:"type"`
}

// ReplyMethod 回复方式
type ReplyMethod uint8

// 回复方式
const (
ReplyMethodModel ReplyMethod = 1 // 大模型直接回复
ReplyMethodBare ReplyMethod = 2 // 保守回复, 未知问题回复
ReplyMethodRejected ReplyMethod = 3 // 拒答问题回复
ReplyMethodEvil ReplyMethod = 4 // 敏感回复
ReplyMethodPriorityQA ReplyMethod = 5 // 问答对直接回复, 已采纳问答对优先回复
ReplyMethodGreeting ReplyMethod = 6 // 欢迎语回复
ReplyMethodBusy ReplyMethod = 7 // 并发超限回复
ReplyGlobalKnowledge ReplyMethod = 8 // 全局干预知识
ReplyMethodTaskFlow ReplyMethod = 9 // 任务流程过程回复, 当历史记录中 task_flow.type = 0 时, 为大模型回复
ReplyMethodTaskAnswer ReplyMethod = 10 // 任务流程答案回复
ReplyMethodSearch ReplyMethod = 11 // 搜索引擎回复
ReplyMethodDecorator ReplyMethod = 12 // 知识润色后回复
ReplyMethodImage ReplyMethod = 13 // 图片理解回复
ReplyMethodFile ReplyMethod = 14 // 实时文档回复
ReplyMethodClarifyConfirm ReplyMethod = 15 // 澄清确认回复
ReplyMethodWorkflow ReplyMethod = 16 // 工作流回复
)
调用逻辑
package sse

import (
"bufio"
"bytes"
"encoding/json"
"net/http"
"strings"
"time"

"log"

"example.com/play/tencentlke/entity"
)

func CallTencentLKEApp(userName string, userInput string) string {
// 调用SSE客户端
sessionID := session.GetSessionID()
event := &eventEntity.SseSendEvent{
Content: userInput,
BotAppKey: entity.AppKey,
VisitorBizID: userName,
SessionID: sessionID,
StreamingThrottle: 1, // 节流控制,选填
}
log.Printf("Send SSE message, sessionID: %s, userInput: %s", sessionID, userInput)
appOutput := sse.SendEvent(entity.TencentLKESSEUrl, event)
log.Printf("Recv SSE message, sessionID: %s, appOutput: %s", sessionID, appOutput)
return appOutput
}

func SendEvent(url string, event *entity.SseSendEvent) string {
client := &http.Client{
Timeout: 30 * time.Second,
}
payloadBytes, err := json.Marshal(&event)
if err != nil {
log.Println("JsonMarshal failed, err:", err)
}
req, err := http.NewRequest("POST", url, bytes.NewBuffer(payloadBytes))
if err != nil {
log.Println("HttpNewRequest failed, err:", err)
return ""
}
resp, err := client.Do(req)
if err != nil {
log.Println("DoHttpRequest failed, err:", err)
return ""
}
defer func() {
err = resp.Body.Close()
if err != nil {
log.Println("HttpCloseRespBody failed, err:", err)
}
}()
if resp.StatusCode != http.StatusOK {
log.Println("Get http response failed, statusCode:", resp.StatusCode)
return ""
}
// 读取数据
scanner := bufio.NewScanner(resp.Body)
for scanner.Scan() {
line := scanner.Text()
if strings.HasPrefix(line, "event:") {
event := strings.TrimPrefix(line, "event:")
if event != "reply" && event != "thought" && event != "token_stat" {
log.Println("Inspect event:", line)
}
} else if strings.HasPrefix(line, "data:") {
data := strings.TrimPrefix(line, "data:")
result := &entity.ChatResponse{}
err = json.Unmarshal([]byte(data), result)
if err != nil {
log.Println("JsonUnmarshal response failed, err:", err)
return ""
}
if result.Type == "token_stat" {
continue
}
if result.Type == "error" {
log.Println("Get error from response:", data)
continue
}
if result.Type == "reply" && result.Payload.IsFromSelf {
log.Printf("Get streaming reply: %s, traceID: %s", result.Payload.Content, result.Payload.TraceId)
continue
}
if result.Type == "reply" && result.Payload.IsFinal {
log.Printf("Get final Reply: %s, traceID: %s", result.Payload.Content, result.Payload.TraceId)
return strings.TrimSpace(result.Payload.Content)
}
} else if line == "" {
continue
} else {
log.Println("Unknown data:", line)
}
}
return ""
}


3.5. 回复用户消息

在获取到智能应用的响应后,需要参考企业微信 官方文档 把响应发回给用户。需要注意的是,发送到企业微信的所在服务器需要在应用管理界面配置白名单。方法如下:在应用管理界面的开发者接口一栏,选择企业可信 IP 选项卡,点击配置,把服务器 IP 填入后,就可以正常调用企业微信的接口了。
配置企业可信IP示例
配置企业可信IP示例


代码示例

调用逻辑依赖的常量和结构体
package api

const (
WxMessageSendURL = "https://qyapi.weixin.qq.com/cgi-bin/message/send"
)

// TextMessage represents a text message to be sent
type TextMessage struct {
ToUser string `json:"touser,omitempty"`
ToParty string `json:"toparty,omitempty"`
ToTag string `json:"totag,omitempty"`
MsgType string `json:"msgtype"`
AgentID int `json:"agentid"`
Text TextBody `json:"text"`
Safe int `json:"safe"`
EnableIDTrans int `json:"enable_id_trans"`
EnableDuplicateCheck int `json:"enable_duplicate_check"`
DuplicateCheckInterval int `json:"duplicate_check_interval"`
}

// MarkdownMessage represents a markdown message to be sent
type MarkdownMessage struct {
ToUser string `json:"touser,omitempty"`
ToParty string `json:"toparty,omitempty"`
ToTag string `json:"totag,omitempty"`
MsgType string `json:"msgtype"`
AgentID int `json:"agentid"`
Markdown TextBody `json:"markdown"`
EnableDuplicateCheck int `json:"enable_duplicate_check"`
DuplicateCheckInterval int `json:"duplicate_check_interval"`
}

// TextBody represents the content of a text message
type TextBody struct {
Content string `json:"content"`
}

// MessageResponse represents the response from the message sending API
type MessageResponse struct {
ErrCode int `json:"errcode"`
ErrMsg string `json:"errmsg"`
InvalidUser string `json:"invaliduser"`
InvalidParty string `json:"invalidparty"`
InvalidTag string `json:"invalidtag"`
UnlicensedUser string `json:"unlicenseduser"`
MsgID string `json:"msgid"`
ResponseCode string `json:"response_code"`
}
调用逻辑
package api

import (
"bytes"
"encoding/json"
"fmt"
"io"
"log"
"net/http"

"example.com/play/wecom/cron"
)

func SendTextMessage(agentID int, content string, userID string) (*MessageResponse, error) {
msg := TextMessage{
ToUser: userID,
MsgType: "text",
AgentID: agentID,
Text: TextBody{Content: content},
Safe: 0,
EnableIDTrans: 0,
EnableDuplicateCheck: 0,
DuplicateCheckInterval: 1800,
}
payloadBytes, err := json.Marshal(msg)
if err != nil {
return nil, fmt.Errorf("failed to marshal message: %v", err)
}
return doRequest(payloadBytes)
}

func SendMarkdownMessage(agentID int, content string, userID string) (*MessageResponse, error) {
msg := MarkdownMessage{
ToUser: userID,
MsgType: "markdown",
AgentID: agentID,
Markdown: TextBody{Content: content},
EnableDuplicateCheck: 0,
DuplicateCheckInterval: 1800,
}
payloadBytes, err := json.Marshal(msg)
if err != nil {
return nil, fmt.Errorf("failed to marshal message: %v", err)
}
return doRequest(payloadBytes)
}

func doRequest(payloadBytes []byte) (*MessageResponse, error) {
accessToken := cron.GetAccessToken()
url := fmt.Sprintf("%s?access_token=%s", WxMessageSendURL, accessToken)
resp, err := http.Post(url, "application/json", bytes.NewBuffer(payloadBytes))
if err != nil {
return nil, fmt.Errorf("failed to send message: %v", err)
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("failed to read response body: %v", err)
}
var result MessageResponse
if err := json.Unmarshal(body, &result); err != nil {
return nil, fmt.Errorf("failed to unmarshal response: %v", err)
}
if result.ErrCode != 0 {
log.Printf("Message sending failed with error: %s", result.ErrMsg)
return &result, fmt.Errorf("API error: %s", result.ErrMsg)
}
return &result, nil
}

4. 完整示例

目前完整代码示例支持的语言为 Golang 和 Python3,可直接基于代码示例进行二次开发:
Golang 示例(已附带编译后的二进制结果,可直接运行;如需编译,请参考内附 README 指引)
Python3 示例(建议 Python 版本 >= 3.10,请参考内附 README 指引)

通过企业微信群机器人接入

目前企业微信群机器人仅支持推送消息,这里与大模型知识引擎联动需要群机器人能够接收消息,因此大模型知识引擎暂不支持通过群机器人接入企业微信。企业微信补齐能力后将同步更新,敬请期待。