模拟Executor策略的实现如何控制执行顺序?怎么限制最大同时开启线程的个数?为什么要有一个线程来将结束的线程移除出执行区?转移线程的时候要判断线程是否为空遍历线程的容器会抛出ConcurrentM

Executor作为现在线程的一个管理工具,就像管理线程的管理器一样,不用像以前一样,通过start来开启线程 Executor将提交线程执行线程分离开来,使得用户只需要提交线程,并不需要在乎怎么和什么时候开启线程

需要有以下功能: 1.查看现在开启了哪些进程 2.查看还有哪些进程未执行 3.查看现在开启线程的数量 4.查看还有多少线程未开启 5.设置执行顺序(先提交先执行,先提交后执行) 6.限制最大同时开启线程的个数 7.目前提交的线程执行完之后,关闭管理器(此过程中不允许再提交线程) 8.立即关闭管理器(正在执行的线程也立即停止)


实现原理

Executor管理器将提交上来的线程放入线程等待区(一个LinkedList),当线程执行区中有空位时,控制线程1就会将线程等待区中的线程移除转移到线程执行区(一个LinkedList)。接着,控制线程2就会开启线程执行区中未开启的线程(start)。等到线程执行区中的线程跑完了,控制线程3就会把它从线程执行区移除出去


代码实现

import java.util.*;
import java.util.concurrent.*;
public class MyExecutor{
    //静态常量用于决定执行顺序
    public static final String FIFO="FIFO";//先进先出
    public static final String LIFO="LIFO";//后进先出
    //public static final int PRIORITY=2;//优先级
    //建立线程池
    private LinkedList<Thread> waitinglist;
    private String order;//储存指定顺序
    private int maxThreadRun;//储存最大并发运行线程数量
    //建立执行队列
    private LinkedList<Thread> runningList;
    //建立三个线程来控制Executor的运行
    private Thread checkThread;
    private Thread readyThread;
    private Thread runThread;
    //建立一个标记用来控制管理器的开关
    private boolean isShutdown=false;//关闭为true,开启为false
    private boolean isShutdownNow=false;//关闭为true,开启为false




    //构造函数
    public MyExecutor()throws IllegalArgumentException{
        this(MyExecutor.FIFO,10);//默认执行顺序为先进先出,默认最大并行线程数量为10
        }

    public MyExecutor(String order)throws IllegalArgumentException{
        this(order,10);//默认最大并行线程数量为10
        }

    public MyExecutor (String order,int maxThreadRun) throws IllegalArgumentException{
        if((order.equals(FIFO)||order.equals(LIFO))&&maxThreadRun>=1){
            this.order=order;
            waitinglist=new LinkedList<Thread>();
            this.maxThreadRun=maxThreadRun;
            runningList=new LinkedList<Thread>();
            checkThread=new CheckThread(this);
            readyThread=new ReadyThread(this);
            runThread=new RunThread(this);
            checkThread.start();
            readyThread.start();
            runThread.start();
            }
        else{
            throw new IllegalArgumentException("参数order只能为'FIFO'或'LIFO'");
            }
        }



        //提交任务
        public void execute(Thread task)throws RejectedExecutionException{
            if(state==true){
                waitinglist.offer(task);
                //count++;
                }
            else{
                throw new RejectedExecutionException("管理器已经关闭,不能再提交任务");
                }
        }
        public void execute(Runnable command)throws IllegalArgumentException,RejectedExecutionException{
            this.execute(new Thread(command));
            }


        //将线程池的任务送进执行队列
        void ready(){
            while(runningList.size()<maxThreadRun){
                if(order.equals(FIFO)){
                        if(waitinglist.peekFirst()!=null)
                            runningList.offer(waitinglist.pollFirst());
                }
                else{
                        if(waitinglist.peekLast()!=null)
                        runningList.offer(waitinglist.pollLast());
                }
                }
            }


        //将执行队列中的线程start
        void go(){
            try{
            for(Thread thread:runningList){
                if(thread.getState()==Thread.State.NEW){
                    thread.start();
                    }
                }
                }
                catch(Exception e){
                    //e.printStackTrace();
                    }
            }

        //检测已经结束的线程
        void isTerminated(){
            try{
            for(Thread thread:runningList){
                if(thread.getState()==Thread.State.TERMINATED){
                    runningList.remove(thread);

                    }
                }
                }
                catch(Exception e){
                    //e.printStackTrace();
                    }
            }



    //启动一次顺序关闭,执行以前提交的任务,但不接受新任务
    public void shutdown(){
        isShutdown=true;
        }


    //试图停止所有正在执行的活动任务
    public void shutdownNow(){
        isShutdownNum=true;
        for(Thread thread:waitinglist){
            thread.interrupt();
            }
        for(Thread thread:runningList){
            thread.interrupt();
            }
        }

    //获取管理器状态
    public boolean isShutdown(){
        return isShutdown;
        }
    public boolean isShutdownNow(){
        return isShutdownNow;
        }


    //获取正在等待的线程名字
    public String getWaitingThreadName(){
        return waitinglist.toString();
        }

    //查看现在在执行的线程的名字
    public String getRunningThreadName(){
        return runningList.toString();
        }

    //查看还有多少个线程在等待
    public int getWaitingThreadNum(){
        return waitinglist.size();
        }


    //获取正在执行的线程数量
    public int getRunningThreadNum(){
        return runningList.size();
        }

    //查看管理器中的线程空了没有
    public boolean isEmpty(){
        return (getWaitingThreadNum()==0&&getRunningThreadNum()==0)?true:false;
        }
    }
//三个控制线程的代码
//建立一个线程用来检测runningList中的线程是否已经结束了
class CheckThread extends Thread{
    private MyExecutor executor;
    CheckThread(MyExecutor executor){
        this.executor=executor;
        setName("CheckThread");
        this.setPriority(MAX_PRIORITY);//将线程优先级设置到最高
        }
    public void run(){
        while(!(executor.isShutdown()&&executor.getWaitingThreadNum==0)&&!executor.isShutdownNow()){//当管理器关闭就跳出循环
            executor.isTerminated();
            Thread.yield();
            }
        }
    }

//建立一个线程用来将提交的线程送进执行队列
class ReadyThread extends Thread{
    private MyExecutor executor;
    ReadyThread(MyExecutor executor){
            this.executor=executor;
            setName("ReadyThread");
            this.setPriority(MAX_PRIORITY);//将线程优先级设置到最高
        }
    public void run(){
        while(!(executor.isShutdown()&&executor.getWaitingThreadNum==0)&&!executor.isShutdownNow()){//当管理器关闭就跳出循环
            executor.ready();
            Thread.yield();
            }
        }
    }

//建立一个线程用来将执行队列中的线程开启
class RunThread extends Thread{
    private MyExecutor executor;
    RunThread(MyExecutor executor){
            this.executor=executor;
            setName("RunThread");
            this.setPriority(MAX_PRIORITY);//将线程优先级设置到最高
        }
    public void run(){
        while(!(executor.isShutdown()&&executor.getWaitingThreadNum==0)&&!executor.isShutdownNow()){//当管理器关闭就跳出循环
            executor.go();
            Thread.yield();
            }
        }
    }

几个需要解释的地方

如何控制执行顺序?

首先执行顺序在初始化的时候就需要确定,然后设置一个变量order把这个顺序储存起来 下面看看实现的代码

        //将线程池的任务送进执行队列
        void ready(){
            while(runningList.size()<maxThreadRun){
                if(order.equals(FIFO)){
                        if(waitinglist.peekFirst()!=null)
                            runningList.offer(waitinglist.pollFirst());//如果是先进先出,取出等待区中第一个线程
                }
                else{
                        if(waitinglist.peekLast()!=null)
                        runningList.offer(waitinglist.pollLast());//如果是后进先出,取出等待区中最后一个线程
                }
                }
            }

从代码上看,执行顺序实际上是在,将线程从等待区中取出到执行区的过程中控制的 先判断order,然后使用不同的poll方法(pollFirst或者是pollLast)

怎么限制最大同时开启线程的个数?

最大同时开启线程的个数也是在实例化管理器对象的时候就需要确定的(否则,默认的最大同时开启线程的个数为10个) 然后,将设置的值储存在变量maxThreadRun中 下面看看代码怎么实现

        //将线程池的任务送进执行队列
        void ready(){
            while(runningList.size()<maxThreadRun){//当执行区的大小小于最大可同时运行线程的数量时,才能放的进

从代码上看出,实际上也是将线程从等待区中取出到执行区的过程中控制的

为什么要有一个线程来将结束的线程移除出执行区?

因为!!!当执行区中的线程跑完了之后,这个线程对象仍然是在执行区中存在的,所以如果不把结束的线程移除出去,那么提交任务几毫秒后,执行区就会爆满了,不清理的话,等待区的线程也进不来


几个需要注意的地方

转移线程的时候要判断线程是否为空

代码位置:将线程从等待区中取出到执行区中的过程

//FIFO的情况
                        if(waitinglist.peekFirst()!=null)//等待区第一个位置的线程不能为空
                            runningList.offer(waitinglist.pollFirst());//如果是先进先出,取出等待区中第一个线程
//LIFO的情况
                        if(waitinglist.peekLast()!=null)//等待区最后一个位置的线程不能为空
                        runningList.offer(waitinglist.pollLast());//如果是后进先出,取出等待区中最后一个线程

为什么不能将空线程放进执行区呢? 因为这样子,空线程在执行区中start和判断这个线程是否结束的时候(getState()==Thread.State.TERMINATED),会抛出NullPointerException空指针异常,会无缘无故占领了执行区的空间,抛出异常和处理异常也会浪费时间

而且不知道为什么,如果不判断的话,会发生阻塞 我想了想,想到了一个不靠谱的解释: 在主线程提交线程给executor之前,executor一直在把空的线程丢进执行区,然后执行区一直在处理异常,等待区也一直在把空线程丢给执行区,这样子也就没有现象出现 可是这样的话,迟早也会有现象出现的,不可能一直都阻塞在那里啊??

遍历线程的容器会抛出ConcurrentModificationException异常

ConcurrentModificationException这个异常是什么呢? 当遍历线程的容器时,会发生这个异常 这个异常存在的意义是不要我们遍历线程的容器,因为如果对装有线程的容器发生修改(比如,移除啊),就会使得线程没有执行 下面看看API的解释: 某个线程在 Collection 上进行迭代时,通常不允许另一个线性修改* Collection*。通常在这些情况下,迭代的结果是不确定的

API很粗暴的,只要循环体中或者迭代器中,遍历的是Collection的时候,就会直接抛出这个异常 所以当开发的时候,没有对容器线程做出修改,那么直接处理忽视掉这个异常吧

线程一定要适当的yield()切换线程

yield()这个方法的用处是:暂停正在执行的线程,切换给别的线程跑跑 如果不用这个方法的话,会出现阻塞 正在执行的那个线程不放cpu,其他的线程也就执行不到了 可是这样子也不会发生阻塞啊,只是运行的慢一点而已

主线程不能轻易的修改执行优先级

我发现,当把主线程(main线程)的优先级改到最低或者较低,很容易出现阻塞 这是为什么捏??

当把可同时开启的线程数量调到1或2

此时又会发生阻塞了 为什么呢? 我想想的是,这样子,控制线程就需要频繁的从等待区中取出线程,也要频繁的将执行区的已结束的线程移除出去 可是这样子也不会发生阻塞啊,只是运行的慢一点而已 真烦!!

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏玄魂工作室

看代码学PHP渗透(3) - 实例化任意对象漏洞

大家好,我们是红日安全-代码审计小组。最近我们小组正在做一个PHP代码审计的项目,供大家学习交流,我们给这个项目起了一个名字叫 PHP-Audit-Labs 。...

77310
来自专栏IT派

PHP面试知识梳理

B树是为了磁盘或者其他存储设备而设计的一种多叉平衡查找树,相对于二叉树,B树的每个内节点有多个分支,即多叉。

34130
来自专栏菜鸟计划

webpack学习(六)打包压缩js和css

打包压缩js与css 由于webpack本身集成了UglifyJS插件(webpack.optimize.UglifyJsPlugin)来完成对JS与CSS的压...

79860
来自专栏阮一峰的网络日志

MIME笔记

MIME的全称是"Multipurpose Internet Mail Extensions",中译为"多用途互联网邮件扩展",指的是一系列的电子邮件技术规范,...

15040
来自专栏PHP在线

PHP 面试知识梳理

算法与数据结构 BTree和B+tree BTree B树是为了磁盘或者其他存储设备而设计的一种多叉平衡查找树,相对于二叉树,B树的每个内节点有多个分支,即多叉...

50760
来自专栏大内老A

WCF服务端运行时架构体系详解[中篇]

在这篇文章中,我们对信道分发器本身作一个深入的了解,首先来看看它具有哪些可供扩展的组件,以及我们可以针对信道分发器对WCF实现哪些可能的扩展。 目录: ...

205100
来自专栏章鱼的慢慢技术路

网络中TCP、IP、MAC、UDP的头部格式信息

46170
来自专栏IMWeb前端团队

AS3程序员小福利--as3js介绍及FlashDevelop工程的配置

本文作者:IMWeb 黄龙 原文出处:IMWeb社区 未经同意,禁止转载 ? 什么是AS3JS? AS3JS是ActionScript 3.0到Jav...

28560
来自专栏蓝天

Linux系统面面观 PROC文件系统详细介绍

什么是proc文件系统? proc文件系统是一个伪文件系统,它只存在内存当中,而不占用外存空间。它以文件系统的方式为访问系统内核数据的操作提供接口。用户和应用...

12720
来自专栏我爱编程

运维开发笔试

all(iterable) and any(iterable) all(x)如果all(x)参数x对象的所有元素不为0、''、False或者x为空对象(即所有...

25930

扫码关注云+社区

领取腾讯云代金券