操作场景
本文以调用 tRPC-Go Pulsar SDK 为例,介绍通过 tRPC 框架实现消息收发的操作过程,帮助您更好地理解使用插件在 TDMQ Pulsar 进行消息收发的完整过程。
插件介绍
插件优势
1. 封装后调用 Pulsar 服务能更加符合 RPC 开发者的习惯;
2. 插件在执行生产消费逻辑时会添加消息 RPC 的相关元数据信息,这样能够使用 RPC 原生的可观测性轨迹链路,更有利于开发调试定位问题;
3. 使用 RPC 封装后的插件,能够复用 RPC 原生拦截器,配置,监控等功能,提高业务代码的开发效率。
实现逻辑
tRPC Pulsar 插件的主要实现逻辑为:
1. 声明新传输协议 Pulsar protocol;
2. 实现 Pulsar protocol 传输
clientTransport.RoundTrip
接口。在客户端使用 Pulsar 协议生产消息时,transport 中会在内部初始化 Pulsar producer 客户端,将 tRPC 消息解析成 Pulsar 消息的格式,并使用该 producer 生产消息到配置文件对应的 Pulsar 集群,随后将 Pulsar 集群返回的消息 id 封装成 tRPC 的消息返回结构;3. 实现 Pulsar protocol 传输
serverTransport.ListenAndServe
接口。在 tRPC 初始化 NewServer 读取配置文件识别 service 的协议是 Pulsar 时,在启动的 transport ListenAndServe 就会使用配置文件中 Pulsar 相关配置创建 Pulsar consumer 对象。当用户调用 RegisterConsumerService
方法绑定 service handler 作为接收消息后的回调处理函数,并且执行 s.Serve
启动服务后,serverTransport 会不断调用 consumer.Receive()
方法,每拉取一条消息就新建一个 go 协程执行用户传入的 handler 回调函数。从实现逻辑中可以推理出如下结论: tRPC Pulsar 消费者/生产者没有在服务发现配置里注册,而是作为当前 server 的一个 service 对当前客户端提供服务,因此一个 tRPC 客户端中配置的一个 Pulsar service 仅对应 Pulsar 订阅中的一个消费者/topic 的一个生产者。不同 tRPC 客户端之间生产/消费资源相互隔离,不会出现 service 并发抢占的情况。
前提条件
完成资源创建与准备。
安装 Go 运行环境,推荐 1.23 以上版本 Go 客户端。
参考 tRPC-database 中的 Pulsar example,在业务代码中引用 trpc-go/trpc-database/pulsar tRPC 插件。
说明:
tRPC-Pulsar 推荐使用 0.3.2 及以上版本。
典型案例
生产消息
生产消息到指定的 topic:
# trpc_go.yamlclient:service:- name: trpc.pulsar.producer.servicetarget: pulsar://pulsar-address # 注意生产者的 target 需要有前缀 pulsar://protocol: pulsarplugins:log:default:- writer: consolelevel: infopulsar_logger:- writer: consolelevel: info- writer: file # pulsar 日志输出到文件 ./pulsar.loglevel: infowriter_config:filename: ./pulsar.logmax_age: 7 # max expire daysmax_size: 100 # size of local rolling file, unit MBmax_backups: 10 # max number of log filesmq:pulsar:clients:- name: pulsar-address # 和 service 中的 target 对应url: http://pulsar-xxxxxxxx.eap-xxxxxxxx.tdmq.ap-xx.qcloud.tencenttdmq.com:8080 # 参考 Go SDK,在控制台获取接入点地址topic: pulsar-xxxxxxxx/<namespaceName>/<topicName> # 参考 Go SDK 在控制台获取获取 topic 名称logger_name: pulsar_loggerauth_name: tokenauth_param: '{"token":"eyJrxxxxxx"}' # 参考 Go SDK 在控制台获取获取角色 token,注意角色 token 需要有生产权限
生产代码如下所示:
// Package main for examplepackage mainimport ("context""fmt""strconv""time""git.code.oa.com/trpc-go/trpc-database/pulsar""git.code.oa.com/trpc-go/trpc-go""git.code.oa.com/trpc-go/trpc-go/codec""git.code.oa.com/trpc-go/trpc-go/log""git.code.oa.com/trpc-go/trpc-go/plugin"pulsargo "github.com/apache/pulsar-client-go/pulsar")func main() {// 注意:plugin.Register 要在 trpc.NewServer 之前执行plugin.Register("pulsar_logger", log.DefaultLogFactory)trpc.NewServer()defer func() {// 注意:这里需要显式调用 pulsar.close() 方法,刷新所有缓冲区的消息投递到 broker,从而避免丢失消息if err := pulsar.Close(); err != nil {log.Errorf("close pulsar client producters err:%v", err)}}()proxy := pulsar.NewClientProxy("trpc.pulsar.producer.service")for i := 0; i < 3; i++ {ctx, _ := codec.WithCloneMessage(context.Background())msgIdByte, err := proxy.Produce(ctx, &pulsar.ProducerMessage{Payload: []byte("value" + strconv.Itoa(i)),Key: strconv.Itoa(i),})msgId, _ := pulsargo.DeserializeMessageID(msgIdByte)if err != nil {panic(err)}fmt.Printf("messageId = %s, key = %d, payload = %s\\n", msgId, i, "value"+strconv.Itoa(i))}}
消费消息(不重推场景)
从指定 topic 中消费消息。本示例主要针对消费者业务逻辑可以自行处理消息,不需要服务端进行重推的场景。
# trpc_go.yamlserver:service:- name: trpc.pulsar.consumer.service # 如果需要同一个 server 创建多个消费者,这里 name 需要不同address: pulsar-address # 与下方 mq.pulsar.servers.name 一致protocol: pulsar # 应用层协议,需要为 pulsarplugins:log:default:- writer: consolelevel: infopulsar_logger:- writer: consolelevel: info- writer: filelevel: infowriter_config:filename: ./pulsar.log # pulsar 日志输出到文件 ./pulsar.logmax_age: 7 # max expire daysmax_size: 100 # size of local rolling file, unit MBmax_backups: 10 # max number of log filesmq:pulsar:servers:- name: pulsar-addressurl: http://pulsar-xxxxxxxx.eap-xxxxxxxx.tdmq.ap-xx.qcloud.tencenttdmq.com:8080 # 参考 Go SDK,在控制台获取接入点地址subscription_name: sub-test # 订阅名称subscription_type: sharedqueue_size: 100 # 通常配置为 500/当前订阅下消费者总数,例如有 5 个消费者,就配置为 100topic: pulsar-xxxxxxxx/<namespaceName>/<topicName> # 参考 Go SDK,在控制台获取 topic 完整名称logger_name: pulsar_loggerauth_name: tokenauth_param: '{"token":"eyJrxxxxxx"}' # 参考 Go SDK,在控制台获取角色 token,注意角色 token 需要有消费权限
消费代码如下所示,注意在 handle 函数中需要业务代码捕获可能的异常,并且需要返回 nil。否则如果 handle 回调函数返回 err 时,会导致当前消息调用 unack 方法,在 60s 后重新推送给消费者。
package mainimport ("context""git.code.oa.com/trpc-go/trpc-database/pulsar""git.code.oa.com/trpc-go/trpc-go""git.code.oa.com/trpc-go/trpc-go/log""git.code.oa.com/trpc-go/trpc-go/plugin"pulsargo "github.com/apache/pulsar-client-go/pulsar")func main() {// 注意:plugin.Register 要在 trpc.NewServer 之前执行plugin.Register("pulsar_logger", log.DefaultLogFactory)s := trpc.NewServer()pulsar.RegisterConsumerService(s.Service("trpc.pulsar.consumer.service"), &consumer{}) // 需要和配置文件中的 service 同名// 这里不需要显式调用 close 相关方法,// 在关闭当前 trpc server 时候,trpc 会执行 context cancel() 保证内部的 pulsar consumer 优雅退出if err := s.Serve(); err != nil {panic(err)}}type consumer struct{}func (consumer) Handle(ctx context.Context, message *pulsar.Message) error {messageId, _ := pulsargo.DeserializeMessageID(message.MsgID)log.Infof("[key]: %s,\\t[payload]: %s,\\t[topic]: %s,\\t[msgId]: %s",message.Key, string(message.PayLoad), message.Topic, messageId)return nil}
消费消息(重推场景)
本示例主要针对消费者业务可能存在一些轮询或异步逻辑,对于当前消息无法马上进行处理,需要等待一段时间后重新消费的场景。
# trpc_go.yamlserver:service:- name: trpc.pulsar.consumer.service # 如果需要同一个 server 创建多个消费者,这里 name 需要不同address: pulsar-address # 与下方 mq.pulsar.servers.name 一致protocol: pulsar # 应用层协议,需要为 pulsarplugins:log:default:- writer: consolelevel: infopulsar_logger:- writer: consolelevel: info- writer: filelevel: infowriter_config:filename: ./pulsar.log # pulsar 日志输出到文件 ./pulsar.logmax_age: 7 # max expire daysmax_size: 100 # size of local rolling file, unit MBmax_backups: 10 # max number of log filesmq:pulsar:servers:- name: pulsar-addressurl: http://pulsar-xxxxxxxx.eap-xxxxxxxx.tdmq.ap-xx.qcloud.tencenttdmq.com:8080 # 参考 Go SDK,在控制台获取接入点地址subscription_name: sub-test # 订阅名称subscription_type: sharedqueue_size: 100 # 通常配置为 500/当前订阅下消费者总数,例如有 5 个消费者,就配置为 100topic: pulsar-xxxxxxxx/<namespaceName>/<topicName> # 参考 Go SDK,在控制台获取 topic 完整名称logger_name: pulsar_loggerauth_name: tokenauth_param: '{"token":"eyJrxxxxxx"}' # 参考 Go SDK,在控制台获取角色 token,注意由于涉及重推消息,角色 token 需要同时有生产和消费权限dlq_policy_rlq_topic: pulsar-xxxxxxxx/<namespaceName>/<topicName> # 参考 Go SDK,通常为 -RETRY 结尾,可参考 https://cloud.tencent.com/document/product/1179/49607#.E8.87.AA.E5.AE.9A.E4.B9.89.E5.8F.82.E6.95.B0.E8.AE.BE.E7.BD.AEdlq_policy_topic: pulsar-xxxxxxxx/<namespaceName>/<topicName> # 参考 Go SDK,通常为 -DLQ 结尾,可参考 https://cloud.tencent.com/document/product/1179/49607#.E8.87.AA.E5.AE.9A.E4.B9.89.E5.8F.82.E6.95.B0.E8.AE.BE.E7.BD.AEdlq_policy_max_deliveries: 10 # 重试最大次数,超过最大次数后,会丢弃到死信队列,不再主动消费dlq_random_producer_name_enabled: true # 兼容性考虑默认为 true
当前消费模式下共分为三个 topic:原始主题,重试主题,死信主题。
当前重推模式下,如果消息调用 reconsumeLater,消费者会 Ack 当前消息,并使用 reconsumeLater 中的延时时间,发送一条新的延迟消息到重试主题。如果客户端超过最大重试次数 dlq_policy_max_deliveries 配置下,还没有在 handle 函数中返回 nil 将消息 Ack,这条消息就会被发送到死信主题。默认情况下死信主题不会被订阅,需要用户手动处理。
// Package main for examplepackage mainimport ("context""time""git.code.oa.com/trpc-go/trpc-database/pulsar""git.code.oa.com/trpc-go/trpc-go""git.code.oa.com/trpc-go/trpc-go/log""git.code.oa.com/trpc-go/trpc-go/plugin"pulsargo "github.com/apache/pulsar-client-go/pulsar")func main() {// 注意:plugin.Register 要在 trpc.NewServer 之前执行plugin.Register("pulsar_logger", log.DefaultLogFactory)s := trpc.NewServer()pulsar.RegisterConsumerService(s.Service("trpc.pulsar.consumer.service"), &consumer{})// 这里不需要显式调用 close 相关方法,// 在关闭当前 trpc server 时候,trpc 会执行 context cancel() 保证内部的 pulsar consumer 优雅退出if err := s.Serve(); err != nil {panic(err)}}type consumer struct{}func (consumer) Handle(ctx context.Context, message *pulsar.Message) error {messageId, _ := pulsargo.DeserializeMessageID(message.MsgID)log.Infof("[key]: %s,\\t[payload]: %s,\\t[topic]: %s,\\t[id]: %s,\\t[reconsumeTimes]: %d", message.Key, string(message.PayLoad), message.Topic, messageId, message.ReconsumeTimes)message.ReconsumeLater(1 * time.Minute) // 延迟时间最小值 1s,最大值 10 天,不过由于客户端和服务端时间可能不一致,这里尽量配置延迟消息最长时间在 9 天之内// 如果不调用 ReconsumeLater,而是返回 nil,则当前消息被 Ack 并不再被重推return nil}
消息 Id 转换
目前插件返回的消息格式为 byte 数组,需要将消息转换成可读的消息格式,即 (ledgerId: entryId: partitionIdx),才能在控制台消息查询里查询到对应的消息轨迹,转换示例如下:
package mainimport ("context"pulsargo "github.com/apache/pulsar-client-go/pulsar" // 这里需要重命名,防止与 trpc pulsar 包名冲突)func main() {s := "08e3bf6710da9d02180020003001"data, _ := hex.DecodeString(s) // data 为插件返回的 msgId byte[] 格式messageId, _ := pulsargo.DeserializeMessageID(data)log.Println(fmt.Sprintf("messageId = %s", messageId)) // 这里输出的结果即为控制台消息查询中的 msgId}
SDK 版本相关
低版本风险隐患
tRPC Pulsar 0.1.3 及以下(changelog 中标注 deprecated)版本,对于极端场景异常处理覆盖不够全面,当 broker 升级重启或网络故障等场景下,有极小概率客户端和服务端重连过程出现异常,导致发送超时或者停止消费等问题。这里强烈建议您先将客户端升级到 0.3.2 新版本后,再进行 broker 集群版本的更新。
低版本隐患处理手段
高版本的客户端在 broker 升级过程中能够正常重连,基本做到业务无感知。但如果您的客户端 SDK 确实无法升级到新版本,建议您在 broker 集群升级后,关注客户端的日志输出及控制台的生产消费相关指标。