托者設(shè)計(jì)吧官網(wǎng)免費(fèi)seo提交工具
響應(yīng)式編程-Project Reactor Mono 介紹
本文以Mono的角度來介紹Reactor編程,Flux的使用同理。
初體驗(yàn)
Web應(yīng)用 controller 方法在Spring webmvc 和 Spring webFlux下Controller方法實(shí)現(xiàn)示例如下:
Spring webmvc: ??? @GetMapping("/test1") ??? @ResponseBody ??? public String test1(){ ??????? String result =? geterateTest(); ??????? return result; ??? } Spring webFlux ??? @GetMapping("/test2") ??? @ResponseBody ??? public Mono<String> test2(){ ??????? Mono<String> result = Mono.fromSupplier(this:: geterateTest); ??????? return result; ??? } |
一個(gè)的響應(yīng)是String對(duì)象, 另一個(gè)是Mono<String>對(duì)象。Reactor Mono表示一個(gè)產(chǎn)生0-1元素的異步序列,異步指Mono創(chuàng)建的時(shí)候并不會(huì)執(zhí)行任何操作,當(dāng)Mono發(fā)生訂閱時(shí)才觸發(fā)Mono序列的運(yùn)行。非阻塞表示test2方法不會(huì)產(chǎn)生任何阻塞,即使genereateTest里面是一個(gè)阻塞的操作,因?yàn)榇藭r(shí)不會(huì)執(zhí)行實(shí)際的邏輯,所以不會(huì)發(fā)生任何阻塞。
NettyHttpServer.onStateChange方法中構(gòu)建Mono并進(jìn)行訂閱。
HttpServerOperations ops = (HttpServerOperations)connection; //Web Flux將按照Spring Web中的約定構(gòu)建一個(gè)Publisher(執(zhí)行過濾器、Controller方//法) Publisher<Void> publisher = (Publisher)this.handler.apply(ops, ops); Mono<Void> mono = Mono.deferContextual((ctx) -> { ????? ops.currentContext = Context.of(ctx); ????? return Mono.fromDirect(publisher); }); …… //subscribe將觸發(fā)前面Spring web中封裝在Mono構(gòu)建過程中的業(yè)務(wù)邏輯的真正執(zhí)行。 //如果我們按照命令是編程去編寫代碼,業(yè)務(wù)邏輯在構(gòu)建Mono的過程中就執(zhí)行了。 mono.subscribe(ops.disposeSubscriber()); |
注: Spring web flux框架下也可以按照傳統(tǒng)的命令式編程。
Mono的構(gòu)建
Reactor編程可以分為 異步序列Mono/Flux的構(gòu)建和和使用兩部分。
Mono的基本構(gòu)建
Mono類 提供了大量靜態(tài)方法幫助構(gòu)建Mono。
- just(T):返回T類型對(duì)象的Mono序列
- fromFuture(future):Mono序列的元素對(duì)象由future產(chǎn)生,訂閱時(shí)Future產(chǎn)生T并推送至訂閱者。其他from方法類似。
- empty():返回一個(gè)訂閱時(shí)直接完成的異步序列
- error():返回一個(gè)訂閱時(shí)直接推送錯(cuò)誤信號(hào)的序列
其他方法詳見Mono類API:
如:Mono<String> mono = Mono.just("TEST");
Mono裝配
假設(shè)我們按照上面示例,將整個(gè)程序都以響應(yīng)式編程的模式進(jìn)行開發(fā),方法都返回一個(gè)異步序列Mono/Flux。當(dāng)調(diào)用者調(diào)用某一個(gè)方法時(shí),面對(duì)返回的Mono/Flux對(duì)象有兩種選擇:1. 訂閱(觸發(fā)執(zhí)行), 2.裝配(Assembly):繼續(xù)將獲取到的異步序列封裝到一個(gè)新的異步序列中,繼續(xù)返回給外部調(diào)用者。如:Spring Web Flux 則是將Spring web 定義的包括WebFilter、Controller等邏輯組裝成一個(gè)復(fù)合的Mono,最終進(jìn)行訂閱。
圖1 Mono裝配示例
OptimizableOperator 接口
?????? OptimizableOperator <IN, OUT>接口提供了指向下一個(gè)OptimizableOperator的指針,并且提供了從IN型訂閱者獲取OUT訂閱者的方法,提供了一個(gè)Mono串行的組裝方法。
圖2 OptimizableOperator接口串行組裝示意圖
要實(shí)現(xiàn)一個(gè)串行化的Mono組裝類通常實(shí)現(xiàn)抽象類InternalMonoOperator<I, O>,構(gòu)造函數(shù)傳入一個(gè)Mono<I>,得到一個(gè)新的O型序列。實(shí)現(xiàn)subscribeOrReturn方法將O型訂閱轉(zhuǎn)化為原I型訂閱者,新的I型訂閱者實(shí)現(xiàn)了基于O性訂閱者之上的強(qiáng)化操作。Mono提供了大量InternalMonoOperator<I,O>的實(shí)現(xiàn)類。下面對(duì)MonoFilter進(jìn)行分析,解釋了如果創(chuàng)建基于InternalMonoOperator實(shí)現(xiàn)的裝配類和使用方法。
MonoFilter
將原Mono上增加一個(gè)過濾Predicate函數(shù),當(dāng)原Mono產(chǎn)生元素時(shí),只有Predicate測(cè)試通過的元素才會(huì)傳遞給最終的訂閱者,測(cè)試失敗將進(jìn)行過濾,Mono元素直接完成。
final class MonoFilter<T> extends InternalMonoOperator<T, T> { ???????? final Predicate<? super T> predicate; ???????? //構(gòu)造函數(shù)必須包含源Mono,和其他附加增加元素,這里是一個(gè)Predicate函數(shù) ???????? MonoFilter(Mono<? extends T> source, Predicate<? super T> predicate) { ????????????????? super(source); ????????????????? this.predicate = Objects.requireNonNull(predicate, "predicate"); ???????? } ???????? /** ???????? * 實(shí)現(xiàn)subscribeOrReturn,接收新Mono類型的訂閱者,返回原Mono類型的訂閱者。 ???????? * 新的訂閱者實(shí)現(xiàn)訂閱時(shí)裝配的目的,這里只有通過Predicate函數(shù)測(cè)試的元素,才會(huì) ???????? * 調(diào)用actual.onNext(T)方法推送給最終的訂閱者 ???????? **/ ???????? @Override ???????? @SuppressWarnings("unchecked") ???????? public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> actual) { ????????????????? if (actual instanceof ConditionalSubscriber) { ????????????????????????? return new FluxFilter.FilterConditionalSubscriber<>((ConditionalSubscriber<? super T>) actual, predicate); ????????????????? } ????????????????? return new FluxFilter.FilterSubscriber<>(actual, predicate); ???????? } ??? ...... } |
Mono內(nèi)置了大量的InternalMonoOperator實(shí)現(xiàn)類,如MonoFilter,但Reactor框架并不對(duì)外暴露這些類,(這些實(shí)現(xiàn)類都是包內(nèi)可見的),而是通過Mono方法的形式去方便獲取各個(gè)可實(shí)現(xiàn)類的對(duì)象,并且統(tǒng)一以Mono類型的對(duì)外暴露。抽象統(tǒng)一的Mono使用范式比起暴露各種各樣的實(shí)現(xiàn)細(xì)節(jié)顯得簡(jiǎn)潔清晰。
我們可以使用Mono內(nèi)置的InternalMonoOperator實(shí)現(xiàn)類,也可以實(shí)現(xiàn)自己的InternalMonoOperator類,但應(yīng)和Reactor框架保持統(tǒng)一的用法, 在Mono的使用上統(tǒng)一以Mono類型和協(xié)議進(jìn)行操作,不對(duì)外暴露具體的實(shí)現(xiàn)細(xì)節(jié)。
Mono 提供的裝配方法
?????? Reactor框架并不暴露具體的裝配類細(xì)節(jié),而是提供了大量靜態(tài)或?qū)嵗椒▉韺?duì)Mono進(jìn)行裝配,返回裝配后的新Mono。如上節(jié)所述的MonoFilter使用方法如下:
Mono.just(2).filter( (v -> v % 2 != 0)).subscribe(i -> System.out.println(i),
??????????????? error -> System.err.println("Error: " + error),
??????????????? ()-> System.out.println("complete"));
Mono filter方法返回了一個(gè)可以對(duì)原序列元素進(jìn)行檢測(cè)的增強(qiáng)Mono,上述例子因Mono.just(2) 中的元素值2 無法通過(v -> v % 2 != 0)的測(cè)試,將被過濾掉,無法傳給最終的訂閱者,而只能接受到原序列的結(jié)束信號(hào), 因此只會(huì)打印“complete“。
Filter方法顯示實(shí)際是返回的MonoFilter對(duì)象。
public final Mono<T> filter(final Predicate<? super T> tester) { ???????? …… ???????? return onAssembly(new MonoFilter<>(this, tester)); } |
其他Mono裝配方法:
- Mono<Tuple2<T1, T2>> zip(Mono<? extends T1> p1, Mono<? extends T2> p2)
:將一個(gè)T1類型元素的Mono和一個(gè)T2類型元素的Mono中的元素組合成一個(gè)Tuple2<T1,T2>元素的Mono. Mono還提供了zip的多種版本,滿足各種情況的Mono組合模式。
- public final Mono<T> timeout(Duration timeout): 當(dāng)原序列產(chǎn)生一個(gè)T類型元素后,如果沒有在指定的時(shí)間內(nèi)完成,則將觸發(fā)一個(gè)錯(cuò)誤。如果在限期內(nèi)完成則沒有任何影響,該實(shí)現(xiàn)使用了MonoTimeout<T, U, V> extends InternalMonoOperator<T, T>。
- doOnXXXX系列方法,如doOnCancel,? doOnNext, doOnError等, 返回在特定事件上加入行為的增強(qiáng)Mono。
更多Mono的裝配方法詳見Mono API。
Mono的使用
Mono的使用其實(shí)只有一種就是對(duì)Mono進(jìn)行訂閱, 但是Mono類也提供了其他傳統(tǒng)的接口來進(jìn)行Mono的使用。
Mono的訂閱
訂閱Mono很簡(jiǎn)單,調(diào)用Mono對(duì)象的subscribe方法,傳入一個(gè)CoreSubscriber的實(shí)現(xiàn)對(duì)象即可。
Mono.subscribe.源碼中展示了對(duì)Mono裝配后的復(fù)合Mono進(jìn)行訂閱的處理邏輯。
public final void subscribe(Subscriber<? super T> actual) { ??? //獲取最后一個(gè)裝配的Mono corePublisher ???????? CorePublisher publisher = Operators.onLastAssembly(this); ???????? CoreSubscriber subscriber = Operators.toCoreSubscriber(actual); ???????? ...... ???????? ??? //如果最后一個(gè)裝配的publisher 實(shí)現(xiàn)了OptmizableOperator接口,一路組裝 ???????? ??? //增強(qiáng)的Subscriber,按照循序后去下一個(gè)OptmizableOperator ????????????????? if (publisher instanceof OptimizableOperator) { ????????????????????????? OptimizableOperator operator = (OptimizableOperator) publisher; ????????????????????????? while (true) { ?????????????????????????????????? subscriber = operator.subscribeOrReturn(subscriber); ?????????????????????????????????? if (subscriber == null) { ??????????????????????????????????????????? return; ?????????????????????????????????? } ?????????????????????????????????? OptimizableOperator newSource = operator.nextOptimizableSource(); ?????????????????????????????????? if (newSource == null) { ??????????????????????????????????????????? publisher = operator.source(); ??????????????????????????????????????????? break; ?????????????????????????????????? } ?????????????????????????????????? operator = newSource; ????????????????????????? } ????????????????? } ???????? ??? //直到最底層的CorePublisher,使用最終轉(zhuǎn)換所得的subscriber進(jìn)行訂閱, ???????? ??? //原始序列產(chǎn)生的序號(hào),將在一些列增強(qiáng)subscriber的增強(qiáng)下,或丟棄、或加工后傳給 ???????? ??? //實(shí)際的訂閱者 ????????????????? publisher.subscribe(subscriber); } |
Mono的簡(jiǎn)化使用
?????? Mono 提供了一些方法簡(jiǎn)化Mono的訂閱操作,如block() 阻塞當(dāng)前線程知道Mono序列返回元素或完成/異常信號(hào)
PublishOn和SubscribeOn
?????? publishOn 和 SubscribeOn 傳入Scheduler對(duì)象,將Mono的行為交由Scheduler的現(xiàn)成執(zhí)行。其中publishOn調(diào)用之后的序列行為在新的執(zhí)行線程執(zhí)行,而SubscribeOn則是整個(gè)序列的執(zhí)行都在新的現(xiàn)成中執(zhí)行。
final Flux<String> flux = Flux
??? .range(1, 2)
??? .map(i -> 10 + i)?
??? .publishOn(s)?
.map(i -> "value " + i);
flux.subscribe(System.out::println)
final Flux<String> flux = Flux
??? .range(1, 2)
??? .map(i -> 10 + i)?
??? .subscribeOn(s)?
??? .map(i -> "value " + i);
flux.subscribe(System.out::println)
總結(jié)
?????? 本文對(duì)Reactor的Mono編程進(jìn)行了初步的介紹,體現(xiàn)了響應(yīng)式編程的核心在于異步序列的構(gòu)建(Mono/Flux)和訂閱使用。 其中構(gòu)建時(shí)對(duì)Mono/Flux的裝配(Assembly)是整個(gè)編程模型的核心。