前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >--如何用PYTHON 定时打印 MYSQL FREE 使用率,与自动创建测试数据库表

--如何用PYTHON 定时打印 MYSQL FREE 使用率,与自动创建测试数据库表

作者头像
AustinDatabases
发布2020-06-11 14:33:04
1.2K0
发布2020-06-11 14:33:04
举报
文章被收录于专栏:AustinDatabasesAustinDatabases

源数据库汇中,PYTHON 的使用不是一个可选项,主要在很多地方,监控,处理一些DEVOPS的事情,或者与业务有关的处理的工作都是需要PYTHON 来进行的。下面就用PTYHON 来完成一个很小的打印MYSQL 系统的内存占用率的小脚本来开始 PYTHON travel。(由于是初级水平有待提高,部分代码的有待进步)

在学习PYTHON 的过程中,(很菜)领会到PYTHON 本身的语法是一回事,你使用的各种包的熟悉又是另一回事。所以下面先得说说程序中使用的mysql 的 python connector.

PYTHON 连接到MYSQL 的包有很多 PYMYSQL , MYSQLAB, 这里没有使用而是使用了官方的 Connector/Python 的方式进行连接

下面相关的代码的初衷主要在分析一段时间INNODB BUFFER 的使用率,查看相关的变动情况,当然这样的监控也有各种图形化的监控软件,但灵活性不高

代码语言:javascript
复制
#!/usr/bin/env python3
# coding: utf-8
import mysql.connector
from mysql.connector import errorcode
import re
import time
import datetime
import sys


class DBFREEMEMORY:
    def __init__(self, user=None, passwd=None, host=None, db=None):
        self.user = user
        self.passwd = passwd
        self.host = host
        self.db = db


    def mysql_connect(self):
        remotedb = {
          'host': self.host ,
          'user': self.user,
          'passwd': self.passwd,
          'database': self.db,
          'charset': 'utf8',
          'connection_timeout': 30,
          'use_pure': True
        }

        try:
          connect = mysql.connector.connect(**remotedb)
          mycursor = connect.cursor(dictionary=True)
代码语言:javascript
复制
          sql = "select substring_index(substring_index(event_name,'/',2),'/',-1) as event_type, round(sum(current_number_of_bytes_used) / 1024/1024, 2) as MB_CURRENTLY_USED from performance_schema.memory_summary_global_by_event_name group by event_type having mb_currently_used >0"
代码语言:javascript
复制
          ) as event_type, round(sum(current_number_of_bytes_used) / 1024/1024, 2) as MB_CURRENTLY_USED from performance_schema.memory_summary_global_by_event_name group by event_type having mb_currently_used >0"
          mycursor.execute(sql)
memory = mycursor.fetchall()#收集当前使用的内存数
sql_1 = "show global variables  like 'innodb_buffer_pool_size';"
          mycursor.execute(sql_1)
          full_memory = mycursor.fetchall()
          #收集当前整体数据库占有的内存

          for t in full_memory:          if t['Value'] != None:
              fmem = float(t['Value']) / 1024 / 1024
            else:
              t['Value'] = 1
          for i in memory:
            if i['MB_CURRENTLY_USED'] != None:
               mem = i['MB_CURRENTLY_USED']
            else:
              i['MB_CURRENTLY_USED'] = 1
          result = format(float(mem) / float(fmem) * 100, '.2f')
          print(str(result) + '%' + '  ' + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
          #将当前的内存使用数的百分比进行比较,并和当前时间一起打印

        except mysql.connector.Error as err:
          if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
            print("Something is wrong with your user name or password")
          elif err.errno == errorcode.ER_BAD_DB_ERROR:
            print("Database does not exist")
          else:
            print(err)
        finally:
          mycursor.close()
          connect.close()


if __name__ == '__main__':
   info = DBFREEMEMORY(user='admin', passwd='1234.Com', host='192.168.198.9', db='performance_schema')
   info.mysql_connect()
代码语言:javascript
复制
          'host': self.host ,
          'user': self.user,
          'passwd': self.passwd,
          'database': self.db,
          'charset': 'utf8',
          'connection_timeout': 30,
          'use_pure': True
        }

        try:
          connect = mysql.connector.connect(**remotedb)
          mycursor = connect.cursor(dictionary=True)

          sql = "select substring_index(substring_index(event_name,'/',2),'/',-1) as event_type, round(sum(current_number_of_bytes_used) / 1024/1024, 2) as MB_CURRENTLY_USED from performance_schema.memory_summary_global_by_event_name group by event_type having mb_currently_used >0"
          mycursor.execute(sql)
          memory = mycursor.fetchall()
          sql_1 = "show global variables  like 'innodb_buffer_pool_size';"
          mycursor.execute(sql_1)
          full_memory = mycursor.fetchall()

          for t in full_memory:
            if t['Value'] != None:
              fmem = float(t['Value']) / 1024 / 1024
            else:
              t['Value'] = 1
          for i in memory:
            if i['MB_CURRENTLY_USED'] != None:
               mem = i['MB_CURRENTLY_USED']
            else:
              i['MB_CURRENTLY_USED'] = 1
          result = format(float(mem) / float(fmem) * 100, '.2f')
          print(str(result) + '%' + '  ' + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))


        except mysql.connector.Error as err:
          if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
            print("Something is wrong with your user name or password")
          elif err.errno == errorcode.ER_BAD_DB_ERROR:
            print("Database does not exist")
          else:
            print(err)


if __name__ == '__main__':
   info = DBFREEMEMORY(user='admin', passwd='1234.Com', host='192.168.198.9', db='performance_schema')
   info.mysql_connect()

下面一个程序是针对自动生成测试数据库表,下面会在数据库层面自动生成test 库 以及 test1表,并插入随机数 150万

代码语言:javascript
复制
#!/usr/bin/env python3
# coding: utf-8
from __future__ import print_function
import mysql.connector
from mysql.connector import errorcode
from datetime import date, datetime, timedelta
import re
import time
import datetime
import sys
import random


class DBFREEMEMORY:
    def __init__(self, user=None, passwd=None, host=None, db=None):
        self.user = user
        self.passwd = passwd
        self.host = host
        self.db = db

    def gen_random_string(self):  #产生随机内容的方法

        char_list = list('1234567890' + '0123456789')
        random.shuffle(char_list)
        return ''.join(char_list)

    def mysql_connect(self):
        remotedb = {
            'host': self.host,
            'user': self.user,
            'passwd': self.passwd,
            'database': self.db,
            'charset': 'utf8',
            'connection_timeout': 30,
            'use_pure': True
        }

        try:
            connect = mysql.connector.connect(**remotedb)
            mycursor = connect.cursor(dictionary=True)
            #判断当前的服务器是否已经存在test数据库
            mycursor.execute("show databases")
            database = [mycursor.fetchall()]
            # print (tables)
            database_list = re.findall('(\'.*?\')', str(database))
            database_list = [re.sub("'", '', each) for each in database_list]
            print(database_list)
            #如果存在test 数据库就直接退出
            if 'test' in database_list:
                print('The database of test has existed,it has deleted it,please run the job again')
               

            else:#创建相关                         mycursor.execute("create database test")          print('You have test database')

            DB_NAME = 'test'
            mycursor.execute("USE test".format(DB_NAME))
            #建表
            TABLES = {}
            TABLES['test'] = (
                "CREATE TABLE `test1` ("
                "  `id` int(11) NOT NULL AUTO_INCREMENT,"
                "  `content` varchar(200)  NULL,"
                "  `hash` varchar(200)  NULL,"
                "  `insert_date` date  NULL,"
                "  PRIMARY KEY (`id`)"
                ") ENGINE=InnoDB")

            table_name = TABLES['test']
            # mycursor.execute(table_name)
            mycursor.execute("show tables")
            table = [mycursor.fetchall()]
            # print (tables)
            table_list = re.findall('(\'.*?\')', str(table))
            table_list = [re.sub("'", '', each) for each in table_list]
            print(table_list)
            #判断如果没有
            if 'test1' in table_list:
                print('The table of test has existed,please delete it')

            else:
                try:  #执行并开始插入数据 10000条一提交
                    mycursor.execute(table_name)
                    #print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
                    s_time = time.time()
                    i = 0
                    while i < 150000:
                        content = self.gen_random_string()
                        sql = "INSERT INTO test1 (content, hash,insert_date) VALUES ('%s', '%d',now())" \
                              % (content, hash(content))
                        mycursor.execute(sql)
                        i += 1
                        if i % 10000 == 0:
                           connect.commit()
                           print(i)
                    connect.close()
                    print('You have test table')
                    en_time = time.time()
                    print(en_time-s_time)
                    #print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
                except Exception as e:
                    print(e)





        except mysql.connector.Error as err:
            if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
                print("Something is wrong with your user name or password")
            elif err.errno == errorcode.ER_BAD_DB_ERROR:
                print("Database does not exist")
            else:
                print(err)
        finally:
            mycursor.close()
            connect.close()


if __name__ == '__main__':
    info = DBFREEMEMORY(user='admin', passwd='1234.Com', host='192.168.198.9', db='performance_schema')
info.mysql_connect()
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-06-09,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 AustinDatabases 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云数据库 SQL Server
腾讯云数据库 SQL Server (TencentDB for SQL Server)是业界最常用的商用数据库之一,对基于 Windows 架构的应用程序具有完美的支持。TencentDB for SQL Server 拥有微软正版授权,可持续为用户提供最新的功能,避免未授权使用软件的风险。具有即开即用、稳定可靠、安全运行、弹性扩缩等特点。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档