前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >disruptor 3.4.2版本使用BasicExecutor

disruptor 3.4.2版本使用BasicExecutor

作者头像
山行AI
发布2019-06-28 11:23:02
2K0
发布2019-06-28 11:23:02
举报
文章被收录于专栏:山行AI

disruptor 3.4.2版本使用BasicExecutor

在3.4.2版本中,Disruptor的传入自定义线程池的构造方法被废弃掉了,推荐的是传入ThreadFactory的构造方法,该方法内部使用的是BasicExecutor,代码如下:

代码语言:javascript
复制
  /**     * Create a new Disruptor.     *     * @deprecated Use a {@link ThreadFactory} instead of an {@link Executor} as a the ThreadFactory     * is able to report errors when it is unable to construct a thread to run a producer.     *     * @param eventFactory   the factory to create events in the ring buffer.     * @param ringBufferSize the size of the ring buffer, must be power of 2.     * @param executor       an {@link Executor} to execute event processors.     * @param producerType   the claim strategy to use for the ring buffer.     * @param waitStrategy   the wait strategy to use for the ring buffer.     */    @Deprecated    public Disruptor(        final EventFactory<T> eventFactory,        final int ringBufferSize,        final Executor executor,        final ProducerType producerType,        final WaitStrategy waitStrategy)    {        this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy), executor);    }
    /**       * Create a new Disruptor.       *       * @param eventFactory   the factory to create events in the ring buffer.       * @param ringBufferSize the size of the ring buffer, must be power of 2.       * @param threadFactory  a {@link ThreadFactory} to create threads for processors.       * @param producerType   the claim strategy to use for the ring buffer.       * @param waitStrategy   the wait strategy to use for the ring buffer.       */      public Disruptor(              final EventFactory<T> eventFactory,              final int ringBufferSize,              final ThreadFactory threadFactory,              final ProducerType producerType,              final WaitStrategy waitStrategy)      {          this(              RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),              new BasicExecutor(threadFactory));      }  

其中BasicExecutor的定义为:

代码语言:javascript
复制
public class BasicExecutor implements Executor{    private final ThreadFactory factory;    private final Queue<Thread> threads = new ConcurrentLinkedQueue<>();
    public BasicExecutor(ThreadFactory factory)    {        this.factory = factory;    }
    @Override    public void execute(Runnable command)    {        final Thread thread = factory.newThread(command);        if (null == thread)        {            throw new RuntimeException("Failed to create thread to run: " + command);        }
        thread.start();
        threads.add(thread);    }
    @Override    public String toString()    {        return "BasicExecutor{" +            "threads=" + dumpThreadInfo() +            '}';    }
    private String dumpThreadInfo()    {        final StringBuilder sb = new StringBuilder();
        final ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
        for (Thread t : threads)        {            ThreadInfo threadInfo = threadMXBean.getThreadInfo(t.getId());            sb.append("{");            sb.append("name=").append(t.getName()).append(",");            sb.append("id=").append(t.getId()).append(",");            sb.append("state=").append(threadInfo.getThreadState()).append(",");            sb.append("lockInfo=").append(threadInfo.getLockInfo());            sb.append("}");        }
        return sb.toString();    }}

关于这个问题可以参考github上的一个issue:https://github.com/LMAX-Exchange/disruptor/issues/148

其中一段说明是:

代码语言:javascript
复制
The reason for moving to ThreadFactory insteads of Executor is that an Executor is not really the right abstraction. Control over the number of threads required is set by the Disruptor, but many Executor implementations specify this themselves. A common error for new starters is that they have 2 event handlers and pass a SingleThreadedExecutorService to the DSL.

这么做的一个原因是Executor不是一个真正的正确的抽象,disruptor会控制线程的个数,但是很多Executor的实现自己定义了这些。一个常见的错误就是一些新手拥有两个event handler但是却传入一个SingleThreadedExecutorService到DSL中。

代码语言:javascript
复制
The number of consumers is dictated by the number of handlers that you create. With an Executor if not enough threads were allowed the Disruptor would simply hang without useful explanation. So the "one producer one consumer" thing is orthogonal to using ThreadFactories over Executors.
If you want to prevent the creation of more than one consumer thread, then it is pretty easy to write a custom ThreadFactory that provides that restriction and will fail in a meaningful way.

消费者的数量是由你创建的handlers的数量决定的。传入的Executor如果没有足够多的线程,disruptor会被挂起。 如果你只想创建一个线程来消费,一个方法是:

代码语言:javascript
复制
public class LimitedThreadFactory implements ThreadFactory{    private final AtomicInteger count == new AtomicInteger(0);
    public Thread newThread(Runnable r)    {        if (count.compareAndSet(0, 1))        {            return new Thread(r);        }        else        {            throw new IllegalStateException("Created more that one thread");        }    }}

比较详细的一点说明是(就不再一一翻译啦):

代码语言:javascript
复制
The Disruptor requires 1 thread per EventProcessor/EventHandler. One quite common bug is that someone would create a single threaded executor and create a Disruptor with 2 event handlers. In that case the Disruptor would hang indefinitely at start up. The idea that "OK, framework wants me to provide a pool to execute consumers jobs" for the Disruptor is the wrong. What the Disruptor actually wants is to create a number of new threads (one per EventProcessor), which will be owned for the lifetime of the Disruptor. In this case a ThreadFactory better describes what the Disruptor is intending to do (i.e. create threads).
The reason why a BasicExecutor is constructed internally is that I needed to preserve backwards compatibility. Using an Executor is deprecated, but still works. I didn't want to have 2 separate code paths and it is easy to make a ThreadFactory behave like a Executor, but difficult to do the inverse.
The BasicExecutor and support for passing in an Executor will be removed on the next major release (version 4).

参考:https://github.com/LMAX-Exchange/disruptor/issues/148

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-06-07,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 开发架构二三事 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • disruptor 3.4.2版本使用BasicExecutor
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档