
在深入技术细节之前,我们首先要理解瓶颈所在。
通过异步与并发编程,我们可以将原本线性增长的总耗时,降低几个数量级,实现近乎与线程数成正比的抓取速度。
在Java生态中,我们有多种武器来实现并发爬虫:
ExecutorService 线程池:这是最经典和核心的工具。它管理着一个线程池,避免了频繁创建和销毁线程的开销,允许我们以提交任务的方式执行并发操作。我们将使用它作为本文的主力。CompletableFuture:Java 8引入的异步编程利器,它能够方便地组合多个异步操作,处理它们的结果或异常,实现非阻塞的回调。本文我们将重点介绍最实用、最普遍的 ExecutorService 线程池 方案。
我们的目标是并发地抓取一批淘宝商品ID对应的详情信息。为了规避复杂的反爬机制,本例将聚焦于核心的并发架构,并使用模拟数据进行演示。
首先,我们将“抓取一个商品”这个操作定义为一个独立的、可执行的任务,它实现了 Runnable 或 Callable 接口。这里我们使用 Callable,因为它可以返回结果。
import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import java.net.InetSocketAddress;
import java.net.Proxy;
import java.util.concurrent.Callable;
/**
* 单个商品详情抓取任务
*/
public class ProductDetailFetcher implements Callable<ProductDetail> {
private final String itemId;
private final String proxyHost = "www.16yun.cn";
private final int proxyPort = 5445;
private final String proxyUser = "16QMSOML";
private final String proxyPass = "280651";
public ProductDetailFetcher(String itemId) {
this.itemId = itemId;
}
@Override
public ProductDetail call() throws Exception {
// 1. 模拟或真实地构建目标URL
// 注意:淘宝有强大的反爬虫机制,直接爬取非常困难。
// 此处仅为演示并发结构,URL和解析逻辑已做简化处理。
String url = "https://item.taobao.com/item.htm?id=" + itemId;
// 2. 创建代理对象
Proxy proxy = new Proxy(Proxy.Type.HTTP, new InetSocketAddress(proxyHost, proxyPort));
// 3. 发送HTTP请求,获取HTML文档(使用代理)
// 在实际项目中,这里需要配置User-Agent、代理IP、Cookie等以应对反爬。
Document doc = Jsoup.connect(url)
.proxy(proxy) // 设置代理
.userAgent("Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36")
.header("Proxy-Authorization", getProxyAuthHeader()) // 设置代理认证
.timeout(30000) // 增加超时时间,因为代理可能会增加延迟
.ignoreContentType(true)
.ignoreHttpErrors(true)
.get();
// 4. 解析HTML,提取商品信息
String title = doc.select("h1[class*='Title']").first().text();
// 价格可能由动态JS渲染,此处解析静态页面可能为空,实战中需用Selenium等工具。
String price = doc.select("span[class*='Price']").first().text();
String shopName = doc.select("a[class*='ShopName']").first().text();
// 5. 封装并返回商品详情对象
return new ProductDetail(itemId, title, price, shopName);
}
/**
* 生成代理认证头信息
*/
private String getProxyAuthHeader() {
String credentials = proxyUser + ":" + proxyPass;
String base64Credentials = java.util.Base64.getEncoder().encodeToString(credentials.getBytes());
return "Basic " + base64Credentials;
}
}这是一个简单的POJO,用于承载抓取到的数据。
/**
* 商品详情数据模型
*/
public class ProductDetail {
private String itemId;
private String title;
private String price;
private String shopName;
// 构造器、Getter和Setter方法、toString方法
public ProductDetail(String itemId, String title, String price, String shopName) {
this.itemId = itemId;
this.title = title;
this.price = price;
this.shopName = shopName;
}
// ... 省略 Getter 和 Setter ...
@Override
public String toString() {
return "ProductDetail{" +
"itemId='" + itemId + '\'' +
", title='" + title + '\'' +
", price='" + price + '\'' +
", shopName='" + shopName + '\'' +
'}';
}
}这是整个爬虫的大脑,负责创建线程池、提交所有任务、并收集结果。
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
/**
* 并发爬虫调度引擎
*/
public class ConcurrentCrawlerEngine {
public static void main(String[] args) {
// 模拟一批要抓取的商品ID
List<String> itemIds = List.of("1234567890", "2345678901", "3456789012", "..."); // 可以有很多
// 1. 创建线程池
// corePoolSize: 核心线程数,这里设置为5,可根据机器性能和网络情况调整。
// maximumPoolSize: 最大线程数,设置为10。
// keepAliveTime: 非核心线程空闲存活时间。
// workQueue: 任务队列,使用LinkedBlockingQueue。
ExecutorService executor = new ThreadPoolExecutor(
5, 10, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>()
);
// 2. 创建Future列表,用于保存每个任务的异步结果
List<Future<ProductDetail>> futureList = new ArrayList<>();
// 3. 提交所有任务
System.out.println("开始提交抓取任务...");
for (String itemId : itemIds) {
ProductDetailFetcher fetcher = new ProductDetailFetcher(itemId);
Future<ProductDetail> future = executor.submit(fetcher);
futureList.add(future);
}
// 4. 关闭线程池,不再接受新任务,但会执行完已提交的任务
executor.shutdown();
// 5. 遍历Future列表,获取任务结果
List<ProductDetail> results = new ArrayList<>();
for (Future<ProductDetail> future : futureList) {
try {
// future.get() 会阻塞,直到该任务完成并返回结果
ProductDetail detail = future.get();
results.add(detail);
System.out.println("成功抓取: " + detail);
} catch (InterruptedException | ExecutionException e) {
// 处理抓取过程中可能出现的异常
System.err.println("抓取任务失败: " + e.getMessage());
}
}
// 6. 所有任务完成,处理最终结果 (例如,存入数据库或文件)
System.out.println("所有任务执行完毕。总共抓取到 " + results.size() + " 个商品。");
// saveToDatabase(results);
}
}实现基础并发只是第一步,一个健壮的工业级爬虫还需要考虑更多:
CPU核心数 * (1 + 平均等待时间/平均计算时间) 这个公式开始估算。LinkedBlockingQueue 还是 SynchronousQueue,决定了线程池的负载策略。Semaphore(信号量)或引入速率限制库(如Guava的RateLimiter)来限流。executor.awaitTermination(long timeout, TimeUnit unit) 来等待线程池优雅关闭。future.get() 时,通过捕获 ExecutionException 来处理任务执行中的异常,确保一个任务的失败不会影响整个爬虫。CompletableFuture:可以实现更复杂的异步流水线,例如:抓取完成后,立即异步地进行数据清洗和存储,进一步提升整体吞吐量。// 使用CompletableFuture的示例片段
List<CompletableFuture<Void>> futures = itemIds.stream()
.map(itemId -> CompletableFuture.supplyAsync(() -> new ProductDetailFetcher(itemId).call(), executor)
.thenApplyAsync(this::saveToDatabase) // 异步存储
.exceptionally(e -> { System.err.println("Error: " + e); return null; })
)
.collect(Collectors.toList());
// 等待所有任务完成
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();通过利用Java的 ExecutorService 线程池,我们成功地将一个缓慢的单线程淘宝商品爬虫,改造为一个高效、强大的并发数据抓取引擎。这不仅极大地缩短了数据采集的时间,更充分利用了现代多核处理器的计算能力。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。