前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >使用Coroutine\Channel实现一个简单的MySQL连接池

使用Coroutine\Channel实现一个简单的MySQL连接池

作者头像
沈唁
修改2020-02-11 21:38:57
9160
修改2020-02-11 21:38:57
举报
文章被收录于专栏:沈唁志沈唁志

Channel 通道,类似于 go 语言的 chan,支持多生产者协程和多消费者协程,Swoole 底层自动实现了协程的切换和调度

Channel 实现原理

  • 通道与 PHP 的 Array 类似,仅占用内存,没有其他额外的资源申请,所有操作均为内存操作,无 IO 消耗
  • 底层使用 PHP 引用计数实现,无内存拷贝。即使是传递巨大字符串或数组也不会产生额外性能消耗

方法

  1. Channel->push :当队列中有其他协程正在等待 pop 数据时,自动按顺序唤醒一个消费者协程。当队列已满时自动 yield 让出控制器,等待其他协程消费数据
  2. Channel->pop:当队列为空时自动 yield,等待其他协程生产数据。消费数据后,队列可写入新的数据,自动按顺序唤醒一个生产者协程

连接池

使用Coroutine\Channel来实现 MySQL 连接池可以使用 defer 特性来实现资源的回收,同时可以被协程调度,而且使用channel->pop方法的时候,可以设置超时,减少一系列的心智负担

代码实现

代码语言:javascript
复制
namespace SwExample;

class MysqlPool
{
    private static $instance;
    private $pool;  //连接池容器,一个 channel
    private $config;

    /**
     * @param  null $config
     * @return MysqlPool
     * @desc   获取连接池实例
     */
    public static function getInstance($config = null)
    {
        if (empty(self::$instance)) {
            if (empty($config)) {
                throw new \RuntimeException("mysql config empty");
            }
            self::$instance = new static($config);
        }

        return self::$instance;
    }

   /**
    * MysqlPool constructor.
    *
    * @param $config
    * @desc  初始化,自动创建实例,需要放在 workerstart 中执行
    */
   public function __construct($config)
   {
      if (empty($this->pool)) {
         $this->config = $config;
         $this->pool = new chan($config['pool_size']);
         for ($i = 0; $i < $config['pool_size']; $i++) {
            $mysql = new MySQL();
            $res = $mysql->connect($config);
            if ($res == false) {
               //连接失败,抛异常
               throw new \RuntimeException("failed to connect mysql server.");
            } else {
               //mysql 连接存入 channel
               $this->put($mysql);
            }
         }
      }
   }

   /**
     * @param $mysql
     * @desc  放入一个 mysql 连接入池
     */
    public function put($mysql)
    {
        $this->pool->push($mysql);
    }

    /**
     * @return mixed
     * @desc   获取一个连接,当超时,返回一个异常
     */
    public function get()
    {
        $mysql = $this->pool->pop($this->config['pool_get_timeout']);
        if (false === $mysql) {
            throw new \RuntimeException("get mysql timeout, all mysql connection is used");
        }
        return $mysql;
    }

    /**
     * @return mixed
     * @desc   获取当时连接池可用对象
     */
    public function getLength()
    {
        return $this->pool->length();
    }

}

使用

代码语言:javascript
复制
<?php

require '../vendor/autoload.php';

use SwExample\MysqlPool;

$config = [
    'host' => '127.0.0.1',   //数据库 ip
    'port' => 3306,          //数据库端口
    'user' => 'root',        //数据库用户名
    'password' => 'root', //数据库密码
    'database' => 'wordpress',   //默认数据库名
    'timeout' => 0.5,       //数据库连接超时时间
    'charset' => 'utf8mb4', //默认字符集
    'strict_type' => true,  //ture,会自动表数字转为 int 类型
    'pool_size' => '3',     //连接池大小
    'pool_get_timeout' => 0.5, //当在此时间内未获得到一个连接,会立即返回。(表示所有的连接都已在使用中)
];

//创建 http server
$http = new Swoole\Http\Server("0.0.0.0", 9501);
$http->set([
      //"daemonize" => true, // 常驻进程模式运行
      "worker_num" => 1,
      "log_level" => SWOOLE_LOG_ERROR,
    ]);

$http->on('WorkerStart', function ($serv, $worker_id) use ($config) {
        //worker 启动时,每个进程都初始化连接池,在 onRequest 中可以直接使用
        try {
            MysqlPool::getInstance($config);
        } catch (\Exception $e) {
            //初始化异常,关闭服务
            echo $e->getMessage() . PHP_EOL;
            $serv->shutdown();
        } catch (\Throwable $throwable) {
            //初始化异常,关闭服务
            echo $throwable->getMessage() . PHP_EOL;
            $serv->shutdown();
        }
    });

$http->on('request', function ($request, $response) {

        //浏览器会自动发起这个请求 避免占用请求
        if ($request->server['path_info'] == '/favicon.ico') {
            $response->end('');
            return;
        }

        //获取数据库
        if ($request->server['path_info'] == '/list') {
            go(function () use ($request, $response) {
                    //从池子中获取一个实例
                    try {
                        $pool = MysqlPool::getInstance();
                        $mysql = $pool->get();
                        defer(function () use ($mysql) {
                                //利用 defer 特性,可以达到协程执行完成,归还$mysql 到连接池
                                //好处是 可能因为业务代码很长,导致乱用或者忘记把资源归还
                                MysqlPool::getInstance()->put($mysql);
                                echo "当前可用连接数:" . MysqlPool::getInstance()->getLength() . PHP_EOL;
                            });
                        $result = $mysql->query("select * from wp_users");
                        $response->end(json_encode($result));
                    } catch (\Exception $e) {
                       $response->end($e->getMessage());
                    }
            });
            return;
        }

        echo "get request:".date('Y-m-d H:i:s').PHP_EOL;
        if ($request->server['path_info'] == '/timeout') {
            go(function () use ($request, $response) {
                    //从池子中获取一个实例
                    try {
                        $pool = MysqlPool::getInstance();
                        $mysql = $pool->get();
                        defer(function () use ($mysql) {
                                //协程执行完成,归还$mysql 到连接池
                                MysqlPool::getInstance()->put($mysql);
                                echo "当前可用连接数:" . MysqlPool::getInstance()->getLength() . PHP_EOL;
                        });
                        $result = $mysql->query("select * from wp_users");
                        \Swoole\Coroutine::sleep(10); //sleep 10 秒,模拟耗时操作
                        $response->end(date('Y-m-d H:i:s').PHP_EOL.json_encode($result));
                    } catch (\Exception $e) {
                       $response->end($e->getMessage());
                    }
            });
            return;
        }
    });

$http->start();

访问http://127.0.0.1:9501/list可以看到正常的结果输出

访问http://127.0.0.1:9501/timeout演示连接池取和存的过程

模拟 timeout, 需要浏览器打开 4 个 tab 页面,都请求http://127.0.0.1:9501/timeout,前三个应该是等 10 秒出结果,第四个 500ms 后出超时结果

如果是 chrome 浏览器,会对完全一样的 url 做并发请求限制需要加一个随机数,例如http://127.0.0.1:9501/timeout?n=0http://127.0.0.1:9501/timeout?n=1

沈唁志,一个PHPer的成长之路! 任何个人或团体,未经允许禁止转载本文:《使用Coroutine\Channel实现一个简单的MySQL连接池》,谢谢合作!

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019-07-02,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Channel 实现原理
  • 方法
  • 连接池
  • 代码实现
  • 使用
相关产品与服务
云数据库 SQL Server
腾讯云数据库 SQL Server (TencentDB for SQL Server)是业界最常用的商用数据库之一,对基于 Windows 架构的应用程序具有完美的支持。TencentDB for SQL Server 拥有微软正版授权,可持续为用户提供最新的功能,避免未授权使用软件的风险。具有即开即用、稳定可靠、安全运行、弹性扩缩等特点。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档