前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >PHP 使用协同程序实现合作多任务(二)

PHP 使用协同程序实现合作多任务(二)

作者头像
后端技术探索
发布2018-08-09 16:25:50
5320
发布2018-08-09 16:25:50
举报
文章被收录于专栏:后端技术探索后端技术探索

waitingForRead 及 waitingForWrite 属性是两个承载等待的socket 及等待它们的任务的数组。有趣的部分在于下面的方法,它将检查 socket 是否可用,并重新安排各自任务:

?

代码语言:javascript
复制
<?php
 
protected function ioPoll($timeout) {
    $rSocks = [];
    foreach ($this->waitingForRead as list($socket)) {
        $rSocks[] = $socket;
    }
 
    $wSocks = [];
    foreach ($this->waitingForWrite as list($socket)) {
        $wSocks[] = $socket;
    }
 
    $eSocks = []; // dummy
 
    if (!stream_select($rSocks, $wSocks, $eSocks, $timeout)) {
        return;
    }
 
    foreach ($rSocks as $socket) {
        list(, $tasks) = $this->waitingForRead[(int) $socket];
        unset($this->waitingForRead[(int) $socket]);
 
        foreach ($tasks as $task) {
            $this->schedule($task);
        }
    }
 
    foreach ($wSocks as $socket) {
        list(, $tasks) = $this->waitingForWrite[(int) $socket];
        unset($this->waitingForWrite[(int) $socket]);
 
        foreach ($tasks as $task) {
            $this->schedule($task);
        }
    }
}

stream_select 函数接受承载读取、写入以及待检查的socket的数组(我们无需考虑最后一类)。数组将按引用传递,函数只会保留那些状态改变了的数组元素。我们可以遍历这些数组,并重新安排与之相关的任务。

为了正常地执行上面的轮询动作,我们将在调度器里增加一个特殊的任务:

?

代码语言:javascript
复制
<?php
protected function ioPollTask() {
    while (true) {
        if ($this->taskQueue->isEmpty()) {
            $this->ioPoll(null);
        } else {
            $this->ioPoll(0);
        }
        yield;
    }
}

需要在某个地方注册这个任务,例如,你可以在run()方法的开始增加$this->newTask($this->ioPollTask())。然后就像其他 任务一样每执行完整任务循环一次就执行轮询操作一次(这么做一定不是最好的方法)。ioPollTask将使用0秒的超时来调用ioPoll, 这意味着stream_select将立即返回(而不是等待)。

只有任务队列为空时,我们才使用null超时,这意味着它一直等到某个套接口准备就绪。如果我们没有这么做,那么轮询任务将一而再, 再而三的循环运行,直到有新的连接建立。这将导致100%的CPU利用率。相反,让操作系统做这种等待会更有效。

现在编写服务器相对容易了:

?

代码语言:javascript
复制
<?php
 
function server($port) {
    echo "Starting server at port $port...\n";
 
    $socket = @stream_socket_server("tcp://localhost:$port", $errNo, $errStr);
    if (!$socket) throw new Exception($errStr, $errNo);
 
    stream_set_blocking($socket, 0);
 
    while (true) {
        yield waitForRead($socket);
        $clientSocket = stream_socket_accept($socket, 0);
        yield newTask(handleClient($clientSocket));
    }
}
 
function handleClient($socket) {
    yield waitForRead($socket);
    $data = fread($socket, 8192);
 
    $msg = "Received following request:\n\n$data";
    $msgLength = strlen($msg);
 
    $response = <<<RES
HTTP/1.1 200 OK\r
Content-Type: text/plain\r
Content-Length: $msgLength\r
Connection: close\r
\r
$msg
RES;
 
    yield waitForWrite($socket);
    fwrite($socket, $response);
 
    fclose($socket);
}
 
$scheduler = new Scheduler;
$scheduler->newTask(server(8000));
$scheduler->run();

这段代码将接收到localhost:8000上的连接,然后仅仅返回发送来的内容作为HTTP响应。要做“实际”的事情的话就爱哪个非常复杂(处理 HTTP请求可能已经超出了这篇文章的范围)。上面的代码片段只是演示了一般性的概念。

你可以使用类似于ab -n 10000 -c 100 localhost:8000/这样命令来测试服务器。这条命令将向服务器发送10000个请求,并且其中100个请求将同时到达。使用这样的数目,我得到了处于中间的10毫秒的响应时间。不过还有一个问题:有少数几个请求真正处理的很慢(如5秒), 这就是为什么总吞吐量只有2000请求/秒(如果是10毫秒的响应时间的话,总的吞吐量应该更像是10000请求/秒)。调高并发数(比如 -c 500),服务器大多数运行良好,不过某些连接将抛出“连接被对方重置”的错误。由于我对低级别的socket资料了解的非常少,所以 我不能指出问题出在哪儿。

协程堆栈

如果你试图用我们的调度系统建立更大的系统的话,你将很快遇到问题:我们习惯了把代码分解为更小的函数,然后调用它们。然而, 如果使用了协程的话,就不能这么做了。例如,看下面代码:

?

代码语言:javascript
复制
<?php
 
function echoTimes($msg, $max) {
    for ($i = 1; $i <= $max; ++$i) {
        echo "$msg iteration $i\n";
        yield;
    }
}
 
function task() {
    echoTimes('foo', 10); // print foo ten times
    echo "---\n";
    echoTimes('bar', 5); // print bar five times
    yield; // force it to be a coroutine
}
 
$scheduler = new Scheduler;
$scheduler->newTask(task());
$scheduler->run();

这段代码试图把重复循环“输出n次“的代码嵌入到一个独立的协程里,然后从主任务里调用它。然而它无法运行。正如在这篇文章的开始 所提到的,调用生成器(或者协程)将没有真正地做任何事情,它仅仅返回一个对象。这也出现在上面的例子里。echoTimes调用除了放回一个(无用的)协程对象外不做任何事情。

为了仍然允许这么做,我们需要在这个裸协程上写一个小小的封装。我们将调用它:“协程堆栈”。因为它将管理嵌套的协程调用堆栈。 这将是通过生成协程来调用子协程成为可能:

?

代码语言:javascript
复制
$retval = (yield someCoroutine($foo, $bar));

使用yield,子协程也能再次返回值:

?

代码语言:javascript
复制
yield retval("I'm a return value!");

retval函数除了返回一个值的封装外没有做任何其他事情。这个封装将表示它是一个返回值。

?

代码语言:javascript
复制
<?php
 
class CoroutineReturnValue {
    protected $value;
 
    public function __construct($value) {
        $this->value = $value;
    }
 
    public function getValue() {
        return $this->value;
    }
}
 
function retval($value) {
    return new CoroutineReturnValue($value);
}

为了把协程转变为协程堆栈(它支持子调用),我们将不得不编写另外一个函数(很明显,它是另一个协程):

?

代码语言:javascript
复制
<?php
 
function stackedCoroutine(Generator $gen) {
    $stack = new SplStack;
 
    for (;;) {
        $value = $gen->current();
 
        if ($value instanceof Generator) {
            $stack->push($gen);
            $gen = $value;
            continue;
        }
 
        $isReturnValue = $value instanceof CoroutineReturnValue;
        if (!$gen->valid() || $isReturnValue) {
            if ($stack->isEmpty()) {
                return;
            }
 
            $gen = $stack->pop();
            $gen->send($isReturnValue ? $value->getValue() : NULL);
            continue;
        }
 
        $gen->send(yield $gen->key() => $value);
    }
}

这个函数在调用者和当前正在运行的子协程之间扮演着简单代理的角色。在$gen->send(yield $gen->key()=>$value);这行完成了代理功能。另外它检查返回值是否是生成器,万一是生成器的话,它将开始运行这个生成器,并把前一个协程压入堆栈里。一旦它获得了CoroutineReturnValue的话,它将再次请求堆栈弹出,然后继续执行前一个协程。

为了使协程堆栈在任务里可用,任务构造器里的$this-coroutine =$coroutine;这行需要替代为$this->coroutine = StackedCoroutine($coroutine);。

现在我们可以稍微改进上面web服务器例子:把wait+read(和wait+write和warit+accept)这样的动作分组为函数。为了分组相关的 功能,我将使用下面类:

?

代码语言:javascript
复制
<?php
 
class CoSocket {
    protected $socket;
 
    public function __construct($socket) {
        $this->socket = $socket;
    }
 
    public function accept() {
        yield waitForRead($this->socket);
        yield retval(new CoSocket(stream_socket_accept($this->socket, 0)));
    }
 
    public function read($size) {
        yield waitForRead($this->socket);
        yield retval(fread($this->socket, $size));
    }
 
    public function write($string) {
        yield waitForWrite($this->socket);
        fwrite($this->socket, $string);
    }
 
    public function close() {
        @fclose($this->socket);
    }
}

现在服务器可以编写的稍微简洁点了:

?

代码语言:javascript
复制
<?php
 
function server($port) {
    echo "Starting server at port $port...\n";
 
    $socket = @stream_socket_server("tcp://localhost:$port", $errNo, $errStr);
    if (!$socket) throw new Exception($errStr, $errNo);
 
    stream_set_blocking($socket, 0);
 
    $socket = new CoSocket($socket);
    while (true) {
        yield newTask(
            handleClient(yield $socket->accept())
        );
    }
}
 
function handleClient($socket) {
    $data = (yield $socket->read(8192));
 
    $msg = "Received following request:\n\n$data";
    $msgLength = strlen($msg);
 
    $response = <<<RES
HTTP/1.1 200 OK\r
Content-Type: text/plain\r
Content-Length: $msgLength\r
Connection: close\r
\r
$msg
RES;
 
    yield $socket->write($response);
    yield $socket->close();
}

错误处理

作为一个优秀的程序员,相信你已经察觉到上面的例子缺少错误处理。几乎所有的 socket 都是易出错的。我这样做的原因一方面固然是因为错误处理的乏味(特别是 socket!),另一方面也在于它很容易使代码体积膨胀。

不过,我仍然了一讲一下常见的协程错误处理:协程允许使用 throw() 方法在其内部抛出一个错误。尽管此方法还未在 PHP 中实现,但我很快就会提交它,就在今天。

throw() 方法接受一个 Exception,并将其抛出到协程的当前悬挂点,看看下面代码:

?

代码语言:javascript
复制
<?php
 
function gen() {
    echo "Foo\n";
    try {
        yield;
    } catch (Exception $e) {
        echo "Exception: {$e->getMessage()}\n";
    }
    echo "Bar\n";
}
 
$gen = gen();
$gen->rewind();                     // echos "Foo"
$gen->throw(new Exception('Test')); // echos "Exception: Test"
                                    // and "Bar"

这非常棒,因为我们可以使用系统调用以及子协程调用异常抛出。对与系统调用,Scheduler::run() 方法需要一些小调整:

?

代码语言:javascript
复制
<?php
 
if ($retval instanceof SystemCall) {
    try {
        $retval($task, $this);
    } catch (Exception $e) {
        $task->setException($e);
        $this->schedule($task);
    }
    continue;
}

Task 类也许要添加 throw 调用处理:

?

代码语言:javascript
复制
<?php
 
class Task {
    // ...
    protected $exception = null;
 
    public function setException($exception) {
        $this->exception = $exception;
    }
 
    public function run() {
        if ($this->beforeFirstYield) {
            $this->beforeFirstYield = false;
            return $this->coroutine->current();
        } elseif ($this->exception) {
            $retval = $this->coroutine->throw($this->exception);
            $this->exception = null;
            return $retval;
        } else {
            $retval = $this->coroutine->send($this->sendValue);
            $this->sendValue = null;
            return $retval;
        }
    }
 
    // ...
}

现在,我们已经可以在系统调用中使用异常抛出了!例如,要调用 killTask,让我们在传递 ID 不可用时抛出一个异常:

?

代码语言:javascript
复制
<?php
 
function killTask($tid) {
    return new SystemCall(
        function(Task $task, Scheduler $scheduler) use ($tid) {
            if ($scheduler->killTask($tid)) {
                $scheduler->schedule($task);
            } else {
                throw new InvalidArgumentException('Invalid task ID!');
            }
        }
    );
}

试试看:

?

代码语言:javascript
复制
<?php
 
function task() {
    try {
        yield killTask(500);
    } catch (Exception $e) {
        echo 'Tried to kill task 500 but failed: ', $e->getMessage(), "\n";
    }
}

这些代码现在尚不能正常运作,因为 stackedCoroutine 函数无法正确处理异常。要修复需要做些调整:

?

代码语言:javascript
复制
<?php
 
function stackedCoroutine(Generator $gen) {
    $stack = new SplStack;
    $exception = null;
 
    for (;;) {
        try {
            if ($exception) {
                $gen->throw($exception);
                $exception = null;
                continue;
            }
 
            $value = $gen->current();
 
            if ($value instanceof Generator) {
                $stack->push($gen);
                $gen = $value;
                continue;
            }
 
            $isReturnValue = $value instanceof CoroutineReturnValue;
            if (!$gen->valid() || $isReturnValue) {
                if ($stack->isEmpty()) {
                    return;
                }
 
                $gen = $stack->pop();
                $gen->send($isReturnValue ? $value->getValue() : NULL);
                continue;
            }
 
            try {
                $sendValue = (yield $gen->key() => $value);
            } catch (Exception $e) {
                $gen->throw($e);
                continue;
            }
 
            $gen->send($sendValue);
        } catch (Exception $e) {
            if ($stack->isEmpty()) {
                throw $e;
            }
 
            $gen = $stack->pop();
            $exception = $e;
        }
    }
}

结束语

在这篇文章里,我使用多任务协作构建了一个任务调度器,其中包括执行“系统调用”,做非阻塞操作和处理错误。所有这些里真正很酷的事情是任务的结果代码看起来完全同步,甚至任务正在执行大量的异步操作的时候也是这样。如果你打算从套接口读取数据的话,你将不需要传递某个回调函数或者注册一个事件侦听器。相反,你只要书写yield $socket->read()。这儿大部分都是你常常也要编写的,只在它的前面增加yield。

当我第一次听到所有这一切的时候,我发现这个概念完全令人折服,而且正是这个激励我在PHP中实现了它。同时我发现协程真正令人心慌。在令人敬畏的代码和很大一堆代码之间只有单薄的一行,我认为协程正好处在这一行上。讲讲使用上面所述的方法书写异步代码是否真的有益对我来说很难。v

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2016-01-28,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 nginx 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 协程堆栈
  • 错误处理
    • 结束语
    相关产品与服务
    测试服务
    测试服务 WeTest 包括标准兼容测试、专家兼容测试、手游安全测试、远程调试等多款产品,服务于海量腾讯精品游戏,涵盖兼容测试、压力测试、性能测试、安全测试、远程调试等多个方向,立体化安全防护体系,保卫您的信息安全。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档