Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。基于Disruptor开发的系统单线程能支撑每秒600万订单,2010年在QCon演讲后,获得了业界关注。2011年,企业应用软件专家Martin Fowler专门撰写长文介绍。同年它还获得了Oracle官方的Duke大奖。
目前,包括Apache Storm、Camel、Log4j 2在内的很多知名项目都应用了Disruptor以获取高性能。需要特别指出的是,这里所说的队列是系统内部的内存队列,而不是Kafka这样的分布式队列。以下的队列都单一系统内的队列。
传统队列的问题
传统队列的分类
队列的底层数据结构一般分成三种:数组、链表和堆。
在稳定性和性能要求特别高的系统中,为了防止生产者速度过快,导致内存溢出,只能选择有界队列;同时,为了减少 Java 的垃圾回收对系统性能的影响,会尽量选择 array/heap 格式的数据结构。这样筛选下来,符合条件的队列就只有 ArrayBlockingQueue。
出现的问题
ArrayBlockingQueue 是通过加锁的方式保证线程安全,而且 ArrayBlockingQueue 还存在伪共享问题,这两个问题严重影响了性能。
伪共享问题
伪共享指的是多个线程同时读写同一个缓存行的不同变量时导致的 CPU 缓存失效。尽管这些变量之间没有任何关系,但由于在主内存中邻近,存在于同一个缓存行之中,它们的相互覆盖会导致频繁的缓存未命中,引发性能下降。
Disruptor的原理
Disruptor的使用
三个角色:生产者,消费者,disruptor 对象。
引入依赖:
1 2 3 4 5
| <dependency> <groupId>com.lmax</groupId> <artifactId>disruptor</artifactId> <version>3.4.2</version> </dependency>
|
定义事件
事件是Disruptor进行数据交换的数据类型,读写的基本数据单元。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public class LongEvent { private long value;
public void set(long value) { this.value = value; }
public long getValue() { return value; }
@Override public String toString() { return "LongEvent{" + "value=" + value + '}'; } }
|
定义事件工厂
事件工厂用来生成,事件实例的。发布者发布前,先从 RingBuffer 获得一个 Event 的实例,然后往 Event 实例中填充数据,之后再发布到 RingBuffer 中,之后由 Consumer 获得该 Event 实例并从中读取数据。
1 2 3 4 5 6 7 8
| public class LongEventFactory implements EventFactory<LongEvent> { @Override public LongEvent newInstance() { return new LongEvent(); } }
|
定义事件处理器
事件处理器,从RingBuffer中获取数据,并进行处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| public class LongEventHandler implements EventHandler<LongEvent> { private String name;
public LongEventHandler(){} public LongEventHandler(String name) { this.name=name; } @Override public void onEvent(LongEvent event, long sequence, boolean endOfBatch) { ThreadUtil.sleep(RandomUtil.randomLong(500)); System.out.println("处理器名称:"+name+" 线程:"+Thread.currentThread().getName()+" 处理事件: " + event); }
public String getName() { return name; }
public void setName(String name) { this.name = name; } }
|
创建Disruptor对象
1 2 3 4 5 6 7 8 9 10
| EventFactory<LongEvent> eventFactory = new LongEventFactory(); ThreadFactory threadFactory = ThreadUtil .createThreadFactoryBuilder() .setNamePrefix("disruptorThread-").build();
int ringBufferSize = 16;
Disruptor<LongEvent> disruptor = new Disruptor<>(eventFactory, ringBufferSize, threadFactory, ProducerType.SINGLE, new YieldingWaitStrategy());
|
注册处理器(消费者)
1 2 3 4 5
| disruptor.handleEventsWith(new LongEventHandler("worker1"), new LongEventHandler("worker2"));
disruptor.start();
|
定义生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
long sequence = ringBuffer.next();
try {
LongEvent event = ringBuffer.get(sequence); long data = Thread.currentThread().getId(); event.set(data); } finally {
ringBuffer.publish(sequence); }
|
完整的main函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
| public class MainRun {
public static void main(String[] args) { EventFactory<LongEvent> eventFactory = new LongEventFactory(); ThreadFactory threadFactory = ThreadUtil .createThreadFactoryBuilder() .setNamePrefix("disruptorThread-").build(); int ringBufferSize = 16; Disruptor<LongEvent> disruptor = new Disruptor<>(eventFactory, ringBufferSize, threadFactory, ProducerType.SINGLE, new YieldingWaitStrategy()); disruptor.handleEventsWith(new LongEventHandler("worker1"), new LongEventHandler("worker2"));
disruptor.start(); ExecutorService executorService = ThreadUtil.newExecutor(10); for (int i = 0; i < 10; i++) { executorService.execute(new PublishEvent(disruptor,0)); }
executorService.shutdown(); disruptor.shutdown();
}
static class PublishEvent implements Runnable {
private final Disruptor<LongEvent> disruptor;
private int max;
public PublishEvent(Disruptor<LongEvent> disruptor,int max) { this.disruptor = disruptor; this.max=max; }
@Override public void run() { while (max++<10) { RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); long sequence = ringBuffer.next();
try { LongEvent event = ringBuffer.get(sequence); long data = Thread.currentThread().getId(); event.set(data); } finally { ringBuffer.publish(sequence); } } } } }
|
参考资料
高性能队列——Disruptor
disruptor 高性能队列最佳选择