【一起學(xué)習(xí)Reactor】響應(yīng)式編程簡(jiǎn)介

IMG_3558.JPG

Reactor是響應(yīng)式編程規(guī)范的一個(gè)實(shí)現(xiàn),維基百科對(duì)響應(yīng)式編程的總結(jié)如下:

響應(yīng)式編程是一種異步編程范例,主要關(guān)注數(shù)據(jù)流和數(shù)據(jù)變化通知。這意味著可以使用編程語言輕松表達(dá)靜態(tài)(數(shù)組)或動(dòng)態(tài)(事件發(fā)射器)數(shù)據(jù)流。更多有關(guān)響應(yīng)式編程的描述可以參考Reactive programming

響應(yīng)式編程邁出的第一步是微軟在.NET系統(tǒng)中創(chuàng)建了響應(yīng)式擴(kuò)展庫(kù)(Rx)。在微軟創(chuàng)建Rx之后,RxJava在JVM上實(shí)現(xiàn)了響應(yīng)式編程。隨著時(shí)間推移,經(jīng)過Reactive Streams的不斷努力制定了Java實(shí)現(xiàn)響應(yīng)式編程的規(guī)范,規(guī)范為JVM上的響應(yīng)式庫(kù)定義了一組接口和交互規(guī)則。Java 9的Flow類已經(jīng)實(shí)現(xiàn)了規(guī)范定義的接口(從Java 9 開始,java開始默認(rèn)支持響應(yīng)式編程,有條件的小伙伴該考慮升級(jí)Java版本了)。

在面向?qū)ο蟮木幊陶Z言中,響應(yīng)式編程通常作為觀察者模式的一種擴(kuò)展。如果對(duì)比迭代器設(shè)計(jì)模式和主流的響應(yīng)式流模式對(duì)比,會(huì)發(fā)現(xiàn)在幾乎所有的庫(kù)中Iterable-Iterator 都有雙重性(可以互相轉(zhuǎn)換)。兩者主要的區(qū)別是:迭代器設(shè)計(jì)模式基于,響應(yīng)式流基于

迭代器是命令式編程模式,盡管訪問數(shù)據(jù)的方法僅由Iterable負(fù)責(zé)。實(shí)際上在使用迭代器時(shí)由開發(fā)者決定何時(shí)選擇序列中的next()元素。在響應(yīng)式流中,和上面Iterable-Iterator對(duì)應(yīng)的是Publisher-Subscriber,新值出現(xiàn)時(shí)Publisher 會(huì)通知Subscriber ,推送是響應(yīng)的關(guān)鍵。同樣,對(duì)推送值的操作是聲明式而不是命令式,代碼表達(dá)計(jì)算的邏輯,而不是描述其精確的控制流。

響應(yīng)式流除了推送值之外,同樣以良好的方式定義了錯(cuò)誤處理和操作完成。一個(gè)Publisher可以向其Subscriber推送新的值,也可以發(fā)送錯(cuò)誤信號(hào)或者完成信號(hào)。錯(cuò)誤信號(hào)和完成信號(hào)都會(huì)終止序列,下面的表達(dá)式準(zhǔn)確簡(jiǎn)練的描述了這個(gè)邏輯:

onNext x 0..N [onError | onComplete]

這種模式非常的靈活,可以支持沒有值,一個(gè)值或n個(gè)值(包括無限序列,比如時(shí)間)。但是為什么我們首先需要這樣一個(gè)異步響應(yīng)式庫(kù)呢?

阻塞是一種浪費(fèi)

現(xiàn)在應(yīng)用有大量的并發(fā)用戶,盡管現(xiàn)代化硬件的能力在不斷提升,但是軟件性能依然是一個(gè)關(guān)鍵問題。有兩種方法可以提高軟件的性能:

  • 并行使用更多的線程和更多的硬件資源,
  • 提高現(xiàn)有資源的使用率。

通常,Java 開發(fā)者使用阻塞代碼開發(fā)程序,這種方法在沒有性能瓶頸之前非常的完美,因?yàn)樽枞a更容易理解也更容易編寫。當(dāng)程序出現(xiàn)性能瓶頸時(shí),引入另外的線程來運(yùn)行相同的阻塞代碼(活多了需要加人)。但是,資源的這種擴(kuò)展會(huì)迅速引入競(jìng)爭(zhēng)和并發(fā)問題。更糟糕的是,阻塞會(huì)浪費(fèi)資源。如果程序遇到延遲(特別是I/O操作,比如數(shù)據(jù)庫(kù)請(qǐng)求或網(wǎng)絡(luò)調(diào)用),因?yàn)榫€程需要等待數(shù)據(jù)而處于空閑狀態(tài)進(jìn)而導(dǎo)致資源的浪費(fèi)。

因此,并行不是靈丹妙藥。充分使用硬件的能力十分必要,但是推理過程十分復(fù)雜而且更加容易造成浪費(fèi)。

異步是一副良藥嗎?

通過編寫異步、非阻塞代碼,可以切換到另一個(gè)活動(dòng)的使用相同基礎(chǔ)資源的任務(wù)并在異步處理完成后返回到當(dāng)前的任務(wù)。通過異步代碼可以提高資源的使用率,減少資源浪費(fèi)。

Java提供了下面兩種異步編程模型:

  • Callbacks: 異步方法沒有返回值,但是需要一個(gè)額外的callback參數(shù)(可以是lambda表達(dá)式或匿名類),當(dāng)結(jié)果可用時(shí)會(huì)被調(diào)用。
  • Futures::方法立即返回一個(gè)Future<T> 。異步計(jì)算T的值,但是Future 封裝了對(duì)T值的訪問。T值可能不是立即可用,而且Future對(duì)象支持輪詢直到值T可用。Java的ExecutorService 執(zhí)行Callable<T> 時(shí)返回一個(gè)Future 對(duì)象。

Java提供的這兩種編寫異步代碼的技術(shù)足夠好了嗎?這些技術(shù)很好,但并不適用于每一種場(chǎng)景,而且都有各自的局限性。Callbacks的缺點(diǎn)是很難組合在一起使用,而且多個(gè)回調(diào)組合在一起使用時(shí),代碼很快就會(huì)變的難以閱讀和維護(hù)(通常稱為回調(diào)地獄)。

下面以在用戶界面顯示用戶前五個(gè)收藏夾為樣例說明Callbacks的局限性。業(yè)務(wù)場(chǎng)景為:顯時(shí)用戶前五個(gè)收藏夾,如果用戶沒有收藏夾則顯示建議。

userService.getFavorites(userId, new Callback<List<String>>() { // 1
  public void onSuccess(List<String> list) { // 2
    if (list.isEmpty()) { // 3
      suggestionService.getSuggestions(new Callback<List<Favorite>>() {
        public void onSuccess(List<Favorite> list) { // 4
          UiUtils.submitOnUiThread(() -> { // 5
            list.stream()
                .limit(5)
                .forEach(uiList::show); // 6
            });
        }

        public void onError(Throwable error) { // 7
          UiUtils.errorPopup(error);
        }
      });
    } else {
      list.stream() //8
          .limit(5)
          .forEach(favId -> favoriteService.getDetails(favId, // 9
            new Callback<Favorite>() {
              public void onSuccess(Favorite details) {
                UiUtils.submitOnUiThread(() -> uiList.show(details));
              }

              public void onError(Throwable error) {
                UiUtils.errorPopup(error);
              }
            }
          ));
    }
  }

  public void onError(Throwable error) {
    UiUtils.errorPopup(error);
  }
});

基于callback的實(shí)現(xiàn)有很多的代碼,這些代碼難以理解,要想一步一步弄懂邏輯也比較困難,而且代碼還有部分重復(fù)。下面是基于Reactor的等價(jià)實(shí)現(xiàn):

userService.getFavorites(userId)
           .flatMap(favoriteService::getDetails)
           .switchIfEmpty(suggestionService.getSuggestions())
           .take(5)
           .publishOn(UiUtils.uiThreadScheduler())
           .subscribe(uiList::show, UiUtils::errorPopup);

基于callback的代碼實(shí)現(xiàn)如果要增加超時(shí)邏輯會(huì)十分的困難,但是基于Reactor的實(shí)現(xiàn)只要使用timeout方法即可輕松完成:

userService.getFavorites(userId)
           .timeout(Duration.ofMillis(800))
           .onErrorResume(cacheService.cachedFavoritesFor(userId))
           .flatMap(favoriteService::getDetails)
           .switchIfEmpty(suggestionService.getSuggestions())
           .take(5)
           .publishOn(UiUtils.uiThreadScheduler())
           .subscribe(uiList::show, UiUtils::errorPopup);

Future對(duì)象比回調(diào)好一點(diǎn),但它們?cè)诮M合方面仍然做得不好,盡管CompletableFuture在Java 8中帶來了改進(jìn)。編排多個(gè)Future 對(duì)象可行但是并不容易。除此之外,Future 還有其他問題:

  • 調(diào)用get() 方法很容易導(dǎo)致Future 對(duì)象阻塞,
  • 不支持懶加載/懶計(jì)算,
  • 它們?nèi)狈?duì)多值和高級(jí)錯(cuò)誤處理的支持。

考慮另一個(gè)例子:我們獲得一個(gè)id列表,我們想從中獲取一個(gè)名稱和一個(gè)統(tǒng)計(jì)信息,并將它們成對(duì)組合,所有這些都是異步的.。下面的代碼使用一個(gè) CompletableFuture列表來實(shí)現(xiàn)該功能:

CompletableFuture<List<String>> ids = ifhIds();

CompletableFuture<List<String>> result = ids.thenComposeAsync(l -> {
    Stream<CompletableFuture<String>> zip =
            l.stream().map(i -> {
                CompletableFuture<String> nameTask = ifhName(i);
                CompletableFuture<Integer> statTask = ifhStat(i);

                return nameTask.thenCombineAsync(statTask, (name, stat) -> "Name " + name + " has stats " + stat);
            });
    List<CompletableFuture<String>> combinationList = zip.collect(Collectors.toList());
    CompletableFuture<String>[] combinationArray = combinationList.toArray(new CompletableFuture[combinationList.size()]);

    CompletableFuture<Void> allDone = CompletableFuture.allOf(combinationArray);
    return allDone.thenApply(v -> combinationList.stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList()));
});

List<String> results = result.join();
assertThat(results).contains(
        "Name NameJoe has stats 103",
        "Name NameBart has stats 104",
        "Name NameHenry has stats 105",
        "Name NameNicole has stats 106",
        "Name NameABSLAJNFOAJNFOANFANSF has stats 121");

由于Reactor有更多的可開箱的組合操作符,上面的過程可以簡(jiǎn)化如下:

Flux<String> ids = ifhrIds();

Flux<String> combinations =
        ids.flatMap(id -> {
            Mono<String> nameTask = ifhrName(id);
            Mono<Integer> statTask = ifhrStat(id);

            return nameTask.zipWith(statTask,
                    (name, stat) -> "Name " + name + " has stats " + stat);
        });

Mono<List<String>> result = combinations.collectList(); 

List<String> results = result.block(); 
assertThat(results).containsExactly(
        "Name NameJoe has stats 103",
        "Name NameBart has stats 104",
        "Name NameHenry has stats 105",
        "Name NameNicole has stats 106",
        "Name NameABSLAJNFOAJNFOANFANSF has stats 121"
);

使用回調(diào)和Future對(duì)象的危險(xiǎn)是相似的,這也是響應(yīng)式編程通過Publisher-Subscriber對(duì)解決的問題。

3.3. 從命令式編程到響應(yīng)式編程

諸如Reactor的響應(yīng)式編程庫(kù)旨在解決JVM上“經(jīng)典”異步方法的缺點(diǎn),同時(shí)也著重對(duì)以下方面進(jìn)行改進(jìn):

  • 可組合性可讀性,
  • 把數(shù)據(jù)當(dāng)做流處理,同時(shí)提供豐富的操作方法,
  • 訂閱之前不會(huì)發(fā)生任何事情,
  • 背壓消費(fèi)者向生產(chǎn)者發(fā)送信號(hào)通知數(shù)據(jù)生產(chǎn)速率過高或過低的能力
  • 對(duì)并發(fā)不可知更高價(jià)值和更高級(jí)的抽象。

3.3.1. 可組合性和可讀性

“可組合性”指的是編排多個(gè)異步任務(wù)的能力,可以將前面任務(wù)的結(jié)果作為后續(xù)任務(wù)的輸入。當(dāng)然也可以以fork-join的方式運(yùn)行多個(gè)任務(wù)。此外,我們可以在更高級(jí)的系統(tǒng)中把異步任務(wù)作為離散組件重用。

編排任務(wù)的能力與代碼的可讀性和可維護(hù)性緊密相關(guān)。隨著異步處理層的數(shù)量和復(fù)雜性的增加,編寫和閱讀代碼變得越來越困難。正如我們所見,callback模型十分簡(jiǎn)單,但是callback一個(gè)缺點(diǎn)就是處理變的復(fù)雜,一個(gè)callback需要在另外一個(gè)callback中執(zhí)行,這樣一層一層的嵌套。這就是“回調(diào)地獄”,這種代碼難以閱讀和分析邏輯。

Reactor提供了豐富的組合操作,代碼可以反應(yīng)對(duì)處理過程抽象的組織,一切盡量保持在同一層(盡量減少嵌套,這也是和callback模式相比最大的改進(jìn)之一)。

3.3.2. 類比工廠的生產(chǎn)線

數(shù)據(jù)在響應(yīng)式程序中的處理過程,可以被看作是數(shù)據(jù)在組裝流水線中移動(dòng)。Reactor既是傳送帶又是工作站。原材料從來源(第一個(gè)Publisher)傾瀉而出(中間經(jīng)過多道工序加工),最終成為可以推送給消費(fèi)者(Subscriber)的成品。

原材料可以經(jīng)過各種轉(zhuǎn)換和其他中間步驟,也可以成為將中間零件組裝在一起的更大裝配線的一部分。如果某一點(diǎn)出現(xiàn)故障或堵塞(某到工序耗時(shí)長(zhǎng)),那么出問題的工作站可以向上游發(fā)出信號(hào),以限制原材料的流動(dòng)(有問題及時(shí)向上游反饋,上游做出響應(yīng),避免進(jìn)一步惡化)。

3.3.3. Operators

在Reactor中,operator就是流水線中的工作站。每個(gè)operator都會(huì)將行為添加到Publisher 中,并將上一步的Publisher 包裝到新實(shí)例中。這樣構(gòu)建了一個(gè)完整的鏈接,數(shù)據(jù)從第一個(gè)Publisher 向下游移動(dòng)并由每一個(gè)鏈接進(jìn)行轉(zhuǎn)換,最后,由一個(gè)Subscriber 結(jié)束數(shù)據(jù)的數(shù)據(jù)處理過程。在Subscriber 訂閱之前數(shù)據(jù)不會(huì)被處理也不會(huì)向下游移動(dòng)。

盡管響應(yīng)式流規(guī)范根本沒有定義operator,但是像Reactor的響應(yīng)式庫(kù)提供的最佳附加值之一就是提供了豐富的operator,從簡(jiǎn)單的轉(zhuǎn)換和過濾到復(fù)雜的編排和錯(cuò)誤處理,這些內(nèi)容涉及很多的領(lǐng)域。

3.3.4. 不 subscribe()不會(huì)發(fā)生任何事情

在Reactor中,當(dāng)你編寫了一個(gè)Publisher 鏈,默認(rèn)數(shù)據(jù)不會(huì)注入。實(shí)際上只是創(chuàng)建了一個(gè)異步處理過程的抽象描述(這有助于重用和組合)。通過subscribing 動(dòng)作,將Publisher 綁定到Subscriber ,這會(huì)觸發(fā)數(shù)據(jù)在整個(gè)鏈路中移動(dòng)。內(nèi)部實(shí)現(xiàn)是通過Subscriber 發(fā)送Request 信號(hào),信號(hào)被傳播到上游一直到Publisherrequest 也是實(shí)現(xiàn)背壓的關(guān)鍵方法。

3.3.5. 背壓

向上游傳播信號(hào)通常用來實(shí)現(xiàn)背壓,在和流水線的類比中描述為當(dāng)工作站處理比上游工作站的處理速度慢時(shí),沿著流水線向上游反饋。背壓其實(shí)就是下游向上游發(fā)送信號(hào),并影響上游數(shù)據(jù)處理的一種機(jī)制。

響應(yīng)式流規(guī)范定義的實(shí)際機(jī)制可以簡(jiǎn)單的概括為:一個(gè)subscriber可以以“無界” 模式工作,并讓數(shù)據(jù)源以最快的速度推送所有數(shù)據(jù),或者使用request 機(jī)制向數(shù)據(jù)源發(fā)送信息,向數(shù)據(jù)源反饋已經(jīng)準(zhǔn)備好處理n個(gè)元素。

中間operator可以在中途改變r(jià)equest。想象一下一個(gè)buffer 以十個(gè)元素為一組將元素進(jìn)行分組。如果subscriber請(qǐng)求一個(gè)buffer,數(shù)據(jù)源發(fā)送十個(gè)元素是可以被接受。一些operator也實(shí)現(xiàn)了預(yù)拉取策略 ,這避免了request(1) 不斷往返。如果在請(qǐng)求之前生成元素的成本很低,這種操作就非常的有幫助,可以顯著的提高處理效率。

這會(huì)將推模式轉(zhuǎn)換為推拉混合模式,如果上游已經(jīng)準(zhǔn)備了數(shù)據(jù),下游則可以從上游獲取n個(gè)元素。但是如果數(shù)據(jù)還沒有準(zhǔn)備好,那么當(dāng)有數(shù)據(jù)時(shí)上游就會(huì)將數(shù)據(jù)推送到下游。

3.3.6. Hot vs Cold

Rx響應(yīng)式庫(kù)家族將響應(yīng)序列分為兩大類:“熱”和“冷”。這種區(qū)別主要與響應(yīng)式流對(duì)subscriber的響應(yīng)有關(guān):

  • 對(duì)于每一個(gè)Subscriber,包括在數(shù)據(jù)源位置,冷序列都會(huì)重新開始。例如,如果源包裝了HTTP調(diào)用,則將為每個(gè)subscription發(fā)出一個(gè)新的HTTP請(qǐng)求。
  • 對(duì)于每一個(gè)Subscriber ,熱序列并非都會(huì)從頭開始。相反,后面的subscriber只能收到訂閱完成之后產(chǎn)生的數(shù)據(jù)。但是一些熱響應(yīng)式流可以緩存或者對(duì)歷史數(shù)據(jù)全部或部分重放,也就是說遲來的subscriber可以收到在完成訂閱動(dòng)作之前的數(shù)據(jù)。從一般的角度來看,即使沒有訂閱者在訂閱數(shù)據(jù),熱序列甚至?xí)l(fā)出數(shù)據(jù)(“訂閱之前什么也沒有發(fā)生”規(guī)則的例外)。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容