swoole4.0 mysql连接池之读写分离

上一篇:

swoole mysql连接池之断线重连

,我们实现了一个更键壮的连接池,又有饭米粒er提出新的需求:能否支持读写分离?

当然没问题了!

开始主题前,先回答上一篇的问题:

为什么mysql要主动关闭长久不活跃的连接?

主要是mysql每个连接是需要一个线程去处理,而线程资源本身是有限制的,所以需要定期清理不活跃连接,释放资源,像redis本身是单进程单线程,是通过i/o复用机制,所以连接数可以非常大

本篇不介绍mysql本身怎么配置读写分离,而是让连接池给够自动做到读写分离,对于上层应用是透明无感知的。

本篇也不介绍利用现成的专业的mysql中间件来做读写分离,只是提供一个思路,也可以满足于中小规模的场景

为什么要读写分离?

一般的系统都是读多写少,利用读写分离,可以提升mysql的效率

读写分离后,从库可以水平扩展

下面我们开始代码之旅吧

配置先改造:

$config = [

'host'=>'127.0.0.1',//数据库ip

'port'=>3306,//数据库端口

'user'=>'root',//数据库用户名

'password'=>'123456',//数据库密码

'database'=>'test',//默认数据库名

'timeout'=>0.5,//数据库连接超时时间

'charset'=>'utf8mb4',//默认字符集

'strict_type'=>true,//ture,会自动表数字转为int类型

'pool_size'=>'3',//连接池大小

'pool_get_timeout'=>0.5,//当在此时间内未获得到一个连接,会立即返回。(表示所以的连接都已在使用中)

];

改成:

$config = [

'pool_size'=>'3',//连接池大小

'pool_get_timeout'=>0.5,//当在此时间内未获得到一个连接,会立即返回。(表示所以的连接都已在使用中)

'master'=> [

'host'=>'127.0.0.1',//数据库ip

'port'=>3306,//数据库端口

'user'=>'root',//数据库用户名

'password'=>'123456',//数据库密码

'database'=>'test',//默认数据库名

'timeout'=>0.5,//数据库连接超时时间

'charset'=>'utf8mb4',//默认字符集

'strict_type'=>true,//ture,会自动表数字转为int类型

],

'slave'=> [

[

'host'=>'127.0.0.1',//从数据库1ip

'port'=>3306,//从数据库1端口

'user'=>'root',//从数据库1用户名

'password'=>'123456',//从数据库1密码

'database'=>'test',//默认数据库名

'timeout'=>0.5,//数据库连接超时时间

'charset'=>'utf8mb4',//默认字符集

'strict_type'=>true,//ture,会自动表数字转为int类型

],

[

'host'=>'127.0.0.1',//从数据库2ip

'port'=>3306,//从数据库2端口

'user'=>'root',//从数据库2用户名

'password'=>'123456',//数据库密码

'database'=>'test',//默认数据库名

'timeout'=>0.5,//数据库连接超时时间

'charset'=>'utf8mb4',//默认字符集

'strict_type'=>true,//ture,会自动表数字转为int类型

]

],

];

上面模拟的一主两从,由于我本机没有搭建mysql的主从,所以这里主从配置一样

由于我们要做到对上层业务无感知,所以我们只需要改动mydb.php这一层就行了

useSwoole\Coroutine\MySQL;

classmydb

{

/**

*@varMySQL

*/

private$master;//主数据库连接

private$slave;//从数据库连接list

private$config;//数据库配置

/**

*@param$config

*@returnmixed

*@desc连接mysql

*/

public functionconnect($config)

{

//创建主数据连接

$master =newMySQL();

$res = $master->connect($config['master']);

if($res ===false) {

//连接失败,抛弃常

throw newRuntimeException($master->connect_error, $master->errno);

}else{

//存入master资源

$this->master= $master;

}

//创建从数据库连接

foreach($config['slave']as$conf) {

$slave =newMySQL();

$res = $slave->connect($conf);

if($res ===false) {

//连接失败,抛弃常

throw newRuntimeException($slave->connect_error, $slave->errno);

}else{

//存入slave资源

$this->slave[] = $slave;

}

}

$this->config= $config;

return$res;

}

/**

*@param$type

*@param$index

*@returnMySQL

*@desc单个数据库重连

*/

public functionreconnect($type, $index)

{

//通过type判断是主还是从

if('master'== $type) {

//创建主数据连接

$master =newMySQL();

$res = $master->connect($this->config['master']);

if($res ===false) {

//连接失败,抛弃常

throw newRuntimeException($master->connect_error, $master->errno);

}else{

//更新主库连接

$this->master= $master;

}

return$this->master;

}

//创建从数据连接

$slave =newMySQL();

$res = $slave->connect($this->config['slave'][$index]);

if($res ===false) {

//连接失败,抛弃常

throw newRuntimeException($slave->connect_error, $slave->errno);

}else{

//更新对应的重库连接

$this->slave[$index] = $slave;

}

return$slave;

}

/**

*@param$name

*@param$arguments

*@returnmixed

*@desc利用__call,实现操作mysql,并能做断线重连等相关检测

*/

public function__call($name, $arguments)

{

$sql = $arguments[];

$res = $this->chooseDb($sql);

print_r($res);

$db = $res['db'];

$result =call_user_func_array([$db, $name], $arguments);

if(false=== $result) {

if(!$db->connected) {//断线重连

echo"mysql reconnect".PHP_EOL;

$db = $this->reconnect($res['type'], $res['index']);

$result =call_user_func_array([$db, $name], $arguments);

return$this->parseResult($result, $db);

}

if(!empty($db->errno)) {//有错误码,则抛出弃常

throw newRuntimeException($db->error, $db->errno);

}

}

return$this->parseResult($result, $db);

}

/**

*@param$result

*@param$db MySQL

*@returnarray

*@desc格式化返回结果:查询:返回结果集,插入:返回新增id, 更新删除等操作:返回影响行数

*/

public functionparseResult($result, $db)

{

if($result ===true) {

return[

'affected_rows'=> $db->affected_rows,

'insert_id'=> $db->insert_id,

];

}

return$result;

}

/**

*@param$sql

*@desc根据sql语句,选择主还是从

* @ 判断有select 则选择从库, insert, update, delete等选择主库

*@returnarray

*/

protected functionchooseDb($sql)

{

//查询语句,随机选择一个从库

if('select'==strtolower(substr($sql,,6))) {

if(1==count($this->slave)) {

$index =;

}else{

$index =array_rand($this->slave);

}

return[

'type'=>'slave',

'index'=> $index,

'db'=> $this->slave[$index],

];

}

return[

'type'=>'master',

'index'=>,

'db'=> $this->master

];

}

}

connect的时候,我们自动把主,从所有的连接都建立好

执行sql的时候,我们跟据sql语句是否有select关键词,来判断选择主库还是从库

重连操作,只重连有问题的连接,不重连所有的连接

一个支持读写分离的mysql数据库接连池就OK了,是不是很简单的,大家完全可以在此基础上封装自己的CRUD等相关操作

我们在http server里增加 Insert的逻辑:

if($request->server['path_info'] =='/add') {

go(function()use($request, $response) {

//从池子中获取一个实例

try{

$pool =MysqlPool::getInstance();

echo"当前可用连接数:". $pool->getLength() .PHP_EOL;

$mysql = $pool->get();

echo"当前可用连接数:". $pool->getLength() .PHP_EOL;

defer(function()use($mysql) {

//协程执行完成,归还$mysql到连接池

MysqlPool::getInstance()->put($mysql);

echo"当前可用连接数:".MysqlPool::getInstance()->getLength() .PHP_EOL;

});

$ct =time();

$title = $request->get['title'];

$result = $mysql->query("insert into test values(NULL, '{$title}', '{$ct}')");

$response->end(json_encode($result));

}catch(\Exception $e) {

$response->end($e->getMessage());

}

});

return;

}

访问:http://127.0.0.1:9501/add?title=测试

再访问: http://127.0.0.1:9501/list, 看看是否有最新的

抛个问题:

主从分离后,如果写入之后马上读,大概率会读不到?为什么?有什么方案解决?

至此,关于mysql的姿势暂时告一段落了,接下来我们回过头来看最开始的http server, 我将通过一系列文章,把他改造成一个MVC的web开发框架,查看原文,可以先了解下 swoole http server的相关姿势

--------------伟大的分割线----------------

PHP饭米粒(phpfamily) 由一群靠谱的人建立,愿为PHPer带来一些值得细细品味的精神食粮!

饭米粒只发原创或授权发表的文章,不转载网上的文章

所发的文章,均可找到原作者进行沟通。

也希望各位多多打赏(算作稿费给文章作者),更希望大家多多投搞。

投稿请联系:

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20181207G0FC1C00?refer=cp_1026
  • 腾讯「云+社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 yunjia_community@tencent.com 删除。

扫码关注云+社区

领取腾讯云代金券