前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >MySQL亿级数据快速导出

MySQL亿级数据快速导出

作者头像
嘉美伯爵
发布2021-01-21 05:33:33
3.7K0
发布2021-01-21 05:33:33
举报
文章被收录于专栏:微服务架构日记

mysql千万级数据如何快速导出

今天给大家讲解如何快速的导出千万级MySQL中的数据,大家平时在进行MySQL数据导出的时候,如何数据量不大(万级记录)可能不会遇到这样那样的问题,下面就我前段事件导出MySQL千万级(目前量级8千万,已快到一亿)数据遇到问题的一个回放和代码优化。

查询优化

当你接到需求,可能第一时间想到,直接全量查询不就好了,如果数据记录在几万条还好,当MySQL一个表的数据大于200W的时候,这个时候去查询已经非常吃力了,即使在添加索引的情况下。

查询需求

收到的需求是,为满足算法团队模型训练用,需要满足根据网元名称和时间范围动态快速的查询出数据,由于后台每天产生的数据是惊人的,该数据由kafka生产,MySQL存储。MySQL表中的下列字段nenamestarttimeendtime均加有索引,查询时我们需要将产生逻辑运算的字段,放在后面,不然会引起索引失效的情况。

代码语言:javascript
复制
select * from workflow_hugealarm where nename = {nename} and eventtime > {starttime} and eventtime < {endtime};

多线程+分页查

由于上面的方案查询速度和效率都非常慢,因此对程序进行了改进,使用多线程+分页的方式进行查询。具体思路是:批量开启多个线程,然后用数量行数除以每页查询的条数,得到总页数,最后交给线程池进行批量执行。

  • 获取数据行数
代码语言:javascript
复制
select max(id) as counts from workflow_hugealarm;
  • 数据等分
代码语言:javascript
复制
def thread_pool(self, **kwargs):
    pages = self.get_count() / self.step
    with ThreadPoolExecutor(100) as execute:
        for step in range(1, math.ceil(pages) + 1):
            try:
                execute.submit(self._query, step, **kwargs)
            except Exception as e:
                logging.error(e)
                continue
    return pd.DataFrame(self.data)

多线程+MySQL连接池查

上面的查询方案运行一段时间后会发现程序报错,经过多方定位发现是MySQL连接数超限,很快就到了MySQL的最大连接数,即1024个连接数。最后想到的解决方案是维护一个MySQL的连接池,这里我们使用Python字典类型进行存储维护。

  • 报错原因
代码语言:javascript
复制
AttributeError: 'NoneType' object has no attribute 'settimeout'
  • 连接池
代码语言:javascript
复制
def get_instance(self):
    """
    创建数据库连接池
    :return:
    """
    name = threading.current_thread().name
    if name not in self.pool:
        conn = pymysql.connect(**self.db_info)
        self.pool[name] = conn
    return self.pool[name]

多进程+多线程查

上面的方案其实已经满足大批量查询了,为了使导出速度变的更快,我们使用多进程+多线程组合的方式进行查询。这也是处理IO密集型业务的最佳实践方案,使用该方案,可以极大的规避GIL所带来的弊端。

  • 多进程+多线程
代码语言:javascript
复制
# Processing pool (see below for initiazation)
pool = None

# Performs a large calculation (CPU bound)
def some_work(args):
    ...
    return result

# A thread that calls the above function
def some_thread():
    while True:
        ...
        r = pool.apply(some_work, (args))
        ...

# Initiaze the pool
if __name__ == '__main__':
    import multiprocessing
    pool = multiprocessing.Pool()

总结

由于公司每天产生的数据是惊人的,因此经过我们商讨已经不适合用MySQL来进行存储了,我们使用了PostgreSql数据库集群来进行了存储,每天分别建表来存储。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021/01/20 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • mysql千万级数据如何快速导出
    • 查询优化
      • 查询需求
    • 多线程+分页查
      • 多线程+MySQL连接池查
        • 多进程+多线程查
          • 总结
          相关产品与服务
          云数据库 SQL Server
          腾讯云数据库 SQL Server (TencentDB for SQL Server)是业界最常用的商用数据库之一,对基于 Windows 架构的应用程序具有完美的支持。TencentDB for SQL Server 拥有微软正版授权,可持续为用户提供最新的功能,避免未授权使用软件的风险。具有即开即用、稳定可靠、安全运行、弹性扩缩等特点。
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档