Disrupter是业界公认的一个性能很高的有界队列
被使用在了Log4J2,Spring Messaging等,为什么性能如此之高呢
可以总结为以下四点:
1.内存分配更加合理,使用了RingBuffer数组,提高了命中率,对象循环利用,避免GC
2.避免使用伪共享,来提高了共享使用率
3.采用了无锁的算法,避免了频繁的加锁解锁
4.支持批量消费,消费者可以无锁消费多个消息
在具体实现中,Disputor要求,生产者生产的对象称为Event,使用Disputor必须自定义Event
除了使用这个有界队列需要指定大小,还需要传入一个EventFactory,指定的大小必须是2的次方
消费其中的Event需要通过handleEventsWith()注册一个事件处理器,发布Event需要通过publishEvent方法
//自定义Event
class LongEvent {
private long value;
public void set(long value) {
this.value = value;
}
}
//指定RingBuffer大小,
//必须是2的N次方
int bufferSize = 1024;
//构建Disruptor
Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);
//注册事件处理器
disruptor.handleEventsWith((event,sequence,endOfBatch)-> System.out.println(“E: “+event));
//启动Disruptor
disruptor.start();
//获取RingBuffer
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
//生产Event
ByteBuffer bb = ByteBuffer.allocate(8);
for(long l = 0; true;l++){
bb.putLong(0, l);
//生产者生产消息
ringBuffer.publishEvent((event, sequence, buffer) -> event.set(buffer.getLong(0)), bb);
Thread.sleep(1000);
}
关于这个环形数组的强大之处
我们分为三个方面来进行介绍
1.使用了RingBuffer
具体的参考下一章
而且Disputor再其基础上做了修改,在发布新的Event时候,也并非进行新增,而是进行了Set的重新设置
2.避免了伪共享
因为CPU的缓存机制是按照缓存行进行管理的,通常是64位字节,CPU从内存中加载数据X,会加载X后面的,补齐64位字节,
但是在实际使用中,通常多个彼此依赖的变量在同一个字节中,被作为同一个字节进行缓存了
例如下面代码中,出队的索引和入队的索引
可能由于放在一个CPU缓存中,导致了出队或者入队任何一个操作会导致其他的变量重新读取Cache,导致了伪共享的问题
解决方案很简单 缓存行填充
保证每个变量独占一个缓存行,让其填充为一个字节,避免了伪共享问题的出现
//前:填充56字节
class LhsPadding{ long p1, p2, p3, p4, p5, p6, p7; } class Value extends LhsPadding{ volatile long value; } //后:填充56字节 class RhsPadding extends Value{ long p9, p10, p11, p12, p13, p14, p15; } class Sequence extends RhsPadding{ //省略实现 } |
3.无锁操作
Disruptor采用的是无锁算法,虽然复杂,但是其的核心操作可以归为生产和消费两种,
Disruptor维护了出队索引和入队索引两个关键变量,其中多个消费者可以同时消费,都有一个各自的出队索引,RingBuffer的出队索引是所有的消费者中最小的那一个
//生产者获取n个写入位置
do {
//cursor类似于入队索引,指的是上次生产到这里
current = cursor.get();
//目标是在生产n个
next = current + n;
//减掉一个循环
long wrapPoint = next – bufferSize;
//获取上一次的最小消费位置
long cachedGatingSequence = gatingSequenceCache.get();
//没有足够的空余位置
if (wrapPoint>cachedGatingSequence || cachedGatingSequence>current){
//重新计算所有消费者里面的最小值位置
long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
//仍然没有足够的空余位置,出让CPU使用权,重新执行下一循环
if (wrapPoint > gatingSequence){
LockSupport.parkNanos(1);
continue;
}
//从新设置上一次的最小消费位置
gatingSequenceCache.set(gatingSequence);
} else if (cursor.compareAndSet(current, next)){
//获取写入位置成功,跳出循环
break;
}
} while (true);
Disruptor在并发方面将性能做到了极致,其中利用了无锁算法避免了锁的争用,再加上利用了CPu的性能
虽然Java提供了高效率的优化,但是这种优化对于程序员来说是透明的,但是有些并不是真的合适,需要程序员去自己思量
而且比如Java8,提供了避免伪共享的注解 @sun.misc.Contended 通过这个注解可以避免伪共享,但是以牺牲内存为代价的,需要我们去思量