配置点对点订阅

最近更新时间:2025-04-28 15:17:22

我的收藏

场景说明

除了 MQTT 标准协议定义的发布/订阅(Pub/Sub)消息模型外,消息队列 MQTT 版还支持点对点(Point to Point,简称 P2P)模式。
说明:
当前仅专业版提供 P2P 能力。

什么是 P2P 消息模式?

当需要发送消息给指定的一个消费者时,可以使用点对点消息模式。与主要面向单个 Publisher 对多个 Subscriber(1:N)或多个 Publisher 对多个 Subscriber(M:N)场景的 Pub/Sub 消息模式相比,P2P 消息模式提供了高效的点对点通信方案。
使用 P2P 模式时,Publisher 已经明确该消息的目标接收者信息,并且该消息只需要被指定的单一客户端消费。发送者发送消息时通过设置符合命名规则的 Topic 指定接收者,接收者无需提前订阅即可消费到该消息。
使用 P2P 模式不仅可以节省接收者注册订阅关系的成本,还可以降低推送延迟。

发布 P2P 消息

// Leading topic segment; placeholder — presumably the first-level (parent) topic, fill in your own value.
String firstTopic = ...;
// Client ID of the single client that should receive the message (placeholder).
String targetClientId = ...;
// The P2P topic is built as <firstTopic>/p2p/<targetClientId>; this name is what designates the receiver.
String topic = firstTopic + "/p2p/" + targetClientId;
// Message to deliver; its construction is elided here.
MqttMessage message = ...;
// Publish to the P2P topic built above.
mqttClient.publish(topic, message);

订阅 P2P 消息

接收消息的目标客户端无需进行任何订阅操作,只要正确初始化并连接到集群,即可收到 P2P 消息。

Paho Golang SDK 使用注意事项:

Paho Golang SDK 在收到消息后,会根据消息的 Topic 调用 matchAndDispatch 进行路由匹配与分发,其源码如下:
// matchAndDispatch starts a goroutine that takes publish packets off the messages channel,
// matches each packet's topic name against the router's route list and calls the callback of
// every matching route. If no route matches, the router's defaultHandler is called instead when
// one is set; when none is set the message is not handled and is NOT acknowledged.
//
// The dispatch goroutine ends when the messages channel is closed.
//
// Parameters:
//   - messages: inbound publish packets; closing this channel ends dispatching.
//   - order: when true, the matching handlers are called one after another on the dispatch
//     goroutine itself; when false, each handler call runs in its own goroutine.
//   - client: passed to every handler; its persist field is handed to ackFunc and its
//     options.AutoAckDisabled flag decides whether Ack is called automatically after a handler returns.
//
// Returns the channel on which acknowledgment packets are delivered to the caller. It is closed
// after the messages channel has been closed.
func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order bool, client *client) <-chan *PacketAndToken {
var wg sync.WaitGroup
ackOutChan := make(chan *PacketAndToken) // Channel returned to caller; closed when messages channel closed
var ackInChan chan *PacketAndToken // ACKs generated by ackFunc get put onto this channel

stopAckCopy := make(chan struct{}) // Closure requests stop of go routine copying ackInChan to ackOutChan
ackCopyStopped := make(chan struct{}) // Closure indicates that it is safe to close ackOutChan
goRoutinesDone := make(chan struct{}) // closed on wg.Done()
if order {
ackInChan = ackOutChan // When order = true no go routines are used so safe to use one channel and close when done
} else {
// When order = false ACK messages are sent in go routines so ackInChan cannot be closed until all goroutines done
ackInChan = make(chan *PacketAndToken)
go func() { // go routine to copy from ackInChan to ackOutChan until stopped
for {
select {
case a := <-ackInChan:
ackOutChan <- a
case <-stopAckCopy:
close(ackCopyStopped) // Signal main go routine that it is safe to close ackOutChan
// From here on nothing is forwarded to ackOutChan; late ACKs are read and discarded
// until every handler goroutine has finished.
for {
select {
case <-ackInChan: // drain ackInChan to ensure all goRoutines can complete cleanly (ACK dropped)
DEBUG.Println(ROU, "matchAndDispatch received acknowledgment after processing stopped (ACK dropped).")
case <-goRoutinesDone:
close(ackInChan) // Nothing further should be sent (a panic is probably better than silent failure)
DEBUG.Println(ROU, "matchAndDispatch order=false copy goroutine exiting.")
return
}
}
}
}
}()
}

go func() { // Main go routine handling inbound messages
for message := range messages {
// DEBUG.Println(ROU, "matchAndDispatch received message")
// sent records whether at least one route matched this message.
sent := false
// The read lock is held while the route list and defaultHandler are read.
r.RLock()
// Build the Message given to handlers; the function returned by ackFunc is attached to it
// (ackFunc is defined elsewhere — per the ackInChan comment above, its ACKs go onto ackInChan).
m := messageFromPublish(message, ackFunc(ackInChan, client.persist, message))
// handlers is only filled when order is true; those callbacks run after the lock is released.
var handlers []MessageHandler
for e := r.routes.Front(); e != nil; e = e.Next() {
if e.Value.(*route).match(message.TopicName) {
if order {
handlers = append(handlers, e.Value.(*route).callback)
} else {
// Unordered mode: run the callback in its own goroutine, tracked by wg.
hd := e.Value.(*route).callback
wg.Add(1)
go func() {
hd(client, m)
if !client.options.AutoAckDisabled {
m.Ack()
}
wg.Done()
}()
}
sent = true
}
}
// No route matched: fall back to defaultHandler if one has been set.
if !sent {
if r.defaultHandler != nil {
if order {
handlers = append(handlers, r.defaultHandler)
} else {
wg.Add(1)
go func() {
r.defaultHandler(client, m)
if !client.options.AutoAckDisabled {
m.Ack()
}
wg.Done()
}()
}
} else {
DEBUG.Println(ROU, "matchAndDispatch received message and no handler was available. Message will NOT be acknowledged.")
}
}
r.RUnlock()
// Ordered mode: call the collected handlers sequentially, in the order they were appended.
for _, handler := range handlers {
handler(client, m)
if !client.options.AutoAckDisabled {
m.Ack()
}
}
// DEBUG.Println(ROU, "matchAndDispatch handled message")
}
// The messages channel has been closed: shut down the ACK path.
if order {
close(ackOutChan)
} else { // Ensure that nothing further will be written to ackOutChan before closing it
close(stopAckCopy)
<-ackCopyStopped
close(ackOutChan)
go func() {
wg.Wait() // Note: If this remains running then the user has handlers that are not returning
close(goRoutinesDone)
}()
}
DEBUG.Println(ROU, "matchAndDispatch exiting")
}()
return ackOutChan
}
P2P 消息的 Topic 可能不匹配客户端已订阅的任何一个 topic-filter。从上述源码可以看到,当没有路由匹配且未配置 defaultHandler 时,该消息不会被任何回调处理,也不会被确认(ACK)。因此,为了能正确处理这类消息,需要通过 ClientOptions.SetDefaultPublishHandler(messagePubHandler) 配置 defaultHandler,使 fallback 逻辑符合预期。示例如下:
package main

import (
"fmt"
mqtt "github.com/eclipse/paho.mqtt.golang"
"time"
)

// messagePubHandler prints the payload and topic of every message it is invoked for.
// In this example it serves both as the per-subscription callback and as the client's
// default publish handler.
var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
	// Use "\n" (not "\\n") so each line ends with a real newline instead of a literal backslash-n.
	fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
}

// connectHandler announces on stdout that the connection has been established.
// The client argument is not needed here, so it is left unnamed.
var connectHandler mqtt.OnConnectHandler = func(_ mqtt.Client) {
	fmt.Println("Connected")
}

// connectLostHandler reports on stdout the error that caused the connection to be lost.
var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
	// Terminate the line so that subsequent output does not run onto the same line.
	fmt.Printf("Connect lost: %v\n", err)
}

// sub subscribes client to the topic filter "home/room" at QoS 1, using
// messagePubHandler as the callback for that filter, and reports the outcome on stdout.
func sub(client mqtt.Client) {
	topic := "home/room"
	// Message handler per topic-filter
	token := client.Subscribe(topic, 1, messagePubHandler)
	// Wait only signals completion; success or failure is carried by token.Error().
	token.Wait()
	if err := token.Error(); err != nil {
		fmt.Printf("Subscribe to topic %s failed: %v\n", topic, err)
		return
	}
	fmt.Printf("Subscribed to topic %s\n", topic)
}

// publish sends ten messages ("Message 0" .. "Message 9") to the topic "home/test"
// at QoS 0, non-retained, pausing one second after each message. A failed publish is
// reported on stdout and the loop continues with the next message.
func publish(client mqtt.Client) {
	const num = 10
	for i := 0; i < num; i++ {
		text := fmt.Sprintf("Message %d", i)
		token := client.Publish("home/test", 0, false, text)
		token.Wait()
		// Surface delivery problems instead of silently dropping them.
		if err := token.Error(); err != nil {
			fmt.Printf("Publish %q failed: %v\n", text, err)
		}
		time.Sleep(time.Second)
	}
}

// main configures an MQTT client, connects it to the broker, subscribes, publishes the
// sample messages and then stays connected for three more minutes so that inbound
// messages can still be delivered to the handlers. It panics if the connection fails.
func main() {
	// Acquire your instance access point from MQTT Console
	var broker = "mqtt-xxx.mqtt.tencenttdmq.com"
	var port = 1883
	opts := mqtt.NewClientOptions()
	opts.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port))
	opts.SetClientID("mqttGolangClient")
	opts.SetUsername("YOUR-USERNAME")
	opts.SetPassword("YOUR-PASSWORD")
	// Need to configure defaultHandler to make P2P message fallback properly:
	// a message whose topic matches no subscribed topic-filter is only handled
	// when a default publish handler is set.
	opts.SetDefaultPublishHandler(messagePubHandler)
	opts.OnConnect = connectHandler
	opts.OnConnectionLost = connectLostHandler
	client := mqtt.NewClient(opts)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		panic(token.Error())
	}
	// Close the session cleanly on exit, allowing up to 250 ms for in-flight work to finish.
	defer client.Disconnect(250)
	sub(client)
	publish(client)
	// Keep the process alive so that messages can still be received.
	time.Sleep(time.Minute * 3)
}