Reactphp-MQ 的主要目标是在并发处理多个操作时进行有效的速率限制,避免资源过度消耗或被远程服务器封禁。它就像是一个智能的调度员,帮助开发者管理异步操作,确保同一时间不会有过多的操作同时执行。例如,在网页爬虫场景中,如果需要发送大量的 HTTP 请求,直接并发发送可能会导致网络拥堵或被目标网站封禁 IP。而使用 Reactphp-MQ 可以对请求进行排队和调度,控制并发数量,从而更稳定地完成任务。
推荐使用 Composer 来安装 clue/reactphp-mq
,只需在项目根目录下运行以下命令:
composer require clue/mq-react:^1.7
以下是一个简单的 HTTP 请求示例,展示了如何使用 clue/reactphp-mq
来控制并发请求:
<?php
require__DIR__ . '/vendor/autoload.php';
$browser = new React\Http\Browser();
// 加载需要请求的 URL 列表
$urls = file('urls.txt');
// 创建一个队列实例,限制并发任务数量为 3
$q = new Clue\React\Mq\Queue(3, null, function ($url) use ($browser) {
return $browser->get($url);
});
// 遍历 URL 列表,将每个请求添加到队列中
foreach ($urls as $url) {
$q($url)->then(function (Psr\Http\Message\ResponseInterface $response) use ($url) {
echo $url . ': ' . $response->getBody()->getSize() . ' bytes' . PHP_EOL;
}, function (Exception $e) {
echo'Error: ' . $e->getMessage() . PHP_EOL;
});
}
在这个示例中,我们首先创建了一个 React\Http\Browser
实例,用于发送 HTTP 请求。然后,我们创建了一个 Queue
实例,将并发任务数量限制为 3
。最后,我们遍历 URL 列表,将每个请求添加到队列中,并处理请求的结果。
all()
方法Queue::all()
方法可以同时处理多个任务,并在所有任务完成后返回结果。如果其中任何一个任务失败,整个操作将被拒绝。示例代码如下:
$browser = new React\Http\Browser();
$promise = Queue::all(3, $urls, function ($url) use ($browser) {
return $browser->get($url);
});
$promise->then(function (array $responses) {
echo 'All ' . count($responses) . ' successful!' . PHP_EOL;
});
any()
方法Queue::any()
方法会并发处理多个任务,并在第一个任务成功完成后返回结果。如果所有任务都失败,则操作将被拒绝。示例代码如下:
$browser = new React\Http\Browser();
$promise = Queue::any(3, $urls, function ($url) use ($browser) {
return $browser->get($url);
});
$promise->then(function (ResponseInterface $response) {
echo 'First response: ' . $response->getBody() . PHP_EOL;
});
默认情况下,该库不会限制单个操作的执行时间。但在实际应用中,我们可能需要为操作设置超时时间。可以使用 react/promise-timer
来实现这一功能,示例代码如下:
use React\Promise\Timer;
$q = new Queue(10, null, function ($uri) use ($browser) {
return Timer\timeout($browser->get($uri), 2.0);
});
$promise = $q($uri);