
golang source code analysis: ghz && implementing a MySQL load-testing tool

Author: golangLeetcode
Published: 2022-08-03 13:51:02
This article is included in the column golang算法架构leetcode技术php.

ghz, a gRPC load-testing tool implemented in Go:

https://github.com/bojand/ghz

By analyzing its source code, I implemented a MySQL load-testing tool similar to mysqlslap:

https://github.com/xiazemin/mysqlslap-go

While analyzing the source, I also found a bug in ghz:

https://github.com/bojand/ghz/pull/323/files

Using ghz:

./ghz --skipTLS --insecure --protoset ./bundle.protoset \
  -B ./grpc_payload --call tensorflow.serving.PredictionService/Predict \
  127.0.0.1:8500

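For detailed usage, see the docs at https://ghz.sh/docs/examples. You can also load a JSON list of request payloads with the -D flag; ghz then takes the entries in round-robin order, one per request. Now let's look at the source code.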
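The round-robin behavior of a JSON list can be illustrated with a small sketch. pickPayload is a hypothetical helper, not ghz's API, and choosing the element as "request number modulo array length" is an assumption about how the rotation works; the point is only that request i gets element i mod len(list), wrapping around.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// pickPayload is a hypothetical helper (not part of ghz): it returns the JSON
// payload for request number reqNum. A JSON array is consumed round-robin;
// any other valid JSON value is reused for every request.
func pickPayload(raw []byte, reqNum int64) (json.RawMessage, error) {
	if !json.Valid(raw) {
		return nil, errors.New("invalid JSON data")
	}
	var arr []json.RawMessage
	if err := json.Unmarshal(raw, &arr); err != nil {
		return json.RawMessage(raw), nil // not an array: same message every time
	}
	if len(arr) == 0 {
		return nil, errors.New("empty data array")
	}
	return arr[reqNum%int64(len(arr))], nil
}

func main() {
	data := []byte(`[{"name":"a"},{"name":"b"},{"name":"c"}]`)
	for i := int64(0); i < 5; i++ {
		p, _ := pickPayload(data, i)
		fmt.Println(i, string(p)) // requests 3 and 4 wrap around to "a" and "b"
	}
}
```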

The entry point is cmd/ghz/main.go, which starts by parsing the command-line flags with kingpin (excerpt from its var block):

// IsSetByUser records whether the user passed the flag explicitly
isCallSet = false
call      = kingpin.Flag("call", `A fully-qualified method name in 'package.Service/method' or 'package.Service.Method' format.`).
  PlaceHolder(" ").IsSetByUser(&isCallSet).String()

// Concurrency
isCSet = false
c      = kingpin.Flag("concurrency", "Number of request workers to run concurrently for const concurrency schedule. Default is 50.").
  Short('c').Default("50").IsSetByUser(&isCSet).Uint()

isCPUSet = false
cpus     = kingpin.Flag("cpus", "Number of cpu cores to use.").
  Default(strconv.FormatUint(uint64(nCPUs), 10)).IsSetByUser(&isCPUSet).Uint()
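The IsSetByUser calls distinguish a value the user typed from a default; presumably this lets ghz decide which command-line values should override other configuration sources. The same idea can be sketched with the standard library's flag package (not kingpin): FlagSet.Visit walks only the flags that were actually set. The flag names and defaults below are illustrative.

```go
package main

import (
	"flag"
	"fmt"
)

// parseFlags parses args and also reports which flags the user set explicitly,
// mirroring kingpin's IsSetByUser with the standard flag package.
func parseFlags(args []string) (c, cpus uint, set map[string]bool, err error) {
	fs := flag.NewFlagSet("sketch", flag.ContinueOnError)
	fs.UintVar(&c, "c", 50, "number of concurrent request workers")
	fs.UintVar(&cpus, "cpus", 4, "number of cpu cores to use")
	if err = fs.Parse(args); err != nil {
		return 0, 0, nil, err
	}
	set = make(map[string]bool)
	fs.Visit(func(f *flag.Flag) { set[f.Name] = true }) // visits only flags that were set
	return c, cpus, set, nil
}

func main() {
	c, cpus, set, _ := parseFlags([]string{"-c", "100"})
	fmt.Println(c, cpus, set["c"], set["cpus"]) // 100 4 true false
}
```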

Once the flags are parsed, the command runs the test:

report, err := runner.Run(cfg.Call, cfg.Host, options...)

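The implementation lives in runner/run.go (abridged; "// ..." marks omitted code):

func Run(call, host string, options ...Option) (*Report, error) {
  c, err := NewConfig(call, host, options...)
  // ...

  // Run on c.cpus cores and restore afterwards.
  // Note: runtime.NumCPU() is not necessarily the previous GOMAXPROCS value;
  // runtime.GOMAXPROCS(c.cpus) itself returns the previous setting.
  oldCPUs := runtime.NumCPU()
  runtime.GOMAXPROCS(c.cpus)
  defer runtime.GOMAXPROCS(oldCPUs)

  reqr, err := NewRequester(c)
  // ...

  // While the requester is being set up, the method descriptor is resolved
  // from the .proto file, from the protoset, or via server reflection:
  if c.proto != "" {
    mtd, err = protodesc.GetMethodDescFromProto(c.call, c.proto, c.importPaths)
  } else if c.protoset != "" {
    mtd, err = protodesc.GetMethodDescFromProtoSet(c.call, c.protoset)
  } else {
    cc, err = reqr.newClientConn(false) // temporary connection without stats handler
    refClient := grpcreflect.NewClient(refCtx, reflectpb.NewServerReflectionClient(cc))
    mtd, err = protodesc.GetMethodDescFromReflect(c.call, refClient)
  }

  // Stop when the user cancels (cancel is fed by an interrupt signal).
  go func() {
    <-cancel
    reqr.Stop(ReasonCancel)
  }()

  // Stop once the -z duration has elapsed.
  if c.z > 0 {
    go func() {
      time.Sleep(c.z)
      reqr.Stop(ReasonTimeout)
    }()
  }

  rep, err := reqr.Run()
  // ...
}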

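Run builds a RunConfig from the options and creates the Requester. The descriptor of the target method comes from the .proto file or the protoset bundle if one was given; otherwise it is fetched from the server via gRPC reflection. Two goroutines can end the run early: one on user cancellation (ReasonCancel) and one once the -z duration has elapsed (ReasonTimeout).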
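The early-stop logic is a common Go pattern: several goroutines race to stop the same run, and only the first reason should count. Below is a minimal sketch with a hypothetical stopper type standing in for the Requester (which, per its struct definition below, keeps a lock and a stopReason field); the sketch uses sync.Once to get the same first-caller-wins effect. The string values of the reasons are illustrative.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// StopReason names mirror those in the excerpt; the string values are illustrative.
type StopReason string

const (
	ReasonCancel  StopReason = "cancel"
	ReasonTimeout StopReason = "timeout"
)

// stopper is a hypothetical stand-in for the Requester: Stop may be called
// from several goroutines, but only the first reason is kept.
type stopper struct {
	once   sync.Once
	reason StopReason
	done   chan struct{}
}

func newStopper() *stopper { return &stopper{done: make(chan struct{})} }

func (s *stopper) Stop(r StopReason) {
	s.once.Do(func() {
		s.reason = r
		close(s.done) // wakes everything waiting on the run
	})
}

func main() {
	s := newStopper()
	cancel := make(chan struct{}) // a real tool would feed this from signal.Notify
	z := 20 * time.Millisecond    // plays the role of the -z duration

	go func() {
		<-cancel
		s.Stop(ReasonCancel)
	}()
	go func() {
		time.Sleep(z)
		s.Stop(ReasonTimeout)
	}()

	<-s.done
	fmt.Println("stopped:", s.reason) // stopped: timeout
}
```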

runner/options.go defines the load-test options; NewConfig builds the RunConfig from them:

func NewConfig(call, host string, options ...Option) (*RunConfig, error)

type RunConfig struct {
  // call settings
  call              string
  host              string
  proto             string
  importPaths       []string
  protoset          string
  enableCompression bool

  // security settings
  creds      credentials.TransportCredentials
  cacert     string
  cert       string
  key        string
  cname      string
  skipVerify bool
  insecure   bool
  authority  string

  // load
  rps              int
  loadStart        uint
  loadEnd          uint
  loadStep         int
  loadSchedule     string
  loadDuration     time.Duration
  loadStepDuration time.Duration

  pacer load.Pacer

  // concurrency
  c             int
  cStart        uint
  cEnd          uint
  cStep         int
  cSchedule     string
  cMaxDuration  time.Duration
  cStepDuration time.Duration

  workerTicker load.WorkerTicker

  // test
  n     int
  async bool

  // number of connections
  nConns int

  // timeouts
  z             time.Duration
  timeout       time.Duration
  dialTimeout   time.Duration
  keepaliveTime time.Duration

  zstop string

  streamInterval        time.Duration
  streamCallDuration    time.Duration
  streamCallCount       uint
  streamDynamicMessages bool

  // lbStrategy
  lbStrategy string

  // TODO consolidate these actual value fields to be implemented via provider funcs
  // data & metadata
  data     []byte
  metadata []byte
  binary   bool

  dataFunc         BinaryDataFunc
  dataProviderFunc DataProviderFunc
  dataStreamFunc   StreamMessageProviderFunc
  mdProviderFunc   MetadataProviderFunc

  funcs template.FuncMap

  // reflection metadata
  rmd map[string]string

  // debug
  hasLog bool
  log    Logger

  // misc
  name        string
  cpus        int
  tags        []byte
  skipFirst   int
  countErrors bool
  recvMsgFunc StreamRecvMsgInterceptFunc
}
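NewConfig takes variadic ...Option arguments, which is Go's functional-options pattern: each option is a function that mutates the config and may reject bad input. Here is a minimal sketch, assuming Option has the shape func(*RunConfig) error, with a trimmed, hypothetical runConfig. WithConcurrency and WithTotalRequests are named in the style of ghz's runner options but are written here for illustration; only the concurrency default of 50 comes from the flag help above, the other default is made up.

```go
package main

import (
	"errors"
	"fmt"
)

// runConfig is a trimmed, hypothetical stand-in for ghz's RunConfig.
type runConfig struct {
	call, host string
	c          int // concurrency
	n          int // total number of requests
}

// Option mutates a runConfig and may reject invalid input.
type Option func(*runConfig) error

func WithConcurrency(c uint) Option {
	return func(o *runConfig) error {
		if c == 0 {
			return errors.New("concurrency must be greater than 0")
		}
		o.c = int(c)
		return nil
	}
}

func WithTotalRequests(n uint) Option {
	return func(o *runConfig) error {
		o.n = int(n)
		return nil
	}
}

// newConfig sets defaults first, then applies the options in order.
func newConfig(call, host string, options ...Option) (*runConfig, error) {
	cfg := &runConfig{call: call, host: host, c: 50, n: 200}
	for _, opt := range options {
		if err := opt(cfg); err != nil {
			return nil, err
		}
	}
	return cfg, nil
}

func main() {
	cfg, err := newConfig("helloworld.Greeter.SayHello", "localhost:50051", WithConcurrency(10))
	fmt.Println(cfg.c, cfg.n, err) // 10 200 <nil>
}
```

Applying options after the defaults keeps a call with no options valid and lets new settings be added without changing the constructor's signature.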

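runner/requester.go assembles the request data and sends the requests. NewRequester prepares the input message type and the data and metadata providers:

func NewRequester(c *RunConfig) (*Requester, error) {
  // ... (abridged)
  md := mtd.GetInputType()
  payloadMessage := dynamic.NewMessage(md)

  // data provider: a custom one from the config, or a default built from the configured data
  reqr.dataProvider = c.dataProviderFunc
  defaultDataProvider, err := newDataProvider(reqr.mtd, c.binary, c.dataFunc, c.data, c.funcs)

  // metadata provider: same pattern
  reqr.metadataProvider = c.mdProviderFunc
  defaultMDProvider, err := newMetadataProvider(reqr.mtd, c.metadata, c.funcs)
  // ...
}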

The Requester struct is defined as follows:

type Requester struct {
  conns    []*grpc.ClientConn
  stubs    []grpcdynamic.Stub
  handlers []*statsHandler

  mtd      *desc.MethodDescriptor
  reporter *Reporter

  config *RunConfig

  results chan *callResult
  stopCh  chan bool
  start   time.Time

  dataProvider     DataProviderFunc
  metadataProvider MetadataProviderFunc

  lock       sync.Mutex
  stopReason StopReason
  workers    []*Worker
}

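Run is the core function: it sends the requests, records the results, and finally returns a report:

func (b *Requester) Run() (*Report, error) {
  cc, err := b.openClientConns()
  // ...
  start := time.Now()
  // one dynamic stub per connection (loop over n elided)
  stub := grpcdynamic.NewStub(cc[n])
  // ...

  // the reporter consumes b.results in its own goroutine
  b.reporter = newReporter(b.results, b.config)
  go func() {
    b.reporter.Run()
  }()

  wt := createWorkerTicker(b.config) // how many workers run over time
  p := createPacer(b.config)         // how fast requests are issued
  err = b.runWorkers(wt, p)

  report := b.Finish()
  b.closeClientConns()
  // ...
}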

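Opening the gRPC connections goes through this call chain:

func (b *Requester) openClientConns() ([]*grpc.ClientConn, error) {
  // ... for each of the configured nConns connections:
  c, err := b.newClientConn(true) // true: attach the stats handler
  // ...
}

func (b *Requester) newClientConn(withStatsHandler bool) (*grpc.ClientConn, error) {
  // ... dial options (opts) are assembled from the config
  return grpc.DialContext(ctx, b.config.host, opts...)
}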

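Next, a worker ticker and a pacer are created. The worker ticker decides how many workers run at any moment (the concurrency schedule), and the pacer decides how fast requests are issued (the load schedule). The result of every call ends up in the results channel, which the reporter consumes:

func createWorkerTicker(config *RunConfig) load.WorkerTicker
func createPacer(config *RunConfig) load.Pacer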

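runWorkers does the actual work and eventually calls w.runWorker() for every worker (abridged):

func (b *Requester) runWorkers(wt load.WorkerTicker, p load.Pacer) error {
  wct := wt.Ticker()
  go func() {
    wt.Run()
  }()

  // React to the worker ticker: start or stop workers as TickValues arrive.
  go func() {
    for tv := range wct {
      // ... a positive delta starts new workers
      b.workers = append(b.workers, &w)
      go func() {
        errC <- w.runWorker()
      }()
      // ... stopping workers
      for _, wrk := range b.workers {
        wrk.Stop()
      }
      // ...
      b.workers[i].Stop()
    }
  }()

  // Pacing loop: ask the pacer how long to wait before the next request
  // and whether to stop.
  go func() {
    for {
      wait, stop := p.Pace(time.Since(began), counter.Get())
      // ...
    }
  }()
  // ...
}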
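The ticker loop in runWorkers grows or shrinks the worker pool as TickValues arrive. A simplified, self-contained sketch of that idea follows. The Delta field follows the "delta property" described in the WorkerTicker interface comments (load/worker_ticker.go); the pool type and its one-stop-channel-per-worker design are assumptions of this sketch, not ghz's code.

```go
package main

import (
	"fmt"
	"sync"
)

// TickValue: a positive Delta starts that many workers, a negative one stops that many.
type TickValue struct{ Delta int }

// pool is a hypothetical worker pool; each worker runs until its stop channel is closed.
type pool struct {
	wg    sync.WaitGroup
	stops []chan struct{}
}

func (p *pool) apply(tv TickValue, work func(stop <-chan struct{})) {
	for i := 0; i < tv.Delta; i++ { // grow
		stop := make(chan struct{})
		p.stops = append(p.stops, stop)
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			work(stop)
		}()
	}
	for i := 0; i < -tv.Delta && len(p.stops) > 0; i++ { // shrink, newest first
		last := len(p.stops) - 1
		close(p.stops[last])
		p.stops = p.stops[:last]
	}
}

// stopAll stops every remaining worker and waits for all of them to exit.
func (p *pool) stopAll() {
	p.apply(TickValue{Delta: -len(p.stops)}, nil)
	p.wg.Wait()
}

func main() {
	ticks := make(chan TickValue, 3)
	ticks <- TickValue{Delta: 4} // ramp up
	ticks <- TickValue{Delta: 2}
	ticks <- TickValue{Delta: -3} // ramp down
	close(ticks)

	p := &pool{}
	worker := func(stop <-chan struct{}) { <-stop } // a real worker would send requests until stopped
	for tv := range ticks {
		p.apply(tv, worker)
	}
	fmt.Println("active workers:", len(p.stops)) // active workers: 3
	p.stopAll()
}
```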

Finish produces the final report:

func (b *Requester) Finish() *Report {
  // ... r is the stop reason
  total := time.Since(b.start)
  // wait until the reporter has consumed all results
  <-b.reporter.done
  return b.reporter.Finalize(r, total)
}

Along the way, JSON input has to be converted into protobuf messages, which requires the method descriptor; the code that resolves it is in protodesc/protodesc.go:

func GetMethodDescFromReflect(call string, client *grpcreflect.Client) (*desc.MethodDescriptor, error) {
  // "package.Service/Method" -> "package.Service.Method"
  call = strings.Replace(call, "/", ".", -1)
  file, err := client.FileContainingSymbol(call)
  // ... collect file and its dependencies into files
  return getMethodDesc(call, files)
}

func getMethodDesc(call string, files map[string]*desc.FileDescriptor) (*desc.MethodDescriptor, error) {
  svc, mth, err := parseServiceMethod(call)
  dsc, err := findServiceSymbol(files, svc)
  // ... sd is dsc as a service descriptor
  mtd := sd.FindMethodByName(mth)
  // ...
}

func GetMethodDescFromProto(call, proto string, imports []string) (*desc.MethodDescriptor, error) {
  // ... p is a proto parser configured with the import paths
  fds, err := p.ParseFiles(filename)
  // ... index fds by file name into files
  return getMethodDesc(call, files)
}
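Both accepted call formats, package.Service/Method and package.Service.Method, reduce to a split at the last separator once "/" is normalized to ".", as the reflection path does with strings.Replace. parseCall is a hypothetical re-implementation of that idea, not ghz's parseServiceMethod.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// parseCall is a hypothetical helper: it accepts "package.Service/Method" or
// "package.Service.Method" and returns the service and method names.
func parseCall(call string) (service, method string, err error) {
	call = strings.Replace(call, "/", ".", -1) // normalize to dots
	i := strings.LastIndex(call, ".")
	if i <= 0 || i == len(call)-1 {
		return "", "", errors.New("call must be package.Service/Method or package.Service.Method")
	}
	return call[:i], call[i+1:], nil
}

func main() {
	svc, m, _ := parseCall("tensorflow.serving.PredictionService/Predict")
	fmt.Println(svc, m) // tensorflow.serving.PredictionService Predict
}
```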

The request data is prepared in runner/data.go:

func newDataProvider(mtd *desc.MethodDescriptor,
  binary bool, dataFunc BinaryDataFunc, data []byte,
  funcs template.FuncMap) (*dataProvider, error) {
  // ... dp is the dataProvider being built
  ctd := newCallData(mtd, funcs, "", 0)
  // does the data contain template actions such as {{.RequestNumber}}?
  ha, err := ctd.hasAction(string(dp.data))
  // ...
}

type dataProvider struct {
  binary   bool
  data     []byte
  mtd      *desc.MethodDescriptor
  dataFunc BinaryDataFunc

  arrayJSONData []string
  hasActions    bool

  // cached messages only for binary
  mutex          sync.RWMutex
  cachedMessages []*dynamic.Message
}

runner/calldata.go assembles the final per-request context:

type CallData struct {
  WorkerID           string // unique worker ID
  RequestNumber      int64  // unique incremented request number for each request
  FullyQualifiedName string // fully-qualified name of the method call
  MethodName         string // shorter call method name
  ServiceName        string // the service name
  InputName          string // name of the input message type
  OutputName         string // name of the output message type
  IsClientStreaming  bool   // whether this call is client streaming
  IsServerStreaming  bool   // whether this call is server streaming
  Timestamp          string // timestamp of the call in RFC3339 format
  TimestampUnix      int64  // timestamp of the call as unix time in seconds
  TimestampUnixMilli int64  // timestamp of the call as unix time in milliseconds
  TimestampUnixNano  int64  // timestamp of the call as unix time in nanoseconds
  UUID               string // generated UUIDv4 for each call

  t *template.Template
}

func newCallData(
  mtd *desc.MethodDescriptor,
  funcs template.FuncMap,
  workerID string, reqNum int64) *CallData
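CallData carries a *template.Template because request data can contain Go template actions rendered per call (for example {{.RequestNumber}}), and hasAction lets static payloads skip that work. A self-contained sketch of both steps with text/template follows; callData is a trimmed, hypothetical copy of CallData, and detecting actions by looking for non-text nodes in the parse tree is an assumption about how such a check could work, not ghz's code.

```go
package main

import (
	"bytes"
	"fmt"
	"text/template"
	"text/template/parse"
)

// callData is a trimmed, hypothetical version of CallData.
type callData struct {
	WorkerID      string
	RequestNumber int64
}

// hasAction reports whether text contains any template action, so that
// static payloads can skip per-call rendering.
func hasAction(text string) (bool, error) {
	t, err := template.New("data").Parse(text)
	if err != nil {
		return false, err
	}
	if t.Tree == nil || t.Tree.Root == nil {
		return false, nil
	}
	for _, n := range t.Tree.Root.Nodes {
		if n.Type() != parse.NodeText { // actions, ifs, ranges, ...
			return true, nil
		}
	}
	return false, nil
}

// render executes the data template for one call.
func render(text string, cd callData) (string, error) {
	t, err := template.New("data").Parse(text)
	if err != nil {
		return "", err
	}
	var buf bytes.Buffer
	if err := t.Execute(&buf, cd); err != nil {
		return "", err
	}
	return buf.String(), nil
}

func main() {
	data := `{"name":"user-{{.RequestNumber}}"}`
	ok, _ := hasAction(data)
	out, _ := render(data, callData{WorkerID: "w1", RequestNumber: 7})
	fmt.Println(ok, out) // true {"name":"user-7"}
}
```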

The worker ticker is defined in load/worker_ticker.go:

type WorkerTicker interface {
  // Ticker returns a channel which sends TickValues
  // When a value is received the number of workers should be appropriately
  // increased or decreased given by the delta property.
  Ticker() <-chan TickValue

  // Run starts the worker ticker
  Run()

  // Finish closes the channel
  Finish()
}

load/pacer.go defines the interface that paces the requests:

type Pacer interface {
  // Pace returns the duration the attacker should wait until
  // making next hit, given an already elapsed duration and
  // completed hits. If the second return value is true, an attacker
  // should stop sending hits.
  Pace(elapsed time.Duration, hits uint64) (wait time.Duration, stop bool)

  // Rate returns a Pacer's instantaneous hit rate (per seconds)
  // at the given elapsed duration of an attack.
  Rate(elapsed time.Duration) float64
}

The requests are ultimately issued in runner/worker.go:

type Worker struct {
  stub grpcdynamic.Stub
  mtd  *desc.MethodDescriptor

  config   *RunConfig
  workerID string
  active   bool
  stopCh   chan bool
  ticks    <-chan TickValue

  dataProvider     DataProviderFunc
  metadataProvider MetadataProviderFunc
  msgProvider      StreamMessageProviderFunc

  streamRecv StreamRecvMsgInterceptFunc
}

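Sending requests: each worker receives ticks and issues one request per tick, either inline or, in async mode, in its own goroutine:

func (w *Worker) runWorker() error {
  g := new(errgroup.Group)
  for {
    select {
    case <-w.stopCh:
      // ...
      return g.Wait()
    case tv := <-w.ticks:
      if w.config.async {
        g.Go(func() error {
          return w.makeRequest(tv)
        })
      } else {
        rErr := w.makeRequest(tv)
        // ...
      }
    }
  }
}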

Assembling the request parameters:

func (w *Worker) makeRequest(tv TickValue) error {
  // ...
  ctd := newCallData(w.mtd, w.config.funcs, w.workerID, reqNum)
  inputs, err := w.dataProvider(ctd)

  // For streaming calls a message provider is built, either
  mp, err := newDynamicMessageProvider(w.mtd, w.config.data, w.config.streamCallCount)
  // or
  mp, err := newStaticMessageProvider(w.config.streamCallCount, inputs)

  // Dispatch on the method's streaming type:
  _ = w.makeBidiRequest(&ctx, ctd, msgProvider)            // client and server streaming
  _ = w.makeClientStreamingRequest(&ctx, ctd, msgProvider) // client streaming only
  _ = w.makeServerStreamingRequest(&ctx, inputs[0])        // server streaming only
  _ = w.makeUnaryRequest(&ctx, reqMD, inputs[0])           // unary
  // ...
}
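makeRequest picks one of four request paths based on the method's streaming type. The mapping can be sketched as below; callKind is a hypothetical helper, and plain booleans stand in for the IsClientStreaming() and IsServerStreaming() methods of desc.MethodDescriptor so the example stays self-contained.

```go
package main

import "fmt"

// callKind is a hypothetical helper that maps a method's streaming flags to
// the request path makeRequest would take.
func callKind(clientStreaming, serverStreaming bool) string {
	switch {
	case clientStreaming && serverStreaming:
		return "bidi" // makeBidiRequest
	case clientStreaming:
		return "client-streaming" // makeClientStreamingRequest
	case serverStreaming:
		return "server-streaming" // makeServerStreamingRequest, single input
	default:
		return "unary" // makeUnaryRequest, single input
	}
}

func main() {
	for _, f := range [][2]bool{{false, false}, {false, true}, {true, false}, {true, true}} {
		fmt.Println(f, callKind(f[0], f[1]))
	}
}
```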

The RPC itself is issued with InvokeRpc:

func (w *Worker) makeUnaryRequest(ctx *context.Context, reqMD *metadata.MD, input *dynamic.Message) error {
  // ...
  res, resErr = w.stub.InvokeRpc(*ctx, w.mtd, input, callOptions...)
  // ...
}

The group used here is errgroup.Group, a thin wrapper around sync.WaitGroup:

pkg/mod/golang.org/x/sync@v0.0.0-20200625203802-6e8e738ad208/errgroup/errgroup.go

func (g *Group) Go(f func() error) {
  g.wg.Add(1)
  go func() {
    defer g.wg.Done()
    // ... run f and record the first non-nil error
  }()
}

The load-test report is implemented in runner/reporter.go:

func newReporter(results chan *callResult, c *RunConfig) *Reporter

type Reporter struct {
  config *RunConfig

  results chan *callResult
  done    chan bool

  totalLatenciesSec float64

  details []ResultDetail

  errorDist      map[string]int
  statusCodeDist map[string]int
  totalCount     uint64
}

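The maximum number of stored result details is hard-coded:

const maxResult = 1000000

func (r *Reporter) Run() {
  for res := range r.results {
    // ... errStr is derived from the call's error; error and status distributions are updated
    r.totalCount++
    r.totalLatenciesSec += res.duration.Seconds()
    // ... (details are only appended while fewer than maxResult are stored)
    r.details = append(r.details, ResultDetail{
      Latency:   res.duration,
      Timestamp: res.timestamp,
      Status:    res.status,
      Error:     errStr,
    })
  }
  // ... completion is signaled on r.done
}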

type ResultDetail struct {
  Timestamp time.Time     `json:"timestamp"`
  Latency   time.Duration `json:"latency"`
  Error     string        `json:"error"`
  Status    string        `json:"status"`
}

Finalize produces the final report:

func (r *Reporter) Finalize(stopReason StopReason, total time.Duration) *Report {
  rep := &Report{ /* ... */ }
  rep.Options = Options{ /* ... */ }

  if len(r.details) > 0 {
    average := r.totalLatenciesSec / float64(r.totalCount)
    rep.Average = time.Duration(average * float64(time.Second))
    rep.Rps = float64(r.totalCount) / total.Seconds()

    var okLats []float64
    for _, d := range r.details {
      okLats = append(okLats, d.Latency.Seconds())
    }
    sort.Float64s(okLats)

    fastestNum := okLats[0]
    slowestNum := okLats[len(okLats)-1]
    // ... rep.Fastest and rep.Slowest are set from these
    rep.Histogram = histogram(okLats, slowestNum, fastestNum)
    rep.LatencyDistribution = latencies(okLats)
  }
  return rep
}

The report contains the following fields:

type Report struct {
  Name      string     `json:"name,omitempty"`
  EndReason StopReason `json:"endReason,omitempty"`

  Options Options   `json:"options,omitempty"`
  Date    time.Time `json:"date"`

  Count   uint64        `json:"count"`
  Total   time.Duration `json:"total"`
  Average time.Duration `json:"average"`
  Fastest time.Duration `json:"fastest"`
  Slowest time.Duration `json:"slowest"`
  Rps     float64       `json:"rps"`

  ErrorDist      map[string]int `json:"errorDistribution"`
  StatusCodeDist map[string]int `json:"statusCodeDistribution"`

  LatencyDistribution []LatencyDistribution `json:"latencyDistribution"`
  Histogram           []Bucket              `json:"histogram"`
  Details             []ResultDetail        `json:"details"`

  Tags map[string]string `json:"tags,omitempty"`
}

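Computing the histogram (10 buckets between the fastest and the slowest latency):

func histogram(latencies []float64, slowest, fastest float64) []Bucket {
  bc := 10
  buckets := make([]float64, bc+1)
  counts := make([]int, bc+1)
  bs := (slowest - fastest) / float64(bc)
  for i := 0; i < bc; i++ {
    buckets[i] = fastest + bs*float64(i)
  }
  buckets[bc] = slowest

  // latencies is sorted, so walk it once, moving to the next bucket
  // whenever the current latency exceeds the bucket's mark
  var bi int
  var max int
  for i := 0; i < len(latencies); {
    if latencies[i] <= buckets[bi] {
      i++
      counts[bi]++
      if max < counts[bi] {
        max = counts[bi]
      }
    } else if bi < len(buckets)-1 {
      bi++
    }
  }

  res := make([]Bucket, len(buckets))
  for i := 0; i < len(buckets); i++ {
    res[i] = Bucket{
      Mark:      buckets[i],
      Count:     counts[i],
      Frequency: float64(counts[i]) / float64(len(latencies)),
    }
  }
  return res
}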
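The bucket marks follow directly from the code. With bc = 10 buckets between the fastest latency t_min and the slowest t_max, the bucket width b_s and the marks m_i are:

$$
b_s = \frac{t_{\max} - t_{\min}}{10}, \qquad m_i = t_{\min} + i\,b_s \ (i = 0,\dots,9), \qquad m_{10} = t_{\max}
$$

Plugging in the sample mysqlslap report at the end of the article (fastest 1003.000 ms, slowest 1006.000 ms):

$$
b_s = \frac{1006.000 - 1003.000}{10} = 0.300\ \text{ms}, \qquad m_0, m_1, \dots, m_{10} = 1003.000,\ 1003.300,\ \dots,\ 1006.000\ \text{ms}
$$

which are exactly the marks printed there. Since a latency is counted in the first bucket whose mark is not smaller than it, bucket i covers (m_{i-1}, m_i] and bucket 0 holds only latencies equal to t_min; in the sample, the counts 1 + 24 + 12 + 3 add up to the 40 calls.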

Computing the latency distribution:

func latencies(latencies []float64) []LatencyDistribution {
  pctls := []int{10, 25, 50, 75, 90, 95, 99}
  data := make([]float64, len(pctls))
  lt := float64(len(latencies))
  for i, p := range pctls {
    ip := (float64(p) / 100.0) * lt
    di := int(ip)
    // an exact integer position means the element just before it (0-based index)
    if ip == float64(di) {
      di = di - 1
    }
    if di < 0 {
      di = 0
    }
    data[i] = latencies[di]
  }

  res := make([]LatencyDistribution, len(pctls))
  for i := 0; i < len(pctls); i++ {
    if data[i] > 0 {
      lat := time.Duration(data[i] * float64(time.Second))
      res[i] = LatencyDistribution{Percentage: pctls[i], Latency: lat}
    }
  }
  return res
}

In other words, the sorted latencies are simply cut at each percentile.
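The index rule in latencies is the nearest-rank percentile: for n sorted latencies and percentile p, the position p·n/100 is rounded up and then shifted to a 0-based index.

$$
i_p = \max\!\left(0,\ \left\lceil \frac{p\,n}{100} \right\rceil - 1\right)
$$

This matches the code: when ip is an integer it subtracts 1, and otherwise int(ip) already equals ⌈ip⌉ − 1. With the n = 40 calls of the sample report: p = 10 gives ip = 4, so index 3 (the 4th fastest call); p = 95 gives ip = 38, so index 37; p = 99 gives ip = 39.6, so index 39, the slowest call.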

If you are interested in the MySQL load-testing tool, give it a try:

https://github.com/xiazemin/mysqlslap-go

Usage:

mysqlslap -Hhost -uuser -ppassword -P3306 -ddatabase -q"select id from deviceattr where name='attr10' or name='attr20' group by id;" -ffilename -c 50 -i 10

Sample report:

Summary:
 Name:         mysqlslap
 Count:        40
 Total:        1.01 s
 Slowest:      1.00 s
 Fastest:      1.01 s
 Average:      1.00 s
 Requests/sec: 39.76

Response time histogram(ms):
 1003.000 [1]  |∎∎
 1003.300 [0]  |
 1003.600 [0]  |
 1003.900 [0]  |
 1004.200 [24] |∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎
 1004.500 [0]  |
 1004.800 [0]  |
 1005.100 [12] |∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎
 1005.400 [0]  |
 1005.700 [0]  |
 1006.000 [3]  |∎∎∎∎∎

Latency distribution:
 10 % in 1.00 s 
 25 % in 1.00 s 
 50 % in 1.00 s 
 75 % in 1.00 s 
 90 % in 1.00 s 
 95 % in 1.01 s 
 99 % in 1.01 s 

Status code distribution:
 [success]   40 responses   
 [failed]    0 responses 
This article is part of the Tencent Cloud self-media sync program and was shared from a WeChat official account.
Originally published: 2021-11-07. In case of infringement, contact cloudcommunity@tencent.com for removal.

Shared from the WeChat official account golang算法架构leetcode技术php.