前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊Elasticsearch的EsThreadPoolExecutor

聊聊Elasticsearch的EsThreadPoolExecutor

原创
作者头像
code4it
修改2019-06-03 10:45:57
5090
修改2019-06-03 10:45:57
举报

本文主要研究一下Elasticsearch的EsThreadPoolExecutor

EsThreadPoolExecutor

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java

public class EsThreadPoolExecutor extends ThreadPoolExecutor {
​
    private final ThreadContext contextHolder;
    private volatile ShutdownListener listener;
​
    private final Object monitor = new Object();
    /**
     * Name used in error reporting.
     */
    private final String name;
​
    final String getName() {
        return name;
    }
​
    EsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, ThreadContext contextHolder) {
        this(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new EsAbortPolicy(), contextHolder);
    }
​
    @SuppressForbidden(reason = "properly rethrowing errors, see EsExecutors.rethrowErrors")
    EsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, XRejectedExecutionHandler handler,
            ThreadContext contextHolder) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        this.name = name;
        this.contextHolder = contextHolder;
    }
​
    public void shutdown(ShutdownListener listener) {
        synchronized (monitor) {
            if (this.listener != null) {
                throw new IllegalStateException("Shutdown was already called on this thread pool");
            }
            if (isTerminated()) {
                listener.onTerminated();
            } else {
                this.listener = listener;
            }
        }
        shutdown();
    }
​
    @Override
    protected synchronized void terminated() {
        super.terminated();
        synchronized (monitor) {
            if (listener != null) {
                try {
                    listener.onTerminated();
                } finally {
                    listener = null;
                }
            }
        }
    }
​
    public interface ShutdownListener {
        void onTerminated();
    }
​
    @Override
    public void execute(Runnable command) {
        command = wrapRunnable(command);
        try {
            super.execute(command);
        } catch (EsRejectedExecutionException ex) {
            if (command instanceof AbstractRunnable) {
                // If we are an abstract runnable we can handle the rejection
                // directly and don't need to rethrow it.
                try {
                    ((AbstractRunnable) command).onRejection(ex);
                } finally {
                    ((AbstractRunnable) command).onAfter();
​
                }
            } else {
                throw ex;
            }
        }
    }
​
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        EsExecutors.rethrowErrors(unwrap(r));
        assert assertDefaultContext(r);
    }
​
    private boolean assertDefaultContext(Runnable r) {
        try {
            assert contextHolder.isDefaultContext() : "the thread context is not the default context and the thread [" +
                Thread.currentThread().getName() + "] is being returned to the pool after executing [" + r + "]";
        } catch (IllegalStateException ex) {
            // sometimes we execute on a closed context and isDefaultContext doen't bypass the ensureOpen checks
            // this must not trigger an exception here since we only assert if the default is restored and
            // we don't really care if we are closed
            if (contextHolder.isClosed() == false) {
                throw ex;
            }
        }
        return true;
    }
​
    /**
     * Returns a stream of all pending tasks. This is similar to {@link #getQueue()} but will expose the originally submitted
     * {@link Runnable} instances rather than potentially wrapped ones.
     */
    public Stream<Runnable> getTasks() {
        return this.getQueue().stream().map(this::unwrap);
    }
​
    @Override
    public final String toString() {
        StringBuilder b = new StringBuilder();
        b.append(getClass().getSimpleName()).append('[');
        b.append("name = ").append(name).append(", ");
        if (getQueue() instanceof SizeBlockingQueue) {
            @SuppressWarnings("rawtypes")
            SizeBlockingQueue queue = (SizeBlockingQueue) getQueue();
            b.append("queue capacity = ").append(queue.capacity()).append(", ");
        }
        appendThreadPoolExecutorDetails(b);
        /*
         * ThreadPoolExecutor has some nice information in its toString but we
         * can't get at it easily without just getting the toString.
         */
        b.append(super.toString()).append(']');
        return b.toString();
    }
​
    /**
     * Append details about this thread pool to the specified {@link StringBuilder}. All details should be appended as key/value pairs in
     * the form "%s = %s, "
     *
     * @param sb the {@link StringBuilder} to append to
     */
    protected void appendThreadPoolExecutorDetails(final StringBuilder sb) {
​
    }
​
    protected Runnable wrapRunnable(Runnable command) {
        return contextHolder.preserveContext(command);
    }
​
    protected Runnable unwrap(Runnable runnable) {
        return contextHolder.unwrap(runnable);
    }
}
  • EsThreadPoolExecutor继承了ThreadPoolExecutor,它提供了两个构造器,它们要求RejectedExecutionHandler为XRejectedExecutionHandler类型,其中一个构造器默认为EsAbortPolicy,它们还要求传入ThreadContext
  • 它覆盖了terminated、execute、afterExecute方法,其中terminated方法会回调listener.onTerminated();execute方法会捕获EsRejectedExecutionException异常,在command为AbstractRunnable类型时回调其onRejection及onAfter方法;afterExecute方法会执行EsExecutors.rethrowErrors(unwrap(r))方法
  • 它提供了wrapRunnable及unwrap方法,分别会调用contextHolder.preserveContext及contextHolder.unwrap方法

XRejectedExecutionHandler

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/XRejectedExecutionHandler.java

public interface XRejectedExecutionHandler extends RejectedExecutionHandler {
​
    /**
     * The number of rejected executions.
     */
    long rejected();
}
  • XRejectedExecutionHandler接口继承了RejectedExecutionHandler接口,它定义了rejected方法返回rejected的数量;它有两个实现类分别为EsAbortPolicy及ForceQueuePolicy

EsAbortPolicy

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/EsAbortPolicy.java

public class EsAbortPolicy implements XRejectedExecutionHandler {
    private final CounterMetric rejected = new CounterMetric();
​
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (r instanceof AbstractRunnable) {
            if (((AbstractRunnable) r).isForceExecution()) {
                BlockingQueue<Runnable> queue = executor.getQueue();
                if (!(queue instanceof SizeBlockingQueue)) {
                    throw new IllegalStateException("forced execution, but expected a size queue");
                }
                try {
                    ((SizeBlockingQueue) queue).forcePut(r);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("forced execution, but got interrupted", e);
                }
                return;
            }
        }
        rejected.inc();
        throw new EsRejectedExecutionException("rejected execution of " + r + " on " + executor, executor.isShutdown());
    }
​
    @Override
    public long rejected() {
        return rejected.count();
    }
}
  • EsAbortPolicy实现了XRejectedExecutionHandler接口,其内部使用CounterMetric类维护rejected数量,而rejected方法直接返回该值;rejectedExecution方法对AbstractRunnable类型的runnable会判断是否isForceExecution,且是SizeBlockingQueue,则调用SizeBlockingQueue的forcePut方法重新force执行该runnable,之后就是递增rejected计数

ForceQueuePolicy

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java

    static class ForceQueuePolicy implements XRejectedExecutionHandler {
​
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
                // force queue policy should only be used with a scaling queue
                assert executor.getQueue() instanceof ExecutorScalingQueue;
                executor.getQueue().put(r);
            } catch (final InterruptedException e) {
                // a scaling queue never blocks so a put to it can never be interrupted
                throw new AssertionError(e);
            }
        }
​
        @Override
        public long rejected() {
            return 0;
        }
​
    }
  • ForceQueuePolicy实现了XRejectedExecutionHandler接口,它的rejectedExecution方法仅仅对ExecutorScalingQueue进行重新入队操作,而rejected方法返回0

AbstractRunnable

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractRunnable.java

public abstract class AbstractRunnable implements Runnable {
​
    /**
     * Should the runnable force its execution in case it gets rejected?
     */
    public boolean isForceExecution() {
        return false;
    }
​
    @Override
    public final void run() {
        try {
            doRun();
        } catch (Exception t) {
            onFailure(t);
        } finally {
            onAfter();
        }
    }
​
    /**
     * This method is called in a finally block after successful execution
     * or on a rejection.
     */
    public void onAfter() {
        // nothing by default
    }
​
    /**
     * This method is invoked for all exception thrown by {@link #doRun()}
     */
    public abstract void onFailure(Exception e);
​
    /**
     * This should be executed if the thread-pool executing this action rejected the execution.
     * The default implementation forwards to {@link #onFailure(Exception)}
     */
    public void onRejection(Exception e) {
        onFailure(e);
    }
​
    /**
     * This method has the same semantics as {@link Runnable#run()}
     * @throws InterruptedException if the run method throws an InterruptedException
     */
    protected abstract void doRun() throws Exception;
}
  • AbstractRunnable声明实现Runnable接口,它的run方法分别会回调doRun、onFailure、onAfter方法;另外它还定义了isForceExecution方法用于确定当rejected的时候是否force execution

小结

  • EsThreadPoolExecutor继承了ThreadPoolExecutor,它提供了两个构造器,它们要求RejectedExecutionHandler为XRejectedExecutionHandler类型,其中一个构造器默认为EsAbortPolicy,它们还要求传入ThreadContext
  • 它覆盖了terminated、execute、afterExecute方法,其中terminated方法会回调listener.onTerminated();execute方法会捕获EsRejectedExecutionException异常,在command为AbstractRunnable类型时回调其onRejection及onAfter方法;afterExecute方法会执行EsExecutors.rethrowErrors(unwrap(r))方法
  • XRejectedExecutionHandler接口继承了RejectedExecutionHandler接口,它定义了rejected方法返回rejected的数量;它有两个实现类分别为EsAbortPolicy及ForceQueuePolicy
  • EsAbortPolicy实现了XRejectedExecutionHandler接口,其内部使用CounterMetric类维护rejected数量,而rejected方法直接返回该值;rejectedExecution方法对AbstractRunnable类型的runnable会判断是否isForceExecution,且是SizeBlockingQueue,则调用SizeBlockingQueue的forcePut方法重新force执行该runnable,之后就是递增rejected计数
  • ForceQueuePolicy实现了XRejectedExecutionHandler接口,它的rejectedExecution方法仅仅对ExecutorScalingQueue进行重新入队操作,而rejected方法返回0
  • AbstractRunnable声明实现Runnable接口,它的run方法分别会回调doRun、onFailure、onAfter方法;另外它还定义了isForceExecution方法用于确定当rejected的时候是否force execution

doc

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • EsThreadPoolExecutor
  • XRejectedExecutionHandler
    • EsAbortPolicy
      • ForceQueuePolicy
      • AbstractRunnable
      • 小结
      • doc
      相关产品与服务
      Elasticsearch Service
      腾讯云 Elasticsearch Service(ES)是云端全托管海量数据检索分析服务,拥有高性能自研内核,集成X-Pack。ES 支持通过自治索引、存算分离、集群巡检等特性轻松管理集群,也支持免运维、自动弹性、按需使用的 Serverless 模式。使用 ES 您可以高效构建信息检索、日志分析、运维监控等服务,它独特的向量检索还可助您构建基于语义、图像的AI深度应用。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档