轻量级线程池的实现

写在前面

最近因为项目需要,自己写了个单生产者-多消费者的消息队列模型。多线程真的不是等闲之辈能玩儿的,我花了两个小时进行设计与编码,却花了两天的时间调试与运行。在这里,我把我遇到的坑与大家分享。

需求的由来

一开始我需要实现一个记录用户操作日志的功能,目的是给商家用户提供客户行为分析的能力。要记录的信息包括客户的访问时间、IP、在网站上所做的操作等。其中,客户的地域信息是个重要的分析项,所以必须要把IP转化成省市县。那么究竟何时完成这个转化的动作呢?有两种方案: 1. 在用户进行数据分析完成转化 2. 在用户进行数据分析完成转化 第一种方案显然不靠谱,因为需要转化的IP数量很大,而且转化采用第三方接口,因此整个转化过程将持续很长很长很长……的时间。

而在分析前就把转化过程完成掉,这样当用户需要分析的时候就可以减少这部分时间的开销,提高了响应速度。因此第二种方案显然比较合理。

那么随之而来的问题是:究竟在数据分析前的哪个时机进行转化? 这个问题又有两种方案: 1. 在记录日志的时候就立即完成IP向省市县的转换; 2. 每天半夜服务器统一把当天的IP转化成省市县; 这两种方案应该来说各有千秋。 第一种方案比较消耗服务器资源,因为IP向省市县转化需要向第三方接口发送GET请求,因此需要消耗一定的出口带宽和内存资源,在服务器资源一定的前提下,分给用户访问的资源就会被减少,从而可能会影响请求响应速度。但这个问题可以用钱来解决,只要花钱砸服务器就行了;而第二种方案在服务器空闲的时候进行转化虽然节约了服务器资源,但这也导致了商家的分析结果会有一天的滞后,影响用户体验。

于是,这个问题就变成了老板的钱重要还是用户体验重要。因此我毫不犹豫地选择了第一种方案。

初步设计

我使用Servlet Filter拦截用户的所有请求,并在Filter中获取用户的各项信息(其中包括IP),然后再请求第三方接口,完成IP向省市县的转化,最后将这些信息入库。

这个流程很显然有重大缺陷:请求响应时间将被拉的很长。 因为Filter是同步的,只有当Filter中的任务完成后才会放行用户的请求,而这个Filter中有两处耗时操作:请求第三方接口、数据入库,这无疑增加了用户的等待时间。

因此,我需要将耗时操作异步执行,减少Filter的阻塞时间。

我把这两个耗时操作放入一个新线程中,只要请求一来,就创建一条新线程去处理这两步操作。和先前的方式比对之后发现,确实响应速度提高了不少!

但仔细一想,发现不妙。这种方式没办法控制线程的数量,当访问量很高的情况下,线程数量将会无限增加,这时候会搞垮服务器的!

所以需要一个机制来管理所有的线程,于是我就设计了一个消息队列模型。

模型设计

这个模型很简单,由一个任务队列和多个工作线程组成。生产者只需不停地往任务队列中添加任务,消费者(工作线程)不停地从任务队列的另一端取任务执行。

这个模型在项目中的应用是这样的:当一个请求被Filter拦截后,Filter从请求中获取用户的各项信息,然后把这些信息封装成一个任务对象,扔给任务队列,此刻这个Filter的使命就完成了,它完全不用管任务的执行过程。工作线程会不停地从任务队列中取任务执行。

类图设计

从代码层面来看,整个消息队列由三个类构成:

消息队列类MsgQueue

这个类管理整个消息队列的运行,是主控程序,它包含以下方法:

  • init:初始化整个消息队列 在初始化过程中,它会依次做以下事情:
    1. 创建一个任务队列
    2. 调用initWorkThread函数,创建指定数量的工作线程(工作线程一旦被创建,就会不停地读取任务队列中的任务)
    3. 调用loadTask函数,从数据库中加载所有任务
  • loadTask:加载数据库中的所有任务 这是一个抽象函数,若要使用这个消息队列,必须实现这个函数。 消息队列初始化的时候会调用这个函数,从数据库中加载上次没有执行完的任务。 作为消息队列来讲,它并不知道你提供的任务是啥,因此它没办法知道你的任务应该存在哪里,以何种形式存储?因此,这个过程就需要让消息队列使用者自己去实现。
  • saveTask:持久化当前任务队列中的任务 这也是个抽象函数,若要使用这个消息队列,也必须实现这个函数。 当使用者调用消息队列的stop函数时,它会被执行,用于存储当前消息队列中尚未被执行的任务,并且在下次启动消息队列的时候通过loadTask函数再次加载进任务队列,这样能确保所有任务不会被遗漏。
  • addTask:向任务队列添加一个任务
  • stop:停止所有工作线程
  • initWorkThread:初始化所有工作线程 这是一个私有函数,当初始化整个消息队列的时候被init函数调用。

工作线程类WorkThread

工作线程会不断地检查任务队列中是否有任务,若有任务,就会取一个任务执行;若没有任务,就会等待一定时间后再次检查。 它是MsgQueue的一个内部类。因为WorkThread的行为完全由MsgQueue管理,外界不需要知道它的存在。

任务类Task

它是一个接口,并且只有一个函数run,用于封装任务具体的执行过程。

附上代码

以下代码还没将消息队列单独抽象出来,相当于是一个专门用于IP向省市县转化的消息队列,有空把它整一下。 代码中有详细的注释来解释线程安全性问题。

  • 消息队列主控程序
package com.sdata.foundation.web.filter;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Map;

import javax.servlet.FilterConfig;
import javax.servlet.ServletContext;

import org.apache.log4j.Logger;
import org.springframework.web.context.support.WebApplicationContextUtils;
import org.springframework.web.context.support.XmlWebApplicationContext;

import com.sdata.foundation.web.service.util.DelDataUtilService;
import com.sdata.foundation.web.service.util.InsertDataUtilService;
import com.sdata.foundation.web.service.util.QueryDataUtilService;
import com.thinkgem.jeesite.modules.sys.service.LogService;

/**
 * 记录请求IP的消息队列
 * @author Chai
 *
 */
public class RecordLocationMQ {
    // 工作线程的个数
    private static int MaxWorkThread;
    // 工作线程队列
    private static List<WorkThread> workThreadQueue = new ArrayList<WorkThread>();
    // 任务队列(存放等待执行的任务)
    private static List<RecordLocationTask> msgQueue = new ArrayList<RecordLocationTask>();
    // 控制所有工作线程的运行与否
    private static boolean isRunning = true;
    private static LogService LogService;
    // 数据库查询的service(用于任务的持久化)
    private static QueryDataUtilService QueryService;
    // 数据库删除的service(用于任务的持久化)
    private static DelDataUtilService DelService;
    // 数据库插入的service(用于任务的持久化)
    private static InsertDataUtilService InsertService;
    // 日志
    private static final Logger logger = Logger.getLogger(RecordLocationMQ.class);

    // 一些常量
    private static final int SUCCESS = 1;
    private static final int FAIL = 0;



    /**
     * 本消息队列的初始化函数
     * @param config 用于获取数据库操作的service
     */
    public static void init (  FilterConfig config ) {
        RecordLocationMQ.init( 10, config );
    }


    /**
     * 本消息队列的初始化函数
     * @param MaxWorkThread 工作线程的个数
     * @param config 用于获取数据库操作的service
     */
    public static void init ( int MaxWorkThread, FilterConfig config ) {

        RecordLocationMQ.MaxWorkThread = MaxWorkThread;

        // 初始化LogService
        if (null == RecordLocationMQ.LogService) {
            ServletContext sc = config.getServletContext();
            XmlWebApplicationContext cxt = (XmlWebApplicationContext) WebApplicationContextUtils.getWebApplicationContext(sc);
            if (cxt != null && cxt.getBean("logService") != null) {
                RecordLocationMQ.LogService = (LogService) cxt.getBean("logService");
            }
        }

        // 初始化QueryService
        if (null == RecordLocationMQ.QueryService) {
            ServletContext sc = config.getServletContext();
            XmlWebApplicationContext cxt = (XmlWebApplicationContext) WebApplicationContextUtils.getWebApplicationContext(sc);
            if (cxt != null && cxt.getBean("queryDataUtilService") != null) {
                RecordLocationMQ.QueryService = (QueryDataUtilService) cxt.getBean("queryDataUtilService");
            }
        }

        // 初始化DelService
        if (null == RecordLocationMQ.DelService) {
            ServletContext sc = config.getServletContext();
            XmlWebApplicationContext cxt = (XmlWebApplicationContext) WebApplicationContextUtils.getWebApplicationContext(sc);
            if (cxt != null && cxt.getBean("delDataUtilService") != null) {
                RecordLocationMQ.DelService = (DelDataUtilService) cxt.getBean("delDataUtilService");
            }
        }

        // 初始化InsertService
        if (null == RecordLocationMQ.InsertService) {
            ServletContext sc = config.getServletContext();
            XmlWebApplicationContext cxt = (XmlWebApplicationContext) WebApplicationContextUtils.getWebApplicationContext(sc);
            if (cxt != null && cxt.getBean("insertDataUtilService") != null) {
                RecordLocationMQ.InsertService = (InsertDataUtilService) cxt.getBean("insertDataUtilService");
            }
        }

        // 从DB中加载尚未完成的任务
        // PS:在新线程中执行,防止tomcat启动时间过长
        new RecordLocationMQ().new loadTaskThread().start();

        // 初始化工作线程,并开始工作
        initWorkThread( MaxWorkThread, workThreadQueue );
    }


    /**
     * 初始化工作线程,并开始工作
     * @param maxWorkThread 工作线程数量
     * @param workThreadQueue 工作线程队列
     */
    private static void initWorkThread(int maxWorkThread, List<WorkThread> workThreadQueue) {
        for ( int i=0; i<maxWorkThread; i++ ) {
            WorkThread workThread = new RecordLocationMQ().new WorkThread("WorkThread"+(i+1));
            workThreadQueue.add( workThread );
            workThread.start();
            System.out.println("已开启线程:WorkThread"+(i+1));
        }
    }


    /**
     * 从DB中加载尚未完成的任务
     * 并插入传入的消息队列中
     * @param msgQueue
     * @param logger 
     * @param logService 
     */
    private static void loadTask ( List<RecordLocationTask> msgQueue, QueryDataUtilService QueryService, DelDataUtilService DelService ) {

        String querySQL = "select * from sys_log_temp";
        String delSQL = "delete from sys_log_temp";

        // 查询DB中的任务
        try {
            List<Map<String, Object>> queryResultList = QueryService.query( Arrays.asList( querySQL ) );
            for ( Map<String,Object> map : queryResultList ) {

                String ip = map.get("ip").toString();
                String logId = map.get("log_id").toString();

                if ( null!=ip && null!=logId ) {
                    RecordLocationMQ.addTask( new RecordLocationTask(ip, logId, LogService ) );
                }
            }
        } 
        // 查询失败,不能执行delte操作
        catch (Exception e) {
            e.printStackTrace();
            return;
        }

        // 清空DB中的任务
        DelService.del( Arrays.asList( delSQL ) );
    }


    /**
     * 持久化当前任务队列
     */
    public static void saveTask () {
        // PS1:为什么要使用同步?
        //    Java不允许在遍历集合过程中新增/删除元素, 
        //    因此在遍历任务队列前必须要先冻结任务队列,
        //    防止其他线程新增/删除元素;
        //    此外,为了冻结任务队列,就必须使用msgQueue锁。
        // PS2:为何要先使isRunning为false?
        //     将isRunning设为false能立即停止所有工作线程(见PS3),
        //     从而所有工作线程都将释放msgQueue锁,
        //     从而确保这里的同步块能顺利拿到msgQueue锁。
        //     若不执行isRunning = false的话,
        //     所有工作线程就会继续执行,
        //     如果任务队列为空,工作线程就会一直持有msgQueue锁,并等待任务的到来,
        //     然而添加任务的功能得在当前同步块执行完成后才会执行,
        //     因此就出现了死锁。
        // PS3:为何将isRunning设为false就能立即停止所有工作线程?
        //     因为isRunning为工作线程的共享资源,
        //     并且工作线程的运行依赖于它的值;
        //     因此当isRunning设为false后,
        //     工作线程执行完当前任务或发现任务队列为空后,就会纷纷停止。
        isRunning = false;
        synchronized ( msgQueue ) {
            for ( RecordLocationTask task : msgQueue ) {
                if ( task!=null && !task.persisted ) {
                    int result = InsertService.insert( Arrays.asList( "insert into sys_log_temp (id, ip, log_id) values('"+new Date().getTime()+"', '"+task.getIp()+"', '"+task.getLogId()+"')" ) );
                    if ( result == SUCCESS ) {
                        task.persisted = true;
                    }
                }
            }
            isRunning = true;
        }
    }

    /**
     * 向任务队列中添加一个任务
     * @param task 任务对象
     */
    public static void addTask ( RecordLocationTask task ) {

        // 添加任务
        // PS:加判断的原因:由于当前这个类的这个函数是提供给别人使用的,
        //    我们没办法保证别人一定会传入一个非空的task,
        //    因此加个判断能提高程序的健壮性。
        if ( task!=null ) {
            // PS1:加同步块的原因:由于msgQueue是ArrayList类型,
            //     ArrayList所有函数都是线程不安全的,
            //     这里加一个同步块使add函数具有原子性。
            // PS2:千万不能使用msgQueue作为锁!
            //     因为工作线程获取一个任务的过程,使用的锁就是msgQueue,
            //     并且在这个过程中,如果任务队列为空就会一直循环等待,
            //     因此在等待的过程中工作线程就一直占用的msgQueue锁;
            //     然而如果这里添加任务还需要msgQueue锁,那么就会出现死锁,
            //     工作线程因为任务队列为空就一直占用着msgQueue锁,
            //     而添加任务的进程获取不到msgQueue锁就无法添加任务。
            synchronized ( new Object() ) {
                msgQueue.add(task);
                // System.out.println("向消息队列添加了一条task!");
            }
        }

        // 持久化任务队列
        // PS:不使用同步的原因:这里对于数据的实时性要求没那么高。
        if ( msgQueue.size() > 100 ) { 
            saveTask();
        }
    }


    public static void stop () {
        isRunning = false;
    }


    /**
     * 工作线程内部类
     */
    private class WorkThread extends Thread {

        public WorkThread ( String threadName ) {
            super(threadName);
        }

        @Override
        public void run() {
            RecordLocationTask task = null;

            while ( isRunning ) {
                // 获取一个任务
                synchronized ( msgQueue ) {
                    // 任务队列为空,则等待
                    while ( isRunning && msgQueue.isEmpty() ) {
                        // System.out.println("消息队列为空!");
                        try {
                            Thread.sleep(200);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }

                    // 取一个任务
                    // PS:加判断的原因:上述while循环的结束有两种可能:
                    //              1.msgQueue不为空;
                    //              2.isRunning为false
                    // 因此要加判断排除msgQueue为空,但isRunning为false的情况,
                    // 防止msgQueue.remove时出现空指针!
                    if ( !msgQueue.isEmpty() ) {
                        task = msgQueue.remove(0);
                    }
                    // System.out.println(this.getName() + "取了一个task!");
                }

                // 执行任务
                // PS1:加try-catch的原因:捕获任务执行过程中发生的一切异常,  
                //   只要发生异常,就说明该任务执行失败,
                //   因此需要把它重新放进任务队列等待下一次执行。
                // PS2:加判断的原因:同上述“取一个任务”加判断的原因一样。
                try {
                    if ( task!=null ) {
                        task.run();
                    }
                } catch (Exception e) {
                    // e.printStackTrace();
                    RecordLocationMQ.addTask( task ); // 使用addTask函数添加,统一添加的入口
                }
            }
        }
    }

    /**
     * 从数据库加载任务的内部类
     */
    private class loadTaskThread extends Thread {

        @Override
        public void run() {

            String querySQL = "select * from sys_log_temp";
            String delSQL = "delete from sys_log_temp";

            // 查询DB中的任务
            try {
                List<Map<String, Object>> queryResultList = QueryService.query( Arrays.asList( querySQL ) );
                for ( Map<String,Object> map : queryResultList ) {

                    String ip = map.get("ip").toString();
                    String logId = map.get("log_id").toString();

                    if ( null!=ip && null!=logId ) {
                        RecordLocationMQ.addTask( new RecordLocationTask(ip, logId, LogService ) );
                    }
                }
            } 
            // 查询失败,不能执行delte操作
            catch (Exception e) {
                e.printStackTrace();
                return;
            }

            // 清空DB中的任务
            DelService.del( Arrays.asList( delSQL ) );

        }
    }

    // 禁用构造函数
    private RecordLocationMQ () {}
}
  • 任务接口
package com.sdata.foundation.web.filter;

public interface Task {
    public void run() throws Exception;
}
  • 用于IP向省市县转化的任务线程
package com.sdata.foundation.web.filter;

import java.util.Date;

import org.apache.log4j.Logger;

import com.thinkgem.jeesite.common.utils.IdGen;
import com.thinkgem.jeesite.modules.sys.entity.Log;
import com.thinkgem.jeesite.modules.sys.service.LogService;

public class RecordLocationTask implements Task {
    private static final Logger logger = Logger.getLogger(RecordLocationTask.class);
    private LogService logService;
    private String ip;
    private String logId;
    // 
    public boolean persisted = false;

    public RecordLocationTask(String ip, String logId, LogService logService ) {
        super();
        this.logService = logService;
        this.ip = ip;
        this.logId = logId;
    }

    @Override
    public void run() throws Exception {

        // 查询IP
        if ( (new Date().getTime() - TransferIPTool.lastOperaTime) < 100 ){
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        String location = TransferIPTool.transferIP(ip);

        // 更新log
        Log log = new Log();
        log.setIsNewRecord(false);
        log.setId(logId);
        log.setLocation(location);
        logService.save(log);

        System.out.println("完成一个task!");

    }

    public String getIp() {
        return ip;
    }

    public String getLogId() {
        return logId;
    }

}
  • 用于IP向省市县转化的工具类
package com.sdata.foundation.web.filter;

import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;

import com.alibaba.fastjson.JSONObject;
import com.sdata.foundation.web.service.util.HTTPHelper;

/**
 * 
 * @author Chai
 * 本类用于将IP转化为位置信息
 */
public class TransferIPTool {

    // 本次请求taobao接口的开始时间
    public static long lastOperaTime = new Date().getTime();
    // 模拟IP队列
    private static List<String> IPList = Arrays.asList("49.65.250.135","115.28.217.42","114.80.166.240","122.92.218.0","218.28.191.23","218.12.41.179","221.239.16.227","59.108.49.35","124.117.66.0","218.21.128.31","116.52.147.50");
    // IP转换接口
    private static final String RequestIP = "http://ip.taobao.com/service/getIpInfo.php";


    /**
     * 将IP转化为省份
     * @param ip
     * @return 省份字符串
     * @throws Exception 
     */
    public static String transferIP ( String ip ) throws Exception {

        // 记录本次请求taobao接口的开始时间
        TransferIPTool.lastOperaTime = new Date().getTime();

        // 打乱IPList
        Collections.shuffle( IPList );

        String resultJsonStr = HTTPHelper.executeGet(RequestIP + "?ip=" + IPList.get(0));
        JSONObject resultJsonObj = JSONObject.parseObject( resultJsonStr );
        JSONObject data = resultJsonObj.getJSONObject("data");
        return data.getString("region");

    }

}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏大眼瞪小眼

操作系统常用算法

介绍:又称为高级调度或长程调度,调度对象是作业。根据作业控制块(JCB)中的信息,审查系统能否满足用户作业的资源需求,以及按照一定的算法,从外存的后备队列中...

391
来自专栏人工智能LeadAI

ElasticSearch优化系列二:机器设置(内存)

预留一半内存给Lucence使用 一个常见的问题是配置堆太大。你有一个64 GB的机器,觉得JVM内存越大越好,想给Elasticsearch所有64 GB的内...

3804
来自专栏纯洁的微笑

Guava 源码分析(Cache 原理【二阶段】)

在上文「Guava 源码分析(Cache 原理)」中分析了 Guava Cache 的相关原理。

1031
来自专栏机器学习从入门到成神

临界区、互斥量、信号量

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/sinat_35512245/articl...

552
来自专栏Golang语言社区

Goroutine + Channel 实践

背景 在最近开发的项目中,后端需要编写许多提供HTTP接口的API,另外技术选型相对宽松,因此选择Golang + Beego框架进行开发。之所以选择Golan...

3214
来自专栏编码小白

tomcat请求处理分析(五) 请求到响应流

1.1.1.1  请求到响应界面流 请求处理的过程主要是将所有的东西解析成流,转化成对应的http报文,所以在这里我先不关注servlet因为它最终也就是解析成...

3658
来自专栏积累沉淀

storm消息机制

这章讨论Storm's reliability capabilities, 如何保证从spout emit出来的所有tuple都被正确的执行(fully pr...

2593
来自专栏Vamei实验室

Linux信号基础

Linux进程基础一文中已经提到,Linux以进程为单位来执行程序。我们可以将计算机看作一个大楼,内核(kernel)是大楼的管理员,进程是大楼的房客。每个进程...

1805
来自专栏我是攻城师

给Java字节码加上”翅膀“的JIT编译器

上面文章在介绍Java的内存模型的时候,提到过由于编译器的优化会导致重排序的问题,其中一个比较重要的点地方就是关于JIT编译器的功能。JIT的英文单词是Just...

865
来自专栏IT技术精选文摘

从Java视角理解系统结构(三)伪共享

从我的前一篇博文中, 我们知道了CPU缓存及缓存行的概念, 同时用一个例子说明了编写单线程Java代码时应该注意的问题. 下面我们讨论更为复杂, 而且更符合现实...

1817

扫描关注云+社区