python-MySQLdb的二三事

追寻

介绍

mysqldb是python操作mysql数据库的一个库.mysql的几乎所有的操作都可以实现,另外,mysqldb的一些比较的option让数据操作更符合pythonic风格.在python2系列使用Mysqldb,在python3系列使用pymysql和mysql.connect.

Mysqldb的安装

下面按python2系列安装

1. pip方式安装
pip install MySQL-python

2. yum安装
sudo yum install python-mysqldb

3. apt安装
sudo apt-get install  python-mysqldb

4.源码安装(这里就不介绍了,源码地址给出)
https://pypi.python.org/packages/a5/e9/51b544da85a36a68debe7a7091f068d802fc515a3a202652828c73453cad/MySQL-python-1.2.5.zip#md5=654f75b302db6ed8dc5a898c625e030c
Mysqldb的使用介绍

我们直接操作mysql的流程如下: 1.使用mysql-client连上数据库 mysql -uuser_anme -ppassword -hhost_name 2.再执行具体的sql语句,获取所需的数据 >use db_name; >select * from table_name;

python-mysqldb的使用方式和上面的流程是一样的.下面进入正题.

1. 连接数据库

import MySQLdb
conn = MySQLdb.connect(db='database', host='172.16.0.1', user='user', passwd='password', port=3306)
上面只是进行了数据库的连接

2. 游标对象

cur = conn.cursor()

3. 执行语句

sql = '****'
cur.execute(sql)

介绍一下具体的对象的method:

下面介绍一下method的使用

connect对象
conn = MySQLdb.connect(db='database', host='172.16.0.1', user='user', passwd='password', port=3306)

connect()的参数列表

参数

描述

user

username

passwd

password

host

hostname

database

databasename

dsn

data source name

port

端口,int

conv

数据转换

charset

字符集

其他的一些参数:unix_socket,compress,connect_timeout,named_pipe, init_command,read_default_file,read_default_group,cursorclass,use_unicode,sql_mode,ssl

下面详细介绍一下conv这个参数 介绍一个类型对象的概念,通常不同的系统的接口要求的参数类型是不一致的,譬如python调用c函数时python对象和c类型之间就需要进行数据格式的转换.所以,在python对象和原生数据库对象之间也需要进行数据格式的转换. 在MySQLdb.converters.conversions中

encoders = {
        bool: escape_bool,
        int: escape_int,
        long_type: escape_int,
        float: escape_float,
        str: escape_str,
        text_type: escape_unicode,
        tuple: escape_sequence,
        list: escape_sequence,
        set: escape_sequence,
        dict: escape_dict,
        type(None): escape_None,
        datetime.date: escape_date,
        datetime.datetime: escape_datetime,
        datetime.timedelta: escape_timedelta,
        datetime.time: escape_time,
        time.struct_time: escape_struct_time,
        Decimal: str,
        }
decoders = {
        FIELD_TYPE.BIT: convert_bit,
        FIELD_TYPE.TINY: int,
        FIELD_TYPE.SHORT: int,
        FIELD_TYPE.LONG: int,
        FIELD_TYPE.FLOAT: float,
        FIELD_TYPE.DOUBLE: float,
        FIELD_TYPE.DECIMAL: float,
        FIELD_TYPE.NEWDECIMAL: float,
        FIELD_TYPE.LONGLONG: int,
        FIELD_TYPE.INT24: int,
        FIELD_TYPE.YEAR: int,
        FIELD_TYPE.TIMESTAMP: convert_mysql_timestamp,
        FIELD_TYPE.DATETIME: convert_datetime,
        FIELD_TYPE.TIME: convert_timedelta,
        FIELD_TYPE.DATE: convert_date,
        FIELD_TYPE.SET: convert_set,
        FIELD_TYPE.BLOB: through,
        FIELD_TYPE.TINY_BLOB: through,
        FIELD_TYPE.MEDIUM_BLOB: through,
        FIELD_TYPE.LONG_BLOB: through,
        FIELD_TYPE.STRING: through,
        FIELD_TYPE.VAR_STRING: through,
        FIELD_TYPE.VARCHAR: through,
        FIELD_TYPE.DECIMAL: Decimal,
        FIELD_TYPE.NEWDECIMAL: Decimal,
        }

下面来说说,自己如何自定义使用:

from MySQLdb.constants import FIELD_TYPE

my_conv = { 
    FIELD_TYPE.LONG: int,   # 长整型转成int,默认数据后面有一个L,去掉
    FIELD_TYPE.DATE: str    # 日期转成字符串
    }
conn = MySQLdb.connect(db='database', host='172.16.0.1', user='user', passwd='password', port=3306,conv=my_conv)

开始扩展connect对象的方法

方法名

描述

close()

关闭连接

commit()

提交当前事务

autocommit()

自动提交事务

rollback()

取消当前事务

cursor()

实例一个游标对象

errorhandler(cxn,cur,errcls,errval)

作为已给游标的句柄

这里介绍一下事务 先来举个小例子:

conn = MySQLdb.connect(*args, **kwags)
cur = conn.cursor()
sql = "insert into tb_name values (%(id)s,%(name)s,%(age)s);" % {'id': 1, 'name': 'ruyu', 'age': 99}
cur.excute(sql)
cur.close()
conn.close()

插入一行数据,上面操作是不生效的 需要添加一行

conn.commit()
补充知识: 数据库事务

数据库事务(Database Transaction) ,是指作为单个逻辑工作单元执行的一系列操作,要么完全地执行,要么完全地不执行。 事务处理可以确保除非事务性单元内的所有操作都成功完成,否则不会永久更新面向数据的资源。 事务的属性:必须满足ACID

  1. (Atomic)(Atomicity)原子性
  2. (Consistent)(Consistency)一致性
  3. (Insulation)(Isolation)隔离性
  4. (Duration)(Durability) 持久性

顺便提一下mysql的隔离级别吧! **

  1. Read Uncommitted(读取未提交内容)
  2. Read Committed(读取提交内容)
  3. Repeatable Read(可重读)
  4. Serializable(可串行化)

这里涉及一些脏读,幻读,不可重复读的概念,希望读者自己百度,找个教程,做一下数据库事务的实验,理解一下事务的概念.

让心去旅行

游标对象

cur = MySQLdb.connect(*args, **kwags).cursor()

先介绍一下cursor()的函数

    def cursor(self, cursorclass=None):
        return (cursorclass or self.cursorclass)(self)

其中一个参数是cursorclass 在MySQLdb.cursors中有所有的cursorclass 分别如下:

BaseCursor,CursorStoreResultMixIn,CursorUseResultMixIn,CursorTupleRowsMixIn,CursorDictRowsMixIn,CursorOldDictRowsMixIn,CursorDictRowsMixIn,CursorOldDictRowsMixIn,Cursor,DictCursor,SSCursor,SSDictCursor

经常使用的应该是DictCursor,Cursor 一种是返回的数据以字典格式,一种是tuple格式.

下面介绍cursor对象的属性和方法

对象属性和方法

描述

arraysize

使用fetchmany()方法一次取出多少条记录,默认值为1

connection

创建此游标对象的连接connect对象

description

返回游标活动状态(七个元素的元祖):(name,type_code,display_size,interal_size,precision,scale,null_ok);只有name和type_code是必须提供的

lastrowid

返回最后更新的id(可选),适用场景,在插入数据,返回插入数据的id

rowcount

最后一次execute()操作返回或影响的行数

callproc(func[,args])

调用一个存储过程

close()

关闭游标对象

execute(op[,args])

执行一个数据库查询或命令

executemany(op,args)

类似execute()和map()的结合,为给定的每一个参数准备并执行一个数据库的查询/命令

fetchone()

得到结果集的下一行

fetchmany([size=cursor.arraysize])

得到结果集的下几行

fetchall()

返回所有的结果

_iter_()

创建一个迭代对象

messages

游标执行后数据库返回的信息列表

next()

使用迭代对象得到结果集的下一行

nextset()

移到下一个结果集

rownumber

当前结果集中游标的索引

setinput-sizes(sizes)

设置输入最大值

setoutput-size(size[,col])

设置大列的缓冲区大写

提示: 使用最多的就是execute和fetch.

上面的介绍差不多了,这里给出一个mysqldb的封装的库torndb的源码:

#!/usr/bin/env python
from __future__ import absolute_import, division, with_statement

import copy
import logging
import os
import time

#下面是解决python2和python3的mysqldb不同的
try:
    import MySQLdb.constants
    import MySQLdb.converters
    import MySQLdb.cursors
except ImportError:
    try:
        import pymysql as MySQLdb
    except ImportError:
        if 'READTHEDOCS' in os.environ:
            MySQLdb = None
        else:
            raise

version = "0.3"
version_info = (0, 3, 0, 0)


# 定义一个connect类,可以实例connect()对象
class Connection(object):
    def __init__(self, host, database, user=None, password=None,
                 max_idle_time=7 * 3600, connect_timeout=0,
                 time_zone="+0:00", charset="utf8", sql_mode="TRADITIONAL",
                 **kwargs):
        self.host = host
        self.database = database
        self.max_idle_time = float(max_idle_time)

        args = dict(conv=CONVERSIONS, use_unicode=True, charset=charset,
                    db=database, init_command=('SET time_zone = "%s"' % time_zone),
                    connect_timeout=connect_timeout, sql_mode=sql_mode, **kwargs)
        if user is not None:
            args["user"] = user
        if password is not None:
            args["passwd"] = password

        # We accept a path to a MySQL socket file or a host(:port) string
        if "/" in host:
            args["unix_socket"] = host
        else:
            self.socket = None
            pair = host.split(":")
            if len(pair) == 2:
                args["host"] = pair[0]
                args["port"] = int(pair[1])
            else:
                args["host"] = host
                args["port"] = 3306

        self._db = None
        self._db_args = args
        self._last_use_time = time.time()
        try:
            self.reconnect()
        except Exception:
            logging.error("Cannot connect to MySQL on %s", self.host,
                          exc_info=True)

    def __del__(self):  # 删除和关闭
        self.close()

    def close(self):  # 关闭并置位空
        if getattr(self, "_db", None) is not None:
            self._db.close()
            self._db = None

    def reconnect(self): # 重新连接
        self.close()
        self._db = MySQLdb.connect(**self._db_args)
        self._db.autocommit(True)  # 开启自动提交的功能

    def iter(self, query, *parameters, **kwparameters):  # 进行迭代的
        self._ensure_connected()
        cursor = MySQLdb.cursors.SSCursor(self._db)
        try:
            self._execute(cursor, query, parameters, kwparameters)
            column_names = [d[0] for d in cursor.description]
            for row in cursor:
                yield Row(zip(column_names, row))
        finally:
            cursor.close()

    def query(self, query, *parameters, **kwparameters):  # 进行执行,返回结果
        """Returns a row list for the given query and parameters."""
        cursor = self._cursor()
        try:
            self._execute(cursor, query, parameters, kwparameters)
            column_names = [d[0] for d in cursor.description]
            return [Row(zip(column_names, row)) for row in cursor]
        finally:
            cursor.close()

    def get(self, query, *parameters, **kwparameters):
        rows = self.query(query, *parameters, **kwparameters)
        if not rows:
            return None
        elif len(rows) > 1:
            raise Exception("Multiple rows returned for Database.get() query")
        else:
            return rows[0]


    def execute(self, query, *parameters, **kwparameters):
        return self.execute_lastrowid(query, *parameters, **kwparameters)

    def execute_lastrowid(self, query, *parameters, **kwparameters):
        cursor = self._cursor()
        try:
            self._execute(cursor, query, parameters, kwparameters)
            return cursor.lastrowid
        finally:
            cursor.close()

    def execute_rowcount(self, query, *parameters, **kwparameters):
        cursor = self._cursor()
        try:
            self._execute(cursor, query, parameters, kwparameters)
            return cursor.rowcount
        finally:
            cursor.close()

    def executemany(self, query, parameters):
        return self.executemany_lastrowid(query, parameters)

    def executemany_lastrowid(self, query, parameters):
        cursor = self._cursor()
        try:
            cursor.executemany(query, parameters)
            return cursor.lastrowid
        finally:
            cursor.close()

    def executemany_rowcount(self, query, parameters):
        cursor = self._cursor()
        try:
            cursor.executemany(query, parameters)
            return cursor.rowcount
        finally:
            cursor.close()

    update = delete = execute_rowcount
    updatemany = executemany_rowcount

    insert = execute_lastrowid
    insertmany = executemany_lastrowid

    def _ensure_connected(self):
        if (self._db is None or
                (time.time() - self._last_use_time > self.max_idle_time)):
            self.reconnect()
        self._last_use_time = time.time()

    def _cursor(self):
        self._ensure_connected()
        return self._db.cursor()

    def _execute(self, cursor, query, parameters, kwparameters):
        try:
            return cursor.execute(query, kwparameters or parameters)
        except OperationalError:
            logging.error("Error connecting to MySQL on %s", self.host)
            self.close()
            raise


class Row(dict):
    def __getattr__(self, name):
        try:
            return self[name]
        except KeyError:
            raise AttributeError(name)


if MySQLdb is not None:
    FIELD_TYPE = MySQLdb.constants.FIELD_TYPE
    FLAG = MySQLdb.constants.FLAG
    CONVERSIONS = copy.copy(MySQLdb.converters.conversions)

    field_types = [FIELD_TYPE.BLOB, FIELD_TYPE.STRING, FIELD_TYPE.VAR_STRING]
    if 'VARCHAR' in vars(FIELD_TYPE):
        field_types.append(FIELD_TYPE.VARCHAR)

    for field_type in field_types:
        CONVERSIONS[field_type] = [(FLAG.BINARY, str)] + CONVERSIONS[field_type]
    IntegrityError = MySQLdb.IntegrityError
    OperationalError = MySQLdb.OperationalError

上面源码就不解释了,都是很基础的内容,很适合入门学习.

这里说一个python-mysqldb遇到的问题,很复杂的sql语句,在mysql中有数据,但是在mysqldb第一次执行确有部分字段是None,第二次或后面都是没问题的,我也请教了我们的python大神,他说他遇到,t并且他说这不是bug.他不能重现问题,所以也没解决,我的缓兵之计如下:

    def query(self, query):
        cursor = self._cursor()
        try:
            cursor.execute(query)
            data = cursor.fetchall()
            cursor.execute(query)
            raw = cursor.fetchall()
            return raw
        finally:
            cursor.close()

安静

内容到这里就结束了!!!! 预告一下,后续的内容: ORM,自己带着写一个ORM,Django ORM的queryset,manager以及一些定制化的,当然还有sqlalchemy.

再见

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏chenssy

【死磕Java并发】—–Java内存模型之从JMM角度分析DCL

DCL,即Double Check Lock,中卫双重检查锁定。其实DCL很多人在单例模式中用过,LZ面试人的时候也要他们写过,但是有很多人都会写错。他们为什么...

421110
来自专栏xingoo, 一个梦想做发明家的程序员

Java程序员的日常——存储过程知识普及

存储过程是保存可以接受或返回用户提供参数的SQL语句集合。在日常的使用中,经常会遇到复杂的业务逻辑和对数据库的操作,使用存储过程可以进行封装。可以在数据库中定...

19880
来自专栏Java3y

纳税服务系统六(信息发布管理模块)【Ueditor、异步信息交互、抽取BaseService、条件查询、分页】

需求分析 我们现在来到了纳税服务系统的信息发布管理模块,首先我们跟着原型图来进行需求分析把: 一些普通的CRUD,值得一做的就是状态之间的切换了。停用和发布切换...

52460
来自专栏闻道于事

JPA实体类中的注解

@Entity   标注于实体类上,通常和@Table是结合使用的,代表是该类是实体类 @Table   标注于实体类上,表示该类映射到数据库中的表,没有指定名...

31670
来自专栏Samego开发资源

数据库Dao层抽象出BasicDao类 | 许久没碰Java了、致Java初学者

19140
来自专栏python学习指南

Elasticsearch多索引

 在Elasticsearch中,一般的查询都支持多索引。 只有文档API或者别名API等不支持多索引操作,因此本篇就翻译一下多索引相关的内容。 首先,先...

60960
来自专栏Hongten

spring开发_Spring+Struts2

http://www.cnblogs.com/hongten/gallery/image/112920.html

10720
来自专栏听雨堂

Python防止sql注入

看了网上文章,说的都挺好的,给cursor.execute传递格式串和参数,就能防止注入,但是我写了代码,却死活跑不通,怀疑自己用了一个假的python 最后,...

40770
来自专栏JMCui

再学习之MyBatis.

一、框架基本介绍 1、概念 支持普通SQL查询、存储过程和高级映射,简化和实现了Java 数据持久化层的的开源框架,主要流行的原因在于他的简单性和易使用性。 2...

49680
来自专栏芋道源码1024

数据库中间件 MyCAT 源码解析 —— 分片结果合并(一)

1. 概述 相信很多同学看过 MySQL 各种优化的文章,里面 99% 会提到:单表数据量大了,需要进行分片(水平拆分 or 垂直拆分)。分片之后,业务上必然面...

474130

扫码关注云+社区

领取腾讯云代金券