前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >MYSQL 从项目经理的一次查询, 到PYTHON 解决问题(2) --传统企业使用MYSQL的问题

MYSQL 从项目经理的一次查询, 到PYTHON 解决问题(2) --传统企业使用MYSQL的问题

作者头像
AustinDatabases
发布2021-04-22 16:36:58
5400
发布2021-04-22 16:36:58
举报
文章被收录于专栏:AustinDatabasesAustinDatabases

上一期的读者这个话题的读者浏览量不是太多,有点可惜了, 实际上这就是传统企业在使用MYSQL时的问题. 解决方案很多,作为上一期的续集,我想从几点来阐述一下传统企业使用MYSQL的一些问题. 1 不少传统企业的软件开发是外包性质的,外包企业都是有一些成熟的架构的,大部分企业支持的数据库的列表都包含MYSQL ,并且MYSQL也是大部分企业使用的开源数据库之一. 那问题在哪里

1 传统企业并未有互联网的企业的技术水平,包含运维的水平,MYSQL的维护水平差,对MYSQL的认知水平也差,例如如果你问 MYSQL 是否适合所有业务的场景,大部分的回答可能是YES.

2 部分软件外包企业的人员流动大,技术本身积累的一般,当然大的软件外包商还是可以的,小的软件外包,就不好说了,问什么都支持,其实都是话术,真正能会使用MYSQL的软件人员就更少了,并且为了和涨春笋形式的软件开发速度一致,部分软件外包将ORACLE的表结构直接在MYSQL中实现,是部分企业的软件运行不畅和频频出问题的一个原因. 所以呢,真心希望某些软件外包上,能请一个资深的数据库专家,给你们普及一下表怎么设计, 怎么能符合数据库原理的使用数据库

2 另外在MYSQL 中火热的分表,尤其是多个物理主机形式的分表方式 ,逻辑分表或者 DBLE 方式的分表,在不少传统企业做起来比较困难,维护MYSQL的难度也提高了.所以软件外包上的分库分表,就变成了在一个MYSQL实例上的 分库分表, 通过逻辑关系将一个表打散变为N 张表. 这样解决很好,可使用的人员,尤其是需要通过SQL 来查询业务问题的一批人,就感到困惑了. 所以就有了下面的这个程序,(如果不清楚这个程序的产生的原因,和在MYSQL的之前通过SQL来查询产生的问题可以翻翻上一篇前传) 这个程序主要的想法是充分利用MYSQL的高并发,将数据查询打散,通过一个SESSION 处理 一个逻辑的查询,将几十万与几千万的两个表进行程序方式的JOIN ,最终获得需要的数据这里我们开了200个并发,并且计算了120万次,在6分钟交付了数据的分析结果,下面是相关的程序.

代码语言:javascript
复制
import configparserimport pymysql
import time
import asyncio  #标准库异步线程的库,协程,与多并发不同的是这个是单线程的
from aiomysql import create_pool  #第三方库,为MYSQL 来进行的异步线程操作库




class Solution:  #定义类
    def __init__(self, sql1, sql2):  #初始化参数
        config = configparser.ConfigParser()
        config.read('config.ini')   #读取配置文件
        self.host = config.get('config', 'host')
        self.user = config.get('config', 'user')
        self.password = config.get('config', 'password')
        self.database = config.get('config', 'database')
        self.save_database = config.get('config', 'save_database')
        self.save_user = config.get('config', 'save_user')
        self.save_password = config.get('config', 'save_password')

        self.conn = pymysql.connect(host=self.host, user=self.user, password=self.password, database=self.database,
                                    charset='utf8mb4')  #
        self.sql1 = sql1  #定义两个SQL
        self.sql2 = sql2

        self.task_num = 300 #异步并发数量, 一次可以干300个事务
              self.pool_size = 100 #连接池size读写各100
    def get_ids(self, ):
        cursor = self.conn.cursor()
        cursor.execute(self.sql1)
        self.data_a_full = cursor.fetchall()
        self.data_a_full_len = len(self.data_a_full)    # 获取查询的COUNT

    def run(self, ):

        self.get_ids()
        cur_loop = asyncio.get_event_loop()  #获取一个事件的循环
        cur_loop.run_until_complete(self.start(cur_loop))  

    async def start(self, loop):
        pool = await create_pool(host=self.host, port=3306,
                                 user=self.user, password=self.password,
                                 db=self.database, loop=loop, maxsize=self.pool_size)
        save_pool = await create_pool(host=self.host, port=3306,
                                      user=self.save_user, password=self.save_password,
                                      db=self.save_database, loop=loop, maxsize=self.pool_size)
        i = 0
        while True:
            start = time.time()
            m = self.data_a_full[i:i + self.task_num]   
            i += self.task_num
            tasks = []
            if len(m) == 0:    #每次给300个事务,直到这个池里面的没有数据取出
                break   
            for s in m:
                # print(s)
                sql = f"{sql2}'{s[0]}'"   #PYTHON3.X新特性简写,拼接SQL
                c1 = self.select(loop=loop, sql=sql, pool=pool, save_pool=save_pool, data=s)
                tasks.append(c1)   #执行SQL 语句
            await asyncio.gather(*tasks)
            print(time.time() - start,'----------',i,'/',self.data_a_full_len)

    async def select(self, loop, sql, pool, save_pool, data):  #处理的业务逻辑
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                await cur.execute(sql)
                r = await cur.fetchall()
                table_name_dict = {
                    'F': 'tb_f',
                    'T': 'tb_t',
                    'D': 'tb_d',
                }
                sum_dict = {
                    'F': 0, 'T': 0, 'D': 0
                }
                for item in r:
                    if item[1] is not None:
                        sum_dict[item[1]] = sum_dict.get(item[1]) + item[0]
                data = list(data)
                sql_list = []
                for key,value in sum_dict.items():
                    table_name = table_name_dict.get(key)
                    new_data = data+[value]
                    new_data[2] = str(new_data[2])
                    if new_data[4] is not None:
                        new_data[4] = str(float(new_data[4]))
                    else:
                        new_data[4] = 0
                    new_data[3] = str(new_data[3])
                    if table_name:   #结果插入到MYSQL数据库中
                        insert_sql = "insert into {} (CONTRACTNO,APPLYNO,ACTIVEDATE,term,AMORTIZEAMT) values ('{}','{}','{}',{},{})".format(
                            table_name, *new_data)
                        sql_list.append(insert_sql)
                    sql_do = ';'.join(sql_list)
                    # print(sql_do)
                async with save_pool.acquire() as save_conn:
                    async with save_conn.cursor() as save_cur:
                        await save_cur.execute(sql_do)
                        await save_conn.commit()



if __name__ == '__main__':
    s = time.time()
    sql1 = "select CONTRACTNO,APPLYNO,ACTIVEDATE,term from tb_sync_contract where ACTIVEDATE>='2020-07-01'"
    sql2 = "select AMORTIZEAMT,SAP_POSTING_IND from tb_amortize where CAMAINID="
    solution = Solution(sql1, sql2)
    solution.run()
print(time.time() - s)

最终我们在一个16G 4CORE 核心的MYSQL 5.7.23 的数据库中,成功的产生200并发,模拟了75万与2千600百万的数据的JOIN的计算,产生结果 时间在6分钟. 感谢程序的提供者,我们的TEAM的 PYTHON专家兼 REDIS DBA 闫树爽. 另外随着我的TEAM的人员增多, 有PYTHON专家,有POSTGRESQL, MYSQL 的专家,估计以后能SHARE的文字会越来越多.

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-04-02,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 AustinDatabases 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云数据库 SQL Server
腾讯云数据库 SQL Server (TencentDB for SQL Server)是业界最常用的商用数据库之一,对基于 Windows 架构的应用程序具有完美的支持。TencentDB for SQL Server 拥有微软正版授权,可持续为用户提供最新的功能,避免未授权使用软件的风险。具有即开即用、稳定可靠、安全运行、弹性扩缩等特点。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档