最近研究了Java多线程的几种实现方式,写了这个项目用于练手。实现了通过多线程的方式获取人民日报的新闻标题(其实同时存了内容,但是我后续懒得处理了,毕竟这个项目的主题是多线程的实现)
主要思路是:先获取一个新闻链接的列表,然后通过Executor线程池框架多线程访问这些新闻链接,用正则表达式匹配出标题,存入线程安全的ConcurrentHashMap中,最后输出即可。HTTP请求直接使用了Apache的HttpClient,具体用的是其更加简便的Fluent API。
为了减小多线程对人民日报网站的访问压力,使用的是固定大小的Executor线程池FixedThreadPool,并将线程数固定在一个较小的值(10)上。在线程同步上,使用了ReentrantLock锁机制。为了防止HTTP连接长时间未返回而造成线程阻塞,在TitanReq类中设置了连接超时(connectTimeout)和Socket读取超时(socketTimeout),均为2秒。
文章链接列表中一共有1176篇文章,在多线程并发下很快就执行完毕,成功获取了大部分标题(有一部分请求因访问超时而被丢弃,这里可以增加重试机制)。
Code-1
Code-2
package cn.titan6.crawler;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.LinkedList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * Entry point: fetches the People's Daily article index, crawls every listed article with a
 * small fixed-size thread pool, and prints the titles that were successfully extracted.
 */
public class Main {
    /** Number of worker threads; deliberately small to limit load on the target site. */
    private static final int THREAD_COUNT = 10;

    /**
     * Crawls all article URLs from the index concurrently and prints each extracted title.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        LinkedList<String> urlList = getUrlList();
        // A single Runnable shared by every worker: they all drain the same URL list and
        // write into the same result map, which is read back from `fetch` afterwards.
        ContentFetch fetch = new ContentFetch(urlList);
        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(THREAD_COUNT);
        for (int i = 0; i < THREAD_COUNT; i++) {
            executor.execute(fetch);
        }
        executor.shutdown();
        try {
            // Poll once per second until every submitted task has completed.
            while (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
                // still running
            }
        } catch (InterruptedException e) {
            // Interrupted while waiting: cancel the workers, restore the interrupt status,
            // and fall through to print whatever has been collected so far.
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
        ConcurrentHashMap<String, String> resultMap = fetch.getResultMap();
        for (String title : resultMap.keySet()) {
            System.out.println("Title:" + title);
        }
    }

    /**
     * Downloads the article index and extracts the {@code url} field of every entry in its
     * {@code items} collection.
     *
     * @return the article URLs in index order; empty if the index could not be fetched or
     *     parsed, or has no {@code items} field
     */
    public static LinkedList<String> getUrlList() {
        LinkedList<String> urlList = new LinkedList<>();
        TitanReq req = new TitanReq();
        // The random `t` parameter presumably acts as a cache-buster so a fresh index is served.
        String response = req.doGet("http://news.people.com.cn/210801/211150/index.js?t=" + Math.random());
        if (response == null || response.isEmpty()) {
            // Nothing to parse; feeding null/"" to Jackson would throw or yield a missing node.
            System.err.println("Failed to fetch the article index; no URLs to crawl.");
            return urlList;
        }
        ObjectMapper mapper = new ObjectMapper();
        try {
            JsonNode root = mapper.readTree(response);
            JsonNode items = root == null ? null : root.get("items");
            if (items == null) {
                return urlList;
            }
            for (JsonNode urlNode : items) {
                JsonNode url = urlNode.get("url");
                // Skip malformed entries instead of failing the whole crawl.
                if (url != null) {
                    urlList.add(url.asText());
                }
            }
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        return urlList;
    }
}
package cn.titan6.crawler;
import java.util.LinkedList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
 * Worker that drains a list of article URLs, downloads each page, and records the extracted
 * title and body in a {@link ConcurrentHashMap} keyed by title.
 *
 * <p>One instance may be run by several threads at the same time; every access to the URL
 * list goes through {@code lock}, and results are written to a concurrent map.
 */
public class ContentFetch implements Runnable {
    // Guards urlList. Kept static (as before) so that separate instances constructed over the
    // same list are still serialized against each other.
    private static final ReentrantLock lock = new ReentrantLock();
    private static final Pattern CONTENT_PATTERN = Pattern.compile(
            "<div class=\"box_con\" id=\"rwb_zw\">(.*?)<div class=\"edit clearfix\">",
            Pattern.MULTILINE | Pattern.DOTALL);
    private static final Pattern TITLE_PATTERN = Pattern.compile(
            "<div class=\"clearfix w1000_320 text_title\">.*<h1>(.*?)</h1>",
            Pattern.MULTILINE | Pattern.DOTALL);

    private final LinkedList<String> urlList;
    // ConcurrentHashMap is already thread-safe, so writes need no additional lock.
    private final ConcurrentHashMap<String, String> resultMap = new ConcurrentHashMap<>();
    // Set by whichever thread finds the list empty and read by all others without the lock;
    // volatile makes that write visible to them.
    private volatile boolean exitFlag;

    /**
     * @param urlList URLs to crawl; the list is used directly (not copied) and is consumed
     */
    public ContentFetch(LinkedList<String> urlList) {
        this.urlList = urlList;
    }

    /** Pops URLs one at a time until the list is empty, fetching and storing each page. */
    @Override
    public void run() {
        while (!exitFlag) {
            String currentUrl;
            lock.lock();
            try {
                if (urlList.isEmpty()) {
                    exitFlag = true;
                    return; // the finally block still releases the lock
                }
                currentUrl = urlList.pop();
                System.out.println("Working at " + Thread.currentThread() + " Left:" + urlList.size());
            } finally {
                lock.unlock();
            }
            // currentUrl is a fresh local each iteration, so a URL is never processed twice.
            if (currentUrl != null) {
                fetchAndStore(currentUrl);
            }
        }
    }

    /**
     * Downloads one page and stores its title and body; pages that fail to download or do not
     * match both patterns are skipped.
     */
    private void fetchAndStore(String url) {
        TitanReq req = new TitanReq();
        String response = req.doGet(url);
        if (response == null) {
            return;
        }
        Matcher contentMatcher = CONTENT_PATTERN.matcher(response);
        Matcher titleMatcher = TITLE_PATTERN.matcher(response);
        if (contentMatcher.find() && titleMatcher.find()) {
            resultMap.put(titleMatcher.group(1), contentMatcher.group(1));
        }
    }

    /** @return the live title -> body map populated by the workers */
    public ConcurrentHashMap<String, String> getResultMap() {
        return resultMap;
    }

    /** Appends a URL to the work list; safe to call while workers are running. */
    public void pushUrl(String url) {
        lock.lock();
        try {
            urlList.add(url);
        } finally {
            lock.unlock();
        }
    }
}
package cn.titan6.crawler;
import org.apache.http.client.fluent.Request;
import java.io.UnsupportedEncodingException;
/** Minimal HTTP GET helper built on Apache HttpClient's Fluent API. */
public class TitanReq {
    /** Default connect and socket timeout, in milliseconds. */
    private static final int DEFAULT_TIMEOUT_MILLIS = 2000;

    /**
     * Fetches {@code url} using the default 2-second connect and socket timeouts and decodes
     * the body as GB2312.
     *
     * @param url absolute URL to fetch
     * @return the decoded body, or {@code null} if the request failed or timed out
     */
    public String doGet(String url) {
        return doGet(url, DEFAULT_TIMEOUT_MILLIS);
    }

    /**
     * Fetches {@code url} with the given connect and socket timeouts and decodes the body as
     * GB2312.
     *
     * @param url absolute URL to fetch
     * @param timeoutMillis connect and socket timeout, in milliseconds
     * @return the decoded body, or {@code null} if the request failed or timed out
     */
    public String doGet(String url, int timeoutMillis) {
        byte[] responseBytes;
        try {
            responseBytes = Request.Get(url)
                    .connectTimeout(timeoutMillis)
                    .socketTimeout(timeoutMillis)
                    .execute()
                    .returnContent()
                    .asBytes();
        } catch (Exception e) {
            // Deliberately best-effort: a failed or timed-out request is reported and skipped
            // (null) instead of propagating and killing the calling worker thread.
            System.err.println("Request failed for " + url + ": " + e);
            return null;
        }
        if (responseBytes == null) {
            return null;
        }
        try {
            // NOTE(review): if the site actually serves GBK, characters outside GB2312 may be
            // garbled; GBK is a superset and may be the safer choice — confirm.
            return new String(responseBytes, "GB2312");
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
            return null;
        }
    }
}