有奖:语音产品征文挑战赛火热进行中> HOT

背景

TDMQ CKafka 是一个分布式流处理平台,用于构建实时数据管道和流式应用程序。它提供了高吞吐量、低延迟、可伸缩性和容错性等特性。
本文着重介绍 tRpc-Go-Kafka 客户端的关键参数和最佳实践,以及常见问题。

调优实践

tRPC-GO-Kafka 封装开源 Kakfa SDK,利用 tRPC-Go 拦截器等功能,接入 tRPC-Go 生态。因此最佳实践参见 Sarama Go

常见问题

生产者问题

1. 使用 CKafka 生产消息时,出现错误 Message contents does not match its CRC
err:type:framework, code:141, msg:kafka client transport SendMessage: kafka server: Message contents does not match its CRC.
插件默认启用了 gzip 压缩,在 target 上加上参数 compression=none 关闭压缩即可。
target: kafka://ip1:port1?compression=none
2. 生产时同一用户需要有序,如何配置?
客户端增加参数 partitioner,可选 random(默认),roundrobin,hash(按 key 分区)。
target: kafka://ip1:port1?clientid=xxx&partitioner=hash
3. 如何异步生产?
客户端增加参数 async=1
target: kafka://ip1:port1,ip2:port2?clientid=xxx&async=1
4. 如何使用异步生产写数据回调?
需要在代码中重写异步生产写数据的成功/失败的回调函数,例如:
import (
"git.code.oa.com/vicenteli/trpc-database/kafka"
)

func init() {
// 重写默认的异步生产写数据错误回调
kafka.AsyncProducerErrorCallback = func(err error, topic string, key, value []byte, headers []sarama.RecordHeader) {
// do something if async producer occurred error.
}

// 重写默认的异步生产写数据成功回调
kafka.AsyncProducerSuccCallback = func(topic string, key, value []byte, headers []sarama.RecordHeader) {
// do something if async producer succeed.
}
}

消费者问题

1. 如果消费时 Handle 返回了非 nil 会发生什么?
会休眠 3s 后重新消费,不建议这么做,因为返回错误会导致无限重试消费,失败的应该由业务做重试逻辑。
2. 使用 ckafka 消费消息时,出现错误 client has run out of available brokers to talk to
kafka server transport: consume fail:kafka: client has run out of available brokers to talk to (Is your cluster reachable?)
优先检查 brokers 是否可达,然后检查支持的 kafka 客户端版本,尝试在配置文件 address 中加上参数例如 version=0.10.2.0
address: ip1:port1?topics=topic1&group=my-group&version=0.10.2.0
3. 当多个生产者生产时,部分生产者建立连接失败会影响正常的生产者超时?
请更新至 v0.2.18。 低版本插件在创建生产者时先加锁,再建立连接,建立连接结束后释放锁。如果存在一部分异常生产者建立连接耗时很长,就会导致其他正常生产请求在获取生产者的时候加锁失败,最终超时。此行为在 v0.2.18 已经修复。
4. 消费消息时,提示 The provider group protocol type is incompatible with the other members
kafka server transport: consume fail:kafka server: The provider group protocol type is incompatible with the other members.
同一消费者组的客户端重分组策略不一样,可修改参数 strategy,可选:sticky(默认),range,roundrobin。
address: ip1:port1?topics=topic12&group=my-group&strategy=range
5. 如何注入自定义配置(远端配置)?
如果需要代码中指定配置,先在trpc_go.yaml中配置 fake_address,然后配合 kafka.RegisterAddrConfig 方法注入,trpc_go.yaml配置如下:
address: fake_address
在服务启动前,注入自定义配置:
func main() {
s := trpc.NewServer()
// 使用自定义 addr,需在启动 server 前注入
cfg := kafka.GetDefaultConfig()
cfg.Brokers = []string{"127.0.0.1:9092"}
cfg.Topics = []string{"test_topic"}
kafka.RegisterAddrConfig("fake_address", cfg)
kafka.RegisterKafkaConsumerService(s.Service("fake_address"), &Consumer{})

s.Serve()
}
6. 如何获取底层 sarama 的上下文信息?
通过 kafka.GetRawSaramaContext 可以获取底层 sarama ConsumerGroupSessionConsumerGroupClaim。但是此处暴露这两个接口只是方便用户做监控日志,应该只使用其读方法,调用任何写方法在这里都是未定义行为,可能造成未知结果。
// RawSaramaContext 存放 sarama ConsumerGroupSession 和 ConsumerGroupClaim
// 导出此结构体是为了方便用户实现监控,提供的内容仅用于读,调用任何写方法属于未定义行为
type RawSaramaContext struct {
Session sarama.ConsumerGroupSession
Claim sarama.ConsumerGroupClaim
}
使用实例:
func (Consumer) Handle(ctx context.Context, msg *sarama.ConsumerMessage) error {
if rawContext, ok := kafka.GetRawSaramaContext(ctx); ok {
log.Infof("InitialOffset: %d", rawContext.Claim.InitialOffset())
}
// ...
return nil
}