功能概述
Eclipse Paho MQTT Go Client 是 Eclipse Paho 项目下的 Go 语言版客户端库。该库能够连接到 MQTT Broker 以发布消息、订阅主题并接收已发布的消息,支持完全异步的操作模式。
云资源准备
环境准备
安装 Eclipse Paho SDK(paho.golang 为 MQTT 5.0 客户端,paho.mqtt.golang 为 MQTT 3.1.1 客户端,按需安装其一,分别对应下方两个示例):
go get github.com/eclipse/paho.golang
go get github.com/eclipse/paho.mqtt.golang
示例代码
package mainimport ("context""fmt""net/url""os""os/signal""strconv""syscall""time""github.com/eclipse/paho.golang/autopaho""github.com/eclipse/paho.golang/paho")// 接入点, 从控制台获取const accessPoint = "mqtt://mqtt-xxx-sh-public.mqtt.tencenttdmq.com:1883"// Change this to something random if using a public test serverconst clientID = "PahoGoClient"const topic = "PahoGoTestTopic"// 用户名, 从控制台获取const username = "YOUR_USERNAME"// 密码, 从控制台获取var password = []byte("YOUR_PASSWORD")func main() {// App will run until cancelled by user (e.g. ctrl-c)ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)defer stop()// We will connect to the Eclipse test server (note that you may see messages that other users publish)u, err := url.Parse(accessPoint)if err != nil {panic(err)}cliCfg := autopaho.ClientConfig{ServerUrls: []*url.URL{u},ConnectUsername: username,ConnectPassword: password,// Keepalive message should be sent every 60 secondsKeepAlive: 60,// CleanStartOnInitialConnection defaults to false. Setting this to true will clear the session on the first connection.CleanStartOnInitialConnection: false,// SessionExpiryInterval - Seconds that a session will survive after disconnection.// It is important to set this because otherwise, any queued messages will be lost if the connection drops and// the server will not queue messages while it is down. The specific setting will depend upon your needs// (60 = 1 minute, 3600 = 1 hour, 86400 = one day, 259200 = 3 days)// MQTT server permits expiry interval up to 3 daysSessionExpiryInterval: 60,OnConnectionUp: func(cm *autopaho.ConnectionManager, connAck *paho.Connack) {fmt.Println("mqtt connection up")// Subscribing in the OnConnectionUp callback is recommended (ensures the subscription is reestablished if// the connection drops)if _, err := cm.Subscribe(context.Background(), &paho.Subscribe{Subscriptions: []paho.SubscribeOptions{{Topic: topic, QoS: 1},},}); err != nil {fmt.Printf("failed to subscribe (%s). 
This is likely to mean no messages will be received.", err)}fmt.Println("mqtt subscription made")},OnConnectError: func(err error) { fmt.Printf("error whilst attempting connection: %s\\n", err) },// eclipse/paho.golang/paho provides base mqtt functionality, the below config will be passed in for each connectionClientConfig: paho.ClientConfig{// If you are using QOS 1/2, then it's important to specify a client id (which must be unique)ClientID: clientID,// OnPublishReceived is a slice of functions that will be called when a message is received.// You can write the function(s) yourself or use the supplied RouterOnPublishReceived: []func(paho.PublishReceived) (bool, error){func(pr paho.PublishReceived) (bool, error) {fmt.Printf("received message on topic %s; body: %s (retain: %t)\\n", pr.Packet.Topic, pr.Packet.Payload, pr.Packet.Retain)return true, nil}},OnClientError: func(err error) { fmt.Printf("client error: %s\\n", err) },OnServerDisconnect: func(d *paho.Disconnect) {if d.Properties != nil {fmt.Printf("server requested disconnect: %s\\n", d.Properties.ReasonString)} else {fmt.Printf("server requested disconnect; reason code: %d\\n", d.ReasonCode)}},},}c, err := autopaho.NewConnection(ctx, cliCfg) // starts process; will reconnect until context cancelledif err != nil {panic(err)}// Wait for the connection to come upif err = c.AwaitConnection(ctx); err != nil {panic(err)}ticker := time.NewTicker(time.Second)msgCount := 0defer ticker.Stop()for {select {case <-ticker.C:msgCount++// Publish a test message (use PublishViaQueue if you don't want to wait for a response)if _, err = c.Publish(ctx, &paho.Publish{QoS: 1,Topic: topic,Payload: []byte("TestMessage: " + strconv.Itoa(msgCount)),}); err != nil {if ctx.Err() == nil {panic(err) // Publish will exit when context cancelled or if something went wrong}}continuecase <-ctx.Done():}break}fmt.Println("signal caught - exiting")<-c.Done() // Wait for clean shutdown (cancelling the context triggered the shutdown)}
package mainimport ("fmt""log""os""time"mqtt "github.com/eclipse/paho.mqtt.golang")// 集群接入点, 从控制台获取const accessPoint = "mqtt-xxx-sh-public.mqtt.tencenttdmq.com:1883"// 用户名, 从控制台获取const username = "your-username"// 密码, 从控制台获取const password = "your-password"// 客户端标识, 需保持集群唯一, 一般为产品序列号, 车辆VIN码等const clientId = "VIN0001"// 发送消息主题const topic = "testtopic/1"// 订阅表达式const topicFilter = "testtopic/#"// 发送、订阅QoSconst qos = 1var f mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {fmt.Println("Received a message:")fmt.Printf("TOPIC: %s\\n", msg.Topic())fmt.Printf("MSG: %s\\n", msg.Payload())}func main() {mqtt.DEBUG = log.New(os.Stdout, "", 0)mqtt.ERROR = log.New(os.Stdout, "", 0)opts := mqtt.NewClientOptions().AddBroker(accessPoint).SetClientID(clientId)opts.SetKeepAlive(60 * time.Second)opts.SetUsername(username)opts.SetPassword(password)// 设置消息回调处理函数opts.SetDefaultPublishHandler(f)opts.SetPingTimeout(1 * time.Second)c := mqtt.NewClient(opts)if token := c.Connect(); token.Wait() && token.Error() != nil {panic(token.Error())}// 订阅主题if token := c.Subscribe(topicFilter, qos, nil); token.Wait() && token.Error() != nil {fmt.Println(token.Error())os.Exit(1)}// 发布消息token := c.Publish(topic, qos, false, "Hello World")token.Wait()time.Sleep(6 * time.Second)// 取消订阅if token := c.Unsubscribe(topicFilter); token.Wait() && token.Error() != nil {fmt.Println(token.Error())os.Exit(1)}// 断开连接c.Disconnect(250)time.Sleep(1 * time.Second)}