最近因为项目需要,自己写了个单生产者-多消费者的消息队列模型。多线程真的不是等闲之辈能玩儿的,我花了两个小时进行设计与编码,却花了两天的时间调试与运行。在这里,我把我遇到的坑与大家分享。
一开始我需要实现一个记录用户操作日志的功能,目的是给商家用户提供客户行为分析的能力。要记录的信息包括客户的访问时间、IP、在网站上所做的操作等。其中,客户的地域信息是个重要的分析项,所以必须要把IP转化成省市县。那么究竟何时完成这个转化的动作呢?有两种方案: 1. 在用户进行数据分析时完成转化 2. 在用户进行数据分析前完成转化 第一种方案显然不靠谱,因为需要转化的IP数量很大,而且转化采用第三方接口,因此整个转化过程将持续很长很长很长……的时间。
而在分析前就把转化过程完成掉,这样当用户需要分析的时候就可以减少这部分时间的开销,提高了响应速度。因此第二种方案显然比较合理。
那么随之而来的问题是:究竟在数据分析前的哪个时机进行转化? 这个问题又有两种方案: 1. 在记录日志的时候就立即完成IP向省市县的转换; 2. 每天半夜服务器统一把当天的IP转化成省市县; 这两种方案应该来说各有千秋。 第一种方案比较消耗服务器资源,因为IP向省市县转化需要向第三方接口发送GET请求,因此需要消耗一定的出口带宽和内存资源,在服务器资源一定的前提下,分给用户访问的资源就会被减少,从而可能会影响请求响应速度。但这个问题可以用钱来解决,只要花钱砸服务器就行了;而第二种方案在服务器空闲的时候进行转化虽然节约了服务器资源,但这也导致了商家的分析结果会有一天的滞后,影响用户体验。
于是,这个问题就变成了老板的钱重要还是用户体验重要。因此我毫不犹豫地选择了第一种方案。
我使用Servlet Filter拦截用户的所有请求,并在Filter中获取用户的各项信息(其中包括IP),然后再请求第三方接口,完成IP向省市县的转化,最后将这些信息入库。
这个流程很显然有重大缺陷:请求响应时间将被拉的很长。 因为Filter是同步的,只有当Filter中的任务完成后才会放行用户的请求,而这个Filter中有两处耗时操作:请求第三方接口、数据入库,这无疑增加了用户的等待时间。
因此,我需要将耗时操作异步执行,减少Filter的阻塞时间。
我把这两个耗时操作放入一个新线程中,只要请求一来,就创建一条新线程去处理这两步操作。和先前的方式比对之后发现,确实响应速度提高了不少!
但仔细一想,发现不妙。这种方式没办法控制线程的数量,当访问量很高的情况下,线程数量将会无限增加,这时候会搞垮服务器的!
所以需要一个机制来管理所有的线程,于是我就设计了一个消息队列模型。
这个模型很简单,由一个任务队列和多个工作线程组成。生产者只需不停地往任务队列中添加任务,消费者(工作线程)不停地从任务队列的另一端取任务执行。
这个模型在项目中的应用是这样的:当一个请求被Filter拦截后,Filter从请求中获取用户的各项信息,然后把这些信息封装成一个任务对象,扔给任务队列,此刻这个Filter的使命就完成了,它完全不用管任务的执行过程。工作线程会不停地从任务队列中取任务执行。
从代码层面来看,整个消息队列由三个类构成:
这个类管理整个消息队列的运行,是主控程序,它包含以下方法:
工作线程会不断地检查任务队列中是否有任务,若有任务,就会取一个任务执行;若没有任务,就会等待一定时间后再次检查。 它是MsgQueue的一个内部类。因为WorkThread的行为完全由MsgQueue管理,外界不需要知道它的存在。
它是一个接口,并且只有一个函数run,用于封装任务具体的执行过程。
以下代码还没将消息队列单独抽象出来,相当于是一个专门用于IP向省市县转化的消息队列,有空把它整一下。 代码中有详细的注释来解释线程安全性问题。
package com.sdata.foundation.web.filter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Map;
import javax.servlet.FilterConfig;
import javax.servlet.ServletContext;
import org.apache.log4j.Logger;
import org.springframework.web.context.support.WebApplicationContextUtils;
import org.springframework.web.context.support.XmlWebApplicationContext;
import com.sdata.foundation.web.service.util.DelDataUtilService;
import com.sdata.foundation.web.service.util.InsertDataUtilService;
import com.sdata.foundation.web.service.util.QueryDataUtilService;
import com.thinkgem.jeesite.modules.sys.service.LogService;
/**
* 记录请求IP的消息队列
* @author Chai
*
*/
public class RecordLocationMQ {
// 工作线程的个数
private static int MaxWorkThread;
// 工作线程队列
private static List<WorkThread> workThreadQueue = new ArrayList<WorkThread>();
// 任务队列(存放等待执行的任务)
private static List<RecordLocationTask> msgQueue = new ArrayList<RecordLocationTask>();
// 控制所有工作线程的运行与否
private static boolean isRunning = true;
private static LogService LogService;
// 数据库查询的service(用于任务的持久化)
private static QueryDataUtilService QueryService;
// 数据库删除的service(用于任务的持久化)
private static DelDataUtilService DelService;
// 数据库插入的service(用于任务的持久化)
private static InsertDataUtilService InsertService;
// 日志
private static final Logger logger = Logger.getLogger(RecordLocationMQ.class);
// 一些常量
private static final int SUCCESS = 1;
private static final int FAIL = 0;
/**
* 本消息队列的初始化函数
* @param config 用于获取数据库操作的service
*/
public static void init ( FilterConfig config ) {
RecordLocationMQ.init( 10, config );
}
/**
* 本消息队列的初始化函数
* @param MaxWorkThread 工作线程的个数
* @param config 用于获取数据库操作的service
*/
public static void init ( int MaxWorkThread, FilterConfig config ) {
RecordLocationMQ.MaxWorkThread = MaxWorkThread;
// 初始化LogService
if (null == RecordLocationMQ.LogService) {
ServletContext sc = config.getServletContext();
XmlWebApplicationContext cxt = (XmlWebApplicationContext) WebApplicationContextUtils.getWebApplicationContext(sc);
if (cxt != null && cxt.getBean("logService") != null) {
RecordLocationMQ.LogService = (LogService) cxt.getBean("logService");
}
}
// 初始化QueryService
if (null == RecordLocationMQ.QueryService) {
ServletContext sc = config.getServletContext();
XmlWebApplicationContext cxt = (XmlWebApplicationContext) WebApplicationContextUtils.getWebApplicationContext(sc);
if (cxt != null && cxt.getBean("queryDataUtilService") != null) {
RecordLocationMQ.QueryService = (QueryDataUtilService) cxt.getBean("queryDataUtilService");
}
}
// 初始化DelService
if (null == RecordLocationMQ.DelService) {
ServletContext sc = config.getServletContext();
XmlWebApplicationContext cxt = (XmlWebApplicationContext) WebApplicationContextUtils.getWebApplicationContext(sc);
if (cxt != null && cxt.getBean("delDataUtilService") != null) {
RecordLocationMQ.DelService = (DelDataUtilService) cxt.getBean("delDataUtilService");
}
}
// 初始化InsertService
if (null == RecordLocationMQ.InsertService) {
ServletContext sc = config.getServletContext();
XmlWebApplicationContext cxt = (XmlWebApplicationContext) WebApplicationContextUtils.getWebApplicationContext(sc);
if (cxt != null && cxt.getBean("insertDataUtilService") != null) {
RecordLocationMQ.InsertService = (InsertDataUtilService) cxt.getBean("insertDataUtilService");
}
}
// 从DB中加载尚未完成的任务
// PS:在新线程中执行,防止tomcat启动时间过长
new RecordLocationMQ().new loadTaskThread().start();
// 初始化工作线程,并开始工作
initWorkThread( MaxWorkThread, workThreadQueue );
}
/**
* 初始化工作线程,并开始工作
* @param maxWorkThread 工作线程数量
* @param workThreadQueue 工作线程队列
*/
private static void initWorkThread(int maxWorkThread, List<WorkThread> workThreadQueue) {
for ( int i=0; i<maxWorkThread; i++ ) {
WorkThread workThread = new RecordLocationMQ().new WorkThread("WorkThread"+(i+1));
workThreadQueue.add( workThread );
workThread.start();
System.out.println("已开启线程:WorkThread"+(i+1));
}
}
/**
* 从DB中加载尚未完成的任务
* 并插入传入的消息队列中
* @param msgQueue
* @param logger
* @param logService
*/
private static void loadTask ( List<RecordLocationTask> msgQueue, QueryDataUtilService QueryService, DelDataUtilService DelService ) {
String querySQL = "select * from sys_log_temp";
String delSQL = "delete from sys_log_temp";
// 查询DB中的任务
try {
List<Map<String, Object>> queryResultList = QueryService.query( Arrays.asList( querySQL ) );
for ( Map<String,Object> map : queryResultList ) {
String ip = map.get("ip").toString();
String logId = map.get("log_id").toString();
if ( null!=ip && null!=logId ) {
RecordLocationMQ.addTask( new RecordLocationTask(ip, logId, LogService ) );
}
}
}
// 查询失败,不能执行delte操作
catch (Exception e) {
e.printStackTrace();
return;
}
// 清空DB中的任务
DelService.del( Arrays.asList( delSQL ) );
}
/**
* 持久化当前任务队列
*/
public static void saveTask () {
// PS1:为什么要使用同步?
// Java不允许在遍历集合过程中新增/删除元素,
// 因此在遍历任务队列前必须要先冻结任务队列,
// 防止其他线程新增/删除元素;
// 此外,为了冻结任务队列,就必须使用msgQueue锁。
// PS2:为何要先使isRunning为false?
// 将isRunning设为false能立即停止所有工作线程(见PS3),
// 从而所有工作线程都将释放msgQueue锁,
// 从而确保这里的同步块能顺利拿到msgQueue锁。
// 若不执行isRunning = false的话,
// 所有工作线程就会继续执行,
// 如果任务队列为空,工作线程就会一直持有msgQueue锁,并等待任务的到来,
// 然而添加任务的功能得在当前同步块执行完成后才会执行,
// 因此就出现了死锁。
// PS3:为何将isRunning设为false就能立即停止所有工作线程?
// 因为isRunning为工作线程的共享资源,
// 并且工作线程的运行依赖于它的值;
// 因此当isRunning设为false后,
// 工作线程执行完当前任务或发现任务队列为空后,就会纷纷停止。
isRunning = false;
synchronized ( msgQueue ) {
for ( RecordLocationTask task : msgQueue ) {
if ( task!=null && !task.persisted ) {
int result = InsertService.insert( Arrays.asList( "insert into sys_log_temp (id, ip, log_id) values('"+new Date().getTime()+"', '"+task.getIp()+"', '"+task.getLogId()+"')" ) );
if ( result == SUCCESS ) {
task.persisted = true;
}
}
}
isRunning = true;
}
}
/**
* 向任务队列中添加一个任务
* @param task 任务对象
*/
public static void addTask ( RecordLocationTask task ) {
// 添加任务
// PS:加判断的原因:由于当前这个类的这个函数是提供给别人使用的,
// 我们没办法保证别人一定会传入一个非空的task,
// 因此加个判断能提高程序的健壮性。
if ( task!=null ) {
// PS1:加同步块的原因:由于msgQueue是ArrayList类型,
// ArrayList所有函数都是线程不安全的,
// 这里加一个同步块使add函数具有原子性。
// PS2:千万不能使用msgQueue作为锁!
// 因为工作线程获取一个任务的过程,使用的锁就是msgQueue,
// 并且在这个过程中,如果任务队列为空就会一直循环等待,
// 因此在等待的过程中工作线程就一直占用的msgQueue锁;
// 然而如果这里添加任务还需要msgQueue锁,那么就会出现死锁,
// 工作线程因为任务队列为空就一直占用着msgQueue锁,
// 而添加任务的进程获取不到msgQueue锁就无法添加任务。
synchronized ( new Object() ) {
msgQueue.add(task);
// System.out.println("向消息队列添加了一条task!");
}
}
// 持久化任务队列
// PS:不使用同步的原因:这里对于数据的实时性要求没那么高。
if ( msgQueue.size() > 100 ) {
saveTask();
}
}
public static void stop () {
isRunning = false;
}
/**
* 工作线程内部类
*/
private class WorkThread extends Thread {
public WorkThread ( String threadName ) {
super(threadName);
}
@Override
public void run() {
RecordLocationTask task = null;
while ( isRunning ) {
// 获取一个任务
synchronized ( msgQueue ) {
// 任务队列为空,则等待
while ( isRunning && msgQueue.isEmpty() ) {
// System.out.println("消息队列为空!");
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 取一个任务
// PS:加判断的原因:上述while循环的结束有两种可能:
// 1.msgQueue不为空;
// 2.isRunning为false
// 因此要加判断排除msgQueue为空,但isRunning为false的情况,
// 防止msgQueue.remove时出现空指针!
if ( !msgQueue.isEmpty() ) {
task = msgQueue.remove(0);
}
// System.out.println(this.getName() + "取了一个task!");
}
// 执行任务
// PS1:加try-catch的原因:捕获任务执行过程中发生的一切异常,
// 只要发生异常,就说明该任务执行失败,
// 因此需要把它重新放进任务队列等待下一次执行。
// PS2:加判断的原因:同上述“取一个任务”加判断的原因一样。
try {
if ( task!=null ) {
task.run();
}
} catch (Exception e) {
// e.printStackTrace();
RecordLocationMQ.addTask( task ); // 使用addTask函数添加,统一添加的入口
}
}
}
}
/**
* 从数据库加载任务的内部类
*/
private class loadTaskThread extends Thread {
@Override
public void run() {
String querySQL = "select * from sys_log_temp";
String delSQL = "delete from sys_log_temp";
// 查询DB中的任务
try {
List<Map<String, Object>> queryResultList = QueryService.query( Arrays.asList( querySQL ) );
for ( Map<String,Object> map : queryResultList ) {
String ip = map.get("ip").toString();
String logId = map.get("log_id").toString();
if ( null!=ip && null!=logId ) {
RecordLocationMQ.addTask( new RecordLocationTask(ip, logId, LogService ) );
}
}
}
// 查询失败,不能执行delte操作
catch (Exception e) {
e.printStackTrace();
return;
}
// 清空DB中的任务
DelService.del( Arrays.asList( delSQL ) );
}
}
// 禁用构造函数
private RecordLocationMQ () {}
}
package com.sdata.foundation.web.filter;
public interface Task {
public void run() throws Exception;
}
package com.sdata.foundation.web.filter;
import java.util.Date;
import org.apache.log4j.Logger;
import com.thinkgem.jeesite.common.utils.IdGen;
import com.thinkgem.jeesite.modules.sys.entity.Log;
import com.thinkgem.jeesite.modules.sys.service.LogService;
public class RecordLocationTask implements Task {
private static final Logger logger = Logger.getLogger(RecordLocationTask.class);
private LogService logService;
private String ip;
private String logId;
//
public boolean persisted = false;
public RecordLocationTask(String ip, String logId, LogService logService ) {
super();
this.logService = logService;
this.ip = ip;
this.logId = logId;
}
@Override
public void run() throws Exception {
// 查询IP
if ( (new Date().getTime() - TransferIPTool.lastOperaTime) < 100 ){
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
String location = TransferIPTool.transferIP(ip);
// 更新log
Log log = new Log();
log.setIsNewRecord(false);
log.setId(logId);
log.setLocation(location);
logService.save(log);
System.out.println("完成一个task!");
}
public String getIp() {
return ip;
}
public String getLogId() {
return logId;
}
}
package com.sdata.foundation.web.filter;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import com.alibaba.fastjson.JSONObject;
import com.sdata.foundation.web.service.util.HTTPHelper;
/**
*
* @author Chai
* 本类用于将IP转化为位置信息
*/
public class TransferIPTool {
// 本次请求taobao接口的开始时间
public static long lastOperaTime = new Date().getTime();
// 模拟IP队列
private static List<String> IPList = Arrays.asList("49.65.250.135","115.28.217.42","114.80.166.240","122.92.218.0","218.28.191.23","218.12.41.179","221.239.16.227","59.108.49.35","124.117.66.0","218.21.128.31","116.52.147.50");
// IP转换接口
private static final String RequestIP = "http://ip.taobao.com/service/getIpInfo.php";
/**
* 将IP转化为省份
* @param ip
* @return 省份字符串
* @throws Exception
*/
public static String transferIP ( String ip ) throws Exception {
// 记录本次请求taobao接口的开始时间
TransferIPTool.lastOperaTime = new Date().getTime();
// 打乱IPList
Collections.shuffle( IPList );
String resultJsonStr = HTTPHelper.executeGet(RequestIP + "?ip=" + IPList.get(0));
JSONObject resultJsonObj = JSONObject.parseObject( resultJsonStr );
JSONObject data = resultJsonObj.getJSONObject("data");
return data.getString("region");
}
}