前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >aiomysql异步操作mysql

aiomysql异步操作mysql

作者头像
py3study
发布2020-05-09 15:46:53
6.2K0
发布2020-05-09 15:46:53
举报
文章被收录于专栏:python3python3

一、概述

aiomysql是一个从asyncio(PEP-3156/tulip)框架访问MySQL数据库的库。它依赖并重用PyMySQL的大部分部分。aiomysql试图成为一个很棒的aiopg库,并保留相同的api、外观和感觉。

在内部aimysql是PyMySQL的副本,底层io调用切换到async,基本上是等待并在适当的位置添加async def coroutine。从aiopg移植的sqlalchemy支持。

安装模块

代码语言:javascript
复制
pip3 install aiomysql

简单示例

代码语言:javascript
复制
import asyncio
import aiomysql

loop = asyncio.get_event_loop()


async def test_example():
    conn = await aiomysql.connect(host='127.0.0.1', port=3306,
                                       user='root', password='', db='mysql',
                                       loop=loop)

    cur = await conn.cursor()
    await cur.execute("SELECT Host,User FROM user")
    print(cur.description)
    r = await cur.fetchall()
    print(r)
    await cur.close()
    conn.close()

loop.run_until_complete(test_example())

二、demo演示

环境说明

操作系统:centos 7.6

mysql版本:5.7

数据库名:test

数据库默认编码:utf8mb4

具体表结构以及数据,请参考链接:

https://www.cnblogs.com/xiao987334176/p/12721498.html

这里面有2个表

单次执行

 执行select和update

代码语言:javascript
复制
#!/usr/bin/env python3
# coding: utf-8
"""
mysql 异步版本
"""
import traceback
import logging
import aiomysql
import asyncio
import time

logobj = logging.getLogger('mysql')


class Pmysql:
    def __init__(self):
        self.coon = None
        self.pool = None

    async def initpool(self):
        try:
            logobj.debug("will connect mysql~")
            __pool = await aiomysql.create_pool(
                minsize=5,  # 连接池最小值
                maxsize=10,  # 连接池最大值
                host='192.168.31.230',
                port=3306,
                user='root',
                password='abcd1234',
                db='test',
                autocommit=True,  # 自动提交模式
            )
            return __pool
        except:
            logobj.error('connect error.', exc_info=True)

    async def getCurosr(self):
        conn = await self.pool.acquire()
        # 返回字典格式
        cur = await conn.cursor(aiomysql.DictCursor)
        return conn, cur

    async def query(self, query, param=None):
        """
        查询操作
        :param query: sql语句
        :param param: 参数
        :return:
        """
        conn, cur = await self.getCurosr()
        try:
            await cur.execute(query, param)
            return await cur.fetchall()
        except:
            logobj.error(traceback.format_exc())
        finally:
            if cur:
                await cur.close()
            # 释放掉conn,将连接放回到连接池中
            await self.pool.release(conn)

    async def execute(self, query, param=None):
        """
        增删改 操作
        :param query: sql语句
        :param param: 参数
        :return:
        """
        conn, cur = await self.getCurosr()
        try:
            await cur.execute(query, param)
            if cur.rowcount == 0:
                return False
            else:
                return True
        except:
            logobj.error(traceback.format_exc())
        finally:
            if cur:
                await cur.close()
            # 释放掉conn,将连接放回到连接池中
            await self.pool.release(conn)


async def getAmysqlobj():
    mysqlobj = Pmysql()
    pool = await mysqlobj.initpool()
    mysqlobj.pool = pool
    return mysqlobj


async def test_select():
    mysqlobj = await getAmysqlobj()
    # UPDATE `youku`.`person` SET `psName` = '张三丰' WHERE (`id` = '3');
    exeRtn = await mysqlobj.query("select * from users")
    # print("查询结果",exeRtn)
    return exeRtn


async def test_update():
    mysqlobj = await getAmysqlobj()
    # UPDATE `youku`.`person` SET `psName` = '张三丰' WHERE (`id` = '3');
    exeRtn = await mysqlobj.execute("update users set username='xiao1' where id='1'")
    # print("exeRtn", exeRtn, type(exeRtn))
    if exeRtn:
        # print('操作成功')
        return '操作成功'
    else:
        # print('操作失败')
        return '操作失败'


async def main():  # 调用方
    tasks = [test_select(), test_update()]  # 把所有任务添加到task中
    done, pending = await asyncio.wait(tasks)  # 子生成器
    for r in done:  # done和pending都是一个任务,所以返回结果需要逐个调用result()
        # print('协程无序返回值:'+r.result())
        print(r.result())


if __name__ == '__main__':
    start = time.time()
    loop = asyncio.get_event_loop()  # 创建一个事件循环对象loop
    try:
        loop.run_until_complete(main())  # 完成事件循环,直到最后一个任务结束
    finally:
        loop.close()  # 结束事件循环
    print('所有IO任务总耗时%.5f秒' % float(time.time() - start))

执行输出:

代码语言:javascript
复制
操作成功
[{'id': 1, 'username': 'xiao', 'password': '123', 'phone': '12345678910', 'email': '123@qq.com', 'create_time': datetime.datetime(2020, 4, 10, 1, 22, 7)}]
所有IO任务总耗时0.03948秒

批量插入

批量插入使用executemany

插入3万条数据

代码语言:javascript
复制
#!/usr/bin/env python3
# coding: utf-8

import time
import asyncio
import aiomysql

start = time.time()
loop = asyncio.get_event_loop()

async def test_example():
    conn = await aiomysql.connect(host='192.168.31.230', port=3306,
                                       user='root', password='abcd1234',
                                       db='test', loop=loop)

    # create default cursor
    cursor = await conn.cursor()

    # execute sql query
    data = []
    for i in range(1,30000):
        data.append(('xiao%s'%i, '123', '12345678910', '123@qq.com', '2020-04-10 01:22:07'),)

    stmt = "INSERT INTO users (username,password,phone,email,create_time) VALUES(%s,%s,%s,%s,%s);"

    await cursor.executemany(stmt, data)
    await conn.commit()
    # detach cursor from connection
    await cursor.close()

    # close connection
    conn.close()

loop.run_until_complete(test_example())
print('所有IO任务总耗时%.5f秒' % float(time.time() - start))

执行输出:

代码语言:javascript
复制
所有IO任务总耗时11.96885秒

本文参考链接:

https://www.cnblogs.com/ygy1997/p/11753335.html

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2020-05-07 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、概述
    • 安装模块
      • 简单示例
      • 二、demo演示
        • 环境说明
          • 单次执行
            • 批量插入
            相关产品与服务
            云数据库 MySQL
            腾讯云数据库 MySQL(TencentDB for MySQL)为用户提供安全可靠,性能卓越、易于维护的企业级云数据库服务。其具备6大企业级特性,包括企业级定制内核、企业级高可用、企业级高可靠、企业级安全、企业级扩展以及企业级智能运维。通过使用腾讯云数据库 MySQL,可实现分钟级别的数据库部署、弹性扩展以及全自动化的运维管理,不仅经济实惠,而且稳定可靠,易于运维。
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档