前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >数据库里查询到最大的手机号,在这个基础上加1进行登录

数据库里查询到最大的手机号,在这个基础上加1进行登录

作者头像
清菡
发布2020-12-02 15:47:53
7100
发布2020-12-02 15:47:53
举报
文章被收录于专栏:清菡软件测试

logging

代码语言:javascript
复制
import logging#python自带的
'''
调用logging:日志按照你的想法去输出的话,必须经过两道门槛。
1.logger是收集日志。debug info error,指定一个级别,就会输出指定级别的日志。
2.handdler是输出日志的渠道。输出到指定的文件,还是控制台,还是默认到控制台。

因为我们收集日志和输出渠道是一一对应的。如果没有给这个logger这个收集器指定的渠道的话,
默认到控制台。
代码里设置的,不指定的话就默认到控制台,指定的话,就输出到你指定的地方。
日志收集器取得名字叫root。root是我们不指定,它就自己创建一个这样的收集器,名字叫root。
logging就是默认的root收集器,默认到控制台。

源码指出WARNING或者WARNING级别以上的才可输出,debug和info级别太低。
'''
# logging.debug('今天天气很好')
# logging.info('lagom666')
# logging.warning('kevin')
# logging.error('Monica')
# logging.critical('清菡  还是那么不优秀')
#注意:千万不要以logging这样的内置库命名,不然就会报错。



#定义一个日志收集器 my_logger
my_logger=logging.getLogger('python2020')#调用getLogger定义一个日志收集器


#设定级别,setLevel
my_logger.setLevel('DEBUG')

#formatter设置日志最终输出格式
formatter=logging.Formatter('%(asctime)s-%(levelname)s-%(filename)s-%(name)s-日志信息:%(message)s')

#创建一个我们自己的输出渠道,StreamHandler是默认把它输出到控制台,但是它可以设置级别。
ch=logging.StreamHandler()
ch.setLevel('ERROR')
ch.setFormatter(formatter)

#输出到文本渠道,指定一个文件
fh=logging.FileHandler('py11.txt',encoding='UTF-8')
fh.setLevel('DEBUG')
fh.setFormatter(formatter)

#两者对接--指定输出渠道
my_logger.addHandler(ch)
my_logger.addHandler(fh)



#收集日志
my_logger.debug("python2020年学习python")
my_logger.error('python2020年,一定要挺过去!')

#收集如果不设置级别,就默认从warning或warning以上去收集。输出如果不设置级别,
# 就默认从warning或warning以上去输出。
# getLogger和StreamHandler都可以调用setLevel函数帮它们设置级别。

formatter常用信息

formatter=logging.Formatter('%(asctime)s-%(levelname)s-%(filename)s-%(name)s-日志信息:%(message)s')

数据库相关知识

Python中使用数据库,用法简介

代码语言:javascript
复制
# 安装 mysql 驱动 pip install mysql-connector

import mysql.connector
from tools import project_path2
from tools.read_config2 import ReadConfig

class DoMysql:
        def do_msql(self,query_sql,state='all'):#query_sql--->查询语句,state---all多条  1 一条
                # 利用这个类从配置文件里读取db info
                db_config=eval(ReadConfig().get_config(project_path2.case_config_path,'DB','db_config'))
                # 创建一个数据库连接
                cnn=mysql.connector.connect(**db_config)
                cnn=mysql.connector.connect(**db_config)

                #游标 cursor
                cursor=cnn.cursor()

                #写sql语句--字符串
                # query_sql='select mobilephone from people where id<5622'

                #执行语句
                cursor.execute(query_sql)

                #获取结果 打印结果
                if state==1:
                    res=cursor.fetchone()#针对一条数据用fetchone,返回元组类型的数据
                else:
                    res=cursor.fetchall()#针对多行数据,返回列表嵌套元组类型的数据
                # print(res)
                # print(int(res[0])+1)
                #关闭游标
                cursor.close()
                #关闭连接
                cnn.close

                return res


if __name__ =='__main__':
        from tools.get_data2 import GetData
        query_sql = 'select max(id) from loans where MemberID={0}'.format(getattr(GetData, 'loan_member_id'))
        # query_sql='select max(mobilephone) from people where mobilephone like "132%"'
                #这种模糊查询,只查询132字段开头的最大的手机号
        res = DoMysql().do_msql(query_sql)#返回列表嵌套元组
        print(res)
        # print(res[0][0])#取手机号码
        # res=DoMysql().do_msql(query_sql,1)#返回的就是个元组
        # print(res[0])

第二种方法

每次从数据库里查询到最大的手机号,在这个基础上加1。

logging引入日志模块。

测试目标

1.添加投资,充值,提现,加标等业务流程。

2.测试用例增加数据库校验。

  • 业务逻辑:后台添加数据 加标-审核-发标

由于小编没有p2p的接口,随便找的接口模拟的,所以没跑通,准备留下代码,工作中再实践,以下代码具有可质疑性,如有代码问题,请和小编联系。

来自文件case2.config

来自文件test_data_xiejinjieguo_shoujihao2.xlsx

来自文件do_excel_shoujihao2.py

代码语言:javascript
复制
from openpyxl import load_workbook
from tools.read_config2 import ReadConfig
from tools import project_path2
from tools.get_data2 import GetData

class DoExcel:
    @classmethod
    def get_data(cls,file_name):
        wb = load_workbook(file_name)
        mode=eval(ReadConfig.get_config(project_path2.case_config_path, 'MODE', 'mode'))

        tel=getattr(GetData,'NoRegTel')#利用反射拿到数据

#利用python查询数据库的方式,来拿到最大的手机号--这里可以加,也可以放到get_data里面

        test_data = []#把字典里所有数据都拿到
        for key in mode:#遍历这个存在配置文件里的字典
            sheet = wb[key]#key是表单名
            if mode[key]=='all':
                for i in range(2,sheet.max_row+1):#
                    row_data={}#字典
                    row_data['case_id']= sheet.cell(i, 1).value  #行号 第1列第2行
                    row_data['url']=sheet.cell(i,2).value
                    # row_data['data'] = sheet.cell(i, 3).value
                    if sheet.cell(i,3).value.find('${tel}')!=-1:#有找到这个${tel}
                         row_data['data']=sheet.cell(i,3).value.replace('${tel}',str(tel))#替换的时候只能用字符串去替换
                         tel = tel + 1#每次完成tel的调用后就加1
                    elif sheet.cell(i,3).value.find('${admin_tel}')!=-1:
                        row_data['data'] = sheet.cell(i, 3).value.replace('${admin_tel}', str(getattr(GetData,'admin_tel')))
                    elif sheet.cell(i,3).value.find('${loan_member_id}')!=-1:
                        row_data['data'] = sheet.cell(i, 3).value.replace('${loan_member_id}', str(getattr(GetData,'loan_member_id')))
                    elif sheet.cell(i, 3).value.find('${normal_tel}') != -1:
                        row_data['data'] = sheet.cell(i, 3).value.replace('${normal_tel}',str(getattr(GetData, 'normal_tel')))
                    elif sheet.cell(i, 3).value.find('${memberID}') != -1:
                        row_data['data'] = sheet.cell(i, 3).value.replace('${memberID }',str(getattr(GetData, 'memberID')))
                    else:#如果没有找到的话

                        row_data['data'] = sheet.cell(i, 3).value
                    row_data['title'] = sheet.cell(i, 4).value
                    row_data['http_method'] = sheet.cell(i,5).value
                    row_data['expected'] = sheet.cell(i, 6).value#添加了一个期望值到测试数据里面去
                    row_data['sheet_name'] = key
                    test_data.append(row_data)
                    cls.update_tel(tel,file_name,'init')#更新手机号,针对Excel的操作。
#这里也是可以优化的?什么时候对手机号进行更新?更新的手机号是进行加1,还是加2?还是加3?
            else:
                for case_id in mode[key]:#
                    row_data = {}  # 字典
                    row_data['case_id'] = sheet.cell(case_id+1, 1).value  # 行号 第1列第2行
                    row_data['url'] = sheet.cell(case_id+1, 2).value
                    #做手机号的替换
                    if sheet.cell(case_id+1, 3).value.find('${tel}') != -1:
                        row_data['data'] = sheet.cell(case_id+1, 3).value.replace('${tel}', str(tel))
                        tel+=1#有找到这个${tel}
                    elif sheet.cell(case_id+1, 3).value.find('${admin_tel}') != -1:
                        row_data['data'] = sheet.cell(case_id+1, 3).value.replace('${admin_tel}',
                                                                          str(getattr(GetData, 'admin_tel')))
                    elif sheet.cell(case_id+1, 3).value.find('${loan_member_id}') != -1:
                        row_data['data'] = sheet.cell(case_id+1, 3).value.replace('${loan_member_id}',
                                                                          str(getattr(GetData, 'loan_member_id')))
                    elif sheet.cell(case_id+1, 3).value.find('${normal_tel}') != -1:
                        row_data['data'] = sheet.cell(case_id+1, 3).value.replace('${normal_tel}',
                                                                          str(getattr(GetData, 'normal_tel')))
                    elif sheet.cell(case_id+1, 3).value.find('${memberID}') != -1:
                        row_data['data'] = sheet.cell(case_id+1, 3).value.replace('${memberID }', str(getattr(GetData, 'memberID')))

                    else:  # 如果没有找到的话
                        row_data['data'] = sheet.cell(i, 3).value

                    row_data['title'] = sheet.cell(case_id+1, 4).value
                    row_data['http_method'] = sheet.cell(case_id+1,5).value
                    row_data['expected'] = sheet.cell(case_id+1, 6).value  # 添加了一个期望值到测试数据里面去
                    row_data['sheet_name']=key
                    test_data.append(row_data)
                    cls.update_tel(tel + 2, file_name, 'init')  # 更新手机号,改造理由同上


        return test_data

    @staticmethod
#把结果写进Excel
    def write_back(file_name,sheet_name,i,result,Testresult):#专门写回数据
        wb=load_workbook(file_name)
        sheet=wb[sheet_name]
        sheet.cell(i,7).value=result
        sheet.cell(i,8).value=Testresult
        wb.save(file_name)#保存结果

    @classmethod
    def update_tel(cls,tel,filename,sheet_name):
        #函数更新Excel里面手机号的数据
        wb=load_workbook(filename)
        sheet=wb[sheet_name]
        sheet.cell(2,1).value=tel#赋值操作
        wb.save(filename)


if __name__ =='__main__':

    test_data=DoExcel().get_data(project_path2.test_case_path)
    print(test_data)

来自文件do_mysql.py

代码语言:javascript
复制

# 安装 mysql 驱动 pip install mysql-connector

import mysql.connector
from tools import project_path2
from tools.read_config2 import ReadConfig

class DoMysql:
        def do_msql(self,query_sql,state='all'):#query_sql--->查询语句,state---all多条  1 一条
                # 利用这个类从配置文件里读取db info
                db_config=eval(ReadConfig().get_config(project_path2.case_config_path,'DB','db_config'))
                # 创建一个数据库连接
                cnn=mysql.connector.connect(**db_config)
                cnn=mysql.connector.connect(**db_config)

                #游标 cursor
                cursor=cnn.cursor()

                #写sql语句--字符串
                # query_sql='select mobilephone from people where id<5622'

                #执行语句
                cursor.execute(query_sql)

                #获取结果 打印结果
                if state==1:
                    res=cursor.fetchone()#针对一条数据用fetchone,返回元组类型的数据
                else:
                    res=cursor.fetchall()#针对多行数据,返回列表嵌套元组类型的数据
                # print(res)
                # print(int(res[0])+1)
                #关闭游标
                cursor.close()
                #关闭连接
                cnn.close

                return res


if __name__ =='__main__':
        from tools.get_data2 import GetData
        query_sql = 'select max(id) from loans where MemberID={0}'.format(getattr(GetData, 'loan_member_id'))
        # query_sql='select max(mobilephone) from people where mobilephone like "132%"'
                #这种模糊查询,只查询132字段开头的最大的手机号
        res = DoMysql().do_msql(query_sql)#返回列表嵌套元组
        print(res)
        # print(res[0][0])#取手机号码
        # res=DoMysql().do_msql(query_sql,1)#返回的就是个元组
        # print(res[0])

来自文件get_data2.py

代码语言:javascript
复制
#反射:不管何时何地,只要传入一个类名,它就可以帮你操作。
from tools import project_path2
import pandas as pd
from tools.read_config2 import ReadConfig
class GetData:
    Cookie=None
    loan_id=None
    check_list=eval(ReadConfig().get_config(project_path2.case_config_path,'CHECKLEAVEAMOUNT','check_list'))
    NoRegTel = pd.read_excel(project_path2.test_case_path, sheet_name='init').iloc[0, 0]  # 从Excel里面拿到的数据
    # 参照init表单查看我们这个变量的用处
    normal_tel=pd.read_excel(project_path2.test_case_path, sheet_name='init').iloc[1,0]
    admin_tel=pd.read_excel(project_path2.test_case_path, sheet_name='init').iloc[2,0]
    loan_member_id=pd.read_excel(project_path2.test_case_path,sheet_name='init').iloc[3,0]
    memberID = pd.read_excel(project_path2.test_case_path, sheet_name='init').iloc[4, 0]
    # 利用pandas从Excel里面拿到的手机号

print(GetData.admin_tel)

来自文件http_request2.py

代码语言:javascript
复制
import requests
from tools.my_log2 import MyLog
my_logger=MyLog()


class HttpRequest:
    @staticmethod
    def http_request(url,data,http_method,cookie=None):
        try :
            if http_method.upper()=='POST':
                    res=requests.post(url,json=data,cookies=cookie)
            # elif http_method.upper()=='POST':
            #         res=requests.post(url,data,cookies=cookie)
            else:
                my_logger.info("输入的请求方法不对")
        except Exception as e:
            my_logger.error("请求报错了:{0}".format(e))
            raise e
        return res#返回结果

if __name__ =='__main__':
        #注册
        register_url= 'http://api.nnzhp.cn/api/user/add_stu'
        register_data= { "name":"niuhanyang","grade":"天蝎座","phone":'18614711314'}

        #登录
        login_url= 'http://api.nnzhp.cn/api/user/login'
        login_data={"username": "niuhanyang", "passwd": 'aA123456'}


        # 充值
        recharge_url= 'http://api.nnzhp.cn/api/user/gold_add'
        recharge_data={"stu_id": "2170", "gold": '1'}

        login_res=HttpRequest().http_request(login_url,login_data,'post')
        recharge_res=HttpRequest().http_request(recharge_url,recharge_data,'post',login_res.cookies)
        print("充值结果:{}".format(recharge_res.json()))

来自文件my_log2.py

代码语言:javascript
复制


# 学习logging:引入日志模块

import logging#python自带的
class MyLog:

    def my_log(self,msg,level):
        #定义一个日志收集器 my_logger
        my_logger=logging.getLogger('python2020')#调用getLogger定义一个日志收集器


        #设定级别,setLevel
        my_logger.setLevel('DEBUG')

        #formatter设置日志最终输出格式
        formatter=logging.Formatter('%(asctime)s-%(levelname)s-%(filename)s-%(name)s-日志信息:%(message)s')

        #创建一个我们自己的输出渠道,StreamHandler是默认把它输出到控制台,但是它可以设置级别。
        ch=logging.StreamHandler()
        ch.setLevel('ERROR')
        ch.setFormatter(formatter)

        #输出到文本渠道,指定一个文件
        fh=logging.FileHandler('py11.txt',encoding='UTF-8')
        fh.setLevel('DEBUG')
        fh.setFormatter(formatter)

        #两者对接--指定输出渠道
        my_logger.addHandler(ch)
        my_logger.addHandler(fh)



        #收集日志
        if level=='DEBUG':
            my_logger.debug(msg)
        elif level=='INFO':
            my_logger.info(msg)
        elif level=='WARNING':
            my_logger.warning(msg)
        elif level=='ERROR':
            my_logger.error(msg)
        elif level=='CRITICAL':
            my_logger.critical(msg)

        #关闭渠道
        my_logger.removeHandler(ch)
        my_logger.removeHandler(fh)

    def debug(self,msg):
        self.my_log(msg,'DEBUG')

    def info(self,msg):
        self.my_log(msg,'INFO')

    def error(self,msg):
        self.my_log(msg,'ERROR')

if __name__ == '__main__':
    pass

来自文件project_path2.py

代码语言:javascript
复制
#路径的可配置
import os
'''专门来读取路径的值'''
project_path=os.path.split(os.path.split(os.path.realpath(__file__))[0])[0]
#对路径进行了切割,返回了这样一个元组
# path=os.path.realpath(__file__)

#测试用例的路径
test_case_path=os.path.join(project_path,'test_data','test_data_xiejinjieguo_shoujihao2.xlsx')
#这就是绝对路径,顶级目录变,它就跟着变

#测试报告的路径
test_report_path=os.path.join(project_path,'test_result','html_report','test_api.html')

#配置文件的路径
case_config_path=os.path.join(project_path,'conf','case2.config')
# print(case_config_path)

#前提是test_data的上一级目录打开,并执行run文件。这种办法,文件在哪里执行都行。
#某个时间,相对路径很好用,但是如果参照物变了就不行了。绝对路径,换台电脑就不行了。

来自文件read_config2.py

代码语言:javascript
复制
import configparser

class ReadConfig:
    @staticmethod
    def get_config(file_path,section,option):
        cf=configparser.ConfigParser()
        cf.read(file_path)
        return cf[section][option]

if __name__ == '__main__':
    from tools import project_path2
    print(ReadConfig.get_config(project_path2.case_config_path, 'MODE', 'mode'))

来自文件test_http_request_shoujihao2.py

代码语言:javascript
复制

import unittest
from tools.project_path2 import *
from tools.http_request2 import HttpRequest
from ddt import ddt,data
from tools.do_excel_shoujihao2 import DoExcel
from tools.my_log2 import MyLog
from tools.do_mysql import DoMysql
from tools.get_data2 import GetData
test_data=DoExcel.get_data(test_case_path)#执行所有的用例
my_logger=MyLog()

@ddt#装饰测试类
class TestHttpRequest(unittest.TestCase):
    def setUp(self):
       pass

    @data(*test_data)
    def test_api(self,item):
        #请求之前完成load_id的替换
        my_logger.info('开始执行用例{0}:{1}'.format(item['case_id'],item['title']))
        # 方式1
        if item['data'].find('${loan_id')!=-1:
            if hasattr(GetData,'load_id')==None:
                query_sql='select max(id) from loans where MemberID={0}'.format(getattr(GetData,'loan_member_id'))
                loan_id=DoMysql().do_msql(query_sql)[0][0]
                item['data']=item['data'].replace('${loan_id}',str(loan_id))
                setattr(GetData,'loan_id',loan_id)#利用这个去做反射去存储结果
                my_logger.info(loan_id)
            else:
                my_logger.info(getattr(GetData,'loan_id'))
                item['data'] = item['data'].replace('${loan_id}', str(getattr(GetData,'loan_id')))

        my_logger.info('获取到的请求结果是:{0}'.format(item['data']))
        my_logger('-----开始http----接口请求')

        if item['sheet_name'] in getattr(GetData,'check_list'):
            # 请求之前查询余额
            #查询数据库
            query_sql='select LeaveAmount from member where mobilephone={0}'.format(eval(item['data'])['mobilephone'])
            Before_Amount=DoMysql().do_msql(query_sql,1)[0]

            res=HttpRequest.http_request(item['url'],eval(item['data']),item['http_method'],getattr(GetData,'Cookie'))
             #请求之后校验余额是否正确
            After_Amount = DoMysql().do_msql(query_sql, 1)[0]

            if abs(Before_Amount-After_Amount)==eval(item['data'])['amount']:
                check_res='金额正确'
            else:
                check_res = '金额不对'
#1.并不是所有的用例都能这么做。是否做数据库检查?可否放到excel里面去呢?
#2.怎么把检查结果写入excel里面去?
#3.拓展:如果要检查多个语句怎么办?

        else:
             res = HttpRequest.http_request(item['url'], eval(item['data']), item['http_method'], getattr(GetData, 'Cookie'))

        if res.cookies:#利用反射存储cookie值
            setattr(GetData,'Cookie',res.cookies)
        try:
            self.assertEqual(item['expected'],res.json()['error_code'])#做断言,是拿实际结果和期望结果去比对,判断用例通不通过,不加断言,根本没有期望结果,用例都是通过的。
                             #预期结果
            TestResult='PASS'
        except AssertionError as e:
            TestResult='Failed'
            my_logger.info('执行用例出错:{0}'.format(e))
            raise e
        finally:#加finally,不管用例有没有执行通过,它里面的代码是一定会执行的。
            DoExcel.write_back(test_case_path,item['sheet_name'],item['case_id']+1,str(res.json()),TestResult)
            my_logger.error('获取到的结果是:{0}'.format(res.json()))  # 打印结果



    def tearDown(self):
        pass

来自文件run2.py

代码语言:javascript
复制
#方法二
#通过配置文件去决定执行哪个模块的用例。通过配置文件,以字典的形式key去存它的表单,
# value去存它执行所有用例还是些其它的用例
import unittest#引入单元测试
import HTMLTestRunner#HTML测试报告
from tools.project_path2 import *
from tools.test_http_request_shoujihao2 import TestHttpRequest


suite=unittest.TestSuite()
# suite.addTest(TestHttpRequest('test_api'))#测试类的实例
loader=unittest.TestLoader()
suite.addTest(loader.loadTestsFromTestCase(TestHttpRequest))


with open(test_report_path,'wb') as file:
    #执行用例
    runner=HTMLTestRunner.HTMLTestRunner(stream=file,
                                         title='第一阶段项目实战',
                                         description='这个是单元测试报告1115',
                                         tester='清菡')
    runner.run(suite)

我的代码结构如下:

运行run2.py文件,可输出结果。正确结果可根据实际项目,我就不附图了。

知识补充

代码语言:javascript
复制
# s='hello'
# print(s.find('h'))#find是字符串里面的一个方法,寻找字符串里面的子字符或者子字符串
#找到的话,返回的是字符所在的索引
# 找不到的话,返回的是-1

# if s.find('9'):#if后面非空非零,成立表达式都为True,只要是True,if下面的表达式都执行
# new_s=s.replace('o','kevin')
# print(new_s)

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-02-20,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 清菡软件测试 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • logging
  • formatter常用信息
  • 数据库相关知识
    • Python中使用数据库,用法简介
    • 第二种方法
      • 测试目标
      • 知识补充
      相关产品与服务
      数据库
      云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档