golang source code analysis: sarama kafka client (part I: the producer)

https://github.com/Shopify/sarama is a Kafka client written in pure Go, and a great resource for gophers studying Kafka. Frankly, though, sarama's code organization is a mess: a dense pile of source files all live in a single directory, which makes it hard to know where to start. Here is a partial listing:

```
examples
mocks
tools                                   // kafka CLI tools built on top of the client
    tools/kafka-producer-performance
    tools/kafka-console-producer
    tools/kafka-console-partitionconsumer
    tools/kafka-console-consumer
vagrant                                 // config for spinning up test VMs
acl_xx.go                               // ACL-related logic
add_partitions_to_txn_response.go
add_partitions_to_txn_request.go
add_offsets_to_txn_response.go
add_offsets_to_txn_request.go
admin.go
alter_xx.go                             // alter-related logic
async_producer.go
balance_strategy.go
broker.go
client.go
config.go
consumer_group.go
consumer_group_members.go
consumer.go
create_xx.go
fetch_request.go
delete_xx.go
describe_xx.go
list_xx.go
offset_xx.go
partitioner.go
sarama.go
sync_producer.go
produce_request.go
produce_response.go
utils.go
```

In fact, we only need to focus on the following files:

```
admin.go
async_producer.go
broker.go
client.go
consumer_group.go
consumer.go
sync_producer.go
```

As always, let's start with an example.

The producer

```go
package main

import (
	"fmt"
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	// Build the producer configuration.
	config := sarama.NewConfig()
	// Required-acks level: 0 (NoResponse), 1 (WaitForLocal) or all (WaitForAll).
	config.Producer.RequiredAcks = sarama.NoResponse // or sarama.WaitForAll
	// (With WaitForAll you may see "kafka server: Replication-factor is invalid."
	// if the topic's replication factor is insufficient.)
	// Deliver successfully sent messages on the Successes channel;
	// this is required when using the sync producer.
	config.Producer.Return.Successes = true
	// Pick the partition at random.
	config.Producer.Partitioner = sarama.NewRandomPartitioner

	// Build the message.
	msg := &sarama.ProducerMessage{}
	msg.Topic = "test"
	msg.Value = sarama.StringEncoder("123")

	// Connect to kafka.
	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Print(err)
		return
	}
	defer producer.Close()

	// Send the message; SendMessage returns the partition and offset it landed on.
	partition, offset, err := producer.SendMessage(msg)
	if err != nil {
		log.Println(err)
		return
	}
	fmt.Println(partition, " ", offset)
}
```

1. Creating a producer: sarama.NewSyncProducer

The code lives in sync_producer.go:

```go
func NewSyncProducer(addrs []string, config *Config) (SyncProducer, error) {
	if err := verifyProducerConfig(config); err != nil {
		return nil, err
	}
	p, err := NewAsyncProducer(addrs, config)
	if err != nil {
		return nil, err
	}
	return newSyncProducerFromAsyncProducer(p.(*asyncProducer)), nil
}
```

It first validates the config, then calls NewAsyncProducer to create a producer, and finally wraps it into a syncProducer.
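For reference, the validation is small: a sync producer needs both result channels enabled, because SendMessage has to wait for an outcome. It looks roughly like this (paraphrased from sync_producer.go):

```go
func verifyProducerConfig(config *Config) error {
	if !config.Producer.Return.Errors {
		return ConfigurationError("Producer.Return.Errors must be true to be used in a SyncProducer")
	}
	if !config.Producer.Return.Successes {
		return ConfigurationError("Producer.Return.Successes must be true to be used in a SyncProducer")
	}
	return nil
}
```

And here is the syncProducer type the async producer gets wrapped into: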

```go
type syncProducer struct {
	producer *asyncProducer
	wg       sync.WaitGroup
}
```

As you can see, a syncProducer is essentially just an asyncProducer; the synchronization is implemented on top of it with a WaitGroup.
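Concretely, the wrapper starts two goroutines that drain the async producer's Successes and Errors channels and hand each result back to the blocked SendMessage caller; roughly (paraphrased from sync_producer.go):

```go
func newSyncProducerFromAsyncProducer(p *asyncProducer) *syncProducer {
	sp := &syncProducer{producer: p}

	sp.wg.Add(2)
	go withRecover(sp.handleSuccesses)
	go withRecover(sp.handleErrors)

	return sp
}
```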

NewAsyncProducer is implemented in async_producer.go:

```go
func NewAsyncProducer(addrs []string, conf *Config) (AsyncProducer, error) {
  client, err := NewClient(addrs, conf)
  if err != nil {
    return nil, err
  }
  return newAsyncProducer(client)
}
```

It first creates a client. Client is a wrapper around the connections to the Kafka brokers; both producers and consumers talk to the brokers through a client. The code lives in client.go:

```go
// Client is a generic Kafka client. It manages connections to one or more Kafka brokers.
// You MUST call Close() on a client to avoid leaks, it will not be garbage-collected
// automatically when it passes out of scope. It is safe to share a client amongst many
// users, however Kafka will process requests from a single client strictly in serial,
// so it is generally more efficient to use the default one client per producer/consumer.
type Client interface {
  // Config returns the Config struct of the client. This struct should not be
  // altered after it has been created.
  Config() *Config

  // Controller returns the cluster controller broker. It will return a
  // locally cached value if it's available. You can call RefreshController
  // to update the cached value. Requires Kafka 0.10 or higher.
  Controller() (*Broker, error)

  // RefreshController retrieves the cluster controller from fresh metadata
  // and stores it in the local cache. Requires Kafka 0.10 or higher.
  RefreshController() (*Broker, error)

  // Brokers returns the current set of active brokers as retrieved from cluster metadata.
  Brokers() []*Broker

  // Broker returns the active Broker if available for the broker ID.
  Broker(brokerID int32) (*Broker, error)

  // Topics returns the set of available topics as retrieved from cluster metadata.
  Topics() ([]string, error)

  // Partitions returns the sorted list of all partition IDs for the given topic.
  Partitions(topic string) ([]int32, error)

  // WritablePartitions returns the sorted list of all writable partition IDs for
  // the given topic, where "writable" means "having a valid leader accepting
  // writes".
  WritablePartitions(topic string) ([]int32, error)

  // Leader returns the broker object that is the leader of the current
  // topic/partition, as determined by querying the cluster metadata.
  Leader(topic string, partitionID int32) (*Broker, error)

  // Replicas returns the set of all replica IDs for the given partition.
  Replicas(topic string, partitionID int32) ([]int32, error)

  // InSyncReplicas returns the set of all in-sync replica IDs for the given
  // partition. In-sync replicas are replicas which are fully caught up with
  // the partition leader.
  InSyncReplicas(topic string, partitionID int32) ([]int32, error)

  // OfflineReplicas returns the set of all offline replica IDs for the given
  // partition. Offline replicas are replicas which are offline
  OfflineReplicas(topic string, partitionID int32) ([]int32, error)

  // RefreshBrokers takes a list of addresses to be used as seed brokers.
  // Existing broker connections are closed and the updated list of seed brokers
  // will be used for the next metadata fetch.
  RefreshBrokers(addrs []string) error

  // RefreshMetadata takes a list of topics and queries the cluster to refresh the
  // available metadata for those topics. If no topics are provided, it will refresh
  // metadata for all topics.
  RefreshMetadata(topics ...string) error

  // GetOffset queries the cluster to get the most recent available offset at the
  // given time (in milliseconds) on the topic/partition combination.
  // Time should be OffsetOldest for the earliest available offset,
  // OffsetNewest for the offset of the message that will be produced next, or a time.
  GetOffset(topic string, partitionID int32, time int64) (int64, error)

  // Coordinator returns the coordinating broker for a consumer group. It will
  // return a locally cached value if it's available. You can call
  // RefreshCoordinator to update the cached value. This function only works on
  // Kafka 0.8.2 and higher.
  Coordinator(consumerGroup string) (*Broker, error)

  // RefreshCoordinator retrieves the coordinator for a consumer group and stores it
  // in local cache. This function only works on Kafka 0.8.2 and higher.
  RefreshCoordinator(consumerGroup string) error

  // InitProducerID retrieves information required for Idempotent Producer
  InitProducerID() (*InitProducerIDResponse, error)

  // Close shuts down all broker connections managed by this client. It is required
  // to call this function before a client object passes out of scope, as it will
  // otherwise leak memory. You must close any Producers or Consumers using a client
  // before you close the client.
  Close() error

  // Closed returns true if the client has already had Close called on it
  Closed() bool
}
```
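A client can also be used on its own to inspect cluster metadata. A minimal sketch (the broker address and topic name are assumptions for illustration):

```go
client, err := sarama.NewClient([]string{"localhost:9092"}, sarama.NewConfig())
if err != nil {
	log.Fatal(err)
}
defer client.Close() // required: clients are not garbage-collected on their own

partitions, err := client.Partitions("test")
if err != nil {
	log.Fatal(err)
}
fmt.Println("partitions:", partitions)
```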

It then creates an asyncProducer object:

```go
type asyncProducer struct {
  client Client
  conf   *Config

  errors                    chan *ProducerError
  input, successes, retries chan *ProducerMessage
  inFlight                  sync.WaitGroup

  brokers    map[*Broker]*brokerProducer
  brokerRefs map[*brokerProducer]int
  brokerLock sync.Mutex

  txnmgr *transactionManager
}
```

One of its members is a transactionManager:

```go
type transactionManager struct {
  producerID      int64
  producerEpoch   int16
  sequenceNumbers map[string]int32
  mutex           sync.Mutex
}
```
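The sequenceNumbers map underpins Kafka's idempotent-producer support: every topic/partition pair gets a monotonically increasing sequence number stamped onto the batches it produces. A simplified sketch of the accessor (the exact signature varies between sarama versions):

```go
func (t *transactionManager) getAndIncrementSequenceNumber(topic string, partition int32) int32 {
	key := fmt.Sprintf("%s-%d", topic, partition)
	t.mutex.Lock()
	defer t.mutex.Unlock()
	sequence := t.sequenceNumbers[key]
	t.sequenceNumbers[key] = sequence + 1
	return sequence
}
```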

After the producer object is created, two goroutines are started:

```go
// abridged
func newAsyncProducer(client Client) (AsyncProducer, error) {
	// ...
	go withRecover(p.dispatcher)
	go withRecover(p.retryHandler)
	return p, nil
}
```

Pay particular attention to the producer's input member:

```go
input, successes, retries chan *ProducerMessage
```

The dispatcher goroutine keeps consuming messages from input and forwards each one to the matching topicProducer's input channel; this way, to send a message we only ever need to push it into the producer's input channel.

```go
// simplified: the real code caches one handler channel per topic
func (p *asyncProducer) dispatcher() {
	for msg := range p.input {
		handler := p.newTopicProducer(msg.Topic)
		handler <- msg
	}
}
```

This happens in two steps:

1. Get the topicProducer; newTopicProducer returns the topicProducer's input channel.

2. Send the message into that channel.
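This, incidentally, is exactly how you drive the AsyncProducer directly instead of going through the sync wrapper: push into Input() and let the dispatcher chain do the rest. A minimal sketch (broker address and topic are assumptions):

```go
package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()

	producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatal(err)
	}

	// All we do is push into the input channel; the dispatcher chain does the rest.
	producer.Input() <- &sarama.ProducerMessage{
		Topic: "test",
		Value: sarama.StringEncoder("123"),
	}

	// Close flushes buffered messages and surfaces any produce errors.
	// (A long-running service would instead drain producer.Errors() in a goroutine.)
	if err := producer.Close(); err != nil {
		log.Println("produce failed:", err)
	}
}
```

Back to the pipeline: the topicProducer is defined as follows: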

```go
// one per topic
// partitions messages, then dispatches them by partition
type topicProducer struct {
  parent *asyncProducer
  topic  string
  input  <-chan *ProducerMessage

  breaker     *breaker.Breaker
  handlers    map[int32]chan<- *ProducerMessage
  partitioner Partitioner
}
```

Each topicProducer likewise starts a goroutine:

```go
// abridged
func (p *asyncProducer) newTopicProducer(topic string) chan<- *ProducerMessage {
	input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
	// ... construct tp ...
	go withRecover(tp.dispatch)
	return input
}
```

Its dispatch method looks much the same: it forwards each received message on to a partitionProducer:

```go
// simplified: the real code first runs the partitioner (tp.partitionMessage)
// to assign msg.Partition, and caches one handler channel per partition
func (tp *topicProducer) dispatch() {
	for msg := range tp.input {
		handler := tp.parent.newPartitionProducer(msg.Topic, msg.Partition)
		handler <- msg
	}
}
```
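The partition itself is chosen by the configured Partitioner (the partitioner field of topicProducer). For illustration, a hypothetical custom partitioner that pins every message to partition 0 would look like this:

```go
// fixedPartitioner is a hypothetical example implementing sarama.Partitioner.
type fixedPartitioner struct{}

// Partition routes every message to partition 0.
func (fixedPartitioner) Partition(msg *sarama.ProducerMessage, numPartitions int32) (int32, error) {
	return 0, nil
}

// RequiresConsistency reports that a given message always maps to the same partition.
func (fixedPartitioner) RequiresConsistency() bool { return true }

// Wired in through the config:
//
//	config.Producer.Partitioner = func(topic string) sarama.Partitioner {
//		return fixedPartitioner{}
//	}
```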

Next, let's see what partitionProducer does:

```go
// one per partition per topic
// dispatches messages to the appropriate broker
// also responsible for maintaining message order during retries
type partitionProducer struct {
  parent    *asyncProducer
  topic     string
  partition int32
  input     <-chan *ProducerMessage

  leader         *Broker
  breaker        *breaker.Breaker
  brokerProducer *brokerProducer

  // highWatermark tracks the "current" retry level, which is the only one where we actually let messages through,
  // all other messages get buffered in retryState[msg.retries].buf to preserve ordering
  // retryState[msg.retries].expectChaser simply tracks whether we've seen a fin message for a given level (and
  // therefore whether our buffer is complete and safe to flush)
  highWatermark int
  retryState    []partitionRetryState
}
```
```go
// abridged
func (p *asyncProducer) newPartitionProducer(topic string, partition int32) chan<- *ProducerMessage {
	input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
	// ... construct pp ...
	go withRecover(pp.dispatch)
	return input
}
```

Sure enough, it starts a goroutine as well and returns an input channel for receiving messages. Let's look at the concrete implementation of dispatch:

```go
// simplified
func (pp *partitionProducer) dispatch() {
	pp.leader, _ = pp.parent.client.Leader(pp.topic, pp.partition)
	pp.brokerProducer = pp.parent.getBrokerProducer(pp.leader)
	pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}
	for msg := range pp.input {
		pp.brokerProducer.input <- msg
	}
}
```

Yes, you read that right: it in turn obtains a brokerProducer and sends msg into the brokerProducer's input channel. getBrokerProducer is keyed off the partition's leader, because messages are always sent to a partition's leader; the leader is discovered from the cluster metadata stored in Kafka, which we will cover in detail later.
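For illustration, the same lookup can be made directly on a client (topic name and partition are assumptions):

```go
leader, err := client.Leader("test", 0)
if err != nil {
	log.Fatal(err)
}
fmt.Println("leader broker:", leader.ID(), leader.Addr())
```

Now for getBrokerProducer itself: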

```go
// abridged: the real method caches brokerProducers per broker and reference-counts them
func (p *asyncProducer) getBrokerProducer(broker *Broker) *brokerProducer {
	bp := p.newBrokerProducer(broker)
	return bp
}
```
```go
// groups messages together into appropriately-sized batches for sending to the broker
// handles state related to retries etc
type brokerProducer struct {
  parent *asyncProducer
  broker *Broker

  input     chan *ProducerMessage
  output    chan<- *produceSet
  responses <-chan *brokerProducerResponse
  abandoned chan struct{}
  stopchan  chan struct{}

  buffer     *produceSet
  timer      <-chan time.Time
  timerFired bool

  closing        error
  currentRetries map[string]map[int32]error
}
```

brokerProducer starts two goroutines as well:

```go
// abridged
func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
	go withRecover(bp.run)

	// minimal bridge to make the network response `select`able
	go withRecover(func() {
		for set := range bridge {
			request := set.buildRequest()

			response, err := broker.Produce(request)

			responses <- &brokerProducerResponse{
				set: set,
				err: err,
				res: response,
			}
		}
		close(responses)
	})

	return bp
}
```

run is a loop that keeps consuming messages from input, batching them into a buffer in preparation for the request to Kafka.

```go
// simplified
func (bp *brokerProducer) run() {
	for {
		select {
		case msg, ok := <-bp.input:
			if !ok {
				// input closed: shut down
				return
			}
			bp.buffer.add(msg)
		case output <- bp.buffer:
			bp.rollOver()
		case response, ok := <-bp.responses:
			if ok {
				bp.handleResponse(response)
			}
		}
	}
}
```

The second goroutine does three things: build the request, send it, and hand the response back.

```go
// abridged
func (ps *produceSet) buildRequest() *ProduceRequest {
	// ...
	for topic, partitionSets := range ps.msgs {
		for partition, set := range partitionSets {
			// Kafka >= 0.11: pack the set into a record batch
			req.AddBatch(topic, partition, rb)
			// older message formats: add a (possibly compressed) message
			req.AddMessage(topic, partition, compMsg)
		}
	}
	return req
}
```
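Which branch is used, AddBatch (the newer record-batch format) or AddMessage (the legacy message-set format), depends on the protocol version declared in the config, for example:

```go
config := sarama.NewConfig()
config.Version = sarama.V0_11_0_0 // record batches (AddBatch) require Kafka >= 0.11
```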

The Message struct is defined in message.go:

```go
type Message struct {
  Codec            CompressionCodec // codec used to compress the message contents
  CompressionLevel int              // compression level
  LogAppendTime    bool             // the used timestamp is LogAppendTime
  Key              []byte           // the message key, may be nil
  Value            []byte           // the message contents
  Set              *MessageSet      // the message set a message might wrap
  Version          int8             // v1 requires Kafka 0.10
  Timestamp        time.Time        // the timestamp of the message (version 1+ only)

  compressedCache []byte
  compressedSize  int // used for computing the compression ratio metrics
}
```
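The Codec and CompressionLevel fields are populated from the producer config; to enable compression you would set, for example:

```go
config := sarama.NewConfig()
config.Producer.Compression = sarama.CompressionGZIP              // becomes Message.Codec
config.Producer.CompressionLevel = sarama.CompressionLevelDefault // becomes Message.CompressionLevel
```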

Next comes actually sending the message:

```go
// abridged
func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) {
	response := new(ProduceResponse)
	err := b.sendAndReceive(request, response)
	if err != nil {
		return nil, err
	}
	return response, nil
}
```
```go
// abridged
func (b *Broker) sendAndReceive(req protocolBody, res protocolBody) error {
	promise, err := b.send(req, res != nil, responseHeaderVersion)
	// ... block on the promise and decode the response into res ...
}
```
```go
// abridged
func (b *Broker) send(rb protocolBody, promiseResponse bool, responseHeaderVersion int16) (*responsePromise, error) {
	// ... encode the request into buf ...
	bytes, err := b.write(buf)
	// ...
}
```

which ultimately writes the bytes to the TCP connection:

```go
func (b *Broker) write(buf []byte) (n int, err error) {
	return b.conn.Write(buf)
}
```

That is the entire path the data travels.

2. Sending a message: producer.SendMessage

```go
func (sp *syncProducer) SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error) {
	expectation := make(chan *ProducerError, 1)
	msg.expectation = expectation
	sp.producer.Input() <- msg

	if pErr := <-expectation; pErr != nil {
		return -1, -1, pErr.Err
	}
	return msg.Partition, msg.Offset, nil
}
```

SendMessage is simple: it pushes the message into the producer's Input channel, then blocks on the expectation channel until the success/error handler goroutines deliver the result.

That is sarama's whole produce path. To summarize, the core chain is:

syncProducer -> asyncProducer -> topicProducer -> partitionProducer -> brokerProducer

Messages flow down through the input channels of these objects and are finally written to Kafka over a TCP connection.
