public class DisruptorExample {
public static void main(String[] args) throws InterruptedException {
// RingBuffer大小,必须是2的N次方
int bufferSize = 1024;
// 构建Disruptor
Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);
// 注册事件处理器
disruptor.handleEventsWith((event, sequence, endOfBatch) -> System.out.println("E: " + event));
// 启动Disruptor
disruptor.start();
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
// 生产Event
ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++) {
bb.putLong(0, l);
// 生产者生产消息
ringBuffer.publishEvent((event, sequence, buffer) -> event.setValue(buffer.getLong(0)), bb);
TimeUnit.SECONDS.sleep(1);
}
}
}
@Data
class LongEvent {
private long value;
}
// com.lmax.disruptor.RingBufferFields
for (int i = 0; i < bufferSize; i++)
{
entries[BUFFER_PAD + i] = eventFactory.newInstance();
}
/** The queued items */ final Object[] items; /** items index for next take, poll, peek or remove */ int takeIndex; /** items index for next put, offer, or add */ int putIndex; /** Number of elements in the queue */ int count;
// 前:填充56字节
class LhsPadding
{
protected long p1, p2, p3, p4, p5, p6, p7;
}
class Value extends LhsPadding
{
protected volatile long value;
}
// 后:填充56字节
class RhsPadding extends Value
{
protected long p9, p10, p11, p12, p13, p14, p15;
}
public class Sequence extends RhsPadding
{
}
// com.lmax.disruptor.MultiProducerSequencer
public long next(int n)
{
if (n < 1)
{
throw new IllegalArgumentException("n must be > 0");
}
long current;
long next;
// 生产者获取n个写入位置
do
{
// current相当于入队索引,表示上次生产到这里
current = cursor.get();
// 目标是再生产n个
next = current + n;
// 减掉一个循环
long wrapPoint = next - bufferSize;
// 获取上一次的最小消费位置
long cachedGatingSequence = gatingSequenceCache.get();
// 没有足够的空余位置
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
{
// 重新计算所有消费者里面的最小值位置
long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
// 仍然没有足够的空余位置,出让CPU使用权,重新执行下一循环
if (wrapPoint > gatingSequence)
{
LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
continue;
}
// 重新设置上一次的最小消费位置
gatingSequenceCache.set(gatingSequence);
}
else if (cursor.compareAndSet(current, next))
{
// 获取写入位置成功,跳出循环
break;
}
}
while (true);
return next;
}
转载请注明出处:http://zhongmingmao.me/2019/05/31/java-concurrent-disruptor/
访问原文「 Java并发 -- Disruptor 」获取最佳阅读体验并参与讨论