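The previous posts walked through Elasticsearch's startup process and how a Node is created and initialized. Creating the Node also creates Elasticsearch's thread pool, so this post looks at the thread pool in more detail.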
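Let's start with the signature of ThreadPool: public class ThreadPool implements Scheduler. Since ThreadPool implements Scheduler, the analysis is organized around the methods and fields of Scheduler and ThreadPool. First, take a look at the class structure diagram: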
We'll analyze Scheduler first and then ThreadPool.
Here is the class diagram of Scheduler:
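Scheduler is a scheduler for both one-shot (delayed) and periodic commands. It has quite a few methods, all of which serve timing and scheduling. Let's go through them one by one, starting with initScheduler, straight from the code: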
static ScheduledThreadPoolExecutor initScheduler(Settings settings) {
    // SafeScheduledThreadPoolExecutor is a subclass of ScheduledThreadPoolExecutor; it is given a custom ThreadFactory and a rejection policy
    final ScheduledThreadPoolExecutor scheduler = new SafeScheduledThreadPoolExecutor(1,
            EsExecutors.daemonThreadFactory(settings, "scheduler"), new EsAbortPolicy());
    // do not run delayed tasks that are still pending when the scheduler is shut down
    scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
    // do not keep running existing periodic tasks after the scheduler is shut down
    scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
    // remove cancelled tasks from the work queue immediately
    scheduler.setRemoveOnCancelPolicy(true);
    return scheduler;
}
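SafeScheduledThreadPoolExecutor, EsExecutors.daemonThreadFactory and EsAbortPolicy are Elasticsearch classes. The JDK-only sketch below reproduces the same configuration, swapping in a plain ScheduledThreadPoolExecutor, a hand-written daemon ThreadFactory and the JDK's AbortPolicy. Those substitutions are assumptions and not the ES code. The sketch also checks what setRemoveOnCancelPolicy(true) buys: a cancelled task leaves the work queue right away instead of lingering until its delay expires.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class SchedulerSketch {

    // JDK-only stand-in for initScheduler: one daemon thread, abort on rejection,
    // and the same shutdown/cancel policies as the Elasticsearch version.
    public static ScheduledThreadPoolExecutor initScheduler() {
        ThreadFactory daemonFactory = runnable -> {
            Thread t = new Thread(runnable, "scheduler");
            t.setDaemon(true); // a daemon thread never keeps the JVM alive
            return t;
        };
        ScheduledThreadPoolExecutor scheduler =
                new ScheduledThreadPoolExecutor(1, daemonFactory, new ThreadPoolExecutor.AbortPolicy());
        scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
        scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
        scheduler.setRemoveOnCancelPolicy(true);
        return scheduler;
    }

    public static void main(String[] args) {
        ScheduledThreadPoolExecutor scheduler = initScheduler();
        // a task due in an hour sits in the DelayedWorkQueue
        ScheduledFuture<?> future = scheduler.schedule(() -> {}, 1, TimeUnit.HOURS);
        int before = scheduler.getQueue().size();
        // with removeOnCancel = true, cancel() also removes the task from the queue
        future.cancel(false);
        int after = scheduler.getQueue().size();
        System.out.println("queued before cancel: " + before + ", after cancel: " + after); // 1, 0
        scheduler.shutdown();
    }
}
```

Without the remove-on-cancel policy, cancelled long-delay tasks would stay in the queue, holding memory, until their scheduled time came around.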
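The scheduler returned by initScheduler is essentially a thin wrapper around ScheduledThreadPoolExecutor with a single core thread, a custom ThreadFactory, and a rejection policy for tasks it cannot accept. One thing deserves attention here: how far this executor can grow. Let's look at its parent class constructor: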
public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
          new DelayedWorkQueue(), threadFactory, handler);
}
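Note that the Integer.MAX_VALUE passed here is maximumPoolSize, not a queue length. DelayedWorkQueue is unbounded and simply grows as tasks are added. As the ScheduledThreadPoolExecutor Javadoc points out, such a pool behaves as a fixed-size pool of corePoolSize threads (one, in this case), so maximumPoolSize has no useful effect. As for when a thread pool rejects a task, there are three conditions, all visible in java.util.concurrent.ThreadPoolExecutor#execute: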
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task. The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread. If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    // ctl changes when a worker thread is added and when workers exit; its bits encode
    // different parts of the pool state: the high bits hold runState, the low bits the worker count
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}
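There are three conditions under which execute() rejects a task:
1. The task was queued, but the recheck finds that the pool is no longer RUNNING, and the task is successfully removed from the queue again.
2. The pool is not RUNNING (it has been shut down), so the task is not queued and addWorker fails.
3. The queue refuses the task because it is full, and addWorker(command, false) also fails because the worker count has reached maximumPoolSize.
For the scheduler above, case 3 cannot happen because DelayedWorkQueue never reports itself full. Moreover, ScheduledThreadPoolExecutor overrides execute() to go through schedule(), which rejects a task only when the executor has been shut down. In practice, the scheduler rejects tasks only after shutdown.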
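The sketch below exercises these rejection paths using only java.util.concurrent. It first uses a ThreadPoolExecutor with one thread and a one-slot ArrayBlockingQueue. That pool rejects a task once the queue is full and the maximum pool size is reached, and it also rejects new tasks after shutdown(). The second part shows that a ScheduledThreadPoolExecutor keeps accepting delayed tasks until it is shut down. The class and method names are made up for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectionSketch {

    // true if execute() threw RejectedExecutionException
    static boolean rejects(Executor executor, Runnable task) {
        try {
            executor.execute(task);
            return false;
        } catch (RejectedExecutionException e) {
            return true;
        }
    }

    // One worker (maximumPoolSize = 1) plus a one-slot queue: the third task has nowhere to go.
    public static boolean[] saturatedPool() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1), new ThreadPoolExecutor.AbortPolicy());
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        boolean first = rejects(pool, blocker);  // starts the only worker
        boolean second = rejects(pool, blocker); // waits in the queue
        boolean third = rejects(pool, blocker);  // queue full, no more threads allowed: rejected
        release.countDown();
        pool.shutdown();
        boolean afterShutdown = rejects(pool, () -> {}); // pool no longer RUNNING: rejected
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new boolean[] {first, second, third, afterShutdown};
    }

    // A ScheduledThreadPoolExecutor accepts delayed tasks until it is shut down.
    public static boolean[] scheduledPool() {
        ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1);
        boolean rejectedWhileRunning = false;
        for (int i = 0; i < 10_000; i++) {
            // every task lands in the unbounded DelayedWorkQueue
            rejectedWhileRunning |= rejects(r -> scheduler.schedule(r, 1, TimeUnit.HOURS), () -> {});
        }
        int queued = scheduler.getQueue().size();
        scheduler.shutdownNow(); // drops the pending tasks and stops the worker
        boolean rejectedAfterShutdown = rejects(scheduler, () -> {});
        return new boolean[] {rejectedWhileRunning, queued == 10_000, rejectedAfterShutdown};
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(saturatedPool())); // [false, false, true, true]
        System.out.println(java.util.Arrays.toString(scheduledPool())); // [false, true, true]
    }
}
```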
Next up is ThreadPool itself, starting with its Names inner class:

/**
 * Names of the built-in thread pools, all constants
 */
public static class Names {
    public static final String SAME = "same";
    public static final String GENERIC = "generic";
    public static final String LISTENER = "listener";
    public static final String GET = "get";
    public static final String ANALYZE = "analyze";
    public static final String WRITE = "write";
    public static final String SEARCH = "search";
    public static final String SEARCH_THROTTLED = "search_throttled";
    public static final String MANAGEMENT = "management";
    public static final String FLUSH = "flush";
    public static final String REFRESH = "refresh";
    public static final String WARMER = "warmer";
    public static final String SNAPSHOT = "snapshot";
    public static final String FORCE_MERGE = "force_merge";
    public static final String FETCH_SHARD_STARTED = "fetch_shard_started";
    public static final String FETCH_SHARD_STORE = "fetch_shard_store";
}
These are simply the names of the built-in thread pools, declared as constants.
/**
 * Thread pool types
 */
public enum ThreadPoolType {
    DIRECT("direct"),
    FIXED("fixed"),
    FIXED_AUTO_QUEUE_SIZE("fixed_auto_queue_size"), // TODO: remove in 9.0
    SCALING("scaling");

    private final String type;

    public String getType() {
        return type;
    }

    ThreadPoolType(String type) {
        this.type = type;
    }

    // maps each type string (key) to its ThreadPoolType (value)
    private static final Map<String, ThreadPoolType> TYPE_MAP =
            Arrays.stream(ThreadPoolType.values()).collect(Collectors.toUnmodifiableMap(ThreadPoolType::getType, Function.identity()));

    /**
     * Returns the enum constant for the given type string.
     * @param type the type string, e.g. "fixed"
     * @return the matching ThreadPoolType
     */
    public static ThreadPoolType fromType(String type) {
        ThreadPoolType threadPoolType = TYPE_MAP.get(type);
        if (threadPoolType == null) {
            throw new IllegalArgumentException("no ThreadPoolType for " + type);
        }
        return threadPoolType;
    }
}
ThreadPoolType enumerates the thread pool types.
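TYPE_MAP together with fromType is the usual reverse-lookup pattern for an enum keyed by a string: the map is built once from values(), and lookups fail loudly on unknown input. The standalone copy below trims the enum to three constants and drops the Elasticsearch dependencies, which is enough to show how the lookup behaves.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class ThreadPoolTypeSketch {

    public enum PoolType {
        DIRECT("direct"), FIXED("fixed"), SCALING("scaling");

        private final String type;

        PoolType(String type) {
            this.type = type;
        }

        public String getType() {
            return type;
        }

        // built once, after the constants are initialized: "fixed" -> FIXED, ...
        private static final Map<String, PoolType> TYPE_MAP = Arrays.stream(PoolType.values())
                .collect(Collectors.toUnmodifiableMap(PoolType::getType, Function.identity()));

        public static PoolType fromType(String type) {
            PoolType poolType = TYPE_MAP.get(type);
            if (poolType == null) {
                throw new IllegalArgumentException("no ThreadPoolType for " + type);
            }
            return poolType;
        }
    }

    public static void main(String[] args) {
        System.out.println(PoolType.fromType("scaling")); // SCALING
        try {
            PoolType.fromType("cached");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // no ThreadPoolType for cached
        }
    }
}
```

Because the map is unmodifiable and built at class-initialization time, lookups are thread-safe without any locking.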
Next come ThreadPool's static members and instance fields:

// map from each name in Names to the ThreadPoolType of that pool
public static final Map<String, ThreadPoolType> THREAD_POOL_TYPES = Map.ofEntries(
        entry(Names.SAME, ThreadPoolType.DIRECT),
        entry(Names.GENERIC, ThreadPoolType.SCALING),
        entry(Names.LISTENER, ThreadPoolType.FIXED),
        entry(Names.GET, ThreadPoolType.FIXED),
        entry(Names.ANALYZE, ThreadPoolType.FIXED),
        entry(Names.WRITE, ThreadPoolType.FIXED),
        entry(Names.SEARCH, ThreadPoolType.FIXED),
        entry(Names.MANAGEMENT, ThreadPoolType.SCALING),
        entry(Names.FLUSH, ThreadPoolType.SCALING),
        entry(Names.REFRESH, ThreadPoolType.SCALING),
        entry(Names.WARMER, ThreadPoolType.SCALING),
        entry(Names.SNAPSHOT, ThreadPoolType.SCALING),
        entry(Names.FORCE_MERGE, ThreadPoolType.FIXED),
        entry(Names.FETCH_SHARD_STARTED, ThreadPoolType.SCALING),
        entry(Names.FETCH_SHARD_STORE, ThreadPoolType.SCALING),
        entry(Names.SEARCH_THROTTLED, ThreadPoolType.FIXED));

// executors, keyed by pool name
private final Map<String, ExecutorHolder> executors;
// information about all thread pools
private final ThreadPoolInfo threadPoolInfo;
// thread dedicated to caching the current time
private final CachedTimeThread cachedTimeThread;
// executor that runs tasks directly on the calling thread
static final ExecutorService DIRECT_EXECUTOR = EsExecutors.newDirectExecutorService();
// thread context
private final ThreadContext threadContext;
// executor builders, keyed by pool name
@SuppressWarnings("rawtypes")
private final Map<String, ExecutorBuilder> builders;
// scheduled executor for delayed and periodic tasks
private final ScheduledThreadPoolExecutor scheduler;
@SuppressWarnings("rawtypes")
public Collection<ExecutorBuilder> builders() {
    return Collections.unmodifiableCollection(builders.values());
}

// estimated-time interval: how often cachedTimeThread refreshes its cached time (default 200ms, minimum 0)
public static Setting<TimeValue> ESTIMATED_TIME_INTERVAL_SETTING =
        Setting.timeSetting("thread_pool.estimated_time_interval", TimeValue.timeValueMillis(200), TimeValue.ZERO, Setting.Property.NodeScope);
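The idea behind cachedTimeThread is, presumably, to make frequent time reads cheap. Callers read a volatile field that a single background thread refreshes every thread_pool.estimated_time_interval, so they do not have to query the system clock each time. The class below is a hypothetical, simplified illustration of that idea (CachedClock and its methods are invented names). It is not Elasticsearch's CachedTimeThread, which does more than this.

```java
public class CachedClockSketch {

    // Hypothetical stand-in for a cached-time thread: refreshes a volatile timestamp periodically.
    public static final class CachedClock extends Thread {
        private final long intervalMillis;
        private volatile boolean running = true;
        // written only by this thread, read by any thread
        private volatile long cachedMillis = System.currentTimeMillis();

        public CachedClock(String name, long intervalMillis) {
            super(name);
            this.intervalMillis = intervalMillis;
            setDaemon(true); // must not keep the JVM alive
        }

        // cheap read; the value lags real time by at most roughly intervalMillis
        public long currentTimeMillis() {
            return cachedMillis;
        }

        @Override
        public void run() {
            while (running) {
                cachedMillis = System.currentTimeMillis();
                try {
                    Thread.sleep(intervalMillis);
                } catch (InterruptedException e) {
                    return; // interrupted by shutdown(): stop refreshing
                }
            }
        }

        public void shutdown() {
            running = false;
            interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CachedClock clock = new CachedClock("[timer]", 200);
        clock.start();
        long first = clock.currentTimeMillis();
        Thread.sleep(500);
        System.out.println("cached time advanced by about " + (clock.currentTimeMillis() - first) + " ms");
        clock.shutdown();
    }
}
```

The trade-off is precision for cost: every reader shares one slightly stale value instead of each making its own clock call.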
As before, let's go straight to the code, this time the constructor:
public ThreadPool(final Settings settings, final ExecutorBuilder<?>... customBuilders) {
    // make sure node.name is present in the settings
    assert Node.NODE_NAME_SETTING.exists(settings);

    final Map<String, ExecutorBuilder> builders = new HashMap<>();
    // number of available processors, read from the settings
    final int availableProcessors = EsExecutors.numberOfProcessors(settings);
    // half the number of processors, bounded to [1, 5]
    final int halfProcMaxAt5 = halfNumberOfProcessorsMaxFive(availableProcessors);
    // half the number of processors, bounded to [1, 10]
    final int halfProcMaxAt10 = halfNumberOfProcessorsMaxTen(availableProcessors);
    // maximum size of the generic pool: 4x the processors, bounded to [128, 512]
    final int genericThreadPoolMax = boundedBy(4 * availableProcessors, 128, 512);
    // builder for the generic pool
    builders.put(Names.GENERIC, new ScalingExecutorBuilder(Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30)));
    // builder for the write pool
    builders.put(Names.WRITE, new FixedExecutorBuilder(settings, Names.WRITE, availableProcessors, 200, false));
    // builder for the get pool
    builders.put(Names.GET, new FixedExecutorBuilder(settings, Names.GET, availableProcessors, 1000, false));
    // builder for the analyze pool
    builders.put(Names.ANALYZE, new FixedExecutorBuilder(settings, Names.ANALYZE, 1, 16, false));
    // builder for the search pool
    builders.put(Names.SEARCH, new FixedExecutorBuilder(settings, Names.SEARCH, searchThreadPoolSize(availableProcessors), 1000, true));
    // builder for count/search/suggest/get operations on search_throttled (e.g. frozen) indices
    builders.put(Names.SEARCH_THROTTLED, new FixedExecutorBuilder(settings, Names.SEARCH_THROTTLED, 1, 100, true));
    // builder for management tasks
    builders.put(Names.MANAGEMENT, new ScalingExecutorBuilder(Names.MANAGEMENT, 1, 5, TimeValue.timeValueMinutes(5)));
    // no queue as this means clients will need to handle rejections on listener queue even if the operation succeeded
    // the assumption here is that the listeners should be very lightweight on the listeners side
    // (a queue size of -1 means the queue is unbounded)
    builders.put(Names.LISTENER, new FixedExecutorBuilder(settings, Names.LISTENER, halfProcMaxAt10, -1, false));
    // builder for the flush pool
    builders.put(Names.FLUSH, new ScalingExecutorBuilder(Names.FLUSH, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
    // builder for the refresh pool
    builders.put(Names.REFRESH, new ScalingExecutorBuilder(Names.REFRESH, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5)));
    // builder for the warmer pool
    builders.put(Names.WARMER, new ScalingExecutorBuilder(Names.WARMER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
    // builder for the snapshot pool
    builders.put(Names.SNAPSHOT, new ScalingExecutorBuilder(Names.SNAPSHOT, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
    // builder for listing shard states
    builders.put(Names.FETCH_SHARD_STARTED, new ScalingExecutorBuilder(Names.FETCH_SHARD_STARTED, 1, 2 * availableProcessors, TimeValue.timeValueMinutes(5)));
    // builder for force merges
    builders.put(Names.FORCE_MERGE, new FixedExecutorBuilder(settings, Names.FORCE_MERGE, 1, -1, false));
    // builder for listing shard stores
    builders.put(Names.FETCH_SHARD_STORE, new ScalingExecutorBuilder(Names.FETCH_SHARD_STORE, 1, 2 * availableProcessors, TimeValue.timeValueMinutes(5)));
    // register the user-supplied builders, rejecting duplicate names
    for (final ExecutorBuilder<?> builder : customBuilders) {
        if (builders.containsKey(builder.name())) {
            throw new IllegalArgumentException("builder with name [" + builder.name() + "] already exists");
        }
        builders.put(builder.name(), builder);
    }
    // wrap as a read-only map and assign it to the field
    this.builders = Collections.unmodifiableMap(builders);
    // initialize the thread context
    threadContext = new ThreadContext(settings);
    // map from pool name to executor and its info
    final Map<String, ExecutorHolder> executors = new HashMap<>();
    // build an executor from every registered builder
    for (final Map.Entry<String, ExecutorBuilder> entry : builders.entrySet()) {
        final ExecutorBuilder.ExecutorSettings executorSettings = entry.getValue().getSettings(settings);
        // org.elasticsearch.threadpool.ExecutorBuilder.build creates the ExecutorHolder
        final ExecutorHolder executorHolder = entry.getValue().build(executorSettings, threadContext);
        if (executors.containsKey(executorHolder.info.getName())) {
            throw new IllegalStateException("duplicate executors with name [" + executorHolder.info.getName() + "] registered");
        }
        logger.debug("created thread pool: {}", entry.getValue().formatInfo(executorHolder.info));
        // store the built executor under the builder's name
        executors.put(entry.getKey(), executorHolder);
    }
    // add DIRECT_EXECUTOR: given a Runnable, it simply calls run() on the caller's thread,
    // see org.elasticsearch.common.util.concurrent.EsExecutors.DirectExecutorService.execute
    executors.put(Names.SAME, new ExecutorHolder(DIRECT_EXECUTOR, new Info(Names.SAME, ThreadPoolType.DIRECT)));
    // wrap executors as a read-only map
    this.executors = unmodifiableMap(executors);
    // collect the Info of every executor except "same"
    final List<Info> infos = executors
            .values()
            .stream()
            .filter(holder -> holder.info.getName().equals("same") == false)
            .map(holder -> holder.info)
            .collect(Collectors.toList());
    // information about the whole thread pool
    this.threadPoolInfo = new ThreadPoolInfo(infos);
    // create the scheduler
    this.scheduler = Scheduler.initScheduler(settings);
    TimeValue estimatedTimeInterval = ESTIMATED_TIME_INTERVAL_SETTING.get(settings);
    // create the thread that caches the current time
    this.cachedTimeThread = new CachedTimeThread(EsExecutors.threadName(settings, "[timer]"), estimatedTimeInterval.millis());
    // and start it
    this.cachedTimeThread.start();
}
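The sizing helpers that the constructor calls are not shown above. The sketch below reconstructs them from the code comments: half the processors clamped to [1, 5] or [1, 10], and 4x the processors clamped to [128, 512] for the generic pool. Rounding the half up with (n + 1) / 2 is an assumption, and the method names are reused only for readability. searchThreadPoolSize is left out because its formula is not given in the text.

```java
public class PoolSizingSketch {

    // clamp value into [min, max]
    public static int boundedBy(int value, int min, int max) {
        return Math.min(max, Math.max(min, value));
    }

    // half of the processors (rounded up, assumed), clamped to [1, 5]
    public static int halfNumberOfProcessorsMaxFive(int processors) {
        return boundedBy((processors + 1) / 2, 1, 5);
    }

    // half of the processors (rounded up, assumed), clamped to [1, 10]
    public static int halfNumberOfProcessorsMaxTen(int processors) {
        return boundedBy((processors + 1) / 2, 1, 10);
    }

    // maximum size of the generic pool: 4x the processors, clamped to [128, 512]
    public static int genericThreadPoolMax(int processors) {
        return boundedBy(4 * processors, 128, 512);
    }

    public static void main(String[] args) {
        for (int processors : new int[] {1, 8, 32, 256}) {
            System.out.printf("processors=%d halfMax5=%d halfMax10=%d genericMax=%d%n",
                    processors,
                    halfNumberOfProcessorsMaxFive(processors),
                    halfNumberOfProcessorsMaxTen(processors),
                    genericThreadPoolMax(processors));
        }
    }
}
```

Under these assumptions, an 8-core node gets at most 4 threads each for flush, warmer, snapshot and refresh, a fixed listener pool of 4, and a generic pool that can grow to 128 threads.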
The code comments cover the details. Next, let's look at a few of the classes the constructor uses.
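EsExecutors (org.elasticsearch.common.util.concurrent.EsExecutors) is a utility class, much like the JDK's Executors. It consists mainly of static methods for quickly creating and managing thread pools.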
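Among other things, EsExecutors supplies the direct executor behind DIRECT_EXECUTOR (the "same" pool) via EsExecutors.newDirectExecutorService(). Per the constructor comment, it simply calls run() on the submitted Runnable. Below is a minimal JDK-only illustration of that idea, not the ES implementation:

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

public class DirectExecutorSketch {

    // Runs each task synchronously on the thread that calls execute().
    static final Executor DIRECT = Runnable::run;

    // Returns the name of the thread the task actually ran on.
    public static String runAndReportThread() {
        AtomicReference<String> ranOn = new AtomicReference<>();
        DIRECT.execute(() -> ranOn.set(Thread.currentThread().getName()));
        // no waiting needed: execute() returns only after the task has run
        return ranOn.get();
    }

    public static void main(String[] args) {
        System.out.println(runAndReportThread().equals(Thread.currentThread().getName())); // true
    }
}
```

A direct executor suits very cheap work, where handing the task to another thread would cost more than the work itself. The price is that a slow task blocks the caller.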
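As the names suggest, org.elasticsearch.threadpool.FixedExecutorBuilder builds thread pools with a fixed number of threads. org.elasticsearch.threadpool.ScalingExecutorBuilder builds pools whose thread count scales between a minimum and a maximum.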
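Here is a rough analogy in plain java.util.concurrent terms. The "fixed" pool uses corePoolSize == maximumPoolSize with a bounded queue. A negative size gives an unbounded queue, mirroring the -1 passed for listener and force_merge (that mapping is an assumption). For the "scaling" pool, the sketch swaps in the common SynchronousQueue technique: threads are added up to the maximum, and idle threads above the minimum die after keepAlive. Elasticsearch's own scaling executor differs once every thread is busy, because it queues extra tasks instead of rejecting them. Treat this only as an approximation. newFixed, newScaling and saturateScaling are hypothetical helpers.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class FixedVsScalingSketch {

    // Hypothetical "fixed" pool: constant thread count; a negative queueSize means unbounded.
    public static ThreadPoolExecutor newFixed(int size, int queueSize) {
        BlockingQueue<Runnable> queue = queueSize < 0
                ? new LinkedBlockingQueue<>()
                : new ArrayBlockingQueue<>(queueSize);
        return new ThreadPoolExecutor(size, size, 0L, TimeUnit.MILLISECONDS, queue);
    }

    // Hypothetical "scaling" pool: grows from min to max threads, and idle threads above min
    // are reclaimed after keepAlive. Unlike ES, it rejects when all max threads are busy.
    public static ThreadPoolExecutor newScaling(int min, int max, long keepAlive, TimeUnit unit) {
        return new ThreadPoolExecutor(min, max, keepAlive, unit, new SynchronousQueue<>());
    }

    // Submits max + 1 blocking tasks; returns {threads reached, tasks rejected}.
    public static int[] saturateScaling(int min, int max) {
        ThreadPoolExecutor pool = newScaling(min, max, 5, TimeUnit.MINUTES);
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        for (int i = 0; i <= max; i++) {
            try {
                pool.execute(() -> {
                    try {
                        release.await(); // keep every worker busy
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            } catch (RejectedExecutionException e) {
                rejected++;
            }
        }
        int poolSize = pool.getPoolSize();
        release.countDown();
        pool.shutdown();
        return new int[] {poolSize, rejected};
    }

    public static void main(String[] args) {
        ThreadPoolExecutor write = newFixed(4, 200);
        System.out.println("fixed: core=" + write.getCorePoolSize() + " max=" + write.getMaximumPoolSize()
                + " queue capacity=" + write.getQueue().remainingCapacity()); // core=4 max=4 queue capacity=200
        write.shutdown();
        int[] result = saturateScaling(1, 4);
        System.out.println("scaling: threads=" + result[0] + " rejected=" + result[1]); // threads=4 rejected=1
    }
}
```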
ExecutorBuilder.build returns an ExecutorHolder, which pairs an executor with its Info:

static class ExecutorHolder {
    // the underlying executor
    private final ExecutorService executor;
    public final Info info;

    ExecutorHolder(ExecutorService executor, Info info) {
        assert executor instanceof EsThreadPoolExecutor || executor == DIRECT_EXECUTOR;
        this.executor = executor;
        this.info = info;
    }

    ExecutorService executor() {
        return executor;
    }
}
The core of org.elasticsearch.threadpool.ThreadPool.Info looks like this:
/**
 * Thread pool information
 */
public static class Info implements Writeable, ToXContentFragment {

    private final String name;
    private final ThreadPoolType type;
    private final int min;
    private final int max;
    private final TimeValue keepAlive;
    private final SizeValue queueSize;

    // ... rest of the class omitted ...
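Its methods mainly maintain these fields and expose information about the pool. With the analysis above in mind they are easy to follow, so I won't go through them here.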