disruptor经过几年的发展,似乎已经成为性能优化的大杀器,几乎每个想优化性能的项目宣称自己用上了disruptor,性能都会呈现质的跃进。毕竟,最好的例子就是LMAX自己的架构设计,支撑了600w/s的吞吐。
本文试图从代码层面将关键问题做些解答。
Disruptor: 实际上就是整个基于ringBuffer实现的生产者消费者模式的容器。
RingBuffer: 著名的环形队列,可以类比为BlockingQueue之类的队列,ringBuffer的使用,使得内存被循环使用,减少了某些场景的内存分配回收扩容等耗时操作。
EventProcessor: 事件处理器,实际上可以理解为消费者模型的框架,实现了线程Runnable的run方法,将循环判断等操作封在了里面。
EventHandler: 事件处置器,与前面处理器的不同是,事件处置器不负责框架内的行为,仅仅是EventProcessor作为消费者框架对外预留的扩展点罢了。
Sequencer: 作为RingBuffer生产者的父接口,其直接实现有SingleProducerSequencer和MultiProducerSequencer。
EventTranslator: 事件转换器。实际上就是新事件向旧事件覆盖的接口定义。
SequenceBarrier: 消费者路障。规定了消费者如何向下走。都说disruptor无锁,事实上,该路障算是变向的锁。
WaitStrategy: 当生产者生产得太快而消费者消费得太慢时的等待策略。
把上面几个关键概念画个图,大概长这样:
所以接下来主要也就从生产者,消费者以及ringBuffer3个维度去看disruptor是如何玩的。
生产者发布消息的过程从disruptor的publish方法为入口,实际调用了ringBuffer的publish方法。publish方法主要做了几件事,一是先确保能拿到后面的n个sequence;二是使用translator来填充新数据到相应的位置;三是真正的声明这些位置已经发布完成。
public void publishEvent(EventTranslator<E> translator)
{
final long sequence = sequencer.next();
translateAndPublish(translator, sequence);
}
public void publishEvents(EventTranslator<E>[] translators, int batchStartsAt, int batchSize)
{
checkBounds(translators, batchStartsAt, batchSize);
final long finalSequence = sequencer.next(batchSize);
translateAndPublishBatch(translators, batchStartsAt, batchSize, finalSequence);
}
获取生产者下一个sequence的方法,细节已经注释,实际上最终目的就是确保生产者和消费者互相不越界。
public long next(int n)
{
if (n < 1)
{
throw new IllegalArgumentException("n must be > 0");
}
//该生产者发布的最大序列号
long nextValue = this.nextValue;
//该生产者欲发布的序列号
long nextSequence = nextValue + n;
//覆盖点,即该生产者如果发布了这次的序列号,那它最终会落在哪个位置,实际上是nextSequence做了算术处理以后的值,最终目的是统一计算,否则就要去判绝对值以及取模等麻烦操作
long wrapPoint = nextSequence - bufferSize;
//所有消费者中消费得最慢那个的前一个序列号
long cachedGatingSequence = this.cachedValue;
//这里两个判断条件:一是看生产者生产是不是超过了消费者,所以判断的是覆盖点是否超过了最慢消费者;二是看消费者是否超过了当前生产者的最大序号,判断的是消费者是不是比生产者还快这种异常情况
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
{
cursor.setVolatile(nextValue); // StoreLoad fence
long minSequence;
//覆盖点是不是已经超过了最慢消费者和当前生产者序列号的最小者(这两个有点难理解,实际上就是覆盖点不能超过最慢那个生产者,也不能超过当前自身,比如一次发布超过bufferSize),gatingSequences的处理也是类似算术处理,也可以看成是相对于原点是正还是负
while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
{
//唤醒阻塞的消费者
waitStrategy.signalAllWhenBlocking();
//等上1纳秒
LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
}
//把这个最慢消费者缓存下来,以便下一次使用
this.cachedValue = minSequence;
}
//把当前序列号更新为欲发布序列号
this.nextValue = nextSequence;
return nextSequence;
}
translator由用户在调用时自己实现,其实就是预留的一个扩展点,将覆盖事件预留出来。大部分实现都是将ByteBuffer复制到Event中,参考disruptor github官方例子。
最后声明新序列号发布完成,实际上就是设置了cursor,并且通知可能阻塞的消费者,这里已经发布完新的Event了,快来消费吧。
public void publish(long sequence)
{
cursor.set(sequence);
waitStrategy.signalAllWhenBlocking();
}
以上就是单生产者的分析,MultiProducerSequencer可以类似分析。
等待策略实际上就是用来同步生产者和消费者的方法。SequenceBarrier只有一个实现ProcessingSequenceBarrier,中间就用到了WaitStrategy
BlockingWaitStrategy就是真正的加锁阻塞策略,采用的就是ReentrantLock以及Condition来控制阻塞与唤醒。
TimeoutBlockingWaitStrategy是BlockingWaitStrategy中条件带超时的版本。
LiteBlockingWaitStrategy是BlockingWaitStrategy的改进版,走了ReentrantLock和CAS轻量级锁结合的方式,不过注释说这算是实验性质的微性能改进。
BusySpinWaitStrategy算是一个自旋锁,其实现很有趣,即不停的调用Thread类的onSpinWait方法。
YieldingWaitStrategy是自旋锁的一种改进,自旋锁对于cpu来说太重,于是YieldingWaitStrategy先自旋100次,如果期间没有达成退出等待的条件,则主动让出cpu给其他线程作为惩罚。
SleepingWaitStrategy又是YieldingWaitStrategy的一种改进,SleepingWaitStrategy头100次先自旋,如果期间没有达成退出条件,则接下来100次主动让出cpu作为惩罚,如果还没有达成条件,则不再计数,每次睡1纳秒。
PhasedBackoffWaitStrategy相对复杂点,基本上是10000次自旋以后要么出让cpu,然后继续自旋,要么就采取新的等待策略。
EventProcessor是整个消费者事件处理框架,其主体就是线程的run方法,来看BatchEventProcessor,总体比较简单。
public void run()
{
if (!running.compareAndSet(false, true))
{
throw new IllegalStateException("Thread is already running");
}
sequenceBarrier.clearAlert();
notifyStart();
T event = null;
long nextSequence = sequence.get() + 1L;
try
{
while (true)
{
try
{
//等待至少一个可用的sequence出来
final long availableSequence = sequenceBarrier.waitFor(nextSequence);
if (batchStartAware != null)
{
batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
}
//一个一个消费事件
while (nextSequence <= availableSequence)
{
//从ringBuffer里获取下一个事件
event = dataProvider.get(nextSequence);
//消费这个事件
eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
nextSequence++;
}
//当前的sequence推进到availableSequence
sequence.set(availableSequence);
}
catch (final TimeoutException e)
{
notifyTimeout(sequence.get());
}
catch (final AlertException ex)
{
if (!running.get())
{
break;
}
}
catch (final Throwable ex)
{
exceptionHandler.handleEventException(ex, nextSequence, event);
sequence.set(nextSequence);
nextSequence++;
}
}
}
finally
{
notifyShutdown();
running.set(false);
}
}
RingBuffer这边代码比较简单,主要就是封装了一下发布的api
abstract class RingBufferFields<E> extends RingBufferPad
{
private static final int BUFFER_PAD;
private static final long REF_ARRAY_BASE;
private static final int REF_ELEMENT_SHIFT;
private static final Unsafe UNSAFE = Util.getUnsafe();
static
{
final int scale = UNSAFE.arrayIndexScale(Object[].class);
if (4 == scale)
{
REF_ELEMENT_SHIFT = 2;
}
else if (8 == scale)
{
REF_ELEMENT_SHIFT = 3;
}
else
{
throw new IllegalStateException("Unknown pointer size");
}
// 如果scale是4, BUFFER_PAD则为32
BUFFER_PAD = 128 / scale;
// Including the buffer pad in the array base offset BUFFER_PAD<<REF_ELEMENT_SHIFT 实际上就是BUFFER_PAD * scale,最终算出来REF_ARRAY_BASE就是基地址
REF_ARRAY_BASE = UNSAFE.arrayBaseOffset(Object[].class) + (BUFFER_PAD << REF_ELEMENT_SHIFT);
}
private final long indexMask;
private final Object[] entries;
protected final int bufferSize;
protected final Sequencer sequencer;
RingBufferFields(
EventFactory<E> eventFactory,
Sequencer sequencer)
{
this.sequencer = sequencer;
this.bufferSize = sequencer.getBufferSize();
if (bufferSize < 1)
{
throw new IllegalArgumentException("bufferSize must not be less than 1");
}
if (Integer.bitCount(bufferSize) != 1)
{
throw new IllegalArgumentException("bufferSize must be a power of 2");
}
this.indexMask = bufferSize - 1;
//bufferSize再加两倍的BUFFER_PAD大小,BUFFER_PAD分别在头尾
this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
fill(eventFactory);
}
private void fill(EventFactory<E> eventFactory)
{
for (int i = 0; i < bufferSize; i++)
{
//初始化整个buffer
entries[BUFFER_PAD + i] = eventFactory.newInstance();
}
}
@SuppressWarnings("unchecked")
protected final E elementAt(long sequence)
{
//sequence & indexMask即对sequence取模, 最终算出来的就是基地址+偏移地址
return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));
}
}
主体代码基本如上。其他代码可以自行参考。
下面介绍下一些常见问题。
disruptor原本就是事件驱动的设计,其整个架构跟普通的多线程很不一样。比如一种用法,将disruptor作为业务处理,中间带I/O处理,这种玩法比多线程还慢;相反,如果将disruptor做业务处理,需要I/O时采用nio异步调用,不阻塞disruptor消费者线程,等到I/O异步调用回来后在回调方法中将后续处理重新塞到disruptor队列中,可以看出来,这是典型的事件处理架构,确实能在时间上占据优势,加上ringBuffer固有的几项性能优化,能让disruptor发挥最大功效。
这个问题参考之前的一篇文章 disruptor框架为什么这么强大
多生产者的消息写入实际上是通过availableBuffer与消费者来同步最后一个生产者写入的位置,这样,消费者永远不能超越最慢的那个生产者。见如下代码段
private void setAvailableBufferValue(int index, int flag)
{
long bufferAddress = (index * SCALE) + BASE;
UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag);
}
@Override
public boolean isAvailable(long sequence)
{
int index = calculateIndex(sequence);
int flag = calculateAvailabilityFlag(sequence);
long bufferAddress = (index * SCALE) + BASE;
return UNSAFE.getIntVolatile(availableBuffer, bufferAddress) == flag;
}
@Override
public long getHighestPublishedSequence(long lowerBound, long availableSequence)
{
for (long sequence = lowerBound; sequence <= availableSequence; sequence++)
{
if (!isAvailable(sequence))
{
return sequence - 1;
}
}
return availableSequence;
}
可以参考这篇文章 RingBuffer多生产者写入
4. 除了多个消费者重复处理生产者发送的消息,是否可以多消费者不重复处理生产者发送的消息,即各处理各的?
若要多消费者重复处理生产者的消息,则使用disruptor.handleEventsWith方法将消费者传入;而若要消费者不重复的处理生产者的消息,则使用disruptor.handleEventsWithWorkerPool方法将消费者传入。