游标
在stored Routines调用中开的一个缓冲区,用于存放SQL调用的结果集。(结果集是只读的)
意味着我们的查询可以返回一个文档也可以返回一个游标去指向一个结果集,而后通过游标的切换而获取每个结果
Python连接数据库
涉及模块
mysqldb py3不再更新
pymysql py3用的模块
安装pymysql 模块
pip install pymysql
导入模块
import pymysql
查看pymysql源码
默认自动提交是False的
找到 Connection 类中,看如下代码:
def __init__(self, host=None, user=None, password="",
database=None, port=0, unix_socket=None,
charset='', sql_mode=None,
read_default_file=None, conv=None, use_unicode=None,
client_flag=0, cursorclass=Cursor, init_command=None,
connect_timeout=10, ssl=None, read_default_group=None,
compress=None, named_pipe=None, no_delay=None,
autocommit=False, db=None, passwd=None, local_infile=False,
max_allowed_packet=16*1024*1024, defer_connect=False,
auth_plugin_map={}, read_timeout=None, write_timeout=None,
bind_address=None, binary_prefix=False
):
连接数据库,判断数据库正常与否
使用try或者使用ping方法都可以
通过ping测试连通性
通过判断是否是None或者异常,则认为服务是否是存活的
import pymysql
conn = pymysql.connect('ip','root','123456')
print(conn.ping)
print(conn.ping(False))
如果连接失败我们要对其进行关闭,所以最好加入到try中
import pymysql
try:
conn = pymysql.connect('x.x5.x','root','123456','test110')
print(conn.ping(False))
finally:
if conn:
conn.close()
游标 Curosr
在stored Routines调用中开的一个缓冲区,用于存放SQL调用的结果集。(结果集是只读的)
意味着我们的查询可以返回一个文档也可以返回一个游标去指向一个结果集,而后通过游标的切换而获取每个结果
在操作数据库的时候必须使用cursor类的实例,提供execute()方法,执行sql语句返回成功的行数
执行sql语句
import pymysql
try:
conn = pymysql.connect('.x4.1.1','root','123456','test110')
print(conn.ping(False))
cursor = conn.cursor() # 获取游标
insert_sql = "insert into student(name,age) values('jerry',20)"
line = cursor.execute(insert_sql) # 执行
print(line)
finally:
if conn:
conn.close()
返回如下:
None
1
但是数据库中并无数据,因为没有commit
在Connection类中,默认设置的是
一般不需要开启自动提交,需要手动管理事务并统一提交
事物的管理
有任何异常conn都要回滚确保数据无误,如果没有异常则commit()
close和commit 没有先后关系顺序
一个标准的数据库连接和操作关闭流程
import pymysql
conn = None
try:
conn = pymysql.connect('47.xxx','root','123456','test110')
print(conn.ping(False))
# 获取游标
cursor = conn.cursor()
insert_sql = "insert into student(name,age) values('jerry',20)"
line = cursor.execute(insert_sql)
cursor.close()
conn.commit()
except:
conn.rollback()
finally:
if conn:
conn.close()
批量插入
try:
conn = pymysql.connect('111.11.110','root','123456','test110')
print(conn.ping(False))
# 获取游标
cursor = conn.cursor()
#插入数据
for i in range(10):
insert_sql = "insert into student(name,age) values('jerry',{})".format(i)
line = cursor.execute(insert_sql)
print('line:',line)
cursor.close()
conn.commit()
往往会将批量的修改全部写前面,最后统一执行commit,游标关闭和链接提交是可以不分先后的
操作流程
建立连接--> 获取游标 --> 执行SQL --> 提交事物 --> 释放资源
如果出现异常需要回滚再释放资源
查询
不需要事物的地方一定不要使用事物,影响效率
查看分别有什么区别
conn = pymysql.connect('47.4.xx.xx','root','123456','test110')
print(conn.ping(False))
# 获取游标
cursor = conn.cursor()
sql = 'select * from student'
line = cursor.execute(sql)
#
获取前两个
# print(cursor.fetchaone())
# print(cursor.fetchall())
获取5个并以元组方式返回
print(cursor.fetchmany(5))
print(cursor.fetchmany(5))
获取剩下的全部
print(cursor.fetchall())
查看返回结果fethchall之所以获取很少的数据,因为都使用的是游标,类似于一个指针
重置游标
print(cursor.fetchmany(5))
print(cursor.fetchall())
cursor.rownumber = 0
print(cursor.fetchall())
这样指针又指向了0,其实是指向的索引
查看fethchall源码
实际上就是做了一个切片,以切片方式并记录当前位置返回我们想要的结果
def fetchall(self):
"""Fetch all the rows"""
self._check_executed()
if self._rows is None:
return ()
if self.rownumber:
result = self._rows[self.rownumber:]
else:
result = self._rows
self.rownumber = len(self._rows)
return result
DictCursor 字典游标,带一些字段名方式进行返回
Cursor 类有一个Mixin的子类 DictCursor
导入模块
from pymysql.cursors import DictCursor
cursor = conn.cursor(cursor=DictCursor)
以上是元组和字典的返回差异
返回如下:
[{'name': 'jerry', 'en': None, 'age': 1}, {'name': 'jerry', 'en': None, 'age': 2}]
SQL注入***
在登陆的时候做了一些匹配或者明文匹配所导致
一般情况都需要进行加密
比如这样的语句:
select * from t where name='ben' and password='ben';
在登陆时要做唯一约束的,用户在写提交程序的时候,用户名需要异步去验证
这个过程已经是查过数据库了
在登陆时要做唯一约束的,用户在写提交程序的时候,用户名需要异步去验证
这个过程已经是查过数据库了
但是用户在执行的时候加了这么一句
select * from t where name='ben' and password='ben' or 1 = 1 ;
or 1=1 是真值,相当于 select *
正常字符串拼接所造成的
正常的拼接:
name = 'jerry'
age = '3 or 1=1'
sql = 'select * from student where name={} and age={}'.format(name,age)
这样是最原始的拼接字符串的方式,太危险
这样通过id 或者其他来获取你的数据,总有一个参数能返回数据,每个参数都会遭到***
SELECT * FROM t where a = 1 and b = 1 or 1 = 1 and id=5 or 1=1;
如果密码失败,那么还可以通过查询来进行***,这样也会返回数据
所以,不能使用字符串拼接的方式来拼写sql
凡是用户提交的数据都不可信,要做严格的检查,哪怕是调用函数
解决sql注入
参数化查询,可以有效防止注入***,并提高查询效率
通过cursor.execute参数进行防注入
cursor.execute(query,args=None)
查看args源码
def execute(self, query, args=None):
"""Execute a query
:param str query: Query to execute.
:param args: parameters used with query. (optional)
:type args: tuple, list or dict 明确写明args必须是一个元组列表或者字典
:return: Number of affected rows
:rtype: int
If args is a list or tuple, %s can be used as a placeholder in the query.
If args is a dict, %(name)s can be used as a placeholder in the query.
"""
while self.nextset():
pass
query = self.mogrify(query, args)
result = self._query(query)
self._executed = query
return result
通过拼接字符串的方式是能看到别人所有数据,那么改进如下:
try:
conn = pymysql.connect('4x.x.x.x','root','123456','test110')
cursor = conn.cursor(DictCursor)
age = '20 or 1=1'
sql = 'SELECT * FROM student where age=%s'
# sql =' SELECT * FROM student where age={}'.format(age)
cursor.execute(sql,(age,)) #将规则写入到execute中,判断是否有其他敏感字符
print(cursor.fetchall())
Warning: (1292, "Truncated incorrect DOUBLE value: '3 or 1=1'")
[{'name': 'jerry', 'age': 3, 'en': None}]
self._do_get_result()
尽可能转为目标的数据,友好的做了转换,但是发现有一些非法的字段被拦截掉了
提示非法DOUBLE的类型
更加复杂的例子
sql = 'SELECT * FROM student where age > %(age)s'
# sql =' SELECT * FROM student where age={}'.format(age)
cursor.execute(sql,{'age':10}) #参数化查询
print(cursor.fetchall())
参数化为啥可以提高效率?
因为sql语句缓存
客户端每发一次sql语句到mysql中都有一个sql编译过程,只是对sql语句进行编译
如果不用参数化查询,那么id=1 id=2 id=3 来了三条语句,需要重新编译
mysql服务端会对sql语句编译和缓存,编译只对sql部分,只是sql部分并不是结果
编译过程,需要语法分析,生成AST并优化生成执行计划等过程 这个过程比较耗费资源
可认为本身sql语句字符串就是一个key,找到key则直接找到结果
那么如果使用拼接的方案,每次发过去的sql都不一样,都需要编译并缓存
开发时,应该使用参数化查询
这里只是查询的字符串的缓存,并不是查询结果
游标的上下文
查看游标的源码 __enter__ 和 __exit__
游标类:
def __enter__(self):
return self #返回自己
def __exit__(self, *exc_info):
del exc_info
self.close() #调用close()关闭自己
连接类进入上下文的时候会返回一个游标对象,就是游标自己
游标类也使用上下文,用完了之后还会调用enter 和 exit
在退出时关闭游标对象,执行 self.close()
查看close源码:
def close(self):
"""
Closing a cursor just exhausts all remaining data.
"""
conn = self.connection
if conn is None:
return
try:
while self.nextset():
pass
finally:
self.connection = None
def __enter__(self):
return self
def __exit__(self, *exc_info):
del exc_info
self.close()
创建的时候用的是cursor,连接的时候也可以关闭
最后将连接 = None,说明没有连接,无法使用
连接类的上下文
有没有with as xxx ,是定义的问题,在退出with的时候,查看有否异常,如果存在异常则回滚
进入连接类的时候会返回一个游标
连接类:
def __enter__(self):
"""Context manager that returns a Cursor"""
return self.cursor()
def __exit__(self, exc, value, traceback):
"""On successful exit, commit. On exception, rollback"""
if exc:
self.rollback()
else:
self.commit() #如果没有异常则提交
创建的时候用的是self.cursor(),通过enter 进来的时候调用self.cursor(),直接调用了游标
游标通过调用本地方法获取
def cursor(self, cursor=None):
"""Create a new cursor to execute queries with"""
if cursor:
return cursor(self)
return self.cursorclass(self)
如果没有存在,那么直接调用cursorclass ,那么cursorclass直接调用游标类
而cursorclass 就是在Connection 初始化中去获取
self.cursorclass = cursorclass
cursorclass直接指向了游标类,通过调用游标类返回一个自己的实例提供调用
总结
连接:
游标的上下文是返回自己提供使用的,在close()将游标关闭,关闭的是自己将其标记为None
对于连接来讲,在with进入之后返回的是cursor()游标自己的对象
连接类如下,在调用它的时候,上下文先执行,并将游标类调用,所以调用的是cursor()
def __enter__(self):
"""Context manager that returns a Cursor"""
return self.cursor()
关键是关闭的时候并没有自行关闭连接,因为连接是共用连接(长连接),所以不会关闭连接的,但是游标需要关闭,完全由用户控制
退出:
但是如果退出with语句块,肯定会检查是否有异常,提交或者回滚
当离开语句块的时候会提交或回滚
所以,代码需要如下改进:
import pymysql
from pymysql.cursors import DictCursor
conn = None
try:
# 建立连接
conn = pymysql.connect('7.94.xx','root','123456','test110')
# cursor = conn.cursor(DictCursor) #注释游标获取,在with中已经获取了游标
with conn as cursor: #代替了上一上,在进入上下文的时候,conn已经获取了游标
d = {'age':'5'}
sql = 'select * from student where age>%(age)s'
print(sql)
# 执行
line = cursor.execute(sql,d)
print(line)
print(cursor.fetchall())
except:
print('errrrrr')
finally:
if conn:
conn.close()
这样进入到with 连接对象 as cursor ,的时候直接调用了conn连接对象的上下文,并调用了游标类
返回了游标的self,这样就可以直接在with中调用,免去了开销
如果使用游标的上下文则可以:
conn = None
try:
# 建立连接
conn = pymysql.connect('ip','root','123456','test110')
# cursor = conn.cursor(DictCursor)
with conn as cursor:
with cursor:
d = {'age':10}
sql = 'select * from student where age<%(age)s'
cursor.execute(sql,d)
print(cursor.fetchall())
except:
print('errrrrr')
finally:
if conn:
conn.close()
当with conn as的时候, 返回一个新的cursor对象,当退出时,只要提交或者回滚了事物,并没有关闭
不关闭游标表示可以继续反复使用它,节省了开销
但是在最后finally中定义了关闭
finally:
if conn:
conn.close()
连接池
数据库最大的开销其实是连接,所以引入了连接池的概念
设置一个数据库连接池,使用者如果需要则get一个
分析
一个连接池,应该是一个可以设置大小容器,存放数据库的连接,使用者需要连接从池中获取一个连接,用完归还
启动的时候开启连接到数据库中,连接数据库的时候避免了频繁连接数据库,也限制了主动连接
连接池中只是存放的连接,具体如何使用是用户的事情,只需要存放一个正常的连接即可
选型
对于线程、所机制、信号量等无非使用Queue比较合适当前情景
Queue本身就保证了线程的绝对安全性
# coding:utf-8
import pymysql
from pymysql.cursors import DictCursor
from queue import Queue
conn = None
class ConnPool:
def __init__(self,size,*args,**kwargs):
self.size = size
self._pool = Queue
for i in range(size):
conn = pymysql.connect(*args,**kwargs) # 传入用户名密码
self._pool.put(conn) # 传入到队列,生产者
def get_conn(self):
return self._pool.get()
def return_conn(self,conn:pymysql.connections.Connection):
self._pool.get(conn) # 消费连接
线程池引入:初始化线程池、初始化之后上面代码就等于有了连接了
接下来就是如何使用的问题
那么可否将id放入到集合里,集合中是不允许重复的,id也是
判断只要地址不同就可以,那么也有风险,如果内存被回收,或者在这时间又赋予这个id内存地址
所以是有风险的,也不排除用户的操作问题;
所以需要做类型的判断;
判断类型
import pymysql
from pymysql.cursors import DictCursor
from queue import Queue
conn = None
class ConnPool:
def __init__(self,size,*args,**kwargs):
self.size = size
self._pool = Queue(size)
for i in range(size):
conn = pymysql.connect(*args,**kwargs) # 传入用户名密码
self._pool.put(conn) # 传入到队列,生产者
def get_conn(self):
return self._pool.get()
# 判断类型是否是连接
def return_conn(self,conn:pymysql.connections.Connection):
if isinstance(conn,pymysql.connections.Connection):
self._pool.put(conn)
# 初始化类
pool = ConnPool(5,'4ip','root','123456','test')
#
conn = pool.get_conn()
print(conn)
# 消费连接
pool.return_conn(conn)
那么对于没有关闭的连接是需要手动的,所以能否在类中实现关闭,后期可以在当前类的上下文
引入Thread.Local ,前提是只有一个在用它,而且Thread.Local是顺序执行
在每个线程中用,限定在多线程的场景下使用,起码保证线程内,Thread.Local是安全的
import pymysql
from pymysql.cursors import DictCursor
from queue import Queue
import threading
class ConnPool:
def __init__(self,size,*args,**kwargs):
self.size = size
self._pool = Queue(size)
self.local = threading.local()
for i in range(size):
conn = pymysql.connect(*args,**kwargs) # 传入用户名密码
self._pool.put(conn) # 传入到队列,生产者
# 标记self.local.conn 获取的时候则赋值并返回get到的连接
def get_conn(self):
# return self._pool.get()
conn = self._pool.get()
self.local.conn = conn # 标记当前连接,用于获取put之后标记None,如果get到了则标记当前连接
return conn
# 线程是顺序的,就是说一直用完到还回线程
def return_conn(self,conn:pymysql.connections.Connection):
if isinstance(conn,pymysql.connections.Connection):
self._pool.put(conn)
self.local.conn = None # 用完之后标记为None,仿照连接类去写
# 初始化类
pool = ConnPool(5,'47.9.x.x.','root','123456','test')
#
conn = pool.get_conn()
print(conn)
# 消费连接
pool.return_conn(conn)
上下文改进
游标的上下文带来了一些列问题,那么可否自行增加上下文,如果是None则返回一个游标
如果不是None,那么就是连接了
enter 用于是否是None,是的话则赋予一个连接
exit 只要有一个退出,那么就标记当前为None
所以thread.local 还是可以的,因为都是在本地线程中使用,内存地址没有改动,直到关闭
# 线程是顺序的,就是说一直用完到还回线程
def return_conn(self,conn:pymysql.connections.Connection):
if isinstance(conn,pymysql.connections.Connection):
self._pool.put(conn)
self.local.conn = None # 用完之后标记为None,仿照连接类去写
def __enter__(self):
#刚进来的时候线程不存在则抛异常,肯定是None,所以给一个连接
if getattr(self.local,'conn',None) is None:
self.local.conn = self.get_conn()
return self.local.conn.cursor() #返回一个游标
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type:
self.local.conn.rollback()
else:
self.local.conn.commit()
self.return_conn(self.local.conn)
self.local.conn = None
# 初始化类
pool = ConnPool(5,'.x.x.x.x','root','123456','test')
with pool as cursor:
with cursor:
sql = 'select * from student'
cursor.execute(sql)
# print(cursor.fetchall())
for x in cursor:
print(x)