SequenceBarrier是消费者与Ringbuffer之间建立消费关系的桥梁,同时也是消费者与消费者之间消费依赖的抽象。
SequenceBarrier只有一个实现类,就是ProcessingSequenceBarrier。ProcessingSequenceBarrier由生产者Sequencer,消费定位cursorSequence,等待策略waitStrategy还有一组依赖sequence:dependentSequence组成:
public ProcessingSequenceBarrier(
final Sequencer sequencer,
final WaitStrategy waitStrategy,
final Sequence cursorSequence,
final Sequence[] dependentSequences)
{
this.sequencer = sequencer;
this.waitStrategy = waitStrategy;
this.cursorSequence = cursorSequence;
if (0 == dependentSequences.length)
{
dependentSequence = cursorSequence;
}
else
{
dependentSequence = new FixedSequenceGroup(dependentSequences);
}
}
首先,为了实现消费依赖,SequenceBarrier肯定有一个获取可以消费的sequence方法,就是
long waitFor(long sequence) throws AlertException, InterruptedException, TimeoutException;
实现为:
@Override
public long waitFor(final long sequence)
throws AlertException, InterruptedException, TimeoutException
{
//检查是否alerted
checkAlert();
//通过等待策略获取下一个可消费的sequence,这个sequence通过之前的讲解可以知道,需要大于cursorSequence和dependentSequence,我们可以通过dependentSequence实现先后消费
long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);
//等待可能被中断,所以检查下availableSequence是否小于sequence
if (availableSequence < sequence)
{
return availableSequence;
}
//如果不小于,返回所有sequence(可能多生产者)和availableSequence中最大的
return sequencer.getHighestPublishedSequence(sequence, availableSequence);
}
其他方法实现很简单,功能上分别有: 1. 获取当前cursorSequence(并没有什么用,就是为了监控) 2. 负责中断和恢复的alert标记
@Override
public long getCursor()
{
return dependentSequence.get();
}
@Override
public boolean isAlerted()
{
return alerted;
}
@Override
public void alert()
{
alerted = true;
waitStrategy.signalAllWhenBlocking();
}
@Override
public void clearAlert()
{
alerted = false;
}
@Override
public void checkAlert() throws AlertException
{
if (alerted)
{
throw AlertException.INSTANCE;
}
}
构造SequenceBarrier在框架中只有一个入口,就是AbstractSequencer的:
public SequenceBarrier newBarrier(Sequence... sequencesToTrack)
{
return new ProcessingSequenceBarrier(this, waitStrategy, cursor, sequencesToTrack);
}
通过SequenceBarrier,我们可以实现消费之间的依赖关系,但是,消费方式(比如广播,群组消费等等),需要通过SequenceProcessor的实现类实现:
通过类依赖关系我们发现,EventProcessor都是拓展了Runnable接口,也就是我们可以把它们当做线程处理。
它的构造方法:
/**
* 构造一个消费者之间非互斥消费的消费者
*
* @param dataProvider 对应的RingBuffer
* @param sequenceBarrier 依赖关系,通过构造不同的sequenceBarrier用互相的dependentsequence,我们可以构造出先后消费关系
* @param eventHandler 用户实现的处理消费的event的业务消费者.
*/
public BatchEventProcessor(
final DataProvider dataProvider,
final SequenceBarrier sequenceBarrier,
final EventHandlersuper T> eventHandler)
{
this.dataProvider = dataProvider;
this.sequenceBarrier = sequenceBarrier;
this.eventHandler = eventHandler;
if (eventHandler instanceof SequenceReportingEventHandler)
{
((SequenceReportingEventHandler) eventHandler).setSequenceCallback(sequence);
}
timeoutHandler = (eventHandler instanceof TimeoutHandler) ? (TimeoutHandler) eventHandler : null;
}
线程为一个死循环:
@Override
public void run()
{
//检查状态
if (!running.compareAndSet(false, true))
{
throw new IllegalStateException("Thread is already running");
}
//清理
sequenceBarrier.clearAlert();
//如果用户实现的EventHandler继承了LifecycleAware,则执行其onStart方法
notifyStart();
T event = null;
//sequence初始值为-1,设计上当前值是已经消费过的
long nextSequence = sequence.get() + 1L;
try
{
while (true)
{
try
{
//获取当前可以消费的最大sequence
final long availableSequence = sequenceBarrier.waitFor(nextSequence);
while (nextSequence <= availableSequence)
{
//获取并处理
event = dataProvider.get(nextSequence);
eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
nextSequence++;
}
//设置当前sequence,注意,出现异常需要特殊处理,防止重复消费
sequence.set(availableSequence);
}
catch (final TimeoutException e)
{
//wait超时异常
notifyTimeout(sequence.get());
}
catch (final AlertException ex)
{
//中断异常
if (!running.get())
{
break;
}
}
catch (final Throwable ex)
{
exceptionHandler.handleEventException(ex, nextSequence, event);
//如果出现异常则设置为nextSequence
sequence.set(nextSequence);
nextSequence++;
}
}
}
finally
{
//如果用户实现的EventHandler继承了LifecycleAware,则执行其onShutdown方法
notifyShutdown();
running.set(false);
}
}
可以看出: 1. BatchEventProcessor可以处理超时,可以处理中断,可以通过用户实现的异常处理类处理异常,同时,发生异常之后再次启动,不会漏消费,也不会重复消费。 2. 不同的BatchEventProcessor之间通过SequenceBarrier进行依赖消费。原理如下图所示:
假设我们有三个消费者BatchEventProcessor1,BatchEventProcessor2,BatchEventProcessor3. 1需要先于2和3消费,那么构建BatchEventProcessor和SequenceBarrier时,我们需要让BatchEventProcessor2和BatchEventProcessor3的SequenceBarrier的dependentSequence中加入SequenceBarrier1的sequence。 其实这里2和3共用一个SequenceBarrier就行。
另一种消费者是WorkProcessor。利用它,可以实现互斥消费,同样的利用SequenceBarrier可以实现消费顺序
public void run()
{
if (!running.compareAndSet(false, true))
{
throw new IllegalStateException("Thread is already running");
}
sequenceBarrier.clearAlert();
notifyStart();
boolean processedSequence = true;
long cachedAvailableSequence = Long.MIN_VALUE;
long nextSequence = sequence.get();
T event = null;
while (true)
{
try
{
if (processedSequence)
{
processedSequence = false;
//获取下一个可以消费的Sequence
do
{
nextSequence = workSequence.get() + 1L;
sequence.set(nextSequence - 1L);
}
//多个WorkProcessor之间,如果共享一个workSequence,那么,可以实现互斥消费,因为只有一个线程可以CAS更新成功
while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence));
}
if (cachedAvailableSequence >= nextSequence)
{
event = ringBuffer.get(nextSequence);
workHandler.onEvent(event);
processedSequence = true;
}
else
{
cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence);
}
}
catch (final TimeoutException e)
{
notifyTimeout(sequence.get());
}
catch (final AlertException ex)
{
if (!running.get())
{
break;
}
}
catch (final Throwable ex)
{
// handle, mark as processed, unless the exception handler threw an exception
exceptionHandler.handleEventException(ex, nextSequence, event);
processedSequence = true;
}
}
notifyShutdown();
running.set(false);
}
多个WorkerProcessor可以组成一个WorkerPool:
public WorkerPool(
final RingBuffer ringBuffer,
final SequenceBarrier sequenceBarrier,
final ExceptionHandlersuper T> exceptionHandler,
final WorkHandlersuper T>... workHandlers)
{
this.ringBuffer = ringBuffer;
final int numWorkers = workHandlers.length;
workProcessors = new WorkProcessor[numWorkers];
for (int i = 0; i < numWorkers; i++)
{
workProcessors[i] = new WorkProcessor(
ringBuffer,
sequenceBarrier,
workHandlers[i],
exceptionHandler,
workSequence);
}
}
里面的 workHandlers[i]共享同一个workSequence,所以,同一个WorkerPool内,是互斥消费。