前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Python之并发请求(下)

Python之并发请求(下)

作者头像
无涯WuYa
发布2020-11-12 15:11:28
9500
发布2020-11-12 15:11:28
举报
文章被收录于专栏:Python自动化测试

Python之并发请求(上)中详细了介绍了使用多线程的方式来编写一个测试服务端程序的高并发请求的性能测试工具。在这个测试的工具中,依据高并发的请求之后,我们得到很全面的响应时间,吞吐量,错误率,以及其他的相关信息。

在性能的测试中,更多的是CPU密集型和IO密集型的,基本很多服务端的程序都是基于IO密集型的,那么这样使用多线程的方式它的效率会更高。我们对前面的代码进行改造,既然我们已经可以很轻松的拿到了我们性能测试的结果信息数据,那么我们是否基于结合Flask-Restful的框架,把它提供成一个API,这样别人直接调用我们的API就可以测试被测试的服务端的程序了,这样的方式更加高效和简单。当然,这只是一种思想,在服务端的测试中,我们需要更多考虑的是服务在高并发下以及连续请求的前提下,是否会出现OOM以及SockedTimeOut以及TimeOut等其他的程序信息,如MQ的消息积压,服务的崩溃以及其他的异常情况。本文章这里就不详细的讨论服务的监控,关于saas化下服务的监控可以参考我的文章saas化架构中服务的监控实现思路和具体的代码。

下面我们对具体的被测试的API进行封装,这里就以测试淘宝首页为案例,我们的目的是我们在PostMan的测试工具中,我只需要输入并发数以及被测试的地址(这里是淘宝),点击发送请求后,就可以得到响应时间等其他的性能测试数据,具体源码如下:

代码语言:javascript
复制
#!/usr/bin/env python
#!coding:utf-8
from flask import  Flask,make_response,jsonify,abort,request
from flask_restful import  Api,Resource
from flask import Flask


import  requests
import time
import  matplotlib.pyplot as plt
from threading import  Thread
import  datetime
import  numpy as np
import  json
import re
import hashlib
from urllib import parse
import datetime


app=Flask(__name__)
api=Api(app=app)

def getHeaders(url=None,caller='ucenter-interface-service',secret='6A3012039B5746CEA350B119535C45E0'):
   '''
   :param pathurl: 请求地址
   :param caller: 请求头字段caller
   :param secret: 请求头字段secret
   :return:对请求地址进行拆分后,然后拼接进行md5的加密,属于请求头
   '''
   result1 = re.compile(r'\/([^\/]*)\/(v\d+|\d\.\d)\/(.+)')
   result2 = result1.search(url)
   contextPath = result2[1]
   version = result2[2]
   requestPath = result2[3]

   if requestPath.endswith('/'):
      requestPath = requestPath[:-1]

   tempArray = requestPath.split("/")
   requestPath = ""
   i = 0
   for str in tempArray:
      # print(i)
      if tempArray[i] != "":
         # print(parse.quote(tempArray[i]))
         # print(tempArray[i])
         requestPath = requestPath + "/" + parse.quote(tempArray[i])
      i = i + 1
   nowTime = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
   singStr= secret + "callerService" + caller + "contextPath" + contextPath + "requestPath" + requestPath + "timestamp" + nowTime + "v" + version + secret
   sign=hashlib.md5(singStr.encode(encoding='utf-8')).hexdigest().upper()
   dict1={
      'X-Caller-Service':'ucenter-interface-service',
      'X-Caller-Timestamp':nowTime,
      'appSecret':'kOsqkHX2QcB7KVzY03NmqbafIjSnCneG',
      'Content-Type':'application/json',
      'X-Caller-Sign':sign,}
   return dict1

class OlapThread(Thread):
   def __init__(self,func,args=()):
      '''
      :param func: 被测试的函数
      :param args: 被测试的函数的返回值
      '''
      super(OlapThread,self).__init__()
      self.func=func
      self.args=args

   def run(self) -> None:
      self.result=self.func(*self.args)

   def getResult(self):
      try:
         return self.result
      except BaseException as e:
         return e.args[0]



def taoBao(code,seconds,text,requestUrl):
   '''
   高并发亲求淘宝
   :param code:
   :param seconds:
   :param text:
   :param requestUrl: 请求地址
   :return:
   '''
   r=requests.get(url=requestUrl)
   print('输出信息昨状态码:{0},响应结果:{1}'.format(r.status_code,r.text))
   code=r.status_code
   seconds=r.elapsed.total_seconds()
   text=r.text
   return code,seconds,text


def calculationTime(startTime,endTime):
   '''计算两个时间之差,单位是秒'''
   return (endTime-startTime).seconds

def getResult(seconds):
   '''获取服务端的响应时间信息'''
   data={
      'Max':sorted(seconds)[-1],
      'Min':sorted(seconds)[0],
      'Median':np.median(seconds),
      '99%Line':np.percentile(seconds,99),
      '95%Line':np.percentile(seconds,95),
      '90%Line':np.percentile(seconds,90)
   }
   return data

def show(i,j):
   '''
   :param i: 请求总数
   :param j: 请求响应时间列表
   :return:
   '''
   fig,ax=plt.subplots()
   ax.plot(list_count,seconds)
   ax.set(xlabel='number of times', ylabel='Request time-consuming',
          title='olap continuous request response time (seconds)')
   ax.grid()
   fig.savefig('olap.png')
   plt.show()

def highConcurrent(count,requestUrl):
   '''
   对服务端发送高并发的请求
   :param count: 并发数
   :param requestData:请求参数
   :param requestUrl: 请求地址
   :return:
   '''
   startTime=datetime.datetime.now()
   sum=0
   list_count=list()
   tasks=list()
   results = list()
   #失败的信息
   fails=[]
   #成功任务数
   success=[]
   codes = list()
   seconds = list()
   texts=[]

   for i in range(0,count):
      t=OlapThread(taoBao,args=(i,i,i,requestUrl))
      tasks.append(t)
      t.start()
      print('测试中:{0}'.format(i))

   for t in tasks:
      t.join()
      if t.getResult()[0]!=200:
         fails.append(t.getResult())
      results.append(t.getResult())

   for item in fails:
      print('请求失败的信息:\n',item[2])
   endTime=datetime.datetime.now()
   for item in results:
      codes.append(item[0])
      seconds.append(item[1])
      texts.append(item[2])
   for i in range(len(codes)):
      list_count.append(i)

   #生成可视化的趋势图
   fig,ax=plt.subplots()
   ax.plot(list_count,seconds)
   ax.set(xlabel='number of times', ylabel='Request time-consuming',
          title='taobao continuous request response time (seconds)')
   ax.grid()
   fig.savefig('taobao.png')
   plt.show()

   for i in seconds:
      sum+=i
   rate=sum/len(list_count)
   # print('\n总共持续时间:\n',endTime-startTime)
   totalTime=calculationTime(startTime=startTime,endTime=endTime)
   if totalTime<1:
      totalTime=1
   #吞吐量的计算
   try:
      throughput=int(len(list_count)/totalTime)
   except Exception as e:
      print(e.args[0])
   getResult(seconds=seconds)
   errorRate=0
   if len(fails)==0:
      errorRate=0.00
   else:
      errorRate=len(fails)/len(tasks)*100
   throughput=str(throughput)+'/S'
   timeData=getResult(seconds=seconds)
   # print('总耗时时间:',(endTime-startTime))
   timeConsuming=(endTime-startTime)
   return timeConsuming,throughput,rate,timeData,errorRate,len(list_count),len(fails)

class Index(Resource):
   def get(self):
      return {'status':0,'msg':'ok','datas':[]}

   def post(self):
      if not request.json:
         return jsonify({'status':1001,'msg':'请求参数不是JSON的数据,请检查,谢谢!'})
      else:
         try:
            data={
               'count':request.json.get('count'),
               'requestUrl':request.json.get('requestUrl')
            }
            timeConsuming,throughput,rate,timeData,errorRate,sum,fails=highConcurrent(
               count=data['count'],
               requestUrl=data['requestUrl'])
            print('执行总耗时:',timeConsuming)
            return  jsonify({'status':0,'msg': '请求成功','datas':[{
               '吞吐量':throughput,
               '平均响应时间':rate,
               '响应时间信息':timeData,
               '错误率':errorRate,
               '请求总数':sum,
               '失败数':fails
            }]}, 200)
         except Exception as e:
            return e.args[0]



api.add_resource(Index,'/v1/index')

if __name__ == '__main__':
   app.run(debug=True,port=5003,host='0.0.0.0')
# print(highConcurrent(count=5))

在PostMan中进行调用,如下图所示:

点击Sending后,在PostMan中显示的结果信息如下:

每次请求响应时间可视化的趋势图如下所示:

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-11-09,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Python自动化测试 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
测试服务
测试服务 WeTest 包括标准兼容测试、专家兼容测试、手游安全测试、远程调试等多款产品,服务于海量腾讯精品游戏,涵盖兼容测试、压力测试、性能测试、安全测试、远程调试等多个方向,立体化安全防护体系,保卫您的信息安全。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档