前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >go 如何给服务做限流?

go 如何给服务做限流?

原创
作者头像
Johns
修改2022-06-30 10:29:05
2.7K0
修改2022-06-30 10:29:05
举报
文章被收录于专栏:代码工具代码工具

一。什么是限流,限流/降级/熔断/隔离的区别是什么?

限流

【1】限流就是限制流量进入或者从系统出去的速率,防止流量过高导致系统过载或者崩溃。

【2】限流需要关注的是在哪里进行限制和限制多少速率合适。一般我们会在系统入口或者

系统出口限制流量的速率,至于限制的速率有时候我们限制的是一个固定的阈值,比如设置

系统QPS<=2000,有的时候则是算法动态地感知系统的压力来自动化地设置这个阈值。

降级

【1】降级就是在主流程、主方案以外,还有兜底方案,当主方案出现问题时,可以马上切换到兜底方案,

说白了就是当在系统过载时,尽可能地保证用户产品体验。

【2】降级也分为自动降级和手动降级,前者是系统自动检测到问题时自动切换,后者是系统检测到问题报警,

人为进行切换。所以需要在做设计的时候结合业务场景考虑哪些环节可能会出问题,出了问题如何降级,是自

动降级还是手动降级,降级后需要启用怎么样的应急处理流程等等。

【3】一般以下情況均可采用降級方法:超时降级、异常降级、失败次数降级、拒绝服务降级、限流也是降级。

熔断

【1】熔断一般是在服务异常或调用出现问题时,及时断掉,不再调用。就像保险丝,当达到一定电流时,

保险丝熔断,保护其他电器不受损。除了服务调用异常可以熔断外,还有一种情况是,在秒杀或大促时,

可以熔断一些边缘服务,从而保证下单等主要服务的可用性。

【2】需要熔断的情况一般是:涉及到核心功能运行时,熔断边缘服务、服务调用异常时,配合降级的兜底

方案,进行熔断、服务被攻击时熔断等等。

隔离

【1】我们的系统通常提供了不止一个服务,但是这些服务在运行时是部署在一个实例,或者一台物理机

上面的,如果不对服务资源做隔离,一旦一个服务出现了问题,整个系统的稳定性都会受到影响,甚至

发生雪崩事件。隔离的目的就是避免服务之间相互影响。

【2】隔离需要考虑的是隔离什么,何处隔离。一般我们都会隔离线程池,数据库, 缓冲等资源。至于何处

隔离,需要根据隔离的对象具体分析。

总结:限流/降级/熔断/隔离,尽管关注的点不一样,但都是保证系统的稳定性,可用性的方法。

在开发高并发系统时,有三把利器用来保护系统:缓存、降级和限流。限流是保障服务高可用的方式之一.

尤其是在微服务架构中,对接口或资源进行限流可以有效地保障服务的可用性和稳定性。

二。常用的限流算法有哪些?

计数器算法

最简单的限流算法就是维护一个计数器 Counter,当一个请求来时,就做加一操作,当一个请求处理完后就做减一操作。如果这个 Counter 大于某个数了(我们设定的限流阈值),那么就出发限流行为以保护系统的负载了。这个算法足够的简单粗暴。

队列算法

在这个算法下,请求的速度可以是波动的,而处理的速度则是非常均速的。这个算法其实有点像一个 FIFO 的算法。

普通的队列算法
普通的队列算法

在队列基础上,又衍生出了一些有趣的设计。比如优先级队列算法, 带权重的队列算法

优先级队列算法
优先级队列算法
带权重的队列算法
带权重的队列算法

令牌桶算法

以恒定的速度往令牌桶中放入令牌,当有请求过来则从令牌桶中获取令牌进行后续请求,当获取令牌失败后则会出发限流行为。一般来讲令牌桶算法控制的是请求打到当前服务节点的服务速率。

令牌桶算法 图例1
令牌桶算法 图例1
令牌桶算法 图例2
令牌桶算法 图例2

漏桶算法

请求先进入到漏桶里,而漏桶以固定的速率处理请求,当请求数量超过漏桶的容量时,超出的请求则会出发限流行为,不适合突发请求场景。本质上来说,漏桶算法本质上是一种队列算法的实现,一般来讲漏桶算法控制的是请求打到当前服务节点下一个节点的速率。

漏桶算法 图例
漏桶算法 图例

动态自适应限流算法

前面的几个算法都是要预先设置一个固定的阈值,但是更多时候我们希望这一切都是自动化的。算法能够动态地感知系统的压力来自动化地限流。

这方面设计的典范是 TCP 协议的拥塞控制的算法。我在这边简单介绍一下:

慢启动算法阶段:

刚刚加入网络的连接,需要慢慢的增加调整到网络的最佳值,算法如下:。

1)连接建好的开始先初始化cwnd = 1,表明可以传一个数据包M。

2)每当收到一个ACK,cwnd++; 呈线性上升。

3)每过一个传输伦次(RTT) cwnd翻倍,呈指数提升。

4)cwnd不可能无限增长,所以会有一个ssthresh(slow start threshold)参数,当cwnd >= ssthresh时,就会进入“拥塞避免算法”。

慢启动
慢启动

拥塞避免算法阶段:

当cwnd >= ssthresh时,就会进入“拥塞避免算法”。一般来说ssthresh的值是65535,单位是字节,当cwnd达到这个值时后,算法如下:

1)收到一个ACK时,cwnd = cwnd + 1/cwnd

2)每过一个传输伦次(RTT) cwnd + 1

这样就可以避免增长过快导致网络拥塞,慢慢的增加调整到网络的最佳值。很明显,是一个线性上升的算法。

快重传算法阶段:

1)当收到3个duplicate ACK时就开启重传,并令 sshthresh = cwnd, cwnd=cwnd/2。并进入到快恢复算法阶段

2)当RTO (数据包M重传超时)的时候,直接将sshthresh = cwnd /2, cwnd=1,进入慢启动算法阶段

快重传
快重传

快恢复算法阶段:

快恢复算法阶段进入前已经存在 sshthresh = cwnd, cwnd=cwnd/2。恢复阶段算法比较多,但是根本思想是

1)如果回传包在RTO内就ACK了或者已经收到新的ACK了, 那么应该尽快将cwnd提升上去,进入拥塞避免算法阶段:

2)如果重传出现RTO, 那么就需要进入慢启动算法阶段

因此

复用TCP协议的这种流控设计,我们完全可以对每次请求的响应情况进行统计分析,让系统根据分析报告自动调整阈值。比如,我现在有个广告竞价服务,约定50ms内必须返回素材结果,而我们的业务目标是P95, 于是我每10分钟统计一次服务的一次P95响应时延的值,如果大于50ms那么就将QPS阈值减半。 如果发现P95远小于50ms那么我就会慢慢把QPS+20的方式每次提高阈值看P95时延是否快接近50ms了,如果在某个阈值下达到了PP95时延达到了45.5ms那么就停止对QPS阈值的滑动。

三。单机节点限流方案

【1】 使用channel实现限流

使用go语言提供了一个通道结构,初始化一个带缓冲区的chan,当缓冲区满的时候,往通道写数据会被堵塞,当缓冲区没有数据时,读取会被堵塞。借助这个特征我们可以实现一个简单的限流方案,具体代码如下:

代码语言:txt
复制
package client

import (
	"fmt"
	"strconv"
	"testing"
	"time"
)

/**
 * @Description
 * @Author guirongguo
 * @Date 2021/7/14 11:13
 **/
const layout = "2006-01-01 15:04:05"

// TestRateLimitByChan 使用channel进行限流, 通过大小固定的通道堵塞的方式实现
func TestRateLimitByChan(t *testing.T) {
	startTime := time.Now()
	t.Log("task start at " + startTime.Format(layout))
	length := 200
	limitRate := make(chan bool, 10)
	chs := make([]chan string, length)
	for i := 0; i < length; i++ {
		limitRate <- true
		chs[i] = make(chan string, 1)
		go subTask(strconv.FormatInt(int64(i), 10), chs[i], limitRate)
	}

	for _, ch := range chs {
		t.Log("task start at " + <-ch)
	}
}

// subTask子任务
func subTask(taskId string, result chan string, limitRate chan bool) {
	ch := make(chan string, 1)
	go func() {
		time.Sleep(time.Duration(1) * time.Second)
		ch <- "Task-" + taskId + " run success " + time.Now().Format(layout)
	}()

	select {
	case ret := <-ch:
		result <- ret
		<-limitRate
	case <-time.After(time.Duration(2) * time.Second):
		<-limitRate
		result <- "Task-" + taskId + " run timeout " + time.Now().Format(layout)
	}
}

【2】 使用计数器实现限流

go语言的"time"库里面有一个time.NewTicker方法,可以定时地产生令牌。借此我们可以实现一个简单的令牌桶算法。

代码语言:txt
复制
// 使用计数器方式进行限流
func TestRateLimitByCount(t *testing.T) {
     // 设置每50ms往通道发一个数据
	ticker := time.NewTicker(time.Millisecond * 50)
	defer ticker.Stop()
	length := 20
	chs := make([]chan string, length)
	for i := 0; i < length; i++ {
		chs[i] = make(chan string, 1)
		go func(taskId string, ch chan string) {
			<-ticker.C
			time.Sleep(time.Duration(4) * time.Millisecond)
			ch <- "Task-" + taskId + " run success " + time.Now().Format(layout)
		}(strconv.FormatInt(int64(i), 10), chs[i])
	}
	for _, ch := range chs {
		t.Log("task start at " + <-ch)
	}
}

定时任务一般有3种模式FixDelay, Cron, FixedRate, 主要区别如下:

(1)FixDelay:以固定的时间间隔执行任务,如果前一个任务执行完成后,间隔3秒执行下一个任务,不关注任务的执行时长。

(2)Cron: 周期性检查是否可以执行任务,到了时间点检查如果前一个任务跑完了,就跑下一个任务,如果没跑完,就继续跑这个任务。只有到了周期的时间点才回检查。

(3)FixedRate: 周期性执行任务,如果前一个任务执行完成后,发现后一个任务其实早就应该执行了,则马上执行后一个任务。如果前一个没有执行完成,就会等到任务执行完成。 time.NewTicker方法就是一个FixedRate模式的任务。

【3】 使用netutils对服务器请求进行限流

一般我们的限流都是针对http请求的,而go官方就自带了一个简单的限流库golang.org/x/net/netutil

代码语言:txt
复制
import (
	"fmt"
	"golang.org/x/net/netutil"
	"net"
	"net/http"
	"strconv"
	"testing"
	"time"
)
// TestRateLimitByHttpServer 使用通过httpServer的官方的限流工具进行限流
func TestRateLimitByHttpServer(t *testing.T) {
	addr := ":8080"
	listener, err := net.Listen("tcp", addr)
	if err != nil {
		fmt.Errorf("start server at %s failed", addr)
	}
	defer listener.Close()
    // 这里我限制了每次只能并发处理20个请求,多了会被拒绝
	listener = netutil.LimitListener(listener, 20)

	mux := http.DefaultServeMux
	mux.HandleFunc("/index", func(writer http.ResponseWriter, request *http.Request) {
		writer.WriteHeader(http.StatusOK)
		writer.Write([]byte("welcome to index"))
		return
	})

	http.Serve(listener, mux)
}

我们来看一下netutil提供的limiter实现:

代码语言:txt
复制
package netutil // import "golang.org/x/net/netutil"

import (
	"net"
	"sync"
)

// LimitListener returns a Listener that accepts at most n simultaneous
// connections from the provided Listener.
func LimitListener(l net.Listener, n int) net.Listener {
	return &limitListener{
		Listener: l,
		sem:      make(chan struct{}, n),
		done:     make(chan struct{}),
	}
}

type limitListener struct {
	net.Listener
	sem       chan struct{}
	closeOnce sync.Once     // ensures the done chan is only closed once
	done      chan struct{} // no values sent; closed when Close is called
}

// acquire acquires the limiting semaphore. Returns true if successfully
// accquired, false if the listener is closed and the semaphore is not
// acquired.
func (l *limitListener) acquire() bool {
	select {
	case <-l.done:
		return false
	case l.sem <- struct{}{}:
		return true
	}
}

// 请求结束后释放sem信号量
func (l *limitListener) release() { <-l.sem }

func (l *limitListener) Accept() (net.Conn, error) {
	acquired := l.acquire()
	
	// 能够进入到这里一定是获得了信号量
	c, err := l.Listener.Accept()
	if err != nil {
		// 如果请求处理异常的话,需要把已经占用的信号量释放掉。
		if acquired {
			l.release()
		}
		return nil, err
	}
	return &limitListenerConn{Conn: c, release: l.release}, nil
}

func (l *limitListener) Close() error {
	err := l.Listener.Close()
	l.closeOnce.Do(func() { close(l.done) })
	return err
}

type limitListenerConn struct {
	net.Conn
	releaseOnce sync.Once
	release     func()
}

func (l *limitListenerConn) Close() error {
	err := l.Conn.Close()
	l.releaseOnce.Do(l.release)
	return err
}

可以看到netutil也是基于借助带缓存区的chan实现的限流。

1)在每次请求的时候,都会往sem发送一个数据获得一个信号量,当通道sem槽位满了的时候,往通道写数据会堵塞,请求因此也会被堵塞。

2)每次请求完成的时候,尝试从通道消费一个数据归还一个信号量。

3)服务关闭时,所有的将会在Accept上直接返回异常,程序退出

4)请求处理异常的时候,需要将hold的信号量释放掉。

【4】 使用golang.org/x/time/rate库提供的令牌桶算法

代码语言:txt
复制
import (
	"fmt"
	"golang.org/x/time/rate"
	"net"
	"net/http"
	"strconv"
	"testing"
	"time"
)
// 使用golang/rate 实现, 令牌桶算法
func TestRateLimitByGoRate(t *testing.T) {
	// 以每秒10个的速度放入ticker, 桶容量为4
	ticker := rate.NewLimiter(3, 2)
	length := 20
	chs := make([]chan string, length)
	for i := 0; i < length; i++ {
		chs[i] = make(chan string, 1)
		go func(taskId string, ch chan string, r *rate.Limiter) {
			if ticker.Allow() {
				ch <- "Task-" + taskId + " not allow " + time.Now().Format(layout)
			}
			time.Sleep(time.Duration(4) * time.Millisecond)
			ch <- "Task-" + taskId + " run success " + time.Now().Format(layout)
		}(strconv.FormatInt(int64(i), 10), chs[i], ticker)
	}
	for _, ch := range chs {
		t.Log("task start at " + <-ch)
	}
}

golang/rate 使用说明:

1) 支持堵塞Wait()/WaitN()和非堵塞方式Allow()/AllowN()获取token。

2)能够通过SetLimit()和SetBurst()动态调整每秒token生成的速率和令牌桶大小。

【5】 使用go.uber.org/ratelimit库提供的漏桶算法

代码语言:txt
复制
import (
	"fmt"
	"go.uber.org/ratelimit"
	"net"
	"net/http"
	"strconv"
	"testing"
	"time"
)
// 使用uber/ratelimit 实现, 漏桶算法
func TestRateLimitByUber(t *testing.T) {
	// 每秒请求100次
	ticker := ratelimit.New(3)
	length := 20
	chs := make([]chan string, length)
	for i := 0; i < length; i++ {
		chs[i] = make(chan string, 1)
		go func(taskId string, ch chan string) {
			ticker.Take()
			time.Sleep(time.Duration(1) * time.Second)
			ch <- "Task-" + taskId + " run success " + time.Now().Format(layout)
		}(strconv.FormatInt(int64(i), 10), chs[i])
	}
	for _, ch := range chs {
		t.Log("task start at " + <-ch)
	}
}

go.uber.org/ratelimit 使用说明:

1)使用Take限制消费的速度,token会堵塞知道知道可以执行。

2)传统的 Leaky Bucket,每个请求的间隔是固定的,然而,在实际上的互联网应用中,流量经常是突发性的。对于这种情况,uber-go 对 Leaky Bucket 做了一些改良,引入了最大松弛量 (maxSlack) 的概念。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一。什么是限流,限流/降级/熔断/隔离的区别是什么?
    • 限流
      • 降级
        • 熔断
          • 隔离
          • 二。常用的限流算法有哪些?
            • 计数器算法
              • 队列算法
                • 令牌桶算法
                  • 漏桶算法
                    • 动态自适应限流算法
                    • 三。单机节点限流方案
                      • 【1】 使用channel实现限流
                        • 【2】 使用计数器实现限流
                          • 【3】 使用netutils对服务器请求进行限流
                            • 【4】 使用golang.org/x/time/rate库提供的令牌桶算法
                              • 【5】 使用go.uber.org/ratelimit库提供的漏桶算法
                              领券
                              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档