我的目标:是设置每分钟600个请求的速率限制,在下一分钟重置。我的意图是通过http.client
设置一个带有limit.wait()
的RoundTrip
来实现这一点。这样,我就可以为不同的http.clients()
设置不同的限制,并通过roundtrip
来处理限制,而不是在其他地方增加代码的复杂性。
问题是利率限制没有得到遵守,我仍然超过了允许的请求数量,设置超时会产生致命的恐慌net/http: request canceled (Client.Timeout exceeded while awaiting headers)
。
我创建了一个基本的main.go
来复制这个问题。请注意,对于我来说,64000循环是一个现实的场景。
Update:设置ratelimiter: rate.NewLimiter(10, 10),
仍然以某种方式超过了600的速率限制,并且在设置超时时会产生Context deadline exceeded
错误。
package main
import (
"fmt"
"io/ioutil"
"net/http"
"sync"
"time"
"golang.org/x/time/rate"
)
var client http.Client
// ThrottledTransport Rate Limited HTTP Client
type ThrottledTransport struct {
roundTripperWrap http.RoundTripper
ratelimiter *rate.Limiter
}
func (c *ThrottledTransport) RoundTrip(r *http.Request) (*http.Response, error) {
err := c.ratelimiter.Wait(r.Context()) // This is a blocking call. Honors the rate limit
if err != nil {
return nil, err
}
return c.roundTripperWrap.RoundTrip(r)
}
// NewRateLimitedTransport wraps transportWrap with a rate limitter
func NewRateLimitedTransport(transportWrap http.RoundTripper) http.RoundTripper {
return &ThrottledTransport{
roundTripperWrap: transportWrap,
//ratelimiter: rate.NewLimiter(rate.Every(limitPeriod), requestCount),
ratelimiter: rate.NewLimiter(10, 10),
}
}
func main() {
concurrency := 20
var ch = make(chan int, concurrency)
var wg sync.WaitGroup
wg.Add(concurrency)
for i := 0; i < concurrency; i++ {
go func() {
for {
a, ok := <-ch
if !ok { // if there is nothing to do and the channel has been closed then end the goroutine
wg.Done()
return
}
resp, err := client.Get("https://api.guildwars2.com/v2/items/12452")
if err != nil {
fmt.Println(err)
}
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
fmt.Println(err)
}
fmt.Println(a, ":", string(body[4:29]))
}
}()
}
client = http.Client{}
client.Timeout = time.Second * 10
// Rate limits 600 requests per 60 seconds via RoundTripper
transport := NewRateLimitedTransport(http.DefaultTransport)
client.Transport = transport
for i := 0; i < 64000; i++ {
ch <- i // add i to the queue
}
wg.Wait()
fmt.Println("done")
}
发布于 2022-10-29 01:55:10
rate.NewLimiter(rate.Every(60*time.Second), 600)
不是你想要的。
根据https://pkg.go.dev/golang.org/x/time/rate#Limiter
限制器控制允许事件发生的频率。它实现了一个大小为b的“令牌桶”,最初是满的,并以每秒r令牌的速率重新填充。非正式地,在足够大的时间间隔内,限制器将速率限制为每秒r令牌(),最大突发大小为b events。
函数NewLimiter(r极限,b int) *限制器
NewLimiter返回一个新的限制器,它允许事件速率达到r,最多允许b标记的突发。
每一个(间隔time.Duration)极限
每个事件将事件之间的最小时间间隔转换为一个限制。
rate.Every(60*time.Second)
意味着它每隔60年代就会装满1枚令牌。也就是说,速率是每秒1/60
令牌。
大多数情况下,600 requests per minute
意味着在开始时允许600
请求,并将在下一分钟立即重置为600
。在我看来,golang.org/x/time/rate
不太适合这个用例。也许rate.NewLimiter(10, 10)
是一个安全的选择。
发布于 2022-11-02 20:31:18
下面是一个操场示例,其中roundTripper模拟了会馆API的响应:
https://go.dev/play/p/FTw6IGo_moP
对代码的唯一有意义的修正是:
resp.Body
(也许这就是您恐慌的原因?)之后关闭通道
简单的回答是:使用此设置(没有网络问题,不取决于实际api服务器的行为),它可以工作:
# excerpt from the output:
...
235 : "name": "Omnomberry Bar"
236 : "name": "Omnomberry Bar"
237 : "name": "Omnomberry Bar"
238 : "name": "Omnomberry Bar"
239 : "name": "Omnomberry Bar"
--- 60 reqs/sec
240 : "name": "Omnomberry Bar"
241 : "name": "Omnomberry Bar"
242 : "name": "Omnomberry Bar"
...
也许您的问题来自于与实际服务器的联系。
如果它在没有警告的情况下开始断开连接,或者给出延迟时间越长越长的响应,这可能会解释超时问题。
尝试测量RoundTrip
仍然停留在ratelimiter.Wait()
上的实际时间,以及请求/服务器响应所需的实际时间。
我以较短的时间运行示例,如果程序运行时间足够长( 10 req/s下的64k请求仍然是6400 s,接近2h ),您可能会遇到运行时问题:
由于传输检查单个请求的超时设置后的速率限制,如果运行时选择(出于某些糟糕的原因)安排在10秒后等待rate.Wait(...)
的20个工作人员中的一个,则会遇到context.Deadline错误。
(注:我没有事实支持这一说法,只是假设而已)
最简单的解决办法是:
client.Get(...)
.
ratelimiter.Wait(...)
在
另一个测试选项是:
client.Timeout
,ratelimiter.Wait(...)
警卫后设置了传输超时。https://stackoverflow.com/questions/74242183
复制相似问题