转载

追踪解析 Disruptor 源码

零 前期准备

0 FBI WARNING

文章异常啰嗦且绕弯。

1 版本

Disruptor 版本 : Disruptor 3.4.2

IDE : idea 2018.3

JDK 版本 : OpenJDK 11.0.1

2 Disruptor 简介

高性能线程间消息队列框架 Disruptor,是金融与游戏领域的常用开发组件之一,也是 java 日志框架和流处理框架底层的常用依赖。

3 Demo

Disruptor 的 github 主页有非常详细的 quick start demo,本文依照此 demo 做追踪的模板(做了很小的改动)。

另外,对于官方提供的 jdk8 lambda 简化版 demo 暂不做讨论。

import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;
import java.nio.ByteBuffer;

public class LongEventMain {

    //main 方法,启动入口
    public static void main(String[] args) throws Exception {

        //在该框架中,所有的 task 的包装类被称为 Event,EventFactory 则是 Event 的生产者
        LongEventFactory factory = new LongEventFactory();

        //RingBuffer 的大小,数字为字节数
        //RingBuffer 是框架启动器内部的缓存区,用来存储 event 内的 task 数据
        int bufferSize = 1024;

        //创建一个 Disruptor 启动器,其中 DaemonThreadFactory 是一个线程工厂的实现类
        Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, DaemonThreadFactory.INSTANCE);

        
        //该框架本质上是 生产-消费 设计模式的应用。所有的消费者被冠名为 handler
        //handleEventsWith(...) 方法会在启动器中注册 handler
        //此处的参数是不定数量的,可以有多个消费者,每个消费者都可以获取 Event
        disruptor.handleEventsWith(new LongEventHandler("handler1"),new LongEventHandler("handler2"));

        //启动器开始执行,并获取其内部的缓存区
        RingBuffer<LongEvent> ringBuffer = disruptor.start();

        //创建一个生产者,负责往缓存区内写入数据
        LongEventProducer producer = new LongEventProducer(ringBuffer);

        //官方 demo 中使用了 ByteBuffer 来方便操作,其实非必须
        ByteBuffer bb = ByteBuffer.allocate(8);

        for (long l = 0; true; l++) {
            //将变量 l 作为一个 long 类型的数存入 ByteBuffer 中
            bb.putLong(0, l);
            //将 ByteBuffer 传入生产者的相关方法中,该方法会负责将 ByteBuffer 中的数据写入 RingBuffer
            producer.onData(bb);
            //线程休眠
            Thread.sleep(1000);
        }
    }
}

//Event 类,本质上是数据的封装,是生产者和消费者之间进行数据传递的介质
class LongEvent {
    private long value;

    public void set(long value) {
        this.value = value;
    }

    public long get() {
        return value;
    }
}

//Event 的生产工厂类,必须实现 Disruptor 自带的 EventFactory 接口
class LongEventFactory implements EventFactory<LongEvent> {

    @Override
    public LongEvent newInstance() {
        return new LongEvent();
    }
}

//消费者,必须实现 Disruptor 自带的 EventHandler 接口
class LongEventHandler implements EventHandler<LongEvent> {

    private String handlerName;

    public LongEventHandler(String handlerName){
        this.handlerName = handlerName;
    }

    //此方法为最终的消费 Event 的方法
    @Override
    public void onEvent(LongEvent event, long sequence, boolean endOfBatch) {
        System.out.println("Event " + handlerName + " : " + event.get());
    }
}

//生产者,主要负责往 RingBuffer 中写入数据
//生产者类在框架中并非必须,但是一般情况下都会做一定程度的封装
class LongEventProducer {
    private final RingBuffer<LongEvent> ringBuffer;

    //生产者的构造器负责获取并存储启动器中的 RingBuffer
    public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public void onData(ByteBuffer bb) {
        //sequence 是 RingBuffer 中的一个数据块,类似于一个数据地址
        long sequence = ringBuffer.next();
        try {
            //用数据地址去获取到一个 Event 事件类实例
            LongEvent event = ringBuffer.get(sequence);
            //在实例中存入 ByteBuffer 中的数据
            event.set(bb.getLong(0));
        } finally {
            //发布该数据块,此时消费者们都可以看到该数据块了,可以进行消费
            ringBuffer.publish(sequence);
        }
    }
}

一 DaemonThreadFactory

在开始正式追踪代码之前有必要先来理解 DaemonThreadFactory。这是 Disruptor 自身携带的线程工厂类:

public enum DaemonThreadFactory implements ThreadFactory{
    
    //线程工厂使用枚举实现单例模式
    INSTANCE;

    @Override
    public Thread newThread(final Runnable r){
        Thread t = new Thread(r);
        //此处创建的线程是守护线程
        t.setDaemon(true);
        return t;
    }
}

二 Disruptor

本 part 主要追踪 demo 中 Disruptor 相关的代码:

Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, DaemonThreadFactory.INSTANCE);
disruptor.handleEventsWith(new LongEventHandler("handler1"),new LongEventHandler("handler2"));
RingBuffer<LongEvent> ringBuffer = disruptor.start();

1 disruptor 的创建

来看下方代码:

Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, DaemonThreadFactory.INSTANCE);

追踪 Disruptor 的构造器:

//step 1
//Disruptor.class
public Disruptor(final EventFactory<T> eventFactory, final int ringBufferSize, final ThreadFactory threadFactory){
    //RingBuffer.createMultiProducer(...) 方法会返回一个 RingBuffer
    //BasicExecutor 是线程和线程工厂的封装类
    this(RingBuffer.createMultiProducer(eventFactory, ringBufferSize), new BasicExecutor(threadFactory));
}

//step 2
//Disruptor.class
private Disruptor(final RingBuffer<T> ringBuffer, final Executor executor){
    //存入 RingBuffer 和 Executor
    this.ringBuffer = ringBuffer;
    this.executor = executor;
}

但是实际上 Disruptor 提供了很多的构造器,其中还有一个较高配置权限的:

//Disruptor.class
public Disruptor(final EventFactory<T> eventFactory,final int ringBufferSize,
                final ThreadFactory threadFactory,final ProducerType producerType,
                final WaitStrategy waitStrategy){
    //解释传入的参数:
    //eventFactory 是 Event 类的创建工厂
    //ringBufferSize 是 RingBuffer 的字节数大小
    //threadFactory 是线程工厂
    //ProducerType 是生产者的类型,分为单生产者类型(single)和多生产者类型(multi),默认为 multi
    //waitStrategy 是框架中的一个接口,表示等待策略,默认为 BlockingWaitStrategy(阻塞等待),WaitStrategy 的可讲内容较多,在后头开一个单独 part
    this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),new BasicExecutor(threadFactory));
}

先来看 ProducerType:

public enum ProducerType{
    SINGLE,
    MULTI
}

仅仅只是个标记,不多赘述。

1.1 BasicExecutor

BasicExecutor 是 Executor 的实现类,其内部维护着一个线程工厂和一个线程队列,核心方法为 execute(...):

//BasicExecutor.class
public void execute(Runnable command){
    
    //使用线程工厂创建一个线程,此处的 factory 即为 DaemonThreadFactory
    final Thread thread = factory.newThread(command);
    //有效性验证
    if (null == thread){
        throw new RuntimeException("Failed to create thread to run: " + command);
    }

    //开启线程
    thread.start();
    //threads 是一个 ConcurrentLinkedQueue<Thread> 类型的变量,用来存储线程
    threads.add(thread);
}

1.2 RingBuffer 的创建

再来追踪一下 RingBuffer 的创建:

//RingBuffer.class
public static <E> RingBuffer<E> create(ProducerType producerType,EventFactory<E> factory,
                                        int bufferSize,WaitStrategy waitStrategy){
    //此处根据 ProducerType 进行分发操作
    switch (producerType){
        case SINGLE:
            //创建单消费者的 producer
            return createSingleProducer(factory, bufferSize, waitStrategy);
        case MULTI:
            //创建多消费者的 producer
            return createMultiProducer(factory, bufferSize, waitStrategy);
        default:
            //抛出错误
            throw new IllegalStateException(producerType.toString());
    }
}

本质上这两种模式的 RingBuffer 的创建差距并不大,此处追踪 createMultiProducer(...) 方法:

//step 1
//RingBuffer.class
public static <E> RingBuffer<E> createMultiProducer(EventFactory<E> factory,int bufferSize,WaitStrategy waitStrategy){
    //MultiProducerSequencer 是 RingBuffer 中用来在生产者和消费者之间传递数据的组件
    //sequencer 是 RingBuffer 中的核心组件,是区别 SINGLE 和 MULTI 的关键,后文会继续理解
    MultiProducerSequencer sequencer = new MultiProducerSequencer(bufferSize, waitStrategy);
    //自身构造器
    return new RingBuffer<E>(factory, sequencer);
}

//step 2
//RingBuffer.class
RingBuffer(EventFactory<E> eventFactory,Sequencer sequencer){
    //调用父类 RingBufferFields 的构造方法
    super(eventFactory, sequencer);
}

//step 3
//RingBufferFields.class
RingBufferFields(EventFactory<E> eventFactory,Sequencer sequencer){
    //此处为 MultiProducerSequencer
    this.sequencer = sequencer;
    //获取使用者自定义的 bufferSize 并记录下来
    this.bufferSize = sequencer.getBufferSize();

    //bufferSize 的有效性验证
    if (bufferSize < 1){
        throw new IllegalArgumentException("bufferSize must not be less than 1");
    }

    if (Integer.bitCount(bufferSize) != 1){
        throw new IllegalArgumentException("bufferSize must be a power of 2");
    }

    //根据 bufferSize 确定序列号最大值,因为从 0 开始所以要减一
    this.indexMask = bufferSize - 1;
    //entries 是一个 Object 数组,用于存放 Event
    //BUFFER_PAD 是对整个缓冲区的填充
    this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
    //fill(...) 方法会重新设置 entries
    fill(eventFactory);
}

//step 4
//RingBuffer.class
private void fill(EventFactory<E> eventFactory){
    for (int i = 0; i < bufferSize; i++){
        //遍历数组进行 Event 的填充
        entries[BUFFER_PAD + i] = eventFactory.newInstance();
    }
}

2 消费者的注册

来看下方代码:

disruptor.handleEventsWith(new LongEventHandler("handler1"),new LongEventHandler("handler2"));

追踪 handleEventsWith(...) 方法:

//step 1
//Disruptor.class
public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers){
    //Sequence 可以看做是 long 型的封装类
    //此处的第一个参数是前置关卡,在处理 handler 之前会进行处理的事件
    //handlers 即为消费者
    return createEventProcessors(new Sequence[0], handlers);
}

//step 2
//Disruptor.class
EventHandlerGroup<T> createEventProcessors(final Sequence[] barrierSequences,final EventHandler<? super T>[] eventHandlers){
    
    //Disruptor 中有一个 AtomicBoolean 类型的变量 started
    //checkNotStarted() 会检查该变量的值是否为 true,如果是的话就证明已经启动了,则抛出异常
    checkNotStarted();

    //processorSequences 是每个消费者对应的执行器的序列号
    final Sequence[] processorSequences = new Sequence[eventHandlers.length];

    //此处返回的 barrier 可以看做是上文 MultiProducerSequencer 的封装增强
    final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);

    for (int i = 0,eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++){
        final EventHandler<? super T> eventHandler = eventHandlers[i];

        //batchEventProcessor 是存储了消费者和生产者的执行器,实现了 Runnable 接口,内部会不断循环去接收并处理事件
        final BatchEventProcessor<T> batchEventProcessor =
            new BatchEventProcessor<>(ringBuffer, barrier, eventHandler);

        //exceptionHandler 是用于处理错误的消费者组件
        if (exceptionHandler != null){
            batchEventProcessor.setExceptionHandler(exceptionHandler);
        }

        //consumerRepository 可以看做是消费者的集合封装
        //consumerRepository 会将传入的三个参数包装成 EventProcessorInfo 并储存在集合和 map 里
        consumerRepository.add(batchEventProcessor, eventHandler, barrier);

        //记录下消费者对应的执行器的序列号
        processorSequences[i] = batchEventProcessor.getSequence();
    }

    //处理一些前置事件,在本例中没有前置事件存在
    updateGatingSequencesForNextInChain(barrierSequences, processorSequences);

    return new EventHandlerGroup<>(this, consumerRepository, processorSequences);
}

2.1 newBarrier

来追踪一下 ringBuffer.newBarrier(...) 方法:

//step 1
//RingBuffer.class
public SequenceBarrier newBarrier(Sequence... sequencesToTrack){
    //在本例中,此处的 sequencesToTrack 是 Sequence[0]
    //此处的 sequencer 即为 MultiProducerSequencer
    return sequencer.newBarrier(sequencesToTrack);
}

//step 2
//AbstractSequencer.class
public SequenceBarrier newBarrier(Sequence... sequencesToTrack){
    //此方法被定义在 MultiProducerSequencer 的父类 AbstractSequencer 中
    //cursor 是在 AbstractSequencer 中实例化的一个 Sequence 类型对象,是 MultiProducerSequencer 的序列号
    return new ProcessingSequenceBarrier(this, waitStrategy, cursor, sequencesToTrack);
}

//step 3
//ProcessingSequenceBarrier.class
ProcessingSequenceBarrier(final Sequencer sequencer,final WaitStrategy waitStrategy,
                            final Sequence cursorSequence,final Sequence[] dependentSequences){
    //即为 Disruptor 启动器中的 MultiProducerSequencer
    this.sequencer = sequencer;
    //即为 Disruptor 启动器中的阻塞策略
    this.waitStrategy = waitStrategy;  
    //上述方法的 cursor
    this.cursorSequence = cursorSequence;
    if (0 == dependentSequences.length){
        //此处的 dependentSequences 是长度是 0,所以此处
        dependentSequence = cursorSequence;
    }else{
        dependentSequence = new FixedSequenceGroup(dependentSequences);
    }
}

需要注意的是,此处的 sequencer 已经被抽象成了 SingleProducerSequencer 和 MultiProducerSequencer 的共同实现接口 Sequencer。

所以对于 SingleProducerSequencer 来说,这个流程也是没有区别的。

2.2 updateGatingSequencesForNextInChain

回到上述代码:

//此处的 barrierSequences 是一个 Sequence[0] 数组,processorSequences 是所有消费者的序列号集合
updateGatingSequencesForNextInChain(barrierSequences, processorSequences);

追踪该方法的实现:

//step 1
//Disruptor.class
private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences){
    
    //processorSequences.length 大于 0 意味着消费者数量大于 0
    if (processorSequences.length > 0){
        ringBuffer.addGatingSequences(processorSequences);

        //barrierSequences 是前置事件的集合
        //由于此处的 barrierSequences 是长度为 0 的 Sequence 数组,即没有前置事件,所以此处不会进入循环,忽略
        for (final Sequence barrierSequence : barrierSequences){
            ringBuffer.removeGatingSequence(barrierSequence);
        }
        //unMarkEventProcessorsAsEndOfChain(...) 方法也是处理 barrierSequences 的,忽略
        consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
    }
}

addGatingSequences

追踪 ringBuffer.addGatingSequences(...) 方法:

//step 1
//RingBuffer.class
public void addGatingSequences(Sequence... gatingSequences){
    //sequencer 为 MultiProducerSequencer
    sequencer.addGatingSequences(gatingSequences);
}

//step 2
//AbstractSequencer.class
public final void addGatingSequences(Sequence... gatingSequences){
    //此处的 SEQUENCE_UPDATER 是一个 AtomicReferenceFieldUpdaterImpl 类型的变量,用于 CAS 操作 gatingSequences
    SequenceGroups.addSequences(this, SEQUENCE_UPDATER, this, gatingSequences);
}

//step 3
//SequenceGroups.class
static <T> void addSequences(final T holder,final AtomicReferenceFieldUpdater<T, Sequence[]> updater,
                            final Cursored cursor,final Sequence... sequencesToAdd){
    long cursorSequence;
    Sequence[] updatedSequences;
    Sequence[] currentSequences;

    do{
        //此处的 holder 即为 MultiProducerSequencer,此处获取其内部的 gatingSequences 变量
        currentSequences = updater.get(holder);
        //此处为 copyOf(...) 方法为 java.util.Arrays.copyOf(...) 方法,用于将 currentSequences 复制一份
        updatedSequences = copyOf(currentSequences, currentSequences.length + sequencesToAdd.length);
        //此处的 cursor 即为 MultiProducerSequencer,getCursor() 方法会获取其的序列号
        cursorSequence = cursor.getCursor();

        int index = currentSequences.length;
        //此处的 sequencesToAdd 是之前消费者的序列号集合,更新 sequencesToAdd 中的每个序列号封装
        //将 MultiProducerSequencer 的序列号注册进去,并填充到新集合的后面一半中
        for (Sequence sequence : sequencesToAdd){
            sequence.set(cursorSequence);
            updatedSequences[index++] = sequence;
        }
    }while (!updater.compareAndSet(holder, currentSequences, updatedSequences));
    //此处的 while 会死循环 CAS 操作直到更新成功

    //在此获取 MultiProducerSequencer 的序列号,更新到 sequencesToAdd 的每个序列号封装类中
    cursorSequence = cursor.getCursor();
    for (Sequence sequence : sequencesToAdd){
        sequence.set(cursorSequence);
    }
}

3 Disruptor 的启动

来看下方代码:

disruptor.start();

追踪 start(...) 方法:

//Disruptor.class
public RingBuffer<T> start(){
    //确认该 Disruptor 没有启动
    checkOnlyStartedOnce();
    //此处的 consumerInfo 是 EventProcessorInfo 类型的变量
    for (final ConsumerInfo consumerInfo : consumerRepository){
        consumerInfo.start(executor);
    }

    return ringBuffer;
}

先来看 checkOnlyStartedOnce() 方法:

//Disruptor.class
private void checkOnlyStartedOnce(){
    //如果在调用该 CAS 方法之前已经为 true 了,会抛出错误
    //其实就是确保在调用该方法之前还处于未开启的状态
    if (!started.compareAndSet(false, true)){
        throw new IllegalStateException("Disruptor.start() must only be called once.");
    }
}

再来追踪 EventProcessorInfo 的 start(...) 方法:

//EventProcessorInfo.class
public void start(final Executor executor){
    //此处的 executor 即为 BasicExecutor
    executor.execute(eventprocessor);
}

所以本质上 Disruptor 的启动就是开启 BasicExecutor,借此启动线程。

3.1 BatchEventProcessor

上述代码中启动线程的时候会传入 eventprocessor 对象作为 task 去启动消费者。eventprocessor 对象本质上是上文中提到过的 BatchEventProcessor。

BatchEventProcessor 能够被传入 execute(...) 方法,证明其实现了 Runnable 接口:

//step 1
//BatchEventProcessor.class
@Override
public void run(){
    //running 是一个定义在 BatchEventProcessor 中的 AtomicInteger 类型的变量
    //CAS 操作,先判断 running 的值是否等于 IDLE,如果是的话就修改成 RUNNING,并返回 true
    //IDLE = 1,RUNNING = 2,皆为 int 类型的常量
    if (running.compareAndSet(IDLE, RUNNING)){
        
        //此处修改 sequenceBarrier 中 alert 变量的状态值,清除掉中断状态
        sequenceBarrier.clearAlert();

        //如果传入的消费者实现了 LifecycleAware 接口,就会在 notifyStart() 方法中去执行相关方法
        //LifecycleAware 中定义了 onStart() 和 onShutdown() 方法,会分别在消费者真正执行之前和关闭之前执行一次
        //执行 LifecycleAware 的 onStart() 方法
        notifyStart();
        try{
            //如果 running 是 RUNNING 状态,就会进入死循环
            if (running.get() == RUNNING){
                //核心方法
                processEvents();
            }
        }finally{
            //执行 LifecycleAware 的 onShutdown() 方法
            notifyShutdown();
            //切换 running 的状态值
            running.set(IDLE);
        }
    }else{
        if (running.get() == RUNNING){
            throw new IllegalStateException("Thread is already running");
        }else{
            earlyExit();
        }
    }
}

//step 2
//BatchEventProcessor.class
private void processEvents(){
    T event = null;
    //此处的 sequence 记录着当前消费者已经处理过的事件的编号,初始化的时候为 -1,所以 nextSequence 初始为 0,每次加一
    //nextSequence 是当前消费者下一项准备处理的事件的编号
    long nextSequence = sequence.get() + 1L;

    //死循环
    while (true){
        try{
            //当没有事件发生的时候,消费者所在的线程会在此等待,具体的实现依照使用者设置的等待策略的不同而不同
            //本例中使用的是 BlockingWaitStrategy,所以会在此阻塞直到出现了事件
            //返回的 availableSequence 是最新的事件的编号,在任务量较小的情况下和 nextSequence 数值相同,在任务量较大的情况下小于 nextSequence
            //等待策略留在后头展开
            final long availableSequence = sequenceBarrier.waitFor(nextSequence);

            if (batchStartAware != null){
                batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
            }

            //nextSequence 大于 availableSequence 的情况理论上不会出现
            while (nextSequence <= availableSequence){
                //dataProvider 就是之前初始化的 RingBuffer,RingBuffer 在此处会去获取当前编号的 Event
                event = dataProvider.get(nextSequence);
                //onEvent(...) 是 EventHandler 接口定义的方法,是消费者消费 Event 的最重要方法,方法体由使用者进行定义
                eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                //编号自增
                nextSequence++;
            }

            //在消费完当前的所有事件之后,记录下事件编号
            sequence.set(availableSequence);
        }catch (final TimeoutException e){
            //如果消费者实现了 TimeoutHandler 接口,就可以在这里处理超时问题
            notifyTimeout(sequence.get());
        }catch (final AlertException ex){
            //running 的状态值非 RUNNING,就会退出死循环
            if (running.get() != RUNNING){
                break;
            }
        }catch (final Throwable ex){
            //如果当前的消费者实现了 ExceptionHandler 接口的话,可以在此处进行错误处理
            exceptionHandler.handleEventException(ex, nextSequence, event);
            sequence.set(nextSequence);
            nextSequence++;
        }
    }
}

3.2 WaitStrategy

回到上述代码的以下这句:

final long availableSequence = sequenceBarrier.waitFor(nextSequence);

追踪一下 waitFor(...) 方法:

//ProcessingSequenceBarrier.class
public long waitFor(final long sequence) throws AlertException, InterruptedException, TimeoutException{
    
    //如果变量 alert 为 true 的话会抛出错误
    checkAlert();

    //调用等待策略的相关方法
    //返回最新的事件的编号
    long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);

    //如果当前可用的最新事件编号小于传入的 sequence,就直接返回可用编号即可
    if (availableSequence < sequence){
        return availableSequence;
    }

    //getHighestPublishedSequence(...) 方法会判断最大的可用的事件编号
    return sequencer.getHighestPublishedSequence(sequence, availableSequence);
}

等待策略的所有实现类都实现了 WaitStrategy 接口:

public interface WaitStrategy{
    //休眠方法
    long waitFor(long sequence, Sequence cursor, Sequence dependentSequence, SequenceBarrier barrier)
        throws AlertException, InterruptedException, TimeoutException;
    //唤醒方法
    void signalAllWhenBlocking();
}

Disruptor 自带的策略中,常用的有以下几种:

阻塞策略  BlockingWaitStrategy:默认策略,没有获取到任务的情况下线程会进入等待状态。cpu 消耗少,但是延迟高。
阻塞限时策略  TimeoutBlockingWaitStrategy:相对于BlockingWaitStrategy来说,设置了等待时间,超过后抛异常。
自旋策略  BusySpinWaitStrategy:线程一直自旋等待。cpu 占用高,延迟低.
Yield 策略 YieldingWaitStrategy:尝试自旋 100 次,然后调用 Thread.yield() 让出 cpu。cpu 占用高,延迟低。
分段策略  SleepingWaitStrategy:尝试自旋 100 此,然后调用 Thread.yield() 100 次,如果经过这两百次的操作还未获取到任务,就会尝试阶段性挂起自身线程。此种方式是对 cpu 占用和延迟的一种平衡,性能不太稳定。

还有几种譬如 PhasedBackoffWaitStrategy 和 LiteBlockingWaitStrategy 等,不多介绍。

详细看一下 BlockingWaitStrategy 的实现:

public final class BlockingWaitStrategy implements WaitStrategy{
    //重入锁
    private final Lock lock = new ReentrantLock();
    //Condition 用来控制线程的休眠和唤醒
    private final Condition processorNotifyCondition = lock.newCondition();

    @Override
    public long waitFor(long sequence, Sequence cursorSequence, 
                        Sequence dependentSequence, SequenceBarrier barrier)
                        throws AlertException, InterruptedException{
        
        long availableSequence;
        if (cursorSequence.get() < sequence){
            //上锁
            lock.lock();
            try{
                while (cursorSequence.get() < sequence){
                    //检查线程是否中断了,如果已经中断了就会抛出异常
                    barrier.checkAlert();
                    //休眠线程
                    processorNotifyCondition.await();
                }
            }finally{
                //解锁
                lock.unlock();
            }
        }

        //生产者进度小于消费者的消费进度,此循环进行等待
        //正常情况下都会在上方阻塞,不会进入该循环
        while ((availableSequence = dependentSequence.get()) < sequence){
            barrier.checkAlert();
            ThreadHints.onSpinWait();
        }

        return availableSequence;
    }

    @Override
    public void signalAllWhenBlocking(){
        lock.lock();
        try{
            //用 Condition 唤醒全部的线程
            processorNotifyCondition.signalAll();
        }finally{
            lock.unlock();
        }
    }

    //toString() 方法,忽略
    @Override
    public String toString(){
        return "BlockingWaitStrategy{" +
            "processorNotifyCondition=" + processorNotifyCondition +
            '}';
    }
}

3.3 DataProvider

回到上述代码的以下这句:

event = dataProvider.get(nextSequence);

dataProvider 是一个 DataProvider 类型的变量。DataProvider 本质上是一个 Disruptor 内的接口:

public interface DataProvider<T>{
    T get(long sequence);
}

其存在唯一实现类 RingBuffer。所以 get(...) 方法也在 RingBuffer 中:

//step 1
//RingBuffer.class
@Override
public E get(long sequence){
    //elementAt(...) 方法定义在 RingBuffer 的抽象父类 RingBufferFields 中
    return elementAt(sequence);
}

//step 2
//RingBufferFields.class
protected final E elementAt(long sequence){
    //调用 UNSAFE 的相关方法,通过地址去直接获取
    //entries 在上文代码中申请了一系列地址连续的内存
    //REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT) 是一个很巧妙的算法,结果永远只会在申请下来的内存中循环
    return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));
}

由此可知,Disruptor 中的所有的事件都非存储在虚拟机中,而是储存在虚拟机外,由 Unsafe 类直接调用。

Unsafe 具有 "调用内存对象很快,但是申请内存块很慢" 的特性,所以也就可以解释为什么在初始化的时候要一次性将储存 Event 的数组进行逐个初始化了(代码在上述 1.2 小节的 step 4 中)。

有一个注意点,entries 上的元素实际上是在 jvm 管辖范围内的,并不一定需要使用 Unsafe 去调用,这里只是为了更高的性能。

三 Event 的产生

在开头的 demo 中,可以看到 LongEventProducer 中有一个核心方法:

//LongEventProducer.class
public void onData(ByteBuffer bb) {
    //sequence 是 RingBuffer 中的一个数据块,类似于一个数据地址
    long sequence = ringBuffer.next();
    try {
        //用数据地址去获取到一个 Event 事件类实例
        LongEvent event = ringBuffer.get(sequence);
        //在实例中存入 ByteBuffer 中的数据
        event.set(bb.getLong(0));
    } finally {
        //发布该数据块,此时消费者们都可以看到该数据块了,可以进行消费
        ringBuffer.publish(sequence);
    }
}

这个方法内通过调用 ringBuffer.next() 方法获取数组内对象的地址,然后通过 ringBuffer.get(...) 方法获取对象。

在 finally 代码块中调用 ringBuffer.publish(...) 方法去发布该信息。

1 next

回到上述代码的以下这句:

long sequence = ringBuffer.next();

追踪 next() 方法:

//step 1
//RingBuffer.class
@Override
public long next(){
    //调用 RingBuffer 内的 MultiProducerSequencer 的相关方法
    return sequencer.next();
}

//step 2
//MultiProducerSequencer.class
@Override
public long next(){
    //调用自身的相关方法
    return next(1);
}

//step 3
//MultiProducerSequencer.class
@Override
public long next(int n){
    //参数有效性验证,此处 n = 1
    if (n < 1){
        throw new IllegalArgumentException("n must be > 0");
    }

    long current;
    long next;

    //死循环
    do{
        //current 是当前最新的事件编号
        current = cursor.get();
        //此处为 current + 1,用作下一个事件的编号
        next = current + n;

        //wrapPoint 是事件编号和数组大小的差
        long wrapPoint = next - bufferSize;

        //gatingSequenceCache 的设计很巧妙,它是一个 Sequence 类型的变量,可以看做是一个 long 整数
        //gatingSequenceCache 的存在意义是每隔一段时间去检查一次消费者的处理进度
        //gatingSequenceCache 在每次检查进度的时候都会更新成 "当前处理最慢的消费者已经处理完成的事件编号"
        //处理逻辑在下方 if 判断中
        long cachedGatingSequence = gatingSequenceCache.get();

        //cachedGatingSequence > current 的情况就不会发生,因为 cachedGatingSequence 是消费者处理进度,current 是目前的事件总编号,所以最多相等
        //在消费者算力充足的情况下,cachedGatingSequence 会和 current 相等
        //wrapPoint > cachedGatingSequence 的情况,在极端情况下可能是因为生产者的速度太快了,已经远超过最慢的那个消费者,超过了 "一圈"(即 bufferSize 的大小)

        //此处可以这么理解,由于 RingBuffer 内数组的大小是有限的,如果事件生产的多了,就会覆盖掉最开始的几个事件
        //但是如果消费者的进度没有跟上,来不及消费就被覆盖了,就造成了 bug,此处即为抑制策略
        if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current){

            //getMinimumSequence(...) 方法会获取当前处理事件最慢的那个消费者的处理位置
            long gatingSequence = Util.getMinimumSequence(gatingSequences, current);

            //wrapPoint - gatingSequence = next - bufferSize - gatingSequence >0
            //即 next > bufferSize + gatingSequence,落后了 "一圈"
            if (wrapPoint > gatingSequence){
                //线程挂起 1 纳秒,然后跳过本次循环进行下一次循环
                //此处会陷入死循环,阻塞掉生产者,去等待消费者的进度
                LockSupport.parkNanos(1);
                continue;
            }

            //跳出上述循环之后在这里更新 gatingSequenceCache 的值
            gatingSequenceCache.set(gatingSequence);
        }else if (cursor.compareAndSet(current, next)){
            //如果消费者的进度正常,那么会在此用 CAS 操作更新 cursor 的值,并且跳出 while 循环
            break;
        }
    }while (true);
    
    //返回
    return next;
}

在线程池(比如笔者比较了解的 ThreadPoolExecutor)的实现中,对于 task 过多,溢出等待队列的情况,一般会有一种策略去应对。在 ThreadPoolExecutor 中,默认的策略为抛出错误,直接终止程序。

在 Disruptor 中,其实 RingBuffer 就类似一个等待队列,溢出策略则是暂停 task 的产生,等待线程池去执行。

【此处仅为类比,不能简单的把 Disruptor 想成是一个线程池】

2 publish

ringBuffer.publish(...) 是事件发布的核心方法:

//step 1
//RingBuffer.class
@Override
public void publish(long sequence){
    sequencer.publish(sequence);
}

//step 2
//MultiProducerSequencer.class
@Override
public void publish(final long sequence){
    //此处更新数据
    setAvailable(sequence);
    //此处调用等待策略的 signalAllWhenBlocking() 方法唤醒所有等待的线程
    //具体实现依照 waitStrategy 的不同而不同
    waitStrategy.signalAllWhenBlocking();
}

//step 3
//MultiProducerSequencer.class
private void setAvailable(final long sequence){
    //calculateAvailabilityFlag(sequence) 可以简单理解为是计算出的圈数,即 (sequence / bufferSize)
    //calculateIndex(sequence) 会计算出新的 sequence 对应组中的哪一个位置,即 (sequence % bufferSize)
    setAvailableBufferValue(calculateIndex(sequence), calculateAvailabilityFlag(sequence));
}

//step 4
//MultiProducerSequencer.class
private void setAvailableBufferValue(int index, int flag){
    //SCALE 是本机 Object[] 引用的大小,一般为 4
    long bufferAddress = (index * SCALE) + BASE;
    //使用 Unsafe 更新元素
    //availableBuffer 是一个 int 数组,大小为 bufferSize,即和 entries 相同
    //Unsafe.putOrderedInt(...) 会将 availableBuffer 的指定位置(bufferAddress)的元素修改成 flag
    UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag);
}

四 其它补充

笔者在项目过程中还看到了 Disruptor 的另一种用法,即和线程池一样,每个 Event 只被一个消费者取用一次,然后就会失效。

具体的改造是将消费者继承的 EventHandler 替换成 WorkHandler:

class LongEventHandler2 implements WorkHandler<LongEvent>{

    private String handlerName;

    public LongEventHandler2(String handlerName){
        this.handlerName = handlerName;
    }

    @Override
    public void onEvent(LongEvent event) throws Exception {
        System.out.println("Event " + handlerName + " : " + event.get());
    }
}

然后在 Disruptor 启动器注册消费者的时候不再使用 handleEventsWith(...) 方法,而是改用 handleEventsWithWorkerPool(...) 方法:

LongEventHandler2[] workers = new LongEventHandler2[5];
for (int i = 0; i < workers.length; i++) {
    workers[i] = new LongEventHandler2("worker " + i);
}

//此方法会注册线程池类的消费者
disruptor.handleEventsWithWorkerPool(workers);

具体源码不做追踪了,笔者在项目中做过尝试,挺好用的。

五 一点唠叨

· Disruptor 的封装很薄(比起 Netty、Spring 之类的重量级框架),调用链路都相对较短

· Disruptor 的环装缓存区(RingBuffer)的很多概念还有待理解

· 对于笔者这样的数学苦手来说看底层算法代码略头疼

· 仅为个人的学习笔记,可能存在错误或者表述不清的地方,有缘补充

原文  https://segmentfault.com/a/1190000018150581
正文到此结束
Loading...