TDMQ CKafka 是一个分布式流处理平台,用于构建实时数据管道和流式应用程序。它提供了高吞吐量、低延迟、可伸缩性和容错性等特性。
本文着重介绍 tRpc-Go-Kafka 客户端的关键参数和最佳实践,以及常见问题。


tRPC-GO-Kafka 封装开源 Kakfa SDK,利用 tRPC-Go 拦截器等功能,接入 tRPC-Go 生态。因此最佳实践参见 Sarama Go



1. 使用 CKafka 生产消息时,出现错误 Message contents does not match its CRC
err:type:framework, code:141, msg:kafka client transport SendMessage: kafka server: Message contents does not match its CRC.
插件默认启用了 gzip 压缩,在 target 上加上参数 compression=none 关闭压缩即可。
target: kafka://ip1:port1?compression=none
2. 生产时同一用户需要有序,如何配置?
客户端增加参数 partitioner,可选 random(默认),roundrobin,hash(按 key 分区)。
target: kafka://ip1:port1?clientid=xxx&partitioner=hash
3. 如何异步生产?
客户端增加参数 async=1
target: kafka://ip1:port1,ip2:port2?clientid=xxx&async=1
4. 如何使用异步生产写数据回调?
import (

func init() {
// 重写默认的异步生产写数据错误回调
kafka.AsyncProducerErrorCallback = func(err error, topic string, key, value []byte, headers []sarama.RecordHeader) {
// do something if async producer occurred error.

// 重写默认的异步生产写数据成功回调
kafka.AsyncProducerSuccCallback = func(topic string, key, value []byte, headers []sarama.RecordHeader) {
// do something if async producer succeed.


1. 如果消费时 Handle 返回了非 nil 会发生什么?
会休眠 3s 后重新消费,不建议这么做,因为返回错误会导致无限重试消费,失败的应该由业务做重试逻辑。
2. 使用 ckafka 消费消息时,出现错误 client has run out of available brokers to talk to
kafka server transport: consume fail:kafka: client has run out of available brokers to talk to (Is your cluster reachable?)
优先检查 brokers 是否可达,然后检查支持的 kafka 客户端版本,尝试在配置文件 address 中加上参数例如 version=
address: ip1:port1?topics=topic1&group=my-group&version=
3. 当多个生产者生产时,部分生产者建立连接失败会影响正常的生产者超时?
请更新至 v0.2.18。 低版本插件在创建生产者时先加锁,再建立连接,建立连接结束后释放锁。如果存在一部分异常生产者建立连接耗时很长,就会导致其他正常生产请求在获取生产者的时候加锁失败,最终超时。此行为在 v0.2.18 已经修复。
4. 消费消息时,提示 The provider group protocol type is incompatible with the other members
kafka server transport: consume fail:kafka server: The provider group protocol type is incompatible with the other members.
同一消费者组的客户端重分组策略不一样,可修改参数 strategy,可选:sticky(默认),range,roundrobin。
address: ip1:port1?topics=topic12&group=my-group&strategy=range
5. 如何注入自定义配置(远端配置)?
如果需要代码中指定配置,先在trpc_go.yaml中配置 fake_address,然后配合 kafka.RegisterAddrConfig 方法注入,trpc_go.yaml配置如下:
address: fake_address
func main() {
s := trpc.NewServer()
// 使用自定义 addr,需在启动 server 前注入
cfg := kafka.GetDefaultConfig()
cfg.Brokers = []string{""}
cfg.Topics = []string{"test_topic"}
kafka.RegisterAddrConfig("fake_address", cfg)
kafka.RegisterKafkaConsumerService(s.Service("fake_address"), &Consumer{})

6. 如何获取底层 sarama 的上下文信息?
通过 kafka.GetRawSaramaContext 可以获取底层 sarama ConsumerGroupSessionConsumerGroupClaim。但是此处暴露这两个接口只是方便用户做监控日志,应该只使用其读方法,调用任何写方法在这里都是未定义行为,可能造成未知结果。
// RawSaramaContext 存放 sarama ConsumerGroupSession 和 ConsumerGroupClaim
// 导出此结构体是为了方便用户实现监控,提供的内容仅用于读,调用任何写方法属于未定义行为
type RawSaramaContext struct {
Session sarama.ConsumerGroupSession
Claim sarama.ConsumerGroupClaim
func (Consumer) Handle(ctx context.Context, msg *sarama.ConsumerMessage) error {
if rawContext, ok := kafka.GetRawSaramaContext(ctx); ok {
log.Infof("InitialOffset: %d", rawContext.Claim.InitialOffset())
// ...
return nil