首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >php实现多进程socket服务器class

php实现多进程socket服务器class

作者头像
仙士可
发布2019-12-19 14:11:38
1.7K0
发布2019-12-19 14:11:38
举报
文章被收录于专栏:仙士可博客仙士可博客

该类实现了多进程的socket服务,目前只写了关于TCP协议,待完善其他内容

所需扩展:socket  pcntl

<?php
/**
 * Created by PhpStorm.
 * User: tioncico
 * Date: 18-5-1
 * Time: 下午7:56
 */

class SphpSocket
{

    private static $_instance;
    public $connect_list = array();//客户端列表
    public $connect_callback, $receive_callback, $close_callback;//回调函数
    public $server;//socket服务
    public $is_run=true;//是否运行
    public $config = array(//各种配置
        'debug'=>true,

        'host' => '0.0.0.0',
        'port' => '9501',

        'domain'   => AF_INET,
        'type'     => SOCK_STREAM,
        'protocol' => SOL_TCP,

        'accept' => 511,

        'option_level' => SOL_SOCKET,
        'optname'      => SO_REUSEADDR,
        'optval'       => 1,

        'read_length'=>1024,
        'read_type'=>PHP_NORMAL_READ

    );

    public $error_log=array();

    public static function getInstance($host, $port)
    {
        if (!(self::$_instance instanceof self)) {
            self::$_instance = new static($host, $port);
        }
        return self::$_instance;
    }

    public function __construct($host, $port)
    {
        $this->config['host'] = $host;
        $this->config['port'] = $port;
    }

    /**
     * 绑定事件
     * @param $type connect|receive|close
     * @param callable $function
     */
    public function on($type, callable $function)
    {
        switch (strtolower($type)) {
            case 'connect':
                $this->connect_callback = $function;
                break;
            case 'receive':
                $this->receive_callback = $function;
                break;
            case 'close':
                $this->close_callback = $function;
                break;
        }
        return $this;
    }

    public function onConnect($connection){
        if (is_callable($this->connect_callback)) {
            call_user_func($this->connect_callback,$connection);
        }
    }
    public function onReceive($connection,$data){

        if (is_callable($this->receive_callback)) {
            call_user_func($this->receive_callback,$connection,$data);
        }
    }
    public function onClose($connection){
        if (is_callable($this->close_callback)) {
            call_user_func($this->close_callback,$connection);
        }
    }

    /**
     *
     */
    public function start()
    {
        $this->createSocket();
        echo '创建socket成功!'.PHP_EOL;
        $this->bindSocket();
        echo '绑定端口成功!'.PHP_EOL;
        $this->listenSocket();
        echo '监听端口成功!'.PHP_EOL;
        $this->setOptionSocket();
        $this->acceptSocket();
        return $this;
    }

    /**
     * 创建socket
     * @return $this
     * @throws Exception
     */
    protected function createSocket()
    {
        $this->server = socket_create($this->config['domain'], $this->config['type'], $this->config['protocol']);
        if ($this->server === false) {
            throw new Exception('创建socket失败!');
        }
        return $this;
    }

    /**
     * 绑定端口
     * @return $this
     * @throws Exception
     */
    protected function bindSocket()
    {
        $this->server === false && $this->createSocket();

        $result = socket_bind($this->server, $this->config['host'], $this->config['port']);
        if ($result === false) {
            throw new Exception('绑定端口失败!');
        }
        return $this;
    }

    /**
     * 监听端口
     * @param null $accept
     * @return $this
     * @throws Exception
     */
    protected function listenSocket($accept = null)
    {
        $this->server === false && $this->createSocket();
        $accept || $accept = $this->config['accept'];
        $result = socket_listen($this->server, $accept);
        if ($result === false) {
            throw new Exception('监听端口失败!');
        }
        return $this;
    }

    /**
     * 配置socket
     * @return $this
     * @throws Exception
     */
    protected function setOptionSocket()
    {
        $this->server === false && $this->createSocket();
        $result = socket_set_option($this->server, $this->config['option_level'], $this->config['optname'], $this->config['optval']);
        if ($result === false) {
            throw new Exception('配置socket失败!');
        }
        return $this;
    }

    /**
     * 接收socket连接
     */
    protected function acceptSocket(){
        $this->server === false && $this->createSocket();
        while(true&&$this->is_run===true){
            $connection = socket_accept($this->server);
            if($connection===false){

            }else{
                $this->addConnectionList($connection);
                $this->onConnect($connection);
                $this->forkProcess($connection);
            }

        }
    }

    /**
     * 写入客户端信息
     * @param $connection
     * @return $this
     */
    protected function addConnectionList($connection){
//        $fd =
        $this->connect_list[(string)$connection]['fd']=$connection;
        return $this;
    }

    /**
     * 写入客户端进程id
     * @param $connection
     * @param $pid
     * @return $this
     */
    protected function addConnectionListProcess($connection,$pid){
        $this->connect_list[(string)$connection]['pid']=$pid;
        return $this;
    }

    /**
     * 派生进程处理
     * @param $connection
     */
    protected function forkProcess($connection){
   
        $pid = pcntl_fork();
        if($pid>0){//使用主进程处理客户端其他请求,子进程继续监听连接请求
            $this->addConnectionListProcess($connection,$pid);
            $this->readSocket($connection);
        }else{
        }
    }

    /**
     * 读取socket信息
     * @param $connection
     */
    protected function readSocket($connection){
        while(true&&isset($this->connect_list[(string)$connection])&&$this->is_run){
            $data = @socket_read($connection,$this->config['read_length'],$this->config['read_type']);
            if($data===false){
                $this->close($connection);
            }else{
                $this->onReceive($connection,$data);
            }
        }
   }

    /**
     * 发送消息给客户端
     * @param $connection
     * @param $msg
     * @return int
     */
   public function send($connection,$msg){
       $result = socket_write($connection, $msg,strlen($msg));
       return $result;
   }

    /**
     * 主动关闭客户端
     * @param $connection
     */
   public function close($connection){
       $this->onClose($connection);
       //先关掉子进程
       posix_kill($this->connect_list[(string)$connection]['pid'],SIGTERM);
       $result = socket_close($connection);
       unset($this->connect_list[(string)$connection]);
       return $result;
   }
}

开启例子:

$socket = SphpSocket::getInstance('0.0.0.0',9501);
$socket->on('connect',function ($connection)use($socket){
    $socket->send($connection,'恭喜您连接成功!');
});
$socket->on('receive',function ($connection,$data)use($socket){
    $result = $socket->send($connection,'您发送的消息是:'.$data);
    var_dump($data);
    if(trim($data)=='关闭'){
        $socket->close($connection);
    }
    echo "发送消息成功";

});
$socket->on('close',function ($connection)use($socket){
    var_dump($connection.'已经关闭连接');
});
$socket->start();

本文为仙士可原创文章,转载无需和我联系,但请注明来自仙士可博客www.php20.cn

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018-05-02 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档