原文鏈接Storm Tutorial
本人原創(chuàng)翻譯,轉(zhuǎn)載請注明出處
這個教程內(nèi)容包含如何創(chuàng)建topologies及部署到Storm集群上。Java是主要使用語言,但有的例子使用了Python,主要是為了解釋Storm的多語言能力。
前言
本教程使用的例子來自storm-starter project。建議clone該project并照著練習(xí)。閱讀Setting up a development environment和 Creating a new Storm project以使你的計算機具備開始條件。
這兩篇文章本人已翻譯,請閱Storm(一)打造開發(fā)環(huán)境&創(chuàng)建一個Storm項目
Storm集群的組件
表面上看Storm集群和Hadoop集群有些像。在Hadoop上運行的是"MapReduce jobs",而在Storm上運行的是topologies。"Jobs" 和 "topologies"大不相同,有一個關(guān)鍵不同就是MapReduce job最終會停止,而topology永不停止(除非被用戶kill掉)。
Storm集群有兩類節(jié)點:master節(jié)點和worker節(jié)點。master節(jié)點上運行著一個守護程序 "Nimbus"(和Hadoop的 "JobTracker"有些像)。Nimbus負責(zé)在集群中散布code,給各個機器分配任務(wù)以及監(jiān)控失敗的情況。每個worker節(jié)點上也運行著一個守護程序"Supervisor"。Supervisor負責(zé)接收Nimbus分配的任務(wù),按需啟動和停止worker進程。每個worker進程執(zhí)行了一個topology的子集。一個運行中的topology包含了多個worker,這些worker分布在多個機器上。

所有Nimbus和Supervisors的協(xié)調(diào)都通過Zookeeper集群進行。此外,Nimbus和Supervisors是立即失敗和無狀態(tài)的(fail-fast and stateless)。所有的狀態(tài)都保存在Zookeeper或本地硬盤上。這意味著即使通過kill -9 殺死Nimbus和Supervisors,他們也會自動恢復(fù),這個設(shè)計給了Storm集群難以置信的穩(wěn)定性。
Topologies
要利用Storm來進行實時計算,就要創(chuàng)建Topologies。一個topology是一個計算的圖,topology 中的每個節(jié)點包含處理邏輯,topology 中的每個邊(link)指明了數(shù)據(jù)如何在節(jié)點中傳遞。運行一個topology很簡單,首先打包code和依賴到j(luò)ar文件,然后執(zhí)行以下命令:
storm jar all-my-code.jar org.apache.storm.MyTopology arg1 arg2
這樣就啟動了org.apache.storm.MyTopology類,參數(shù)是arg1、arg2,這個類的主函數(shù)定義了 topology并提交到Nimbus。由于topology是Thrift structs,而且Nimbus是Thrift service,因此可以用任意的編程語言創(chuàng)建和提交topologies。這里舉的是基于JVM語言的最簡單的例子,更多信息請閱Running topologies on a production cluster
Streams
Storm 中最核心的抽象概念是"stream"。stream是元組(tuples)的無限序列,Storm提供了一種分布式、可靠的方式來將一個stream轉(zhuǎn)化成一個新的stream。舉個例子,你可以將一個tweets stream轉(zhuǎn)化成一個trending topics stream。
Storm提供了"spouts" 和 "bolts"來完成stream的轉(zhuǎn)化。通過實現(xiàn)Spouts和bolts的接口,你可以運行應(yīng)用相關(guān)的邏輯。
spout是stream的來源,舉個例子,spout可以從Kestrel隊列中讀取tuples并生成stream,或者spout也可以通過Twitter API生成一個tweets stream。
bolt可以消費任意數(shù)量的輸入stream,做一些處理,很可能拋出新的stream。類似將tweets stream轉(zhuǎn)化成trending topics stream這樣的復(fù)雜轉(zhuǎn)化,常常需要多個步驟,對應(yīng)著多個bolt。Bolts可以做的事情很多,比如run functions, filter tuples, do streaming aggregations, do streaming joins, talk to databases等等。
spouts和bolts的網(wǎng)絡(luò)被打包成了"topology",這是你提交到Storm集群執(zhí)行的最高級別的抽象。 topology是一個stream轉(zhuǎn)化的圖,圖的節(jié)點是spout或bolt,圖的邊指明了哪個bolt在訂閱哪個stream。當一個spout 或 bolt發(fā)出一個tuple到一個stream,那么訂閱了這個stream的所有bolt都會收到這個tuple。

Storm topology中的每個節(jié)點都是并行運行,在你的topology中,你可以指定各個節(jié)點的并行程度,Storm會按照你指定的數(shù)目在集群中啟動相應(yīng)數(shù)量的線程。
topology會永久執(zhí)行下去,除非你停止它。Storm會自動重新分配失敗的任務(wù),此外,Storm保證數(shù)據(jù)不會丟失,即便機器宕機并且messages are dropped。
Data model
Storm的數(shù)據(jù)模型是tuple。tuple是一個命名list,字段可以是任意類型的對象。Storm支持所有primitive types, strings, and byte arrays作為tuple的字段值。如果要使用其他類型的對象,只需要實現(xiàn)serializer。
topology的每個節(jié)點都需要定義輸出的tuple字段。如下的例子中,bolt定義了兩個帶有 "double"和"triple"字段的tuple。
public class DoubleAndTripleBolt extends BaseRichBolt {
private OutputCollectorBase _collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollectorBase collector) {
_collector = collector;
}
@Override
public void execute(Tuple input) {
int val = input.getInteger(0);
_collector.emit(input, new Values(val*2, val*3));
_collector.ack(input);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("double", "triple"));
}
}
一個簡單的topology
看一下storm-starter中ExclamationTopology的定義:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("words", new TestWordSpout(), 10);
builder.setBolt("exclaim1", new ExclamationBolt(), 3)
.shuffleGrouping("words");
builder.setBolt("exclaim2", new ExclamationBolt(), 2)
.shuffleGrouping("exclaim1");
這個Topology包含一個spout和兩個bolt,spout發(fā)出words,bolt在輸入的string后面加上"!!!"。節(jié)點被組織成一條直線:spout發(fā)出到第一個bolt,然后到第二個bolt。如果spout發(fā)出了tuples ["bob"]和["john"],那么第二個bolt會發(fā)出 ["bob!!!!!!"]和["john!!!!!!"]。
setSpout 和 setBolt方法定義了節(jié)點,第一個參數(shù)是用戶定義的ID,第二個參數(shù)是處理邏輯對象,第三個參數(shù)是并發(fā)線程數(shù)。
spout的處理邏輯對象實現(xiàn)了IRichSpout接口,bolt的處理邏輯對象實現(xiàn)了IRichBolt接口。最后一個參數(shù)是可選的,如果不指定,Storm只分配一個線程。
setBolt返回一個InputDeclarer對象,這個對象定義了bolt的輸入。這里組件exclaim1聲明了它要讀取所有組件words發(fā)出的tuples。組件exclaim2聲明了它要讀取所有組件exclaim1發(fā)出的tuples。"shuffle grouping"會被隨機的分配到bolt。組件之間有多種分組數(shù)據(jù)的方式,如果想要組件exclaim2同時讀取組件words和exclaim1的tuples,可以像這樣定義:
builder.setBolt("exclaim2", new ExclamationBolt(), 5)
.shuffleGrouping("words")
.shuffleGrouping("exclaim1");
接下來看看這個topology的spout和bolt的實現(xiàn)。spout負責(zé)發(fā)出數(shù)據(jù)到topology。TestWordSpout每隔100ms從["nathan", "mike", "jackson", "golda", "bertels"]中發(fā)出隨機word作為一個tuple,TestWordSpout的nextTuple()實現(xiàn)如下:
public void nextTuple() {
Utils.sleep(100);
final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
final Random rand = new Random();
final String word = words[rand.nextInt(words.length)];
_collector.emit(new Values(word));
}
ExclamationBolt的實現(xiàn)如下:
public static class ExclamationBolt implements IRichBolt {
OutputCollector _collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
@Override
public void execute(Tuple tuple) {
_collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
_collector.ack(tuple);
}
@Override
public void cleanup() {
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
prepare方法給bolt提供了一個OutputCollector,用來從這個bolt發(fā)出tuple。Tuples可以在任何時候發(fā)出——包括prepare, execute, 或cleanup方法,甚至在另一個線程中異步發(fā)出。這里prepare方法只是保存OutputCollector為實例變量,后面execute方法會用到。
execute方法接收一個tuple,ExclamationBolt抓取tuple的第一個字段并在后面加上"!!!"。如果bolt訂閱了多個輸入源,可以通過Tuple#getSourceComponent方法查詢tuple來源。
輸入tuple被作為了emit的第一個參數(shù),最后一行ack了輸入tuple。這些是Storm可靠性API的一部分,用以保證沒有數(shù)據(jù)丟失,本教程后續(xù)會進一步介紹。
cleanup方法在bolt停止的時候調(diào)用,在這里應(yīng)該關(guān)閉所有打開的資源。Storm集群不保證一定會調(diào)用這個方法:例如,if the machine the task is running on blows up, there's no way to invoke the method. cleanup方法適用于local模式,你可以運行和停止許多topologies,不必擔(dān)心資源泄露。
declareOutputFields方法聲明了ExclamationBolt發(fā)出帶一個名為"word"字段的1-tuples。
getComponentConfiguration方法允許你配置組件運行的參數(shù)。這是一個更高級的主題Configuration
在一個bolt的實現(xiàn)中,像cleanup 和 getComponentConfiguration這樣的方法常常不需要。你可以通過繼承提供了默認實現(xiàn)的基類來更簡潔的實現(xiàn)bolt。例如:
public static class ExclamationBolt extends BaseRichBolt {
OutputCollector _collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
@Override
public void execute(Tuple tuple) {
_collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
_collector.ack(tuple);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
在local模式下運行ExclamationTopology
在local模式中,Storm完全在一個進程內(nèi)運行,worker通過線程模擬,主要用于測試和開發(fā)場景。當你運行storm-starte中的topologies,他們將在local模式下運行,你能夠看到每個組件正在發(fā)出的消息。
更多l(xiāng)ocal模式信息請閱Local mode
更多分布式模式信息請閱running topologies in local mode on Local mode
如下是local模式下運行ExclamationTopology的代碼:
Config conf = new Config();
conf.setDebug(true);
conf.setNumWorkers(2);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
Utils.sleep(10000);
cluster.killTopology("test");
cluster.shutdown();
首先,通過創(chuàng)建LocalCluster對象定義了一個進程內(nèi)的集群。提交topologies到這個虛擬集群和提交到分布式集群是完全一樣的操作。submitTopology方法的第一個參數(shù)是topology的名字,第二個參數(shù)是topology的配置,第三個參數(shù)是topology對象。
這里topology的配置很常見:
TOPOLOGY_WORKERS (由setNumWorkers設(shè)置) 指定了你想分配多少進程來執(zhí)行這個topology。 topology中的每個組件將以線程的形式執(zhí)行,線程的數(shù)量由setBolt 和 setSpout配置。
TOPOLOGY_DEBUG (由setDebug設(shè)置) 當設(shè)置為true時, Storm記錄組件發(fā)出的每個消息。
更多配置信息請閱the Javadoc for Config.
Stream groupings
stream grouping用于描述組件之間如何發(fā)送tuple。在集群中,spouts 和 bolts總是并行執(zhí)行任務(wù),在執(zhí)行任務(wù)層面上,一個topology看起來如下圖所示:

當Bolt A的一個任務(wù)發(fā)出tuple到Bolt B時,應(yīng)該發(fā)到Bolt B的哪個任務(wù)呢?
stream groupings就是用來解決這個問題。在深入了解不同種類的stream groupings之前,我們先看看storm-starter中的另一個topology。WordCountTopology從spout中讀取句子,WordCountBolt統(tǒng)計單詞出現(xiàn)的次數(shù)。
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("sentences", new RandomSentenceSpout(), 5);
builder.setBolt("split", new SplitSentence(), 8)
.shuffleGrouping("sentences");
builder.setBolt("count", new WordCount(), 12)
.fieldsGrouping("split", new Fields("word"));
SplitSentence發(fā)出句子中的單詞作為tuple,WordCount以map的形式存儲單詞出現(xiàn)的次數(shù),每次WordCount收到單詞,就更新map并發(fā)出新的單詞數(shù)目。
最簡單的grouping方式是"shuffle grouping",也就是隨機發(fā)送tuple給任務(wù)。"fields grouping"是一個更有趣的grouping方式,這里用在了SplitSentence bolt 和 WordCount bolt之間。對WordCount bolt來說,相同的單詞應(yīng)該發(fā)送到相同的任務(wù),否則多個任務(wù)都會收到同樣的單詞,由于每個任務(wù)的信息都不完全,他們可能會發(fā)出錯誤的單詞數(shù)目。fields grouping用字段的子集來分組,字段值相同的tuple被發(fā)送到相同的任務(wù)。
Fields groupings是實現(xiàn)streaming joins 和 streaming aggregations,它的底層實現(xiàn)利用了mod hashing。還有一些其他的分組方式,請閱Concepts。
使用其他編程語言定義Bolts
Bolts可以使用任意語言定義,用JVM-based語言以外定義的Bolts以子進程的方式運行,Storm以stdin/stdout之上的JSON格式消息與子進程通信。通信協(xié)議用到了一個100行左右的適配器庫,支持Ruby, Python, Fancy。
WordCountTopology的SplitSentence定義如下:
public static class SplitSentence extends ShellBolt implements IRichBolt {
public SplitSentence() {
super("python", "splitsentence.py");
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
SplitSentence重寫了ShellBolt,聲明它使用python,參數(shù)是splitsentence.py。splitsentence.py實現(xiàn)如下:
import storm
class SplitSentenceBolt(storm.BasicBolt):
def process(self, tup):
words = tup.values[0].split(" ")
for word in words:
storm.emit([word])
SplitSentenceBolt().run()
更多多語言信息,請閱Using non-JVM languages with Storm
確保消息被處理
這部分屬于Storm's reliability API的內(nèi)容:Storm如何保證spout發(fā)出的消息都能被處理,請閱Guaranteeing Message Processing
Transactional topologies
Storm保證每個消息都至少被處理一次。一個常見的問題是:Storm會不會overcount?Storm提供了一種機制,確保消息只被傳遞一次。transactional topologies。
Distributed RPC
除了本教程已經(jīng)展示的功能,Storm還可以做許多事。其中最有趣的應(yīng)用之一是Distributed RPC。