RxCPP(一)編程模型

編程模型將涉及到以下幾塊內(nèi)容:

  • 數(shù)據(jù)流計(jì)算范例
  • rxcpp庫(kù)的介紹
  • Rx操作符
  • 調(diào)度
  • flat/ concatmap的區(qū)別
  • 更多重要的操作符

數(shù)據(jù)流計(jì)算簡(jiǎn)介

在函數(shù)響應(yīng)式編程(FRP)中,所有這些主題都以系統(tǒng)的方式結(jié)合在一起。
簡(jiǎn)單地說(shuō),響應(yīng)式編程就是使用異步數(shù)據(jù)流進(jìn)行編程。通過(guò)對(duì)流應(yīng)用各種操作,我們可以實(shí)現(xiàn)不同的計(jì)算目標(biāo)。響應(yīng)式程序中的主要任務(wù)是將數(shù)據(jù)轉(zhuǎn)換為流,而不管數(shù)據(jù)的來(lái)源是什么。事件流通常稱為可觀察對(duì)象,事件流訂閱者稱為觀察者。在可觀察對(duì)象和觀察者之間,存在流操作符(過(guò)濾器/轉(zhuǎn)換)。
由于隱式假設(shè)在數(shù)據(jù)通過(guò)操作符傳遞時(shí)數(shù)據(jù)源不會(huì)發(fā)生變化,所以在可觀察對(duì)象和觀察者之間可以有多個(gè)操作符路徑。不變性為無(wú)序執(zhí)行提供了選項(xiàng),并且可以將調(diào)度委托給稱為調(diào)度程序的特殊軟件。因此,可觀測(cè)對(duì)象、觀察者、流操作符和調(diào)度程序構(gòu)成了FRP模型的主干。

數(shù)據(jù)流計(jì)算范例

傳統(tǒng)上,程序員根據(jù)控制流來(lái)編碼他們的程序。這意味著我們將程序編碼為一系列小語(yǔ)句(序列、分支、迭代)或函數(shù)(包括遞歸),以及它們的關(guān)聯(lián)狀態(tài)。我們使用選擇(if/else)、迭代(while/for)和遞歸函數(shù)等構(gòu)造對(duì)計(jì)算進(jìn)行編碼。為這些類型的程序處理并發(fā)性和狀態(tài)管理確實(shí)存在問(wèn)題,并且會(huì)導(dǎo)致一些細(xì)微的bug。我們需要圍繞共享可變狀態(tài)放置鎖和其他同步原語(yǔ)。在編譯器級(jí)別,語(yǔ)言編譯器將解析源代碼以創(chuàng)建抽象語(yǔ)法樹(shù)(AST),執(zhí)行類型分析和代碼生成。事實(shí)上,AST是一個(gè)信息流圖,您可以在其中執(zhí)行數(shù)據(jù)流分析(用于數(shù)據(jù)/寄存器級(jí)優(yōu)化)和控制流分析,以利用處理器級(jí)的代碼管道優(yōu)化。即使程序員根據(jù)控制流來(lái)編碼程序,編譯器(至少部分)也會(huì)根據(jù)數(shù)據(jù)流來(lái)查看程序。這里的底線是,在每個(gè)程序中都有一個(gè)隱式數(shù)據(jù)流圖處于休眠狀態(tài)。
數(shù)據(jù)流計(jì)算將計(jì)算組織為顯式圖,其中節(jié)點(diǎn)是計(jì)算,邊是數(shù)據(jù)在節(jié)點(diǎn)之間流動(dòng)的路徑。如果我們對(duì)節(jié)點(diǎn)上的計(jì)算施加某些限制(例如通過(guò)處理輸入數(shù)據(jù)的副本來(lái)保存數(shù)據(jù)狀態(tài),避免使用就地算法),我們就可以利用并行性的機(jī)會(huì)。調(diào)度程序?qū)⑼ㄟ^(guò)對(duì)圖數(shù)據(jù)結(jié)構(gòu)進(jìn)行拓?fù)渑判騺?lái)尋找并行的機(jī)會(huì)。我們將使用流(路徑)和流(節(jié)點(diǎn))上的操作構(gòu)造圖。這可以通過(guò)聲明的方式實(shí)現(xiàn),因?yàn)椴僮鞣梢员痪幋a為lambdas,它可以執(zhí)行一些本地計(jì)算。函數(shù)式編程社區(qū)標(biāo)識(shí)了一組基本的標(biāo)準(zhǔn)(函數(shù)/流)操作符,如map、reduce、filter和take。在數(shù)據(jù)流計(jì)算框架中有一個(gè)條款是將數(shù)據(jù)轉(zhuǎn)換為流。用于機(jī)器學(xué)習(xí)的tensorflow庫(kù)就是使用這種范例的一個(gè)庫(kù)。即使圖創(chuàng)建并不像在tensorflow中那樣完全顯式的,但rxcpp庫(kù)也可以看作是一個(gè)數(shù)據(jù)流計(jì)算庫(kù)。由于函數(shù)式編程構(gòu)造支持延遲計(jì)算,所以在構(gòu)造具有異步數(shù)據(jù)流和操作的流管道時(shí),我們正在創(chuàng)建一個(gè)計(jì)算流圖。

rxcpp庫(kù)的介紹

rxcpp庫(kù)是一個(gè)只讀的c++庫(kù),可以從github下載。RxCpp依賴于現(xiàn)代c++結(jié)構(gòu),如語(yǔ)言級(jí)并發(fā)性、lambda函數(shù)/表達(dá)式、函數(shù)組合/轉(zhuǎn)換和操作符重載,來(lái)實(shí)現(xiàn)反應(yīng)性編程結(jié)構(gòu)。rxcpp庫(kù)的結(jié)構(gòu)類似于rx.net和rxjava等庫(kù)。
與任何其他反應(yīng)性編程框架一樣,在編寫(xiě)第一行代碼之前,每個(gè)人都應(yīng)該理解一些關(guān)鍵構(gòu)造。它們是:

  • 可觀察對(duì)象(可觀察到的流)
  • 觀察者(訂閱觀察對(duì)象)
  • 操作符(過(guò)濾、轉(zhuǎn)換和歸約)
  • 調(diào)度器

rxcpp大部分計(jì)算都是基于可觀測(cè)的概念。該庫(kù)提供了大量原語(yǔ)來(lái)創(chuàng)建來(lái)自各種數(shù)據(jù)源的可觀察流。數(shù)據(jù)源可以是范圍、stl容器等等。我們可以在可觀察對(duì)象和它們的消費(fèi)者之間放置操作符(稱為觀察者)。由于函數(shù)編程構(gòu)造支持函數(shù)的組合,所以我們可以將操作符鏈作為單個(gè)實(shí)體放在可觀察對(duì)象和訂閱流的觀察者之間。與庫(kù)關(guān)聯(lián)的調(diào)度程序?qū)⒋_保當(dāng)數(shù)據(jù)以可觀察流的形式可用時(shí),它將通過(guò)操作符傳遞,并且在經(jīng)過(guò)一系列篩選和轉(zhuǎn)換之后,如果有數(shù)據(jù)存在,將向訂閱者發(fā)出通知。當(dāng)調(diào)用訂閱者中的一個(gè)lambda方法時(shí),觀察者需要考慮一些事情。觀察員可以把注意力集中在他們主要負(fù)責(zé)的任務(wù)上。

Rx操作符

一個(gè)簡(jiǎn)單的可觀察/觀察者交互

讓我們編寫(xiě)一個(gè)簡(jiǎn)單的程序來(lái)幫助我們理解rxcpp庫(kù)的編程模型。在這個(gè)特殊的程序中,我們將有一個(gè)可觀察的流和一個(gè)訂閱流的觀察者。我們將使用range對(duì)象生成一系列從1到12的數(shù)字。在創(chuàng)建值的范圍及其上的可觀察值之后,我們將為可觀察值附加一個(gè)訂閱者。當(dāng)我們執(zhí)行程序時(shí),它會(huì)打印一系列的數(shù)字到控制臺(tái),并進(jìn)行額外的測(cè)試:First.cpp

//////////
//first.cpp
// g++ -I<PathToRxcpplibfoldersrc> First.cpp
//
#include "rxcpp/rx.hpp"
#include <iostream>

int main() {

 //------------- Create an Observable.. a stream of numbers
 auto observable = rxcpp::observable<>::range(1, 12);
     

 //------------ Subscribe (only OnNext and OnCompleted Lambda given
 observable.
subscribe(
           
    [](int v){printf("OnNext: %d\n", v);},
            
    [](){printf("OnCompleted\n");});

}

可觀察對(duì)象的過(guò)濾和轉(zhuǎn)換

下面的程序?qū)椭覀兝斫膺^(guò)濾(filter)和映射(map)操作符的工作原理,以及使用訂閱方法將觀察者連接到可觀察流的常用機(jī)制。filter方法對(duì)流的每個(gè)項(xiàng)計(jì)算謂詞,如果計(jì)算碰巧產(chǎn)生一個(gè)正斷言,則該項(xiàng)將出現(xiàn)在輸出流中。map操作符對(duì)輸入流的每個(gè)元素應(yīng)用一個(gè)表達(dá)式,并幫助生成一個(gè)輸出隊(duì)列:

//------------------ Second.cpp
#include "rxcpp/rx.hpp"

#include <iostream>

#include <array>

int main() {

    auto values = rxcpp::observable<>::range(1, 12).

        filter([](int v) {
        return v % 2 == 0;
            }).map([](int x) {return x * x; });
            values.
                subscribe(
                    [](int v) {printf("OnNext: %d\n", v); },

                    []() {printf("OnCompleted\n"); });


}

從c++容器中流化值

即使rx用于處理隨時(shí)間變化的數(shù)據(jù),我們也可以將stl容器轉(zhuǎn)換為響應(yīng)流。我們需要使用iterate操作符來(lái)進(jìn)行轉(zhuǎn)換。這有時(shí)很方便,并有助于從使用stl的代碼庫(kù)集成代碼:

//------------------ STLContainerStream.cpp
#include "rxcpp/rx.hpp"

#include <iostream>

#include <array>

int main() {

 std::array< int, 3 > a={{1, 2, 3}};
    auto values1 = rxcpp::observable<>::iterate(a);
    values1.
        subscribe(
            [](int v){printf("OnNext: %d\n", v);},
            [](){printf("OnCompleted\n");});


}

從零開(kāi)始創(chuàng)建可觀察對(duì)象

到目前為止,我們已經(jīng)編寫(xiě)了從范圍對(duì)象或stl容器創(chuàng)建可觀察流的程序。讓我們看看如何從頭創(chuàng)建一個(gè)可觀察的流:

//------------------ ObserverFromScratch.cpp
#include "rxcpp/rx.hpp"
#include "rxcpp/rx-test.hpp"
int main() {
    auto ints = rxcpp::observable<>::create<int>([](rxcpp::subscriber<int> s) {
        s.on_next(1);
        s.on_next(4);
        s.on_next(9);
        s.on_completed();
        });
    ints.subscribe([](int v) {printf("OnNext: %d\n", v); },
        []() {printf("OnCompleted\n"); });
}

連接可觀察到的流

concat不交錯(cuò)的發(fā)射兩個(gè)或多個(gè)Observable的發(fā)射物

image.png

Concat操作符連接多個(gè)Observable的輸出,就好像它們是一個(gè)Observable,第一個(gè)Observable發(fā)射的所有數(shù)據(jù)在第二個(gè)Observable發(fā)射的任何數(shù)據(jù)前面,以此類推。
直到前面一個(gè)Observable終止,'concat'才會(huì)訂閱額外的一個(gè)Observable。注意:因此,如果你嘗試連接一個(gè)"熱"Observable(這種Observable在創(chuàng)建后立即開(kāi)始發(fā)射數(shù)據(jù),即使沒(méi)有訂閱者),'concat'將不會(huì)看到也不會(huì)發(fā)射它之前發(fā)射的任何數(shù)據(jù)。
startwith操作符類似于'concat',但是它是插入到前面,而不是追加那些Observable的數(shù)據(jù)到原始Observable發(fā)射的數(shù)據(jù)序列:
image.png

merge操作符也差不多,它結(jié)合兩個(gè)或多個(gè)Observable的發(fā)射物,但是數(shù)據(jù)可能交錯(cuò),而concat不會(huì)讓多個(gè)Observable的發(fā)射物交錯(cuò)。

image.png

我們可以將兩個(gè)流連接起來(lái)形成一個(gè)新的流,這在某些情況下非常方便。讓我們通過(guò)編寫(xiě)一個(gè)簡(jiǎn)單的程序來(lái)看看它是如何工作的:

//------------------ Concat.cpp
#include "rxcpp/rx.hpp"
#include <iostream>

#include <array>

int main() {

    auto o1 = rxcpp::observable<>::range(1, 3);


    auto o2 = rxcpp::observable<>::range(4, 6);

    auto values = o1.concat(o2);

    values.
        subscribe(
            [](int v) {printf("OnNext: %d\n", v); },

            []() {printf("OnCompleted\n"); });

}

創(chuàng)建一個(gè)發(fā)射指定值的Observable

Just將單個(gè)數(shù)據(jù)轉(zhuǎn)換為發(fā)射那個(gè)數(shù)據(jù)的Observable:


image.png
//------------------ sixth.cpp
#include "rxcpp/rx.hpp"
#include <iostream>
#include <array>
int main() {
    auto values = rxcpp::observable<>::just(1);
    values.
        subscribe(
            [](int v) {printf("OnNext: %d\n", v); },
            []() {printf("OnCompleted\n"); });
}

Take

只發(fā)射前面的N項(xiàng)數(shù)據(jù)


image.png

使用take操作符讓你可以修改Observable的行為,只返回前面的N項(xiàng)數(shù)據(jù),然后發(fā)射完成通知,忽略剩余的數(shù)據(jù)。

image.png

如果你對(duì)一個(gè)Observable使用take(n)(或它的同義詞limit(n))操作符,而那個(gè)Observable發(fā)射的數(shù)據(jù)少于N項(xiàng),那么take操作生成的Observable不會(huì)拋異常或發(fā)射onError通知,在完成前它只會(huì)發(fā)射相同的少量數(shù)據(jù)。

//------------------ fifth.cpp
#include "rxcpp/rx.hpp"

#include <iostream>

#include <array>

int main() {

    auto values = rxcpp::observable<>::range(1); // infinite (until overflow) stream of integers

    auto s1 = values.
        take(3).
        map([](int prime) { return std::make_tuple("1:", prime); });

    auto s2 = values.
        take(3).
        map([](int prime) { return std::make_tuple("2:", prime); });

    s1.
        concat(s2).
        subscribe(rxcpp::util::apply_to(
            [](const char* s, int p) {
                printf("%s %d\n", s, p);
            }));
}

其中,apply_to可以替換成一下方式:

        subscribe(
            [](std::tuple<const char*, int> p) {
                printf("%s, %d\n",std::get<0>(p),std::get<1>(p));
            });

從可觀察的流取消訂閱

下面的程序顯示了如何訂閱一個(gè)可觀察流并停止訂閱。程序只顯示了可用的選項(xiàng),應(yīng)該參考文檔:

//---------------- Unsubscribe.cpp
#include "rxcpp/rx.hpp"
#include <iostream>
int main() {
    auto subs = rxcpp::composite_subscription();
    auto values = rxcpp::observable<>::range(1, 10);
    values.subscribe(
        subs,
        [&subs](int v) {
            printf("OnNext: %d\n", v);
            if (v == 6)
                subs.unsubscribe(); //-- Stop recieving events
        },
        []() {printf("OnCompleted\n"); });
}

map

對(duì)Observable發(fā)射的每一項(xiàng)數(shù)據(jù)應(yīng)用一個(gè)函數(shù),執(zhí)行變換操作
map操作符對(duì)原始Observable發(fā)射的每一項(xiàng)數(shù)據(jù)應(yīng)用一個(gè)你選擇的函數(shù),然后返回一個(gè)發(fā)射這些結(jié)果的Observable。

image.png

大理石圖的頂部顯示了兩個(gè)時(shí)間線,這些時(shí)間線通過(guò)將第二個(gè)時(shí)間線的內(nèi)容附加到第一個(gè)時(shí)間線來(lái)組合在一起,形成一個(gè)復(fù)合時(shí)間線。
Map.cpp

//------------------ Map.cpp
#include "rxcpp/rx.hpp"
#include <iostream>
#include <array>
int main() {
    auto ints = rxcpp::observable<>::range(1, 10).
        map([](int n) {return n * n; });

    ints.subscribe(
        [](int v) {printf("OnNext: %d\n", v); },
        []() {printf("OnCompleted\n"); });

}

rxcpp(流)操作符

面向流處理的一個(gè)主要優(yōu)點(diǎn)是,我們可以將函數(shù)式編程原語(yǔ)應(yīng)用于它們。用rxcpp的話說(shuō),處理是使用操作符完成的。它們只是流上的過(guò)濾、轉(zhuǎn)換、聚合和規(guī)約。在前面的示例中,我們已經(jīng)了解了map、filter和take操作符的工作原理。

平均運(yùn)算符

平均運(yùn)算符從可觀察的流計(jì)算值的算術(shù)平均值。所支持的其他統(tǒng)計(jì)運(yùn)算符包括:

  • 最小值
  • 最大值
  • 計(jì)數(shù)
  • 求和

計(jì)算原始Observable發(fā)射數(shù)字的平均值并發(fā)射它


image.png

下面的程序只演示了平均操作符。對(duì)于前面列表中的其他操作符,模式是相同的Average.cpp

//----------- Average.cpp
#include "rxcpp/rx.hpp"
#include <iostream>
int main() {
    auto values = rxcpp::observable<>::range(1, 20).average();
    values.
        subscribe(
            [](double v) {printf("average: %lf\n", v); },
            []() {printf("OnCompleted\n"); });
}

掃描操作符

掃描操作符對(duì)流的每個(gè)元素依次應(yīng)用一個(gè)函數(shù),并將該值累積為種子值。下面的程序生成一系列數(shù)字的平均值,這些值是在何時(shí)累計(jì)的。
連續(xù)地對(duì)數(shù)據(jù)序列的每一項(xiàng)應(yīng)用一個(gè)函數(shù),然后連續(xù)發(fā)射結(jié)果


image.png

Scan操作符對(duì)原始Observable發(fā)射的第一項(xiàng)數(shù)據(jù)應(yīng)用一個(gè)函數(shù),然后將那個(gè)函數(shù)的結(jié)果作為自己的第一項(xiàng)數(shù)據(jù)發(fā)射。它將函數(shù)的結(jié)果同第二項(xiàng)數(shù)據(jù)一起填充給這個(gè)函數(shù)來(lái)產(chǎn)生它自己的第二項(xiàng)數(shù)據(jù)。它持續(xù)進(jìn)行這個(gè)過(guò)程來(lái)產(chǎn)生剩余的數(shù)據(jù)序列。這個(gè)操作符在某些情況下被叫做accumulator。


image.png
//----------- Scan.cpp
#include "rxcpp/rx.hpp"
#include <iostream>
int main() {
    int count = 0;
    auto values = rxcpp::observable<>::range(1, 20).
        scan(
            0,
            [&count](int seed, int v) {
                count++;
                return seed + v;
            });
    values.subscribe(
        [&](int v) {printf("Average through Scan: %f\n", (double)v / count); },
        []() {printf("OnCompleted\n"); });
}

通過(guò)管道組合運(yùn)算符

RxCpp庫(kù)允許您連接或組合運(yùn)算符以啟用運(yùn)算符組合。 該庫(kù)允許您使用管道(|)運(yùn)算符來(lái)組合運(yùn)算符,程序員可以將一個(gè)運(yùn)算符的輸出傳遞給另一個(gè)運(yùn)算符,就好像它們位于UNIX shell的命令行中一樣。 這使我們能夠理解一段代碼的作用。下面的程序使用| 運(yùn)算符以映射范圍。RxCpp樣品含有使用管功能的例子很多

//------------------ Map_With_Pipe.cpp
#include "rxcpp/rx.hpp"
namespace Rx {
    using namespace rxcpp;
    using namespace rxcpp::sources;
    using namespace rxcpp::operators;
    using namespace rxcpp::util;
}
using namespace Rx;
#include <iostream>
int main() {
    auto ints = rxcpp::observable<>::range(1, 10) |
        map([](int n) {return n * n; });
    ints.subscribe(
        [](int v) {printf("OnNext: %d\n", v); },
        []() {printf("OnCompleted\n"); });
}

以上的管道操作等價(jià)于:

auto ints = rxcpp::observable<>::range(1, 10);
auto intsFromMap = ints.map([](int n) {return n * n; });

調(diào)度

我們已經(jīng)在上一節(jié)中了解了Observables,Operators和Observers。我們已經(jīng)知道,在Observables和Observers之間,我們可以應(yīng)用標(biāo)準(zhǔn)的Rx運(yùn)算符來(lái)過(guò)濾和轉(zhuǎn)換Streams。在函數(shù)式編程的情況下,我們編寫(xiě)不可變函數(shù)(沒(méi)有副作用的函數(shù)),不可變性的結(jié)果是無(wú)序執(zhí)行的可能性。如果我們可以保證永遠(yuǎn)不會(huì)修改對(duì)運(yùn)算符的輸入,那么我們?cè)u(píng)估的順序無(wú)關(guān)緊要。由于Rx程序?qū)⒉倏v多個(gè)觀察者和訂閱者,我們可以將選擇執(zhí)行順序的任務(wù)委派給調(diào)度程序模塊。默認(rèn)情況下,RxCpp將在我們稱為訂閱者方法的線程中安排執(zhí)行。可以使用observe_on和subscriber_on運(yùn)算符指定不同的線程。此外,一些Observable運(yùn)算符將Scheduler作為參數(shù),其中執(zhí)行可以在Scheduler管理的線程中進(jìn)行。
該RxCpp庫(kù)支持以下兩種類型的調(diào)度:

  • ImmediateScheduler
  • EventLoopScheduler

ObserveOn

指定一個(gè)觀察者在哪個(gè)調(diào)度器上觀察這個(gè)Observable

image.png

很多ReactiveX實(shí)現(xiàn)都使用調(diào)度器 "Scheduler"來(lái)管理多線程環(huán)境中Observable的轉(zhuǎn)場(chǎng)。你可以使用ObserveOn操作符指定Observable在一個(gè)特定的調(diào)度器上發(fā)送通知給觀察者 (調(diào)用觀察者的onNext, onCompleted, onError方法)。
image.png

注意:當(dāng)遇到一個(gè)異常時(shí)ObserveOn會(huì)立即向前傳遞這個(gè)onError終止通知,它不會(huì)等待慢速消費(fèi)的Observable接受任何之前它已經(jīng)收到但還沒(méi)有發(fā)射的數(shù)據(jù)項(xiàng)。這可能意味著onError通知會(huì)跳到(并吞掉)原始Observable發(fā)射的數(shù)據(jù)項(xiàng)前面,正如圖例上展示的。

RxCpp庫(kù)默認(rèn)是單線程的。 您可以將其配置為使用某些運(yùn)算符在多個(gè)線程中運(yùn)行:

//----------ObserveOn.cpp
#include "rxcpp/rx.hpp"
#include <iostream>
#include <thread>
int main() {
    //---------------- Generate a range of values
    //---------------- Apply Square function
    auto values = rxcpp::observable<>::range(1, 4).
        map([](int v) {

        return v * v;

            });
    //------------- Emit the current thread details
    std::cout << "Main Thread id => "
        << std::this_thread::get_id()
        << std::endl;
    //---------- observe_on another thread....
    //---------- make it blocking to 
    values.observe_on(rxcpp::synchronize_new_thread()).
        as_blocking().
        subscribe(
            [](int v) {
                std::cout << "Observable Thread id => "
                    << std::this_thread::get_id()
                    << "  " << v << std::endl; },
            []() { std::cout << "OnCompleted" << std::endl; });
    //------------------ Print the main thread details
    std::cout << "Main Thread id => "
        << std::this_thread::get_id()
        << std::endl;
}

SubscribeOn

SubscribeOn操作符的作用類似,但它是用于指定Observable本身在特定的調(diào)度器上執(zhí)行,它同樣會(huì)在那個(gè)調(diào)度器上給觀察者發(fā)通知。


image.png

ObserveOn操作符的作用類似,但是功能很有限,它指示Observable在一個(gè)指定的調(diào)度器上給觀察者發(fā)通知,下面的程序?qū)⒀菔緎ubscribe_on方法的用法:

//---------- SubscribeOn.cpp
#include "rxcpp/rx.hpp"
#include "rxcpp/rx-test.hpp"
#include <iostream>
#include <thread>
#include <mutex>
std::mutex console_mutex;
void CTDetails(int val = 0) {
    console_mutex.lock();
    std::cout << "Current Thread id => "
        << std::this_thread::get_id()
        << val
        << std::endl;
    console_mutex.unlock();
}

void Yield(bool y) {
    if (y) { std::this_thread::yield(); }
}

int main() {

    //----------- coordination object
    auto coordination = rxcpp::serialize_new_thread();

    //----------------- retrieve the worker
    auto worker = coordination.create_coordinator().get_worker();

    //-------------- Create an Obsrevable
    auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50)).
        take(5).
        replay(coordination);
    // Subscribe from the beginning
    worker.schedule([&](const rxcpp::schedulers::schedulable&)
        {
            values.subscribe(
                [](long v) {CTDetails(v); },
                []() { CTDetails(); });
        });
    // Wait before subscribing
    worker.schedule(coordination.now() + std::chrono::milliseconds(125),
        [&](const rxcpp::schedulers::schedulable&) {
            values.subscribe(
                [](long v) {CTDetails(v*v); },
                []() { CTDetails(); });
        });
    // Start emitting
    worker.schedule([&](const rxcpp::schedulers::schedulable&) {
        values.connect();
        });
    // Add blocking subscription to see results
    values.as_blocking().subscribe();
}

flat/ concatmap的區(qū)別

開(kāi)發(fā)人員之間的一個(gè)混淆之處通常集中在flatmap和concatmap操作符上。它們之間的差異非常細(xì)微。

flatmap

FlatMap將一個(gè)發(fā)射數(shù)據(jù)的Observable變換為多個(gè)Observables,然后將它們發(fā)射的數(shù)據(jù)合并后放進(jìn)一個(gè)單獨(dú)的Observable


image.png

FlatMap操作符使用一個(gè)指定的函數(shù)對(duì)原始Observable發(fā)射的每一項(xiàng)數(shù)據(jù)執(zhí)行變換操作,這個(gè)函數(shù)返回一個(gè)本身也發(fā)射數(shù)據(jù)的Observable,然后FlatMap合并這些Observables發(fā)射的數(shù)據(jù),最后將合并后的結(jié)果當(dāng)做它自己的數(shù)據(jù)序列發(fā)射。
這個(gè)方法是很有用的,例如,當(dāng)你有一個(gè)這樣的Observable:它發(fā)射一個(gè)數(shù)據(jù)序列,這些數(shù)據(jù)本身包含Observable成員或者可以變換為Observable,因此你可以創(chuàng)建一個(gè)新的Observable發(fā)射這些次級(jí)Observable發(fā)射的數(shù)據(jù)的完整集合。
注意:

  • FlatMap對(duì)這些Observables發(fā)射的數(shù)據(jù)做的是合并(merge)操作,因此它們可能是交錯(cuò)的。
  • 如果任何一個(gè)通過(guò)這個(gè)flatMap操作產(chǎn)生的單獨(dú)的Observable調(diào)用onError異常終止了,這個(gè)Observable自身會(huì)立即調(diào)用onError并終止。
    如下,flatmap將lambda應(yīng)用于可觀察流并生成一個(gè)新的可觀察流。生成的流合并在一起以提供輸出:
#include "rxcpp/rx.hpp"
#include <iostream>
namespace rxu = rxcpp::util;
#include <array>
#include <string>
//#include <tuple>
int main() {
    std::array< std::string, 4 > a = { {"Praseed", "Peter", "Sanjay","Raju"} };
    auto values = rxcpp::observable<>::iterate(a).flat_map(
        [](std::string v) {
            std::array<std::string, 3> salutation = { { "Mr." ,  "Monsieur" , "Sri" } };
            return rxcpp::observable<>::iterate(salutation);
        },
        [](std::string f, std::string s) {
            return s + " " + f;
        });
    values.subscribe([](std::string f) {
        std::cout << f << std::endl; }, []() {std::cout << "Hello World.." << std::endl; });
}

concatmap

concatmap不會(huì)讓變換后的Observables發(fā)射的數(shù)據(jù)交錯(cuò),它按照嚴(yán)格的順序發(fā)射這些數(shù)據(jù)。


image.png

contact和merge

為了讓區(qū)別更清楚,讓我們看一下兩個(gè)操作符:concat和merge。讓我們來(lái)看看流的串聯(lián)是如何工作的。它基本上是一個(gè)接一個(gè)地添加流的內(nèi)容,保持順序:

#include "rxcpp/rx.hpp"
#include <iostream>
#include <array>
int main() {
    auto o1 = rxcpp::observable<>::range(1, 3);
    auto o2 = rxcpp::observable<>::range(4, 6);
    auto values = o1.concat(o2);
    values.
        subscribe(
            [](int v) {printf("OnNext: %d\n", v); },

            []() {printf("OnCompleted\n"); });
}
image.png

下面的云石圖清楚地顯示了當(dāng)我們合并兩個(gè)可觀察到的流時(shí)會(huì)發(fā)生什么。輸出隊(duì)列的內(nèi)容將是兩個(gè)流的交叉組合:

//---------------- Merge.cpp
#include "rxcpp/rx.hpp"
#include <iostream>
#include <array>
int main() {
    auto o1 = rxcpp::observable<>::range(1, 3);
    auto o2 = rxcpp::observable<>::from(4, 5, 6);
    auto values = o1.merge(o2);
    values.subscribe(
        [](int v) {printf("OnNext: %d\n", v); }, []() {printf("OnCompleted\n"); });
}
image.png

flatmap和concatmap或多或少都執(zhí)行相同的操作。不同之處在于值的組合方式。flatmap使用merge操作符,而concatmap使用concact操作符。merge操作符順序無(wú)關(guān)緊要。concat操作符將可觀察值一個(gè)接一個(gè)地追加。這就是為什么按我們期望的順序得到值。

更多重要的操作符

現(xiàn)在我們了解了反應(yīng)性編程模型的關(guān)鍵,因?yàn)槲覀冇懻摿艘恍┗局黝},比如可觀察性、觀察者、操作符和調(diào)度程序。為了更好地編寫(xiě)邏輯,我們應(yīng)該了解更多的運(yùn)算符。

tap

tap是一個(gè)RxCPP管道操作符,返回與源可觀察相同的可觀察值,可用于執(zhí)行副作用,例如記錄源可觀察值發(fā)出的每個(gè)值。我們將探索tap操作符,它有助于查看流的內(nèi)容:

//----------- TapExample.cpp
#include "rxcpp/rx.hpp"
#include <iostream>
int main() {
    //---- Create a mapped Observable
    auto ints = rxcpp::observable<>::range(1, 3).
        map([](int n) {return n * n; });
    //---- Apply the tap operator...The Operator 
    //---- will act as a filter/debug operator
    auto values = ints.
        tap(
            [](int v)
            {printf("Tap -       OnNext: %d\n", v); },
            []() {printf("Tap -       OnCompleted\n");
            });
    //------- Do some action
    values.
        subscribe(
            [](int v) {printf("Subscribe - OnNext: %d\n", v); },
            []() {printf("Subscribe - OnCompleted\n"); });
}

defer

直到有觀察者訂閱時(shí)才創(chuàng)建Observable,并且為每個(gè)觀察者創(chuàng)建一個(gè)新的Observable
defer操作符接受一個(gè)你選擇的Observable工廠函數(shù)作為單個(gè)參數(shù)。這個(gè)函數(shù)沒(méi)有參數(shù),返回一個(gè)Observable。
defer操作符會(huì)一直等待直到有觀察者訂閱它,然后它使用Observable工廠方法生成一個(gè)Observable。它對(duì)每個(gè)觀察者都這樣做,因此盡管每個(gè)訂閱者都以為自己訂閱的是同一個(gè)Observable,事實(shí)上每個(gè)訂閱者獲取的是它們自己的單獨(dú)的數(shù)據(jù)序列。
在某些情況下,等待直到最后一分鐘(就是知道訂閱發(fā)生時(shí))才生成Observable可以確保Observable包含最新的數(shù)據(jù)。


image.png

當(dāng)有人試圖連接到指定的可觀察對(duì)象時(shí),我們調(diào)用observable_factory lambda:

//----------- DeferExample.cpp
#include "rxcpp/rx.hpp"
#include <iostream>
int main() {
    auto observable_factory = []() {
        return rxcpp::observable<>::range(1, 3).
            map([](int n) {return n * n; });
    };
    auto ints = rxcpp::observable<>::defer(observable_factory);
    ints.
        subscribe(
            [](int v) {printf("OnNext: %d\n", v); },
            []() {printf("OnCompleted\n"); });
    ints.
        subscribe(
            [](int v) {printf("2nd OnNext: %d\n", v); },
            []() {printf("2nd OnCompleted\n"); });
}

buffer

buffer操作符定期收集Observable的數(shù)據(jù)放進(jìn)一個(gè)數(shù)據(jù)包裹,然后發(fā)射這些數(shù)據(jù)包裹,而不是一次發(fā)射一個(gè)值。


image.png

buffer操作符將一個(gè)Observable變換為另一個(gè),原來(lái)的Observable正常發(fā)射數(shù)據(jù),變換產(chǎn)生的Observable發(fā)射這些數(shù)據(jù)的緩存集合。Buffer操作符在很多語(yǔ)言特定的實(shí)現(xiàn)中有很多種變體,它們?cè)谌绾尉彺孢@個(gè)問(wèn)題上存在區(qū)別。
注意:如果原來(lái)的Observable發(fā)射了一個(gè)onError通知,Buffer會(huì)立即傳遞這個(gè)通知,而不是首先發(fā)射緩存的數(shù)據(jù),即使在這之前緩存中包含了原始Observable發(fā)射的數(shù)據(jù)。
Window操作符與Buffer類似,但是它在發(fā)射之前把收集到的數(shù)據(jù)放進(jìn)單獨(dú)的Observable,而不是放進(jìn)一個(gè)數(shù)據(jù)結(jié)構(gòu)。詳見(jiàn):https://mcxiaoke.gitbooks.io/rxdocs/content/operators/Buffer.html
buffer操作符發(fā)出一個(gè)包含一個(gè)可觀察對(duì)象的非重疊內(nèi)容的可觀察對(duì)象,每個(gè)可觀察對(duì)象最多包含count參數(shù)指定的項(xiàng)數(shù)。這將幫助我們以適合內(nèi)容的方式處理項(xiàng)目:

//----------- BufferExample.cpp
#include "rxcpp/rx.hpp"
#include <iostream>
int main() {
    auto values = rxcpp::observable<>::range(1, 10).buffer(3);
    values.
        subscribe(
            [](std::vector<int> v) {
                printf("OnNext:{");
                std::for_each(v.begin(), v.end(), [](int a) {
                    printf(" %d", a);
                    });
                printf("}\n");
            },
            []() {printf("OnCompleted\n"); });
}

timer

創(chuàng)建一個(gè)Observable,它在一個(gè)給定的延遲后發(fā)射一個(gè)特殊的值。


image.png

這個(gè)函數(shù)在庫(kù)中有不同的版本,timer操作符默認(rèn)在computation調(diào)度器上執(zhí)行。有一個(gè)變體可以通過(guò)可選參數(shù)指定Scheduler:

//----------- TimerExample.cpp
#include "rxcpp/rx.hpp"
#include "rxcpp/rx-test.hpp"
#include <iostream>
#include <thread>
#include <chrono>
int main() {
    auto scheduler = rxcpp::observe_on_new_thread();

    auto period = std::chrono::seconds(3);

    auto values = rxcpp::observable<>::timer(period, scheduler).
        finally([]() {
        std::cout << "The final action, thread id: " << std::this_thread::get_id() << std::endl;
            });
    values.
        as_blocking().
        subscribe(
            [](int v) { std::cout << "OnNext: " << v << "thread id: " << std::this_thread::get_id() << std::endl; },

            []() {std::cout << "OnCompleted, thread id: " << std::this_thread::get_id() << std::endl; });
    std::cout << "main thread id: " << std::this_thread::get_id() << std::endl;
    //必須在主線程sleep,否則finally中的內(nèi)容打印不全,因?yàn)槌绦蛲顺龊螅€程銷毀
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 創(chuàng)建操作 用于創(chuàng)建Observable的操作符Create通過(guò)調(diào)用觀察者的方法從頭創(chuàng)建一個(gè)ObservableEm...
    rkua閱讀 1,962評(píng)論 0 1
  • 響應(yīng)式編程簡(jiǎn)介 響應(yīng)式編程是一種基于異步數(shù)據(jù)流概念的編程模式。數(shù)據(jù)流就像一條河:它可以被觀測(cè),被過(guò)濾,被操作,或者...
    說(shuō)碼解字閱讀 3,577評(píng)論 0 5
  • 記錄RxJava操作符,方便查詢(2.2.2版本) 英文文檔地址:http://reactivex.io/docu...
    凌云飛魚(yú)閱讀 1,055評(píng)論 0 0
  • 1. 響應(yīng)式編程 1.1 響應(yīng)式編程概念 響應(yīng)式編程是一種通過(guò)異步和數(shù)據(jù)流來(lái)構(gòu)建事物關(guān)系的編程模型。 事物的關(guān)系 ...
    師傅_有妖氣閱讀 1,494評(píng)論 0 0
  • 創(chuàng)建操作: 1. create 通過(guò)調(diào)用觀察者的方法從頭調(diào)用一個(gè)Observable2. just 將對(duì)象或?qū)ο蠹?..
    perry_Fan閱讀 1,014評(píng)論 0 0

友情鏈接更多精彩內(nèi)容