前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Elasticsearch源码分析三之线程池

Elasticsearch源码分析三之线程池

作者头像
山行AI
发布2020-03-11 17:54:50
1.2K0
发布2020-03-11 17:54:50
举报
文章被收录于专栏:山行AI山行AI

前面几篇梳理了Elasticsearch的初始化流程以及Node节点的创建和初始化流程。在创建Node节点时,我们创建了Elasticsearch的线程池,本篇我们就来分析下线程池的一些细节。

我们先来看一看ThreadPool的签名:public class ThreadPool implements Scheduler,ThreadPool实现了Scheduler,我们就来围绕Scheduler和ThreadPool中的方法和属性进行分析。首先来看一个类的结构图:

接下来分别对Scheduler和ThreadPool进行分析。

1、Scheduler

先来看一下Scheduler的类图:

Scheduler是一个可以单独调度和周期性命令调度的定时器,它有很多个方法,主要用于定时和调度来使用,下面来对每个方法进行分析:

1.1、 org.elasticsearch.threadpool.Scheduler#initScheduler方法

直接来看代码:

代码语言:javascript
复制
 static ScheduledThreadPoolExecutor initScheduler(Settings settings) {        // 是ScheduledThreadPoolExecutor的子类,添加了自定义的ThreadFactory和一个拒绝策略        final ScheduledThreadPoolExecutor scheduler = new SafeScheduledThreadPoolExecutor(1,                EsExecutors.daemonThreadFactory(settings, "scheduler"), new EsAbortPolicy());        // 设置scheduler在关闭时是否执行已经存在的延迟任务        scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);        // 设置在关闭scheduler时是否继续执行已经存在的周期性任务        scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);        // 在取消时是否清除        scheduler.setRemoveOnCancelPolicy(true);        return scheduler;    }

这个scheduler其实是对ScheduledThreadPoolExecutor进行了一层包装,添加了自定义的ThreadFactory和任务溢出时的拒绝策略。有一点是需要关注的,那就是队列的长度,我们看下它父类的构造方法:

代码语言:javascript
复制
public ScheduledThreadPoolExecutor(int corePoolSize,                                       ThreadFactory threadFactory,                                       RejectedExecutionHandler handler) {        super(corePoolSize, Integer.MAX_VALUE,              DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,              new DelayedWorkQueue(), threadFactory, handler);    }

可以看出队列的长度为Integer.MAX_VALUE,而触发线程池reject的条件有三个,这个我们主要看下java.util.concurrent.ThreadPoolExecutor#execute:

代码语言:javascript
复制
public void execute(Runnable command) {        if (command == null)            throw new NullPointerException();        /*         * Proceed in 3 steps:         *         * 1. If fewer than corePoolSize threads are running, try to         * start a new thread with the given command as its first         * task.  The call to addWorker atomically checks runState and         * workerCount, and so prevents false alarms that would add         * threads when it shouldn't, by returning false.         *         * 2. If a task can be successfully queued, then we still need         * to double-check whether we should have added a thread         * (because existing ones died since last checking) or that         * the pool shut down since entry into this method. So we         * recheck state and if necessary roll back the enqueuing if         * stopped, or start a new thread if there are none.         *         * 3. If we cannot queue task, then we try to add a new         * thread.  If it fails, we know we are shut down or saturated         * and so reject the task.         */        int c = ctl.get();//ctl的值在添加worker线程的时候会改变,在线程关闭导致worker数量减少时也会改变,它不同的位保存线程池的不同状态,高位维护着线程池的runState        if (workerCountOf(c) < corePoolSize) {            if (addWorker(command, true))                return;            c = ctl.get();        }        if (isRunning(c) && workQueue.offer(command)) {            int recheck = ctl.get();            if (! isRunning(recheck) && remove(command))                reject(command);            else if (workerCountOf(recheck) == 0)                addWorker(null, false);        }        else if (!addWorker(command, false))            reject(command);    }

主要有三个条件:

  • 如果当前worker线程数量小于corePoolSize时,尝试着启动一个新的核心线程并使用该线程启动这个任务。addWorker方法会自动校验运行状态和工作线程数量,如果添加成功的话就会直接返回,否则会往下执行。其中在addWorker方法中如果添加失败时会将添加失败的这个worker线程关闭。
  • 如果一个任务能够成功地进入队列(队列数量未满,针对ScheduledThreadPoolExecutor来说它的队列长度为Integer.MAX_VALUE,所以不会满足这个条件),如果成功进入队列了还需要double-check一下我们是否需要添加一个线程,因为在上一次校验时尝试创建的那个worker线程已经被关闭或者是线程池在进入这个方法时关闭了。所以这里要重新检查下状态并且在必要的时候当线程停止的时候进行出列操作并且拒绝任务或者在没有线程时启动一个新的线程(isRunning(ctl)返回false时表进线程池中线程有异常)。
  • 如果任务不能入队列,将会尝试添加一个新的非核心线程(能否创建成功还需要通过corePoolSize与maximumPoolSize等参数来决定)来处理。如果失败了,我们知道是因为线程池关闭了或者线程池饱和了,所以拒绝了这个任务。

1.2、其他方法

  • org.elasticsearch.threadpool.Scheduler#terminate与org.elasticsearch.threadpool.Scheduler#awaitTermination都是用于关闭执行器的方法;
  • org.elasticsearch.threadpool.Scheduler#scheduleWithFixedDelay与org.elasticsearch.threadpool.Scheduler#wrapAsCancellable和org.elasticsearch.threadpool.Scheduler#wrapAsScheduledCancellable方法用于对Runnable或Future进行一层包装。

org.elasticsearch.threadpool.ThreadPool类

先来看一看其中的几个内部类:

1、org.elasticsearch.threadpool.ThreadPool.Names

代码语言:javascript
复制
/**     * 内部常用的属性名称  都是一些常量     */    public static class Names {        public static final String SAME = "same";        public static final String GENERIC = "generic";        public static final String LISTENER = "listener";        public static final String GET = "get";        public static final String ANALYZE = "analyze";        public static final String WRITE = "write";        public static final String SEARCH = "search";        public static final String SEARCH_THROTTLED = "search_throttled";        public static final String MANAGEMENT = "management";        public static final String FLUSH = "flush";        public static final String REFRESH = "refresh";        public static final String WARMER = "warmer";        public static final String SNAPSHOT = "snapshot";        public static final String FORCE_MERGE = "force_merge";        public static final String FETCH_SHARD_STARTED = "fetch_shard_started";        public static final String FETCH_SHARD_STORE = "fetch_shard_store";    }

主要是一些常用的名称常量。

2、org.elasticsearch.threadpool.ThreadPool.ThreadPoolType

代码语言:javascript
复制
/**     * 线程池类型枚举     */    public enum ThreadPoolType {        DIRECT("direct"),        FIXED("fixed"),        FIXED_AUTO_QUEUE_SIZE("fixed_auto_queue_size"), // TODO: remove in 9.0        SCALING("scaling");
        private final String type;
        public String getType() {            return type;        }
        ThreadPoolType(String type) {            this.type = type;        }
        // 以type为key,以ThreadPoolType为value        private static final Map<String, ThreadPoolType> TYPE_MAP =            Arrays.stream(ThreadPoolType.values()).collect(Collectors.toUnmodifiableMap(ThreadPoolType::getType, Function.identity()));
        /**         * 通过type获取对应的枚举对象         * @param type         * @return         */        public static ThreadPoolType fromType(String type) {            ThreadPoolType threadPoolType = TYPE_MAP.get(type);            if (threadPoolType == null) {                throw new IllegalArgumentException("no ThreadPoolType for " + type);            }            return threadPoolType;        }    }

这个是一个线程池类型枚举。

ThreadPool中的属性

代码语言:javascript
复制
    // Names中的常用的名称常量为key,ThreadPoolType枚举类型为value的map    public static final Map<String, ThreadPoolType> THREAD_POOL_TYPES = Map.ofEntries(        entry(Names.SAME, ThreadPoolType.DIRECT),        entry(Names.GENERIC, ThreadPoolType.SCALING),        entry(Names.LISTENER, ThreadPoolType.FIXED),        entry(Names.GET, ThreadPoolType.FIXED),        entry(Names.ANALYZE, ThreadPoolType.FIXED),        entry(Names.WRITE, ThreadPoolType.FIXED),        entry(Names.SEARCH, ThreadPoolType.FIXED),        entry(Names.MANAGEMENT, ThreadPoolType.SCALING),        entry(Names.FLUSH, ThreadPoolType.SCALING),        entry(Names.REFRESH, ThreadPoolType.SCALING),        entry(Names.WARMER, ThreadPoolType.SCALING),        entry(Names.SNAPSHOT, ThreadPoolType.SCALING),        entry(Names.FORCE_MERGE, ThreadPoolType.FIXED),        entry(Names.FETCH_SHARD_STARTED, ThreadPoolType.SCALING),        entry(Names.FETCH_SHARD_STORE, ThreadPoolType.SCALING),        entry(Names.SEARCH_THROTTLED, ThreadPoolType.FIXED));    // 存放执行器的map    private final Map<String, ExecutorHolder> executors;    // 线程池信息    private final ThreadPoolInfo threadPoolInfo;    // 专门用于缓存时间信息的线程    private final CachedTimeThread cachedTimeThread;    // 用于直接执行的线程池    static final ExecutorService DIRECT_EXECUTOR = EsExecutors.newDirectExecutorService();    // 线程上下文    private final ThreadContext threadContext;    // 执行器构造器    @SuppressWarnings("rawtypes")    private final Map<String, ExecutorBuilder> builders;    // 定时执行器    private final ScheduledThreadPoolExecutor scheduler;
    @SuppressWarnings("rawtypes")    public Collection<ExecutorBuilder> builders() {        return Collections.unmodifiableCollection(builders.values());    }    // 估计,评估的时间间隔   用于cachedTimeThread    public static Setting<TimeValue> ESTIMATED_TIME_INTERVAL_SETTING =        Setting.timeSetting("thread_pool.estimated_time_interval",            TimeValue.timeValueMillis(200), TimeValue.ZERO, Setting.Property.NodeScope);

ThreadPool构造方法

还是和之前一样,直接上代码:

代码语言:javascript
复制
  public ThreadPool(final Settings settings, final ExecutorBuilder<?>... customBuilders) {        // 确保node.name在配置中存在        assert Node.NODE_NAME_SETTING.exists(settings);
/*
* 提示:该行代码过长,系统自动注释不进行高亮。一键复制会移除系统注释 
* final Map<String, ExecutorBuilder> builders = new HashMap<>();        //从配置中获取可用的线程数        final int availableProcessors = EsExecutors.numberOfProcessors(settings);        // 核数的一半 控制在1到5之间        final int halfProcMaxAt5 = halfNumberOfProcessorsMaxFive(availableProcessors);        // 核数的一半 控制在1到10之间        final int halfProcMaxAt10 = halfNumberOfProcessorsMaxTen(availableProcessors);        // 线程池的最大线程数  = 核数的4倍 (它的值在128到512之间)        final int genericThreadPoolMax = boundedBy(4 * availableProcessors, 128, 512);        // 一般的线程池的ExecutorBuilder        builders.put(Names.GENERIC, new ScalingExecutorBuilder(Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30)));        // 写线程池的ExecutorBuilder        builders.put(Names.WRITE, new FixedExecutorBuilder(settings, Names.WRITE, availableProcessors, 200, false));        // 用于GET的线程池的ExecutorBuilder        builders.put(Names.GET, new FixedExecutorBuilder(settings, Names.GET, availableProcessors, 1000, false));        // 用于解析的ExecutorBuilder        builders.put(Names.ANALYZE, new FixedExecutorBuilder(settings, Names.ANALYZE, 1, 16, false));        // 用于搜索的线程池的ExecutorBuilder        builders.put(Names.SEARCH, new FixedExecutorBuilder(settings, Names.SEARCH, searchThreadPoolSize(availableProcessors), 1000, true));        // 猜测是搜索线程处理能力不足时使用的        builders.put(Names.SEARCH_THROTTLED, new FixedExecutorBuilder(settings, Names.SEARCH_THROTTLED, 1, 100, true));        // 用于管理的        builders.put(Names.MANAGEMENT, new ScalingExecutorBuilder(Names.MANAGEMENT, 1, 5, TimeValue.timeValueMinutes(5)));        // no queue as this means clients will need to handle rejections on listener queue even if the operation succeeded        // the assumption here is that the listeners should be very lightweight on the listeners side        // 没有指定queue的长度意味着客户端需要处理listener 队列的reject方法即使操作能够成功。这里的假设就是所有的监听器在监听器端都是轻量级的        builders.put(Names.LISTENER, new FixedExecutorBuilder(settings, Names.LISTENER, halfProcMaxAt10, -1, false));        // 用于flush的ExecutorBuilder        builders.put(Names.FLUSH, new ScalingExecutorBuilder(Names.FLUSH, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));        // 用于refresh 的ExecutorBuilder        builders.put(Names.REFRESH, new ScalingExecutorBuilder(Names.REFRESH, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5)));        // 用于预热的ExecutorBuilder        builders.put(Names.WARMER, new ScalingExecutorBuilder(Names.WARMER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));        // 快照用的        builders.put(Names.SNAPSHOT, new ScalingExecutorBuilder(Names.SNAPSHOT, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));        // 获取分片        builders.put(Names.FETCH_SHARD_STARTED,                new ScalingExecutorBuilder(Names.FETCH_SHARD_STARTED, 1, 2 * availableProcessors, TimeValue.timeValueMinutes(5)));        // 强制merge        builders.put(Names.FORCE_MERGE, new FixedExecutorBuilder(settings, Names.FORCE_MERGE, 1, -1, false));        // 获取分片存储        builders.put(Names.FETCH_SHARD_STORE,                new ScalingExecutorBuilder(Names.FETCH_SHARD_STORE, 1, 2 * availableProcessors, TimeValue.timeValueMinutes(5)));        // 遍历用户传入的builder,然后放入builders列表中        for (final ExecutorBuilder<?> builder : customBuilders) {            if (builders.containsKey(builder.name())) {                throw new IllegalArgumentException("builder with name [" + builder.name() + "] already exists");            }            builders.put(builder.name(), builder);        }        // 包装成只读map,赋值给属性        this.builders = Collections.unmodifiableMap(builders);        // 初始化线程上下文        threadContext = new ThreadContext(settings);        // 维护线程池和信息的map        final Map<String, ExecutorHolder> executors = new HashMap<>();        // 遍历builders列表        for (final Map.Entry<String, ExecutorBuilder> entry : builders.entrySet()) {            final ExecutorBuilder.ExecutorSettings executorSettings = entry.getValue().getSettings(settings);            // 通过org.elasticsearch.threadpool.ExecutorBuilder.build方法生成ExecutorHolder对象            final ExecutorHolder executorHolder = entry.getValue().build(executorSettings, threadContext);            if (executors.containsKey(executorHolder.info.getName())) {                throw new IllegalStateException("duplicate executors with name [" + executorHolder.info.getName() + "] registered");            }            logger.debug("created thread pool: {}", entry.getValue().formatInfo(executorHolder.info));            // 将builders中的ExecutorBuilder转化到executors中            executors.put(entry.getKey(), executorHolder);        }        // 添加DIRECT_EXECUTOR,比如传入的是一个Runnable对象,DIRECT_EXECUTOR的意思就是直接调用Runnable的run方法,@see org.elasticsearch.common.util.concurrent.EsExecutors.DirectExecutorService.execute        executors.put(Names.SAME, new ExecutorHolder(DIRECT_EXECUTOR, new Info(Names.SAME, ThreadPoolType.DIRECT)));        // 将executors包装成只读的map        this.executors = unmodifiableMap(executors);        // 过滤掉name为same的,将executors中其它的放到一个列表中        final List<Info> infos =                executors                        .values()                        .stream()                        .filter(holder -> holder.info.getName().equals("same") == false)                        .map(holder -> holder.info)                        .collect(Collectors.toList());        // 给整个线程池信息赋值        this.threadPoolInfo = new ThreadPoolInfo(infos);        // 设置调度器        this.scheduler = Scheduler.initScheduler(settings);        TimeValue estimatedTimeInterval = ESTIMATED_TIME_INTERVAL_SETTING.get(settings);        // 创建缓存时间的线程        this.cachedTimeThread = new CachedTimeThread(EsExecutors.threadName(settings, "[timer]"), estimatedTimeInterval.millis());        // 启动缓存时间的这个线程        this.cachedTimeThread.start();    }
*/

具体的细节直接看代码注释就很清楚了,我们来看下里面涉及到的几个类:

  • org.elasticsearch.common.util.concurrent.EsExecutors:

这个是一个工具类,类似于jdk里面的Executors工具类,里面主要是一些静态方法用于快速创建和维护线程池的相关功能。

  • ExecutorBuilder,先看下类结构图:

顾名思义,org.elasticsearch.threadpool.FixedExecutorBuilder用于构造固定数量的线程池,org.elasticsearch.threadpool.ScalingExecutorBuilder用于构建数量可伸缩的线程池。

  • org.elasticsearch.threadpool.ThreadPool.ExecutorHolder是用于维护线程池和线程池信息的,它的结构为:
代码语言:javascript
复制
 static class ExecutorHolder {        // 内部的线程池        private final ExecutorService executor;        public final Info info;
        ExecutorHolder(ExecutorService executor, Info info) {            assert executor instanceof EsThreadPoolExecutor || executor == DIRECT_EXECUTOR;            this.executor = executor;            this.info = info;        }
        ExecutorService executor() {            return executor;        }    }

org.elasticsearch.threadpool.ThreadPool.Info的主要代码为:

代码语言:javascript
复制
 /**     * 线程池信息     */    public static class Info implements Writeable, ToXContentFragment {
        private final String name;        private final ThreadPoolType type;        private final int min;        private final int max;        private final TimeValue keepAlive;        private final SizeValue queueSize;        ---------省略部分代码--------

ThreadPool中的方法

其中的方法主要是用于维护上述的属性和线程池中的一些信息,熟悉了上面的分析之后,方法的使用便很容易理解了,这里也不再赘述了。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-02-27,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 开发架构二三事 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1、Scheduler
    • 1.1、 org.elasticsearch.threadpool.Scheduler#initScheduler方法
      • 1.2、其他方法
      • org.elasticsearch.threadpool.ThreadPool类
        • 先来看一看其中的几个内部类:
          • 1、org.elasticsearch.threadpool.ThreadPool.Names
          • 2、org.elasticsearch.threadpool.ThreadPool.ThreadPoolType
        • ThreadPool中的属性
          • ThreadPool构造方法
            • ThreadPool中的方法
            相关产品与服务
            Elasticsearch Service
            腾讯云 Elasticsearch Service(ES)是云端全托管海量数据检索分析服务,拥有高性能自研内核,集成X-Pack。ES 支持通过自治索引、存算分离、集群巡检等特性轻松管理集群,也支持免运维、自动弹性、按需使用的 Serverless 模式。使用 ES 您可以高效构建信息检索、日志分析、运维监控等服务,它独特的向量检索还可助您构建基于语义、图像的AI深度应用。
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档