本文基于最新的3.4.2的版本文档进行翻译,翻译自: https://github.com/LMAX-Exchange/disruptor/wiki/Introduction https://github.com/LMAX-Exchange/disruptor/wiki/Getting-Started
最好的方法去理解Disruptor就是将它和容易理解并且相似的队列,例如BlockingQueue。Disruptor其实就像一个队列一样,用于在不同的线程之间迁移数据,但是Disruptor也实现了一些其他队列没有的特性,如:
这是Disruptor和队列最大的区别。当你有多个消费者监听了一个Disruptor,所有的事件将会被发布到所有的消费者中,相比之下队列的一个事件只能被发到一个消费者中。Disruptor这一特性被用来需要对同一数据进行多个并行操作的情况。如在LMAX系统中有三个操作可以同时进行:日志(将数据持久到日志文件中),复制(将数据发送到其他的机器上,以确保存在数据远程副本),业务逻辑处理。也可以使用WokrerPool来并行处理不同的事件。
为了支持真实世界中的业务并行处理流程,Disruptor提供了多个消费者之间的协助功能。回到上面的LMAX的例子,我们可以让日志处理和远程副本赋值先执行完之后再执行业务处理流程,这个功能被称之为gating。gating发生在两种场景中。第一,我们需要确保生产者不要超过消费者。通过调用RingBuffer.addGatingConsumers()增加相关的消费者至Disruptor来完成。第二,就是之前所说的场景,通过构造包含需要必须先完成的消费者的Sequence的SequenceBarrier来实现。
引用上面的例子来说,有三个消费者监听来自RingBuffer的事件。这里有一个依赖关系图。ApplicationConsumer依赖JournalConsumer和ReplicationConsumer。这个意味着JournalConsumer和ReplicationConsumer可以自由的并发运行。依赖关系可以看成是从ApplicationConsumer的SequenceBarrier到JournalConsumer和ReplicationConsumer的Sequence的连接。还有一点值得关注,Sequencer与下游的消费者之间的关系。它的角色是确保发布不会包裹RingBuffer。为此,所有下游消费者的Sequence不能比ring buffer的Sequence小且不能比ring buffer 的大小小。因为ApplicationConsumers的Sequence是确保比JournalConsumer和ReplicationConsumer的Sequence小或等于,所以Sequencer只需要检查ApplicationConsumers的Sequence。在更为普遍的应用场景中,Sequencer只需要意识到消费者树中的叶子节点的的Sequence即可。
Disruptor的一个目标之一是被用在低延迟的环境中。在一个低延迟系统中有必要去减少和降低内存的占用。在基于Java的系统中,需要减少由于GC导致的停顿次数(在低延迟的C/C++系统中,由于内存分配器的争用,大量的内存分配也会导致问题)。
为了满足这点,用户可以在Disruptor中为事件预分配内存。所以EventFactory是用户来提供,并且Disruptor的Ring Buffer每个entry中都会被调用。当将新的数据发布到Disruptor中时,Disruptor的API将会允许用户持有所构造的对象,以便用户可以调用这些对象的方法和更新字段到这些对象中。Disruptor将确保这些操作是线程安全。
无锁算法实现的Disruptor的所有内存可见性和正确性都使用内存屏障和CAS操作实现。只仅仅一个场景BlockingWaitStrategy中使用到了lock。而这仅仅是为了使用Condition,以便消费者线程能被park住当在等待一个新的事件到来的时候。许多低延迟系统都使用自旋(busy-wait)来避免使用Condition造成的抖动。但是自旋(busy-wait)的数量变多时将会导致性能的下降,特别是CPU资源严重受限的情况下。例如,在虚拟环境中的Web服务器。
我们使用一个简单的例子来体验一下Disruptor。生产者会传递一个long类型的值到消费者,消费者接受到这个值后会打印出这个值。
public class LongEvent
{
private long value;
public void set(long value)
{
this.value = value;
}
}
为了使用Disruptor的内存预分配event,我们需要定义一个EventFactory:
import com.lmax.disruptor.EventFactory;
public class LongEventFactory implements EventFactory<LongEvent>
{
public LongEvent newInstance()
{
return new LongEvent();
}
}
为了让消费者处理这些事件,所以我们这里定义一个事件处理器,负责打印event:
import com.lmax.disruptor.EventHandler;
public class LongEventHandler implements EventHandler<LongEvent>
{
public void onEvent(LongEvent event, long sequence, boolean endOfBatch)
{
System.out.println("Event: " + event);
}
}
在Disruptor的3.0版本中,由于加入了丰富的Lambda风格的API,可以用来帮组开发人员简化流程。所以在3.0版本后首选使用Event Publisher/Event Translator来发布事件。
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.EventTranslatorOneArg;
public class LongEventProducerWithTranslator
{
private final RingBuffer<LongEvent> ringBuffer;
public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer)
{
this.ringBuffer = ringBuffer;
}
private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR =
new EventTranslatorOneArg<LongEvent, ByteBuffer>()
{
public void translateTo(LongEvent event, long sequence, ByteBuffer bb)
{
event.set(bb.getLong(0));
}
};
public void onData(ByteBuffer bb)
{
ringBuffer.publishEvent(TRANSLATOR, bb);
}
}
这种方法的另一个优点是可以将translator代码放入一个单独的类中,并且可以轻松地对它们进行独立的单元测试。
import com.lmax.disruptor.RingBuffer;
public class LongEventProducer
{
private final RingBuffer<LongEvent> ringBuffer;
public LongEventProducer(RingBuffer<LongEvent> ringBuffer)
{
this.ringBuffer = ringBuffer;
}
public void onData(ByteBuffer bb)
{
long sequence = ringBuffer.next(); // Grab the next sequence
try
{
LongEvent event = ringBuffer.get(sequence); // Get the entry in the Disruptor
// for the sequence
event.set(bb.getLong(0)); // Fill with data
}
finally
{
ringBuffer.publish(sequence);
}
}
}
这里我们需要把发布包裹在try/finally代码块中。如果某个请求的 sequence 未被提交,将会堵塞后续的发布操作或者其它的 producer。特别地在多生产中如果没有提交Sequence,那么会造成消费者停滞,导致只能重启消费者才能恢复。
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.util.DaemonThreadFactory;
import java.nio.ByteBuffer;
public class LongEventMain
{
public static void main(String[] args) throws Exception
{
// The factory for the event
LongEventFactory factory = new LongEventFactory();
// Specify the size of the ring buffer, must be power of 2.
int bufferSize = 1024;
// Construct the Disruptor
Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, DaemonThreadFactory.INSTANCE);
// Connect the handler
disruptor.handleEventsWith(new LongEventHandler());
// Start the Disruptor, starts all threads running
disruptor.start();
// Get the ring buffer from the Disruptor to be used for publishing.
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
LongEventProducer producer = new LongEventProducer(ringBuffer);
ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++)
{
bb.putLong(0, l);
producer.onData(bb);
Thread.sleep(1000);
}
}
}
我们也可以使用Java 8的函数式编程来写这个例子:
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.util.DaemonThreadFactory;
import java.nio.ByteBuffer;
public class LongEventMain
{
public static void main(String[] args) throws Exception
{
// Specify the size of the ring buffer, must be power of 2.
int bufferSize = 1024;
// Construct the Disruptor
Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);
// Connect the handler
disruptor.handleEventsWith((event, sequence, endOfBatch) -> System.out.println("Event: " + event));
// Start the Disruptor, starts all threads running
disruptor.start();
// Get the ring buffer from the Disruptor to be used for publishing.
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++)
{
bb.putLong(0, l);
ringBuffer.publishEvent((event, sequence, buffer) -> event.set(buffer.getLong(0)), bb);
Thread.sleep(1000);
}
}
}
使用函数式编程我们可以发现很多的类都不需要了,如:handler,translator等。 上面的代码还可以再简化一下:
ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++)
{
bb.putLong(0, l);
ringBuffer.publishEvent((event, sequence) -> event.set(bb.getLong(0)));
Thread.sleep(1000);
}
不过这样将实例化一个对象去持有ByteBuffer bb
变量传入lambda的值。这会产生不必要的垃圾。因此,如果要求低GC压力,则应首选将参数传递给lambda的调用。
如果想要让Disruptor拥有更好的性能这里有两个选项可以调整,wait strategy 和 producer的类型。
最好的方法在并发环境下提高性能是坚持单独写原则( Single Writer Principle)。如果你的业务场景中只有一个线程写入数据到Disruptor,那么你可以设置成单生产者来提升性能:
public class LongEventMain
{
public static void main(String[] args) throws Exception
{
//.....
// Construct the Disruptor with a SingleProducerSequencer
Disruptor<LongEvent> disruptor = new Disruptor(
factory, bufferSize, DaemonThreadFactory.INSTANCE, ProducerType.SINGLE, new BlockingWaitStrategy());
//.....
}
}
性能测试: Multiple Producer
Run 0, Disruptor=26,553,372 ops/sec
Run 1, Disruptor=28,727,377 ops/sec
Run 2, Disruptor=29,806,259 ops/sec
Run 3, Disruptor=29,717,682 ops/sec
Run 4, Disruptor=28,818,443 ops/sec
Run 5, Disruptor=29,103,608 ops/sec
Run 6, Disruptor=29,239,766 ops/sec
Single Producer
Run 0, Disruptor=89,365,504 ops/sec
Run 1, Disruptor=77,579,519 ops/sec
Run 2, Disruptor=78,678,206 ops/sec
Run 3, Disruptor=80,840,743 ops/sec
Run 4, Disruptor=81,037,277 ops/sec
Run 5, Disruptor=81,168,831 ops/sec
Run 6, Disruptor=81,699,346 ops/sec
BlockingWaitStrategy Disruptor的默认策略是BlockingWaitStrategy。在BlockingWaitStrategy内部是使用锁和condition来控制线程的唤醒。BlockingWaitStrategy是最低效的策略,但其对CPU的消耗最小并且在各种不同部署环境中能提供更加一致的性能表现
SleepingWaitStrategy SleepingWaitStrategy 的性能表现跟 BlockingWaitStrategy 差不多,对 CPU 的消耗也类似,但其对生产者线程的影响最小,通过使用LockSupport.parkNanos(1)来实现循环等待。一般来说Linux系统会暂停一个线程约60µs,这样做的好处是,生产线程不需要采取任何其他行动就可以增加适当的计数器,也不需要花费时间信号通知条件变量。但是,在生产者线程和使用者线程之间移动事件的平均延迟会更高。它在不需要低延迟并且对生产线程的影响较小的情况最好。一个常见的用例是异步日志记录。
YieldingWaitStrategy YieldingWaitStrategy是可以使用在低延迟系统的策略之一。YieldingWaitStrategy将自旋以等待序列增加到适当的值。在循环体内,将调用Thread.yield(),以允许其他排队的线程运行。在要求极高性能且事件处理线数小于 CPU 逻辑核心数的场景中,推荐使用此策略;例如,CPU开启超线程的特性。
BusySpinWaitStrategy 性能最好,适合用于低延迟的系统。在要求极高性能且事件处理线程数小于CPU逻辑核心树的场景中,推荐使用此策略;例如,CPU开启超线程的特性。
通过Disruptor传递数据时,对象的生存期可能比预期的更长。为避免发生这种情况,可能需要在处理事件后清除事件。如果只有一个事件处理程序,则需要在处理器中清除对应的对象。如果您有一连串的事件处理程序,则可能需要在该链的末尾放置一个特定的处理程序来处理清除对象。
class ObjectEvent<T>
{
T val;
void clear()
{
val = null;
}
}
public class ClearingEventHandler<T> implements EventHandler<ObjectEvent<T>>
{
public void onEvent(ObjectEvent<T> event, long sequence, boolean endOfBatch)
{
// Failing to call clear here will result in the
// object associated with the event to live until
// it is overwritten once the ring buffer has wrapped
// around to the beginning.
event.clear();
}
}
public static void main(String[] args)
{
Disruptor<ObjectEvent<String>> disruptor = new Disruptor<>(
() -> ObjectEvent<String>(), bufferSize, DaemonThreadFactory.INSTANCE);
disruptor
.handleEventsWith(new ProcessingEventHandler())
.then(new ClearingObjectHandler());
}