我有一种情况,我有一个循环,它将从文件中读取数据块,将这些块发送到rest api,并继续到EOF,但我希望这在循环中是异步的,因此,我不必等待API响应来读取下一个块。我一直在研究Amphp和ReactPHP,因为我找不到一个解决方案,或者我不明白这些库应该如何使用。这是我正在做的一个伪代码。
<?php
while($file.read()){
$chunk = getNextChunk();
sendChunkAsync($chunk);
}
function getNextChunk(){
echo "reading next chunk";
// read next chunk of data
}
带有amphp的示例
function sendChunkAsync($chunk){
Loop::run(function () {
$uri = "https://testapi.com/api";
$client = new DefaultClient;
try {
$promises = $client->request($uri);
$responses = yield $promises;
echo "chunk processed";
} catch (Amp\Artax\HttpException $error) {
// log error
// $error->getMessage() . PHP_EOL;
}
});
}
在这种情况下,我希望(如果读取chunk比从api获得响应更快)会发生这样的事情,不要认为这是文学上的,我正试图为您说明它。
正在读取下一块
正在读取下一块
已处理区块
正在读取下一块
已处理区块
已处理区块
发布于 2018-12-05 02:34:35
我将使用React,因为我更了解这个库,但它们的工作方式相似。
编辑:已更新,请参阅注释
这将读取一个文件,并且每次收到数据块时,它将创建一个api调用并将数据发送出去
<?php
require_once __DIR__ . '/vendor/autoload.php';
function async_send($config, $file, callable $proccessor)
{
$config['ssl'] = true === $config['ssl'] ? 's' : '';
$client = new \GuzzleHttp\Client([
'base_uri' => 'http' . $config['ssl'] . '://' . $config['domain'] . '/rest/all/V1/',
'verify' => false,
'http_errors' => false
]);
$loop = \React\EventLoop\Factory::create();
$filesystem = \React\Filesystem\Filesystem::create($loop);
$filesystem->getContents($file)->then(function($contents) use ($config, $proccessor, $client) {
$contents = $proccessor($contents);
$client->post($config['uri'], ['body' => $contents]);
});
}
$config = [
'domain' => 'example.com',
'ssl' => true
];
//somewhere later
$configp['uri'] = 'products';
async_send($configp, __DIR__ . 'my.csv', function ($contents) {
return json_encode($contents);
});
发布于 2018-12-07 06:09:16
以防其他人试图解决类似的问题
<?php
require_once __DIR__ . '/../vendor/autoload.php';
use React\HttpClient\Client as ReactClient;
function async_send($loop, $filePath, callable $proccessor)
{
echo "starting";
echo "\n\r";
try {
$filesystem = \React\Filesystem\Filesystem::create($loop);
$file = $filesystem->file($filePath);
$file->open('r')
->then(function ($stream) use ($loop, $proccessor){
$stream->on('data', function ($chunk) use ($loop, $proccessor) {
$proccessor($chunk);
});
});
} catch (\Exception $e) {
echo "failed";
echo "\n\r";
}
echo "ending reading";
echo "\n\r";
}
function callApiReal($loop, $fileChunk = null)
{
echo "ready to call api". PHP_EOL;
$uri = "https://testapi.com/";
try {
$client = new ReactClient($loop);
} catch (\Exception $e) {
echo "Error";
}
echo "ready to call api";
$request = $client->request('POST', $uri, $fileChunk);
$request->on('response', function ($response) use ($uri) {
$response->on('data', function ($data_chunk) {
echo 'data chunk from api received';
echo "\n\r";
});
// subscribe to listen to the end of the response
$response->on('end', function () use ($uri) {
echo "operation has completed";
echo "\n\r";
});
});
$request->on('error', function ($error) {
// something went bad in the request
echo "Damm!";
echo "\n\r";
});
$request->end();
}
// main loop
$loop = React\EventLoop\Factory::create();
//somewhere later
async_send($loop, __DIR__ . '/my.csv', function ($chunk) use ($loop) {
echo "calling api";
callApiReal($loop, $chunk);
echo "\n\r";
});
$loop->run();
https://stackoverflow.com/questions/53598490
复制相似问题