Reactor相关API和数据结构的使用。
依赖
需要引入相关的pom依赖,如下:
1 2 3 4 5 6 7 8 9 10 11 12
| <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-core</artifactId> <version>3.2.3.RELEASE</version> </dependency>
<dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-test</artifactId> <version>3.2.3.RELEASE</version> </dependency>
|
用到了Reactor两个重要的类:Flux 和 Mono ,实现了Publisher接口,这两个响应式类型所提供的操作就是粘合剂,这些操作将它们结合在一起,来创建数据流动的通道。在 Flux 和 Mono 之间,存在超过 500 种操作,其中的每一个可以被归类为:
响应式类型的创建
响应式类型创建后,如果没有被订阅,那么数据是不会流动起来的。
提供了如下几种方法创建响应式类型:
直接从对象中创建
1 2 3 4 5 6 7 8 9 10 11
| Flux<String> a = Flux.just("a", "b", "c");
a.subscribe(System.out::println);
StepVerifier.create(a) .expectNext("a") .expectNext("b") .expectNext("c") .verifyComplete();
|
从集合中创建
可以从数组或者实现了Iterable
接口的类中创建Flux。
1 2 3 4 5 6 7 8 9 10 11
| String[] fruits = new String[] { "Apple", "Orange", "Grape", "Banana", "Strawberry" };
Flux<String> fruitFlux = Flux.fromArray(fruits); StepVerifier.create(fruitFlux) .expectNext("Apple") .expectNext("Orange") .expectNext("Grape") .expectNext("Banana") .expectNext("Strawberry") .verifyComplete();
|
从Stream中创建
1 2 3 4 5 6 7 8 9 10 11 12 13
| Stream<String> fruitStream = Stream.of("Apple", "Orange", "Grape", "Banana", "Strawberry");
Flux<String> fruitFlux2 = Flux.fromStream(fruitStream);
StepVerifier.create(fruitFlux2) .expectNext("Apple") .expectNext("Orange") .expectNext("Grape") .expectNext("Banana") .expectNext("Strawberry") .verifyComplete();
|
从另一个响应式类型中创建
但是要保证输入源不能被关闭了。
1
| Flux.from(fruitFlux2).subscribe(System.out::println);
|
生成flux数据
有的时候没有任何数据可供使用,只需要使用 Flux 作为计数器,发出一个随每个新值递增的数字。要创建计数器 Flux,可以使用静态 range() 方法。
1 2 3 4 5 6 7 8 9
| Flux<Integer> intervalFlux = Flux.range(1, 5); StepVerifier.create(intervalFlux) .expectNext(1) .expectNext(2) .expectNext(3) .expectNext(4) .expectNext(5) .verifyComplete();
|
也可以定时发送数据
1 2 3 4 5 6
| Flux<Long> intervalFlux = Flux.interval(Duration.ofSeconds(1)) .take(5);
intervalFlux.subscribe(System.out::println); ThreadUtil.sleep(10000);
|
转换过滤响应式流
合并数据流merge
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| Flux<String> characterFlux = Flux .just("Garfield", "Kojak", "Barbossa") .delayElements(Duration.ofMillis(500));
Flux<String> foodFlux = Flux .just("Lasagna", "Lollipops", "Apples") .delaySubscription(Duration.ofMillis(250)) .delayElements(Duration.ofMillis(500));
Flux<String> mergedFlux = characterFlux.mergeWith(foodFlux);
mergedFlux.subscribe(System.out::println);
ThreadUtil.sleep(10000);
|
还有一种zip操作,将两个流中的数据,两两结合,形成一个元组,或者自己定义的数据结构。
1 2 3 4 5 6 7 8 9 10 11 12 13
| public void zipFluxes() { Flux<String> characterFlux = Flux .just("Garfield", "Kojak", "Barbossa"); Flux<String> foodFlux = Flux .just("Lasagna", "Lollipops", "Apples");
Flux<Tuple2<String, String>> zippedFlux = Flux.zip(characterFlux, foodFlux); zippedFlux.subscribe(item->{ System.out.println(item.getT1()+", "+item.getT2()); }); ThreadUtil.sleep(1000); }
|
过滤数据
常用的操作:
skip()
跳过前面的几个数据,或者跳过前面一段时间产生的数据;
take()
取出前面的几个数据,获取取出前面一段时间产生的数据;
filter()
根据一定的条件,过滤数据。
skip和take提供了两种取值方式,一种基于数据个数,另一种则是基于时间的,以skip为例。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public void skipFluxes() { String[] arr = {"a", "b", "c", "d", "e", "f"}; Flux<String> skip1 = Flux.fromArray(arr).skip(3); StepVerifier.create(skip1) .expectNext("d") .expectNext("e") .expectNext("f") .verifyComplete(); Flux<String> skip2 = Flux.fromArray(arr).delayElements(Duration.ofSeconds(1)) .skip(Duration.ofMillis(4000)); StepVerifier.create(skip2) .expectNext("d") .expectNext("e") .expectNext("f") .verifyComplete(); }
|
filter操作,基本上和JavaStream的一致。
映射操作
提供了map()
和flatmap
操作。
Map()操作也与JavaStream中的一致,是一个对象到另一个对象的映射,是同步的。
而flatMap() 不是简单地将一个对象映射到另一个对象,而是将每个对象映射到一个新的 Mono 或 Flux。Mono 或 Flux 的结果被压成一个新的 Flux。当与 subscribeOn() 一起使用时,flatMap() 可以释放 Reactor 类型的异步能力。
1 2 3 4 5 6 7 8
| public void flatMapFluxes() { Flux<String> flux1 = Flux.range(1,20) .flatMap(item -> Mono.just(item).map(x -> "string: " + x).subscribeOn(Schedulers.parallel()));
flux1.subscribe(System.out::println);
ThreadUtil.sleep(1000); }
|
因为flatMap是并行的,所以,输出的结果并不是有序的。
1 2 3 4 5 6 7
| string: 2 string: 10 string: 18 string: 1 string: 3 string: 4 ...
|
并发模型:
Schedulers提供的方法 |
描述 |
Schedulers.immediate() |
在当前线程中执行订阅 |
Schedulers.single() |
在单个可重用线程中执行订阅,对所有调用方重复使用同一线程 |
Schedulers.newSingle() |
在每个调用专用线程中执行订阅 |
Schedulers.elastic() |
在从无限弹性池中提取的工作进程中执行订阅,根据需要创建新的工作线程,并释放空闲的工作线程(默认情况下 60 秒) |
Schedulers.parallel() |
在从固定大小的池中提取的工作进程中执行订阅,该池的大小取决于 CPU 核心的数量。 |
参考
Reactor介绍