http://www.makeru.com.cn/live/5020_2051.html?s=165154
Python数据库编程
创建连接时,就指定: pymysql.connect(host=’172.25.254.178”, user=’cooffee”, password=’cooffee”, charset=’utf8’, autocommit=True) # host(要连接的主机ip), user(用户), passwd(用户密码), charset(默认编码格式),autocommit(对数据库操作是否自动提交) eg:
import pymysql #连接数据库 conn = pymysql.connect(host='172.25.254.178', user='cooffee', password='cooffee', charset='utf8', autocommit=True) #创建一个游标, 用来给数据库发送sql语句的 cur = conn.cursor() # 选择需要操作的数据库 conn.select_db('cooffee') try: create_sql = 'create table myuser (name varchar(30), age int );' cur.execute(create_sql) #执行的sql except Exception as e: print('fail:',e) else: print('Success') # 4. 先关闭游标 cur.close() # 5. 关闭数据库连接 conn.close()
单条记录: 实现思路: sqli='数据库的增删改查语句' cur.execute(isqli) 批量实现用: cur.executemany(insert_sqli, users) #users为一个列表 123456
import random import pymysql conn = pymysql.connect(host='172.25.254.178', user='cooffee', password='cooffee', charset='utf8', autocommit=True) cur = conn.cursor() conn.select_db('cooffee') try: insert_sqli1 = 'insert into myuser VALUES ("user1", 32);' insert_sqli2 = 'insert into myuser VALUES ("user2", 18);' cur.execute(insert_sqli1) cur.execute(insert_sqli2) #批量对数据实现操作 users=[('user'+str(i),random.choice(range(18,40))) for i in range(10)] insert_sqli = 'insert into myuser VALUES (%s, %s);' cur.executemany(insert_sqli, users) select_sqli = 'select * from myuser;' res= cur.execute(select_sqli) print("查看语句的返回结果:", res) except Exception as e: print('fail:',e) else: print('Success') cur.close() conn.close()
import pymysql conn = pymysql.connect(host='172.25.254.178', user='cooffee', password='cooffee', charset='utf8', autocommit=True) cur = conn.cursor() conn.select_db('cooffee') try: delete_sqli = 'delete from myuser where name="user3";' cur.execute(delete_sqli) update_sqli = 'update myuser set age="1" where name="user2";' cur.execute(update_sqli) select_sqli = 'select * from myuser;' res = cur.execute(select_sqli) print("查看语句的返回结果:", res) # cur.fetchone类似与文件的操作f.readline, 每次只读取一条记录; print("查找一条记录:", cur.fetchone()) print("查找一条记录:", cur.fetchone()) #cur.fetchmany, 类似于f.readlines, 返回的是一个元组; print("查找5条记录:",cur.fetchmany(5)) #cur.fetchall返回的是一个元组; print("1查找所有记录", cur.fetchall()) #移动游标的位置, 到记录的最开始 cur.scroll(0, mode='absolute') print("查找2所有记录", cur.fetchall()) cur.scroll(-3, mode='relative') print("查找2所有记录", cur.fetchall()) except Exception as e: print('fail:',e) else: print('Success') cur.close() conn.close()
import pymysql # 安全管理器: with conn = pymysql.connect(host='172.25.254.178', user='cooffee', password='cooffee', charset='utf8', autocommit=True, db='cooffee') with conn: print("is_open", conn.open) # 2. 创建一个游标, 用来给数据库发送sql语句的; cur = conn.cursor() # 显示有多少行记录? res = cur.execute('select * from myuser;') # 显示每列的详细信息 desc = cur.description print("表的描述:",desc ) # 获取表头 print("表头", ",".join([item[0] for item in desc])) # 4. 先关闭游标 cur.close() conn.close()
import pymysql def create_data(): # 1. 连接数据库, host, user, passwd, charset conn = pymysql.connect(host='172.25.254.178', user='cooffee', password='cooffee', charset='utf8', autocommit=True, db='cooffee') # 2. 创建一个游标, 用来给数据库发送sql语句的; cur = conn.cursor() # 3. 创建一个表 try: create_sqli = 'create table bankData( id int PRIMARY KEY, ' \ 'name varchar(10), money FLOAT);' cur.execute(create_sqli) except Exception as e: print("Error: 表已经创建", e) else: print("表创建成功") #4. 创建数据 try: users = [(610001, '张三', 1000), (610002, '李四', 1000),(610003, '粉条', 1000)] insert_sqli = 'insert into bankData VALUES (%s, %s, %s);' cur.executemany(insert_sqli, users) except Exception as e: print('Error:', e) else: print("初始化数据成功!") # 4. 先关闭游标 cur.close() # 5. 关闭数据库连接 conn.close() class TransferMoney(object): def __init__(self, conn): self.conn = conn self.cursor = conn.cursor() def transfer(self, source_accid, target_accid, money): """ 转账方法: # 1. source_accid帐号是否存在; # 2. target_accid帐号是否存在; # 3. 是否有足够的钱 # 4. source_accid扣钱 # 5. target_acci加钱 # 6. 提交对数据库的操作 :param source_accid: 源帐号id :param target_accid: 目标帐号id :param money: 转账金额 :return: bool """ # 判断帐号是否存在 self.check_account_avaiable(source_accid) self.check_account_avaiable(target_accid) # 是否有足够的钱 self.has_enough_money(source_accid, money) try: # source_accid扣钱 self.reduce_money(source_accid, money) # target_acci加钱 self.add_money(target_accid, money) self.conn.commit() except Exception as e: # ********************撤销对于数据库的更改操作, 回滚****************** self.conn.rollback() else: print("%s给%s转账%s成功" %(source_accid, target_accid, money)) def check_account_avaiable(self, accid): """判断帐号是否存在, 传递参数为帐号id""" select_sqli = 'select * from bankData where id=%s' %(accid) print("execute sql:", select_sqli) res = self.cursor.execute(select_sqli) # 判断是否能找到帐号为accid的记录; if res == 1: return True else: raise Exception("帐号%s不存在" %(accid)) def has_enough_money(self, accid, money): """是否有足够的钱""" select_sqli = 'select money from bankData where id=%s' %(accid) print('execute sql:', select_sqli) self.cursor.execute(select_sqli) # 获取查询到的金钱数额 acc_money = self.cursor.fetchone()[0] print(acc_money, type(acc_money)) # 判断 if acc_money >= money: return True else: raise Exception("账户%s没有足够的金额, 当前余额为%s" %(accid, acc_money)) def reduce_money(self, accid, money): # 对于accid减少的金额为money try: update_sqli = 'update bankData set money=money-%s where id="%s"' %(money, accid) print("redcue_money sql:", update_sqli) self.cursor.execute(update_sqli) except Exception as e: print('Error:',e) def add_money(self, accid, money): # 对于accid减少的金额为money try: update_sqli = 'update bankData set money=money+%s where id="%s"' %(money, accid) print("add_money sql:", update_sqli) self.cursor.execute(update_sqli) except Exception as e: print('Error:',e) def __del__(self): # 当删除对象时, 自动执行, 关闭游标; self.cursor.close() conn.close() if __name__ == '__main__': create_data() conn = pymysql.connect(host='172.25.254.178', user='cooffee', password='cooffee', charset='utf8', db='cooffee') trans = TransferMoney(conn) trans.transfer('610003', '610002', 100)
正整数A和正整数B 的最小公倍数是指 能被A和B整除的最小的正整数值,设计一个算法,求输入A和B的最小公倍数。 - 输入描述: 输入两个正整数A和B。 - 输出描述: 输出A和B的最小公倍数。
num1 = int(input('Num1:')) num2 = int(input('Num2:')) min_num = num1 if num1<num2 else num2 for i in range(1,min_num+1): if num1%i == 0 and num2%i ==0: gys=i lcm = int((num1*num2)/gys) print("%s和%s的最大公约数为%s" %(num1,num2,gys)) print("%s和%s的最小公倍数为%s" %(num1,num2,lcm))
Catcher 是MCA国的情报员,他工作时发现敌国会用一些对称的密码进行通信,比如像这些ABBA,ABA,A,123321,但是他们有时会在开始或结束时加入一些无关的字符以防止别国破解。比如进行下列变化 ABBA->12ABBA,ABA->ABAKK,123321->51233214 。因为截获的串太长了,而且存在多种可能的情况(abaaab可看作是aba,或baaab的加密形式),Cathcer的工作量实在是太大了,他只能向电脑高手求助,你能帮Catcher找出最长的有效密码串吗? (注意:记得加上while处理多个测试用例)
while 1: s=input('输入:') slen=len(s) print(max([ len(s[i:j]) for i in range(slen) for j in range(i+1,slen+1) if s[i:j]==s[i:j][::-1]]))
文件score.dat中保存的是100名学生的姓名和Python课、高数和英语成绩。
(1)定义学生类,其中包含姓名、Python课、高数和英语成绩及总分、均分数据成员,成员函数根据需要确定。
(2)读入这名学生的成绩,用对象列表进行存储。
(3)求出各科和总分的最高分。
(4)请按总分的降序(高成绩在前,低成绩在后)排序
(5)在屏幕上显示各科及总分的最高分,排序后的成绩单(包括总分)保存到文件odered_score.dat中。
(6) 将文件中的所有学生信息, 保存在mariadb数据库中;
import random def create_scoredata(filename='score.dat'): databases=('name'+str(i)+','+str(random.randint(0,100))+','+str(random.randint(0,100)) +','+str(random.randint(0,100)) for i in range(100)) with open(filename,mode="a+") as f: for items in databases: f.write(items+'\n') def reade_scoreadata(filename='score.dat'): with open(filename) as f: readelise=f.readlines() return [items[0].split(',') for items in [reade.split() for reade in readelise]] class Student(object): def __init__(self,list): self.total=0 self.average=0 self.list=list def Total_average(self): for items in self.list: self.total=int(items[1])+int(items[2])+int(items[3]) self.average=self.total/3.0 items.append(self.total) items.append(self.average) def max_high(self): self.python=max([items[1]for items in self.list]) self.math=max([items[2]for items in self.list]) self.English=max([items[3]for items in self.list]) self.total=max([items[4]for items in self.list]) print('各科的最高分及总分的最高分:python:%s math:%s English:%s 总分:%s'%(self.python,self.math,self.English,self.total)) def sorte(self): return sorted(list,key=lambda x:-x[4]) def write_data(list,filename='odered_score.dat'): with open(filename,mode='w+') as f: for items in list: items[4]=str(items[4]) items[5]=str(items[5]) f.write(','.join(items)+'\n') if __name__=='__main__': create_scoredata() list=reade_scoreadata('score.dat') student=Student(list) student.Total_average() student.max_high() list1=student.sorte() write_data(list1)
窗口显示
文件score.dat
文件odered_score.dat
原文链接:https://blog.csdn.net/weixin_42635252/article/details/82453918
我来说两句