
Traffic Replay with GoReplay: output_http Source Code Analysis

高楼Zee
Published 2021-11-10 15:00:15
Part of the column: 7DGroup
Preface

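GoReplay abstracts the data flow into two concepts: an input represents where data comes from and an output represents where it goes. Both are called plugins, and middleware that sits between the input and output modules provides the extension mechanism.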

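output_http.go is the HTTP output plugin. It speaks the HTTP protocol, implements the plugin write interface (io.Writer in older releases; the constructor shown below returns a PluginReadWriter), and is registered in the Plugin.outputs list according to the configuration.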

Parameters

-output-http value  // forward incoming requests to an HTTP address
        Forwards incoming requests to given http address.
                # Redirect all incoming requests to staging.com address 
                gor --input-raw :80 --output-http http://staging.com
  -output-http-elasticsearch string  // send request and response stats to ElasticSearch
        Send request and response stats to ElasticSearch:
                gor --input-raw :8080 --output-http staging.com --output-http-elasticsearch 'es_host:api_port/index_name'
  -output-http-queue-len int  // size of the HTTP output queue
        Number of requests that can be queued for output, if all workers are busy. default = 1000 (default 1000)
  -output-http-redirects int  // how many redirects may be followed; redirects are not followed by default
        Enable how often redirects should be followed.
  -output-http-response-buffer value  // maximum response size to keep (buffer size)
        HTTP response buffer size, all data after this size will be discarded.
  -output-http-skip-verify
        Don't verify hostname on TLS secure connection.
  -output-http-stats  // report the output queue stats periodically (every 5 seconds by default)
        Report http output queue stats to console every N milliseconds. See output-http-stats-ms
  -output-http-stats-ms int
        Report http output queue stats to console every N milliseconds. default: 5000 (default 5000)
  -output-http-timeout duration  // HTTP request/response timeout, 5 seconds by default
        Specify HTTP request/response timeout. By default 5s. Example: --output-http-timeout 30s (default 5s)
  -output-http-track-response
        If turned on, HTTP output responses will be set to all outputs like stdout, file and etc.
  -output-http-worker-timeout duration
        Duration to rollback idle workers. (default 2s)
  -output-http-workers int  // Gor scales the number of workers dynamically by default; this option caps it
        Gor uses dynamic worker scaling. Enter a number to set a maximum number of workers. default = 0 = unlimited.
  -output-http-workers-min int
        Gor uses dynamic worker scaling. Enter a number to set a minimum number of workers. default = 1.

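By default, Gor creates a dynamic worker pool: it starts with 10 workers and creates more HTTP output workers when the HTTP output queue grows beyond 10 entries. The number of workers created (N) equals the queue length observed at the moment the check finds it above 10. The queue length is checked every time a message is written to the HTTP output queue, and no further workers are spawned until the request to spawn N workers has been satisfied. A dynamic worker that cannot process a message at that moment sleeps for 100 milliseconds, and one that has been unable to process a message for 2 seconds exits. To use a fixed number of workers, pass --output-http-workers=20.

Note that the constructor analyzed below starts with WorkersMin workers (default 1) rather than 10 and uses WorkerTimeout (default 2s) as the idle limit, so the exact numbers depend on the Gor version.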

Number of HTTP output workers

NewHTTPOutput defaults

// NewHTTPOutput constructor for HTTPOutput
// Initialize workers
func NewHTTPOutput(address string, config *HTTPOutputConfig) PluginReadWriter {
 o := new(HTTPOutput)
 var err error
 config.url, err = url.Parse(address)
 if err != nil {
  log.Fatal(fmt.Sprintf("[OUTPUT-HTTP] parse HTTP output URL error[%q]", err))
 }
 if config.url.Scheme == "" {
  config.url.Scheme = "http"
 }
 config.rawURL = config.url.String()
 if config.Timeout < time.Millisecond*100 {
  config.Timeout = time.Second
 }
 if config.BufferSize <= 0 {
  config.BufferSize = 100 * 1024 // 100kb
 }
 if config.WorkersMin <= 0 {
  config.WorkersMin = 1
 }
 if config.WorkersMin > 1000 {
  config.WorkersMin = 1000
 }
 if config.WorkersMax <= 0 {
  config.WorkersMax = math.MaxInt32 // ideally so large
 }
 if config.WorkersMax < config.WorkersMin {
  config.WorkersMax = config.WorkersMin
 }
 if config.QueueLen <= 0 {
  config.QueueLen = 1000
 }
 if config.RedirectLimit < 0 {
  config.RedirectLimit = 0
 }
 if config.WorkerTimeout <= 0 {
  config.WorkerTimeout = time.Second * 2
 }
 o.config = config
 o.stop = make(chan bool)
 // whether to collect statistics, and how often to report them
 if o.config.Stats {
  o.queueStats = NewGorStat("output_http", o.config.StatsMs)
 }

 o.queue = make(chan *Message, o.config.QueueLen)
 if o.config.TrackResponses {
  o.responses = make(chan *response, o.config.QueueLen)
 }
 // it should not be buffered to avoid races
 o.stopWorker = make(chan struct{})

 if o.config.ElasticSearch != "" {
  o.elasticSearch = new(ESPlugin)
  o.elasticSearch.Init(o.config.ElasticSearch)
 }
 o.client = NewHTTPClient(o.config)
 o.activeWorkers += int32(o.config.WorkersMin)
 for i := 0; i < o.config.WorkersMin; i++ {
  go o.startWorker()
 }
 go o.workerMaster()
 return o
}

Once the configuration has been normalized, the HTTP client is created and WorkersMin workers are started:

o.client = NewHTTPClient(o.config)
 o.activeWorkers += int32(o.config.WorkersMin)
 for i := 0; i < o.config.WorkersMin; i++ {
  go o.startWorker()
 }

Each sender is a goroutine (not an OS process) that loops until it is told to stop:

func (o *HTTPOutput) startWorker() {
 for {
  select {
  case <-o.stopWorker:
   return
  case msg := <-o.queue:
   o.sendRequest(o.client, msg)
  }
 }
}

Sending the request:

func (o *HTTPOutput) sendRequest(client *HTTPClient, msg *Message) {
 if !isRequestPayload(msg.Meta) {
  return
 }

 uuid := payloadID(msg.Meta)
 start := time.Now()
 resp, err := client.Send(msg.Data)
 stop := time.Now()

 if err != nil {
  Debug(1, fmt.Sprintf("[HTTP-OUTPUT] error when sending: %q", err))
  return
 }
 if resp == nil {
  return
 }

 if o.config.TrackResponses {
  o.responses <- &response{resp, uuid, start.UnixNano(), stop.UnixNano() - start.UnixNano()}
 }

 if o.elasticSearch != nil {
  o.elasticSearch.ResponseAnalyze(msg.Data, resp, start, stop)
 }
}

Details of sending the request, and where the individual configuration options take effect:

// Send sends an http request using client create by NewHTTPClient
func (c *HTTPClient) Send(data []byte) ([]byte, error) {
 var req *http.Request
 var resp *http.Response
 var err error

 req, err = http.ReadRequest(bufio.NewReader(bytes.NewReader(data)))
 if err != nil {
  return nil, err
 }
 // we don't send CONNECT or OPTIONS request
 if req.Method == http.MethodConnect {
  return nil, nil
 }

 if !c.config.OriginalHost {
  req.Host = c.config.url.Host
 }

 // fix #862
 if c.config.url.Path == "" && c.config.url.RawQuery == "" {
  req.URL.Scheme = c.config.url.Scheme
  req.URL.Host = c.config.url.Host
 } else {
  req.URL = c.config.url
 }

 // force connection to not be closed, which can affect the global client
 req.Close = false
 // it's an error if this is not equal to empty string
 req.RequestURI = ""

 resp, err = c.Client.Do(req)
 if err != nil {
  return nil, err
 }
 if c.config.TrackResponses {
  return httputil.DumpResponse(resp, true)
 }
 _ = resp.Body.Close()
 return nil, nil
}

HTTP output queue

Where the queue is used.

Code call graph

This article was shared from the 7DGroup WeChat official account, where it was originally published on 2021-11-04.
