前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Python采用并发查询mysql以及调用API灌数据 (七)- 字典合并处理以及并发实现分析

Python采用并发查询mysql以及调用API灌数据 (七)- 字典合并处理以及并发实现分析

作者头像
Devops海洋的渔夫
发布2019-05-31 16:23:40
9340
发布2019-05-31 16:23:40
举报
文章被收录于专栏:Devops专栏Devops专栏

前情回顾

上一篇文章已经编写了解决datetime类型需要序列化的问题,那么本章节我们来继续编写循环请求API灌入数据,以及并发实现的初步分析。

实战任务

本次因为服务架构重构,表优化、重构,带来的任务就是需要从原来的mysql数据库中,读取原表数据(部分存在多张关联查询)然后通过调用API的服务方式灌入新的数据库表中(包含mysql、mongodb)。

执行流程如下

那么根据流程所需要的功能,需要以下的实例进行支撑: 1.并发实例 2.查询数据实例 3.执行post请求实例

目标:循环请求API灌入数据以及并发实现分析

循环请求API示例

在编写执行API请求之前,首先在查询过程有些特俗的字段需要加入api_body中,添加插入的数据,那么该如何处理呢?

合并dict字典数据

代码语言:javascript
复制
In [34]: dict1 = {'field1':'1'}

In [35]: dict2 = {'field2':'2'}

In [36]: 

In [36]: dict3 = dict( dict1, **dict2 )

In [37]: print dict3
{'field2': '2', 'field1': '1'}

In [38]: 

在model方法加入该功能,并循环请求API

在models.py增加方法:

代码语言:javascript
复制
    # 根据查询的结果以及字段字典,转化为请求API的body,最后再合并一个特殊数据的字典
    def convertMergeApiBody(self,result,dict_fields,special_fields):
        # 循环生成每条查询数据的请求body
        body = {}
        for result in result:
            for field in result:
                if field == "null":
                    body[field] = None
                else:
                    body[field] = result[field]
        # 更新body的字段为新表的字段
        new_body = {}
        for key, value in dict_fields.items():
            # print "key = %s , value = %s" % (key, value)
            if key == "null":
                new_body[value] = None
            elif isinstance(body[key],datetime.datetime): # 将datetime类型转str,解决json的序列化问题
                new_body[value] = body[key].strftime("%Y-%m-%d %H:%M:%S")
            else:
                new_body[value] = body[key]

        # 合并特俗数据字典
        api_body = dict( new_body , **special_fields)

        return api_body

编写循环请求API方法:

代码语言:javascript
复制
# -*- coding: utf-8 -*-

from tools.MysqlTools import MysqldbHelper
import pymysql
from tools.PostTools import PostHelper
from core.models import ModelHelper
import datetime
import urllib2,json

if __name__ == "__main__":

    # 定义数据库访问参数
    config = {
        'host': '######注释#########',
        'port': 3361,
        'user': 'root',
        'passwd': '######注释#########',
        'charset': 'utf8',
        'cursorclass': pymysql.cursors.DictCursor
    }
    # 初始化数据模型
    model = ModelHelper(config)
    # 设置需要查询的数据库
    DB_NAME = '######注释#########'
    # 设置需要查询的表明
    TABLE_NAME = '######注释#########'
    # 设置字段映射字典: 旧表查询字段 ==》 新表的字段
    # "id": "id",
    dict_fields = {
        ######注释#########
    }

    # 特殊字段,用于合并api_body请求中
    special_fields = {
        ######注释#########
    }

    # 获取旧表字段数组
    select_fields = model.getSelectFields(dict_fields)
    # 获取查询旧表数据数组
    select_order = "order by id desc limit 2000"
    select_result = model.selectTable(DB_NAME,TABLE_NAME,select_fields,select_order)
    print "select_result="
    print type(select_result)
    print select_result
    print
    result_row = []
    for row in select_result:
        result_row.append(row)  # 将查询结果加入list中
        # print "result=",result_row[0]

        # 生成API请求body
        api_body = model.convertMergeApiBody(result_row, dict_fields ,special_fields)
        print "api_body=", api_body, type(api_body)
        # print
        # 定义请求参数
        url='http://######注释#########'
        model.postInsertData(url=url,body=api_body)

        result_row.pop(0)  # 将查询结果剔除list中,保证传入api_body的参数只有一个字典的list

执行上面的代码,已经可以循环请求API请求灌入数据了。 但是插入的时间比较长。

可以使用一个time方法来计算一下耗时:

代码语言:javascript
复制
import time

start = time.clock()

#当中是你的程序

elapsed = (time.clock() - start)
print("Time used:",elapsed)

耗时如下:

代码语言:javascript
复制
('Time used:', 113.28491424571229)

使用了113秒,接近2分钟。这个效率是不能满足我们快速进行数据迁移的。那么下一步就是要考虑如何并发高效处理这些数据了。

那么下面来分析一下,哪个步骤耗时比较长。

看看代码,可以知道这个循环是需要等待每次API请求后,返回结果再进行下一个循环执行的。

代码语言:javascript
复制
   # 开始定时
    start = time.clock()
    result_row = []
    for row in select_result:
        result_row.append(row)  # 将查询结果加入list中
        # print "result=",result_row[0]

        # 生成API请求body
        api_body = model.convertMergeApiBody(result_row, dict_fields ,special_fields)
        print "api_body=", api_body, type(api_body)
        # print
        # 定义请求参数
        url='http://######注释############'
        model.postInsertData(url=url,body=api_body)

        result_row.pop(0)  # 将查询结果剔除list中,保证传入api_body的参数只有一个字典的list

    # 结束计时
    elapsed = (time.clock() - start)
    print("Time used:",elapsed)

那么将这个耗时较长请求API的工作进行异步并发,是否就可以解决问题了呢?

代码优化 - 构建生产者和消费者

根据这个处理图,首先将代码优化生产者和消费者两部分方法,然后再进行调用。

代码语言:javascript
复制
# 定义生产者: 返回查询mysql数据
def produce(model,DB_NAME,TABLE_NAME,dict_fields,select_order):
    # 获取旧表字段数组
    select_fields = model.getSelectFields(dict_fields)
    # 获取旧表查询数据
    select_result = model.selectTable(DB_NAME, TABLE_NAME, select_fields, select_order)
    return select_result

# 定义消费者:直接API请求
def consume(row,url,model):
    result_row.append(row)  # 将查询结果加入list中
    # print "result=",result_row[0]
    # 生成API请求body
    api_body = model.convertMergeApiBody(result_row, dict_fields, special_fields)
    model.postInsertData(url=url, body=api_body)
    result_row.pop(0)  # 将查询结果剔除list中,保证传入api_body的参数只有一个字典的list
代码语言:javascript
复制
    # 调用生成mysql查询数据
    select_result = produce(model,DB_NAME,TABLE_NAME,dict_fields,select_order)

    result_row = []
    for row in select_result:
        consume(row, url, model) # 消费请求API

好了,下面来看看怎么并发异步处理消费方法的这部分。

首先看一个并发异步的调用示例

参考:python 实现异步执行

代码语言:javascript
复制
#coding:utf-8
from threading import Thread
from time import sleep

def async(f):
    def wrapper(*args, **kwargs):
        thr = Thread(target = f, args = args, kwargs = kwargs)
        thr.start()
    return wrapper

@async
def A():
    sleep(2)
    print "a function"

def B():
    print "b function"

if __name__=='__main__':
    i = 0
    for i in range(0,5):
        A()
        B()
        print "i=",i

分析一下: 定义了一个装饰器 async 和 A 、B 两个function A 里面sleep 2s , 然后打印 a function 字符串 B 里面直接打印 b function 字符串 我们循环调用两个功能,查看打印数据的时间:

代码语言:javascript
复制
b function
i= 0
b function
i= 1
b function
i= 2
b function
i= 3
b function
i= 4
a functiona function

a functiona function

 a function

基本上 a function 的打印是同时出现的,所以sleep方法并没有阻塞其他A()方法的请求。 很好,这个就满足了我们消费者的这个场景。

将装饰器 async引入消费者,进行使用

加了这个异步处理修饰器之后,CPU就快速飞涨。效率迅速提升,但是由于对端API处理效率不足,也会出现报错的情况。

对于这种报错的情况,其实只要给数据库增加一个is_import的字段,作为保证字段。每调用成功一个API,那么就修改一下这个is_import字段为1,那么下次只查询is_import为0的数据插入,这样就可以保证数据插入失败后能够再次查询插入了。

但是还要考虑一下,如果我循环调用这个异步,第一次查询2000左右的数据,第二次再查询2000的数据,这两份数据是否存在交集的情况,此时应该就要使用加锁来进行处理了。

下一篇章,继续讲解加锁的处理方法。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018.11.30 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云数据库 SQL Server
腾讯云数据库 SQL Server (TencentDB for SQL Server)是业界最常用的商用数据库之一,对基于 Windows 架构的应用程序具有完美的支持。TencentDB for SQL Server 拥有微软正版授权,可持续为用户提供最新的功能,避免未授权使用软件的风险。具有即开即用、稳定可靠、安全运行、弹性扩缩等特点。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档