【2024】JAVA实现响应式编程Web Flux的Reactor具体API文档使用说明
方渐鸿 2024-10-26 15:03:02 阅读 52
目录💻
前言一、简介1、响应式编程概述背景知识什么是响应式编程具体概述应用场景:常用的库和框架
二、 Reactor实现响应式编程1、Flux 和 Mono介绍Flux:Mono:Flux 和 Mono 的区别:Flux 和 Mono 的关系:
2、常用API使用添加依赖2.1、生产流常用汇总
2.1.1、直接创建2.1.2、使用Sinks工具类
2.2、中间操作常用汇总
2.2.1、变换数据2.2.2、Context API2.2.3、合并组合Flux2.2.3、并发控制Flux2.2.4、doOnxxx:感知事件相关的 API2.2.4、onErrorXXX:异常处理相关的 API2.2.4、其他工具 API
2.3、订阅流常用汇总详情使用
前言
响应式编程 (Reactive Programming) 是一种声明式编程范式,专注于数据流和变化的传播。随着软件系统日益复杂,对高并发、实时性和弹性的需求不断增长,响应式编程正逐渐成为主流。特别从Spring Boot3开始逐渐越来越重视使用,并且Spring框架为了全面拥抱响应式编程,提供了Spring WebFlux、Spring Data Reactive等模块,为Java开发者构建响应式应用提供了强大的支持。
一、简介
1、响应式编程概述
背景知识
为了应对高并发服务器端开发场景,在2009 年,微软提出了一个更优雅地实现异步编程的方式——Reactive Programming,我们称之为响应式编程。随后,Netflix 和LightBend 公司提供了RxJava 和Akka Stream 等技术,使得Java 平台也有了能够实现响应式编程的框架。
在2017 年9 月28 日,Spring 5 正式发布。Spring 5 发布最大的意义在于,它将响应式编程技术的普及向前推进了一大步。而同时,作为在背后支持Spring 5 响应式编程的框架Spring Reactor,也进入了里程碑式的3.1.0 版本。
什么是响应式编程
响应式编程是一种面向数据流和变化传播的编程范式。这意味着可以在编程语言中很方便地表达静态或动态的数据流,而相关的计算模型会自动将变化的值通过数据流进行传播。
响应式编程基于reactor(Reactor 是一个运行在 Java8 之上的响应式框架)的思想,当你做一个带有一定延迟的才能够返回的io操作时,不会阻塞,而是立刻返回一个流,并且订阅这个流,当这个流上产生了返回数据,可以立刻得到通知并调用回调函数处理数据。
电子表格程序就是响应式编程的一个例子。单元格可以包含字面值或类似"=B1+C1"的公式,而包含公式的单元格的值会依据其他单元格的值的变化而变化。
响应式传播核心特点之一:变化传播:一个单元格变化之后,会像多米诺骨牌一样,导致直接和间接引用它的其他单元格均发生相应变化。
具体概述
响应式编程 (Reactive Programming) 是一种声明式的编程范式,它关注于数据流和变化的传播。这意味着可以通过定义数据流和它们之间的关系来构建应用程序,当数据发生变化时,应用程序会自动做出响应。
核心概念:
Publisher:发布者;产生数据流Subscriber:订阅者;消费数据流Subscription:订阅关系;
订阅关系是发布者和订阅者之间的关键接口。订阅者通过订阅来表示对发布者产生的数据的兴趣。订阅者可以请求一定数量的元素,也可以取消订阅。 Processor:处理器
处理器是同时实现了发布者和订阅者接口的组件,它可以接收来自一个发布者的数据,进行处理,并将结果发布给下一个订阅者。处理器在Reactor中充当中间环节,代表一个处理阶段,允许你在数据流中进行转换、过滤和其他操作。
这种模型遵循Reactive Streams规范,确保了异步流的一致性和可靠性。
应用场景:
实时数据流处理: 例如股票交易系统、传感器数据监控、网络游戏等。用户界面开发: 例如响应式Web应用、移动应用等。微服务架构: 响应式编程可以帮助构建更加弹性和可扩展的微服务系统。大数据处理: 响应式编程可以用于处理大规模数据集,例如使用Spark Streaming或Apache Flink。
常用的库和框架
RxJava: Java的响应式扩展库,提供了丰富的操作符和工具。Reactor: Java的响应式编程框架,由Pivotal开发,是Spring WebFlux的基础。Kotlin Coroutines: Kotlin的协程库,提供了轻量级的异步编程模型,可以与响应式编程框架集成。Spring WebFlux: Spring框架的响应式Web框架,用于构建响应式Web应用。
二、 Reactor实现响应式编程
目前java要实现响应式编程主要就是使用Reactor进行使用, Reactor 也是 Spring WebFlux 的基础,所以在使用上可以与 Spring 框架无缝集成,构建响应式 Web 应用。使用下面我们要介绍的就是Reactor的常用API的使用,基本上学会了Reactor,就可以直接使用Spring WebFlux 构建高性能、可扩展的 Web 应用。
并且使用Spring WebFlux 构建的非阻塞 I/O 和事件驱动模型可以充分利用系统资源,可以有效的提高应用程序的性能和效率。相对于Spring MVC 这种阻塞式来说在性能上会得到很大的提升,
而且响应式编程可以更好地处理并发和异步操作,提高系统的弹性和可扩展性。
1、Flux 和 Mono介绍
在 Reactor 中,Flux 和 Mono 是两个核心组件,用于表示异步数据流。它们都实现了 Reactive Streams 规范中的 Publisher 接口,可以发出零个或多个元素。
Flux:
表示一个可以发出零个或多个元素的异步序列。
可以用于表示任何类型的数据流,例如用户输入事件、传感器数据、数据库查询结果等。
提供了丰富的操作符,用于对数据流进行转换、过滤、合并、延迟等操作。
<code>Flux<String> names = Flux.just("Alice", "Bob", "Charlie");
names.subscribe(System.out::println); // 输出: Alice Bob Charlie
Mono:
表示一个最多发出一个元素的异步序列(1个或者0个)。通常用于表示单个结果,例如数据库查询结果、HTTP 请求响应等。也提供了一些操作符,用于对结果进行转换和处理。
Mono<String> name = Mono.just("Alice");
name.subscribe(System.out::println); // 输出: Alice
Flux 和 Mono 的区别:
特性 | Flux | Mono |
---|---|---|
发射元素个数 | 零个或多个 | 零个或一个 |
使用场景 | 表示数据流 | 表示单个结果 |
操作符 | 提供丰富的操作符 | 提供一些操作符 |
Flux 和 Mono 的关系:
Mono 可以看作是 Flux 的特例,它最多只发出一个元素。可以使用 Flux.from(mono) 将 Mono 转换为 Flux。可以使用 mono.flux() 将 Mono 转换为 Flux。可以使用 flux.single() 或 flux.singleOrEmpty() 将 Flux 转换为 Mono,但前提是 Flux 必须只包含一个元素或为空。
2、常用API使用
在具体使用上Flux和Mono的API使用都差不多,在分类上,我们可以大致分为三类,用来产生生产流的和中间做转换的,还有结束流的
添加依赖
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
2.1、生产流
也就是创建Flux和Mono的API
常用汇总
方法 | 作用 |
---|---|
just(T... data) | 创建一个发射指定元素的 Flux |
fromIterable(Iterable<? extends T> it) | 从 Iterable(集合) 创建一个 Flux |
fromArray(T[] array) | 从数组创建 Flux |
fromStream(Stream<? extends T> s) | 从Stream中创建 Flux,意味着我们可以把jdk8的Stream也直接进行无缝衔接转为Flux |
range(int start, int count) | 创建发射指定范围的 Flux,一般可以用作计数器或者读秒配合delayElements() 。 |
empty() | 创建一个不发射任何元素的 Flux,mono也一样。 |
Flux<T> error(Throwable error | 创建一个发射错误信号的 Flux |
2.1.1、直接创建
Flux<T> just(T... data)
:创建一个发射指定元素的 Flux
public void just() throws IOException {
Flux<String> just = Flux.just("a", "b", "c", "d", "e");
just.subscribe(System.out::println);
System.in.read();
}
/*得到的结果
a
b
c
d
e
Flux<T> fromIterable(Iterable<? extends T> it)
:从 Iterable(集合) 创建一个 Flux
public void fromIterable() throws IOException {
List<String> list = Arrays.asList("a", "b", "c", "d", "e");
Flux<String> just = Flux.fromIterable(list);
just.subscribe(System.out::println);
System.in.read();
}
/*得到的结果
a
b
c
d
e
Flux<T> fromArray(T[] array)
:从数组创建 Flux
public void fromArray() throws IOException {
String[] arrays = { "a", "b", "c", "d", "e"};
Flux<String> just = Flux.fromArray(arrays);
just.subscribe(System.out::println);
System.in.read();
}
/*得到的结果
a
b
c
d
e
Flux<T> fromStream(Stream<? extends T> s)
:从Stream中创建 Flux,意味着我们可以把jdk8的Stream也直接进行无缝衔接转为Flux
public void fromStream() throws IOException {
Stream<String> stream = Stream.of("a", "b", "c", "d", "e");
Flux<String> just = Flux.fromStream(stream);
just.subscribe(System.out::println);
System.in.read();
}
/*得到的结果
a
b
c
d
e
Flux<Integer> range(int start, int count)
:创建发射指定范围的 Flux,一般可以用作计数器或者读秒配合delayElements()
。
public void range() throws IOException {
Flux<Integer> just = Flux.range(0,5)
.delayElements(Duration.ofSeconds(1)); //一秒发射一次
just.subscribe(System.out::println);
System.in.read();
}
/* 得到的结果
0
1
2
3
4
Flux<T> empty()
:创建一个不发射任何元素的 Flux,mono也一样。
public void empty() throws IOException {
Mono<Integer> mono = Mono.empty();
Flux<Integer> flux = Flux.empty();
flux.subscribe(System.out::println);
System.in.read();
}
/* 订阅者不会接受到任何结果
Flux<T> error(Throwable error)
:创建一个发射错误信号的 Flux
public void error() throws IOException {
Flux<Integer> just = Flux.error(new RuntimeException("执行错误!"));
just.subscribe(System.out::println);
System.in.read();
}
/*
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: 执行错误!
2.1.2、使用Sinks工具类
Sinks 是一个用于创建各种类型的 Sink工厂类,然后在通过asFlux方法把Sink转为Flux。具体分别为:
One<T> one()
:创建一个只接收一个数据的 Sink。
public void one(){
Sinks.One<Integer> one = Sinks.one();
one.tryEmitValue(1);
one.tryEmitValue(2);
Mono<Integer> mono = one.asMono();
mono.subscribe(System.out::println);
}
/* 不管发射多少个消息到one里都只会读取一个
1
many()
:创建一个可以接收多个数据的 Sink。
unicast()
:单播,只能有一个消费者订阅转换后到flux,第二个去订阅时会报错
public void unicast() throws IOException {
Sinks.Many<Integer> sink = Sinks.many()
.unicast()
.onBackpressureBuffer();
// 生产消息
sink.tryEmitNext(1);
sink.tryEmitNext(2);
Flux<Integer> flux = sink.asFlux();
flux.subscribe(v->System.out.println("p1:"+v));
//如果有第二个消费者来消费消息,会直接抛出错误
flux.subscribe(v->System.out.println("p2:"+v));
sink.tryEmitNext(3);
sink.tryEmitNext(4);
System.in.read();
}
/* 执行到订阅者2时会报错
multicast()
:多播。
Sinks.many()
方法创建的多播 Sink 可以选择不同的策略解决背压问题。不同策略之间的区别主要体现在 如何处理下游消费者无法及时处理数据的情况。
背压 (Backpressure):在响应式编程中,背压指的是下游消费者无法及时处理上游生产者发送的数据,导致数据堆积,最终可能导致内存溢出等问题。也就是生产者生产的速度大于消费者消费的速度
onBackpressureBuffer()
:如果下游消费者无法及时处理数据,则 Sink 会将数据缓存到一个缓冲区中,直到消费者能够处理为止,缓冲区可以设置大小
public void onBackpressureBuffer() throws IOException {
Sinks.Many<Integer> sink = Sinks.many()
.multicast()
.onBackpressureBuffer(); //开启缓冲区
sink.tryEmitNext(1);
sink.tryEmitNext(2);
Flux<Integer> flux = sink.asFlux();
//在订阅之前发送的消息会先被放在缓冲区
flux.subscribe(v->System.out.println("p1:"+v));
sink.tryEmitNext(3);
sink.tryEmitNext(4);
flux.subscribe(v->System.out.println("p2:"+v));
sink.tryEmitNext(5);
sink.tryEmitNext(6);
System.in.read();
}
/* 第二个订阅的消费者会从订阅的时间开始接收消息,前面存放缓冲区的消息无法接收到
p1:1
p1:2
p1:3
p1:4
p1:5
p2:5
p1:6
p2:6
directBestEffort()
:如果下游消费者无法及时处理数据,则 Sink 会尽力将数据发送给消费者,但可能会丢弃一些数据。
public void directBestEffort() throws IOException {
Sinks.Many<Integer> sink = Sinks.many()
.multicast()
.directBestEffort();
sink.tryEmitNext(1);
sink.tryEmitNext(2);
sink.tryEmitNext(3);
sink.tryEmitNext(4);
Flux<Integer> flux = sink.asFlux();
flux.subscribe(v->System.out.println("p1:"+v));
sink.tryEmitNext(5);
sink.tryEmitNext(6);
flux.subscribe(v->System.out.println("p2:"+v));
sink.tryEmitNext(7);
sink.tryEmitNext(8);
System.in.read();
}
/*
p1:5
p1:6
p1:7
p2:7
p1:8
p2:8
directAllOrNothing()
:如果下游消费者无法及时处理数据,则 Sink 会直接丢弃所有后续数据,并发出一个 onError 信号。操作同上
empty()
:创建一个空的 Sink
2.2、中间操作
常用汇总
方法 | 作用 |
---|---|
map() | 将每个元素映射到另一个类型的值。 |
flatMap() | 将每个元素映射到一个新的 Publisher,并将其扁平化成一个新的 Flux |
transform() | transformer会立即执行传递给它的转换函数,并生成新的 Flux。 |
filter() | 根据指定条件过滤 |
take(long n) | 获取到指定长度的结果结束 |
skip(long skipped) | 从第几个元素开始获取 |
distinct() | 去重,去除重复元素 |
contextWrite() | 在整个响应式链中传递上下文信息 |
merge() | 合并多个flux,按照源里每个元素的写入的时间排序,不会保证顺序 |
zip() | 压缩多个flux,按照下标把每个flux的元素都放入一个数组 |
combineLatest() | 把多个flux按照下标把值压缩再一起进行处理 |
publishOn(Scheduler scheduler) | 并发处理,指定后续操作的执行线程池 |
doOnComplete(Runnable onComplete) | 当数据流完成时执行指定的操作,可以用于清理资源、记录日志等。 |
doOnEach(Consumer<? super Signal<T>> signalConsumer) | 每个元素(流的数据和信号)到达的时候触发 |
onErrorReturn() | 吃掉异常,消费者无异常感知,返回一个兜底默认值,并结束flux流 |
onErrorContinue() | 忽略当前异常,仅通知记录,继续推进 |
retry() | 当发生错误时,重新订阅 Flux 流,最多尝试指定次数 |
buffer() | 缓冲指定元素再消费 |
cache() | 缓存数据,把订阅的数据缓存,可以设置缓存大小。当第二个订阅者来获取数据的时候就只能到缓存区去取数据 |
handle() | 自定义流中元素处理规则,内部可以自定义设置处理规则,然后再通过sink.next(),把处理后的数据发送到下一个节点 |
switchIfEmpty() | 如果流数据为空则动态兜底获取数据 |
2.2.1、变换数据
map()
:将每个元素映射到另一个类型的值。
public void map(){
Flux<String> flux = Flux.just("a", "b", "c", "d", "e")
.map(String::toUpperCase)
.log();
flux.subscribe(System.out::println);
}
flatMap()
:将每个元素映射到一个新的 Publisher,并将其扁平化成一个新的 Flux
public void flatMap() throws IOException {
Flux<String> flux = Flux.just("a1", "b2", "c3", "d4", "e5")
.flatMap(v->
Flux.just(v.split(""))
);
flux.subscribe(System.out::println);
System.in.read();
}
transformDeferred
和transform
都是用来转换 Flux 数据流的运算符。区别在于 转换逻辑的执行时机
transformDeferred()
运算符会延迟执行传递给它的转换函数,直到下游订阅者订阅时才会执行。每个订阅者都会独立地执行转换逻辑,生成自己的转换结果。适用于复杂、有状态的转换
public void transformDeferred() throws IOException {
AtomicInteger atomicInteger = new AtomicInteger();
Flux<String> flux = Flux.just("a", "b", "c")
.transformDeferred(v->{ //
int andIncrement = atomicInteger.getAndIncrement(); //把值加一
return v.map(it->it+andIncrement);
});
//transformDeferred中,每个订阅者都会去执行一次transformDeferred,等于每个订阅者都会有自己独立的结果
flux.subscribe(v-> System.out.println("订阅者1:"+v));
flux.subscribe(v-> System.out.println("订阅者2:"+v));
System.in.read();
}
/* transformDeferred每个订阅者都会独立生成结果
订阅者1:a0
订阅者1:b0
订阅者1:c0
订阅者2:a1
订阅者2:b1
订阅者2:c1
transform()
:transformer会立即执行传递给它的转换函数,并生成新的 Flux。由于转换逻辑立即执行,所有订阅者都会共享同一个转换结果。适用于简单、无状态的转换
public void transform() throws IOException {
AtomicInteger atomicInteger = new AtomicInteger();
Flux<String> flux = Flux.just("a", "b", "c")
.transform(v->{
int andIncrement = atomicInteger.getAndIncrement();
return v.map(it->it+andIncrement);
});
//不管多少个订阅者都会共享transform这一个转换的结果,等于transform只会执行一次
flux.subscribe(v-> System.out.println("订阅者1:"+v));
flux.subscribe(v-> System.out.println("订阅者2:"+v));
System.in.read();
}
/*transform所有订阅者都会共享同一个转换结果
订阅者1:a0
订阅者1:b0
订阅者1:c0
订阅者2:a0
订阅者2:b0
订阅者2:c0
filter()
:根据指定条件过滤
public void filter() throws IOException {
Flux<String> flux = Flux.just("a", "b", "c")
.filter(v->
v.equals("b")
);
flux.subscribe(System.out::println);
System.in.read();
}
/*生成结果
b
take(long n)
:获取到指定长度的结果结束
public void take() throws IOException {
Flux<String> flux = Flux.just("a", "b", "c","d","e")
.take(3);
flux.subscribe(System.out::println);
System.in.read();
}
/*生成结果
a
b
c
skip(long skipped)
:从第几个元素开始获取
public void skip() throws IOException {
Flux<String> flux = Flux.just("a", "b", "c","d","e")
.skip(3);
flux.subscribe(System.out::println);
System.in.read();
}
/*生成结果
d
e
distinct()
:去重,去除重复元素
public void distinct() throws IOException {
Flux<String> flux = Flux.just("a", "a", "c","a","e")
.distinct();
flux.subscribe(System.out::println);
System.in.read();
}
/*
a
c
e
2.2.2、Context API
在Flux响应式编程中,Context API用于在整个响应式链中传递上下文信息,例如订阅者信息、调度器、钩子等。它类似于线程本地存储,但作用于响应式流。
可以使用Context API添加钩子函数,例如在响应式流的每个元素处理前后执行特定操作。
用法:
transformDeferredContextual()
:两个参数一个是当前的flux对象,一个是Context上下文对象contextWrite()
:一个参数,就是Context对象Context
:通过of方法把数据通过key,value的方式写入到flux中
of(Object key1, Object value1, Object key2, Object value2)
:如果有多组参数要传入,通过of的(k, v, k, v)的方式这样写
public void context() throws IOException {
Flux<String> fluxs = Flux.just(1, 2, 3)
.transformDeferredContextual((flux, context) -> { //它会在每个元素处理之前,提供当前的 Flux 和下面添加的上下文context对象信息。
System.out.println("flux:" + flux);
System.out.println("context:" + context);
return flux.map(i -> i + "====>" + context.get("key1")); //对每个元素进行处理,将元素与上下文中的 "key" 值拼接在一起。
}).map(String::toUpperCase);
//可以在后面把context通过contextWrite添加到上下文中
Context context = Context.of("key1", "zhangsan","key2", "list");
fluxs.contextWrite(context) //设置上下文中的 "key" 为 "zhangsan"。
.subscribe(v-> System.out.println("v = " + v));
System.in.read();
}
/* 返回的结果
flux:FluxArray 当前flux
context:Context2{key1=zhangsan, key2=list} Context对象,几组k,v,名字就是几
v = 1====>ZHANGSAN
v = 2====>ZHANGSAN
v = 3====>ZHANGSAN
2.2.3、合并组合Flux
merge()
:合并多个flux,按照源里每个元素的写入的时间排序,不会保证顺序
public void merge() throws InterruptedException {
Flux<String> flux = Flux.just("a","b");
Mono<String> mono = Mono.just("c");
Flux.merge(mono,flux)
.subscribe(System.out::println);
Thread.sleep(5000);
}
/* 输出结果,可能会出现a、c、b
a
b
c
concat()
:合并多个flux,会严格保证元素的顺序。
public void concat() throws InterruptedException {
Flux<String> flux1 = Flux.just("a","b");
Flux<String> flux2 = Flux.just("c","d");
Flux.concat(flux1,flux2)
.subscribe(System.out::println);
Thread.sleep(5000);
}
/* 输出结果,一定会说先输出完flux1,再到flux2
a
b
c
d
zip()
:压缩多个flux,按照下标把每个flux的元素都放入一个数组,如果多个flux的长度不一样,以最短的为长度
public void zip() throws IOException {
Flux<String> flux1 = Flux.just("a", "b", "c");
Flux<Integer> flux2 = Flux.just(1, 2, 3,4);
Flux<String> flux3 = Flux.just("A", "B", "C","D","E");
Flux.zip(flux1, flux2, flux3)
.map(Tuple2::toString)
.subscribe(System.out::println);
System.in.read();
}
/* 输出结果。会保证每个数组的长度都一样
[a,1,A]
[b,2,B]
[c,3,C]
zipWith
:压缩拼接,作用同上,只不过是只能一个一个的压缩。
public void zipWith() throws IOException {
Flux<String> flux1 = Flux.just("a", "b", "c");
Flux.just(1, 2, 3, 4)
.zipWith(flux1)
.subscribe(System.out::println);
System.in.read();
}
/* 输出结果。
[1,a]
[2,b]
[3,c]
combineLatest()
:把多个flux按照下标把值压缩再一起进行处理
public void combineLatest() throws IOException {
Flux<String> flux1 = Flux.just("a", "b", "c");
Flux<Integer> flux2 = Flux.just(1, 2, 3,4);
Flux<String> flux3 = Flux.just("A", "B", "C");
// 两个的写法
// Flux.combineLatest(flux1, flux2, (f1,f2)->f1+"-"+f2)
// .map(String::toUpperCase)
// .subscribe(System.out::println);
// 多个的写法
Iterable<Publisher<?>> publishers = Arrays.asList(flux1, flux2,flux3);
Flux.combineLatest(publishers,fs->fs[0]+"-"+fs[1]+"-"+fs[2])
.map(String::toUpperCase)
.subscribe(System.out::println);
System.in.read();
}
/* 获取按照下标拼接后的值
C-4-A
C-4-B
C-4-C
2.2.3、并发控制Flux
Scheduler:调度器
Reactor 中,Schedulers 提供了多种指定线程的方式,可以根据不同的场景选择合适的 Scheduler 来执行任务。下面是常用的Scheduler
Schedulers.immediate()
:默认,无执行上下文,当前线程运行所有操作
Schedulers.single()
:使用固定的单线程
Schedulers.boundedElastic()
:使用一个有界弹性线程池执行任务。最大线程数为 CPU 核心数 * 10。
Schedulers.parallel()
:使用一个固定大小的线程池执行任务,线程池的大小默认为 CPU 核心数
Schedulers.fromExecutor(Executor executor)
:使用自定义的 Executor 执行任务
Schedulers.newParallel(String name, int parallelism)
:也可以通过new的方式自定义Scheduler ,直接指定线程池大小,作用同上
parallel(int parallelism)
:并行处理,将 Flux 的数据流分成多个并行的轨道
runOn()
:指定后续操作的执行线程,一般会配合parallel()
一起使用,使用parallel控制并发数,再通过runOn绑定线程池
public void parallel() throws IOException {
Flux.just("a","b","c","d","e")
.log()
.parallel(4) //指定并行数
.runOn(Schedulers.newParallel("yy")) //指定线程名称
.map(String::toUpperCase)
.log()
.subscribe();
System.in.read();
}
/* 通过log查看使用runOn前后线程,可以看到Map操作都是使用的runOn指定的yy名称的线程执行的
[ main] reactor.Flux.Array.1 : | onNext(a)
[ main] reactor.Flux.Array.1 : | onNext(b)
[ yy-2] reactor.Parallel.Map.2 : onNext(A)
[ main] reactor.Flux.Array.1 : | onNext(c)
[ main] reactor.Flux.Array.1 : | onNext(d)
[ yy-3] reactor.Parallel.Map.2 : onNext(B)
[ yy-5] reactor.Parallel.Map.2 : onNext(D)
[ main] reactor.Flux.Array.1 : | onNext(e)
[ yy-4] reactor.Parallel.Map.2 : onNext(C)
[ yy-2] reactor.Parallel.Map.2 : onNext(E)
publishOn(Scheduler scheduler)
:指定下游操作执行的线程,作用差不多相当于上面两个的组合。不会影响上游操作的执行线程,只影响其后的操作符
public void publishOn() throws InterruptedException {
//自定义线程池
Scheduler scheduler = Schedulers.fromExecutor(new ThreadPoolExecutor(
4,
8,
60,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100),
(r)->{
Thread thread = new Thread(r);
thread.setName("yy:"+ thread.getName());
return thread;
}));
Flux.just("a", "b", "c", "d", "e")
.log()
.publishOn(scheduler) //指定
.map(String::toUpperCase)
.log().subscribe();
Thread.sleep(1000);
}
/* 执行结果,因为比较快所以只用了一条线程
[ main] reactor.Flux.Array.1 : | onNext(a)
[ main] reactor.Flux.Array.1 : | onNext(b)
[ main] reactor.Flux.Array.1 : | onNext(c)
[ main] reactor.Flux.Array.1 : | onNext(d)
[ main] reactor.Flux.Array.1 : | onNext(e)
[ main] reactor.Flux.Array.1 : | onComplete()
[ yy:Thread-2] reactor.Flux.MapFuseable.2 : | onNext(A)
[ yy:Thread-2] reactor.Flux.MapFuseable.2 : | onNext(B)
[ yy:Thread-2] reactor.Flux.MapFuseable.2 : | onNext(C)
[ yy:Thread-2] reactor.Flux.MapFuseable.2 : | onNext(D)
[ yy:Thread-2] reactor.Flux.MapFuseable.2 : | onNext(E)
subscribeOn(Scheduler scheduler)
:指定订阅发生和上游操作执行的线程
public void subscribeOn() throws InterruptedException {
Scheduler scheduler = Schedulers.fromExecutor(new ThreadPoolExecutor(
4,
8,
60,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100),
(r)->{
Thread thread = new Thread(r);
thread.setName("yy:"+ thread.getName());
return thread;
}));
Flux.just("a", "b", "c", "d", "e")
.log()
.subscribeOn(scheduler)
.map(String::toUpperCase)
.log().subscribe();
Thread.sleep(1000);
}
/* 使用subscribeOn后,会把上游的操作也全部使用指定的线程
[ yy:Thread-1] reactor.Flux.Array.1 : | onNext(a)
[ yy:Thread-1] reactor.Flux.Map.2 : onNext(A)
[ yy:Thread-1] reactor.Flux.Array.1 : | onNext(b)
[ yy:Thread-1] reactor.Flux.Map.2 : onNext(B)
[ yy:Thread-1] reactor.Flux.Array.1 : | onNext(c)
[ yy:Thread-1] reactor.Flux.Map.2 : onNext(C)
[ yy:Thread-1] reactor.Flux.Array.1 : | onNext(d)
[ yy:Thread-1] reactor.Flux.Map.2 : onNext(D)
[ yy:Thread-1] reactor.Flux.Array.1 : | onNext(e)
[ yy:Thread-1] reactor.Flux.Map.2 : onNext(E)
2.2.4、doOnxxx:感知事件相关的 API
在 Reactor 中,感知事件相关的 API 主要用于处理数据流中的特殊事件,例如数据流的完成、错误以及取消等。这些 API 可以帮助我们更好地控制数据流的行为,并对不同的事件做出相应的处理。
doOnComplete(Runnable onComplete)
:当数据流完成时执行指定的操作,可以用于清理资源、记录日志等。
public void doOnComplete() throws InterruptedException {
Flux<String> flux = Flux.just("a", "b", "c")
.map(String::toUpperCase)
.doOnComplete(() -> System.out.println("执行完成!"));
flux.subscribe(v-> System.out.println("v1:"+v));
flux.subscribe(v-> System.out.println("v2:"+v));
Thread.sleep(1000);
}
/* 输出结果,会再消费完成时感知
v1:A
v1:B
v1:C
执行完成!
v2:A
v2:B
v2:C
执行完成!
doOnError(Consumer<? super Throwable> onError)
:当数据流发生错误时执行指定的操作,用作处理异常
public void doOnError() throws InterruptedException {
Flux<String> flux = Flux.just("a", "b", "c")
.map(v-> {
if (v.equals("b")) {
throw new RuntimeException("b");
}
return v;
})
.doOnError((v) -> System.out.println("发生异常==>"+v));
flux.subscribe(v-> System.out.println("v1:"+v));
Thread.sleep(1000);
}
/*
v1:a
发生异常==>java.lang.RuntimeException: b
doOnEach(Consumer<? super Signal<T>> signalConsumer)
:每个元素(流的数据和信号)到达的时候触发
doOnNext(Consumer<? super T> onNext)
:每个数据(流的数据)到达的时候触发
public void doOnNext() throws InterruptedException {
Flux<String> flux = Flux.just("a", "b", "c")
.map(String::toUpperCase)
.doOnEach((v) -> System.out.println("Each读取到==>"+v))
.doOnNext((v) -> System.out.println("Next读取到==>"+v));
flux.subscribe(v-> System.out.println("v1:"+v));
Thread.sleep(1000);
}
/* doOnNext只能读取到元素,doOnEach可以获取到信号
Each读取到==>doOnEach_onNext(A)
Next读取到==>A
v1:A
Each读取到==>doOnEach_onNext(B)
Next读取到==>B
v1:B
Each读取到==>doOnEach_onNext(C)
Next读取到==>C
v1:C
Each读取到==>onComplete()
doOnCancel(Runnable onCancel)
:流被取消时触发,如通过take再还没读取完就取消。正常结束不会被触发
doFinally(Consumer<SignalType> onFinally)
:流被订阅执行完成终止时触发,包括正常结束和异常结束
public void doOnCancel() throws InterruptedException {
Flux<String> flux = Flux.just("a", "b", "c")
.doOnCancel(() -> System.out.println("流被取消"))
.doFinally((v) -> System.out.println("执行结束"+v))
.take(2);
flux.subscribe(v-> System.out.println("v1:"+v));
flux.subscribe(v-> System.out.println("v2:"+v));
Thread.sleep(1000);
}
/* 正常结束不会有doOnCancel的触发
v1:a
v1:b
流被取消
执行结束cancel
v2:a
v2:b
流被取消
执行结束cancel
doOnRequest(LongConsumer consumer)
:流被订阅者请求数据时触发
doOnSubscribe(Consumer<? super Subscription> onSubscribe)
:流被订阅开始订阅时触发
public void doOnRequest() throws InterruptedException {
Flux<String> flux = Flux.just("a", "b", "c")
.map(String::toUpperCase)
.doOnRequest((n) -> System.out.println("流被请求"+n))
.doOnSubscribe((n) -> System.out.println("流被订阅"+n));
flux.subscribe(v-> System.out.println("v1:"+v));
flux.subscribe(v-> System.out.println("v2:"+v));
Thread.sleep(1000);
}
/* 订阅者会需要先订阅在请求数据,所以订阅会被触发在前面
流被订阅reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber@60e06f7d
流被请求9223372036854775807
v1:A
v1:B
v1:C
流被订阅reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber@59b32539
流被请求9223372036854775807
v2:A
v2:B
v2:C
2.2.4、onErrorXXX:异常处理相关的 API
onErrorReturn()
:吃掉异常,消费者无异常感知,返回一个兜底默认值,并结束flux流
public void onErrorReturn() throws InterruptedException {
Flux<String> flux = Flux.just("a", "b", "c")
.map(v->{
if(v.equals("b"))
throw new RuntimeException("b");
else
return v;
}).onErrorReturn("Error"); //发生异常返回兜底
flux.subscribe(v-> System.out.println("v1:"+v));
Thread.sleep(1000);
}
/* 会返回兜底的异常输出
v1:a
v1:Error
onErrorResume()
:当发生错误时,使用另一个 Flux 继续执行流程。相当于一个补偿结果
public void onErrorResume() throws InterruptedException {
Flux<String> flux = Flux.just("a", "b", "c")
.map(v->{
if(v.equals("b"))
throw new RuntimeException("b");
else
return v;
}).onErrorResume(e->{
System.err.println("发生异常==>"+e);
return Flux.just("D","E");
});
flux.subscribe(v-> System.out.println("v1:"+v));
Thread.sleep(1000);
}
/* 发生异常后,原始的flux会停止,会去执行onErrorResume内部的flux
v1:a
发生异常==>java.lang.RuntimeException: b
v1:D
v1:E
onErrorMap()
:捕获并包装成一个业务异常,并重新抛出,消费者有感知
public void onErrorMap() throws InterruptedException {
Flux<String> flux = Flux.just("a", "b", "c")
.map(v->{
if(v.equals("b"))
throw new RuntimeException("b");
else
return v;
}).onErrorMap(RuntimeException.class,e -> new IllegalArgumentException("计算失败:"+e.getMessage())
);
flux.subscribe(v-> System.out.println("v1:"+v));
Thread.sleep(1000);
}
/** 会捕获异常,然后在转换为onErrorMap内部定义的异常返回
v1:a
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalArgumentException: 计算失败:b
Caused by: java.lang.IllegalArgumentException: 计算失败:b
onErrorContinue()
:忽略当前异常,仅通知记录,继续推进
public void onErrorContinue() throws InterruptedException {
Flux<String> flux = Flux.just("a", "b", "c")
.map(v->{
if(v.equals("b"))
throw new RuntimeException("b");
else
return v;
}).onErrorContinue((e, obj) -> 忽略错误,并打印错误信息和当前元素
System.err.println("发生异常: " + e + ", 数据: " + obj)
);
flux.subscribe(v-> System.out.println("v1:"+v));
Thread.sleep(1000);
}
/**
v1:a
发生异常: java.lang.RuntimeException: b, 数据: b
v1:c
retry()
:当发生错误时,重新订阅 Flux 流,最多尝试指定次数
public void retry() throws InterruptedException {
Flux<String> flux = Flux.just("a", "b", "c")
.map(v->{
if(v.equals("b"))
throw new RuntimeException("b");
else
return v;
}).retry(2); //最多尝试两次
flux.subscribe(v-> System.out.println("v1:"+v));
Thread.sleep(1000);
}
/* 发生错误时尝试重新订阅,如果超过设置的次数,则抛出错误
v1:a
v1:a
v1:a
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: b
2.2.4、其他工具 API
buffer()
:缓冲指定元素再消费,缓冲区:缓冲n个元素: 消费一次最多可以拿到n个元素; 凑满数批量发给消费者。
blockFirst()
:阻塞当前线程,直到 Flux 发出其第一个元素 ,然后返回该元素
public void blockFirst() throws InterruptedException {
List<String> block = Flux.just("a", "b", "c","d","e")
.map(String::toUpperCase)
.buffer(3) //创建缓冲区,接收到3个元素转为集合在一起返回订阅者
.blockFirst(); //阻塞当前线程,收到第一个元素就结束阻塞
System.out.println(block);
Thread.sleep(1000);
}
/* 首先是通过buffer设置缓冲区,然后把前三个元素一起发生过来到blockFirst,所以接受到的第一个元素就是被buffer转为数组的元素
[A, B, C]
bufferUntilChanged()
:作用和buffer一样,也是设置缓冲区,只不过不是固定的长度,而是根据条件分隔
public void bufferUntilChanged() throws InterruptedException {
Flux<List<String>> flux = Flux.just("a1", "b1", "c2", "d2", "e1")
.map(String::toUpperCase)
.bufferUntilChanged(i -> i.contains("2")); //设置分割缓冲区的条件
flux.subscribe(v-> System.out.println("v1:"+v));
Thread.sleep(1000);
}
/** 包含2则拆分,然后2结束拆分,不会去跳着拆分
v1:[A1, B1]
v1:[C2, D2]
v1:[E1]
cache()
:缓存数据,把订阅的数据缓存,可以设置缓存大小,当第二个订阅者来获取数据的时候就只能到缓存区去取数据
public void cache() throws IOException {
Flux<String> cache = Flux.just("a", "b", "c" ,"d" , "e")
.cache(2);
cache.subscribe(v-> System.out.println("v1:"+v));
cache.subscribe(v-> System.out.println("v2:"+v));
System.in.read();
}
/* 订阅者1直接获取到全部数据,然后把数据缓存到缓冲区,但只要两个大小,所以订阅者2就只拿到了最后放入缓冲区的两个数据
v1:a
v1:b
v1:c
v1:d
v1:e
v2:d
v2:e
handle()
:自定义流中元素处理规则,内部可以自定义设置处理规则,然后再通过sink.next(),把处理后的数据发送到下一个节点
public void handle() throws InterruptedException {
Flux.just("a", "b", "c")
.delayElements(Duration.ofSeconds(1))
.handle((v, sink)->{
sink.next("自定义增强:"+v);
})
.subscribe(System.out::println);
Thread.sleep(5000);
}
/* 获取结果
自定义增强:a
自定义增强:b
自定义增强:c
switchIfEmpty()
:如果为空则动态兜底获取数据
public void switchIfEmpty() throws InterruptedException {
Mono.just("add")
.switchIfEmpty(Mono.just("xx")) //如果为空,则动态兜底去调取方法
.log()
.subscribe(System.out::println);
Thread.sleep(5000);
}
defaultIfEmpty()
:如果为空,则调用当前的兜底的默认数据,作用同上类似
public void defaultIfEmpty() throws InterruptedException {
Mono.just("add")
.defaultIfEmpty("xx") //如果为空,则动态兜底去调取方法
.log()
.subscribe(System.out::println);
Thread.sleep(5000);
}
2.3、订阅流
常用汇总
方法 | 作用 |
---|---|
collectList() | 把flux转为Mono的list。 |
block() | 阻塞当前线程等待完成。 |
subscribe() | 订阅流,订阅的时候可以有多种方式订阅。除了直接订阅的,还可以捕获异常,以及监听结束等。 |
详情使用
collectList()
:把flux转为Mono的list。这个开发中会很常用
public void collectList() throws InterruptedException {
Mono<List<String>> mono = Flux.just("a", "b", "c", "d", "e")
.collectList();
mono.subscribe(v-> System.out.println("v:"+v));
Thread.sleep(1000);
}
/* 结果
v:[a, b, c, d, e]
block()
:阻塞当前线程等待完成。一般不建议这样写,这样就丧失了响应式编程的本质了
public void block() throws InterruptedException {
Mono<List<String>> mono = Flux.just("a", "b", "c", "d", "e")
.collectList();
List<String> block = mono.block();
System.out.println(block);
Thread.sleep(1000);
}
/* 直接阻塞获取到结果
[a, b, c, d, e]
subscribe()
:订阅流,订阅的时候可以有多种方式订阅。除了前面用的,直接订阅的,还可以捕获异常,以及监听结束等。
public void subscribe() throws InterruptedException {
Flux<String> flux = Flux.just("a", "b", "c", "d", "e")
.map(String::toUpperCase);
flux.subscribe(v-> System.out.println("v:"+v),
throwable -> System.out.println("异常"+throwable.getMessage()),
()-> System.out.println("流执行完成!!"));
Thread.sleep(1000);
}
/* 正常结束,如果我异常结束会被捕获到
v:A
v:B
v:C
v:D
v:E
流执行完成!!
BaseSubscriber()
:subscribe还可以通过BaseSubscriber来自定义消费者。
public void BaseSubscriber() throws InterruptedException {
Flux<String> flux = Flux.just("a", "b", "c", "d", "e")
.map(String::toUpperCase);
// Flux<String> flux2 = Flux.just("a", "b", "c", "d", "e")
// .map(v->{
// if (v.equals("b")) {throw new RuntimeException("b");}
// return v;
// });
flux.subscribe(new BaseSubscriber<String>(){
@Override
protected void hookOnSubscribe(Subscription subscription) {
System.out.println(Thread.currentThread()+"流开始了:"+subscription);
request(1);
}
@Override
protected void hookOnNext(String value) {
System.out.println(Thread.currentThread()+"开始接收元素:"+value);
request(1); //持续接收元素
}
@Override
protected void hookOnComplete() {
System.out.println(Thread.currentThread()+"流正常结束");
}
@Override
protected void hookOnError(Throwable throwable) {
System.out.println(Thread.currentThread()+"流错误结束:"+throwable);
}
@Override
protected void hookOnCancel() {
System.out.println(Thread.currentThread()+"流被取消");
}
@Override
protected void hookFinally(SignalType type) {
System.out.println(Thread.currentThread()+"最终回调");
}
});
Thread.sleep(6000);
}
/* 到对应的节点时会 触发对应的回调方法
Thread[main,5,main]流开始了:reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber@782e6b40
Thread[main,5,main]开始接收元素:A
Thread[main,5,main]开始接收元素:B
Thread[main,5,main]开始接收元素:C
Thread[main,5,main]开始接收元素:D
Thread[main,5,main]开始接收元素:E
Thread[main,5,main]流正常结束
Thread[main,5,main]最终回调
声明
本文内容仅代表作者观点,或转载于其他网站,本站不以此文作为商业用途
如有涉及侵权,请联系本站进行删除
转载本站原创文章,请注明来源及作者。