首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >Perl并行爬虫多线程

Perl并行爬虫多线程
EN

Stack Overflow用户
提问于 2015-01-22 05:11:31
回答 3查看 119关注 0票数 0

我有一个多线程的Perl爬虫,它可以很好地工作,如果我在array.How中声明URL,即使我从数据库中读取URL,我得到了“分段失败”error.Please帮助我修复这个issue.Thanks

直接URL声明

代码语言:javascript
运行
复制
use 5.012; use warnings;
use threads;
use Thread::Queue;
use LWP::UserAgent;

use constant THREADS => 10;

my $queue = Thread::Queue->new();
my @URLs =qw(http://www.example.com
http://www.example.com1
http://www.example.com2 );

print @URLs;
my @threads;

for (1..THREADS) {
    push @threads, threads->create(sub {
        my $ua = LWP::UserAgent->new;
        $ua->timeout(5); # short timeout for easy testing.
        while(my $task = $queue->dequeue) {
            my $response = eval{ $ua->get($task)->status_line };
            say "$task --> $response";
        }
    });
}

$queue->enqueue(@URLs);
$queue->enqueue(undef) for 1..THREADS;
# ... here work is done
$_->join foreach @threads;

正在尝试从数据库中读取URL

代码语言:javascript
运行
复制
my $dbh = DBI->connect("DBI:mysql:$database;host=$server", $username, $password) # Get the rows from database
    || die "Could not connect to database: $DBI::errstr";

my $sth = $dbh->prepare('select cname,url,xpath,region from competitors')    #query to select required fields
    || die "$DBI::errstr";

$sth->execute();

if ($sth->rows < 0) {
    print "Sorry, no domains found.\n";
}
else {                                                
    while (my $results = $sth->fetchrow_hashref) {
        my $competitor= $results->{cname};                      
        my $url = $results->{url};                         
        my $xpath = $results->{xpath};
        my $region = $results->{region};

        push(my @all,$url);   

        use constant THREADS => 10;
        my $queue = Thread::Queue->new();
        my @URLs=@all;
        my @threads;

        for (1..THREADS) {
            push @threads, threads->create(sub {
                my $ua = LWP::UserAgent->new;
                $ua->timeout(500); # short timeout for easy testing.
                while(my $task = $queue->dequeue) {
                    my $response = eval{ $ua->get($task)->status_line };
                    print  "$task --> $response";
                }
            });
        }

        $queue->enqueue( @URLs);
        $queue->enqueue(undef) for 1..THREADS;
        # ... here work is done
        $_->join foreach @threads;
    }

}  #close db

$sth->finish;
$dbh->disconnect;

预期为o/p

代码语言:javascript
运行
复制
www.example.com-->200 ok

www.example.com1-->200 ok

当前o/p

分割错误

EN

回答 3

Stack Overflow用户

发布于 2015-01-22 05:43:53

当您创建线程时,您的$sth$dbh仍然存在,并创建了它们的副本,即no-no

新创建的线程必须与数据库建立自己的连接。句柄不能在线程之间共享。

更好的变量作用域应该可以避免这个问题。

代码语言:javascript
运行
复制
use strict;
use warnings;
use threads;
use Thread::Queue 3.01 qw( );

use constant NUM_WORKERS => 10;

sub worker {
   my ($ua, $url) = @_;
   ...
}

{
   my $q = Thread::Queue->new();

   for (1..NUM_WORKERS) {
      async {
         my $ua = LWP::UserAgent->new();
         while ( my $url = $q->dequeue() ) {
            eval { worker($ua, $url); 1 }
               or warn $@;
         }
      };
   }

   {
      my $dbh = DBI->connect(..., { RaiseError => 1 });
      my $sth = $dbh->prepare('SELECT ...');
      $sth->execute();
      while ( my $row = $sth->fetchrow_hashref() ) {
         $q->enqueue($row->{url});
      }
   }

   $q->end();
   $_->join for threads->list;
}
票数 2
EN

Stack Overflow用户

发布于 2015-01-22 05:22:28

您应该在while循环外部声明@all,然后,当URL被推送时,关闭该循环并继续

代码语言:javascript
运行
复制
my $dbh = DBI->connect("DBI:mysql:$database;host=$server", $username, $password) # Get the rows from database
    || die "Could not connect to database: $DBI::errstr";

my $sth = $dbh->prepare('select cname,url,xpath,region from competitors')    #query to select required fields
    || die "$DBI::errstr";

$sth->execute();

# >> declare your URL-array before starting to fetch
my @URLs;
if ($sth->rows < 0) {
    print "Sorry, no domains found.\n";
}

else {


    while (my $results = $sth->fetchrow_hashref) {
        my $competitor= $results->{cname};                      
        my $url = $results->{url};                         
        my $xpath = $results->{xpath};
        my $region = $results->{region};

        push(@URLs,$url);   
    }

}  

$sth->finish;
$dbh->disconnect;

use constant THREADS => 10;
my $queue = Thread::Queue->new();
my @threads;

for (1..THREADS) {
        push @threads, threads->create(sub {
        my $ua = LWP::UserAgent->new;
        $ua->timeout(500); # short timeout for easy testing.
        while(my $task = $queue->dequeue) {
            my $response = eval{ $ua->get($task)->status_line };
            print  "$task --> $response";
        }
    });
}

$queue->enqueue( @URLs);
$queue->enqueue(undef) for 1..THREADS;
# ... here work is done
$_->join foreach @threads;
票数 0
EN

Stack Overflow用户

发布于 2015-01-22 05:42:34

由于perl代码的原因,段错误非常少见。它们与内存相关,通常意味着外部二进制文件中存在问题。(我把赌注押在DBI上)

特别是线程有很多遗留问题-尽管在新版本的perl中它们会变得更好。如果还没有升级到perl的最新版本,我强烈建议您考虑将其升级到最新版本。我知道这并不总是一个选择,但这是一个好主意。

这真的很难猜测你的问题,因为我没有你的数据库,所以我不能重新创建它。

我建议你通常可以做一些事情来保持线程的“整洁”--你的代码的工作方式是DB句柄在线程的作用域中。我会避免这样做。在顶部声明thread,并尽可能缩小范围。

不过,我会注意到:

代码语言:javascript
运行
复制
push ( my @all, $url ); 

可能并不像你想的那样!

但是,是的,拿你的代码来说,我会这样写:

代码语言:javascript
运行
复制
#!/usr/bin/perl
use strict;
use warnings;

use threads;
use Thread::Queue;
use LWP;

my $num_threads = 10;

my $work_q = Thread::Queue->new();

sub worker {
    my $ua = LWP::UserAgent->new;
    $ua->timeout(500);    # short timeout for easy testing.
    while ( my $task = $work_q->dequeue ) {
        my $response = eval { $ua->get($task)->status_line };
        print "$task --> $response";
    }
}


## fetch_list

sub fetch_url_list {
    my $dbh = DBI->connect( "DBI:mysql:$database;host=$server",
        $username, $password )    # Get the rows from database
        || die "Could not connect to database: $DBI::errstr";

    my $sth =
        $dbh->prepare( 'select cname,url,xpath,region from competitors'
        )                         #query to select required fields
        || die "$DBI::errstr";

    $sth->execute();


    if ( $sth->rows < 0 ) {
        print "Sorry, no domains found.\n";
    }
    else {
        while ( my $results = $sth->fetchrow_hashref ) {
            my $competitor = $results->{cname};
            my $url        = $results->{url};
            my $xpath      = $results->{xpath};
            my $region     = $results->{region};

            $work_q -> enqueue ( $url );
        }
    }
    $sth->finish;
    $dbh->disconnect;
}

for ( 1 .. $num_threads ) {
    threads->create( \&worker );
}

fetch_url_list();
$work_q->end;

foreach my $thr ( threads->list() ) {
    $thr->join();
}

这样--你的线程都没有“作用域”中的DB内容,而DB也没有“作用域”中的线程内容。这降低了“污染”给你带来问题的几率。尤其是线程,当它们开始“复制”当前作用域中的所有内容时,当它们是对象时,可能会做一些非常奇怪的事情。(例如,DB句柄)

如果做不到这一点,我会考虑使用“分叉”方法。线程很擅长来回传递数据,但是当你不需要来回传递数据时,fork通常更高效(在基于Unix的系统上是肯定的)(你不需要,你只是运行测试并打印结果)。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/28076879

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档