PHP 使用协同程序实现合作多任务(二)

waitingForRead 及 waitingForWrite 属性是两个承载等待的socket 及等待它们的任务的数组。有趣的部分在于下面的方法,它将检查 socket 是否可用,并重新安排各自任务:

?

<?php
 
protected function ioPoll($timeout) {
    $rSocks = [];
    foreach ($this->waitingForRead as list($socket)) {
        $rSocks[] = $socket;
    }
 
    $wSocks = [];
    foreach ($this->waitingForWrite as list($socket)) {
        $wSocks[] = $socket;
    }
 
    $eSocks = []; // dummy
 
    if (!stream_select($rSocks, $wSocks, $eSocks, $timeout)) {
        return;
    }
 
    foreach ($rSocks as $socket) {
        list(, $tasks) = $this->waitingForRead[(int) $socket];
        unset($this->waitingForRead[(int) $socket]);
 
        foreach ($tasks as $task) {
            $this->schedule($task);
        }
    }
 
    foreach ($wSocks as $socket) {
        list(, $tasks) = $this->waitingForWrite[(int) $socket];
        unset($this->waitingForWrite[(int) $socket]);
 
        foreach ($tasks as $task) {
            $this->schedule($task);
        }
    }
}

stream_select 函数接受承载读取、写入以及待检查的socket的数组(我们无需考虑最后一类)。数组将按引用传递,函数只会保留那些状态改变了的数组元素。我们可以遍历这些数组,并重新安排与之相关的任务。

为了正常地执行上面的轮询动作,我们将在调度器里增加一个特殊的任务:

?

<?php
protected function ioPollTask() {
    while (true) {
        if ($this->taskQueue->isEmpty()) {
            $this->ioPoll(null);
        } else {
            $this->ioPoll(0);
        }
        yield;
    }
}

需要在某个地方注册这个任务,例如,你可以在run()方法的开始增加$this->newTask($this->ioPollTask())。然后就像其他 任务一样每执行完整任务循环一次就执行轮询操作一次(这么做一定不是最好的方法)。ioPollTask将使用0秒的超时来调用ioPoll, 这意味着stream_select将立即返回(而不是等待)。

只有任务队列为空时,我们才使用null超时,这意味着它一直等到某个套接口准备就绪。如果我们没有这么做,那么轮询任务将一而再, 再而三的循环运行,直到有新的连接建立。这将导致100%的CPU利用率。相反,让操作系统做这种等待会更有效。

现在编写服务器相对容易了:

?

<?php
 
function server($port) {
    echo "Starting server at port $port...\n";
 
    $socket = @stream_socket_server("tcp://localhost:$port", $errNo, $errStr);
    if (!$socket) throw new Exception($errStr, $errNo);
 
    stream_set_blocking($socket, 0);
 
    while (true) {
        yield waitForRead($socket);
        $clientSocket = stream_socket_accept($socket, 0);
        yield newTask(handleClient($clientSocket));
    }
}
 
function handleClient($socket) {
    yield waitForRead($socket);
    $data = fread($socket, 8192);
 
    $msg = "Received following request:\n\n$data";
    $msgLength = strlen($msg);
 
    $response = <<<RES
HTTP/1.1 200 OK\r
Content-Type: text/plain\r
Content-Length: $msgLength\r
Connection: close\r
\r
$msg
RES;
 
    yield waitForWrite($socket);
    fwrite($socket, $response);
 
    fclose($socket);
}
 
$scheduler = new Scheduler;
$scheduler->newTask(server(8000));
$scheduler->run();

这段代码将接收到localhost:8000上的连接,然后仅仅返回发送来的内容作为HTTP响应。要做“实际”的事情的话就爱哪个非常复杂(处理 HTTP请求可能已经超出了这篇文章的范围)。上面的代码片段只是演示了一般性的概念。

你可以使用类似于ab -n 10000 -c 100 localhost:8000/这样命令来测试服务器。这条命令将向服务器发送10000个请求,并且其中100个请求将同时到达。使用这样的数目,我得到了处于中间的10毫秒的响应时间。不过还有一个问题:有少数几个请求真正处理的很慢(如5秒), 这就是为什么总吞吐量只有2000请求/秒(如果是10毫秒的响应时间的话,总的吞吐量应该更像是10000请求/秒)。调高并发数(比如 -c 500),服务器大多数运行良好,不过某些连接将抛出“连接被对方重置”的错误。由于我对低级别的socket资料了解的非常少,所以 我不能指出问题出在哪儿。

协程堆栈

如果你试图用我们的调度系统建立更大的系统的话,你将很快遇到问题:我们习惯了把代码分解为更小的函数,然后调用它们。然而, 如果使用了协程的话,就不能这么做了。例如,看下面代码:

?

<?php
 
function echoTimes($msg, $max) {
    for ($i = 1; $i <= $max; ++$i) {
        echo "$msg iteration $i\n";
        yield;
    }
}
 
function task() {
    echoTimes('foo', 10); // print foo ten times
    echo "---\n";
    echoTimes('bar', 5); // print bar five times
    yield; // force it to be a coroutine
}
 
$scheduler = new Scheduler;
$scheduler->newTask(task());
$scheduler->run();

这段代码试图把重复循环“输出n次“的代码嵌入到一个独立的协程里,然后从主任务里调用它。然而它无法运行。正如在这篇文章的开始 所提到的,调用生成器(或者协程)将没有真正地做任何事情,它仅仅返回一个对象。这也出现在上面的例子里。echoTimes调用除了放回一个(无用的)协程对象外不做任何事情。

为了仍然允许这么做,我们需要在这个裸协程上写一个小小的封装。我们将调用它:“协程堆栈”。因为它将管理嵌套的协程调用堆栈。 这将是通过生成协程来调用子协程成为可能:

?

$retval = (yield someCoroutine($foo, $bar));

使用yield,子协程也能再次返回值:

?

yield retval("I'm a return value!");

retval函数除了返回一个值的封装外没有做任何其他事情。这个封装将表示它是一个返回值。

?

<?php
 
class CoroutineReturnValue {
    protected $value;
 
    public function __construct($value) {
        $this->value = $value;
    }
 
    public function getValue() {
        return $this->value;
    }
}
 
function retval($value) {
    return new CoroutineReturnValue($value);
}

为了把协程转变为协程堆栈(它支持子调用),我们将不得不编写另外一个函数(很明显,它是另一个协程):

?

<?php
 
function stackedCoroutine(Generator $gen) {
    $stack = new SplStack;
 
    for (;;) {
        $value = $gen->current();
 
        if ($value instanceof Generator) {
            $stack->push($gen);
            $gen = $value;
            continue;
        }
 
        $isReturnValue = $value instanceof CoroutineReturnValue;
        if (!$gen->valid() || $isReturnValue) {
            if ($stack->isEmpty()) {
                return;
            }
 
            $gen = $stack->pop();
            $gen->send($isReturnValue ? $value->getValue() : NULL);
            continue;
        }
 
        $gen->send(yield $gen->key() => $value);
    }
}

这个函数在调用者和当前正在运行的子协程之间扮演着简单代理的角色。在$gen->send(yield $gen->key()=>$value);这行完成了代理功能。另外它检查返回值是否是生成器,万一是生成器的话,它将开始运行这个生成器,并把前一个协程压入堆栈里。一旦它获得了CoroutineReturnValue的话,它将再次请求堆栈弹出,然后继续执行前一个协程。

为了使协程堆栈在任务里可用,任务构造器里的$this-coroutine =$coroutine;这行需要替代为$this->coroutine = StackedCoroutine($coroutine);。

现在我们可以稍微改进上面web服务器例子:把wait+read(和wait+write和warit+accept)这样的动作分组为函数。为了分组相关的 功能,我将使用下面类:

?

<?php
 
class CoSocket {
    protected $socket;
 
    public function __construct($socket) {
        $this->socket = $socket;
    }
 
    public function accept() {
        yield waitForRead($this->socket);
        yield retval(new CoSocket(stream_socket_accept($this->socket, 0)));
    }
 
    public function read($size) {
        yield waitForRead($this->socket);
        yield retval(fread($this->socket, $size));
    }
 
    public function write($string) {
        yield waitForWrite($this->socket);
        fwrite($this->socket, $string);
    }
 
    public function close() {
        @fclose($this->socket);
    }
}

现在服务器可以编写的稍微简洁点了:

?

<?php
 
function server($port) {
    echo "Starting server at port $port...\n";
 
    $socket = @stream_socket_server("tcp://localhost:$port", $errNo, $errStr);
    if (!$socket) throw new Exception($errStr, $errNo);
 
    stream_set_blocking($socket, 0);
 
    $socket = new CoSocket($socket);
    while (true) {
        yield newTask(
            handleClient(yield $socket->accept())
        );
    }
}
 
function handleClient($socket) {
    $data = (yield $socket->read(8192));
 
    $msg = "Received following request:\n\n$data";
    $msgLength = strlen($msg);
 
    $response = <<<RES
HTTP/1.1 200 OK\r
Content-Type: text/plain\r
Content-Length: $msgLength\r
Connection: close\r
\r
$msg
RES;
 
    yield $socket->write($response);
    yield $socket->close();
}

错误处理

作为一个优秀的程序员,相信你已经察觉到上面的例子缺少错误处理。几乎所有的 socket 都是易出错的。我这样做的原因一方面固然是因为错误处理的乏味(特别是 socket!),另一方面也在于它很容易使代码体积膨胀。

不过,我仍然了一讲一下常见的协程错误处理:协程允许使用 throw() 方法在其内部抛出一个错误。尽管此方法还未在 PHP 中实现,但我很快就会提交它,就在今天。

throw() 方法接受一个 Exception,并将其抛出到协程的当前悬挂点,看看下面代码:

?

<?php
 
function gen() {
    echo "Foo\n";
    try {
        yield;
    } catch (Exception $e) {
        echo "Exception: {$e->getMessage()}\n";
    }
    echo "Bar\n";
}
 
$gen = gen();
$gen->rewind();                     // echos "Foo"
$gen->throw(new Exception('Test')); // echos "Exception: Test"
                                    // and "Bar"

这非常棒,因为我们可以使用系统调用以及子协程调用异常抛出。对与系统调用,Scheduler::run() 方法需要一些小调整:

?

<?php
 
if ($retval instanceof SystemCall) {
    try {
        $retval($task, $this);
    } catch (Exception $e) {
        $task->setException($e);
        $this->schedule($task);
    }
    continue;
}

Task 类也许要添加 throw 调用处理:

?

<?php
 
class Task {
    // ...
    protected $exception = null;
 
    public function setException($exception) {
        $this->exception = $exception;
    }
 
    public function run() {
        if ($this->beforeFirstYield) {
            $this->beforeFirstYield = false;
            return $this->coroutine->current();
        } elseif ($this->exception) {
            $retval = $this->coroutine->throw($this->exception);
            $this->exception = null;
            return $retval;
        } else {
            $retval = $this->coroutine->send($this->sendValue);
            $this->sendValue = null;
            return $retval;
        }
    }
 
    // ...
}

现在,我们已经可以在系统调用中使用异常抛出了!例如,要调用 killTask,让我们在传递 ID 不可用时抛出一个异常:

?

<?php
 
function killTask($tid) {
    return new SystemCall(
        function(Task $task, Scheduler $scheduler) use ($tid) {
            if ($scheduler->killTask($tid)) {
                $scheduler->schedule($task);
            } else {
                throw new InvalidArgumentException('Invalid task ID!');
            }
        }
    );
}

试试看:

?

<?php
 
function task() {
    try {
        yield killTask(500);
    } catch (Exception $e) {
        echo 'Tried to kill task 500 but failed: ', $e->getMessage(), "\n";
    }
}

这些代码现在尚不能正常运作,因为 stackedCoroutine 函数无法正确处理异常。要修复需要做些调整:

?

<?php
 
function stackedCoroutine(Generator $gen) {
    $stack = new SplStack;
    $exception = null;
 
    for (;;) {
        try {
            if ($exception) {
                $gen->throw($exception);
                $exception = null;
                continue;
            }
 
            $value = $gen->current();
 
            if ($value instanceof Generator) {
                $stack->push($gen);
                $gen = $value;
                continue;
            }
 
            $isReturnValue = $value instanceof CoroutineReturnValue;
            if (!$gen->valid() || $isReturnValue) {
                if ($stack->isEmpty()) {
                    return;
                }
 
                $gen = $stack->pop();
                $gen->send($isReturnValue ? $value->getValue() : NULL);
                continue;
            }
 
            try {
                $sendValue = (yield $gen->key() => $value);
            } catch (Exception $e) {
                $gen->throw($e);
                continue;
            }
 
            $gen->send($sendValue);
        } catch (Exception $e) {
            if ($stack->isEmpty()) {
                throw $e;
            }
 
            $gen = $stack->pop();
            $exception = $e;
        }
    }
}

结束语

在这篇文章里,我使用多任务协作构建了一个任务调度器,其中包括执行“系统调用”,做非阻塞操作和处理错误。所有这些里真正很酷的事情是任务的结果代码看起来完全同步,甚至任务正在执行大量的异步操作的时候也是这样。如果你打算从套接口读取数据的话,你将不需要传递某个回调函数或者注册一个事件侦听器。相反,你只要书写yield $socket->read()。这儿大部分都是你常常也要编写的,只在它的前面增加yield。

当我第一次听到所有这一切的时候,我发现这个概念完全令人折服,而且正是这个激励我在PHP中实现了它。同时我发现协程真正令人心慌。在令人敬畏的代码和很大一堆代码之间只有单薄的一行,我认为协程正好处在这一行上。讲讲使用上面所述的方法书写异步代码是否真的有益对我来说很难。v

原文发布于微信公众号 - nginx(nginx-study)

原文发表时间:2016-01-28

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏精讲JAVA

深入探索 Java 热部署

在 Java 开发领域,热部署一直是一个难以解决的问题,目前的 Java 虚拟机只能实现方法体的修改热部署,对于整个类的结构修改,仍然需要重启虚拟机,对类重新加...

1081
来自专栏沈唁志

如何优化PHP性能呢?PHP性能优化总结

2763
来自专栏Vamei实验室

纸上谈兵: 队列 (queue)

作者:Vamei 出处:http://www.cnblogs.com/vamei 欢迎转载,也请保留这段声明。谢谢! 队列(queue)是一个简单而常见的数据结...

2026
来自专栏ImportSource

并发编程-原子性

前面我们说了有关stateless的内容,那么如果我们在一个stateless的object中添加一个状态元素会发生什么呢?现在假设我们想要添加一个命中计数器(...

40311
来自专栏互联网杂技

丁点而内存知识

在C和C++语言开发中,指针、内存一直是学习的重点。因为C语言作为一种偏底层的中低级语言,提供了大量的内存直接操作的方法,这一方面使程序的灵活度最大化,同时也为...

3354
来自专栏大内老A

WCF技术剖析之二十二: 深入剖析WCF底层异常处理框架实现原理[下篇]

WCF客户端和服务端的框架体系相互协作,使得开发人员可以按照我们熟悉的方式进行异常的处理:在服务操作执行过程中抛出异常(FaultException),在调用服...

2029
来自专栏老马说编程

(67) 线程的基本协作机制 (上) / 计算机程序的思维逻辑

上节介绍了多线程之间竞争访问同一个资源的问题及解决方案synchronized,我们提到,多线程之间除了竞争,还经常需要相互协作,本节就来介绍Java中多线程协...

2116
来自专栏安恒网络空间安全讲武堂

堆学习入门

借助hitcon training的题目对三种堆的利用方法进行了一个系统的学习,刚入坑的堆小白们可以一起学习一下。题目链接:https://github.com...

1432
来自专栏专注 Java 基础分享

使用 Synchronized 关键字

使用 Synchronized 关键字来解决并发问题是最简单的一种方式,我们只需要使用它修饰需要被并发处理的代码块、方法或字段属性,虚拟机自动为它加锁和释放锁,...

1103
来自专栏小勇DW3

Java多线程面试题整理 1) 什么是线程?

线程是操作系统能够进行运算调度的最小单位,它被包含在进程之中,是进程中的实际运作单位。程序员可以通过它进行多处理器编程,你可以使用多线程对运算密集型任务提速。比...

3492

扫码关注云+社区

领取腾讯云代金券