前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >轻量级线程池的实现

轻量级线程池的实现

作者头像
大闲人柴毛毛
发布2018-03-09 17:28:56
1.1K0
发布2018-03-09 17:28:56
举报
文章被收录于专栏:大闲人柴毛毛大闲人柴毛毛

写在前面

最近因为项目需要,自己写了个单生产者-多消费者的消息队列模型。多线程真的不是等闲之辈能玩儿的,我花了两个小时进行设计与编码,却花了两天的时间调试与运行。在这里,我把我遇到的坑与大家分享。

需求的由来

一开始我需要实现一个记录用户操作日志的功能,目的是给商家用户提供客户行为分析的能力。要记录的信息包括客户的访问时间、IP、在网站上所做的操作等。其中,客户的地域信息是个重要的分析项,所以必须要把IP转化成省市县。那么究竟何时完成这个转化的动作呢?有两种方案: 1. 在用户进行数据分析完成转化 2. 在用户进行数据分析完成转化 第一种方案显然不靠谱,因为需要转化的IP数量很大,而且转化采用第三方接口,因此整个转化过程将持续很长很长很长……的时间。

而在分析前就把转化过程完成掉,这样当用户需要分析的时候就可以减少这部分时间的开销,提高了响应速度。因此第二种方案显然比较合理。

那么随之而来的问题是:究竟在数据分析前的哪个时机进行转化? 这个问题又有两种方案: 1. 在记录日志的时候就立即完成IP向省市县的转换; 2. 每天半夜服务器统一把当天的IP转化成省市县; 这两种方案应该来说各有千秋。 第一种方案比较消耗服务器资源,因为IP向省市县转化需要向第三方接口发送GET请求,因此需要消耗一定的出口带宽和内存资源,在服务器资源一定的前提下,分给用户访问的资源就会被减少,从而可能会影响请求响应速度。但这个问题可以用钱来解决,只要花钱砸服务器就行了;而第二种方案在服务器空闲的时候进行转化虽然节约了服务器资源,但这也导致了商家的分析结果会有一天的滞后,影响用户体验。

于是,这个问题就变成了老板的钱重要还是用户体验重要。因此我毫不犹豫地选择了第一种方案。

初步设计

我使用Servlet Filter拦截用户的所有请求,并在Filter中获取用户的各项信息(其中包括IP),然后再请求第三方接口,完成IP向省市县的转化,最后将这些信息入库。

这个流程很显然有重大缺陷:请求响应时间将被拉的很长。 因为Filter是同步的,只有当Filter中的任务完成后才会放行用户的请求,而这个Filter中有两处耗时操作:请求第三方接口、数据入库,这无疑增加了用户的等待时间。

因此,我需要将耗时操作异步执行,减少Filter的阻塞时间。

我把这两个耗时操作放入一个新线程中,只要请求一来,就创建一条新线程去处理这两步操作。和先前的方式比对之后发现,确实响应速度提高了不少!

但仔细一想,发现不妙。这种方式没办法控制线程的数量,当访问量很高的情况下,线程数量将会无限增加,这时候会搞垮服务器的!

所以需要一个机制来管理所有的线程,于是我就设计了一个消息队列模型。

模型设计

这里写图片描述
这里写图片描述

这个模型很简单,由一个任务队列和多个工作线程组成。生产者只需不停地往任务队列中添加任务,消费者(工作线程)不停地从任务队列的另一端取任务执行。

这个模型在项目中的应用是这样的:当一个请求被Filter拦截后,Filter从请求中获取用户的各项信息,然后把这些信息封装成一个任务对象,扔给任务队列,此刻这个Filter的使命就完成了,它完全不用管任务的执行过程。工作线程会不停地从任务队列中取任务执行。

类图设计

这里写图片描述
这里写图片描述

从代码层面来看,整个消息队列由三个类构成:

消息队列类MsgQueue

这个类管理整个消息队列的运行,是主控程序,它包含以下方法:

  • init:初始化整个消息队列 在初始化过程中,它会依次做以下事情:
    1. 创建一个任务队列
    2. 调用initWorkThread函数,创建指定数量的工作线程(工作线程一旦被创建,就会不停地读取任务队列中的任务)
    3. 调用loadTask函数,从数据库中加载所有任务
  • loadTask:加载数据库中的所有任务 这是一个抽象函数,若要使用这个消息队列,必须实现这个函数。 消息队列初始化的时候会调用这个函数,从数据库中加载上次没有执行完的任务。 作为消息队列来讲,它并不知道你提供的任务是啥,因此它没办法知道你的任务应该存在哪里,以何种形式存储?因此,这个过程就需要让消息队列使用者自己去实现。
  • saveTask:持久化当前任务队列中的任务 这也是个抽象函数,若要使用这个消息队列,也必须实现这个函数。 当使用者调用消息队列的stop函数时,它会被执行,用于存储当前消息队列中尚未被执行的任务,并且在下次启动消息队列的时候通过loadTask函数再次加载进任务队列,这样能确保所有任务不会被遗漏。
  • addTask:向任务队列添加一个任务
  • stop:停止所有工作线程
  • initWorkThread:初始化所有工作线程 这是一个私有函数,当初始化整个消息队列的时候被init函数调用。

工作线程类WorkThread

工作线程会不断地检查任务队列中是否有任务,若有任务,就会取一个任务执行;若没有任务,就会等待一定时间后再次检查。 它是MsgQueue的一个内部类。因为WorkThread的行为完全由MsgQueue管理,外界不需要知道它的存在。

任务类Task

它是一个接口,并且只有一个函数run,用于封装任务具体的执行过程。

附上代码

以下代码还没将消息队列单独抽象出来,相当于是一个专门用于IP向省市县转化的消息队列,有空把它整一下。 代码中有详细的注释来解释线程安全性问题。

  • 消息队列主控程序
package com.sdata.foundation.web.filter;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Map;

import javax.servlet.FilterConfig;
import javax.servlet.ServletContext;

import org.apache.log4j.Logger;
import org.springframework.web.context.support.WebApplicationContextUtils;
import org.springframework.web.context.support.XmlWebApplicationContext;

import com.sdata.foundation.web.service.util.DelDataUtilService;
import com.sdata.foundation.web.service.util.InsertDataUtilService;
import com.sdata.foundation.web.service.util.QueryDataUtilService;
import com.thinkgem.jeesite.modules.sys.service.LogService;

/**
 * 记录请求IP的消息队列
 * @author Chai
 *
 */
public class RecordLocationMQ {
    // 工作线程的个数
    private static int MaxWorkThread;
    // 工作线程队列
    private static List<WorkThread> workThreadQueue = new ArrayList<WorkThread>();
    // 任务队列(存放等待执行的任务)
    private static List<RecordLocationTask> msgQueue = new ArrayList<RecordLocationTask>();
    // 控制所有工作线程的运行与否
    private static boolean isRunning = true;
    private static LogService LogService;
    // 数据库查询的service(用于任务的持久化)
    private static QueryDataUtilService QueryService;
    // 数据库删除的service(用于任务的持久化)
    private static DelDataUtilService DelService;
    // 数据库插入的service(用于任务的持久化)
    private static InsertDataUtilService InsertService;
    // 日志
    private static final Logger logger = Logger.getLogger(RecordLocationMQ.class);

    // 一些常量
    private static final int SUCCESS = 1;
    private static final int FAIL = 0;



    /**
     * 本消息队列的初始化函数
     * @param config 用于获取数据库操作的service
     */
    public static void init (  FilterConfig config ) {
        RecordLocationMQ.init( 10, config );
    }


    /**
     * 本消息队列的初始化函数
     * @param MaxWorkThread 工作线程的个数
     * @param config 用于获取数据库操作的service
     */
    public static void init ( int MaxWorkThread, FilterConfig config ) {

        RecordLocationMQ.MaxWorkThread = MaxWorkThread;

        // 初始化LogService
        if (null == RecordLocationMQ.LogService) {
            ServletContext sc = config.getServletContext();
            XmlWebApplicationContext cxt = (XmlWebApplicationContext) WebApplicationContextUtils.getWebApplicationContext(sc);
            if (cxt != null && cxt.getBean("logService") != null) {
                RecordLocationMQ.LogService = (LogService) cxt.getBean("logService");
            }
        }

        // 初始化QueryService
        if (null == RecordLocationMQ.QueryService) {
            ServletContext sc = config.getServletContext();
            XmlWebApplicationContext cxt = (XmlWebApplicationContext) WebApplicationContextUtils.getWebApplicationContext(sc);
            if (cxt != null && cxt.getBean("queryDataUtilService") != null) {
                RecordLocationMQ.QueryService = (QueryDataUtilService) cxt.getBean("queryDataUtilService");
            }
        }

        // 初始化DelService
        if (null == RecordLocationMQ.DelService) {
            ServletContext sc = config.getServletContext();
            XmlWebApplicationContext cxt = (XmlWebApplicationContext) WebApplicationContextUtils.getWebApplicationContext(sc);
            if (cxt != null && cxt.getBean("delDataUtilService") != null) {
                RecordLocationMQ.DelService = (DelDataUtilService) cxt.getBean("delDataUtilService");
            }
        }

        // 初始化InsertService
        if (null == RecordLocationMQ.InsertService) {
            ServletContext sc = config.getServletContext();
            XmlWebApplicationContext cxt = (XmlWebApplicationContext) WebApplicationContextUtils.getWebApplicationContext(sc);
            if (cxt != null && cxt.getBean("insertDataUtilService") != null) {
                RecordLocationMQ.InsertService = (InsertDataUtilService) cxt.getBean("insertDataUtilService");
            }
        }

        // 从DB中加载尚未完成的任务
        // PS:在新线程中执行,防止tomcat启动时间过长
        new RecordLocationMQ().new loadTaskThread().start();

        // 初始化工作线程,并开始工作
        initWorkThread( MaxWorkThread, workThreadQueue );
    }


    /**
     * 初始化工作线程,并开始工作
     * @param maxWorkThread 工作线程数量
     * @param workThreadQueue 工作线程队列
     */
    private static void initWorkThread(int maxWorkThread, List<WorkThread> workThreadQueue) {
        for ( int i=0; i<maxWorkThread; i++ ) {
            WorkThread workThread = new RecordLocationMQ().new WorkThread("WorkThread"+(i+1));
            workThreadQueue.add( workThread );
            workThread.start();
            System.out.println("已开启线程:WorkThread"+(i+1));
        }
    }


    /**
     * 从DB中加载尚未完成的任务
     * 并插入传入的消息队列中
     * @param msgQueue
     * @param logger 
     * @param logService 
     */
    private static void loadTask ( List<RecordLocationTask> msgQueue, QueryDataUtilService QueryService, DelDataUtilService DelService ) {

        String querySQL = "select * from sys_log_temp";
        String delSQL = "delete from sys_log_temp";

        // 查询DB中的任务
        try {
            List<Map<String, Object>> queryResultList = QueryService.query( Arrays.asList( querySQL ) );
            for ( Map<String,Object> map : queryResultList ) {

                String ip = map.get("ip").toString();
                String logId = map.get("log_id").toString();

                if ( null!=ip && null!=logId ) {
                    RecordLocationMQ.addTask( new RecordLocationTask(ip, logId, LogService ) );
                }
            }
        } 
        // 查询失败,不能执行delte操作
        catch (Exception e) {
            e.printStackTrace();
            return;
        }

        // 清空DB中的任务
        DelService.del( Arrays.asList( delSQL ) );
    }


    /**
     * 持久化当前任务队列
     */
    public static void saveTask () {
        // PS1:为什么要使用同步?
        //    Java不允许在遍历集合过程中新增/删除元素, 
        //    因此在遍历任务队列前必须要先冻结任务队列,
        //    防止其他线程新增/删除元素;
        //    此外,为了冻结任务队列,就必须使用msgQueue锁。
        // PS2:为何要先使isRunning为false?
        //     将isRunning设为false能立即停止所有工作线程(见PS3),
        //     从而所有工作线程都将释放msgQueue锁,
        //     从而确保这里的同步块能顺利拿到msgQueue锁。
        //     若不执行isRunning = false的话,
        //     所有工作线程就会继续执行,
        //     如果任务队列为空,工作线程就会一直持有msgQueue锁,并等待任务的到来,
        //     然而添加任务的功能得在当前同步块执行完成后才会执行,
        //     因此就出现了死锁。
        // PS3:为何将isRunning设为false就能立即停止所有工作线程?
        //     因为isRunning为工作线程的共享资源,
        //     并且工作线程的运行依赖于它的值;
        //     因此当isRunning设为false后,
        //     工作线程执行完当前任务或发现任务队列为空后,就会纷纷停止。
        isRunning = false;
        synchronized ( msgQueue ) {
            for ( RecordLocationTask task : msgQueue ) {
                if ( task!=null && !task.persisted ) {
                    int result = InsertService.insert( Arrays.asList( "insert into sys_log_temp (id, ip, log_id) values('"+new Date().getTime()+"', '"+task.getIp()+"', '"+task.getLogId()+"')" ) );
                    if ( result == SUCCESS ) {
                        task.persisted = true;
                    }
                }
            }
            isRunning = true;
        }
    }

    /**
     * 向任务队列中添加一个任务
     * @param task 任务对象
     */
    public static void addTask ( RecordLocationTask task ) {

        // 添加任务
        // PS:加判断的原因:由于当前这个类的这个函数是提供给别人使用的,
        //    我们没办法保证别人一定会传入一个非空的task,
        //    因此加个判断能提高程序的健壮性。
        if ( task!=null ) {
            // PS1:加同步块的原因:由于msgQueue是ArrayList类型,
            //     ArrayList所有函数都是线程不安全的,
            //     这里加一个同步块使add函数具有原子性。
            // PS2:千万不能使用msgQueue作为锁!
            //     因为工作线程获取一个任务的过程,使用的锁就是msgQueue,
            //     并且在这个过程中,如果任务队列为空就会一直循环等待,
            //     因此在等待的过程中工作线程就一直占用的msgQueue锁;
            //     然而如果这里添加任务还需要msgQueue锁,那么就会出现死锁,
            //     工作线程因为任务队列为空就一直占用着msgQueue锁,
            //     而添加任务的进程获取不到msgQueue锁就无法添加任务。
            synchronized ( new Object() ) {
                msgQueue.add(task);
                // System.out.println("向消息队列添加了一条task!");
            }
        }

        // 持久化任务队列
        // PS:不使用同步的原因:这里对于数据的实时性要求没那么高。
        if ( msgQueue.size() > 100 ) { 
            saveTask();
        }
    }


    public static void stop () {
        isRunning = false;
    }


    /**
     * 工作线程内部类
     */
    private class WorkThread extends Thread {

        public WorkThread ( String threadName ) {
            super(threadName);
        }

        @Override
        public void run() {
            RecordLocationTask task = null;

            while ( isRunning ) {
                // 获取一个任务
                synchronized ( msgQueue ) {
                    // 任务队列为空,则等待
                    while ( isRunning && msgQueue.isEmpty() ) {
                        // System.out.println("消息队列为空!");
                        try {
                            Thread.sleep(200);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }

                    // 取一个任务
                    // PS:加判断的原因:上述while循环的结束有两种可能:
                    //              1.msgQueue不为空;
                    //              2.isRunning为false
                    // 因此要加判断排除msgQueue为空,但isRunning为false的情况,
                    // 防止msgQueue.remove时出现空指针!
                    if ( !msgQueue.isEmpty() ) {
                        task = msgQueue.remove(0);
                    }
                    // System.out.println(this.getName() + "取了一个task!");
                }

                // 执行任务
                // PS1:加try-catch的原因:捕获任务执行过程中发生的一切异常,  
                //   只要发生异常,就说明该任务执行失败,
                //   因此需要把它重新放进任务队列等待下一次执行。
                // PS2:加判断的原因:同上述“取一个任务”加判断的原因一样。
                try {
                    if ( task!=null ) {
                        task.run();
                    }
                } catch (Exception e) {
                    // e.printStackTrace();
                    RecordLocationMQ.addTask( task ); // 使用addTask函数添加,统一添加的入口
                }
            }
        }
    }

    /**
     * 从数据库加载任务的内部类
     */
    private class loadTaskThread extends Thread {

        @Override
        public void run() {

            String querySQL = "select * from sys_log_temp";
            String delSQL = "delete from sys_log_temp";

            // 查询DB中的任务
            try {
                List<Map<String, Object>> queryResultList = QueryService.query( Arrays.asList( querySQL ) );
                for ( Map<String,Object> map : queryResultList ) {

                    String ip = map.get("ip").toString();
                    String logId = map.get("log_id").toString();

                    if ( null!=ip && null!=logId ) {
                        RecordLocationMQ.addTask( new RecordLocationTask(ip, logId, LogService ) );
                    }
                }
            } 
            // 查询失败,不能执行delte操作
            catch (Exception e) {
                e.printStackTrace();
                return;
            }

            // 清空DB中的任务
            DelService.del( Arrays.asList( delSQL ) );

        }
    }

    // 禁用构造函数
    private RecordLocationMQ () {}
}
  • 任务接口
package com.sdata.foundation.web.filter;

public interface Task {
    public void run() throws Exception;
}
  • 用于IP向省市县转化的任务线程
package com.sdata.foundation.web.filter;

import java.util.Date;

import org.apache.log4j.Logger;

import com.thinkgem.jeesite.common.utils.IdGen;
import com.thinkgem.jeesite.modules.sys.entity.Log;
import com.thinkgem.jeesite.modules.sys.service.LogService;

public class RecordLocationTask implements Task {
    private static final Logger logger = Logger.getLogger(RecordLocationTask.class);
    private LogService logService;
    private String ip;
    private String logId;
    // 
    public boolean persisted = false;

    public RecordLocationTask(String ip, String logId, LogService logService ) {
        super();
        this.logService = logService;
        this.ip = ip;
        this.logId = logId;
    }

    @Override
    public void run() throws Exception {

        // 查询IP
        if ( (new Date().getTime() - TransferIPTool.lastOperaTime) < 100 ){
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        String location = TransferIPTool.transferIP(ip);

        // 更新log
        Log log = new Log();
        log.setIsNewRecord(false);
        log.setId(logId);
        log.setLocation(location);
        logService.save(log);

        System.out.println("完成一个task!");

    }

    public String getIp() {
        return ip;
    }

    public String getLogId() {
        return logId;
    }

}
  • 用于IP向省市县转化的工具类
package com.sdata.foundation.web.filter;

import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;

import com.alibaba.fastjson.JSONObject;
import com.sdata.foundation.web.service.util.HTTPHelper;

/**
 * 
 * @author Chai
 * 本类用于将IP转化为位置信息
 */
public class TransferIPTool {

    // 本次请求taobao接口的开始时间
    public static long lastOperaTime = new Date().getTime();
    // 模拟IP队列
    private static List<String> IPList = Arrays.asList("49.65.250.135","115.28.217.42","114.80.166.240","122.92.218.0","218.28.191.23","218.12.41.179","221.239.16.227","59.108.49.35","124.117.66.0","218.21.128.31","116.52.147.50");
    // IP转换接口
    private static final String RequestIP = "http://ip.taobao.com/service/getIpInfo.php";


    /**
     * 将IP转化为省份
     * @param ip
     * @return 省份字符串
     * @throws Exception 
     */
    public static String transferIP ( String ip ) throws Exception {

        // 记录本次请求taobao接口的开始时间
        TransferIPTool.lastOperaTime = new Date().getTime();

        // 打乱IPList
        Collections.shuffle( IPList );

        String resultJsonStr = HTTPHelper.executeGet(RequestIP + "?ip=" + IPList.get(0));
        JSONObject resultJsonObj = JSONObject.parseObject( resultJsonStr );
        JSONObject data = resultJsonObj.getJSONObject("data");
        return data.getString("region");

    }

}
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2016年12月23日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 写在前面
  • 需求的由来
  • 初步设计
  • 模型设计
  • 类图设计
    • 消息队列类MsgQueue
      • 工作线程类WorkThread
        • 任务类Task
        • 附上代码
        相关产品与服务
        消息队列 CMQ 版
        消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档