聊聊Disruptor

一起聊聊 LMAX Disruptor

关键词:Cache buffer CAS原子操作 Disruptor 低延迟latency

why为什么要聊这个东西呢?

关键词:量化交易 高频交易策略 搏杀最短的时间

相关的深入了解 推荐书籍 Flash Boys -Michael Lewis

当大量的orders涌入,那么怎样高效的处理这些输入呢?

这时就可以用disruptor

how怎么解决呢?

主要需要解决两个问题:量化和高频

其中量化数据问题其实就是老问题,其实大家都能想到。

当输入速度大于处理速度的时候。可以用一个buffer把这些处理不完的数据缓存起来,然后让consumer慢慢处理。

量化解决了,而高频怎么办呢?毕竟交易的时候 speed = $$$(money) 所以对低延迟要求很高,而buffer必然会带来延迟latency

latency = computational time(计算的延迟时间)+latency of moving data(数据移动延迟时间)

平时我们上课都讨论的是计算复杂性,但是在这种实际的生产环境中,会发现其实数据移动的延迟占了大部分的延迟时间。

那么用一个简单的buffer(array-based)有什么问题呢??

主要是有这样几个竞争的存在:

  • producer的头指针的竞争
  • consumer的尾指针的竞争
  • buffer空的时候头尾指针指向一块,生产者消费者都有竞争

这个简单buffer需要解决竞争:

  • 需要锁
  • false sharing of tail
  • 频繁的读存数据是一个很大的开销
  • 阻塞队列对缓存是不友好的

    什么是memery hierarchy
    存储空间的分层 这和计算机体系结构学的一样
    总之就是利用不同硬件的性能不同 和容量大小的不同来完成数据处理效率的提高
    cache是核心部分

design of disruptor

思路:避免写竞争

  • 同一时间只有一个写
  • no locks
  • avoid false sharing

disruptor的核心简单理解

核心:circular array with a sequence number

环形,producer超出就覆盖

需要注意producer的指针不能超过consumer,因为那后面的数据还没有处理

而且也不能读18-5的数据,因为那部分还没有被producer更新

而多个consumer就展现disruptor的优越性了

因为consumer1和2是没有交流的,他们都可以一直读到18而无竞争

consumer落后太多的时候,还可以有batch effect追上进度

2-PC(两阶段提交)

2-PC 称为“两阶段提交”,2-PC 是分布式事务协议。

比如说一个事务涉及 Oracle、MySQL 和 MS SQLServer 三个数据库的操作
举个最简单的例子,要在这三个数据库中各插入一条数据,但必须保持在一个事务中
要三个插入全部成功才算成功,如果只成功了一个或者两个,那么所有的操作都进行回滚
而 2-PC 就是用来干这事的。

两个阶段如下:

一、
各数据库在执行完 INSERT 后,J2EE 应用服务器在收到提交指令
这时通知各数据库进行事务提交准备。
数据库在收到响应后,进行准备工作,基本上是一个预提交工作,
如果能提交则响应 J2EE 应用服务器是 能成功提交的,
如果无法提交则响应 J2EE 应用服务器是无法提交的。

二、J2EE 应用服务器在收集到所有的响应之后进行判断,
如果在第一阶段收到的信息都是可提交的,那么就通知所有的数据库进行提交;
如果在第一阶段收到的信息有一个是无法提交的,那么就通知所有的数据库进行回滚操作。

要支持分布式事务,那么数据库就必须支持两阶段提交协议,否则是不能支持的。

多个producer怎么办?

这个也是无锁的,怎么处理呢?就是用CAS。没详细讲这部分。
CAS原子操作——Compare & Set,或是 Compare & Swap,
现在几乎所image有的CPU指令都支持CAS的原子操作,X86下对应的是 CMPXCHG 汇编指令。
大家应该还记得操作系统里面关于“原子操作”的概念,一个操作是原子的(atomic),
如果这个操作所处的层(layer)的更高层不能发现其内部实现与结构。
原子操作可以是一个步骤,也可以是多个操作步骤,
但是其顺序是不可以被打乱,或者切割掉只执行部分。
有了这个原子操作这个保证我们就可以实现无锁了。

CAS原子操作在维基百科中的代码描述如下:

1: int compare_and_swap(int* reg, int oldval, int newval)
2: {i
3:   ATOMIC();
4:   int old_reg_val = *reg;
5:   if (old_reg_val == oldval)
6:      *reg = newval;
7:   END_ATOMIC();
8:   return old_reg_val;
9: }

也就是检查内存*reg里的值是不是oldval,如果是的话,则对其赋值newval。
上面的代码总是返回old_reg_value,调用者如果需要知道是否更新成功还需要做进一步判断,
为了方便,它可以变种为直接返回是否更新成功,如下:

1: bool compare_and_swap (int *accum, int *dest, int newval)
2: {
3:   if ( *accum == *dest ) {
4:       *dest = newval;
5:       return true;
6:   }
7:   return false;
8: }

有关Cache很多内容,然而不是这次讨论的焦点,但是不知道又不能很好的理解Disruptor。
实验课有一个图形化展示多Cache一致性的软件,有兴趣可以尝试一下:http://o8fvkqeus.bkt.clouddn.com/Cache%E6%A8%A1%E6%8B%9F%E5%99%A8.zip