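In Disruptor 3.4.2, the constructor that accepts a custom thread pool (an Executor) is deprecated. The recommended constructor takes a ThreadFactory instead and wraps it internally in a BasicExecutor. The code is as follows: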
/**
 * Create a new Disruptor.
 *
 * @deprecated Use a {@link ThreadFactory} instead of an {@link Executor} as a the ThreadFactory
 * is able to report errors when it is unable to construct a thread to run a producer.
 *
 * @param eventFactory   the factory to create events in the ring buffer.
 * @param ringBufferSize the size of the ring buffer, must be power of 2.
 * @param executor       an {@link Executor} to execute event processors.
 * @param producerType   the claim strategy to use for the ring buffer.
 * @param waitStrategy   the wait strategy to use for the ring buffer.
 */
@Deprecated
public Disruptor(
    final EventFactory<T> eventFactory,
    final int ringBufferSize,
    final Executor executor,
    final ProducerType producerType,
    final WaitStrategy waitStrategy)
{
    this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy), executor);
}

/**
 * Create a new Disruptor.
 *
 * @param eventFactory   the factory to create events in the ring buffer.
 * @param ringBufferSize the size of the ring buffer, must be power of 2.
 * @param threadFactory  a {@link ThreadFactory} to create threads for processors.
 * @param producerType   the claim strategy to use for the ring buffer.
 * @param waitStrategy   the wait strategy to use for the ring buffer.
 */
public Disruptor(
    final EventFactory<T> eventFactory,
    final int ringBufferSize,
    final ThreadFactory threadFactory,
    final ProducerType producerType,
    final WaitStrategy waitStrategy)
{
    this(
        RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
        new BasicExecutor(threadFactory));
}
BasicExecutor is defined as:
public class BasicExecutor implements Executor
{
    private final ThreadFactory factory;
    private final Queue<Thread> threads = new ConcurrentLinkedQueue<>();

    public BasicExecutor(ThreadFactory factory)
    {
        this.factory = factory;
    }

    @Override
    public void execute(Runnable command)
    {
        final Thread thread = factory.newThread(command);
        if (null == thread)
        {
            throw new RuntimeException("Failed to create thread to run: " + command);
        }

        thread.start();

        threads.add(thread);
    }

    @Override
    public String toString()
    {
        return "BasicExecutor{" +
            "threads=" + dumpThreadInfo() +
            '}';
    }

    private String dumpThreadInfo()
    {
        final StringBuilder sb = new StringBuilder();

        final ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();

        for (Thread t : threads)
        {
            ThreadInfo threadInfo = threadMXBean.getThreadInfo(t.getId());
            sb.append("{");
            sb.append("name=").append(t.getName()).append(",");
            sb.append("id=").append(t.getId()).append(",");
            sb.append("state=").append(threadInfo.getThreadState()).append(",");
            sb.append("lockInfo=").append(threadInfo.getLockInfo());
            sb.append("}");
        }

        return sb.toString();
    }
}
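The key behaviour of BasicExecutor is that every call to execute() asks the ThreadFactory for a brand-new thread, so the number of threads always matches the number of tasks handed to it. The following is a minimal JDK-only sketch of that idea, not the library's code: ThreadPerTaskExecutor, ThreadPerTaskDemo and threadsCreatedFor are hypothetical names introduced here for illustration, and the blocking tasks merely stand in for long-running event processors.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadPerTaskDemo {

    /** Hypothetical stand-in for BasicExecutor: one new thread per execute() call. */
    static final class ThreadPerTaskExecutor implements Executor {
        private final ThreadFactory factory;

        ThreadPerTaskExecutor(ThreadFactory factory) {
            this.factory = factory;
        }

        @Override
        public void execute(Runnable command) {
            Thread thread = factory.newThread(command);
            if (thread == null) {
                // Fail loudly instead of silently dropping the task.
                throw new IllegalStateException("Failed to create thread to run: " + command);
            }
            thread.start();
        }
    }

    /**
     * Submits the given number of blocking tasks and returns how many threads
     * the factory created, or -1 if the tasks did not all start in time.
     */
    static int threadsCreatedFor(int tasks) {
        AtomicInteger created = new AtomicInteger();
        ThreadFactory countingFactory = r -> {
            Thread t = new Thread(r, "worker-" + created.incrementAndGet());
            t.setDaemon(true);
            return t;
        };
        Executor executor = new ThreadPerTaskExecutor(countingFactory);

        CountDownLatch allStarted = new CountDownLatch(tasks);
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            executor.execute(() -> {
                allStarted.countDown();
                try {
                    release.await(); // stay alive, like a long-running event processor
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        try {
            boolean started = allStarted.await(5, TimeUnit.SECONDS);
            return started ? created.get() : -1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            release.countDown(); // let the worker threads finish
        }
    }

    public static void main(String[] args) {
        System.out.println("threads created: " + threadsCreatedFor(3));
    }
}
```

Because the executor never queues work, three tasks that never return still all get to run, each on its own thread.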
For background on this change, see this GitHub issue: https://github.com/LMAX-Exchange/disruptor/issues/148
One of the explanations given there is:
The reason for moving to ThreadFactory insteads of Executor is that an Executor is not really the right abstraction. Control over the number of threads required is set by the Disruptor, but many Executor implementations specify this themselves. A common error for new starters is that they have 2 event handlers and pass a SingleThreadedExecutorService to the DSL.
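In short, Executor is not quite the right abstraction here. The Disruptor itself determines how many threads it needs, whereas many Executor implementations fix the thread count on their own. A common beginner mistake is to register two event handlers and then pass a single-threaded executor service to the DSL.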
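The mistake described above can be reproduced without the Disruptor at all. The sketch below is a JDK-only illustration under stated assumptions: the never-ending Runnables stand in for event processors, and SingleThreadStarvationDemo and startedWithin are hypothetical names. It submits two such tasks to an executor and counts how many actually begin running.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SingleThreadStarvationDemo {

    /**
     * Submits {@code tasks} tasks that block until released and reports how many
     * of them started running within the timeout (-1 if interrupted).
     */
    static int startedWithin(ExecutorService executor, int tasks, long timeoutMillis) {
        CountDownLatch started = new CountDownLatch(tasks);
        CountDownLatch stop = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                executor.execute(() -> {
                    started.countDown();
                    try {
                        stop.await(); // occupies its thread, like an event processor loop
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            started.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return tasks - (int) started.getCount();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            stop.countDown();
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // One thread, two long-running tasks: the second task stays in the queue.
        int single = startedWithin(Executors.newSingleThreadExecutor(), 2, 1000);
        // Two threads, two long-running tasks: both run.
        int pair = startedWithin(Executors.newFixedThreadPool(2), 2, 1000);
        System.out.println("single-threaded executor: " + single + " of 2 started");
        System.out.println("two-thread executor: " + pair + " of 2 started");
    }
}
```

With the single-threaded executor the second task is accepted but never scheduled, and no error is raised, which is why the problem is hard to diagnose.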
The number of consumers is dictated by the number of handlers that you create. With an Executor if not enough threads were allowed the Disruptor would simply hang without useful explanation. So the "one producer one consumer" thing is orthogonal to using ThreadFactories over Executors.
If you want to prevent the creation of more than one consumer thread, then it is pretty easy to write a custom ThreadFactory that provides that restriction and will fail in a meaningful way.
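The number of consumers is determined by the number of handlers you create. If the Executor you pass in cannot supply enough threads, the Disruptor simply hangs without any useful explanation. If you want to allow only a single consumer thread, one approach is: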
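import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class LimitedThreadFactory implements ThreadFactory
{
    // 0 = no thread created yet, 1 = the single permitted thread has been handed out
    private final AtomicInteger count = new AtomicInteger(0);

    @Override
    public Thread newThread(Runnable r)
    {
        // Only the first caller wins the CAS; every later request fails loudly.
        if (count.compareAndSet(0, 1))
        {
            return new Thread(r);
        }
        else
        {
            throw new IllegalStateException("Created more than one thread");
        }
    }
}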
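To see what "fail in a meaningful way" looks like in practice, here is a small JDK-only sketch that generalises the same idea to a configurable limit. BoundedThreadFactory, LimitedThreadFactoryDemo and rejectsExtraThread are hypothetical names introduced for illustration, and the configurable limit is an assumption that is not part of the original snippet.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class LimitedThreadFactoryDemo {

    /** Same idea as LimitedThreadFactory, with a configurable limit (illustrative assumption). */
    static final class BoundedThreadFactory implements ThreadFactory {
        private final int limit;
        private final AtomicInteger created = new AtomicInteger();

        BoundedThreadFactory(int limit) {
            this.limit = limit;
        }

        @Override
        public Thread newThread(Runnable r) {
            int n = created.incrementAndGet();
            if (n > limit) {
                // An immediate, descriptive failure instead of a silent hang.
                throw new IllegalStateException("Thread limit of " + limit + " exceeded");
            }
            return new Thread(r, "consumer-" + n);
        }
    }

    /** Returns true if the factory rejects the first request beyond its limit. */
    static boolean rejectsExtraThread(int limit) {
        ThreadFactory factory = new BoundedThreadFactory(limit);
        for (int i = 0; i < limit; i++) {
            factory.newThread(() -> { }); // within the limit: succeeds
        }
        try {
            factory.newThread(() -> { }); // one too many
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("extra thread rejected: " + rejectsExtraThread(1));
    }
}
```

With a factory like this, registering one handler too many would surface as an exception when the extra thread is requested, rather than as an unexplained hang.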
A more detailed explanation from the same issue (quoted as-is):
The Disruptor requires 1 thread per EventProcessor/EventHandler. One quite common bug is that someone would create a single threaded executor and create a Disruptor with 2 event handlers. In that case the Disruptor would hang indefinitely at start up. The idea that "OK, framework wants me to provide a pool to execute consumers jobs" for the Disruptor is the wrong. What the Disruptor actually wants is to create a number of new threads (one per EventProcessor), which will be owned for the lifetime of the Disruptor. In this case a ThreadFactory better describes what the Disruptor is intending to do (i.e. create threads).
The reason why a BasicExecutor is constructed internally is that I needed to preserve backwards compatibility. Using an Executor is deprecated, but still works. I didn't want to have 2 separate code paths and it is easy to make a ThreadFactory behave like a Executor, but difficult to do the inverse.
The BasicExecutor and support for passing in an Executor will be removed on the next major release (version 4).
Reference: https://github.com/LMAX-Exchange/disruptor/issues/148