【2024】JAVA实现响应式编程Web Flux的Reactor具体API文档使用说明

方渐鸿 2024-10-26 15:03:02 阅读 52

目录💻

前言一、简介1、响应式编程概述背景知识什么是响应式编程具体概述应用场景:常用的库和框架

二、 Reactor实现响应式编程1、Flux 和 Mono介绍Flux:Mono:Flux 和 Mono 的区别:Flux 和 Mono 的关系:

2、常用API使用添加依赖2.1、生产流常用汇总

2.1.1、直接创建2.1.2、使用Sinks工具类

2.2、中间操作常用汇总

2.2.1、变换数据2.2.2、Context API2.2.3、合并组合Flux2.2.3、并发控制Flux2.2.4、doOnxxx:感知事件相关的 API2.2.4、onErrorXXX:异常处理相关的 API2.2.4、其他工具 API

2.3、订阅流常用汇总详情使用

前言

响应式编程 (Reactive Programming) 是一种声明式编程范式,专注于数据流和变化的传播。随着软件系统日益复杂,对高并发、实时性和弹性的需求不断增长,响应式编程正逐渐成为主流。特别从Spring Boot3开始逐渐越来越重视使用,并且Spring框架为了全面拥抱响应式编程,提供了Spring WebFlux、Spring Data Reactive等模块,为Java开发者构建响应式应用提供了强大的支持。

一、简介

1、响应式编程概述

背景知识

为了应对高并发服务器端开发场景,在2009 年,微软提出了一个更优雅地实现异步编程的方式——Reactive Programming,我们称之为响应式编程。随后,Netflix 和LightBend 公司提供了RxJava 和Akka Stream 等技术,使得Java 平台也有了能够实现响应式编程的框架。

在2017 年9 月28 日,Spring 5 正式发布。Spring 5 发布最大的意义在于,它将响应式编程技术的普及向前推进了一大步。而同时,作为在背后支持Spring 5 响应式编程的框架Spring Reactor,也进入了里程碑式的3.1.0 版本。

什么是响应式编程

响应式编程是一种面向数据流和变化传播的编程范式。这意味着可以在编程语言中很方便地表达静态或动态的数据流,而相关的计算模型会自动将变化的值通过数据流进行传播。

响应式编程基于reactor(Reactor 是一个运行在 Java8 之上的响应式框架)的思想,当你做一个带有一定延迟的才能够返回的io操作时,不会阻塞,而是立刻返回一个流,并且订阅这个流,当这个流上产生了返回数据,可以立刻得到通知并调用回调函数处理数据。

电子表格程序就是响应式编程的一个例子。单元格可以包含字面值或类似"=B1+C1"的公式,而包含公式的单元格的值会依据其他单元格的值的变化而变化。

响应式传播核心特点之一:变化传播:一个单元格变化之后,会像多米诺骨牌一样,导致直接和间接引用它的其他单元格均发生相应变化。

具体概述

响应式编程 (Reactive Programming) 是一种声明式的编程范式,它关注于数据流和变化的传播。这意味着可以通过定义数据流和它们之间的关系来构建应用程序,当数据发生变化时,应用程序会自动做出响应。

核心概念:

Publisher:发布者;产生数据流Subscriber:订阅者;消费数据流Subscription:订阅关系

订阅关系是发布者和订阅者之间的关键接口。订阅者通过订阅来表示对发布者产生的数据的兴趣。订阅者可以请求一定数量的元素,也可以取消订阅。 Processor:处理器

处理器是同时实现了发布者和订阅者接口的组件,它可以接收来自一个发布者的数据,进行处理,并将结果发布给下一个订阅者。处理器在Reactor中充当中间环节,代表一个处理阶段,允许你在数据流中进行转换、过滤和其他操作。

这种模型遵循Reactive Streams规范,确保了异步流的一致性和可靠性。

在这里插入图片描述

应用场景:

实时数据流处理: 例如股票交易系统、传感器数据监控、网络游戏等。用户界面开发: 例如响应式Web应用、移动应用等。微服务架构: 响应式编程可以帮助构建更加弹性和可扩展的微服务系统。大数据处理: 响应式编程可以用于处理大规模数据集,例如使用Spark Streaming或Apache Flink。

常用的库和框架

RxJava: Java的响应式扩展库,提供了丰富的操作符和工具。Reactor: Java的响应式编程框架,由Pivotal开发,是Spring WebFlux的基础。Kotlin Coroutines: Kotlin的协程库,提供了轻量级的异步编程模型,可以与响应式编程框架集成。Spring WebFlux: Spring框架的响应式Web框架,用于构建响应式Web应用。

二、 Reactor实现响应式编程

目前java要实现响应式编程主要就是使用Reactor进行使用, Reactor 也是 Spring WebFlux 的基础,所以在使用上可以与 Spring 框架无缝集成,构建响应式 Web 应用。使用下面我们要介绍的就是Reactor的常用API的使用,基本上学会了Reactor,就可以直接使用Spring WebFlux 构建高性能、可扩展的 Web 应用。

并且使用Spring WebFlux 构建的非阻塞 I/O 和事件驱动模型可以充分利用系统资源,可以有效的提高应用程序的性能和效率。相对于Spring MVC 这种阻塞式来说在性能上会得到很大的提升,

而且响应式编程可以更好地处理并发和异步操作,提高系统的弹性和可扩展性。

1、Flux 和 Mono介绍

在 Reactor 中,Flux 和 Mono 是两个核心组件,用于表示异步数据流。它们都实现了 Reactive Streams 规范中的 Publisher 接口,可以发出零个或多个元素。

Flux:

表示一个可以发出零个或多个元素的异步序列。

可以用于表示任何类型的数据流,例如用户输入事件、传感器数据、数据库查询结果等。

提供了丰富的操作符,用于对数据流进行转换、过滤、合并、延迟等操作。

<code>Flux<String> names = Flux.just("Alice", "Bob", "Charlie");

names.subscribe(System.out::println); // 输出: Alice Bob Charlie

Mono:

表示一个最多发出一个元素的异步序列(1个或者0个)。通常用于表示单个结果,例如数据库查询结果、HTTP 请求响应等。也提供了一些操作符,用于对结果进行转换和处理。

Mono<String> name = Mono.just("Alice");

name.subscribe(System.out::println); // 输出: Alice

Flux 和 Mono 的区别:
特性 Flux Mono
发射元素个数 零个或多个 零个或一个
使用场景 表示数据流 表示单个结果
操作符 提供丰富的操作符 提供一些操作符
Flux 和 Mono 的关系:

Mono 可以看作是 Flux 的特例,它最多只发出一个元素。可以使用 Flux.from(mono) 将 Mono 转换为 Flux。可以使用 mono.flux() 将 Mono 转换为 Flux。可以使用 flux.single() 或 flux.singleOrEmpty() 将 Flux 转换为 Mono,但前提是 Flux 必须只包含一个元素或为空。

2、常用API使用

在具体使用上Flux和Mono的API使用都差不多,在分类上,我们可以大致分为三类,用来产生生产流的和中间做转换的,还有结束流的

添加依赖

<dependency>

<groupId>io.projectreactor</groupId>

<artifactId>reactor-core</artifactId>

</dependency>

<dependency>

<groupId>io.projectreactor</groupId>

<artifactId>reactor-test</artifactId>

<scope>test</scope>

</dependency>

2.1、生产流

也就是创建Flux和Mono的API

常用汇总
方法 作用
just(T... data) 创建一个发射指定元素的 Flux
fromIterable(Iterable<? extends T> it) 从 Iterable(集合) 创建一个 Flux
fromArray(T[] array) 从数组创建 Flux
fromStream(Stream<? extends T> s) 从Stream中创建 Flux,意味着我们可以把jdk8的Stream也直接进行无缝衔接转为Flux
range(int start, int count) 创建发射指定范围的 Flux,一般可以用作计数器或者读秒配合delayElements()
empty() 创建一个不发射任何元素的 Flux,mono也一样。
Flux<T> error(Throwable error 创建一个发射错误信号的 Flux
2.1.1、直接创建

Flux<T> just(T... data):创建一个发射指定元素的 Flux

public void just() throws IOException {

Flux<String> just = Flux.just("a", "b", "c", "d", "e");

just.subscribe(System.out::println);

System.in.read();

}

/*得到的结果

a

b

c

d

e

Flux<T> fromIterable(Iterable<? extends T> it) :从 Iterable(集合) 创建一个 Flux

public void fromIterable() throws IOException {

List<String> list = Arrays.asList("a", "b", "c", "d", "e");

Flux<String> just = Flux.fromIterable(list);

just.subscribe(System.out::println);

System.in.read();

}

/*得到的结果

a

b

c

d

e

Flux<T> fromArray(T[] array):从数组创建 Flux

public void fromArray() throws IOException {

String[] arrays = { "a", "b", "c", "d", "e"};

Flux<String> just = Flux.fromArray(arrays);

just.subscribe(System.out::println);

System.in.read();

}

/*得到的结果

a

b

c

d

e

Flux<T> fromStream(Stream<? extends T> s):从Stream中创建 Flux,意味着我们可以把jdk8的Stream也直接进行无缝衔接转为Flux

public void fromStream() throws IOException {

Stream<String> stream = Stream.of("a", "b", "c", "d", "e");

Flux<String> just = Flux.fromStream(stream);

just.subscribe(System.out::println);

System.in.read();

}

/*得到的结果

a

b

c

d

e

Flux<Integer> range(int start, int count):创建发射指定范围的 Flux,一般可以用作计数器或者读秒配合delayElements()

public void range() throws IOException {

Flux<Integer> just = Flux.range(0,5)

.delayElements(Duration.ofSeconds(1)); //一秒发射一次

just.subscribe(System.out::println);

System.in.read();

}

/* 得到的结果

0

1

2

3

4

Flux<T> empty():创建一个不发射任何元素的 Flux,mono也一样。

public void empty() throws IOException {

Mono<Integer> mono = Mono.empty();

Flux<Integer> flux = Flux.empty();

flux.subscribe(System.out::println);

System.in.read();

}

/* 订阅者不会接受到任何结果

Flux<T> error(Throwable error):创建一个发射错误信号的 Flux

public void error() throws IOException {

Flux<Integer> just = Flux.error(new RuntimeException("执行错误!"));

just.subscribe(System.out::println);

System.in.read();

}

/*

reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: 执行错误!

2.1.2、使用Sinks工具类

Sinks 是一个用于创建各种类型的 Sink工厂类,然后在通过asFlux方法把Sink转为Flux。具体分别为:

One<T> one():创建一个只接收一个数据的 Sink。

public void one(){

Sinks.One<Integer> one = Sinks.one();

one.tryEmitValue(1);

one.tryEmitValue(2);

Mono<Integer> mono = one.asMono();

mono.subscribe(System.out::println);

}

/* 不管发射多少个消息到one里都只会读取一个

1

many():创建一个可以接收多个数据的 Sink。

unicast():单播,只能有一个消费者订阅转换后到flux,第二个去订阅时会报错

public void unicast() throws IOException {

Sinks.Many<Integer> sink = Sinks.many()

.unicast()

.onBackpressureBuffer();

// 生产消息

sink.tryEmitNext(1);

sink.tryEmitNext(2);

Flux<Integer> flux = sink.asFlux();

flux.subscribe(v->System.out.println("p1:"+v));

//如果有第二个消费者来消费消息,会直接抛出错误

flux.subscribe(v->System.out.println("p2:"+v));

sink.tryEmitNext(3);

sink.tryEmitNext(4);

System.in.read();

}

/* 执行到订阅者2时会报错

multicast():多播。

Sinks.many() 方法创建的多播 Sink 可以选择不同的策略解决背压问题。不同策略之间的区别主要体现在 如何处理下游消费者无法及时处理数据的情况。

背压 (Backpressure):在响应式编程中,背压指的是下游消费者无法及时处理上游生产者发送的数据,导致数据堆积,最终可能导致内存溢出等问题。也就是生产者生产的速度大于消费者消费的速度

onBackpressureBuffer():如果下游消费者无法及时处理数据,则 Sink 会将数据缓存到一个缓冲区中,直到消费者能够处理为止,缓冲区可以设置大小

public void onBackpressureBuffer() throws IOException {

Sinks.Many<Integer> sink = Sinks.many()

.multicast()

.onBackpressureBuffer(); //开启缓冲区

sink.tryEmitNext(1);

sink.tryEmitNext(2);

Flux<Integer> flux = sink.asFlux();

//在订阅之前发送的消息会先被放在缓冲区

flux.subscribe(v->System.out.println("p1:"+v));

sink.tryEmitNext(3);

sink.tryEmitNext(4);

flux.subscribe(v->System.out.println("p2:"+v));

sink.tryEmitNext(5);

sink.tryEmitNext(6);

System.in.read();

}

/* 第二个订阅的消费者会从订阅的时间开始接收消息,前面存放缓冲区的消息无法接收到

p1:1

p1:2

p1:3

p1:4

p1:5

p2:5

p1:6

p2:6

directBestEffort():如果下游消费者无法及时处理数据,则 Sink 会尽力将数据发送给消费者,但可能会丢弃一些数据。

public void directBestEffort() throws IOException {

Sinks.Many<Integer> sink = Sinks.many()

.multicast()

.directBestEffort();

sink.tryEmitNext(1);

sink.tryEmitNext(2);

sink.tryEmitNext(3);

sink.tryEmitNext(4);

Flux<Integer> flux = sink.asFlux();

flux.subscribe(v->System.out.println("p1:"+v));

sink.tryEmitNext(5);

sink.tryEmitNext(6);

flux.subscribe(v->System.out.println("p2:"+v));

sink.tryEmitNext(7);

sink.tryEmitNext(8);

System.in.read();

}

/*

p1:5

p1:6

p1:7

p2:7

p1:8

p2:8

directAllOrNothing():如果下游消费者无法及时处理数据,则 Sink 会直接丢弃所有后续数据,并发出一个 onError 信号。操作同上

empty():创建一个空的 Sink

2.2、中间操作

常用汇总
方法 作用
map() 将每个元素映射到另一个类型的值。
flatMap() 将每个元素映射到一个新的 Publisher,并将其扁平化成一个新的 Flux
transform() transformer会立即执行传递给它的转换函数,并生成新的 Flux。
filter() 根据指定条件过滤
take(long n) 获取到指定长度的结果结束
skip(long skipped) 从第几个元素开始获取
distinct() 去重,去除重复元素
contextWrite() 在整个响应式链中传递上下文信息
merge() 合并多个flux,按照源里每个元素的写入的时间排序,不会保证顺序
zip() 压缩多个flux,按照下标把每个flux的元素都放入一个数组
combineLatest() 把多个flux按照下标把值压缩再一起进行处理
publishOn(Scheduler scheduler) 并发处理,指定后续操作的执行线程池
doOnComplete(Runnable onComplete) 当数据流完成时执行指定的操作,可以用于清理资源、记录日志等。
doOnEach(Consumer<? super Signal<T>> signalConsumer) 每个元素(流的数据和信号)到达的时候触发
onErrorReturn() 吃掉异常,消费者无异常感知,返回一个兜底默认值,并结束flux流
onErrorContinue() 忽略当前异常,仅通知记录,继续推进
retry() 当发生错误时,重新订阅 Flux 流,最多尝试指定次数
buffer() 缓冲指定元素再消费
cache() 缓存数据,把订阅的数据缓存,可以设置缓存大小。当第二个订阅者来获取数据的时候就只能到缓存区去取数据
handle() 自定义流中元素处理规则,内部可以自定义设置处理规则,然后再通过sink.next(),把处理后的数据发送到下一个节点
switchIfEmpty() 如果流数据为空则动态兜底获取数据
2.2.1、变换数据

map():将每个元素映射到另一个类型的值。

public void map(){

Flux<String> flux = Flux.just("a", "b", "c", "d", "e")

.map(String::toUpperCase)

.log();

flux.subscribe(System.out::println);

}

flatMap():将每个元素映射到一个新的 Publisher,并将其扁平化成一个新的 Flux

public void flatMap() throws IOException {

Flux<String> flux = Flux.just("a1", "b2", "c3", "d4", "e5")

.flatMap(v->

Flux.just(v.split(""))

);

flux.subscribe(System.out::println);

System.in.read();

}

transformDeferredtransform都是用来转换 Flux 数据流的运算符。区别在于 转换逻辑的执行时机

transformDeferred() 运算符会延迟执行传递给它的转换函数,直到下游订阅者订阅时才会执行。每个订阅者都会独立地执行转换逻辑,生成自己的转换结果。适用于复杂、有状态的转换

public void transformDeferred() throws IOException {

AtomicInteger atomicInteger = new AtomicInteger();

Flux<String> flux = Flux.just("a", "b", "c")

.transformDeferred(v->{ //

int andIncrement = atomicInteger.getAndIncrement(); //把值加一

return v.map(it->it+andIncrement);

});

//transformDeferred中,每个订阅者都会去执行一次transformDeferred,等于每个订阅者都会有自己独立的结果

flux.subscribe(v-> System.out.println("订阅者1:"+v));

flux.subscribe(v-> System.out.println("订阅者2:"+v));

System.in.read();

}

/* transformDeferred每个订阅者都会独立生成结果

订阅者1:a0

订阅者1:b0

订阅者1:c0

订阅者2:a1

订阅者2:b1

订阅者2:c1

transform():transformer会立即执行传递给它的转换函数,并生成新的 Flux。由于转换逻辑立即执行,所有订阅者都会共享同一个转换结果。适用于简单、无状态的转换

public void transform() throws IOException {

AtomicInteger atomicInteger = new AtomicInteger();

Flux<String> flux = Flux.just("a", "b", "c")

.transform(v->{

int andIncrement = atomicInteger.getAndIncrement();

return v.map(it->it+andIncrement);

});

//不管多少个订阅者都会共享transform这一个转换的结果,等于transform只会执行一次

flux.subscribe(v-> System.out.println("订阅者1:"+v));

flux.subscribe(v-> System.out.println("订阅者2:"+v));

System.in.read();

}

/*transform所有订阅者都会共享同一个转换结果

订阅者1:a0

订阅者1:b0

订阅者1:c0

订阅者2:a0

订阅者2:b0

订阅者2:c0

filter():根据指定条件过滤

public void filter() throws IOException {

Flux<String> flux = Flux.just("a", "b", "c")

.filter(v->

v.equals("b")

);

flux.subscribe(System.out::println);

System.in.read();

}

/*生成结果

b

take(long n):获取到指定长度的结果结束

public void take() throws IOException {

Flux<String> flux = Flux.just("a", "b", "c","d","e")

.take(3);

flux.subscribe(System.out::println);

System.in.read();

}

/*生成结果

a

b

c

skip(long skipped):从第几个元素开始获取

public void skip() throws IOException {

Flux<String> flux = Flux.just("a", "b", "c","d","e")

.skip(3);

flux.subscribe(System.out::println);

System.in.read();

}

/*生成结果

d

e

distinct():去重,去除重复元素

public void distinct() throws IOException {

Flux<String> flux = Flux.just("a", "a", "c","a","e")

.distinct();

flux.subscribe(System.out::println);

System.in.read();

}

/*

a

c

e

2.2.2、Context API

在Flux响应式编程中,Context API用于在整个响应式链中传递上下文信息,例如订阅者信息、调度器、钩子等。它类似于线程本地存储,但作用于响应式流。

可以使用Context API添加钩子函数,例如在响应式流的每个元素处理前后执行特定操作。

用法:

transformDeferredContextual():两个参数一个是当前的flux对象,一个是Context上下文对象contextWrite():一个参数,就是Context对象Context:通过of方法把数据通过key,value的方式写入到flux中

of(Object key1, Object value1, Object key2, Object value2) :如果有多组参数要传入,通过of的(k, v, k, v)的方式这样写

public void context() throws IOException {

Flux<String> fluxs = Flux.just(1, 2, 3)

.transformDeferredContextual((flux, context) -> { //它会在每个元素处理之前,提供当前的 Flux 和下面添加的上下文context对象信息。

System.out.println("flux:" + flux);

System.out.println("context:" + context);

return flux.map(i -> i + "====>" + context.get("key1")); //对每个元素进行处理,将元素与上下文中的 "key" 值拼接在一起。

}).map(String::toUpperCase);

//可以在后面把context通过contextWrite添加到上下文中

Context context = Context.of("key1", "zhangsan","key2", "list");

fluxs.contextWrite(context) //设置上下文中的 "key" 为 "zhangsan"。

.subscribe(v-> System.out.println("v = " + v));

System.in.read();

}

/* 返回的结果

flux:FluxArray 当前flux

context:Context2{key1=zhangsan, key2=list} Context对象,几组k,v,名字就是几

v = 1====>ZHANGSAN

v = 2====>ZHANGSAN

v = 3====>ZHANGSAN

2.2.3、合并组合Flux

merge():合并多个flux,按照源里每个元素的写入的时间排序,不会保证顺序

public void merge() throws InterruptedException {

Flux<String> flux = Flux.just("a","b");

Mono<String> mono = Mono.just("c");

Flux.merge(mono,flux)

.subscribe(System.out::println);

Thread.sleep(5000);

}

/* 输出结果,可能会出现a、c、b

a

b

c

concat():合并多个flux,会严格保证元素的顺序。

public void concat() throws InterruptedException {

Flux<String> flux1 = Flux.just("a","b");

Flux<String> flux2 = Flux.just("c","d");

Flux.concat(flux1,flux2)

.subscribe(System.out::println);

Thread.sleep(5000);

}

/* 输出结果,一定会说先输出完flux1,再到flux2

a

b

c

d

zip():压缩多个flux,按照下标把每个flux的元素都放入一个数组,如果多个flux的长度不一样,以最短的为长度

public void zip() throws IOException {

Flux<String> flux1 = Flux.just("a", "b", "c");

Flux<Integer> flux2 = Flux.just(1, 2, 3,4);

Flux<String> flux3 = Flux.just("A", "B", "C","D","E");

Flux.zip(flux1, flux2, flux3)

.map(Tuple2::toString)

.subscribe(System.out::println);

System.in.read();

}

/* 输出结果。会保证每个数组的长度都一样

[a,1,A]

[b,2,B]

[c,3,C]

zipWith:压缩拼接,作用同上,只不过是只能一个一个的压缩。

public void zipWith() throws IOException {

Flux<String> flux1 = Flux.just("a", "b", "c");

Flux.just(1, 2, 3, 4)

.zipWith(flux1)

.subscribe(System.out::println);

System.in.read();

}

/* 输出结果。

[1,a]

[2,b]

[3,c]

combineLatest():把多个flux按照下标把值压缩再一起进行处理

public void combineLatest() throws IOException {

Flux<String> flux1 = Flux.just("a", "b", "c");

Flux<Integer> flux2 = Flux.just(1, 2, 3,4);

Flux<String> flux3 = Flux.just("A", "B", "C");

// 两个的写法

// Flux.combineLatest(flux1, flux2, (f1,f2)->f1+"-"+f2)

// .map(String::toUpperCase)

// .subscribe(System.out::println);

// 多个的写法

Iterable<Publisher<?>> publishers = Arrays.asList(flux1, flux2,flux3);

Flux.combineLatest(publishers,fs->fs[0]+"-"+fs[1]+"-"+fs[2])

.map(String::toUpperCase)

.subscribe(System.out::println);

System.in.read();

}

/* 获取按照下标拼接后的值

C-4-A

C-4-B

C-4-C

2.2.3、并发控制Flux

Scheduler:调度器

Reactor 中,Schedulers 提供了多种指定线程的方式,可以根据不同的场景选择合适的 Scheduler 来执行任务。下面是常用的Scheduler

Schedulers.immediate():默认,无执行上下文,当前线程运行所有操作

Schedulers.single():使用固定的单线程

Schedulers.boundedElastic():使用一个有界弹性线程池执行任务。最大线程数为 CPU 核心数 * 10。

Schedulers.parallel():使用一个固定大小的线程池执行任务,线程池的大小默认为 CPU 核心数

Schedulers.fromExecutor(Executor executor):使用自定义的 Executor 执行任务

Schedulers.newParallel(String name, int parallelism):也可以通过new的方式自定义Scheduler ,直接指定线程池大小,作用同上

parallel(int parallelism):并行处理,将 Flux 的数据流分成多个并行的轨道

runOn():指定后续操作的执行线程,一般会配合parallel()一起使用,使用parallel控制并发数,再通过runOn绑定线程池

public void parallel() throws IOException {

Flux.just("a","b","c","d","e")

.log()

.parallel(4) //指定并行数

.runOn(Schedulers.newParallel("yy")) //指定线程名称

.map(String::toUpperCase)

.log()

.subscribe();

System.in.read();

}

/* 通过log查看使用runOn前后线程,可以看到Map操作都是使用的runOn指定的yy名称的线程执行的

[ main] reactor.Flux.Array.1 : | onNext(a)

[ main] reactor.Flux.Array.1 : | onNext(b)

[ yy-2] reactor.Parallel.Map.2 : onNext(A)

[ main] reactor.Flux.Array.1 : | onNext(c)

[ main] reactor.Flux.Array.1 : | onNext(d)

[ yy-3] reactor.Parallel.Map.2 : onNext(B)

[ yy-5] reactor.Parallel.Map.2 : onNext(D)

[ main] reactor.Flux.Array.1 : | onNext(e)

[ yy-4] reactor.Parallel.Map.2 : onNext(C)

[ yy-2] reactor.Parallel.Map.2 : onNext(E)

publishOn(Scheduler scheduler):指定下游操作执行的线程,作用差不多相当于上面两个的组合。不会影响上游操作的执行线程,只影响其后的操作符

public void publishOn() throws InterruptedException {

//自定义线程池

Scheduler scheduler = Schedulers.fromExecutor(new ThreadPoolExecutor(

4,

8,

60,

TimeUnit.SECONDS,

new LinkedBlockingQueue<>(100),

(r)->{

Thread thread = new Thread(r);

thread.setName("yy:"+ thread.getName());

return thread;

}));

Flux.just("a", "b", "c", "d", "e")

.log()

.publishOn(scheduler) //指定

.map(String::toUpperCase)

.log().subscribe();

Thread.sleep(1000);

}

/* 执行结果,因为比较快所以只用了一条线程

[ main] reactor.Flux.Array.1 : | onNext(a)

[ main] reactor.Flux.Array.1 : | onNext(b)

[ main] reactor.Flux.Array.1 : | onNext(c)

[ main] reactor.Flux.Array.1 : | onNext(d)

[ main] reactor.Flux.Array.1 : | onNext(e)

[ main] reactor.Flux.Array.1 : | onComplete()

[ yy:Thread-2] reactor.Flux.MapFuseable.2 : | onNext(A)

[ yy:Thread-2] reactor.Flux.MapFuseable.2 : | onNext(B)

[ yy:Thread-2] reactor.Flux.MapFuseable.2 : | onNext(C)

[ yy:Thread-2] reactor.Flux.MapFuseable.2 : | onNext(D)

[ yy:Thread-2] reactor.Flux.MapFuseable.2 : | onNext(E)

subscribeOn(Scheduler scheduler):指定订阅发生和上游操作执行的线程

public void subscribeOn() throws InterruptedException {

Scheduler scheduler = Schedulers.fromExecutor(new ThreadPoolExecutor(

4,

8,

60,

TimeUnit.SECONDS,

new LinkedBlockingQueue<>(100),

(r)->{

Thread thread = new Thread(r);

thread.setName("yy:"+ thread.getName());

return thread;

}));

Flux.just("a", "b", "c", "d", "e")

.log()

.subscribeOn(scheduler)

.map(String::toUpperCase)

.log().subscribe();

Thread.sleep(1000);

}

/* 使用subscribeOn后,会把上游的操作也全部使用指定的线程

[ yy:Thread-1] reactor.Flux.Array.1 : | onNext(a)

[ yy:Thread-1] reactor.Flux.Map.2 : onNext(A)

[ yy:Thread-1] reactor.Flux.Array.1 : | onNext(b)

[ yy:Thread-1] reactor.Flux.Map.2 : onNext(B)

[ yy:Thread-1] reactor.Flux.Array.1 : | onNext(c)

[ yy:Thread-1] reactor.Flux.Map.2 : onNext(C)

[ yy:Thread-1] reactor.Flux.Array.1 : | onNext(d)

[ yy:Thread-1] reactor.Flux.Map.2 : onNext(D)

[ yy:Thread-1] reactor.Flux.Array.1 : | onNext(e)

[ yy:Thread-1] reactor.Flux.Map.2 : onNext(E)

2.2.4、doOnxxx:感知事件相关的 API

在 Reactor 中,感知事件相关的 API 主要用于处理数据流中的特殊事件,例如数据流的完成、错误以及取消等。这些 API 可以帮助我们更好地控制数据流的行为,并对不同的事件做出相应的处理。

doOnComplete(Runnable onComplete):当数据流完成时执行指定的操作,可以用于清理资源、记录日志等。

public void doOnComplete() throws InterruptedException {

Flux<String> flux = Flux.just("a", "b", "c")

.map(String::toUpperCase)

.doOnComplete(() -> System.out.println("执行完成!"));

flux.subscribe(v-> System.out.println("v1:"+v));

flux.subscribe(v-> System.out.println("v2:"+v));

Thread.sleep(1000);

}

/* 输出结果,会再消费完成时感知

v1:A

v1:B

v1:C

执行完成!

v2:A

v2:B

v2:C

执行完成!

doOnError(Consumer<? super Throwable> onError):当数据流发生错误时执行指定的操作,用作处理异常

public void doOnError() throws InterruptedException {

Flux<String> flux = Flux.just("a", "b", "c")

.map(v-> {

if (v.equals("b")) {

throw new RuntimeException("b");

}

return v;

})

.doOnError((v) -> System.out.println("发生异常==>"+v));

flux.subscribe(v-> System.out.println("v1:"+v));

Thread.sleep(1000);

}

/*

v1:a

发生异常==>java.lang.RuntimeException: b

doOnEach(Consumer<? super Signal<T>> signalConsumer):每个元素(流的数据和信号)到达的时候触发

doOnNext(Consumer<? super T> onNext):每个数据(流的数据)到达的时候触发

public void doOnNext() throws InterruptedException {

Flux<String> flux = Flux.just("a", "b", "c")

.map(String::toUpperCase)

.doOnEach((v) -> System.out.println("Each读取到==>"+v))

.doOnNext((v) -> System.out.println("Next读取到==>"+v));

flux.subscribe(v-> System.out.println("v1:"+v));

Thread.sleep(1000);

}

/* doOnNext只能读取到元素,doOnEach可以获取到信号

Each读取到==>doOnEach_onNext(A)

Next读取到==>A

v1:A

Each读取到==>doOnEach_onNext(B)

Next读取到==>B

v1:B

Each读取到==>doOnEach_onNext(C)

Next读取到==>C

v1:C

Each读取到==>onComplete()

doOnCancel(Runnable onCancel):流被取消时触发,如通过take再还没读取完就取消。正常结束不会被触发

doFinally(Consumer<SignalType> onFinally):流被订阅执行完成终止时触发,包括正常结束和异常结束

public void doOnCancel() throws InterruptedException {

Flux<String> flux = Flux.just("a", "b", "c")

.doOnCancel(() -> System.out.println("流被取消"))

.doFinally((v) -> System.out.println("执行结束"+v))

.take(2);

flux.subscribe(v-> System.out.println("v1:"+v));

flux.subscribe(v-> System.out.println("v2:"+v));

Thread.sleep(1000);

}

/* 正常结束不会有doOnCancel的触发

v1:a

v1:b

流被取消

执行结束cancel

v2:a

v2:b

流被取消

执行结束cancel

doOnRequest(LongConsumer consumer):流被订阅者请求数据时触发

doOnSubscribe(Consumer<? super Subscription> onSubscribe):流被订阅开始订阅时触发

public void doOnRequest() throws InterruptedException {

Flux<String> flux = Flux.just("a", "b", "c")

.map(String::toUpperCase)

.doOnRequest((n) -> System.out.println("流被请求"+n))

.doOnSubscribe((n) -> System.out.println("流被订阅"+n));

flux.subscribe(v-> System.out.println("v1:"+v));

flux.subscribe(v-> System.out.println("v2:"+v));

Thread.sleep(1000);

}

/* 订阅者会需要先订阅在请求数据,所以订阅会被触发在前面

流被订阅reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber@60e06f7d

流被请求9223372036854775807

v1:A

v1:B

v1:C

流被订阅reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber@59b32539

流被请求9223372036854775807

v2:A

v2:B

v2:C

2.2.4、onErrorXXX:异常处理相关的 API

onErrorReturn():吃掉异常,消费者无异常感知,返回一个兜底默认值,并结束flux流

public void onErrorReturn() throws InterruptedException {

Flux<String> flux = Flux.just("a", "b", "c")

.map(v->{

if(v.equals("b"))

throw new RuntimeException("b");

else

return v;

}).onErrorReturn("Error"); //发生异常返回兜底

flux.subscribe(v-> System.out.println("v1:"+v));

Thread.sleep(1000);

}

/* 会返回兜底的异常输出

v1:a

v1:Error

onErrorResume():当发生错误时,使用另一个 Flux 继续执行流程。相当于一个补偿结果

public void onErrorResume() throws InterruptedException {

Flux<String> flux = Flux.just("a", "b", "c")

.map(v->{

if(v.equals("b"))

throw new RuntimeException("b");

else

return v;

}).onErrorResume(e->{

System.err.println("发生异常==>"+e);

return Flux.just("D","E");

});

flux.subscribe(v-> System.out.println("v1:"+v));

Thread.sleep(1000);

}

/* 发生异常后,原始的flux会停止,会去执行onErrorResume内部的flux

v1:a

发生异常==>java.lang.RuntimeException: b

v1:D

v1:E

onErrorMap():捕获并包装成一个业务异常,并重新抛出,消费者有感知

public void onErrorMap() throws InterruptedException {

Flux<String> flux = Flux.just("a", "b", "c")

.map(v->{

if(v.equals("b"))

throw new RuntimeException("b");

else

return v;

}).onErrorMap(RuntimeException.class,e -> new IllegalArgumentException("计算失败:"+e.getMessage())

);

flux.subscribe(v-> System.out.println("v1:"+v));

Thread.sleep(1000);

}

/** 会捕获异常,然后在转换为onErrorMap内部定义的异常返回

v1:a

reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalArgumentException: 计算失败:b

Caused by: java.lang.IllegalArgumentException: 计算失败:b

onErrorContinue():忽略当前异常,仅通知记录,继续推进

public void onErrorContinue() throws InterruptedException {

Flux<String> flux = Flux.just("a", "b", "c")

.map(v->{

if(v.equals("b"))

throw new RuntimeException("b");

else

return v;

}).onErrorContinue((e, obj) -> 忽略错误,并打印错误信息和当前元素

System.err.println("发生异常: " + e + ", 数据: " + obj)

);

flux.subscribe(v-> System.out.println("v1:"+v));

Thread.sleep(1000);

}

/**

v1:a

发生异常: java.lang.RuntimeException: b, 数据: b

v1:c

retry():当发生错误时,重新订阅 Flux 流,最多尝试指定次数

public void retry() throws InterruptedException {

Flux<String> flux = Flux.just("a", "b", "c")

.map(v->{

if(v.equals("b"))

throw new RuntimeException("b");

else

return v;

}).retry(2); //最多尝试两次

flux.subscribe(v-> System.out.println("v1:"+v));

Thread.sleep(1000);

}

/* 发生错误时尝试重新订阅,如果超过设置的次数,则抛出错误

v1:a

v1:a

v1:a

reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: b

2.2.4、其他工具 API

buffer():缓冲指定元素再消费,缓冲区:缓冲n个元素: 消费一次最多可以拿到n个元素; 凑满数批量发给消费者。

blockFirst():阻塞当前线程,直到 Flux 发出其第一个元素 ,然后返回该元素

public void blockFirst() throws InterruptedException {

List<String> block = Flux.just("a", "b", "c","d","e")

.map(String::toUpperCase)

.buffer(3) //创建缓冲区,接收到3个元素转为集合在一起返回订阅者

.blockFirst(); //阻塞当前线程,收到第一个元素就结束阻塞

System.out.println(block);

Thread.sleep(1000);

}

/* 首先是通过buffer设置缓冲区,然后把前三个元素一起发生过来到blockFirst,所以接受到的第一个元素就是被buffer转为数组的元素

[A, B, C]

bufferUntilChanged():作用和buffer一样,也是设置缓冲区,只不过不是固定的长度,而是根据条件分隔

public void bufferUntilChanged() throws InterruptedException {

Flux<List<String>> flux = Flux.just("a1", "b1", "c2", "d2", "e1")

.map(String::toUpperCase)

.bufferUntilChanged(i -> i.contains("2")); //设置分割缓冲区的条件

flux.subscribe(v-> System.out.println("v1:"+v));

Thread.sleep(1000);

}

/** 包含2则拆分,然后2结束拆分,不会去跳着拆分

v1:[A1, B1]

v1:[C2, D2]

v1:[E1]

cache():缓存数据,把订阅的数据缓存,可以设置缓存大小,当第二个订阅者来获取数据的时候就只能到缓存区去取数据

public void cache() throws IOException {

Flux<String> cache = Flux.just("a", "b", "c" ,"d" , "e")

.cache(2);

cache.subscribe(v-> System.out.println("v1:"+v));

cache.subscribe(v-> System.out.println("v2:"+v));

System.in.read();

}

/* 订阅者1直接获取到全部数据,然后把数据缓存到缓冲区,但只要两个大小,所以订阅者2就只拿到了最后放入缓冲区的两个数据

v1:a

v1:b

v1:c

v1:d

v1:e

v2:d

v2:e

handle():自定义流中元素处理规则,内部可以自定义设置处理规则,然后再通过sink.next(),把处理后的数据发送到下一个节点

public void handle() throws InterruptedException {

Flux.just("a", "b", "c")

.delayElements(Duration.ofSeconds(1))

.handle((v, sink)->{

sink.next("自定义增强:"+v);

})

.subscribe(System.out::println);

Thread.sleep(5000);

}

/* 获取结果

自定义增强:a

自定义增强:b

自定义增强:c

switchIfEmpty():如果为空则动态兜底获取数据

public void switchIfEmpty() throws InterruptedException {

Mono.just("add")

.switchIfEmpty(Mono.just("xx")) //如果为空,则动态兜底去调取方法

.log()

.subscribe(System.out::println);

Thread.sleep(5000);

}

defaultIfEmpty():如果为空,则调用当前的兜底的默认数据,作用同上类似

public void defaultIfEmpty() throws InterruptedException {

Mono.just("add")

.defaultIfEmpty("xx") //如果为空,则动态兜底去调取方法

.log()

.subscribe(System.out::println);

Thread.sleep(5000);

}

2.3、订阅流

常用汇总
方法 作用
collectList() 把flux转为Mono的list。
block() 阻塞当前线程等待完成。
subscribe() 订阅流,订阅的时候可以有多种方式订阅。除了直接订阅的,还可以捕获异常,以及监听结束等。
详情使用

collectList():把flux转为Mono的list。这个开发中会很常用

public void collectList() throws InterruptedException {

Mono<List<String>> mono = Flux.just("a", "b", "c", "d", "e")

.collectList();

mono.subscribe(v-> System.out.println("v:"+v));

Thread.sleep(1000);

}

/* 结果

v:[a, b, c, d, e]

block():阻塞当前线程等待完成。一般不建议这样写,这样就丧失了响应式编程的本质了

public void block() throws InterruptedException {

Mono<List<String>> mono = Flux.just("a", "b", "c", "d", "e")

.collectList();

List<String> block = mono.block();

System.out.println(block);

Thread.sleep(1000);

}

/* 直接阻塞获取到结果

[a, b, c, d, e]

subscribe():订阅流,订阅的时候可以有多种方式订阅。除了前面用的,直接订阅的,还可以捕获异常,以及监听结束等。

public void subscribe() throws InterruptedException {

Flux<String> flux = Flux.just("a", "b", "c", "d", "e")

.map(String::toUpperCase);

flux.subscribe(v-> System.out.println("v:"+v),

throwable -> System.out.println("异常"+throwable.getMessage()),

()-> System.out.println("流执行完成!!"));

Thread.sleep(1000);

}

/* 正常结束,如果我异常结束会被捕获到

v:A

v:B

v:C

v:D

v:E

流执行完成!!

BaseSubscriber():subscribe还可以通过BaseSubscriber来自定义消费者。

public void BaseSubscriber() throws InterruptedException {

Flux<String> flux = Flux.just("a", "b", "c", "d", "e")

.map(String::toUpperCase);

// Flux<String> flux2 = Flux.just("a", "b", "c", "d", "e")

// .map(v->{

// if (v.equals("b")) {throw new RuntimeException("b");}

// return v;

// });

flux.subscribe(new BaseSubscriber<String>(){

@Override

protected void hookOnSubscribe(Subscription subscription) {

System.out.println(Thread.currentThread()+"流开始了:"+subscription);

request(1);

}

@Override

protected void hookOnNext(String value) {

System.out.println(Thread.currentThread()+"开始接收元素:"+value);

request(1); //持续接收元素

}

@Override

protected void hookOnComplete() {

System.out.println(Thread.currentThread()+"流正常结束");

}

@Override

protected void hookOnError(Throwable throwable) {

System.out.println(Thread.currentThread()+"流错误结束:"+throwable);

}

@Override

protected void hookOnCancel() {

System.out.println(Thread.currentThread()+"流被取消");

}

@Override

protected void hookFinally(SignalType type) {

System.out.println(Thread.currentThread()+"最终回调");

}

});

Thread.sleep(6000);

}

/* 到对应的节点时会 触发对应的回调方法

Thread[main,5,main]流开始了:reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber@782e6b40

Thread[main,5,main]开始接收元素:A

Thread[main,5,main]开始接收元素:B

Thread[main,5,main]开始接收元素:C

Thread[main,5,main]开始接收元素:D

Thread[main,5,main]开始接收元素:E

Thread[main,5,main]流正常结束

Thread[main,5,main]最终回调



声明

本文内容仅代表作者观点,或转载于其他网站,本站不以此文作为商业用途
如有涉及侵权,请联系本站进行删除
转载本站原创文章,请注明来源及作者。