Reactor
Reactor相关笔记
Flux 和 Mono
Flux 和 Mono 是 Reactor 中的两个基本概念。
Flux 表示的是包含 0 到 N 个元素的异步序列。
在该序列中可以包含三种不同类型的消息通知:正常的包含元素的消息、序列结束的消息和序列出错的消息。当消息通知产生时,订阅者中对应的方法 onNext()
, onComplete()
和 onError()
会被调用。
Mono 表示的是包含 0 或者 1 个元素的异步序列。
该序列中同样可以包含与 Flux 相同的三种类型的消息通知。
Flux 和 Mono 之间可以进行转换。对一个 Flux 序列进行计数操作,得到的结果是一个 Mono<Long>
对象。把两个 Mono 序列合并在一起,得到的是一个 Flux 对象。
Flux
通过Flux静态方法创建Flux
just()
:可以指定序列中包含的全部元素。创建出来的 Flux 序列在发布这些元素之后会自动结束。fromArray()
,fromIterable()和 fromStream():可以从一个数组、Iterable 对象或 Stream 对象中创建 Flux 对象。empty()
:创建一个不包含任何元素,只发布结束消息的序列。error(Throwable error)
:创建一个只包含错误消息的序列。never()
:创建一个不包含任何消息通知的序列。range(int start, int count)
:创建包含从 start 起始的 count 个数量的 Integer 对象的序列。interval(Duration period)
和 interval(Duration delay, Duration period):创建一个包含了从 0 开始递增的 Long 对象的序列。其中包含的元素按照指定的间隔来发布。除了间隔时间之外,还可以指定起始元素发布之前的延迟时间。intervalMillis(long period)
和 intervalMillis(long delay, long period):与 interval()方法的作用相同,只不过该方法通过毫秒数来指定时间间隔和延迟时间。
Flux.just("Hello", "World").subscribe(System.out::println);
Flux.fromArray(new Integer[]{1, 2, 3}).subscribe(System.out::println);
Flux.empty().subscribe(System.out::println);
Flux.range(1, 10).subscribe(System.out::println);
Flux.interval(Duration.of(10, ChronoUnit.SECONDS)).subscribe(System.out::println);
Flux.intervalMillis(1000).subscribe(System.out::println);
subscribe() 消息处理
当需要处理 Flux 或 Mono 中的消息时,如之前的代码清单所示,可以通过 subscribe 方法来添加相应的订阅逻辑。
在调用 subscribe 方法时可以指定需要处理的消息类型。可以只处理其中包含的正常消息,也可以同时处理错误消息和完成消息。
Flux.just(1, 2)
.concatWith(Mono.error(new IllegalStateException()))
.subscribe(System.out::println, System.err::println);
使用 Reactor 进行反应式编程
https://www.ibm.com/developerworks/cn/java/j-cn-with-reactor-response-encode/index.html
上一篇 Spring-WebFlux
下一篇 Apache-Shiro
页面信息
location:
protocol
: host
: hostname
: origin
: pathname
: href
: document:
referrer
: navigator:
platform
: userAgent
: