編程模型將涉及到以下幾塊內(nèi)容:
- 數(shù)據(jù)流計(jì)算范例
- rxcpp庫(kù)的介紹
- Rx操作符
- 調(diào)度
- flat/ concatmap的區(qū)別
- 更多重要的操作符
數(shù)據(jù)流計(jì)算簡(jiǎn)介
在函數(shù)響應(yīng)式編程(FRP)中,所有這些主題都以系統(tǒng)的方式結(jié)合在一起。
簡(jiǎn)單地說(shuō),響應(yīng)式編程就是使用異步數(shù)據(jù)流進(jìn)行編程。通過(guò)對(duì)流應(yīng)用各種操作,我們可以實(shí)現(xiàn)不同的計(jì)算目標(biāo)。響應(yīng)式程序中的主要任務(wù)是將數(shù)據(jù)轉(zhuǎn)換為流,而不管數(shù)據(jù)的來(lái)源是什么。事件流通常稱為可觀察對(duì)象,事件流訂閱者稱為觀察者。在可觀察對(duì)象和觀察者之間,存在流操作符(過(guò)濾器/轉(zhuǎn)換)。
由于隱式假設(shè)在數(shù)據(jù)通過(guò)操作符傳遞時(shí)數(shù)據(jù)源不會(huì)發(fā)生變化,所以在可觀察對(duì)象和觀察者之間可以有多個(gè)操作符路徑。不變性為無(wú)序執(zhí)行提供了選項(xiàng),并且可以將調(diào)度委托給稱為調(diào)度程序的特殊軟件。因此,可觀測(cè)對(duì)象、觀察者、流操作符和調(diào)度程序構(gòu)成了FRP模型的主干。
數(shù)據(jù)流計(jì)算范例
傳統(tǒng)上,程序員根據(jù)控制流來(lái)編碼他們的程序。這意味著我們將程序編碼為一系列小語(yǔ)句(序列、分支、迭代)或函數(shù)(包括遞歸),以及它們的關(guān)聯(lián)狀態(tài)。我們使用選擇(if/else)、迭代(while/for)和遞歸函數(shù)等構(gòu)造對(duì)計(jì)算進(jìn)行編碼。為這些類型的程序處理并發(fā)性和狀態(tài)管理確實(shí)存在問(wèn)題,并且會(huì)導(dǎo)致一些細(xì)微的bug。我們需要圍繞共享可變狀態(tài)放置鎖和其他同步原語(yǔ)。在編譯器級(jí)別,語(yǔ)言編譯器將解析源代碼以創(chuàng)建抽象語(yǔ)法樹(shù)(AST),執(zhí)行類型分析和代碼生成。事實(shí)上,AST是一個(gè)信息流圖,您可以在其中執(zhí)行數(shù)據(jù)流分析(用于數(shù)據(jù)/寄存器級(jí)優(yōu)化)和控制流分析,以利用處理器級(jí)的代碼管道優(yōu)化。即使程序員根據(jù)控制流來(lái)編碼程序,編譯器(至少部分)也會(huì)根據(jù)數(shù)據(jù)流來(lái)查看程序。這里的底線是,在每個(gè)程序中都有一個(gè)隱式數(shù)據(jù)流圖處于休眠狀態(tài)。
數(shù)據(jù)流計(jì)算將計(jì)算組織為顯式圖,其中節(jié)點(diǎn)是計(jì)算,邊是數(shù)據(jù)在節(jié)點(diǎn)之間流動(dòng)的路徑。如果我們對(duì)節(jié)點(diǎn)上的計(jì)算施加某些限制(例如通過(guò)處理輸入數(shù)據(jù)的副本來(lái)保存數(shù)據(jù)狀態(tài),避免使用就地算法),我們就可以利用并行性的機(jī)會(huì)。調(diào)度程序?qū)⑼ㄟ^(guò)對(duì)圖數(shù)據(jù)結(jié)構(gòu)進(jìn)行拓?fù)渑判騺?lái)尋找并行的機(jī)會(huì)。我們將使用流(路徑)和流(節(jié)點(diǎn))上的操作構(gòu)造圖。這可以通過(guò)聲明的方式實(shí)現(xiàn),因?yàn)椴僮鞣梢员痪幋a為lambdas,它可以執(zhí)行一些本地計(jì)算。函數(shù)式編程社區(qū)標(biāo)識(shí)了一組基本的標(biāo)準(zhǔn)(函數(shù)/流)操作符,如map、reduce、filter和take。在數(shù)據(jù)流計(jì)算框架中有一個(gè)條款是將數(shù)據(jù)轉(zhuǎn)換為流。用于機(jī)器學(xué)習(xí)的tensorflow庫(kù)就是使用這種范例的一個(gè)庫(kù)。即使圖創(chuàng)建并不像在tensorflow中那樣完全顯式的,但rxcpp庫(kù)也可以看作是一個(gè)數(shù)據(jù)流計(jì)算庫(kù)。由于函數(shù)式編程構(gòu)造支持延遲計(jì)算,所以在構(gòu)造具有異步數(shù)據(jù)流和操作的流管道時(shí),我們正在創(chuàng)建一個(gè)計(jì)算流圖。
rxcpp庫(kù)的介紹
rxcpp庫(kù)是一個(gè)只讀的c++庫(kù),可以從github下載。RxCpp依賴于現(xiàn)代c++結(jié)構(gòu),如語(yǔ)言級(jí)并發(fā)性、lambda函數(shù)/表達(dá)式、函數(shù)組合/轉(zhuǎn)換和操作符重載,來(lái)實(shí)現(xiàn)反應(yīng)性編程結(jié)構(gòu)。rxcpp庫(kù)的結(jié)構(gòu)類似于rx.net和rxjava等庫(kù)。
與任何其他反應(yīng)性編程框架一樣,在編寫(xiě)第一行代碼之前,每個(gè)人都應(yīng)該理解一些關(guān)鍵構(gòu)造。它們是:
- 可觀察對(duì)象(可觀察到的流)
- 觀察者(訂閱觀察對(duì)象)
- 操作符(過(guò)濾、轉(zhuǎn)換和歸約)
- 調(diào)度器
rxcpp大部分計(jì)算都是基于可觀測(cè)的概念。該庫(kù)提供了大量原語(yǔ)來(lái)創(chuàng)建來(lái)自各種數(shù)據(jù)源的可觀察流。數(shù)據(jù)源可以是范圍、stl容器等等。我們可以在可觀察對(duì)象和它們的消費(fèi)者之間放置操作符(稱為觀察者)。由于函數(shù)編程構(gòu)造支持函數(shù)的組合,所以我們可以將操作符鏈作為單個(gè)實(shí)體放在可觀察對(duì)象和訂閱流的觀察者之間。與庫(kù)關(guān)聯(lián)的調(diào)度程序?qū)⒋_保當(dāng)數(shù)據(jù)以可觀察流的形式可用時(shí),它將通過(guò)操作符傳遞,并且在經(jīng)過(guò)一系列篩選和轉(zhuǎn)換之后,如果有數(shù)據(jù)存在,將向訂閱者發(fā)出通知。當(dāng)調(diào)用訂閱者中的一個(gè)lambda方法時(shí),觀察者需要考慮一些事情。觀察員可以把注意力集中在他們主要負(fù)責(zé)的任務(wù)上。
Rx操作符
一個(gè)簡(jiǎn)單的可觀察/觀察者交互
讓我們編寫(xiě)一個(gè)簡(jiǎn)單的程序來(lái)幫助我們理解rxcpp庫(kù)的編程模型。在這個(gè)特殊的程序中,我們將有一個(gè)可觀察的流和一個(gè)訂閱流的觀察者。我們將使用range對(duì)象生成一系列從1到12的數(shù)字。在創(chuàng)建值的范圍及其上的可觀察值之后,我們將為可觀察值附加一個(gè)訂閱者。當(dāng)我們執(zhí)行程序時(shí),它會(huì)打印一系列的數(shù)字到控制臺(tái),并進(jìn)行額外的測(cè)試:First.cpp
//////////
//first.cpp
// g++ -I<PathToRxcpplibfoldersrc> First.cpp
//
#include "rxcpp/rx.hpp"
#include <iostream>
int main() {
//------------- Create an Observable.. a stream of numbers
auto observable = rxcpp::observable<>::range(1, 12);
//------------ Subscribe (only OnNext and OnCompleted Lambda given
observable.
subscribe(
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
}
可觀察對(duì)象的過(guò)濾和轉(zhuǎn)換
下面的程序?qū)椭覀兝斫膺^(guò)濾(filter)和映射(map)操作符的工作原理,以及使用訂閱方法將觀察者連接到可觀察流的常用機(jī)制。filter方法對(duì)流的每個(gè)項(xiàng)計(jì)算謂詞,如果計(jì)算碰巧產(chǎn)生一個(gè)正斷言,則該項(xiàng)將出現(xiàn)在輸出流中。map操作符對(duì)輸入流的每個(gè)元素應(yīng)用一個(gè)表達(dá)式,并幫助生成一個(gè)輸出隊(duì)列:
//------------------ Second.cpp
#include "rxcpp/rx.hpp"
#include <iostream>
#include <array>
int main() {
auto values = rxcpp::observable<>::range(1, 12).
filter([](int v) {
return v % 2 == 0;
}).map([](int x) {return x * x; });
values.
subscribe(
[](int v) {printf("OnNext: %d\n", v); },
[]() {printf("OnCompleted\n"); });
}
從c++容器中流化值
即使rx用于處理隨時(shí)間變化的數(shù)據(jù),我們也可以將stl容器轉(zhuǎn)換為響應(yīng)流。我們需要使用iterate操作符來(lái)進(jìn)行轉(zhuǎn)換。這有時(shí)很方便,并有助于從使用stl的代碼庫(kù)集成代碼:
//------------------ STLContainerStream.cpp
#include "rxcpp/rx.hpp"
#include <iostream>
#include <array>
int main() {
std::array< int, 3 > a={{1, 2, 3}};
auto values1 = rxcpp::observable<>::iterate(a);
values1.
subscribe(
[](int v){printf("OnNext: %d\n", v);},
[](){printf("OnCompleted\n");});
}
從零開(kāi)始創(chuàng)建可觀察對(duì)象
到目前為止,我們已經(jīng)編寫(xiě)了從范圍對(duì)象或stl容器創(chuàng)建可觀察流的程序。讓我們看看如何從頭創(chuàng)建一個(gè)可觀察的流:
//------------------ ObserverFromScratch.cpp
#include "rxcpp/rx.hpp"
#include "rxcpp/rx-test.hpp"
int main() {
auto ints = rxcpp::observable<>::create<int>([](rxcpp::subscriber<int> s) {
s.on_next(1);
s.on_next(4);
s.on_next(9);
s.on_completed();
});
ints.subscribe([](int v) {printf("OnNext: %d\n", v); },
[]() {printf("OnCompleted\n"); });
}
連接可觀察到的流
concat不交錯(cuò)的發(fā)射兩個(gè)或多個(gè)Observable的發(fā)射物

Concat操作符連接多個(gè)Observable的輸出,就好像它們是一個(gè)Observable,第一個(gè)Observable發(fā)射的所有數(shù)據(jù)在第二個(gè)Observable發(fā)射的任何數(shù)據(jù)前面,以此類推。直到前面一個(gè)Observable終止,'concat'才會(huì)訂閱額外的一個(gè)Observable。注意:因此,如果你嘗試連接一個(gè)"熱"Observable(這種Observable在創(chuàng)建后立即開(kāi)始發(fā)射數(shù)據(jù),即使沒(méi)有訂閱者),'concat'將不會(huì)看到也不會(huì)發(fā)射它之前發(fā)射的任何數(shù)據(jù)。
startwith操作符類似于'concat',但是它是插入到前面,而不是追加那些Observable的數(shù)據(jù)到原始Observable發(fā)射的數(shù)據(jù)序列:
merge操作符也差不多,它結(jié)合兩個(gè)或多個(gè)Observable的發(fā)射物,但是數(shù)據(jù)可能交錯(cuò),而concat不會(huì)讓多個(gè)Observable的發(fā)射物交錯(cuò)。

我們可以將兩個(gè)流連接起來(lái)形成一個(gè)新的流,這在某些情況下非常方便。讓我們通過(guò)編寫(xiě)一個(gè)簡(jiǎn)單的程序來(lái)看看它是如何工作的:
//------------------ Concat.cpp
#include "rxcpp/rx.hpp"
#include <iostream>
#include <array>
int main() {
auto o1 = rxcpp::observable<>::range(1, 3);
auto o2 = rxcpp::observable<>::range(4, 6);
auto values = o1.concat(o2);
values.
subscribe(
[](int v) {printf("OnNext: %d\n", v); },
[]() {printf("OnCompleted\n"); });
}
創(chuàng)建一個(gè)發(fā)射指定值的Observable
Just將單個(gè)數(shù)據(jù)轉(zhuǎn)換為發(fā)射那個(gè)數(shù)據(jù)的Observable:

//------------------ sixth.cpp
#include "rxcpp/rx.hpp"
#include <iostream>
#include <array>
int main() {
auto values = rxcpp::observable<>::just(1);
values.
subscribe(
[](int v) {printf("OnNext: %d\n", v); },
[]() {printf("OnCompleted\n"); });
}
Take
只發(fā)射前面的N項(xiàng)數(shù)據(jù)

使用take操作符讓你可以修改Observable的行為,只返回前面的N項(xiàng)數(shù)據(jù),然后發(fā)射完成通知,忽略剩余的數(shù)據(jù)。

如果你對(duì)一個(gè)Observable使用take(n)(或它的同義詞limit(n))操作符,而那個(gè)Observable發(fā)射的數(shù)據(jù)少于N項(xiàng),那么
take操作生成的Observable不會(huì)拋異常或發(fā)射onError通知,在完成前它只會(huì)發(fā)射相同的少量數(shù)據(jù)。
//------------------ fifth.cpp
#include "rxcpp/rx.hpp"
#include <iostream>
#include <array>
int main() {
auto values = rxcpp::observable<>::range(1); // infinite (until overflow) stream of integers
auto s1 = values.
take(3).
map([](int prime) { return std::make_tuple("1:", prime); });
auto s2 = values.
take(3).
map([](int prime) { return std::make_tuple("2:", prime); });
s1.
concat(s2).
subscribe(rxcpp::util::apply_to(
[](const char* s, int p) {
printf("%s %d\n", s, p);
}));
}
其中,apply_to可以替換成一下方式:
subscribe(
[](std::tuple<const char*, int> p) {
printf("%s, %d\n",std::get<0>(p),std::get<1>(p));
});
從可觀察的流取消訂閱
下面的程序顯示了如何訂閱一個(gè)可觀察流并停止訂閱。程序只顯示了可用的選項(xiàng),應(yīng)該參考文檔:
//---------------- Unsubscribe.cpp
#include "rxcpp/rx.hpp"
#include <iostream>
int main() {
auto subs = rxcpp::composite_subscription();
auto values = rxcpp::observable<>::range(1, 10);
values.subscribe(
subs,
[&subs](int v) {
printf("OnNext: %d\n", v);
if (v == 6)
subs.unsubscribe(); //-- Stop recieving events
},
[]() {printf("OnCompleted\n"); });
}
map
對(duì)Observable發(fā)射的每一項(xiàng)數(shù)據(jù)應(yīng)用一個(gè)函數(shù),執(zhí)行變換操作
map操作符對(duì)原始Observable發(fā)射的每一項(xiàng)數(shù)據(jù)應(yīng)用一個(gè)你選擇的函數(shù),然后返回一個(gè)發(fā)射這些結(jié)果的Observable。

大理石圖的頂部顯示了兩個(gè)時(shí)間線,這些時(shí)間線通過(guò)將第二個(gè)時(shí)間線的內(nèi)容附加到第一個(gè)時(shí)間線來(lái)組合在一起,形成一個(gè)復(fù)合時(shí)間線。
Map.cpp
//------------------ Map.cpp
#include "rxcpp/rx.hpp"
#include <iostream>
#include <array>
int main() {
auto ints = rxcpp::observable<>::range(1, 10).
map([](int n) {return n * n; });
ints.subscribe(
[](int v) {printf("OnNext: %d\n", v); },
[]() {printf("OnCompleted\n"); });
}
rxcpp(流)操作符
面向流處理的一個(gè)主要優(yōu)點(diǎn)是,我們可以將函數(shù)式編程原語(yǔ)應(yīng)用于它們。用rxcpp的話說(shuō),處理是使用操作符完成的。它們只是流上的過(guò)濾、轉(zhuǎn)換、聚合和規(guī)約。在前面的示例中,我們已經(jīng)了解了map、filter和take操作符的工作原理。
平均運(yùn)算符
平均運(yùn)算符從可觀察的流計(jì)算值的算術(shù)平均值。所支持的其他統(tǒng)計(jì)運(yùn)算符包括:
- 最小值
- 最大值
- 計(jì)數(shù)
- 求和
計(jì)算原始Observable發(fā)射數(shù)字的平均值并發(fā)射它

下面的程序只演示了平均操作符。對(duì)于前面列表中的其他操作符,模式是相同的Average.cpp
//----------- Average.cpp
#include "rxcpp/rx.hpp"
#include <iostream>
int main() {
auto values = rxcpp::observable<>::range(1, 20).average();
values.
subscribe(
[](double v) {printf("average: %lf\n", v); },
[]() {printf("OnCompleted\n"); });
}
掃描操作符
掃描操作符對(duì)流的每個(gè)元素依次應(yīng)用一個(gè)函數(shù),并將該值累積為種子值。下面的程序生成一系列數(shù)字的平均值,這些值是在何時(shí)累計(jì)的。
連續(xù)地對(duì)數(shù)據(jù)序列的每一項(xiàng)應(yīng)用一個(gè)函數(shù),然后連續(xù)發(fā)射結(jié)果

Scan操作符對(duì)原始Observable發(fā)射的第一項(xiàng)數(shù)據(jù)應(yīng)用一個(gè)函數(shù),然后將那個(gè)函數(shù)的結(jié)果作為自己的第一項(xiàng)數(shù)據(jù)發(fā)射。它將函數(shù)的結(jié)果同第二項(xiàng)數(shù)據(jù)一起填充給這個(gè)函數(shù)來(lái)產(chǎn)生它自己的第二項(xiàng)數(shù)據(jù)。它持續(xù)進(jìn)行這個(gè)過(guò)程來(lái)產(chǎn)生剩余的數(shù)據(jù)序列。這個(gè)操作符在某些情況下被叫做accumulator。

//----------- Scan.cpp
#include "rxcpp/rx.hpp"
#include <iostream>
int main() {
int count = 0;
auto values = rxcpp::observable<>::range(1, 20).
scan(
0,
[&count](int seed, int v) {
count++;
return seed + v;
});
values.subscribe(
[&](int v) {printf("Average through Scan: %f\n", (double)v / count); },
[]() {printf("OnCompleted\n"); });
}
通過(guò)管道組合運(yùn)算符
RxCpp庫(kù)允許您連接或組合運(yùn)算符以啟用運(yùn)算符組合。 該庫(kù)允許您使用管道(|)運(yùn)算符來(lái)組合運(yùn)算符,程序員可以將一個(gè)運(yùn)算符的輸出傳遞給另一個(gè)運(yùn)算符,就好像它們位于UNIX shell的命令行中一樣。 這使我們能夠理解一段代碼的作用。下面的程序使用| 運(yùn)算符以映射范圍。RxCpp樣品含有使用管功能的例子很多
//------------------ Map_With_Pipe.cpp
#include "rxcpp/rx.hpp"
namespace Rx {
using namespace rxcpp;
using namespace rxcpp::sources;
using namespace rxcpp::operators;
using namespace rxcpp::util;
}
using namespace Rx;
#include <iostream>
int main() {
auto ints = rxcpp::observable<>::range(1, 10) |
map([](int n) {return n * n; });
ints.subscribe(
[](int v) {printf("OnNext: %d\n", v); },
[]() {printf("OnCompleted\n"); });
}
以上的管道操作等價(jià)于:
auto ints = rxcpp::observable<>::range(1, 10);
auto intsFromMap = ints.map([](int n) {return n * n; });
調(diào)度
我們已經(jīng)在上一節(jié)中了解了Observables,Operators和Observers。我們已經(jīng)知道,在Observables和Observers之間,我們可以應(yīng)用標(biāo)準(zhǔn)的Rx運(yùn)算符來(lái)過(guò)濾和轉(zhuǎn)換Streams。在函數(shù)式編程的情況下,我們編寫(xiě)不可變函數(shù)(沒(méi)有副作用的函數(shù)),不可變性的結(jié)果是無(wú)序執(zhí)行的可能性。如果我們可以保證永遠(yuǎn)不會(huì)修改對(duì)運(yùn)算符的輸入,那么我們?cè)u(píng)估的順序無(wú)關(guān)緊要。由于Rx程序?qū)⒉倏v多個(gè)觀察者和訂閱者,我們可以將選擇執(zhí)行順序的任務(wù)委派給調(diào)度程序模塊。默認(rèn)情況下,RxCpp將在我們稱為訂閱者方法的線程中安排執(zhí)行。可以使用observe_on和subscriber_on運(yùn)算符指定不同的線程。此外,一些Observable運(yùn)算符將Scheduler作為參數(shù),其中執(zhí)行可以在Scheduler管理的線程中進(jìn)行。
該RxCpp庫(kù)支持以下兩種類型的調(diào)度:
- ImmediateScheduler
- EventLoopScheduler
ObserveOn
指定一個(gè)觀察者在哪個(gè)調(diào)度器上觀察這個(gè)Observable

很多ReactiveX實(shí)現(xiàn)都使用調(diào)度器 "
Scheduler"來(lái)管理多線程環(huán)境中Observable的轉(zhuǎn)場(chǎng)。你可以使用ObserveOn操作符指定Observable在一個(gè)特定的調(diào)度器上發(fā)送通知給觀察者 (調(diào)用觀察者的onNext, onCompleted, onError方法)。
注意:當(dāng)遇到一個(gè)異常時(shí)ObserveOn會(huì)立即向前傳遞這個(gè)onError終止通知,它不會(huì)等待慢速消費(fèi)的Observable接受任何之前它已經(jīng)收到但還沒(méi)有發(fā)射的數(shù)據(jù)項(xiàng)。這可能意味著onError通知會(huì)跳到(并吞掉)原始Observable發(fā)射的數(shù)據(jù)項(xiàng)前面,正如圖例上展示的。
RxCpp庫(kù)默認(rèn)是單線程的。 您可以將其配置為使用某些運(yùn)算符在多個(gè)線程中運(yùn)行:
//----------ObserveOn.cpp
#include "rxcpp/rx.hpp"
#include <iostream>
#include <thread>
int main() {
//---------------- Generate a range of values
//---------------- Apply Square function
auto values = rxcpp::observable<>::range(1, 4).
map([](int v) {
return v * v;
});
//------------- Emit the current thread details
std::cout << "Main Thread id => "
<< std::this_thread::get_id()
<< std::endl;
//---------- observe_on another thread....
//---------- make it blocking to
values.observe_on(rxcpp::synchronize_new_thread()).
as_blocking().
subscribe(
[](int v) {
std::cout << "Observable Thread id => "
<< std::this_thread::get_id()
<< " " << v << std::endl; },
[]() { std::cout << "OnCompleted" << std::endl; });
//------------------ Print the main thread details
std::cout << "Main Thread id => "
<< std::this_thread::get_id()
<< std::endl;
}
SubscribeOn
SubscribeOn操作符的作用類似,但它是用于指定Observable本身在特定的調(diào)度器上執(zhí)行,它同樣會(huì)在那個(gè)調(diào)度器上給觀察者發(fā)通知。

ObserveOn操作符的作用類似,但是功能很有限,它指示Observable在一個(gè)指定的調(diào)度器上給觀察者發(fā)通知,下面的程序?qū)⒀菔緎ubscribe_on方法的用法:
//---------- SubscribeOn.cpp
#include "rxcpp/rx.hpp"
#include "rxcpp/rx-test.hpp"
#include <iostream>
#include <thread>
#include <mutex>
std::mutex console_mutex;
void CTDetails(int val = 0) {
console_mutex.lock();
std::cout << "Current Thread id => "
<< std::this_thread::get_id()
<< val
<< std::endl;
console_mutex.unlock();
}
void Yield(bool y) {
if (y) { std::this_thread::yield(); }
}
int main() {
//----------- coordination object
auto coordination = rxcpp::serialize_new_thread();
//----------------- retrieve the worker
auto worker = coordination.create_coordinator().get_worker();
//-------------- Create an Obsrevable
auto values = rxcpp::observable<>::interval(std::chrono::milliseconds(50)).
take(5).
replay(coordination);
// Subscribe from the beginning
worker.schedule([&](const rxcpp::schedulers::schedulable&)
{
values.subscribe(
[](long v) {CTDetails(v); },
[]() { CTDetails(); });
});
// Wait before subscribing
worker.schedule(coordination.now() + std::chrono::milliseconds(125),
[&](const rxcpp::schedulers::schedulable&) {
values.subscribe(
[](long v) {CTDetails(v*v); },
[]() { CTDetails(); });
});
// Start emitting
worker.schedule([&](const rxcpp::schedulers::schedulable&) {
values.connect();
});
// Add blocking subscription to see results
values.as_blocking().subscribe();
}
flat/ concatmap的區(qū)別
開(kāi)發(fā)人員之間的一個(gè)混淆之處通常集中在flatmap和concatmap操作符上。它們之間的差異非常細(xì)微。
flatmap
FlatMap將一個(gè)發(fā)射數(shù)據(jù)的Observable變換為多個(gè)Observables,然后將它們發(fā)射的數(shù)據(jù)合并后放進(jìn)一個(gè)單獨(dú)的Observable

FlatMap操作符使用一個(gè)指定的函數(shù)對(duì)原始Observable發(fā)射的每一項(xiàng)數(shù)據(jù)執(zhí)行變換操作,這個(gè)函數(shù)返回一個(gè)本身也發(fā)射數(shù)據(jù)的Observable,然后FlatMap合并這些Observables發(fā)射的數(shù)據(jù),最后將合并后的結(jié)果當(dāng)做它自己的數(shù)據(jù)序列發(fā)射。
這個(gè)方法是很有用的,例如,當(dāng)你有一個(gè)這樣的Observable:它發(fā)射一個(gè)數(shù)據(jù)序列,這些數(shù)據(jù)本身包含Observable成員或者可以變換為Observable,因此你可以創(chuàng)建一個(gè)新的Observable發(fā)射這些次級(jí)Observable發(fā)射的數(shù)據(jù)的完整集合。
注意:
- FlatMap對(duì)這些Observables發(fā)射的數(shù)據(jù)做的是合并(merge)操作,因此它們可能是交錯(cuò)的。
- 如果任何一個(gè)通過(guò)這個(gè)flatMap操作產(chǎn)生的單獨(dú)的Observable調(diào)用onError異常終止了,這個(gè)Observable自身會(huì)立即調(diào)用onError并終止。
如下,flatmap將lambda應(yīng)用于可觀察流并生成一個(gè)新的可觀察流。生成的流合并在一起以提供輸出:
#include "rxcpp/rx.hpp"
#include <iostream>
namespace rxu = rxcpp::util;
#include <array>
#include <string>
//#include <tuple>
int main() {
std::array< std::string, 4 > a = { {"Praseed", "Peter", "Sanjay","Raju"} };
auto values = rxcpp::observable<>::iterate(a).flat_map(
[](std::string v) {
std::array<std::string, 3> salutation = { { "Mr." , "Monsieur" , "Sri" } };
return rxcpp::observable<>::iterate(salutation);
},
[](std::string f, std::string s) {
return s + " " + f;
});
values.subscribe([](std::string f) {
std::cout << f << std::endl; }, []() {std::cout << "Hello World.." << std::endl; });
}
concatmap
concatmap不會(huì)讓變換后的Observables發(fā)射的數(shù)據(jù)交錯(cuò),它按照嚴(yán)格的順序發(fā)射這些數(shù)據(jù)。

contact和merge
為了讓區(qū)別更清楚,讓我們看一下兩個(gè)操作符:concat和merge。讓我們來(lái)看看流的串聯(lián)是如何工作的。它基本上是一個(gè)接一個(gè)地添加流的內(nèi)容,保持順序:
#include "rxcpp/rx.hpp"
#include <iostream>
#include <array>
int main() {
auto o1 = rxcpp::observable<>::range(1, 3);
auto o2 = rxcpp::observable<>::range(4, 6);
auto values = o1.concat(o2);
values.
subscribe(
[](int v) {printf("OnNext: %d\n", v); },
[]() {printf("OnCompleted\n"); });
}

下面的云石圖清楚地顯示了當(dāng)我們合并兩個(gè)可觀察到的流時(shí)會(huì)發(fā)生什么。輸出隊(duì)列的內(nèi)容將是兩個(gè)流的交叉組合:
//---------------- Merge.cpp
#include "rxcpp/rx.hpp"
#include <iostream>
#include <array>
int main() {
auto o1 = rxcpp::observable<>::range(1, 3);
auto o2 = rxcpp::observable<>::from(4, 5, 6);
auto values = o1.merge(o2);
values.subscribe(
[](int v) {printf("OnNext: %d\n", v); }, []() {printf("OnCompleted\n"); });
}

flatmap和concatmap或多或少都執(zhí)行相同的操作。不同之處在于值的組合方式。flatmap使用merge操作符,而concatmap使用concact操作符。merge操作符順序無(wú)關(guān)緊要。concat操作符將可觀察值一個(gè)接一個(gè)地追加。這就是為什么按我們期望的順序得到值。
更多重要的操作符
現(xiàn)在我們了解了反應(yīng)性編程模型的關(guān)鍵,因?yàn)槲覀冇懻摿艘恍┗局黝},比如可觀察性、觀察者、操作符和調(diào)度程序。為了更好地編寫(xiě)邏輯,我們應(yīng)該了解更多的運(yùn)算符。
tap
tap是一個(gè)RxCPP管道操作符,返回與源可觀察相同的可觀察值,可用于執(zhí)行副作用,例如記錄源可觀察值發(fā)出的每個(gè)值。我們將探索tap操作符,它有助于查看流的內(nèi)容:
//----------- TapExample.cpp
#include "rxcpp/rx.hpp"
#include <iostream>
int main() {
//---- Create a mapped Observable
auto ints = rxcpp::observable<>::range(1, 3).
map([](int n) {return n * n; });
//---- Apply the tap operator...The Operator
//---- will act as a filter/debug operator
auto values = ints.
tap(
[](int v)
{printf("Tap - OnNext: %d\n", v); },
[]() {printf("Tap - OnCompleted\n");
});
//------- Do some action
values.
subscribe(
[](int v) {printf("Subscribe - OnNext: %d\n", v); },
[]() {printf("Subscribe - OnCompleted\n"); });
}
defer
直到有觀察者訂閱時(shí)才創(chuàng)建Observable,并且為每個(gè)觀察者創(chuàng)建一個(gè)新的Observable
defer操作符接受一個(gè)你選擇的Observable工廠函數(shù)作為單個(gè)參數(shù)。這個(gè)函數(shù)沒(méi)有參數(shù),返回一個(gè)Observable。
defer操作符會(huì)一直等待直到有觀察者訂閱它,然后它使用Observable工廠方法生成一個(gè)Observable。它對(duì)每個(gè)觀察者都這樣做,因此盡管每個(gè)訂閱者都以為自己訂閱的是同一個(gè)Observable,事實(shí)上每個(gè)訂閱者獲取的是它們自己的單獨(dú)的數(shù)據(jù)序列。
在某些情況下,等待直到最后一分鐘(就是知道訂閱發(fā)生時(shí))才生成Observable可以確保Observable包含最新的數(shù)據(jù)。

當(dāng)有人試圖連接到指定的可觀察對(duì)象時(shí),我們調(diào)用observable_factory lambda:
//----------- DeferExample.cpp
#include "rxcpp/rx.hpp"
#include <iostream>
int main() {
auto observable_factory = []() {
return rxcpp::observable<>::range(1, 3).
map([](int n) {return n * n; });
};
auto ints = rxcpp::observable<>::defer(observable_factory);
ints.
subscribe(
[](int v) {printf("OnNext: %d\n", v); },
[]() {printf("OnCompleted\n"); });
ints.
subscribe(
[](int v) {printf("2nd OnNext: %d\n", v); },
[]() {printf("2nd OnCompleted\n"); });
}
buffer
buffer操作符定期收集Observable的數(shù)據(jù)放進(jìn)一個(gè)數(shù)據(jù)包裹,然后發(fā)射這些數(shù)據(jù)包裹,而不是一次發(fā)射一個(gè)值。

buffer操作符將一個(gè)Observable變換為另一個(gè),原來(lái)的Observable正常發(fā)射數(shù)據(jù),變換產(chǎn)生的Observable發(fā)射這些數(shù)據(jù)的緩存集合。Buffer操作符在很多語(yǔ)言特定的實(shí)現(xiàn)中有很多種變體,它們?cè)谌绾尉彺孢@個(gè)問(wèn)題上存在區(qū)別。
注意:如果原來(lái)的Observable發(fā)射了一個(gè)onError通知,Buffer會(huì)立即傳遞這個(gè)通知,而不是首先發(fā)射緩存的數(shù)據(jù),即使在這之前緩存中包含了原始Observable發(fā)射的數(shù)據(jù)。
Window操作符與Buffer類似,但是它在發(fā)射之前把收集到的數(shù)據(jù)放進(jìn)單獨(dú)的Observable,而不是放進(jìn)一個(gè)數(shù)據(jù)結(jié)構(gòu)。詳見(jiàn):https://mcxiaoke.gitbooks.io/rxdocs/content/operators/Buffer.html
buffer操作符發(fā)出一個(gè)包含一個(gè)可觀察對(duì)象的非重疊內(nèi)容的可觀察對(duì)象,每個(gè)可觀察對(duì)象最多包含count參數(shù)指定的項(xiàng)數(shù)。這將幫助我們以適合內(nèi)容的方式處理項(xiàng)目:
//----------- BufferExample.cpp
#include "rxcpp/rx.hpp"
#include <iostream>
int main() {
auto values = rxcpp::observable<>::range(1, 10).buffer(3);
values.
subscribe(
[](std::vector<int> v) {
printf("OnNext:{");
std::for_each(v.begin(), v.end(), [](int a) {
printf(" %d", a);
});
printf("}\n");
},
[]() {printf("OnCompleted\n"); });
}
timer
創(chuàng)建一個(gè)Observable,它在一個(gè)給定的延遲后發(fā)射一個(gè)特殊的值。

這個(gè)函數(shù)在庫(kù)中有不同的版本,timer操作符默認(rèn)在computation調(diào)度器上執(zhí)行。有一個(gè)變體可以通過(guò)可選參數(shù)指定Scheduler:
//----------- TimerExample.cpp
#include "rxcpp/rx.hpp"
#include "rxcpp/rx-test.hpp"
#include <iostream>
#include <thread>
#include <chrono>
int main() {
auto scheduler = rxcpp::observe_on_new_thread();
auto period = std::chrono::seconds(3);
auto values = rxcpp::observable<>::timer(period, scheduler).
finally([]() {
std::cout << "The final action, thread id: " << std::this_thread::get_id() << std::endl;
});
values.
as_blocking().
subscribe(
[](int v) { std::cout << "OnNext: " << v << "thread id: " << std::this_thread::get_id() << std::endl; },
[]() {std::cout << "OnCompleted, thread id: " << std::this_thread::get_id() << std::endl; });
std::cout << "main thread id: " << std::this_thread::get_id() << std::endl;
//必須在主線程sleep,否則finally中的內(nèi)容打印不全,因?yàn)槌绦蛲顺龊螅€程銷毀
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}