我选用的数据库是Mysql。选用它是因为其可以满足我的需求,而且资料多。因为作为第三方工具,难免有一些配置问题。所以本文也会讲解一些和Mysql配置及开发相关的问题。(转载请指明出于breaksoftware的csdn博客)
Mysql的安装我就不说了。我先说说和我习惯相关的一个问题:我希望在我Windows系统上可以通过Navicat for Mysql连接到我Ubuntu上的Mysql服务器。这块问题的解决可以参见《允许ubuntu下mysql远程连接》。
然后需要准备Python下进行Mysql开发的一些环境
apt-get install python-dev
apt-get install libmysqld-dev
apt-get install libmysqlclient-dev
updatedb
locate mysql_config
pip install MySQL-python -i http://pypi.douban.com/simple
由于我们要进行分表,所以数据库连接数要进行增大。于是需要修改mysql的配置
max_connections=1000
基础环境配置好后,我们就可以开始进行数据库管理器的设计和实现了。
数据库连接我们使用PooledDB连接池,使用这个库的最大好处是我们可以不用考虑很多底层的重连和多线程问题。
from DBUtils.PooledDB import PooledDB
class mysql_conn():
def __init__(self, host_name, port_num, user_name, password, db_name, charset_name = "utf8"):
self._host = host_name
self._port = int(port_num)
self._user = user_name
self._passwd = password
self._db = db_name
self._charset = charset_name
self._pool = None
self._table_info = {}
self.re_connect()
re_connect方法要考虑数据库不存在的情况。
def re_connect(self):
self._try_close_connect()
try:
self._pool = PooledDB(creator=MySQLdb, mincached=1, maxcached=20, maxconnections = 3, host = self._host, port = self._port, user = self._user, passwd = self._passwd, db = self._db, charset = self._charset)
LOG_INFO("connect %s success" %(self._db))
self.refresh_tables_info()
return
except MySQLdb.Error, e :
if e.args[0] == 1049:
self._create_db()
else:
LOG_WARNING("%s connect error %s" % (self._db, str(e)))
return
except Exception as e:
LOG_WARNING("connect mysql %s:%d %s error" % (self._host, self._port, self._db))
return
如果数据库不存在,MySQLdb.Error对象的值是1049,这种场景我们就需要创建数据库。如果发生其他错误,就直接报错
def _create_db(self):
conn = None
cursor = None
try:
conn = MySQLdb.connect(host=self._host, port=self._port, user=self._user, passwd=self._passwd)
cursor = conn.cursor()
sql = """create database if not exists %s""" %(self._db)
#LOG_INFO(sql)
cursor.execute(sql)
conn.select_db(self._db);
conn.commit()
except MySQLdb.Error, e :
LOG_WARNING("%s execute error %s" % (sql, str(e)))
finally:
try:
if cursor:
cursor.close()
if conn:
conn.close()
except:
pass
创建完数据后,要关闭连接。然后再走一遍数据库连接过程,但是这次就用不判断数据库是否存在了
try:
self._pool = PooledDB(creator=MySQLdb, mincached=1, maxcached=20, maxconnections = 3, host = self._host, port = self._port, user = self._user, passwd = self._passwd, db = self._db, charset = self._charset)
LOG_INFO("connect %s success" %(self._db))
self.refresh_tables_info()
return
except Exception as e:
LOG_WARNING("connect mysql %s:%d %s error" % (self._host, self._port, self._db))
return
if None == self._pool:
LOG_WARNING("connect mysql %s:%d %s error" % (self._host, self._port, self._db))
return
连接完数据库后,我们需要通过refresh_tables_info获取该库中表的信息。为什么我们需要获取这个信息呢?因为我希望在调用数据库操作时,mysql_conn类已经知晓一些字段的类型和长度,这样就可以将用户传入的数据进行相应的格式化,而从让调用者不用太多关心表字段类型。
def refresh_tables_info(self):
self._table_info = self._get_tables_info()
def _get_tables_info(self):
tables_info = {}
tables_sql = "show tables"
tables_name = self.execute(tables_sql, select = True)
for table_name_item in tables_name:
table_name = table_name_item[0]
if 0 == len(table_name):
continue
columns_sql = "show columns from " + table_name
table_info = self.execute(columns_sql, select = True)
table_name = table_name_item[0]
columns_info = self._get_table_info(table_info)
if len(columns_info):
tables_info[table_name] = columns_info
return tables_info
def _get_table_info(self, table_info):
columns_info = {}
for item in table_info:
column_name = item[0]
column_type_info = item[1]
(type, len) = self._get_column_type_info(column_type_info)
columns_info[column_name] = {"type":type, "length":len}
return columns_info
def _get_column_type_info(self, type_info):
re_str = '(\w*)\((\d*),?.*\)'
kw = re.findall(re_str, type_info)
if len(kw):
if len(kw[0]) > 1:
return (kw[0][0], kw[0][1])
return (None, None)
连接完数据库后,我们需要对表进行一系列操作,比如表查询
def select(self, table_name, fields_array, conditions, pre = "", extend = ""):
fields_str = "," . join(fields_array)
conds = []
for (column_name, column_data_info) in conditions.items():
column_type = self._get_column_type(table_name, column_name)
column_data = column_data_info[0]
operation = column_data_info[1]
if isinstance(column_data, list):
new_datas = []
for item in column_data:
new_data = self._conv_data(item, column_type)
try:
new_datas.append(new_data)
except:
LOG_WARNING("%s %s conv error" %(item, column_type))
temp_str = "," . join(new_datas)
cond = column_name + " " + operation + " (" + temp_str + ")"
conds.append(cond)
else:
new_data = self._conv_data(column_data, column_type)
try:
cond = column_name + " " + operation + " " + new_data
conds.append(cond)
except:
LOG_WARNING("%s %s conv error" %(column_data, column_type))
conds_str = " and " . join(conds)
sql = "select " + pre + " " + fields_str + " from " + table_name
if len(conds_str) > 0:
sql = sql + " where " + conds_str
if len(extend) > 0:
sql = sql + " " + extend
data_info = self.execute(sql, select = True)
return data_info
select方法中table_name是表名;fields_array是需要查询的字段数组;conditions是查询条件的Key/Value对,其中Key是字段名称,Value是个数组,数组的第一个元素是表达式右值,第二个元素是表达式的操作符。比如条件a>1 and b < 2,则conditions是{"a":["1",">"],"b":["2","<"] }。这儿需要考虑表达式右值是一个数组的场景,比如 a in (1,2,3)这样的条件,所以组装条件时做了特殊处理。
在处理表中数据的时候,比如查询语句的条件中有表中字段信息,再比如更新、插入数据语句中也有相关信息,这个时候都需要调用_get_column_type方法获取字段类型,然后通过_conv_data方法将数据进行格式化——当然目前这个函数还不能涵盖所有类型。
def _get_column_type(self, table_name, column_name):
if 0 == len(self._table_info):
self.refresh_tables_info()
if table_name not in self._table_info.keys():
LOG_WARNING("table_%s info in not exist" %(table_name))
return "None"
if column_name not in self._table_info[table_name].keys():
LOG_WARNING("column name %s is not in table %s" % (column_name, table_name))
return "None"
return self._table_info[table_name][column_name]["type"]
def _conv_data(self, data, type):
if type == "varchar" or type == "char":
return '"%s"' % (data)
elif type == "float" or type == "double":
try:
conv_data = float(data)
return "%.8f" % (conv_data)
except Exception as e:
LOG_WARNING("conv %s to %s error" % (data, type))
return "0"
elif type == "tinyint" or type == "bigint" or type == "int":
return "%d" % (int(data))
数据的更新操作和插入操作我就不把代码贴出来了。大家可以到之后公布的源码地址里看。
最后说明下操作执行的方法
def execute(self, sql, select = False, commit=False):
try:
data = ()
conn = self._pool.connection()
cursor = conn.cursor()
data = cursor.execute(sql)
if select:
data = cursor.fetchall()
if commit:
conn.commit()
cursor.close()
except Exception as e:
LOG_WARNING("excute sql error %s" % (str(e)))
LOG_ERROR_SQL("%s" % (sql))
finally:
cursor.close()
conn.close()
return data
一些操作我们需要数据库服务马上去执行,如创建数据库和创建表操作,因为我们在创建后立即会去使用或者查询相关信息。如果不及时执行,将影响之后的结果。这个场景下我们需要把commit参数设置为True。当然这种方式不要滥用,否则会影响数据库执行效率。
还有一些操作我们需要关心返回结果,比如select指令。这个时候就需要通过fetchall获取全部数据并返回。而创建表等操作则不需要fetchall结果。
因为我们数据库是分库的,而上述每个连接只管理一个数据库的操作,所以我们需要一个连接管理器去管理这些连接。
连接管理类是个单例,它通过modify_conns方法连接每个数据库
@singleton
class mysql_manager():
def __init__(self):
self._conns = {}
def modify_conns(self, conns_info):
for (conn_name, conn_info) in conns_info.items():
conn_info_hash = frame_tools.hash(json.dumps(conn_info))
if conn_name in self._conns.keys():
if conn_info_hash in self._conns[conn_name].conns_dict.keys():
continue
else:
self._conns[conn_name] = mysql_conn_info()
for key in conf_keys.mysql_conn_keys:
if key not in conn_info.keys():
continue
conn_obj = mysql_conn(conn_info["host"], conn_info["port"], conn_info["user"], conn_info["passwd"], conn_info["db"], conn_info["charset"])
self._conns[conn_name].conns_dict[conn_info_hash] = conn_obj
self._conns[conn_name].valid = 1
self._print_conns()
如果数据库连接信息发生改变,则需要将发生改变的数据库连接置为无效,然后再重新连接并记录
def add_conns(self, conns_info):
self.modify_conns(conns_info)
def remove_conns(self, conns_info):
for (conn_name, conn_info) in conns_info.items():
conn_info_hash = frame_tools.hash(json.dumps(conn_info))
if conn_name in self._conns.keys():
if conn_info_hash in self._conns[conn_name].conns_dict.keys():
self._conns[conn_name].valid = 0
self._print_conns()
连接管理类通过get_mysql_conn方法暴露连接对象
def get_mysql_conn(self, conn_name):
if conn_name not in self._conns.keys():
return None
conn_info = self._conns[conn_name]
valid = self._conns[conn_name].valid
if 0 == valid:
return None
conns_dict_keys = self._conns[conn_name].conns_dict.keys()
if len(conns_dict_keys) == 0:
return None
key = conns_dict_keys[-1]
ret_conn = self._conns[conn_name].conns_dict[key]
return ret_conn
它还暴露了一个刷新所有数据库中表信息的方法,用于在系统任务中定期更新内存中信息,保证数据稳定写入。
def refresh_all_conns_tables_info(self):
for (conn_name, conn_info) in self._conns.items():
conn = self.get_mysql_conn(conn_name)
if None != conn:
conn.refresh_tables_info()
我共设计了三种数据库。一种是保存股票基础数据的数据库,其配置是
[stock_db]
host=127.0.0.1
port=3306
user=root
passwd=fangliang
db=stock
charset=utf8
一个是保存每日实时数据的数据库
[daily_temp]
host=127.0.0.1
port=3306
user=root
passwd=fangliang
db=daily_temp
charset=utf8
最后一种是按股票代码分类的库,这种库有300个,设计原因我在《码农技术炒股之路——架构和设计》有说明
[stock_part]
host=127.0.0.1
port=3306
user=root
passwd=fangliang
db=stock_part
charset=utf8
range_max=300
注意range_max这个参数,如果配置中有该参数,则代表其是一个数据库组
class mysql_conf_parser:
def parse(self, job_conf_path):
cp = ConfigParser.SafeConfigParser()
cp.read(job_conf_path)
sections = cp.sections()
conns_info = {}
for section in sections:
conn_info = {}
for key in conf_keys.mysql_conn_keys:
if False == cp.has_option(section, key):
LOG_WARNING()
continue
conn_info[key] = cp.get(section, key)
if cp.has_option(section, "range_max"):
range_max = int(cp.get(section, "range_max"))
db_name_base = conn_info["db"]
for index in range(0, range_max):
conn_info["db"] = db_name_base + "_" + str(index)
section_index_name = section + "_" + str(index)
conns_info[section_index_name] = copy.deepcopy(conn_info)
else:
conns_info[section] = conn_info
return conns_info
最终我们将建成下图所示数据库
当我们从数据源获取数据后,需要使用一系列正则将原始数据转换成一组数据。然后才可以将这些数据写入数据库。举个例子,我们先看下正则管理器的配置
[string_comma_regular]
regular_expression_0 = data:\[(.*)\]
regular_expression_1 = "[^"]+"
regular_expression_2 = [^,"]+
[hq_sinajs_cn_list]
regular_expression_0 = var hq_str_([^;]*);
regular_expression_1 = ([^,="shz]+)
[quotes_money_163]
regular_expression_0 = ([^\r\n]+)
regular_expression_1 = ([^,'\r\n]+)
每一节都是一个正则名称,其下都是以“regular_expression_”开始的关键字。正则执行的顺序从序号小的向序号大的方向执行。我们通过下面的配置解释器读取配置
import ConfigParser
class regulars_manager_conf_parser:
def parse(self, regulars_conf_path):
cp = ConfigParser.SafeConfigParser()
cp.read(regulars_conf_path)
sections = cp.sections()
regulars_info = {}
for section in sections:
regular_info = []
regular_name_pre = "regular_expression_"
for index in range(0, len(cp.options(section))):
regular_name = regular_name_pre + str(index)
if cp.has_option(section, regular_name):
regular_info.append(cp.get(section, regular_name))
else:
break
regulars_info[section] = regular_info
return regulars_info
正则表达式管理通过下面方法维护信息
@singleton
class regular_split_manager():
def __init__(self):
self._regulars = {}
def modify_regulars(self, regulars_info):
for (regular_name, regular_info) in regulars_info.items():
self._regulars[regular_name] = regulars_info
def add_regulars(self, regulars_info):
for (regular_name, regular_info) in regulars_info.items():
self._regulars[regular_name] = regular_info
def remove_regulars(self, regulars_info):
for (regular_name, regular_info) in regulars_info.items():
if regular_name in self._regulars.keys():
del self._regulars[regular_name]
通过get_split_data方法可以将数据通过指定的正则名称进行分解,且分解到最后一步
def get_split_data(self, data, regular_name):
data_array = []
self._recursion_regular(data, regular_name, 0, data_array)
return data_array
def _get_regular(self, regular_name, deep):
if regular_name not in self._regulars.keys():
LOG_WARNING("regular manager has no %s" % (regular_name))
return ""
if deep >= len(self._regulars[regular_name]):
return ""
return self._regulars[regular_name][deep]
def _recursion_regular(self, data, regular_name, deep, data_array):
regular_str = self._get_regular(regular_name, deep)
split_data = re.findall(regular_str, data)
regualer_next_str = self._get_regular(regular_name, deep + 1)
split_array = []
if len(regualer_next_str) > 0:
for item in split_data:
self._recursion_regular(item, regular_name, deep + 1, data_array)
else:
for item in split_data:
split_array.append(item)
if len(split_array) > 0:
data_array.append(split_array)
有了上述各种管理器,我们已经把主要的准备工作做好。下一篇我将介绍最核心的任务调取管理器,它才是上述管理器最终的使用方。