场景说明
除了 MQTT 标准协议定义的发布/订阅(Pub/Sub)消息模型外,消息队列 MQTT 版还支持点对点(Point to Point,简称 P2P)模式。
说明:
当前仅专业版提供 P2P 能力。
什么是 P2P 消息模式?
当需要发送消息给指定的一个消费者时,可以使用点对点消息模式。相比主要面向 1:N(单个 Publisher、多个 Subscriber)或 M:N 场景的 Pub/Sub 消息模式,P2P 消息模式提供了高效的点对点通信方案。
使用 P2P 模式时,Publisher 已经明确该消息的目标接收者信息,并且该消息只需要被指定的单一客户端消费。发送者发送消息时通过设置符合命名规则的 Topic 指定接收者,接收者无需提前订阅即可消费到该消息。
使用 P2P 模式不仅可以节省接收者注册订阅关系的成本,还可以降低推送延迟。
发布 P2P 消息
// Build the P2P topic as <firstTopic>/p2p/<targetClientId>; the client ID embedded in the
// topic name designates the single receiver of the message.
String firstTopic = ...;
String targetClientId = ...;
String topic = firstTopic + "/p2p/" + targetClientId;
// Publish as usual; the receiver does not need to subscribe to this topic beforehand.
MqttMessage message = ...;
mqttClient.publish(topic, message);
订阅 P2P 消息
接收消息的客户端无需任何订阅处理:目标客户端只要正确初始化并连接到集群,即可收到 P2P 消息。
Paho Golang SDK 使用注意事项(以下为 SDK 中负责分发入站消息的 matchAndDispatch 实现):
// matchAndDispatch takes a channel of Message pointers as input and starts a go routine that
// takes messages off the channel, matches them against the internal route list and calls the
// associated callback (or the defaultHandler, if one exists and no other route matched). If
// anything is sent down the stop channel the function will end.
//
// NOTE(review): no stop channel appears in this signature; the dispatch go routine below runs
// until the messages channel is closed.
//
// When order is true, matching handlers are collected and run one at a time in the dispatch
// go routine after the route lock has been released; when order is false, each handler runs in
// its own go routine. Unless client.options.AutoAckDisabled is set, m.Ack() is called after each
// handler returns. The returned channel is closed once the messages channel has been closed and
// drained.
func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order bool, client *client) <-chan *PacketAndToken {
	var wg sync.WaitGroup
	ackOutChan := make(chan *PacketAndToken) // Channel returned to caller; closed when messages channel closed
	var ackInChan chan *PacketAndToken       // ACKs generated by ackFunc get put onto this channel

	stopAckCopy := make(chan struct{})    // Closure requests stop of go routine copying ackInChan to ackOutChan
	ackCopyStopped := make(chan struct{}) // Closure indicates that it is safe to close ackOutChan
	goRoutinesDone := make(chan struct{}) // closed on wg.Done()
	if order {
		ackInChan = ackOutChan // When order = true no go routines are used so safe to use one channel and close when done
	} else {
		// When order = false ACK messages are sent in go routines so ackInChan cannot be closed until all goroutines done
		ackInChan = make(chan *PacketAndToken)
		go func() { // go routine to copy from ackInChan to ackOutChan until stopped
			for {
				select {
				case a := <-ackInChan:
					ackOutChan <- a
				case <-stopAckCopy:
					close(ackCopyStopped) // Signal main go routine that it is safe to close ackOutChan
					for {
						select {
						case <-ackInChan: // drain ackInChan to ensure all goRoutines can complete cleanly (ACK dropped)
							DEBUG.Println(ROU, "matchAndDispatch received acknowledgment after processing stopped (ACK dropped).")
						case <-goRoutinesDone:
							close(ackInChan) // Nothing further should be sent (a panic is probably better than silent failure)
							DEBUG.Println(ROU, "matchAndDispatch order=false copy goroutine exiting.")
							return
						}
					}
				}
			}
		}()
	}

	go func() { // Main go routine handling inbound messages
		for message := range messages {
			// DEBUG.Println(ROU, "matchAndDispatch received message")
			sent := false
			// The read lock is held while walking r.routes and reading r.defaultHandler.
			r.RLock()
			m := messageFromPublish(message, ackFunc(ackInChan, client.persist, message))
			var handlers []MessageHandler
			for e := r.routes.Front(); e != nil; e = e.Next() {
				if e.Value.(*route).match(message.TopicName) {
					if order {
						handlers = append(handlers, e.Value.(*route).callback)
					} else {
						// Copy the callback into a local before handing it to the go routine.
						hd := e.Value.(*route).callback
						wg.Add(1)
						go func() {
							hd(client, m)
							if !client.options.AutoAckDisabled {
								m.Ack()
							}
							wg.Done()
						}()
					}
					sent = true
				}
			}
			// No route matched the topic: fall back to defaultHandler when one is configured;
			// otherwise the message is only logged and is not acknowledged.
			if !sent {
				if r.defaultHandler != nil {
					if order {
						handlers = append(handlers, r.defaultHandler)
					} else {
						wg.Add(1)
						go func() {
							r.defaultHandler(client, m)
							if !client.options.AutoAckDisabled {
								m.Ack()
							}
							wg.Done()
						}()
					}
				} else {
					DEBUG.Println(ROU, "matchAndDispatch received message and no handler was available. Message will NOT be acknowledged.")
				}
			}
			r.RUnlock()
			// Handlers collected for order = true run here, sequentially, after the lock is released.
			for _, handler := range handlers {
				handler(client, m)
				if !client.options.AutoAckDisabled {
					m.Ack()
				}
			}
			// DEBUG.Println(ROU, "matchAndDispatch handled message")
		}
		if order {
			close(ackOutChan)
		} else { // Ensure that nothing further will be written to ackOutChan before closing it
			close(stopAckCopy)
			<-ackCopyStopped
			close(ackOutChan)
			go func() {
				wg.Wait() // Note: If this remains running then the user has handlers that are not returning
				close(goRoutinesDone)
			}()
		}
		DEBUG.Println(ROU, "matchAndDispatch exiting")
	}()
	return ackOutChan
}
收到的消息可能不匹配任何一个 topic-filter,为了能正确处理这类消息,需要通过 ClientOptions.SetDefaultPublishHandler(messagePubHandler) 配置 defaultHandler
,使 fallback 逻辑符合预期。package mainimport ("fmt"mqtt "github.com/eclipse/paho.mqtt.golang""time")var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {fmt.Printf("Received message: %s from topic: %s\\n", msg.Payload(), msg.Topic())}var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {fmt.Println("Connected")}var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {fmt.Printf("Connect lost: %v", err)}func sub(client mqtt.Client) {topic := "home/room"// Message handler per topic-filtertoken := client.Subscribe(topic, 1, messagePubHandler)result := token.Wait()fmt.Printf("Subscribed to topic %s, %s", topic, result)}func publish(client mqtt.Client) {num := 10for i := 0; i < num; i++ {text := fmt.Sprintf("Message %d", i)token := client.Publish("home/test", 0, false, text)token.Wait()time.Sleep(time.Second)}}func main() {// Acquire your instance access point from MQTT Consolevar broker = "mqtt-xxx.mqtt.tencenttdmq.com"var port = 1883opts := mqtt.NewClientOptions()opts.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port))opts.SetClientID("mqttGolangClient")opts.SetUsername("YOUR-USERNAME")opts.SetPassword("YOUR-PASSWORD")// Need to configure defaultHandler to make P2P message fallback properlyopts.SetDefaultPublishHandler(messagePubHandler)opts.OnConnect = connectHandleropts.OnConnectionLost = connectLostHandlerclient := mqtt.NewClient(opts)if token := client.Connect(); token.Wait() && token.Error() != nil {panic(token.Error())}sub(client)publish(client)time.Sleep(time.Minute * 3)}