首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

Python操作MySQL入门教程,使用pymysql操作MySQL,有录播直播私教课

创建数据库

create database gx character set utf8mb4;

连接数据库

#!/usr/bin/python3

import mysql as pymysql

# 打开数据库连接

db = pymysql.connect(host='localhost',

port=3306,

user='root',

password='zhangdapeng520',

database='gx')

# 使用 cursor() 方法创建一个游标对象 cursor

cursor = db.cursor()

# 使用 execute()  方法执行 SQL 查询

cursor.execute("SELECT VERSION()")

# 使用 fetchone() 方法获取单条数据.

data = cursor.fetchone()

print("Database version : %s " % data)

# 关闭数据库连接

db.close()

创建数据库表

#!/usr/bin/python3

import mysql as pymysql

# 打开数据库连接

db = pymysql.connect(host='localhost',

port=3306,

user='root',

password='zhangdapeng520',

database='gx')

# 使用 cursor() 方法创建一个游标对象 cursor

cursor = db.cursor()

# 使用 execute() 方法执行 SQL,如果表存在则删除

cursor.execute("drop table if exists user")

# 使用预处理语句创建表

sql = "create table user(id int primary key auto_increment, name varchar(36), age int)"

cursor.execute(sql)

# 关闭数据库连接

db.close()

新增数据

基本用法

#!/usr/bin/python3

import mysql as pymysql

# 打开数据库连接

db = pymysql.connect(host='localhost',

port=3306,

user='root',

password='zhangdapeng520',

database='gx')

# 使用 cursor() 方法创建一个游标对象 cursor

cursor = db.cursor()

# 使用 execute() 方法执行 SQL,如果表存在则删除

cursor.execute("drop table if exists user")

# 使用预处理语句创建表

sql = "create table user(id int primary key auto_increment, name varchar(36), age int)"

try:

# 执行sql语句

cursor.execute(sql)

# 提交到数据库执行

db.commit()

except:

# 如果发生错误则回滚

db.rollback()

# 使用预处理语句创建表

sql = "insert into user(name,age) values(%s,%s)"

args = ["张三", 23]

try:

# 执行sql语句

cursor.execute(sql, args)

# 提交到数据库执行

db.commit()

except:

# 如果发生错误则回滚

db.rollback()

# 关闭数据库连接

db.close()

封装execute方法

import mysql as pymysql

# 打开数据库连接

db = pymysql.connect(host='localhost',

port=3306,

user='root',

password='zhangdapeng520',

database='gx')

def execute(db, sql, args=None):

# 使用 cursor() 方法创建一个游标对象 cursor

cursor = db.cursor()

try:

# 执行sql语句

cursor.execute(sql, args)

# 提交到数据库执行

db.commit()

except:

# 如果发生错误则回滚

db.rollback()

# 使用 execute() 方法执行 SQL,如果表存在则删除

sql = "drop table if exists user"

execute(db, sql)

# 使用预处理语句创建表

sql = "create table user(id int primary key auto_increment, name varchar(36), age int)"

execute(db, sql)

# 使用预处理语句创建表

sql = "insert into user(name,age) values(%s,%s)"

args = ["张三", 23]

execute(db, sql, args)

# 关闭数据库连接

db.close()

批量新增

import mysql as pymysql

# 打开数据库连接

db = pymysql.connect(host='localhost',

port=3306,

user='root',

password='zhangdapeng520',

database='gx')

def execute(db, sql, args=None):

# 使用 cursor() 方法创建一个游标对象 cursor

cursor = db.cursor()

try:

# 执行sql语句

cursor.execute(sql, args)

# 提交到数据库执行

db.commit()

except:

# 如果发生错误则回滚

db.rollback()

def executemany(db, sql, args=None):

if type(args) is not list:

return

# 使用 cursor() 方法创建一个游标对象 cursor

cursor = db.cursor()

try:

# 执行sql语句

cursor.executemany(sql, args)

# 提交到数据库执行

db.commit()

except:

# 如果发生错误则回滚

db.rollback()

# 使用 execute() 方法执行 SQL,如果表存在则删除

sql = "drop table if exists user"

execute(db, sql)

# 使用预处理语句创建表

sql = "create table user(id int primary key auto_increment, name varchar(36), age int)"

execute(db, sql)

# 使用预处理语句创建表

sql = "insert into user(name,age) values(%s,%s)"

args = [

("张三1", 23),

("张三2", 33),

("张三3", 24),

]

executemany(db, sql, args)

# 关闭数据库连接

db.close()

查询数据

查询单条数据

import mysql as pymysql

# 打开数据库连接

db = pymysql.connect(host='localhost',

port=3306,

user='root',

password='zhangdapeng520',

database='gx')

def execute(db, sql, args=None):

# 使用 cursor() 方法创建一个游标对象 cursor

cursor = db.cursor()

try:

# 执行sql语句

cursor.execute(sql, args)

# 提交到数据库执行

db.commit()

except:

# 如果发生错误则回滚

db.rollback()

def fetchone(db, sql, args=None):

cursor = db.cursor()

cursor.execute(sql, args)

return cursor.fetchone()

def executemany(db, sql, args=None):

if type(args) is not list:

return

# 使用 cursor() 方法创建一个游标对象 cursor

cursor = db.cursor()

try:

# 执行sql语句

cursor.executemany(sql, args)

# 提交到数据库执行

db.commit()

except:

# 如果发生错误则回滚

db.rollback()

# 使用 execute() 方法执行 SQL,如果表存在则删除

sql = "drop table if exists user"

execute(db, sql)

# 使用预处理语句创建表

sql = "create table user(id int primary key auto_increment, name varchar(36), age int)"

execute(db, sql)

# 使用预处理语句创建表

sql = "insert into user(name,age) values(%s,%s)"

args = [

("张三1", 23),

("张三2", 33),

("张三3", 24),

]

executemany(db, sql, args)

# 查询

sql = "select * from user where id = %s"

args = [1]

print(fetchone(db, sql, args))

# 关闭数据库连接

db.close()

查询多条数据

import mysql as pymysql

# 打开数据库连接

db = pymysql.connect(host='localhost',

port=3306,

user='root',

password='zhangdapeng520',

database='gx')

def execute(db, sql, args=None):

# 使用 cursor() 方法创建一个游标对象 cursor

cursor = db.cursor()

try:

# 执行sql语句

cursor.execute(sql, args)

# 提交到数据库执行

db.commit()

except:

# 如果发生错误则回滚

db.rollback()

def fetchone(db, sql, args=None):

cursor = db.cursor()

cursor.execute(sql, args)

return cursor.fetchone()

def fetchall(db, sql, args=None):

cursor = db.cursor()

cursor.execute(sql, args)

return cursor.fetchall()

def executemany(db, sql, args=None):

if type(args) is not list:

return

# 使用 cursor() 方法创建一个游标对象 cursor

cursor = db.cursor()

try:

# 执行sql语句

cursor.executemany(sql, args)

# 提交到数据库执行

db.commit()

except:

# 如果发生错误则回滚

db.rollback()

# 使用 execute() 方法执行 SQL,如果表存在则删除

sql = "drop table if exists user"

execute(db, sql)

# 使用预处理语句创建表

sql = "create table user(id int primary key auto_increment, name varchar(36), age int)"

execute(db, sql)

# 使用预处理语句创建表

sql = "insert into user(name,age) values(%s,%s)"

args = [

("张三1", 23),

("张三2", 33),

("张三3", 24),

]

executemany(db, sql, args)

# 查询

sql = "select * from user"

print(fetchall(db, sql))

# 关闭数据库连接

db.close()

更新数据

更新单条数据

import mysql as pymysql

# 打开数据库连接

db = pymysql.connect(host='localhost',

port=3306,

user='root',

password='zhangdapeng520',

database='gx')

def execute(db, sql, args=None):

# 使用 cursor() 方法创建一个游标对象 cursor

cursor = db.cursor()

try:

# 执行sql语句

cursor.execute(sql, args)

# 提交到数据库执行

db.commit()

except:

# 如果发生错误则回滚

db.rollback()

def fetchone(db, sql, args=None):

cursor = db.cursor()

cursor.execute(sql, args)

return cursor.fetchone()

def fetchall(db, sql, args=None):

cursor = db.cursor()

cursor.execute(sql, args)

return cursor.fetchall()

def executemany(db, sql, args=None):

if type(args) is not list:

return

# 使用 cursor() 方法创建一个游标对象 cursor

cursor = db.cursor()

try:

# 执行sql语句

cursor.executemany(sql, args)

# 提交到数据库执行

db.commit()

except:

# 如果发生错误则回滚

db.rollback()

# 使用 execute() 方法执行 SQL,如果表存在则删除

sql = "drop table if exists user"

execute(db, sql)

# 使用预处理语句创建表

sql = "create table user(id int primary key auto_increment, name varchar(36), age int)"

execute(db, sql)

# 批量新增

sql = "insert into user(name,age) values(%s,%s)"

args = [

("张三1", 23),

("张三2", 33),

("张三3", 24),

]

executemany(db, sql, args)

# 更新

sql = "update user set name=%s where id = %s"

args = ["张三333", 1]

execute(db, sql, args)

# 查询

sql = "select * from user"

print(fetchall(db, sql))

# 关闭数据库连接

db.close()

更新多条数据

import mysql as pymysql

# 打开数据库连接

db = pymysql.connect(host='localhost',

port=3306,

user='root',

password='zhangdapeng520',

database='gx')

def execute(db, sql, args=None):

# 使用 cursor() 方法创建一个游标对象 cursor

cursor = db.cursor()

try:

# 执行sql语句

cursor.execute(sql, args)

# 提交到数据库执行

db.commit()

except:

# 如果发生错误则回滚

db.rollback()

def fetchone(db, sql, args=None):

cursor = db.cursor()

cursor.execute(sql, args)

return cursor.fetchone()

def fetchall(db, sql, args=None):

cursor = db.cursor()

cursor.execute(sql, args)

return cursor.fetchall()

def executemany(db, sql, args=None):

if type(args) is not list:

return

# 使用 cursor() 方法创建一个游标对象 cursor

cursor = db.cursor()

try:

# 执行sql语句

cursor.executemany(sql, args)

# 提交到数据库执行

db.commit()

except:

# 如果发生错误则回滚

db.rollback()

# 使用 execute() 方法执行 SQL,如果表存在则删除

sql = "drop table if exists user"

execute(db, sql)

# 使用预处理语句创建表

sql = "create table user(id int primary key auto_increment, name varchar(36), age int)"

execute(db, sql)

# 批量新增

sql = "insert into user(name,age) values(%s,%s)"

args = [

("张三1", 23),

("张三2", 33),

("张三3", 24),

]

executemany(db, sql, args)

# 更新

sql = "update user set age=%s where id > %s"

args = [34, 1]

execute(db, sql, args)

# 查询

sql = "select * from user"

print(fetchall(db, sql))

# 关闭数据库连接

db.close()

删除数据

删除单条数据

import mysql as pymysql

# 打开数据库连接

db = pymysql.connect(host='localhost',

port=3306,

user='root',

password='zhangdapeng520',

database='gx')

def execute(db, sql, args=None):

# 使用 cursor() 方法创建一个游标对象 cursor

cursor = db.cursor()

try:

# 执行sql语句

cursor.execute(sql, args)

# 提交到数据库执行

db.commit()

except:

# 如果发生错误则回滚

db.rollback()

def fetchone(db, sql, args=None):

cursor = db.cursor()

cursor.execute(sql, args)

return cursor.fetchone()

def fetchall(db, sql, args=None):

cursor = db.cursor()

cursor.execute(sql, args)

return cursor.fetchall()

def executemany(db, sql, args=None):

if type(args) is not list:

return

# 使用 cursor() 方法创建一个游标对象 cursor

cursor = db.cursor()

try:

# 执行sql语句

cursor.executemany(sql, args)

# 提交到数据库执行

db.commit()

except:

# 如果发生错误则回滚

db.rollback()

# 使用 execute() 方法执行 SQL,如果表存在则删除

sql = "drop table if exists user"

execute(db, sql)

# 使用预处理语句创建表

sql = "create table user(id int primary key auto_increment, name varchar(36), age int)"

execute(db, sql)

# 批量新增

sql = "insert into user(name,age) values(%s,%s)"

args = [

("张三1", 23),

("张三2", 33),

("张三3", 24),

]

executemany(db, sql, args)

# 删除

sql = "delete from user where id = %s"

args = [1]

execute(db, sql, args)

# 查询

sql = "select * from user"

print(fetchall(db, sql))

# 关闭数据库连接

db.close()

删除多条数据

import mysql as pymysql

# 打开数据库连接

db = pymysql.connect(host='localhost',

port=3306,

user='root',

password='zhangdapeng520',

database='gx')

def execute(db, sql, args=None):

# 使用 cursor() 方法创建一个游标对象 cursor

cursor = db.cursor()

try:

# 执行sql语句

cursor.execute(sql, args)

# 提交到数据库执行

db.commit()

except:

# 如果发生错误则回滚

db.rollback()

def fetchone(db, sql, args=None):

cursor = db.cursor()

cursor.execute(sql, args)

return cursor.fetchone()

def fetchall(db, sql, args=None):

cursor = db.cursor()

cursor.execute(sql, args)

return cursor.fetchall()

def executemany(db, sql, args=None):

if type(args) is not list:

return

# 使用 cursor() 方法创建一个游标对象 cursor

cursor = db.cursor()

try:

# 执行sql语句

cursor.executemany(sql, args)

# 提交到数据库执行

db.commit()

except:

# 如果发生错误则回滚

db.rollback()

# 使用 execute() 方法执行 SQL,如果表存在则删除

sql = "drop table if exists user"

execute(db, sql)

# 使用预处理语句创建表

sql = "create table user(id int primary key auto_increment, name varchar(36), age int)"

execute(db, sql)

# 批量新增

sql = "insert into user(name,age) values(%s,%s)"

args = [

("张三1", 23),

("张三2", 33),

("张三3", 24),

]

executemany(db, sql, args)

# 删除

sql = "delete from user where id > %s"

args = [1]

execute(db, sql, args)

# 查询

sql = "select * from user"

print(fetchall(db, sql))

# 关闭数据库连接

db.close()

初步实现

这种方案,有比较严重的注入的风险。

import mysql as pymysql

# 打开数据库连接

db = pymysql.connect(host='localhost',

port=3306,

user='root',

password='zhangdapeng520',

database='gx')

def execute(db, sql, args=None):

# 使用 cursor() 方法创建一个游标对象 cursor

cursor = db.cursor()

try:

# 执行sql语句

cursor.execute(sql, args)

# 提交到数据库执行

db.commit()

except:

# 如果发生错误则回滚

db.rollback()

def fetchone(db, sql, args=None):

cursor = db.cursor()

cursor.execute(sql, args)

return cursor.fetchone()

def fetchall(db, sql, args=None):

cursor = db.cursor()

cursor.execute(sql, args)

return cursor.fetchall()

def executemany(db, sql, args=None):

if type(args) is not list:

return

# 使用 cursor() 方法创建一个游标对象 cursor

cursor = db.cursor()

try:

# 执行sql语句

cursor.executemany(sql, args)

# 提交到数据库执行

db.commit()

except:

# 如果发生错误则回滚

db.rollback()

def fetchmany(db, sql, args=None):

if type(args) is not list:

return

# select * from user where id in (1,2,3)

in_arg = ",".join([str(i) for i in args])

print("in查询SQL:", sql)

print("in查询参数:", in_arg)

sql = sql % (in_arg,)

print("in查询最终SQL:", sql)

cursor = db.cursor()

cursor.execute(sql)

return cursor.fetchall()

# 使用 execute() 方法执行 SQL,如果表存在则删除

sql = "drop table if exists user"

execute(db, sql)

# 使用预处理语句创建表

sql = "create table user(id int primary key auto_increment, name varchar(36), age int)"

execute(db, sql)

# 批量新增

sql = "insert into user(name,age) values(%s,%s)"

args = [

("张三1", 23),

("张三2", 33),

("张三3", 24),

]

executemany(db, sql, args)

# 根据id列表查询

sql = "select * from user where id in (%s)"

args = [1, 2, 4]

print(fetchmany(db, sql, args))

# 查询

sql = "select * from user"

print(fetchall(db, sql))

# 关闭数据库连接

db.close()

安全性优化

核心代码:

def is_safe_id(id):

"""校验是否为安全的ID"""

return re.match(r"^\w+$", str(id)) is not None

def fetchin(db, sql, args=None):

if type(args) is not list:

return

# select * from user where id in (1,2,3)

in_arg = ",".join([str(i) for i in args if is_safe_id(i)])

print("in查询SQL:", sql)

print("in查询参数:", in_arg)

sql = sql % (in_arg,)

print("in查询最终SQL:", sql)

cursor = db.cursor()

cursor.execute(sql)

return cursor.fetchall()

完整代码:

import mysql as pymysql

import re

# 打开数据库连接

db = pymysql.connect(host='localhost',

port=3306,

user='root',

password='zhangdapeng520',

database='gx')

def execute(db, sql, args=None):

# 使用 cursor() 方法创建一个游标对象 cursor

cursor = db.cursor()

try:

# 执行sql语句

cursor.execute(sql, args)

# 提交到数据库执行

db.commit()

except:

# 如果发生错误则回滚

db.rollback()

def fetchone(db, sql, args=None):

cursor = db.cursor()

cursor.execute(sql, args)

return cursor.fetchone()

def fetchall(db, sql, args=None):

cursor = db.cursor()

cursor.execute(sql, args)

return cursor.fetchall()

def executemany(db, sql, args=None):

if type(args) is not list:

return

# 使用 cursor() 方法创建一个游标对象 cursor

cursor = db.cursor()

try:

# 执行sql语句

cursor.executemany(sql, args)

# 提交到数据库执行

db.commit()

except:

# 如果发生错误则回滚

db.rollback()

def is_safe_id(id):

"""校验是否为安全的ID"""

return re.match(r"^\w+$", str(id)) is not None

def fetchin(db, sql, args=None):

if type(args) is not list:

return

# select * from user where id in (1,2,3)

in_arg = ",".join([str(i) for i in args if is_safe_id(i)])

print("in查询SQL:", sql)

print("in查询参数:", in_arg)

sql = sql % (in_arg,)

print("in查询最终SQL:", sql)

cursor = db.cursor()

cursor.execute(sql)

return cursor.fetchall()

# 使用 execute() 方法执行 SQL,如果表存在则删除

sql = "drop table if exists user"

execute(db, sql)

# 使用预处理语句创建表

sql = "create table user(id int primary key auto_increment, name varchar(36), age int)"

execute(db, sql)

# 批量新增

sql = "insert into user(name,age) values(%s,%s)"

args = [

("张三1", 23),

("张三2", 33),

("张三3", 24),

]

executemany(db, sql, args)

# 根据id列表查询

sql = "select * from user where id in (%s)"

args = [1, 2, 4]

print(fetchin(db, sql, args))

# 查询

sql = "select * from user"

print(fetchall(db, sql))

# 关闭数据库连接

db.close()

根据ID列表删除

import mysql as pymysql

import re

# 打开数据库连接

db = pymysql.connect(host='localhost',

port=3306,

user='root',

password='zhangdapeng520',

database='gx')

def execute(db, sql, args=None):

# 使用 cursor() 方法创建一个游标对象 cursor

cursor = db.cursor()

try:

# 执行sql语句

cursor.execute(sql, args)

# 提交到数据库执行

db.commit()

except:

# 如果发生错误则回滚

db.rollback()

def fetchone(db, sql, args=None):

cursor = db.cursor()

cursor.execute(sql, args)

return cursor.fetchone()

def fetchall(db, sql, args=None):

cursor = db.cursor()

cursor.execute(sql, args)

return cursor.fetchall()

def executemany(db, sql, args=None):

if type(args) is not list:

return

# 使用 cursor() 方法创建一个游标对象 cursor

cursor = db.cursor()

try:

# 执行sql语句

cursor.executemany(sql, args)

# 提交到数据库执行

db.commit()

except:

# 如果发生错误则回滚

db.rollback()

def is_safe_id(id):

"""校验是否为安全的ID"""

return re.match(r"^\w+$", str(id)) is not None

def fetchin(db, sql, args=None):

if type(args) is not list:

return

# select * from user where id in (1,2,3)

in_arg = ",".join([str(i) for i in args if is_safe_id(i)])

print("in查询SQL:", sql)

print("in查询参数:", in_arg)

sql = sql % (in_arg,)

print("in查询最终SQL:", sql)

cursor = db.cursor()

cursor.execute(sql)

return cursor.fetchall()

def deletein(db, sql, args=None):

if type(args) is not list:

return

in_arg = ",".join([str(i) for i in args if is_safe_id(i)])

sql = sql % (in_arg,)

cursor = db.cursor()

cursor.execute(sql)

# 使用 execute() 方法执行 SQL,如果表存在则删除

sql = "drop table if exists user"

execute(db, sql)

# 使用预处理语句创建表

sql = "create table user(id int primary key auto_increment, name varchar(36), age int)"

execute(db, sql)

# 批量新增

sql = "insert into user(name,age) values(%s,%s)"

args = [

("张三1", 23),

("张三2", 33),

("张三3", 24),

]

executemany(db, sql, args)

# 根据id列表删除

sql = "delete from user where id in  (%s)"

args = [1, 2, 4]

deletein(db, sql, args)

# 查询

sql = "select * from user"

print(fetchall(db, sql))

# 关闭数据库连接

db.close()

使用with上下文执行SQL

import mysql as pymysql

import re

# 打开数据库连接

db = pymysql.connect(host='localhost',

port=3306,

user='root',

password='zhangdapeng520',

database='gx')

def execute(db, sql, args=None):

# 使用 cursor() 方法创建一个游标对象 cursor

with db.cursor() as cursor:

try:

# 执行sql语句

cursor.execute(sql, args)

# 提交到数据库执行

db.commit()

except:

# 如果发生错误则回滚

db.rollback()

def fetchone(db, sql, args=None):

with db.cursor() as cursor:

cursor.execute(sql, args)

return cursor.fetchone()

def fetchall(db, sql, args=None):

with db.cursor() as cursor:

cursor = db.cursor()

cursor.execute(sql, args)

return cursor.fetchall()

def executemany(db, sql, args=None):

if type(args) is not list:

return

with db.cursor() as cursor:

try:

# 执行sql语句

cursor.executemany(sql, args)

# 提交到数据库执行

db.commit()

except:

# 如果发生错误则回滚

db.rollback()

def is_safe_id(id):

"""校验是否为安全的ID"""

return re.match(r"^\w+$", str(id)) is not None

def fetchin(db, sql, args=None):

if type(args) is not list:

return

# select * from user where id in (1,2,3)

in_arg = ",".join([str(i) for i in args if is_safe_id(i)])

print("in查询SQL:", sql)

print("in查询参数:", in_arg)

sql = sql % (in_arg,)

print("in查询最终SQL:", sql)

cursor = db.cursor()

cursor.execute(sql)

return cursor.fetchall()

def deletein(db, sql, args=None):

if type(args) is not list:

return

in_arg = ",".join([str(i) for i in args if is_safe_id(i)])

sql = sql % (in_arg,)

cursor = db.cursor()

cursor.execute(sql)

# 使用 execute() 方法执行 SQL,如果表存在则删除

sql = "drop table if exists user"

execute(db, sql)

# 使用预处理语句创建表

sql = "create table user(id int primary key auto_increment, name varchar(36), age int)"

execute(db, sql)

# 批量新增

sql = "insert into user(name,age) values(%s,%s)"

args = [

("张三1", 23),

("张三2", 33),

("张三3", 24),

]

executemany(db, sql, args)

# 根据id列表删除

sql = "delete from user where id in  (%s)"

args = [1, 2, 4]

deletein(db, sql, args)

# 查询

sql = "select * from user"

print(fetchall(db, sql))

# 关闭数据库连接

db.close()

  • 发表于:
  • 原文链接https://page.om.qq.com/page/OKsrLqKm1DhWYYZR5oi7aquA0
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券