前情回顾
上一篇文章已经编写了http请求的基本类方法封装,那么本章节我们来继续编写使用mysql查询后的拼接数据发送POST请求。
实战任务
本次因为服务架构重构,表优化、重构,带来的任务就是需要从原来的mysql数据库中,读取原表数据(部分存在多张关联查询)然后通过调用API的服务方式灌入新的数据库表中(包含mysql、mongodb)。
执行流程如下
那么根据流程所需要的功能,需要以下的实例进行支撑: 1.并发实例 2.查询数据实例 3.执行post请求实例
目标:编写Http执行POST请求的基本类方法
编写test03.py查询mysql相关对应字段数据
# -*- coding: utf-8 -*-
from tools.MysqlTools import MysqldbHelper
import pymysql
from tools.PostTools import PostHelper
if __name__ == "__main__":
# 定义数据库访问参数
config = {
'host': '******#注释',
'port': 3361,
'user': 'root',
'passwd': '******#注释',
'charset': 'utf8',
'cursorclass': pymysql.cursors.DictCursor
}
# 初始化打开数据库连接
mydb = MysqldbHelper(config)
# 选择数据库
DB_NAME = '******#注释'
# mydb.createDataBase(DB_NAME)
# 选择数据库
print "========= 选择数据库%s ===========" % DB_NAME
mydb.selectDataBase(DB_NAME)
#选择表
TABLE_NAME = '******#注释'
# 数据查询
print "========= 数据查询 ==========="
select_fields = [
"id",
******#注释
"1",
"1",
"null",
"NOW()",
"last_sync_time",
]
select_order = "order by id desc limit 1"
select_result = mydb.select(TABLE_NAME, fields=select_fields,order=select_order)
# 拆分打印的字段
for row in select_result:
print "id=", row['id']
print "user_id=", row['user_id']
******#注释
print "1=", row['1']
print "1=", row['1']
print "null=", None # 注意:查询null的需要写为None
print "NOW()=", row['NOW()']
print "last_sync_time=", row['last_sync_time']
print
执行后的结果如下:
========= 数据查询 ===========
id= 1066511830261694464
user_id= ******#注释
user_code= None
******#注释
null= None
NOW()= 2018-11-27 11:20:22
last_sync_time= 2018-11-25 10:00:10
从上面的代码可以看出,写一个表的查询,由于字段太多,重复去写这个过程的话也是很累的,那么再抽象多一层model模型,专门进行这个数据处理的。
编写model类,抽象查询的过程方法
models.py
我新建了一个core文件夹目录,然后新建一个models,专门用来处理查询以及调用API发送请求的业务处理。
models.py代码如下:
# -*- coding: utf-8 -*-
from tools.MysqlTools import MysqldbHelper
class ModelHelper(object): # 继承object类所有方法
# 初始化数据库连接
def __init__(self , config):
self.mydb = MysqldbHelper(config) # 初始化打开数据库连接
def selectTable(self,DB_NAME,TABLE_NAME,fields,order):
# 选择数据库
self.mydb.selectDataBase(DB_NAME)
# 数据查询
print "========= 数据查询 ==========="
result = self.mydb.select(TABLE_NAME, fields=fields,order=order)
# 返回查询的数据
return result
编写test04.py测试文件,执行测试一下:
# -*- coding: utf-8 -*-
from tools.MysqlTools import MysqldbHelper
import pymysql
from tools.PostTools import PostHelper
from core.models import ModelHelper
if __name__ == "__main__":
# 定义数据库访问参数
config = {
'host': '#####注释####',
'port': 3361,
'user': 'root',
'passwd': '#####注释####',
'charset': 'utf8',
'cursorclass': pymysql.cursors.DictCursor
}
# 初始化数据模型
model = ModelHelper(config)
# 设置需要查询的数据库
DB_NAME = '#####注释####'
# 设置需要查询的表明
TABLE_NAME = '#####注释####'
# 数据查询
select_fields = [
"id",
#####注释####
"1",
"1",
"null",
"NOW()",
"last_sync_time",
]
select_order = "order by id desc limit 1"
select_result = model.selectTable(DB_NAME,TABLE_NAME,select_fields,select_order)
# 拆分打印的字段
for row in select_result:
print "id=", row['id']
#####注释####
print "1=", row['1']
print "1=", row['1']
print "null=", None
print "NOW()=", row['NOW()']
print "last_sync_time=", row['last_sync_time']
print
测试结果如下:
id= 1066511830261694464
#####注释####
1= 1
1= 1
null= None
NOW()= 2018-11-27 11:45:08
last_sync_time= 2018-11-25 10:00:10
那么已经抽象了这部分查询在model中处理了,还有下一步请求API也要写入到model中自动处理。 那么下面来继续写写。
将返回的查询结果转化为字典类型数据
其中查询的旧表字段与新表的字段应该要用字典进行一一映射关联,方便后续调用。 1、定义字典存储 旧表字段 《==》新表字段的映射关系 2、获取旧表字段数据,进行数据查询 3、获取新表字段对应存储数据,再次使用API请求新表,灌入数据
# 设置字段映射字典: 旧表查询字段 ==》 新表的字段
dict_fields = {
"id": "id",
"user_id": "user_id",
## 注释部分字段
"1": "purpose",
"1": "status",
"null": "data_source",
"NOW()": "create_time",
"last_sync_time": "last_modify_time"
}
# 获取旧表字段数组
select_fields = []
for key, value in dict_fields.items():
print "key = %s , value = %s" % (key,value)
select_fields.append(key)
print "select_fields = "
print select_fields
执行结果如下:
# 映射字典的查询数据
key = user_code , value = user_code
key = null , value = data_source
###### 注释部分 #######
key = NOW() , value = create_time
key = 1 , value = status
key = user_level , value = user_level
# 获取生成旧表需要查询的字段
select_fields = [
'census_town',
###### 注释部分 #######
'NOW()', '1',
'user_level'
]
1、那么下面就可以根据获取的字段数据,进行mysql数据查询 2、然后生成一个body请求体字典数据,但是此时body的请求体key是旧表的字段,请求API的时候需要新表的字段,那么就需要进行字段替换 3、再写一个字段映射字典的循环,生成请求API的new_body
# 此时已有查询字段的数组
print "select_fields = "
print select_fields
select_order = "order by id desc limit 2"
select_result = model.selectTable(DB_NAME,TABLE_NAME,select_fields,select_order)
# 循环生成每条查询数据的请求body
body = {}
for result in select_result:
for field in select_fields:
if field == "null":
body[field] = None
else:
body[field] = result[field]
# 打印查看已生成的body数据
for field in select_fields:
print body[field]
print body
# 更新body的字段为新表的字段
new_body = {}
for key, value in dict_fields.items():
print "key = %s , value = %s" % (key,value)
new_body[value] = body[key]
print "new_body="
print new_body
执行结果如下:
那么上面的过程最好写在model中,这样可以方便使用。
编写model增加生成请求API的body数据相关方法
# -*- coding: utf-8 -*-
from tools.MysqlTools import MysqldbHelper
from tools.PostTools import PostHelper
class ModelHelper(object): # 继承object类所有方法
# 初始化数据库连接
def __init__(self , config):
self.mydb = MysqldbHelper(config) # 初始化打开数据库连接
# 根据设置的旧表字段,查询旧库的数据库数据
def selectTable(self,DB_NAME,TABLE_NAME,fields,order):
# 选择数据库
self.mydb.selectDataBase(DB_NAME)
# 数据查询
result = self.mydb.select(TABLE_NAME, fields=fields,order=order)
# 返回查询的数据
return result
# 根据字段映射字典获取旧表字段数组
def getSelectFields(self,dict_fields):
# 获取旧表字段数组
select_fields = []
for key, value in dict_fields.items():
# print "key = %s , value = %s" % (key, value)
select_fields.append(key)
return select_fields
# 根据查询的结果以及字段字典,转化为请求API的body
def convertApiBody(self,result,dict_fields):
# 循环生成每条查询数据的请求body
body = {}
for result in result:
for field in result:
if field == "null":
body[field] = None
else:
body[field] = result[field]
# 更新body的字段为新表的字段
new_body = {}
for key, value in dict_fields.items():
# print "key = %s , value = %s" % (key, value)
if key == "null":
new_body[value] = None
else:
new_body[value] = body[key]
return new_body
使用model方法,以及执行结果:
那么下一步就是再编写一个执行API的方法。
但是在请求API之前,需要将body序列化为json格式,这个存在datetime类型导致序列化失败的情况,下一个篇章继续。