前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Locust + Boomer 基于 K8S 分布式压测使用说明

Locust + Boomer 基于 K8S 分布式压测使用说明

原创
作者头像
少年yr
修改2021-04-15 19:28:44
5.3K0
修改2021-04-15 19:28:44
举报

什么是 Locust

Locust 是近几年新兴的开源测试压力项目,主要是基于 Python 的多进程和协程的方式来实现并发压力。

其压测脚本实现的方式比较多样,可以通过手写编程函数,丰富可测试的范围,并配置并发占比。

举一个简单的 Locust 脚本例子:

代码语言:txt
复制
from locust import HttpUser, task, between

class MyUser(HttpUser):
    wait_time = between(5, 15)

    @task(1)
    def index(self):
        self.client.get("/")

    @task(2)
    def text(self):
        self.client.get("/text")

这个脚本的含义是,用户(压力线程)会随机等待 5-15s 发起请求,并发策略是三分之一的并发量请求 /,三分之二的并发量请求 /text。

此外,由于压测脚本是基于函数的编写,所以我们可以轻而易举地实现前置的测试数据准备函数,和后置的测试数据清理函数等等。

Locust 相比于 Jmeter,扩展性更强,可以自定义开发函数,支持 grpc & tcp & udp 等协议的压测,且在相同配置服务器机器资源下,能产生更大的压力,因为线程所消耗的资源,比协程要多得多。

Locust 部署的时候,主要是采取分布式的部署方式,1 个 Master 搭配若干个 Slave,这里 Slave 的数量主要取决于系统的 CPU 核数,比如是 4 核 CPU 的服务器,就可以开启 4 个 Slave 来实现并发。

Locust Master 常用的两个默认端口,与 Slave 通信的 5557 端口,以及 Web 端数据展示的 8089 端口。

什么是 Boomer

Boomer 主要是用 Go 重写了 Locust 的 Slave,而其 Master 还是用 Locust 的 Master 。

那么问题来了,有了 Locust,为什么还要配置 Boomer 呢?

因为 Locust 本身是基于 Python 实现的,受限制于 GIL 锁,虽然并发所占用消耗的资源极少,但是无法稳定维持高并发数量,且在高并发压力下表现一般。

然而用了 Boomer 来实现 Slave,其并发方式,由原来 Python 的 Gevent,变成了 Go 的 Goroutine,大大提高了高并发的质量。

此外,Boomer 还实现了指定 RPS 下的精准压力控制。

代码语言:txt
复制
// Start to refill the bucket periodically.
func (limiter *RampUpRateLimiter) Start() {
    limiter.quitChannel = make(chan bool)
    quitChannel := limiter.quitChannel
    // bucket updater
    go func() {
        for {
            select {
            case <-quitChannel:
                return
            default:
                atomic.StoreInt64(&limiter.currentThreshold, limiter.nextThreshold)
                time.Sleep(limiter.refillPeriod)
                close(limiter.broadcastChannel)
                limiter.broadcastChannel = make(chan bool)
            }
        }
    }()
    // threshold updater
    go func() {
        for {
            select {
            case <-quitChannel:
                return
            default:
                nextValue := limiter.nextThreshold + limiter.rampUpStep
                if nextValue < 0 {
                    // int64 overflow
                    nextValue = int64(math.MaxInt64)
                }
                if nextValue > limiter.maxThreshold {
                    nextValue = limiter.maxThreshold
                }
                atomic.StoreInt64(&limiter.nextThreshold, nextValue)
                time.Sleep(limiter.rampUpPeroid)
            }
        }
    }()
}

只要不设置 rampUpPeroid (协程增加的间隔)、 rampUpStep(每个间隔协程增加多少),只设置 maxThreshold(最大的协程数量),资源充足的情况下马上就能触发这个逻辑。

代码语言:txt
复制
if nextValue > limiter.maxThreshold {
    nextValue = limiter.maxThreshold
}

这样就可以以指定的 RPS 压力来测试系统。

如何编写 Boomer 脚本

代码语言:txt
复制
// 压测 / 接口为一个函数
func fun1() {
	start := time.Now()
	url := URL + '/'
	resp, err := http.Get(url)

	if err != nil {
	log.Println(err)
	return
	}

	defer resp.Body.Close()
	body, _ := ioutil.ReadAll(resp.Body)
	elapsed := time.Since(start)

	if resp.Status == "200 OK" {
	boomer.RecordSuccess("Get", "/", elapsed.Nanoseconds()/int64(time.Millisecond), resp.ContentLength)
	} 
	else {
	boomer.RecordFailure("Get", "/", elapsed.Nanoseconds()/int64(time.Millisecond), resp.Status + string(body))
	}
}

// 压测 /text 接口为一个函数
func fun2() {
	start := time.Now()
	url := URL + '/text'
	resp, err := http.Get(url)

	if err != nil {
	log.Println(err)
	return
	}

	defer resp.Body.Close()
	body, _ := ioutil.ReadAll(resp.Body)
	elapsed := time.Since(start)

	if resp.Status == "200 OK" {
	boomer.RecordSuccess("Get", "/text", elapsed.Nanoseconds()/int64(time.Millisecond), resp.ContentLength)
	} 
	else {
	boomer.RecordFailure("Get", "/text", elapsed.Nanoseconds()/int64(time.Millisecond), resp.Status + string(body))
	}
}
// 配置两个接口的并发占比
func main() {
	flag.StringVar(&URL, "url", "", "url or app:port")
	flag.Parse()

	if targetURL == "" {
		log.Println("Boomer target url is null")
		return
	}

	task1 := &boomer.Task{
		Name:   “fun1”,
		Weight: 10,
		Fn:     fun1,
	}

	task2 := &boomer.Task{
		Name:   “fun2”,
		Weight: 20,
		Fn:     fun2,
	}
	boomer.Run(task1, task2)
}

如何将 Locust Master 的数据持久化

正常情况下,数据展示是在 Locust 自带的 Web 端展示,无法永久保存数据。

Web 端数据示例:

  • 配置压力线程
配置压力线程
配置压力线程
  • TPS曲线
TPS曲线
TPS曲线
  • 具体接口响应耗时
具体接口响应耗时
具体接口响应耗时
  • 各 worker 资源消耗
各 worker 资源消耗
各 worker 资源消耗

我们可以通过配置启动函数,来将数据存储到 Influxdb 或者 Prometheus 中,并通过 Grafana 报表展示。

比如,我们编写个 prometheus_exporter.py 脚本。

代码语言:txt
复制
class LocustCollector(object):
    registry = REGISTRY

    def __init__(self, environment, runner):
        self.environment = environment
        self.runner = runner

    def collect(self):
        # collect metrics only when locust runner is hatching or running.
        runner = self.runner

        if runner and runner.state in (locust_runners.STATE_HATCHING, locust_runners.STATE_RUNNING):
            stats = []
            for s in chain(locust_stats.sort_stats(runner.stats.entries), [runner.stats.total]):
                stats.append({
                    "method": s.method,
                    "name": s.name,
                    "num_requests": s.num_requests,
                    "num_failures": s.num_failures,
                    "avg_response_time": s.avg_response_time,
                    "min_response_time": s.min_response_time or 0,
                    "max_response_time": s.max_response_time,
                    "current_rps": s.current_rps,
                    "median_response_time": s.median_response_time,
                    "ninetieth_response_time": s.get_response_time_percentile(0.9),
                    # only total stats can use current_response_time, so sad.
                    #"current_response_time_percentile_95": s.get_current_response_time_percentile(0.95),
                    "avg_content_length": s.avg_content_length,
                    "current_fail_per_sec": s.current_fail_per_sec
                })

            # perhaps StatsError.parse_error in e.to_dict only works in python slave, take notices!
            errors = [e.to_dict() for e in six.itervalues(runner.stats.errors)]

            metric = Metric('locust_user_count', 'Swarmed users', 'gauge')
            metric.add_sample('locust_user_count', value=runner.user_count, labels={})
            yield metric
            
            metric = Metric('locust_errors', 'Locust requests errors', 'gauge')
            for err in errors:
                metric.add_sample('locust_errors', value=err['occurrences'],
                                  labels={'path': err['name'], 'method': err['method'],
                                          'error': err['error']})
            yield metric

            is_distributed = isinstance(runner, locust_runners.MasterRunner)
            if is_distributed:
                metric = Metric('locust_slave_count', 'Locust number of slaves', 'gauge')
                metric.add_sample('locust_slave_count', value=len(runner.clients.values()), labels={})
                yield metric

            metric = Metric('locust_fail_ratio', 'Locust failure ratio', 'gauge')
            metric.add_sample('locust_fail_ratio', value=runner.stats.total.fail_ratio, labels={})
            yield metric

            metric = Metric('locust_state', 'State of the locust swarm', 'gauge')
            metric.add_sample('locust_state', value=1, labels={'state': runner.state})
            yield metric

            stats_metrics = ['avg_content_length', 'avg_response_time', 'current_rps', 'current_fail_per_sec',
                             'max_response_time', 'ninetieth_response_time', 'median_response_time', 'min_response_time',
                             'num_failures', 'num_requests']

            for mtr in stats_metrics:
                mtype = 'gauge'
                if mtr in ['num_requests', 'num_failures']:
                    mtype = 'counter'
                metric = Metric('locust_stats_' + mtr, 'Locust stats ' + mtr, mtype)
                for stat in stats:
                    # Aggregated stat's method label is None, so name it as Aggregated
                    # locust has changed name Total to Aggregated since 0.12.1
                    if 'Aggregated' != stat['name']:
                        metric.add_sample('locust_stats_' + mtr, value=stat[mtr],
                                          labels={'path': stat['name'], 'method': stat['method']})
                    else:
                        metric.add_sample('locust_stats_' + mtr, value=stat[mtr],
                                          labels={'path': stat['name'], 'method': 'Aggregated'})
                yield metric


@events.init.add_listener
def locust_init(environment, runner, **kwargs):
    print("locust init event received")
    if environment.web_ui and runner:
        @environment.web_ui.app.route("/export/prometheus")
        def prometheus_exporter():
            registry = REGISTRY
            encoder, content_type = exposition.choose_encoder(request.headers.get('Accept'))
            if 'name[]' in request.args:
                registry = REGISTRY.restricted_registry(request.args.get('name[]'))
            body = encoder(registry)
            return Response(body, content_type=content_type)
        REGISTRY.register(LocustCollector(environment, runner))

然后再启动 Locust Master 的时候指定启动脚本 locust --master -f prometheus_exporter.py 即可,压测数据将会存储到 Prometheus 中,并在 Grafana 中展示。

Grafana 报表示例
Grafana 报表示例

在 K8S 中部署压测套件

在业务测试场景中,我们经常会遇到很多的内部组件和或者中间件不对外暴露,仅供内部访问,如果从外部构造压力,无法定位到具体的组件,或者无法对内部单一的组件或中间件进行压测。

而我们将 Locust + Boomer 这个压测套件作为一个内部组件部署在用户的集群系统中,即可从用户集群系统链路的任意环节发起压力。

此时,压测的 URL 不再是对外可访问的地址,而是组件的 SVC 地址,比如 svc_name.namspace:app_port

Master Deployment 示例:

代码语言:txt
复制
---
kind: Deployment
apiVersion: apps/v1
metadata:
  name: locust-master-controller
  namespace: boomer
  labels:
    k8s-app: locust-master
spec:
  selector:
    matchLabels:
      k8s-app: locust-master
  replicas: 1
  template:
    metadata:
      labels:
        k8s-app: locust-master
        name: locust-master
    spec:
      containers:
        - name: locust-master
          image: image/locust-master:latest
          ports:
            - name: loc-master-web
              containerPort: 8089
              protocol: TCP
            - name: loc-master-p1
              containerPort: 5557
              protocol: TCP

---
kind: Service
apiVersion: v1
metadata:
  name: locust-master
  namespace: boomer
spec:
  selector:
    k8s-app: locust-master
  ports:
    - port: 8089
      targetPort: loc-master-web
      protocol: TCP
      name: loc-master-web
    - port: 5557
      targetPort: loc-master-p1
      protocol: TCP
      name: loc-master-p1
  type: ClusterIP

Slave Deployment 示例:

代码语言:txt
复制
---
kind: Deployment
apiVersion: apps/v1
metadata:
  name: locust-slave-controller
  namespace: boomer
  labels:
    k8s-app: locust-slave
spec:
  selector:
    matchLabels:
      k8s-app: locust-slave
  replicas: 3
  template:
    metadata:
      labels:
        k8s-app: locust-slave
        name: locust-slave
    spec:
      containers:
        - name: locust-slave
          image: image/locust-slave:latest
          command: ["./main", "--master-host=locust-master", "--master-port=5557", "--url=svc.namespace:app_port"]

总结与扩展

看到这里,相信对这套性能套件或多或少都有一些了解,它可以基于云原生构建,脚本设计面向自定义函数开发,可以丰富扩展更多的业务场景,添加更多的排查手段或定位工具。且基于 Goroutine 的并发,在相同服务器配置下,比多线程产生的并发数量多得多,大大节省硬件配置的成本。

如果牺牲一些脚本面向函数编程的特性,能否抽取出来,做一些通用的封装,实现 UI 界面编辑压测脚本呢?

开源的 k6 压测项目,给我了一些灵感。

k6 底层也是基于 Go 的 Goroutine 方式实现并发,但是脚本却是用简单的 Js 脚本设计的,比如:

代码语言:txt
复制
import http from 'k6/http';
import { sleep } from 'k6';
export default function () {
  http.get('http://test.k6.io');
  sleep(1);
}

这样就是一个压测脚本,打开了 Js 到 Go 的编译通道。如果我们从前端获取的数据能编译成 Js 脚本,是否就意味着实现了 UI 界面编辑压测脚本呢?

再或者,就算不使用 k6 的方案,如果能够根据前端提供的数据自动编译出一个 Go 的 Slave 容器,注册在集群中,是否也意味着实现了 UI 界面编辑压测脚本呢?

这里给了我们更多的探索空间,值得思考。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 什么是 Locust
  • 什么是 Boomer
  • 如何编写 Boomer 脚本
  • 如何将 Locust Master 的数据持久化
  • 在 K8S 中部署压测套件
  • 总结与扩展
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档