我有一个多线程的Perl爬虫,它可以很好地工作,如果我在array.How中声明URL,即使我从数据库中读取URL,我得到了“分段失败”error.Please帮助我修复这个issue.Thanks
直接URL声明
use 5.012; use warnings;
use threads;
use Thread::Queue;
use LWP::UserAgent;
use constant THREADS => 10;
my $queue = Thread::Queue->new();
my @URLs =qw(http://www.example.com
http://www.example.com1
http://www.example.com2 );
print @URLs;
my @threads;
for (1..THREADS) {
push @threads, threads->create(sub {
my $ua = LWP::UserAgent->new;
$ua->timeout(5); # short timeout for easy testing.
while(my $task = $queue->dequeue) {
my $response = eval{ $ua->get($task)->status_line };
say "$task --> $response";
}
});
}
$queue->enqueue(@URLs);
$queue->enqueue(undef) for 1..THREADS;
# ... here work is done
$_->join foreach @threads;正在尝试从数据库中读取URL
my $dbh = DBI->connect("DBI:mysql:$database;host=$server", $username, $password) # Get the rows from database
|| die "Could not connect to database: $DBI::errstr";
my $sth = $dbh->prepare('select cname,url,xpath,region from competitors') #query to select required fields
|| die "$DBI::errstr";
$sth->execute();
if ($sth->rows < 0) {
print "Sorry, no domains found.\n";
}
else {
while (my $results = $sth->fetchrow_hashref) {
my $competitor= $results->{cname};
my $url = $results->{url};
my $xpath = $results->{xpath};
my $region = $results->{region};
push(my @all,$url);
use constant THREADS => 10;
my $queue = Thread::Queue->new();
my @URLs=@all;
my @threads;
for (1..THREADS) {
push @threads, threads->create(sub {
my $ua = LWP::UserAgent->new;
$ua->timeout(500); # short timeout for easy testing.
while(my $task = $queue->dequeue) {
my $response = eval{ $ua->get($task)->status_line };
print "$task --> $response";
}
});
}
$queue->enqueue( @URLs);
$queue->enqueue(undef) for 1..THREADS;
# ... here work is done
$_->join foreach @threads;
}
} #close db
$sth->finish;
$dbh->disconnect;预期为o/p
www.example.com-->200 ok
www.example.com1-->200 ok当前o/p
分割错误
发布于 2015-01-22 05:43:53
当您创建线程时,您的$sth和$dbh仍然存在,并创建了它们的副本,即no-no。
新创建的线程必须与数据库建立自己的连接。句柄不能在线程之间共享。
更好的变量作用域应该可以避免这个问题。
use strict;
use warnings;
use threads;
use Thread::Queue 3.01 qw( );
use constant NUM_WORKERS => 10;
sub worker {
my ($ua, $url) = @_;
...
}
{
my $q = Thread::Queue->new();
for (1..NUM_WORKERS) {
async {
my $ua = LWP::UserAgent->new();
while ( my $url = $q->dequeue() ) {
eval { worker($ua, $url); 1 }
or warn $@;
}
};
}
{
my $dbh = DBI->connect(..., { RaiseError => 1 });
my $sth = $dbh->prepare('SELECT ...');
$sth->execute();
while ( my $row = $sth->fetchrow_hashref() ) {
$q->enqueue($row->{url});
}
}
$q->end();
$_->join for threads->list;
}发布于 2015-01-22 05:22:28
您应该在while循环外部声明@all,然后,当URL被推送时,关闭该循环并继续
my $dbh = DBI->connect("DBI:mysql:$database;host=$server", $username, $password) # Get the rows from database
|| die "Could not connect to database: $DBI::errstr";
my $sth = $dbh->prepare('select cname,url,xpath,region from competitors') #query to select required fields
|| die "$DBI::errstr";
$sth->execute();
# >> declare your URL-array before starting to fetch
my @URLs;
if ($sth->rows < 0) {
print "Sorry, no domains found.\n";
}
else {
while (my $results = $sth->fetchrow_hashref) {
my $competitor= $results->{cname};
my $url = $results->{url};
my $xpath = $results->{xpath};
my $region = $results->{region};
push(@URLs,$url);
}
}
$sth->finish;
$dbh->disconnect;
use constant THREADS => 10;
my $queue = Thread::Queue->new();
my @threads;
for (1..THREADS) {
push @threads, threads->create(sub {
my $ua = LWP::UserAgent->new;
$ua->timeout(500); # short timeout for easy testing.
while(my $task = $queue->dequeue) {
my $response = eval{ $ua->get($task)->status_line };
print "$task --> $response";
}
});
}
$queue->enqueue( @URLs);
$queue->enqueue(undef) for 1..THREADS;
# ... here work is done
$_->join foreach @threads;发布于 2015-01-22 05:42:34
由于perl代码的原因,段错误非常少见。它们与内存相关,通常意味着外部二进制文件中存在问题。(我把赌注押在DBI上)
特别是线程有很多遗留问题-尽管在新版本的perl中它们会变得更好。如果还没有升级到perl的最新版本,我强烈建议您考虑将其升级到最新版本。我知道这并不总是一个选择,但这是一个好主意。
这真的很难猜测你的问题,因为我没有你的数据库,所以我不能重新创建它。
我建议你通常可以做一些事情来保持线程的“整洁”--你的代码的工作方式是DB句柄在线程的作用域中。我会避免这样做。在顶部声明thread,并尽可能缩小范围。
不过,我会注意到:
push ( my @all, $url ); 可能并不像你想的那样!
但是,是的,拿你的代码来说,我会这样写:
#!/usr/bin/perl
use strict;
use warnings;
use threads;
use Thread::Queue;
use LWP;
my $num_threads = 10;
my $work_q = Thread::Queue->new();
sub worker {
my $ua = LWP::UserAgent->new;
$ua->timeout(500); # short timeout for easy testing.
while ( my $task = $work_q->dequeue ) {
my $response = eval { $ua->get($task)->status_line };
print "$task --> $response";
}
}
## fetch_list
sub fetch_url_list {
my $dbh = DBI->connect( "DBI:mysql:$database;host=$server",
$username, $password ) # Get the rows from database
|| die "Could not connect to database: $DBI::errstr";
my $sth =
$dbh->prepare( 'select cname,url,xpath,region from competitors'
) #query to select required fields
|| die "$DBI::errstr";
$sth->execute();
if ( $sth->rows < 0 ) {
print "Sorry, no domains found.\n";
}
else {
while ( my $results = $sth->fetchrow_hashref ) {
my $competitor = $results->{cname};
my $url = $results->{url};
my $xpath = $results->{xpath};
my $region = $results->{region};
$work_q -> enqueue ( $url );
}
}
$sth->finish;
$dbh->disconnect;
}
for ( 1 .. $num_threads ) {
threads->create( \&worker );
}
fetch_url_list();
$work_q->end;
foreach my $thr ( threads->list() ) {
$thr->join();
}这样--你的线程都没有“作用域”中的DB内容,而DB也没有“作用域”中的线程内容。这降低了“污染”给你带来问题的几率。尤其是线程,当它们开始“复制”当前作用域中的所有内容时,当它们是对象时,可能会做一些非常奇怪的事情。(例如,DB句柄)
如果做不到这一点,我会考虑使用“分叉”方法。线程很擅长来回传递数据,但是当你不需要来回传递数据时,fork通常更高效(在基于Unix的系统上是肯定的)(你不需要,你只是运行测试并打印结果)。
https://stackoverflow.com/questions/28076879
复制相似问题