reactor模式是谁提出来的(Reactor3参考指南)
reactor模式是谁提出来的(Reactor3参考指南)基于状态的 generate 方法示例例如,你可以使用 int 作为状态:以编程方式创建 Flux 的最简单方式是通过 generate 方法,该方法采用 generator 函数。这是针对同步和逐个发射的,这意味着 sink 是一个 SynchronousSink,并且它的 next() 方法在每次回调调用中最多只能调用一次。然后你可以另外调用 error(Throwable) 或 complete(),但这是可选的。最有用的变体可能是这样一个变体,它还允许你保持一个状态,你可以在 sink 使用中引用该状态来决定下一个要发出什么。然后,生成器函数变为 BiFunction<S SynchronousSink<T> S>,其中 <S> 是状态对象的类型。你必须为初始状态提供一个 Supplier<S>,并且生成器函数现在在每一轮返回一个新状
上一篇[4、Reactor 核心特性(1)]
下一篇[4、Reactor 核心特性(3)]
英文原文:https://projectreactor.io/docs/core/release/reference/#core-features GitHub:https://github.com/jijicai/ProjectReactor/tree/master/book/Reactor3 4.4. 以编程方式创建一个序列
在本节中,我们将通过编程定义其关联事件(onNext、onError、onComplete)来介绍 Flux 或 Mono 的创建。所有这些方法都共享这样一个事实:它们公开一个 API 来触发我们称为 sink 的事件。实际上,有一些 sink 的变体,我们很快就会讲到。
4.4.1. 同步方法:generate
以编程方式创建 Flux 的最简单方式是通过 generate 方法,该方法采用 generator 函数。
这是针对同步和逐个发射的,这意味着 sink 是一个 SynchronousSink,并且它的 next() 方法在每次回调调用中最多只能调用一次。然后你可以另外调用 error(Throwable) 或 complete(),但这是可选的。
最有用的变体可能是这样一个变体,它还允许你保持一个状态,你可以在 sink 使用中引用该状态来决定下一个要发出什么。然后,生成器函数变为 BiFunction<S SynchronousSink<T> S>,其中 <S> 是状态对象的类型。你必须为初始状态提供一个 Supplier<S>,并且生成器函数现在在每一轮返回一个新状态。
例如,你可以使用 int 作为状态:
基于状态的 generate 方法示例
Flux<String> flux = Flux.generate( () -> 0 (state sink) -> { sink.next("3 x " state " = " 3*state); if (state == 10) sink.complete(); return state 1; });
(1)我们提供初始状态值:0
(2)我们使用状态来选择发出什么(3 的乘法表中的一行)
(3)我们也用它来选择何时停止。
(4)我们返回在下一次调用中使用的新状态。(除非序列在此调用中终止)
上面的代码生成了 3 的乘法表,如下所示:
3 x 0 = 0 3 x 1 = 3 3 x 2 = 6 3 x 3 = 9 3 x 4 = 12 3 x 5 = 15 3 x 6 = 18 3 x 7 = 21 3 x 8 = 24 3 x 9 = 27 3 x 10 = 30
还可以使用可变的 <S>。例如,可以使用一个 AtomicLong 作为状态重写上面的示例,并在每一轮中对其进行修改。
可变状态的方法 generate 变体:
Flux<String> flux = Flux.generate( AtomicLong::new (state sink) -> { long i = state.getAndIncrement(); sink.next("3 x " i " = " 3*i); if (i == 10) sink.complete(); return state; });
(1)这次,我们生成一个可变对象作为状态。
(2)在这改变状态。
(3)返回与新状态相同的实例。
注意:
如果状态对象需要清理一些资源,则可以使用 generate(Supplier<S> BiFunction Consumer<S>) 变体来清理最后一个状态实例。
下面是使用包含一个 Consumer 的 generate 方法的示例:
Flux<String> flux = Flux.generate( AtomicLong::new (state sink) -> { long i = state.getAndIncrement(); sink.next("3 x " i " = " 3*i); if (i == 10) sink.complete(); return state; } (state) -> System.out.println("state: " state)); }
(1)同样,生成一个可变对象作为状态。
(2)在这改变状态。
(3)返回与新状态相同的实例。
(4)我们将最后一个状态值(11)作为 Consumer 拉姆达表达式的输出。
在这种情况下,这个状态变量包含一个数据库连接或其他资源,这些需要在进程的末尾被处理。Consumer 拉姆达表达式可以关闭这个连接,处理任何在进程的末尾应当做完的任务。
在包含数据库连接或其他资源的状态对象需要在进程结束时处理的情况下,Consumer 拉姆达表达式可以关闭连接,或以其他方式处理应该在进程结束时完成的任何任务。
4.4.2. 异步且多线程的方法:create
create 方法是一种更高级的程序化创建 Flux 的形式,它适合于每轮多次发送,甚至来自多个线程。
它提供了 FluxSink 类,其中包含:next、error 和 complete 方法。与 generate 方法相反,它没有基于状态的变量。另一方面,它可以在回调中触发多线程事件。
注意:
create 方法对于将现有的 API 与反应式世界(例如基于监听器的异步 API)桥接起来非常有用。
警告:
尽管 create 方法可以与异步 API 一起使用,但它不会将代码并行化,也不会使其成为异步的。如果在 create 方法的 拉姆达表达式中阻塞,则会使自己暴露在死锁和类似的副作用中。即使使用 subscribeOn,也需要注意,一个长时间阻塞的 create 方法的拉姆达表达式(例如调用 sink.next(t) 的无限循环)可以锁定管道:由于循环耗尽了请求应该运行的同一线程,因此永远不会执行这些请求。使用 subscribeOn(Scheduler false) 变体:requestOnSeparateThread=false 会使用 create 方法的 Scheduler 线程,并仍然通过在原始线程中执行请求让数据流动。
假设你使用一个基于监听器的 API。它按块处理数据,并有两个事件:
(1)数据块已准备好,(2)处理已完成(终端事件),如 MyEventListener 接口中所示:
interface MyEventListener<T> { void onDataChunk(List<T> chunk); void processComplete(); }
你可以使用 create 方法将这个接口桥接到 Flux<T>:
Flux<String> bridge = Flux.create(sink -> { myEventProcessor.register( new MyEventListener<String>() { public void onDataChunk(List<String> chunk) { for(String s : chunk) { sink.next(s); } } public void processComplete() { sink.complete(); } }); });
(1)桥接到 MyEventListener API
(2)块中的每个元素都成为 Flux 中的一个元素。
(3)processComplete 事件被转换为 onComplete 。
(4)所有这些都是在 myEventProcessor 执行时异步完成的。
此外,由于 create 方法可以桥接异步 APIs 并管理背压,因此可以通过指明 OverflowStrategy 来改进如何明智地处理背压。
IGNORE:完全忽略下游的背压请求。当队列在下游排满时,可能会产生 IllegalStateException。
ERROR:当下游无法跟上时,发出 IllegalStateException 信号。
DROP:如果下游没有准备好接收传入信号,则将其删除。
LATEST:让下游只接收来自上游的最新信号。
BUFFER:如果下游无法跟上,则缓存所有信号。这是默认情况。(这将执行无限制的缓冲,并可能导致内存溢出错误(OutOfMemoryError))
注意:
Mono 也有一个 create 生成器。Mono 的 create 的 MonoSink 不允许多次发送信号。它会在第一个信号之后丢弃所有信号。
4.4.3. 异步且单线程的方法:push
push 方法位于 generat 和 create 之间,适合于处理来自单个生产者的事件。它类似于创建,因为它也可以是异步的,并且可以使用 create 支持的任何溢出策略来管理背压。但是,一次只能有一个生产线程可以调用 next、complete 或 error。
Flux<String> bridge = Flux.push(sink -> { myEventProcessor.register( new SingleThreadEventListener<String>() { public void onDataChunk(List<String> chunk) { for(String s : chunk) { sink.next(s); } } public void processComplete() { sink.complete(); } public void processError(Throwable e) { sink.error(e); } }); });
(1)桥接到 SingleThreadEventListener API。
(2)使用来自单个监听器线程的 next 方法推送事件到接收器(sink)。
(3)complete 事件产生自相同的监听器线程。
(4)error 事件也产生自相同的监听器线程。
一个混合的 push/pull 模型
大多数 Reactor 操作符,像 create,遵循混合的 push/pull 模型。我们的意思是,尽管大多数处理是异步的(建议使用 push 方法),但它有一个小的 pull 组件:请求。
消费者从源拉取数据,也就是说,在第一次请求之前,它不会发出任何数据。当数据可用时,源推送数据到消费者,但在其请求量的范围内。
请注意,push() 和 create() 都允许设置 onRequest 消费者,以便管理请求量,并确保只有在存在挂起的请求时才通过接收器(sink)推送数据。
Flux<String> bridge = Flux.create(sink -> { myMessageProcessor.register( new MyMessageListener<String>() { public void onMessage(List<String> messages) { for(String s : messages) { sink.next(s); } } }); sink.onRequest(n -> { List<String> messages = myMessageProcessor.getHistory(n); for(String s : message) { sink.next(s); } }); });
(1)当发出请求时轮询消息。
(2)如果消息立即可用,则推送它们到接送器(sink)。
(3)稍后异步到达的其余消息也将被发送。
在 push() 或 create() 后清理数据
两个回调,onDispose 和 onCancel,在取消或终止时执行清理。当 Flux 完成、错误消除,或者被取消时,onDispose 可用于执行清理。onCancel 可用于在使用 onDispose 清理之前执行任何特定于取消的操作。
Flux<String> bridge = Flux.create(sink -> { sink.onRequest(n -> channel.poll(n)) .onCancel(() -> channel.cancel()) .onDispose(() -> channel.close()) });
(1)首先调用 onCancel,仅用于取消信号。
(2)调用 onDispose 处理完成、错误或取消信号。
4.4.4. 方法:handle
handle 方法有点不同:它是一个实例方法,这意味着它被连接到一个现有的源上(就像常见的操作符一样)。Mono 和 Flux 中都有这个方法。
它与 generate 相似,在这个意义上,它使用 SynchronousSink 并且只允许一个一个地发送数据。但是,handle 可以用于从每个源元素中生成任意值,可能跳过某些元素。通过这种方式,它可以作为 map 和 filter 的结合。handle 签名如下:
Flux<R> handle(BiConsumer<T SynchronousSink<R>>);
让我们看一个例子。反应式流规范不允许序列中有 null 值。如果你想执行 map,但是你想使用现有方法作为 map 函数,而该方法有时候返回 null,那该怎么办?
例如,下面的方法可以安全地应用到整数源:
public String alphabet(int letterNumber) { if (letterNumber < 1 || letterNumber > 26) { return null; } int letterIndexAscii = 'A' letterNumber - 1; return "" (char) letterIndexAscii; }
然后,我们可以使用 handle 方法去移除任何 null 值:
为“映射和消除 null”场景使用 handle 方法。
Flux<String> alphabet = Flux.just(-1 30 13 9 20) .handle((i sink) -> { String letter = alphabet(i); if (letter != null) sink.next(letter); }); alphabet.subscribe(System.out::println);
(1)映射到字母。
(2)如果 “map 函数”返回 null...
(3)通过不调用 sink.next 来过滤它。
输出:
M I T 4.5. 线程和调度器
像 RxJava 一样,Reactor 可以被认为是并发不可知的。也就是说,它不强制并发模型。相反,它让开发人员你来负责。但是,这并不妨碍库帮助你处理并发。
获取 Flux 或 Mono 并不一定意味着它将运行在专有线程中。相反,大多数操作符继续在前一个操作符执行的线程中工作。如果不指定,最上面的操作符(源)自身在产生 subscribe() 调用的线程上运行。
public static void main(String[] args) { final Mono<String> mono = Mono.just("hello "); new Thread(() -> mono .map(msg -> msg "thread ") .subscribe(v -> System.out.println(v Thread.currentThread().getName()) ) ).join(); }
(1)Mono<String> 在 main 线程中组装的...
(2)...但在 Thread-0 线程中订阅它。
(3)因此,map 和 onNext 回调实际上都运行在 Thread-0 线程中。
上面的代码生成以下输出:
hello thread Thread-0
在 Reactor 中,执行模型和执行发生的位置由所使用的 Scheduler 决定。Scheduler 的调度职责与 ExecutorService 类似,但是有一个专用的抽象允许做更多的工作,尤其是充当时钟和支持更广泛的实现。(测试、trampolining(蹦床)或即时调度等的虚拟时间)
Schedulers 类有静态方法,可以访问以下执行上下文:
(1)当前线程(Schedulers.immediate())
(2)单个的可复用的线程(Schedulers.single())。请注意,在释放 Scheduler 之前,此方法对所有调用者重复使用同一线程。如果你想要每个调用使用单独的线程,则对每个调用使用 Schedulers.newSingle()。
(3)可伸缩的线程池(Schedulers.elastic())。它根据需要创建新的 worker 池,并重用空闲的。空闲时间过长(默认:60秒)的 worker 池将被释放。例如,这对 I/O 阻塞工作来说是一个好选择。Schedulers.elastic() 是一种方便的方法,可以让阻塞进程拥有自己的线程,这样它就不会占用其他资源。查看:如何包装同步的阻塞调用?
(https://projectreactor.io/docs/core/release/reference/#faq.wrap-blocking)
(4)为并行工作而调优的固定工作线程池(Schedulers.parallel())。它创建和 CPU 核心数一样多的 worker。
另外,通过使用 Schedulers.fromExecutorService(ExecutorService),你可以从任何预先存在的 ExecutorService 创建 Scheduler 。(你也可以从 Executor 创建一个,尽管不鼓励这样做)
还可以使用 newXXX 方法创建各种调度器类型的新实例。例如,Schedulers.newElastic(yourScheduleName),创建一个名称为 yourScheduleName 的新弹性调度器。
警告:
如果不能避免遗留阻塞代码,那么使用 elastic 可以帮助处理它们,而single 和 parallel 不能。因此,在默认的单个和并行 Schedulers 中使用 Reactor 的阻塞 APIs(block()、blockFirst()、blockLast() 以及迭代 toIterable() 或 toStream())将导致抛出 IllegalStateException。
警告:
通过创建实现了 NonBlocking 标记接口的 Thread 实例,自定义的 Schedulers 也可以被标记为“仅非阻塞”。
一些操作符默认使用来自 Schedulers 的特定调度器(并且通常为你提供不同的调度器选项)。例如,调用工厂方法 Flux.interval(Duration.ofMillis(300)) 会生成一个 Flux<Long>,每 300毫秒滴答一次。默认情况下,这是由 Schedulers.parallel() 启用的。以下行将调度器(scheduler)更改为类似 Schedulers.single() 的新实例。
Flux.interval(Duration.ofMillis(300) Schedulers.newSingle("test"))
Reactor 提供在反应式链中切换执行上下文(或 Scheduler)的两种方法:publishOn 和 subscribeOn。两者都采用 Scheduler,并允许你将执行上下文切换到该 Scheduler。但 publishOn 在链中的位置很重要,而 subscribeOn 的位置却不重要。要理解这一区别,你首先要记住:在订阅之前什么都不会发生。
在 Reactor 中,当你链接操作符时,你可以根据需要将多个 Flux 和 Mono 实现包装在一起。订阅之后,将创建 Subscriber 对象的链,向后(向上)到第一个发布者。这实际上是对你隐藏的。你所能看到的只是 Flux(或 Mono)和 Subscription 的外层,但这些中间的特定于操作符的订阅者才是真正的工作发生的地方。
有了这些知识,我们可以更深入地了解 publishOn 和 subscribeOn 操作符。
4.5.1. 方法:publishOn
publishOn 与订阅者链中间的任何其他操作符一样以相同的方式应用。它接收来自上游的信号,并在下游重放它们,同时对来自关联的 Scheduler 的 worker 执行回调。因此,它会影响后续操作符的执行位置(直到另一个 publishOn 被链接进来):
(1)将执行上下文更改为 Scheduler 选择的一个 Thread。
(2)按照规范, onNext 是按顺序发生的,因此这将使用单个线程。
(3)如果它们没有在特定的 Scheduler 上工作,则 publishOn 之后的操作符继续在同一线程上执行。
Scheduler s = Schedulers.newParallel("parallel-scheduler" 4); final Flux<String> flux = Flux .range(1 2) .map(i -> 10 i) .publishOn(s) .map(i -> "value " i); new Thread(() -> flux.subscribe(System.out::println));
(1)创建一个由 4 个线程支持的新的 Scheduler
(2)第一个 map 运行在 <5> 中的匿名线程上
(3)publishOn 在从 <1> 中选择的 Thread 上切换整个序列
(4)第二个 map 在 <1> 的线程上运行
(5)这个匿名 Thread 是订阅发生的线程。打印发生在最新的执行上下文上,该上下文来自 publishOn。
4.5.2. 方法:subscribeOn
当该反向链被构造时,subscribeOn 应用于订阅进程。因此,不论把 subscribeOn 放在链条中的哪个位置,它总是会影响源发射的上下文。但是,这不会影响后续对 publishOn 的调用行为。它们仍然会切换执行上下文以获取链的一部分。
(1)更改上面整个运算符链订阅的 Thread
(2)从 Scheduler 选择一个线程
注意:实际上,仅考虑在链条中最早的 subscribeOn 调用。
Scheduler s = Schedulers.newParallel("parallel-scheduler" 4); final Flux<String> flux = Flux .range(1 2) .map(i -> 10 i) .subscribeOn(s) .map(i -> "value " i); new Thread(() -> flux.subscribe(System.out::println));
(1)创建一个由 4 个 Thread 支持的新 Scheduler
(2)第一个 map 运行在这 4 个线程之一上
(3)因为 subscribeOn 将从订阅时间(<5>)开始切换整个序列
(4)第二个 map 也运行在同一线程上
(5)这个匿名 Thread 是订阅最初发生的线程,但 subscribeOn 立即将其转移到 4 个调度器线程之一。
上一篇[4、Reactor 核心特性(1)]
下一篇[4、Reactor 核心特性(3)]