Disrupter是业界公认的一个性能很高的有界队列

被使用在了Log4J2,Spring Messaging等,为什么性能如此之高呢

可以总结为以下四点:

1.内存分配更加合理,使用了RingBuffer数组,提高了命中率,对象循环利用,避免GC

2.避免使用伪共享,来提高了共享使用率

3.采用了无锁的算法,避免了频繁的加锁解锁

4.支持批量消费,消费者可以无锁消费多个消息

在具体实现中,Disputor要求,生产者生产的对象称为Event,使用Disputor必须自定义Event

除了使用这个有界队列需要指定大小,还需要传入一个EventFactory,指定的大小必须是2的次方

消费其中的Event需要通过handleEventsWith()注册一个事件处理器,发布Event需要通过publishEvent方法

//自定义Event

class LongEvent {

private long value;

public void set(long value) {

this.value = value;

}

}

//指定RingBuffer大小,

//必须是2的N次方

int bufferSize = 1024;

//构建Disruptor

Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);

//注册事件处理器

disruptor.handleEventsWith((event,sequence,endOfBatch)-> System.out.println(“E: “+event));

//启动Disruptor

disruptor.start();

//获取RingBuffer

RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

//生产Event

ByteBuffer bb = ByteBuffer.allocate(8);

for(long l = 0; true;l++){

bb.putLong(0, l);

//生产者生产消息

ringBuffer.publishEvent((event, sequence, buffer) -> event.set(buffer.getLong(0)), bb);

Thread.sleep(1000);

}

关于这个环形数组的强大之处

我们分为三个方面来进行介绍

1.使用了RingBuffer

具体的参考下一章

而且Disputor再其基础上做了修改,在发布新的Event时候,也并非进行新增,而是进行了Set的重新设置

2.避免了伪共享

因为CPU的缓存机制是按照缓存行进行管理的,通常是64位字节,CPU从内存中加载数据X,会加载X后面的,补齐64位字节,

但是在实际使用中,通常多个彼此依赖的变量在同一个字节中,被作为同一个字节进行缓存了

例如下面代码中,出队的索引和入队的索引

图片

可能由于放在一个CPU缓存中,导致了出队或者入队任何一个操作会导致其他的变量重新读取Cache,导致了伪共享的问题

解决方案很简单 缓存行填充

保证每个变量独占一个缓存行,让其填充为一个字节,避免了伪共享问题的出现

//前:填充56字节

class LhsPadding{

long p1, p2, p3, p4, p5, p6, p7;

}

class Value extends LhsPadding{

volatile long value;

}

//后:填充56字节

class RhsPadding extends Value{

long p9, p10, p11, p12, p13, p14, p15;

}

class Sequence extends RhsPadding{

//省略实现

}

3.无锁操作

Disruptor采用的是无锁算法,虽然复杂,但是其的核心操作可以归为生产和消费两种,

Disruptor维护了出队索引和入队索引两个关键变量,其中多个消费者可以同时消费,都有一个各自的出队索引,RingBuffer的出队索引是所有的消费者中最小的那一个

//生产者获取n个写入位置

do {

//cursor类似于入队索引,指的是上次生产到这里

current = cursor.get();

//目标是在生产n个

next = current + n;

//减掉一个循环

long wrapPoint = next – bufferSize;

//获取上一次的最小消费位置

long cachedGatingSequence = gatingSequenceCache.get();

//没有足够的空余位置

if (wrapPoint>cachedGatingSequence || cachedGatingSequence>current){

//重新计算所有消费者里面的最小值位置

long gatingSequence = Util.getMinimumSequence(gatingSequences, current);

//仍然没有足够的空余位置,出让CPU使用权,重新执行下一循环

if (wrapPoint > gatingSequence){

LockSupport.parkNanos(1);

continue;

}

//从新设置上一次的最小消费位置

gatingSequenceCache.set(gatingSequence);

} else if (cursor.compareAndSet(current, next)){

//获取写入位置成功,跳出循环

break;

}

} while (true);

Disruptor在并发方面将性能做到了极致,其中利用了无锁算法避免了锁的争用,再加上利用了CPu的性能

虽然Java提供了高效率的优化,但是这种优化对于程序员来说是透明的,但是有些并不是真的合适,需要程序员去自己思量

而且比如Java8,提供了避免伪共享的注解 @sun.misc.Contended 通过这个注解可以避免伪共享,但是以牺牲内存为代价的,需要我们去思量

发表评论

邮箱地址不会被公开。 必填项已用*标注