操作场景
本文以调用 tRPC-Go TDMQ SDK 为例介绍通过 tRPC 框架实现消息收发的操作过程,帮助您更好地理解使用插件在 TDMQ Pulsar 进行消息收发的完整过程。
插件介绍
TDMQ 插件后续仅增加稳定性需求更新,基本不进行新特性迭代开发。同时 tRPC-Go Pulsar 插件对于客户端的优雅退出会更完善,对于消息的重试逻辑上会更加灵活。
注意:
插件优势
1. 封装后调用 Pulsar 服务能更加符合 RPC 开发者的习惯。
2. 插件在执行生产消费逻辑时会添加消息 RPC 的相关元数据信息,这样能够使用 RPC 原生的可观测性轨迹链路,更有利于开发调试定位问题。
3. 使用 RPC 封装后的插件,能够复用 RPC 原生拦截器,配置,监控等功能,提高业务代码的开发效率。
实现逻辑
tRPC TDMQ 插件的主要实现逻辑为:
1. 声明新传输协议 TDMQ protocol。
2. 实现 TDMQ protocol 传输
clientTransport.RoundTrip 接口。在客户端使用 TDMQ 协议生产消息时,transport 中会在内部初始化 Pulsar producer 客户端,将 tRPC 消息解析成 Pulsar 消息的格式,并使用该 producer 生产消息到配置文件对应的 Pulsar 集群,随后将 Pulsar 集群返回的消息 id 封装成 tRPC 的消息返回结构。3. 实现 TDMQ protocol 传输
serverTransport.ListenAndServe接口。在 tRPC 初始化 NewServer 读取配置文件识别 service 的协议是 TDMQ 时,在启动的 transport ListenAndServe 就会使用配置文件中 TDMQ 相关配置创建 Pulsar consumer 对象。当用户调用 RegisterConsumerService 方法绑定 service handler 作为接收消息后的回调处理函数,并且执行 s.Serve 启动服务后,serverTransport 会不断调用 consumer.Receive() 方法,每拉取一条消息就新建一个 go 协程执行用户传入的 handler 回调函数。从实现逻辑中可以推理出如下结论:
tRPC TDMQ 消费者/生产者没有在服务发现配置里注册,而是作为当前 server 的一个 service 对当前客户端提供服务,因此一个 tRPC 客户端中配置的一个 TDMQ service 仅对应 Pulsar 订阅中的一个消费者/topic 的一个生产者。不同 tRPC 客户端之间生产/消费资源相互隔离,不会出现 service 并发抢占的情况。
前提条件
完成资源创建与准备。
安装 Go 运行环境,推荐 1.23 以上版本 Go 客户端。
参考 tRPC-database 中的 TDMQ example,在业务代码中引用 trpc-go/trpc-database/tdmq tRPC 插件。
说明:
tRPC-TDMQ 推荐使用 0.3.2 及以上版本。
典型案例
生产消息
生产消息到指定的 topic:
client:service:- name: trpc.tdmq.producer.servicetarget: tdmq://tdmq-address # 注意生产者的 target 需要有前缀 tdmq://plugins:log:default:- writer: consolelevel: infotdmq_logger:- writer: consolelevel: info- writer: file # pulsar 日志输出到文件 ./tdmq.loglevel: infowriter_config:filename: ./tdmq.logmax_age: 7 # max expire daysmax_size: 100 # size of local rolling file, unit MBmax_backups: 10 # max number of log filesmq:tdmq:clients:- name: tdmq-address # 和 service 中的 target 对应url: http://pulsar-xxxxxxxx.eap-xxxxxxxx.tdmq.ap-xx.qcloud.tencenttdmq.com:8080 # 参考 Go SDK,在控制台获取接入点地址topic: pulsar-xxxxxxxx/<namespaceName>/<topicName> # 参考 Go SDK 在控制台获取获取 topic 名称logger_name: tdmq_loggerauth_name: tokenauth_params: >- # 参考 Go SDK 在控制台获取获取角色 token,注意角色 token 需要有生产权限{"token":"eyJrxxxxxxx"}
生产代码如下所示:
// Package main comment.package mainimport ("context""fmt""strconv""git.code.oa.com/trpc-go/trpc-database/tdmq""git.code.oa.com/trpc-go/trpc-go""git.code.oa.com/trpc-go/trpc-go/codec""git.code.oa.com/trpc-go/trpc-go/log""git.code.oa.com/trpc-go/trpc-go/plugin")func main() {plugin.Register("tdmq_logger", log.DefaultLogFactory)trpc.NewServer()proxy := tdmq.NewClientProxy("trpc.tdmq.producer.service")for i := 0; i < 3; i++ {ctx, _ := codec.WithCloneMessage(context.Background())msgIdByte, err := proxy.Produce(ctx, &tdmq.ProducerMessage{Payload: []byte("value" + strconv.Itoa(i)),Key: strconv.Itoa(i),})msgId, _ := tdmq.DeserializeMessageID(msgIdByte)if err != nil {panic(err)}fmt.Printf("messageId = %s, key = %d, payload = %s\\n", msgId, i, "value"+strconv.Itoa(i))}}
消费消息(不重推场景)
从指定 topic 中消费消息。本示例主要针对消费者业务逻辑可以自行处理消息,不需要服务端进行重推的场景。
# trpc_go.yamlserver:service:- name: trpc.tdmq.consumer.serviceprotocol: tdmqaddress: tdmq-addressplugins: # 插件配置log: # 日志配置default: # 默认日志的配置,可支持多输出- writer: console # 控制台标准输出 默认level: info # 标准输出日志的级别tdmq_logger:- writer: console # tdmq 日志输出到控制台level: info- writer: file # tdmq 日志输出到文件 ./tdmq.loglevel: infowriter_config:filename: ./tdmq.logmax_age: 7 # max expire daysmax_size: 100 # size of local rolling file, unit MBmax_backups: 10 # max number of log filesmq:tdmq:servers:- name: tdmq-address # 和 service 中的 address 对应url: http://pulsar-xxxxxxxx.eap-xxxxxxxx.tdmq.ap-xx.qcloud.tencenttdmq.com:8080 # 参考 Go SDK,在控制台获取接入点地址topic: pulsar-xxxxxxxx/<namespaceName>/<topicName> # 参考 Go SDK,在控制台获取 topic 完整名称subscription_name: sub-test # 订阅名称subscription_type: sharedqueue_size: 100 # 通常配置为 500/当前订阅下消费者总数,如果有 5 个消费者,就配置为 100logger_name: tdmq_loggerauth_name: tokenauth_params: '{"token":"eyJrxxxxxxx"}' # 参考 Go SDK,在控制台获取角色 token,注意角色 token 需要有消费权限
消费代码如下所示,注意在 handle 函数中需要业务代码捕获可能的异常,并且需要返回 nil。否则如果 handle 回调函数返回 err 时,会导致当前消息调用 unack 方法,在 60s 后重新推送给消费者。
// Package main comment.package mainimport ("context""git.code.oa.com/trpc-go/trpc-go""git.code.oa.com/trpc-go/trpc-go/log""git.code.oa.com/trpc-go/trpc-go/plugin""git.code.oa.com/trpc-go/trpc-database/tdmq")func main() {// 注意:plugin.Register 要在 trpc.NewServer 之前执行plugin.Register("tdmq_logger", log.DefaultLogFactory)s := trpc.NewServer()tdmq.RegisterConsumerService(s.Service("trpc.tdmq.consumer.service"), &Consumer{})s.Serve()}type Consumer struct {}func (Consumer) Handle(ctx context.Context, message *tdmq.Message) error {messageId, _ := tdmq.DeserializeMessageID(message.MsgID)log.Infof("[key]: %s,\\t[payload]: %s,\\t[topic]: %s,\\t[id]: %s",message.Key, string(message.PayLoad), message.Topic, messageId)// 返回 nil 才会确认消费成功return nil}
消费消息(重推场景)
本示例主要针对消费者业务可能存在一些轮询或异步逻辑,对于当前消息无法马上进行处理,需要等待一段时间后重新消费的场景。
# trpc_go.yamlserver:service:- name: trpc.tdmq.consumer.serviceprotocol: tdmqaddress: tdmq-addressplugins: # 插件配置log: # 日志配置default: # 默认日志的配置,可支持多输出- writer: console # 控制台标准输出 默认level: info # 标准输出日志的级别tdmq_logger:- writer: console # tdmq 日志输出到控制台level: info- writer: file # tdmq 日志输出到文件 ./tdmq.loglevel: infowriter_config:filename: ./tdmq.logmax_age: 7 # max expire daysmax_size: 100 # size of local rolling file, unit MBmax_backups: 10 # max number of log filesmq:tdmq:servers:- name: tdmq-address # 和 service 中的 address 对应url: http://pulsar-xxxxxxxx.eap-xxxxxxxx.tdmq.ap-xx.qcloud.tencenttdmq.com:8080 # 参考 Go SDK,在控制台获取接入点地址topic: pulsar-xxxxxxxx/<namespaceName>/<topicName> # 参考 Go SDK,在控制台获取 topic 完整名称subscription_name: sub-test # 订阅名称subscription_type: sharedenable_retry: 1dlq_policy_topic: pulsar-xxxxxxxx/<namespaceName>/<topicName> # 参考 Go SDK,通常为 -DLQ 结尾,可参考 https://cloud.tencent.com/document/product/1179/49607#.E8.87.AA.E5.AE.9A.E4.B9.89.E5.8F.82.E6.95.B0.E8.AE.BE.E7.BD.AEdlq_policy_max_deliveries: 3 # 重试最大次数,超过最大次数后,会丢弃到死信队列,不再主动消费dlq_random_producer_name_enabled: truenack_redelivery_delay: 1m # 每次重推的间隔时间,建议不要超过 10 min,防止出现过多的消息空洞queue_size: 100 # 通常配置为 500/当前订阅下消费者总数,如果有 5 个消费者,就配置为 100logger_name: tdmq_loggerauth_name: tokenauth_params: '{"token":"eyJrxxxxxxx"}' # 参考 Go SDK,在控制台获取角色 token,注意由于涉及重推消息,角色 token 需要同时有生产和消费权限
当前消费模式下共分为两个 topic:原始主题,死信主题。
当前重推模式下,如果 handle 返回 err,服务端会在
nack_redelivery_delay 后重新推送该消息,并且增加消息中的 redeliverCount 次数。当消息超过 dlq_policy_max_deliveries handle 函数还是返回 err 时,这条消息就会被发送到死信主题。默认情况下死信主题不会被订阅,需要用户手动处理。// Package main comment.package mainimport ("context""errors""git.code.oa.com/trpc-go/trpc-go""git.code.oa.com/trpc-go/trpc-go/log""git.code.oa.com/trpc-go/trpc-go/plugin""git.code.oa.com/trpc-go/trpc-database/tdmq")func main() {// 注意:plugin.Register 要在 trpc.NewServer 之前执行plugin.Register("tdmq_logger", log.DefaultLogFactory)s := trpc.NewServer()tdmq.RegisterConsumerService(s.Service("trpc.tdmq.consumer.service"), &Consumer{})s.Serve()}type Consumer struct {}func (Consumer) Handle(ctx context.Context, message *tdmq.Message) error {messageId, _ := tdmq.DeserializeMessageID(message.MsgID)log.Infof("[key]: %s,\\t[payload]: %s,\\t[topic]: %s,\\t[id]: %s,\\t [redeliverCount]: %d",message.Key, string(message.PayLoad), message.Topic, messageId, message.RedeliveryCount)// 如果业务处理失败需要重推,返回 err;否则返回 nilreturn errors.New("mock handle err")}
消息 Id 转换
目前插件返回的消息格式为 byte 数组,需要将消息转换成可读的消息格式,即 (ledgerId: entryId: partitionIdx),才能在控制台消息查询里查询到对应的消息轨迹,转换示例如下:
package mainimport ("context""git.code.oa.com/trpc-go/trpc-database/tdmq")func main() {s := "08e3bf6710da9d02180020003001"data, _ := hex.DecodeString(s) // data 为插件返回的 msgId byte[] 格式messageId, _ := tdmq.DeserializeMessageID(data)log.Println(fmt.Sprintf("messageId = %s", messageId)) // 这里输出的结果即为控制台消息查询中的 msgId}
SDK 版本相关
低版本风险隐患
tRPC TDMQ 0.2.9 及以下(changelog 中标注 deprecated)版本,对于极端场景异常处理覆盖不够全面,当 broker 升级重启或网络故障等场景下,有极小概率客户端和服务端重连过程出现异常,导致发送超时或者停止消费等问题。这里强烈建议您先将客户端升级到 0.3.2 新版本后,再进行 broker 集群版本的更新。
低版本隐患处理手段
高版本的客户端在 broker 升级过程中能够正常重连,基本做到业务无感知。但如果您的客户端 SDK 确实无法升级到新版本,建议您在 broker 集群升级后,关注客户端的日志输出及控制台的生产消费相关指标。