我正在尝试创建一个同时处理多个文件的脚本,规则是,每个文件只能处理一次,并且输入文件在处理后将被删除。我创建了这个脚本:
<?php
// Libraries for reading files
require_once "spooler.php";
// Configuration section ///////////////////////////////////////////////////////
$config["data"] = "data";
$config["threads"] = 20;
$config["timer"] = 1;
// Array to store currently processed files
$config["processed_files"] = array();
// Processing section //////////////////////////////////////////////////////////
$timer = 0;
$pool = new Pool($config["threads"], \ProcessingWorker::class);
while (true) {
// Read a number of files from the data folder according to the number of thread
$files = Spooler::read_spool_file($config["data"], $config["threads"]);
foreach ($files as $file) {
// Check if the file is already processed
if (in_array($file, $config["processed_files"])) continue;
// Submit the file to the worker
echo "Submitting $file\n";
$config["processed_files"][$file] = $file;
$pool->submit(new ProcessingJob($config, $file));
}
sleep($config["timer"]);
$timer++;
}
$pool->shutdown();
// Processing thread section ///////////////////////////////////////////////////
class ProcessingJob extends Stackable {
private $config;
private $file;
public function __construct($config, $file)
{
$this->config = $config;
$this->file = $file;
$this->complete = false;
}
public function run()
{
echo "Processing $this->file\n";
// Pretend we're doing something that takes time
sleep(mt_rand(1, 10));
file_put_contents("_LOG", $this->file."\n", FILE_APPEND);
// Delete the file
@unlink($this->file);
// Remove the file from the currently processing list
unset($this->config["processed_files"][$this->file]);
}
}
class ProcessingWorker extends Worker {
public function run() {}
}然而,这段代码并不能很好地工作,它不会两次处理相同的文件,而是有时会跳过处理某些文件。Here's the file list应该处理它,但它只处理these files。
我哪里做错了?
发布于 2014-05-20 13:44:18
日志文件的输出是不同步的,很可能是两个线程同时调用日志文件上的file_put_contents,从而破坏了它的输出。
您不应该以这种方式写入日志文件。
如果$config['processed_files']打算被多个上下文操作,那么它应该是一个线程安全的结构,而不是一个普通的PHP数组。
https://stackoverflow.com/questions/23733884
复制相似问题