源数据库汇中,PYTHON 的使用不是一个可选项,主要在很多地方,监控,处理一些DEVOPS的事情,或者与业务有关的处理的工作都是需要PYTHON 来进行的。下面就用PTYHON 来完成一个很小的打印MYSQL 系统的内存占用率的小脚本来开始 PYTHON travel。(由于是初级水平有待提高,部分代码的有待进步)
在学习PYTHON 的过程中,(很菜)领会到PYTHON 本身的语法是一回事,你使用的各种包的熟悉又是另一回事。所以下面先得说说程序中使用的mysql 的 python connector.
PYTHON 连接到MYSQL 的包有很多 PYMYSQL , MYSQLAB, 这里没有使用而是使用了官方的 Connector/Python 的方式进行连接
下面相关的代码的初衷主要在分析一段时间INNODB BUFFER 的使用率,查看相关的变动情况,当然这样的监控也有各种图形化的监控软件,但灵活性不高
#!/usr/bin/env python3
# coding: utf-8
import mysql.connector
from mysql.connector import errorcode
import re
import time
import datetime
import sys
class DBFREEMEMORY:
def __init__(self, user=None, passwd=None, host=None, db=None):
self.user = user
self.passwd = passwd
self.host = host
self.db = db
def mysql_connect(self):
remotedb = {
'host': self.host ,
'user': self.user,
'passwd': self.passwd,
'database': self.db,
'charset': 'utf8',
'connection_timeout': 30,
'use_pure': True
}
try:
connect = mysql.connector.connect(**remotedb)
mycursor = connect.cursor(dictionary=True)
sql = "select substring_index(substring_index(event_name,'/',2),'/',-1) as event_type, round(sum(current_number_of_bytes_used) / 1024/1024, 2) as MB_CURRENTLY_USED from performance_schema.memory_summary_global_by_event_name group by event_type having mb_currently_used >0"
) as event_type, round(sum(current_number_of_bytes_used) / 1024/1024, 2) as MB_CURRENTLY_USED from performance_schema.memory_summary_global_by_event_name group by event_type having mb_currently_used >0"
mycursor.execute(sql)
memory = mycursor.fetchall()#收集当前使用的内存数
sql_1 = "show global variables like 'innodb_buffer_pool_size';"
mycursor.execute(sql_1)
full_memory = mycursor.fetchall()
#收集当前整体数据库占有的内存
for t in full_memory: if t['Value'] != None:
fmem = float(t['Value']) / 1024 / 1024
else:
t['Value'] = 1
for i in memory:
if i['MB_CURRENTLY_USED'] != None:
mem = i['MB_CURRENTLY_USED']
else:
i['MB_CURRENTLY_USED'] = 1
result = format(float(mem) / float(fmem) * 100, '.2f')
print(str(result) + '%' + ' ' + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
#将当前的内存使用数的百分比进行比较,并和当前时间一起打印
except mysql.connector.Error as err:
if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
print("Something is wrong with your user name or password")
elif err.errno == errorcode.ER_BAD_DB_ERROR:
print("Database does not exist")
else:
print(err)
finally:
mycursor.close()
connect.close()
if __name__ == '__main__':
info = DBFREEMEMORY(user='admin', passwd='1234.Com', host='192.168.198.9', db='performance_schema')
info.mysql_connect()
'host': self.host ,
'user': self.user,
'passwd': self.passwd,
'database': self.db,
'charset': 'utf8',
'connection_timeout': 30,
'use_pure': True
}
try:
connect = mysql.connector.connect(**remotedb)
mycursor = connect.cursor(dictionary=True)
sql = "select substring_index(substring_index(event_name,'/',2),'/',-1) as event_type, round(sum(current_number_of_bytes_used) / 1024/1024, 2) as MB_CURRENTLY_USED from performance_schema.memory_summary_global_by_event_name group by event_type having mb_currently_used >0"
mycursor.execute(sql)
memory = mycursor.fetchall()
sql_1 = "show global variables like 'innodb_buffer_pool_size';"
mycursor.execute(sql_1)
full_memory = mycursor.fetchall()
for t in full_memory:
if t['Value'] != None:
fmem = float(t['Value']) / 1024 / 1024
else:
t['Value'] = 1
for i in memory:
if i['MB_CURRENTLY_USED'] != None:
mem = i['MB_CURRENTLY_USED']
else:
i['MB_CURRENTLY_USED'] = 1
result = format(float(mem) / float(fmem) * 100, '.2f')
print(str(result) + '%' + ' ' + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
except mysql.connector.Error as err:
if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
print("Something is wrong with your user name or password")
elif err.errno == errorcode.ER_BAD_DB_ERROR:
print("Database does not exist")
else:
print(err)
if __name__ == '__main__':
info = DBFREEMEMORY(user='admin', passwd='1234.Com', host='192.168.198.9', db='performance_schema')
info.mysql_connect()
下面一个程序是针对自动生成测试数据库表,下面会在数据库层面自动生成test 库 以及 test1表,并插入随机数 150万
#!/usr/bin/env python3
# coding: utf-8
from __future__ import print_function
import mysql.connector
from mysql.connector import errorcode
from datetime import date, datetime, timedelta
import re
import time
import datetime
import sys
import random
class DBFREEMEMORY:
def __init__(self, user=None, passwd=None, host=None, db=None):
self.user = user
self.passwd = passwd
self.host = host
self.db = db
def gen_random_string(self): #产生随机内容的方法
char_list = list('1234567890' + '0123456789')
random.shuffle(char_list)
return ''.join(char_list)
def mysql_connect(self):
remotedb = {
'host': self.host,
'user': self.user,
'passwd': self.passwd,
'database': self.db,
'charset': 'utf8',
'connection_timeout': 30,
'use_pure': True
}
try:
connect = mysql.connector.connect(**remotedb)
mycursor = connect.cursor(dictionary=True)
#判断当前的服务器是否已经存在test数据库
mycursor.execute("show databases")
database = [mycursor.fetchall()]
# print (tables)
database_list = re.findall('(\'.*?\')', str(database))
database_list = [re.sub("'", '', each) for each in database_list]
print(database_list)
#如果存在test 数据库就直接退出
if 'test' in database_list:
print('The database of test has existed,it has deleted it,please run the job again')
else:#创建相关 mycursor.execute("create database test") print('You have test database')
DB_NAME = 'test'
mycursor.execute("USE test".format(DB_NAME))
#建表
TABLES = {}
TABLES['test'] = (
"CREATE TABLE `test1` ("
" `id` int(11) NOT NULL AUTO_INCREMENT,"
" `content` varchar(200) NULL,"
" `hash` varchar(200) NULL,"
" `insert_date` date NULL,"
" PRIMARY KEY (`id`)"
") ENGINE=InnoDB")
table_name = TABLES['test']
# mycursor.execute(table_name)
mycursor.execute("show tables")
table = [mycursor.fetchall()]
# print (tables)
table_list = re.findall('(\'.*?\')', str(table))
table_list = [re.sub("'", '', each) for each in table_list]
print(table_list)
#判断如果没有
if 'test1' in table_list:
print('The table of test has existed,please delete it')
else:
try: #执行并开始插入数据 10000条一提交
mycursor.execute(table_name)
#print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
s_time = time.time()
i = 0
while i < 150000:
content = self.gen_random_string()
sql = "INSERT INTO test1 (content, hash,insert_date) VALUES ('%s', '%d',now())" \
% (content, hash(content))
mycursor.execute(sql)
i += 1
if i % 10000 == 0:
connect.commit()
print(i)
connect.close()
print('You have test table')
en_time = time.time()
print(en_time-s_time)
#print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
except Exception as e:
print(e)
except mysql.connector.Error as err:
if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
print("Something is wrong with your user name or password")
elif err.errno == errorcode.ER_BAD_DB_ERROR:
print("Database does not exist")
else:
print(err)
finally:
mycursor.close()
connect.close()
if __name__ == '__main__':
info = DBFREEMEMORY(user='admin', passwd='1234.Com', host='192.168.198.9', db='performance_schema')
info.mysql_connect()
本文分享自 AustinDatabases 微信公众号,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。
本文参与 腾讯云自媒体同步曝光计划 ,欢迎热爱写作的你一起参与!
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有