Locust(俗称 蝗虫)一个轻量级的开源压测工具,基本功能是用Python代码描述所有测试。不需要笨拙的UI或庞大的XML,只需简单的代码即可。
Locust支持Python 2.7, 3.4, 3.5, and 3.6的版本,小编的环境是python3.6直接用pip安装就行
安装命令:pip install locustio
官方文档
Locust Documentationdocs.locust.io
from locust import HttpLocust, TaskSet, task
class Testlocust(TaskSet):
def on_start(self):
print("start")
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36'
}
@task(1)
def baidu_demo(self):
r = self.client.get("/", headers=self.headers, verify=False)
print(r.status_code)
assert r.status_code == 200
class WebsiteUser(HttpLocust):
task_set = Testlocust
min_wait = 1500
max_wait = 5000
if __name__ == "__main__":
import os
os.system("locust -f locust4.py --host=https://www.baidu.com")
web模式启动:os.system("locust -f locust4.py --host=https://www.baidu.com")
启动成功:
在浏览器中输入:http://localhost:8089/ 出现如下图说明启动成功
测试结果:
no-web模式启动:os.system("locust -f locust4.py --host=https://www.baidu.com --no-web --csv=example -c 100 -r 10 -t 10s")
测试结果:
开始第二个实例压kafka
import time
from locust import TaskSet, task, Locust, events
from kafka import KafkaProducer
import json
class UserBehavior(TaskSet):
def on_start(self):
self.producer = KafkaProducer(bootstrap_servers=['x.x.x.x:9092'])
def on_stop(self):
# 该方法当程序结束时每用户进行调用,关闭连接
self.producer.close()
@task(1)
def sendAddCmd(self):
start_time = time.time()
time_unique_id = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime())
print(time_unique_id)
print("===========================",start_time)
try:
timestamp = int(time.time())
message = {
'timestamp': timestamp,
'message': "121314"
}
msg = json.dumps(message)
msg = msg.encode('utf-8')
self.producer.send('preHandleTopic', msg)
except Exception as e:
total_time = int((time.time() - start_time) * 1000)
events.request_failure.fire(request_type="kafka", name="add", response_time=total_time,
response_length=0, exception=e)
else:
total_time = int((time.time() - start_time) * 1000)
events.request_success.fire(request_type="kafka", name="add", response_time=total_time,
response_length=0)
class SocketUser(Locust):
task_set = UserBehavior
min_wait = 1000 # 单位毫秒
max_wait = 1000 # 单位毫秒
if __name__ == '__main__':
import os
os.system("locust -f SendKafka.py --host=x.x.x.x:9092"
启动方式跟实例一相同
开始第三个实例压tcp
from locust import HttpLocust, TaskSet, task
import socket # 导入 socket 模块
host = socket.gethostname() # 获取本地主机名
port = 8888 # 设置端口号
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((host, port))
class UserBehaviora(TaskSet):
def on_start(self):
print("start")
@task(1)
def bky_index(self):
sock.send(
b'121314')
re_mes = sock.recv(1024).decode()
print(re_mes)
class WebsiteUser(HttpLocust):
task_set = UserBehaviora
min_wait = 1500
max_wait = 5000
if __name__ == "__main__":
import os
os.system("locust -f locust6.py --host=x.x.x.x:xxxx")
启动方式跟实例一相同
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值
本文分享自 自动化测试 To share 微信公众号,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。
本文参与 腾讯云自媒体分享计划 ,欢迎热爱写作的你一起参与!