一、Storm簡(jiǎn)介
Storm是一個(gè)免費(fèi)并開源的分布式實(shí)時(shí)計(jì)算系統(tǒng)。利用Storm可以很容易做到可靠地處理無(wú)限的數(shù)據(jù)流,像Hadoop批量處理大數(shù)據(jù)一樣,Storm可以實(shí)時(shí)處理數(shù)據(jù)。
Storm 很簡(jiǎn)單,可用于任意編程語(yǔ)言。Apache Storm 采用 Clojure 開發(fā)。Storm 有很多應(yīng)用場(chǎng)景,包括實(shí)時(shí)數(shù)據(jù)分析、聯(lián)機(jī)學(xué)習(xí)、持續(xù)計(jì)算、分布式 RPC、ETL 等。
Hadoop(大數(shù)據(jù)分析領(lǐng)域無(wú)可爭(zhēng)辯的王者)專注于批處理?這種模型對(duì)許多情形(比如為網(wǎng)頁(yè)建立索引)已經(jīng)足夠,但還存在其他一些使用模型,它們需要來(lái)自高度動(dòng)態(tài)的來(lái)源的實(shí)時(shí)信息?為了解決這個(gè)問(wèn)題,就得借助 Nathan Marz 推出的 storm(現(xiàn)在已經(jīng)被Apache孵化)storm 不處理靜態(tài)數(shù)據(jù),但它處理連續(xù)的流數(shù)據(jù)。
storm特點(diǎn):
- 編程簡(jiǎn)單:開發(fā)人員只需要關(guān)注應(yīng)用邏輯,而且跟Hadoop類似,Storm提供的編程原語(yǔ)也很簡(jiǎn)單
- 高性能,低延遲:可以應(yīng)用于廣告搜索引擎這種要求對(duì)廣告主的操作進(jìn)行實(shí)時(shí)響應(yīng)的場(chǎng)景。
- 分布式:可以輕松應(yīng)對(duì)數(shù)據(jù)量大,單機(jī)搞不定的場(chǎng)景
- 可擴(kuò)展: 隨著業(yè)務(wù)發(fā)展,數(shù)據(jù)量和計(jì)算量越來(lái)越大,系統(tǒng)可水平擴(kuò)展
- 容錯(cuò):?jiǎn)蝹€(gè)節(jié)點(diǎn)掛了不影響應(yīng)用
- 消息不丟失:保證消息處理
storm與hadoop的比較:
1.Storm用于實(shí)時(shí)計(jì)算,Hadoop用于離線計(jì)算。
2. Storm處理的數(shù)據(jù)保存在內(nèi)存中,源源不斷;Hadoop處理的數(shù)據(jù)保存在文件系統(tǒng)中,一批一批。
Storm的數(shù)據(jù)通過(guò)網(wǎng)絡(luò)傳輸進(jìn)來(lái);Hadoop的數(shù)據(jù)保存在磁盤中。
Storm與Hadoop的編程模型相似
| 結(jié)構(gòu) | Hadoop | Storm |
|---|---|---|
| 主節(jié)點(diǎn) | JobTracker | Nimbus |
| 從節(jié)點(diǎn) | TaskTracker | Supervisor |
| 應(yīng)用程序 | Job | Topology |
| 工作進(jìn)程名稱 | Child | Worker |
| 計(jì)算模型 | Map / Reduce | Spout / Bolt |
二、Storm集群架構(gòu)
Storm集群采用主從架構(gòu)方式,主節(jié)點(diǎn)是Nimbus,從節(jié)點(diǎn)是Supervisor,有關(guān)調(diào)度相關(guān)的信息存儲(chǔ)到ZooKeeper集群中,架構(gòu)如下圖所示:

Nimbus
Storm集群的Master節(jié)點(diǎn),負(fù)責(zé)分發(fā)用戶代碼,指派給具體的Supervisor節(jié)點(diǎn)上的Worker節(jié)點(diǎn),去運(yùn)行Topology對(duì)應(yīng)的組件(Spout/Bolt)的Task。
Supervisor
Storm集群的從節(jié)點(diǎn),負(fù)責(zé)管理運(yùn)行在Supervisor節(jié)點(diǎn)上的每一個(gè)Worker進(jìn)程的啟動(dòng)和終止。通過(guò)Storm的配置文件中的supervisor.slots.ports配置項(xiàng),可以指定在一個(gè)Supervisor上最大允許多少個(gè)Slot,每個(gè)Slot通過(guò)端口號(hào)來(lái)唯一標(biāo)識(shí),一個(gè)端口號(hào)對(duì)應(yīng)一個(gè)Worker進(jìn)程(如果該Worker進(jìn)程被啟動(dòng))。
Worker
運(yùn)行具體處理組件邏輯的進(jìn)程。Worker運(yùn)行的任務(wù)類型只有兩種,一種是Spout任務(wù),一種是Bolt任務(wù)。
Task
worker中每一個(gè)spout/bolt的線程稱為一個(gè)task. 在storm0.8之后,task不再與物理線程對(duì)應(yīng),不同spout/bolt的task可能會(huì)共享一個(gè)物理線程,該線程稱為executor。
ZooKeeper
用來(lái)協(xié)調(diào)Nimbus和Supervisor,如果Supervisor因故障出現(xiàn)問(wèn)題而無(wú)法運(yùn)行Topology,Nimbus會(huì)第一時(shí)間感知到,并重新分配Topology到其它可用的Supervisor上運(yùn)行
三、Storm編程模型
Strom在運(yùn)行中可分為spout與bolt兩個(gè)組件,其中,數(shù)據(jù)源從spout開始,數(shù)據(jù)以tuple的方式發(fā)送到bolt,多個(gè)bolt可以串連起來(lái),一個(gè)bolt也可以接入多個(gè)spot/bolt.運(yùn)行時(shí)原理如下圖:

Topology:Storm中運(yùn)行的一個(gè)實(shí)時(shí)應(yīng)用程序的名稱。將 Spout、 Bolt整合起來(lái)的拓?fù)鋱D。定義了 Spout和 Bolt的結(jié)合關(guān)系、并發(fā)數(shù)量、配置等等。
Spout:在一個(gè)topology中獲取源數(shù)據(jù)流的組件。通常情況下spout會(huì)從外部數(shù)據(jù)源中讀取數(shù)據(jù),然后轉(zhuǎn)換為topology內(nèi)部的源數(shù)據(jù)。
Bolt:接受數(shù)據(jù)然后執(zhí)行處理的組件,用戶可以在其中執(zhí)行自己想要的操作。
Tuple:一次消息傳遞的基本單元,理解為一組消息就是一個(gè)Tuple。
Stream:Tuple的集合。表示數(shù)據(jù)的流向。
四、Topology運(yùn)行
在Storm中,一個(gè)實(shí)時(shí)應(yīng)用的計(jì)算任務(wù)被打包作為Topology發(fā)布,這同Hadoop的MapReduce任務(wù)相似。但是有一點(diǎn)不同的是:在Hadoop中,MapReduce任務(wù)最終會(huì)執(zhí)行完成后結(jié)束;而在Storm中,Topology任務(wù)一旦提交后永遠(yuǎn)不會(huì)結(jié)束,除非你顯示去停止任務(wù)。計(jì)算任務(wù)Topology是由不同的Spouts和Bolts,通過(guò)數(shù)據(jù)流(Stream)連接起來(lái)的圖?一個(gè)Storm在集群上運(yùn)行一個(gè)Topology時(shí),主要通過(guò)以下3個(gè)實(shí)體來(lái)完成Topology的執(zhí)行工作:
(1). Worker(進(jìn)程)
(2). Executor(線程)
(3). Task
下圖簡(jiǎn)要描述了這3者之間的關(guān)系:

1個(gè)worker進(jìn)程執(zhí)行的是1個(gè)topology的子集(注:不會(huì)出現(xiàn)1個(gè)worker為多個(gè)topology服務(wù))。1個(gè)worker進(jìn)程會(huì)啟動(dòng)1個(gè)或多個(gè)executor線程來(lái)執(zhí)行1個(gè)topology的component(spout或bolt)。因此,1個(gè)運(yùn)行中的topology就是由集群中多臺(tái)物理機(jī)上的多個(gè)worker進(jìn)程組成的。
executor是1個(gè)被worker進(jìn)程啟動(dòng)的單獨(dú)線程。每個(gè)executor只會(huì)運(yùn)行1個(gè)topology的1個(gè)component(spout或bolt)的task(注:task可以是1個(gè)或多個(gè),storm默認(rèn)是1個(gè)component只生成1個(gè)task,executor線程里會(huì)在每次循環(huán)里順序調(diào)用所有task實(shí)例)。
task是最終運(yùn)行spout或bolt中代碼的單元(注:1個(gè)task即為spout或bolt的1個(gè)實(shí)例,executor線程在執(zhí)行期間會(huì)調(diào)用該task的nextTuple或execute方法)。topology啟動(dòng)后,1個(gè)component(spout或bolt)的task數(shù)目是固定不變的,但該component使用的executor線程數(shù)可以動(dòng)態(tài)調(diào)整(例如:1個(gè)executor線程可以執(zhí)行該component的1個(gè)或多個(gè)task實(shí)例)。這意味著,對(duì)于1個(gè)component存在這樣的條件:#threads<=#tasks(即:線程數(shù)小于等于task數(shù)目)。默認(rèn)情況下task的數(shù)目等于executor線程數(shù)目,即1個(gè)executor線程只運(yùn)行1個(gè)task。
總體的Topology處理流程圖為:

下圖是Storm的數(shù)據(jù)交互圖,可以看出兩個(gè)模塊Nimbus和Supervisor之間沒(méi)有直接交互。狀態(tài)都是保存在Zookeeper上,Worker之間通過(guò)Netty傳送數(shù)據(jù)。Storm與Zookeeper之間的交互過(guò)程,暫時(shí)不細(xì)說(shuō)了。重要的一點(diǎn):storm所有的元數(shù)據(jù)信息保存在Zookeeper中!

五、Storm Streaming Grouping
Storm中最重要的抽象,應(yīng)該就是Stream grouping了,它能夠控制Spot/Bolt對(duì)應(yīng)的Task以什么樣的方式來(lái)分發(fā)Tuple,將Tuple發(fā)射到目的Spot/Bolt對(duì)應(yīng)的Task

目前,Storm Streaming Grouping支持如下幾種類型:
Shuffle Grouping :隨機(jī)分組,盡量均勻分布到下游Bolt中
將流分組定義為混排。這種混排分組意味著來(lái)自Spout的輸入將混排,或隨機(jī)分發(fā)給此Bolt中的任務(wù)。shuffle grouping對(duì)各個(gè)task的tuple分配的比較均勻。
Fields Grouping :按字段分組,按數(shù)據(jù)中field值進(jìn)行分組;相同field值的Tuple被發(fā)送到相同的Task
這種grouping機(jī)制保證相同field值的tuple會(huì)去同一個(gè)task,這對(duì)于WordCount來(lái)說(shuō)非常關(guān)鍵,如果同一個(gè)單詞不去同一個(gè)task,那么統(tǒng)計(jì)出來(lái)的單詞次數(shù)就不對(duì)了?!癷f the stream is grouped by the “user-id” field, tuples with the same “user-id” will alwaysGo to the same task”. —— 小示例
**All grouping **:廣播
廣播發(fā)送, 對(duì)于每一個(gè)tuple將會(huì)復(fù)制到每一個(gè)bolt中處理。
Global grouping :全局分組,Tuple被分配到一個(gè)Bolt中的一個(gè)Task,實(shí)現(xiàn)事務(wù)性的Topology。
Stream中的所有的tuple都會(huì)發(fā)送給同一個(gè)bolt任務(wù)處理,所有的tuple將會(huì)發(fā)送給擁有最小task_id的bolt任務(wù)處理。
None grouping :不分組
不關(guān)注并行處理負(fù)載均衡策略時(shí)使用該方式,目前等同于shuffle grouping,另外storm將會(huì)把bolt任務(wù)和他的上游提供數(shù)據(jù)的任務(wù)安排在同一個(gè)線程下。
**Direct grouping **:直接分組 指定分組
由tuple的發(fā)射單元直接決定tuple將發(fā)射給那個(gè)bolt,一般情況下是由接收tuple的bolt決定接收哪個(gè)bolt發(fā)射的Tuple。這是一種比較特別的分組方法,用這種分組意味著消息的發(fā)送者指定由消息接收者的哪個(gè)task處理這個(gè)消息。 只有被聲明為Direct Stream的消息流可以聲明這種分組方法。而且這種消息tuple必須使用emitDirect方法來(lái)發(fā)射。消息處理者可以通過(guò)TopologyContext來(lái)獲取處理它的消息的taskid (OutputCollector.emit方法也會(huì)返回taskid)。
另外,Storm還提供了用戶自定義Streaming Grouping接口,如果上述Streaming Grouping都無(wú)法滿足實(shí)際業(yè)務(wù)需求,也可以自己實(shí)現(xiàn),只需要實(shí)現(xiàn)backtype.storm.grouping.CustomStreamGrouping接口,該接口重定義了如下方法:
List chooseTasks(int taskId, List values)
上面幾種Streaming Group的內(nèi)置實(shí)現(xiàn)中,最常用的應(yīng)該是Shuffle Grouping、Fields Grouping、Direct Grouping這三種,使用其它的也能滿足特定的應(yīng)用需求。
六、可靠性
(1)、spout的可靠性
spout會(huì)記錄它所發(fā)射出去的tuple,當(dāng)下游任意一個(gè)bolt處理失敗時(shí)spout能夠重新發(fā)射該tuple。在spout的nextTuple()發(fā)送一個(gè)tuple時(shí),為實(shí)現(xiàn)可靠消息處理需要給每個(gè)spout發(fā)出的tuple帶上唯一ID,并將該ID作為參數(shù)傳遞給SpoutOutputCollector的emit()方法:collector.emit(new Values("value1","value2"), tupleID);
實(shí)際上Values extends ArrayList<Object>
保障過(guò)程中,每個(gè)bolt每收到一個(gè)tuple,都要向上游應(yīng)答或報(bào)錯(cuò),在tuple樹上的所有bolt都確認(rèn)應(yīng)答,spout才會(huì)隱式調(diào)用ack()方法表明這條消息(一條完整的流)已經(jīng)處理完畢,將會(huì)對(duì)編號(hào)ID的消息應(yīng)答確認(rèn);處理報(bào)錯(cuò)、超時(shí)則會(huì)調(diào)用fail()方法。
(2)、bolt的可靠性
bolt的可靠消息處理機(jī)制包含兩個(gè)步驟:
a、當(dāng)發(fā)射衍生的tuple,需要錨定讀入的tuple
b、當(dāng)處理消息時(shí),需要應(yīng)答或報(bào)錯(cuò)
可以通過(guò)OutputCollector中emit()的一個(gè)重載函數(shù)錨定或tuple:collector.emit(tuple, new Values(word)); 并且需要調(diào)用一次this.collector.ack(tuple)應(yīng)答。
七、總結(jié)
最后再來(lái)梳理一下Storm中涉及的主要概念
1.拓?fù)?Topology):打包好的實(shí)時(shí)應(yīng)用計(jì)算任務(wù),同Hadoop的MapReduce任務(wù)相似。
2.元組(Tuple):是Storm提供的一個(gè)輕量級(jí)的數(shù)據(jù)格式,可以用來(lái)包裝你需要實(shí)際處理的數(shù)據(jù)。
3.流(Streams):數(shù)據(jù)流(Stream)是Storm中對(duì)數(shù)據(jù)進(jìn)行的抽象,它是時(shí)間上無(wú)界的tuple元組序列(無(wú)限的元組序列)?
4.Spout(噴嘴):Storm中流的來(lái)源。Spout從外部數(shù)據(jù)源,如消息隊(duì)列中讀取元組數(shù)據(jù)并吐到拓?fù)淅铩?br>
5.Bolts:在拓?fù)渲兴械挠?jì)算邏輯都是在Bolt中實(shí)現(xiàn)的。
6.任務(wù)(Tasks):每個(gè)Spout和Bolt會(huì)以多個(gè)任務(wù)(Task)的形式在集群上運(yùn)行。
7.組件(Component):是對(duì)Bolt和Spout的統(tǒng)稱。
8.流分組(Stream groupings):流分組定義了一個(gè)流在一個(gè)消費(fèi)它的Bolt內(nèi)的多個(gè)任務(wù)(task)之間如何分組。
9.可靠性(Reliability):Storm保證了拓?fù)渲蠸pout產(chǎn)生的每個(gè)元組都會(huì)被處理。
10.Workers(工作進(jìn)程):拓?fù)湟砸粋€(gè)或多個(gè)Worker進(jìn)程的方式運(yùn)行。每個(gè)Worker進(jìn)程是一個(gè)物理的Java虛擬機(jī),執(zhí)行拓?fù)涞囊徊糠秩蝿?wù)。
11.Executor(線程):是1個(gè)被worker進(jìn)程啟動(dòng)的單獨(dú)線程。每個(gè)executor只會(huì)運(yùn)行1個(gè)topology的1個(gè)component。
12.Nimbus:Storm集群的Master節(jié)點(diǎn),負(fù)責(zé)分發(fā)用戶代碼,指派給具體的Supervisor節(jié)點(diǎn)上的Worker節(jié)點(diǎn),去運(yùn)行Topology對(duì)應(yīng)的組件(Spout/Bolt)的Task。
13.Supervisor:Storm集群的從節(jié)點(diǎn),負(fù)責(zé)管理運(yùn)行在Supervisor節(jié)點(diǎn)上的每一個(gè)Worker進(jìn)程的啟動(dòng)和終止。
參考:
http://www.cnblogs.com/swanspouse/p/5135679.html