Go SDK

最近更新时间:2025-06-25 14:27:24

我的收藏

功能概述

Eclipse Paho MQTT Go Client 为 Eclipse Paho 项目下的 Go 语言版客户端库,该库能够连接到 MQTT Broker 以发布消息,订阅主题并接收已发布的消息,支持完全异步的操作模式。

云资源准备

请您先参见 创建资源 操作步骤完成云资源准备。

环境准备

安装 Eclipse Paho SDK:
MQTT 5.0
MQTT 3.1.1
go get github.com/eclipse/paho.golang

go get github.com/eclipse/paho.mqtt.golang

示例代码

MQTT 5.0
MQTT 3.1.1
package main

import (
"context"
"fmt"
"net/url"
"os"
"os/signal"
"strconv"
"syscall"
"time"

"github.com/eclipse/paho.golang/autopaho"
"github.com/eclipse/paho.golang/paho"
)

// 接入点, 从控制台获取
const accessPoint = "mqtt://mqtt-xxx-sh-public.mqtt.tencenttdmq.com:1883"

// Change this to something random if using a public test server
const clientID = "PahoGoClient"

const topic = "PahoGoTestTopic"

// 用户名, 从控制台获取
const username = "YOUR_USERNAME"

// 密码, 从控制台获取
var password = []byte("YOUR_PASSWORD")

func main() {
// App will run until cancelled by user (e.g. ctrl-c)
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()

// We will connect to the Eclipse test server (note that you may see messages that other users publish)
u, err := url.Parse(accessPoint)
if err != nil {
panic(err)
}

cliCfg := autopaho.ClientConfig{
ServerUrls: []*url.URL{u},
ConnectUsername: username,
ConnectPassword: password,

// Keepalive message should be sent every 60 seconds
KeepAlive: 60,

// CleanStartOnInitialConnection defaults to false. Setting this to true will clear the session on the first connection.
CleanStartOnInitialConnection: false,

// SessionExpiryInterval - Seconds that a session will survive after disconnection.
// It is important to set this because otherwise, any queued messages will be lost if the connection drops and
// the server will not queue messages while it is down. The specific setting will depend upon your needs
// (60 = 1 minute, 3600 = 1 hour, 86400 = one day, 259200 = 3 days)
// MQTT server permits expiry interval up to 3 days
SessionExpiryInterval: 60,
OnConnectionUp: func(cm *autopaho.ConnectionManager, connAck *paho.Connack) {
fmt.Println("mqtt connection up")
// Subscribing in the OnConnectionUp callback is recommended (ensures the subscription is reestablished if
// the connection drops)
if _, err := cm.Subscribe(context.Background(), &paho.Subscribe{
Subscriptions: []paho.SubscribeOptions{
{Topic: topic, QoS: 1},
},
}); err != nil {
fmt.Printf("failed to subscribe (%s). This is likely to mean no messages will be received.", err)
}
fmt.Println("mqtt subscription made")
},
OnConnectError: func(err error) { fmt.Printf("error whilst attempting connection: %s\\n", err) },
// eclipse/paho.golang/paho provides base mqtt functionality, the below config will be passed in for each connection
ClientConfig: paho.ClientConfig{
// If you are using QOS 1/2, then it's important to specify a client id (which must be unique)
ClientID: clientID,
// OnPublishReceived is a slice of functions that will be called when a message is received.
// You can write the function(s) yourself or use the supplied Router
OnPublishReceived: []func(paho.PublishReceived) (bool, error){
func(pr paho.PublishReceived) (bool, error) {
fmt.Printf("received message on topic %s; body: %s (retain: %t)\\n", pr.Packet.Topic, pr.Packet.Payload, pr.Packet.Retain)
return true, nil
}},
OnClientError: func(err error) { fmt.Printf("client error: %s\\n", err) },
OnServerDisconnect: func(d *paho.Disconnect) {
if d.Properties != nil {
fmt.Printf("server requested disconnect: %s\\n", d.Properties.ReasonString)
} else {
fmt.Printf("server requested disconnect; reason code: %d\\n", d.ReasonCode)
}
},
},
}

c, err := autopaho.NewConnection(ctx, cliCfg) // starts process; will reconnect until context cancelled
if err != nil {
panic(err)
}
// Wait for the connection to come up
if err = c.AwaitConnection(ctx); err != nil {
panic(err)
}

ticker := time.NewTicker(time.Second)
msgCount := 0
defer ticker.Stop()
for {
select {
case <-ticker.C:
msgCount++
// Publish a test message (use PublishViaQueue if you don't want to wait for a response)
if _, err = c.Publish(ctx, &paho.Publish{
QoS: 1,
Topic: topic,
Payload: []byte("TestMessage: " + strconv.Itoa(msgCount)),
}); err != nil {
if ctx.Err() == nil {
panic(err) // Publish will exit when context cancelled or if something went wrong
}
}
continue
case <-ctx.Done():
}
break
}

fmt.Println("signal caught - exiting")
<-c.Done() // Wait for clean shutdown (cancelling the context triggered the shutdown)
}

package main

import (
"fmt"
"log"
"os"
"time"

mqtt "github.com/eclipse/paho.mqtt.golang"
)
// 集群接入点, 从控制台获取
const accessPoint = "mqtt-xxx-sh-public.mqtt.tencenttdmq.com:1883"

// 用户名, 从控制台获取
const username = "your-username"

// 密码, 从控制台获取
const password = "your-password"

// 客户端标识, 需保持集群唯一, 一般为产品序列号, 车辆VIN码等
const clientId = "VIN0001"

// 发送消息主题
const topic = "testtopic/1"

// 订阅表达式
const topicFilter = "testtopic/#"

// 发送、订阅QoS
const qos = 1

var f mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
fmt.Println("Received a message:")
fmt.Printf("TOPIC: %s\\n", msg.Topic())
fmt.Printf("MSG: %s\\n", msg.Payload())
}

func main() {
mqtt.DEBUG = log.New(os.Stdout, "", 0)
mqtt.ERROR = log.New(os.Stdout, "", 0)
opts := mqtt.NewClientOptions().AddBroker(accessPoint).SetClientID(clientId)

opts.SetKeepAlive(60 * time.Second)
opts.SetUsername(username)
opts.SetPassword(password)

// 设置消息回调处理函数
opts.SetDefaultPublishHandler(f)
opts.SetPingTimeout(1 * time.Second)

c := mqtt.NewClient(opts)
if token := c.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
}

// 订阅主题
if token := c.Subscribe(topicFilter, qos, nil); token.Wait() && token.Error() != nil {
fmt.Println(token.Error())
os.Exit(1)
}

// 发布消息
token := c.Publish(topic, qos, false, "Hello World")
token.Wait()

time.Sleep(6 * time.Second)

// 取消订阅
if token := c.Unsubscribe(topicFilter); token.Wait() && token.Error() != nil {
fmt.Println(token.Error())
os.Exit(1)
}

// 断开连接
c.Disconnect(250)
time.Sleep(1 * time.Second)
}