Reactor相关API和数据结构的使用。

依赖

需要引入相关的pom依赖,如下:

1
2
3
4
5
6
7
8
9
10
11
12
<!--引入reactor的依赖-->
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.2.3.RELEASE</version>
</dependency>
<!--引入reactor的测试依赖-->
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<version>3.2.3.RELEASE</version>
</dependency>

用到了Reactor两个重要的类:Flux 和 Mono ,实现了Publisher接口,这两个响应式类型所提供的操作就是粘合剂,这些操作将它们结合在一起,来创建数据流动的通道。在 Flux 和 Mono 之间,存在超过 500 种操作,其中的每一个可以被归类为:

  • 创建操作
  • 联合操作
  • 传输操作
  • 逻辑处理操作

响应式类型的创建

响应式类型创建后,如果没有被订阅,那么数据是不会流动起来的。
提供了如下几种方法创建响应式类型:

直接从对象中创建

1
2
3
4
5
6
7
8
9
10
11
// 1. 直接从对象中创建
Flux<String> a = Flux.just("a", "b", "c");
// 中间操作...
// 订阅操作
a.subscribe(System.out::println);
// 使用测试类,验证数据的流动
StepVerifier.create(a)
.expectNext("a")
.expectNext("b")
.expectNext("c")
.verifyComplete();

从集合中创建

可以从数组或者实现了Iterable接口的类中创建Flux。

1
2
3
4
5
6
7
8
9
10
11
String[] fruits = new String[] {
"Apple", "Orange", "Grape", "Banana", "Strawberry" };
// 或者fromIterable
Flux<String> fruitFlux = Flux.fromArray(fruits);
StepVerifier.create(fruitFlux)
.expectNext("Apple")
.expectNext("Orange")
.expectNext("Grape")
.expectNext("Banana")
.expectNext("Strawberry")
.verifyComplete();

从Stream中创建

1
2
3
4
5
6
7
8
9
10
11
12
13
// 从stream中创建
Stream<String> fruitStream =
Stream.of("Apple", "Orange", "Grape", "Banana", "Strawberry");

Flux<String> fruitFlux2 = Flux.fromStream(fruitStream);

StepVerifier.create(fruitFlux2)
.expectNext("Apple")
.expectNext("Orange")
.expectNext("Grape")
.expectNext("Banana")
.expectNext("Strawberry")
.verifyComplete();

从另一个响应式类型中创建

但是要保证输入源不能被关闭了。

1
Flux.from(fruitFlux2).subscribe(System.out::println);

生成flux数据

有的时候没有任何数据可供使用,只需要使用 Flux 作为计数器,发出一个随每个新值递增的数字。要创建计数器 Flux,可以使用静态 range() 方法。

1
2
3
4
5
6
7
8
9
Flux<Integer> intervalFlux =
Flux.range(1, 5);
StepVerifier.create(intervalFlux)
.expectNext(1)
.expectNext(2)
.expectNext(3)
.expectNext(4)
.expectNext(5)
.verifyComplete();

也可以定时发送数据

1
2
3
4
5
6
Flux<Long> intervalFlux =
Flux.interval(Duration.ofSeconds(1))
.take(5);

intervalFlux.subscribe(System.out::println);
ThreadUtil.sleep(10000);

转换过滤响应式流

合并数据流merge

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Flux<String> characterFlux = Flux
.just("Garfield", "Kojak", "Barbossa")
.delayElements(Duration.ofMillis(500));

Flux<String> foodFlux = Flux
.just("Lasagna", "Lollipops", "Apples")
.delaySubscription(Duration.ofMillis(250))
.delayElements(Duration.ofMillis(500));

Flux<String> mergedFlux = characterFlux.mergeWith(foodFlux);

mergedFlux.subscribe(System.out::println);

// 休眠一会,等消费完
ThreadUtil.sleep(10000);

还有一种zip操作,将两个流中的数据,两两结合,形成一个元组,或者自己定义的数据结构。

1
2
3
4
5
6
7
8
9
10
11
12
13
public void zipFluxes() {
Flux<String> characterFlux = Flux
.just("Garfield", "Kojak", "Barbossa");
Flux<String> foodFlux = Flux
.just("Lasagna", "Lollipops", "Apples");

Flux<Tuple2<String, String>> zippedFlux =
Flux.zip(characterFlux, foodFlux);
zippedFlux.subscribe(item->{
System.out.println(item.getT1()+", "+item.getT2());
});
ThreadUtil.sleep(1000);
}

过滤数据

常用的操作:

  • skip()跳过前面的几个数据,或者跳过前面一段时间产生的数据;
  • take()取出前面的几个数据,获取取出前面一段时间产生的数据;
  • filter()根据一定的条件,过滤数据。

skip和take提供了两种取值方式,一种基于数据个数,另一种则是基于时间的,以skip为例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/** skip应用*/
public void skipFluxes() {
String[] arr = {"a", "b", "c", "d", "e", "f"};
// 跳过前面3个元素的流
Flux<String> skip1 = Flux.fromArray(arr).skip(3);
StepVerifier.create(skip1)
.expectNext("d")
.expectNext("e")
.expectNext("f")
.verifyComplete();
// 1s发送一个数据,从第4s开始取数据
Flux<String> skip2 = Flux.fromArray(arr).delayElements(Duration.ofSeconds(1))
.skip(Duration.ofMillis(4000));
StepVerifier.create(skip2)
.expectNext("d")
.expectNext("e")
.expectNext("f")
.verifyComplete();
}

filter操作,基本上和JavaStream的一致。

映射操作

提供了map()flatmap操作。
Map()操作也与JavaStream中的一致,是一个对象到另一个对象的映射,是同步的。
而flatMap() 不是简单地将一个对象映射到另一个对象,而是将每个对象映射到一个新的 Mono 或 Flux。Mono 或 Flux 的结果被压成一个新的 Flux。当与 subscribeOn() 一起使用时,flatMap() 可以释放 Reactor 类型的异步能力。

1
2
3
4
5
6
7
8
public void flatMapFluxes() {
Flux<String> flux1 = Flux.range(1,20)
.flatMap(item -> Mono.just(item).map(x -> "string: " + x).subscribeOn(Schedulers.parallel()));

flux1.subscribe(System.out::println);

ThreadUtil.sleep(1000);
}

因为flatMap是并行的,所以,输出的结果并不是有序的。

1
2
3
4
5
6
7
string: 2
string: 10
string: 18
string: 1
string: 3
string: 4
...

并发模型:

Schedulers提供的方法 描述
Schedulers.immediate() 在当前线程中执行订阅
Schedulers.single() 在单个可重用线程中执行订阅,对所有调用方重复使用同一线程
Schedulers.newSingle() 在每个调用专用线程中执行订阅
Schedulers.elastic() 在从无限弹性池中提取的工作进程中执行订阅,根据需要创建新的工作线程,并释放空闲的工作线程(默认情况下 60 秒)
Schedulers.parallel() 在从固定大小的池中提取的工作进程中执行订阅,该池的大小取决于 CPU 核心的数量。

参考

Reactor介绍