Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。基于Disruptor开发的系统单线程能支撑每秒600万订单,2010年在QCon演讲后,获得了业界关注。2011年,企业应用软件专家Martin Fowler专门撰写长文介绍。同年它还获得了Oracle官方的Duke大奖。
目前,包括Apache Storm、Camel、Log4j 2在内的很多知名项目都应用了Disruptor以获取高性能。需要特别指出的是,这里所说的队列是系统内部的内存队列,而不是Kafka这样的分布式队列。以下的队列都单一系统内的队列。

传统队列的问题

传统队列的分类

队列的底层数据结构一般分成三种:数组、链表和堆。

在稳定性和性能要求特别高的系统中,为了防止生产者速度过快,导致内存溢出,只能选择有界队列;同时,为了减少 Java 的垃圾回收对系统性能的影响,会尽量选择 array/heap 格式的数据结构。这样筛选下来,符合条件的队列就只有 ArrayBlockingQueue。

出现的问题

ArrayBlockingQueue 是通过加锁的方式保证线程安全,而且 ArrayBlockingQueue 还存在伪共享问题,这两个问题严重影响了性能。

伪共享问题
伪共享指的是多个线程同时读写同一个缓存行的不同变量时导致的 CPU 缓存失效。尽管这些变量之间没有任何关系,但由于在主内存中邻近,存在于同一个缓存行之中,它们的相互覆盖会导致频繁的缓存未命中,引发性能下降。

Disruptor的原理

Disruptor的使用

三个角色:生产者,消费者,disruptor 对象。
引入依赖:

1
2
3
4
5
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.4.2</version>
</dependency>

定义事件

事件是Disruptor进行数据交换的数据类型,读写的基本数据单元。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class LongEvent
{
private long value;

public void set(long value)
{
this.value = value;
}

public long getValue() {
return value;
}

@Override
public String toString() {
return "LongEvent{" +
"value=" + value +
'}';
}
}

定义事件工厂

事件工厂用来生成,事件实例的。发布者发布前,先从 RingBuffer 获得一个 Event 的实例,然后往 Event 实例中填充数据,之后再发布到 RingBuffer 中,之后由 Consumer 获得该 Event 实例并从中读取数据。

1
2
3
4
5
6
7
8
public class LongEventFactory implements EventFactory<LongEvent>
{
@Override
public LongEvent newInstance()
{
return new LongEvent();
}
}

定义事件处理器

事件处理器,从RingBuffer中获取数据,并进行处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class LongEventHandler implements EventHandler<LongEvent> {
private String name;

public LongEventHandler(){}
public LongEventHandler(String name) {
this.name=name;
}
@Override
public void onEvent(LongEvent event, long sequence, boolean endOfBatch)
{
ThreadUtil.sleep(RandomUtil.randomLong(500));
System.out.println("处理器名称:"+name+" 线程:"+Thread.currentThread().getName()+" 处理事件: " + event);
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}
}

创建Disruptor对象

1
2
3
4
5
6
7
8
9
10
EventFactory<LongEvent> eventFactory = new LongEventFactory();
ThreadFactory threadFactory = ThreadUtil
.createThreadFactoryBuilder()
.setNamePrefix("disruptorThread-").build();
// RingBuffer 大小,必须是 2 的 N 次方;
int ringBufferSize = 16;
// 创建一个对象
Disruptor<LongEvent> disruptor = new Disruptor<>(eventFactory,
ringBufferSize, threadFactory, ProducerType.SINGLE,
new YieldingWaitStrategy());

注册处理器(消费者)

1
2
3
4
5
// 注册处理器 可以注册多个
disruptor.handleEventsWith(new LongEventHandler("worker1"),
new LongEventHandler("worker2"));

disruptor.start();

定义生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 发布事件
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
// 获取需要
long sequence = ringBuffer.next();

try {
//获取该序号对应的事件对象;
LongEvent event = ringBuffer.get(sequence);
long data = Thread.currentThread().getId();
event.set(data);
} finally {
//发布事件;
ringBuffer.publish(sequence);
}

完整的main函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
public class MainRun {

public static void main(String[] args) {
EventFactory<LongEvent> eventFactory = new LongEventFactory();
ThreadFactory threadFactory = ThreadUtil
.createThreadFactoryBuilder()
.setNamePrefix("disruptorThread-").build();
// RingBuffer 大小,必须是 2 的 N 次方;
int ringBufferSize = 16;
// 创建一个对象
Disruptor<LongEvent> disruptor = new Disruptor<>(eventFactory,
ringBufferSize, threadFactory, ProducerType.SINGLE,
new YieldingWaitStrategy());
// 注册处理器
disruptor.handleEventsWith(new LongEventHandler("worker1"),
new LongEventHandler("worker2"));

disruptor.start();
// 生产者线程
ExecutorService executorService = ThreadUtil.newExecutor(10);
for (int i = 0; i < 10; i++) {
executorService.execute(new PublishEvent(disruptor,0));
}

// 关闭线程池和队列
executorService.shutdown();
disruptor.shutdown();

}

static class PublishEvent implements Runnable {

private final Disruptor<LongEvent> disruptor;

private int max;

public PublishEvent(Disruptor<LongEvent> disruptor,int max) {
this.disruptor = disruptor;
this.max=max;
}

@Override
public void run() {
while (max++<10) {
// 发布事件;
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
// 获取需要
long sequence = ringBuffer.next();

try {
//获取该序号对应的事件对象;
LongEvent event = ringBuffer.get(sequence);
long data = Thread.currentThread().getId();
event.set(data);
} finally {
//发布事件;
ringBuffer.publish(sequence);
}
}
}
}
}

参考资料

高性能队列——Disruptor
disruptor 高性能队列最佳选择