命令式编程是工作中常用的,一行一行的代码,正像一行一行的命令,程序按照顺序,执行。当一条命令阻塞,那么整个任务也就阻塞了。响应式编程则是一种基于观察者的设计模式,目的提升系统的响应能力,是一种异步非阻塞的模型,基于数据驱动,当数据变化后,其他的观察者会收到相应的通知,Java8中引入了诸如CompletionStage及其实现,CompletableFuture等概念。响应式宣言,The Reactive Manifesto

接口规范

响应式流的规范定义了四个接口:

Publisher

消息发布者,Publisher 为每一个 Subscription 的 Subscriber 生产数据。Publisher 接口声明了一个 subscribe() 方法,通过这个方法 Subscriber 可以订阅 Publisher:

1
2
3
public interface Publisher<T> {
void subscribe(Subscriber<? super T> subscriber);
}

Publisher可以提供0-N个序列元素,并根据订阅者的需求进行推送数据,支持多个订阅者,没有订阅者,那么她也没有存在的意义。
reactor中提供了两种publisher的实现:Flux<T>Mono<T>,前者可以输出0-N个元素,后者则是0-1个元素。

Subscriber

消息订阅者,一旦进行了订阅,调用 Publisher#subscribe(Subscriber)就可以从 Publisher 中接收消息,这些消息都是通过 Subscriber 接口中的方法进行发送。

1
2
3
4
5
6
public interface Subscriber<T> {
void onSubscribe(Subscription sub);
void onNext(T item);
void onError(Throwable ex);
void onComplete();
}

Subscription

订阅操作,发布者和订阅者之间的消息传递。

1
2
3
4
public interface Subscription {
void request(long n);
void cancel();
}

Processor

连接Subcriber和Publisher,作为 Subscriber,Processor 将会接收数据然后以一定的方式处理这些数据。然后它会摇身一变,变为一个 Publisher,将处理的结果发布到 Subscriber。

1
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {}

图解响应式流

在 Java 流和响应式流之间有很大的相似性。首先,它们的名字中都含有 Streams。它们也都为处理数据提供函数式接口。事实上,稍后当学到容器的时候,您会看到,其实它们有很多共同操作。然而,Java 流通常是同步的,同时只能处理有限数据集。它们本质上是使用函数式进行集合迭代的一种手段。响应式流支持任何大小的数据集,包括无限数据集的异步处理。它们使实时处理数据成为了可能。

响应式流通常使用弹珠图(Marble Diagram)进行绘制。弹珠图最简单的形式就是,在最上面画出数据流经 Flux 或是 Mono 的时间线,在中间画出操作,在最下面画出 Flux 或是 Mono 结果的时间线。图 11.1 展示了 Flux 的弹珠图模板。正如您所看到的,当数据流通过原始的 Flux 后,它通过一些操作进行处理,通过数据流处理后产生一个新的 Flux。

几种对比:

  1. 传统的方式:
    1
    2
    3
    public List<String> listAllUserV1() {
    return Arrays.asList("a","b","c","d");
    }
    预先生成数据,用者自取。
  2. Java8Stream的方式:
    1
    2
    3
    public Stream<String> listAllUserV2() {
    return Stream.of("a","b","c","d");
    }
    消费过程中提供了额外的操作。
  3. 响应式流的方式:
    1
    2
    3
    public Flux<String> listAllUserV3() {
    return Flux.just("a","b","c","d");
    }
    没有人用就不会产生。