前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Python采用并发查询mysql以及调用API灌数据 (八)- 异步并发加锁,保证数据安全

Python采用并发查询mysql以及调用API灌数据 (八)- 异步并发加锁,保证数据安全

作者头像
Devops海洋的渔夫
发布2019-05-31 16:24:24
1.1K0
发布2019-05-31 16:24:24
举报
文章被收录于专栏:Devops专栏Devops专栏

前情回顾

上一篇文章已经编写了异步并发API请求灌数据,那么本章节我们来继续编写异步并发加锁,保证数据安全

实战任务

本次因为服务架构重构,表优化、重构,带来的任务就是需要从原来的mysql数据库中,读取原表数据(部分存在多张关联查询)然后通过调用API的服务方式灌入新的数据库表中(包含mysql、mongodb)。

执行流程如下

那么根据流程所需要的功能,需要以下的实例进行支撑: 1.并发实例 2.查询数据实例 3.执行post请求实例

目标:循环查询处理并发数据,并且加锁保证数据安全

给查询数据表添加is_import字段,在mysql表中添加查询标识,插入成功则为1,无插入则为0

然后初始化 is_import = 0 即可,下面来给我们之前的model方法的查询中添加条件查询。

编写model类中selectTable方法,增加条件查询

代码语言:javascript
复制
    # 根据设置的旧表字段,查询旧库的数据库数据
    def selectTable(self,DB_NAME,TABLE_NAME,fields,order,cond_dict=''):
        # 选择数据库
        self.mydb.selectDataBase(DB_NAME)
        # 数据查询
        result = self.mydb.select(TABLE_NAME, fields=fields,order=order,cond_dict=cond_dict)
        # 关闭连接
        self.mydb.close()
        # 返回查询的数据
        return result

增加条件查询cond_dict字典,测试使用。

测试成功之后,就要在model方法中增加一个更新is_import为1的方法了。

在model类中增加更新is_import为1的方法

有些时候,因为传入的可能字段名不是is_import,可能是is_import_xxx。那么就要根据传入的字典获取字段名称了。

代码语言:javascript
复制
    # 更新is_import字段为1的方法
    def updateIsImport(self, TABLE_NAME, attrs_dict, cond_dict):
        """更新数据

                    args:
                        tablename  :表名字
                        attrs_dict  :更新属性键值对字典
                        cond_dict  :更新条件字典

                    example:
                        params = {"name" : "caixinglong", "age" : "38"}
                        cond_dict = {"name" : "liuqiao", "age" : "18"}
                        mydb.update(table, params, cond_dict)

        """
        # 选择数据库
        result = self.mydb.update(TABLE_NAME, attrs_dict=attrs_dict, cond_dict=cond_dict)
        return result

写好了,更新字段的方法之后,下面我们在API请求成功之后进行使用。

在消费者方法中引用更新方法

此时消费者已经在上一个篇章中写了异步并发的方法,但是这样调用的话,会导致mysql更新的时候报错。 为了保证数据安全,我只能降低效率,增加锁了。

首先先看一个线程加锁的伪代码

代码语言:javascript
复制
#-* coding: utf-8 -*
import threading
import time
import os

def func1(k):
    global lock
    while True:
        lock.acquire()  # 开始锁进程
         .... 执行任务 ...
        lock.release()  # 释放进程锁

if __name__=='__main__':
    
    # 初始化进程锁
    lock = threading.Lock()
    
    # 使用4个CPU开启进程并发
    for k in range(4):
        new_thread = threading.Thread(target=func1,args=(k,))  # 开启一个进程调用func1,并且传入参数k
        new_thread.start()

从示例代码可以看出,进程锁的基本使用方法。下面我们来使用一下进程锁来保证数据安全。

使用进程锁

代码语言:javascript
复制
    result_row = []
    lock = threading.Lock()  # 初始化进程锁
    for row in select_result:
        lock.acquire()  # 开启进程锁
        consume(row, url, model,lock=lock) # 消费请求API

初步代码基本就写到这里了。后面肯定有很多需要优化的地方。 例如: 1、使用查询分页再开启线程并发处理。 2、拆分生产者与消费者,加入rabbitmq等中间件来对付异常处理

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018.12.01 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云数据库 SQL Server
腾讯云数据库 SQL Server (TencentDB for SQL Server)是业界最常用的商用数据库之一,对基于 Windows 架构的应用程序具有完美的支持。TencentDB for SQL Server 拥有微软正版授权,可持续为用户提供最新的功能,避免未授权使用软件的风险。具有即开即用、稳定可靠、安全运行、弹性扩缩等特点。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档