Storm(二)官方Tutorial

原文鏈接Storm Tutorial

本人原創(chuàng)翻譯,轉(zhuǎn)載請注明出處

這個教程內(nèi)容包含如何創(chuàng)建topologies及部署到Storm集群上。Java是主要使用語言,但有的例子使用了Python,主要是為了解釋Storm的多語言能力。

前言

本教程使用的例子來自storm-starter project。建議clone該project并照著練習(xí)。閱讀Setting up a development environmentCreating a new Storm project以使你的計算機具備開始條件。
這兩篇文章本人已翻譯,請閱Storm(一)打造開發(fā)環(huán)境&創(chuàng)建一個Storm項目

Storm集群的組件

表面上看Storm集群和Hadoop集群有些像。在Hadoop上運行的是"MapReduce jobs",而在Storm上運行的是topologies。"Jobs" 和 "topologies"大不相同,有一個關(guān)鍵不同就是MapReduce job最終會停止,而topology永不停止(除非被用戶kill掉)。

Storm集群有兩類節(jié)點:master節(jié)點和worker節(jié)點。master節(jié)點上運行著一個守護程序 "Nimbus"(和Hadoop的 "JobTracker"有些像)。Nimbus負責(zé)在集群中散布code,給各個機器分配任務(wù)以及監(jiān)控失敗的情況。每個worker節(jié)點上也運行著一個守護程序"Supervisor"。Supervisor負責(zé)接收Nimbus分配的任務(wù),按需啟動和停止worker進程。每個worker進程執(zhí)行了一個topology的子集。一個運行中的topology包含了多個worker,這些worker分布在多個機器上。

所有Nimbus和Supervisors的協(xié)調(diào)都通過Zookeeper集群進行。此外,Nimbus和Supervisors是立即失敗和無狀態(tài)的(fail-fast and stateless)。所有的狀態(tài)都保存在Zookeeper或本地硬盤上。這意味著即使通過kill -9 殺死Nimbus和Supervisors,他們也會自動恢復(fù),這個設(shè)計給了Storm集群難以置信的穩(wěn)定性。

Topologies

要利用Storm來進行實時計算,就要創(chuàng)建Topologies。一個topology是一個計算的圖,topology 中的每個節(jié)點包含處理邏輯,topology 中的每個邊(link)指明了數(shù)據(jù)如何在節(jié)點中傳遞。運行一個topology很簡單,首先打包code和依賴到j(luò)ar文件,然后執(zhí)行以下命令:
storm jar all-my-code.jar org.apache.storm.MyTopology arg1 arg2
這樣就啟動了org.apache.storm.MyTopology類,參數(shù)是arg1、arg2,這個類的主函數(shù)定義了 topology并提交到Nimbus。由于topology是Thrift structs,而且Nimbus是Thrift service,因此可以用任意的編程語言創(chuàng)建和提交topologies。這里舉的是基于JVM語言的最簡單的例子,更多信息請閱Running topologies on a production cluster

Streams

Storm 中最核心的抽象概念是"stream"。stream是元組(tuples)的無限序列,Storm提供了一種分布式、可靠的方式來將一個stream轉(zhuǎn)化成一個新的stream。舉個例子,你可以將一個tweets stream轉(zhuǎn)化成一個trending topics stream。

Storm提供了"spouts" 和 "bolts"來完成stream的轉(zhuǎn)化。通過實現(xiàn)Spouts和bolts的接口,你可以運行應(yīng)用相關(guān)的邏輯。
spout是stream的來源,舉個例子,spout可以從Kestrel隊列中讀取tuples并生成stream,或者spout也可以通過Twitter API生成一個tweets stream。

bolt可以消費任意數(shù)量的輸入stream,做一些處理,很可能拋出新的stream。類似將tweets stream轉(zhuǎn)化成trending topics stream這樣的復(fù)雜轉(zhuǎn)化,常常需要多個步驟,對應(yīng)著多個bolt。Bolts可以做的事情很多,比如run functions, filter tuples, do streaming aggregations, do streaming joins, talk to databases等等。

spouts和bolts的網(wǎng)絡(luò)被打包成了"topology",這是你提交到Storm集群執(zhí)行的最高級別的抽象。 topology是一個stream轉(zhuǎn)化的圖,圖的節(jié)點是spout或bolt,圖的邊指明了哪個bolt在訂閱哪個stream。當一個spout 或 bolt發(fā)出一個tuple到一個stream,那么訂閱了這個stream的所有bolt都會收到這個tuple。


Storm topology中的每個節(jié)點都是并行運行,在你的topology中,你可以指定各個節(jié)點的并行程度,Storm會按照你指定的數(shù)目在集群中啟動相應(yīng)數(shù)量的線程。

topology會永久執(zhí)行下去,除非你停止它。Storm會自動重新分配失敗的任務(wù),此外,Storm保證數(shù)據(jù)不會丟失,即便機器宕機并且messages are dropped。

Data model

Storm的數(shù)據(jù)模型是tuple。tuple是一個命名list,字段可以是任意類型的對象。Storm支持所有primitive types, strings, and byte arrays作為tuple的字段值。如果要使用其他類型的對象,只需要實現(xiàn)serializer。
topology的每個節(jié)點都需要定義輸出的tuple字段。如下的例子中,bolt定義了兩個帶有 "double"和"triple"字段的tuple。

public class DoubleAndTripleBolt extends BaseRichBolt {
    private OutputCollectorBase _collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollectorBase collector) {
        _collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        int val = input.getInteger(0);        
        _collector.emit(input, new Values(val*2, val*3));
        _collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("double", "triple"));
    }    
}

一個簡單的topology

看一下storm-starter中ExclamationTopology的定義:

TopologyBuilder builder = new TopologyBuilder();        
builder.setSpout("words", new TestWordSpout(), 10);        
builder.setBolt("exclaim1", new ExclamationBolt(), 3)
        .shuffleGrouping("words");
builder.setBolt("exclaim2", new ExclamationBolt(), 2)
        .shuffleGrouping("exclaim1");

這個Topology包含一個spout和兩個bolt,spout發(fā)出words,bolt在輸入的string后面加上"!!!"。節(jié)點被組織成一條直線:spout發(fā)出到第一個bolt,然后到第二個bolt。如果spout發(fā)出了tuples ["bob"]和["john"],那么第二個bolt會發(fā)出 ["bob!!!!!!"]和["john!!!!!!"]。

setSpout 和 setBolt方法定義了節(jié)點,第一個參數(shù)是用戶定義的ID,第二個參數(shù)是處理邏輯對象,第三個參數(shù)是并發(fā)線程數(shù)。

spout的處理邏輯對象實現(xiàn)了IRichSpout接口,bolt的處理邏輯對象實現(xiàn)了IRichBolt接口。最后一個參數(shù)是可選的,如果不指定,Storm只分配一個線程。

setBolt返回一個InputDeclarer對象,這個對象定義了bolt的輸入。這里組件exclaim1聲明了它要讀取所有組件words發(fā)出的tuples。組件exclaim2聲明了它要讀取所有組件exclaim1發(fā)出的tuples。"shuffle grouping"會被隨機的分配到bolt。組件之間有多種分組數(shù)據(jù)的方式,如果想要組件exclaim2同時讀取組件words和exclaim1的tuples,可以像這樣定義:

builder.setBolt("exclaim2", new ExclamationBolt(), 5)
            .shuffleGrouping("words")
            .shuffleGrouping("exclaim1");

接下來看看這個topology的spout和bolt的實現(xiàn)。spout負責(zé)發(fā)出數(shù)據(jù)到topology。TestWordSpout每隔100ms從["nathan", "mike", "jackson", "golda", "bertels"]中發(fā)出隨機word作為一個tuple,TestWordSpout的nextTuple()實現(xiàn)如下:

public void nextTuple() {
    Utils.sleep(100);
    final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
    final Random rand = new Random();
    final String word = words[rand.nextInt(words.length)];
    _collector.emit(new Values(word));
}

ExclamationBolt的實現(xiàn)如下:

public static class ExclamationBolt implements IRichBolt {
    OutputCollector _collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
        _collector.ack(tuple);
    }

    @Override
    public void cleanup() {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

prepare方法給bolt提供了一個OutputCollector,用來從這個bolt發(fā)出tuple。Tuples可以在任何時候發(fā)出——包括prepare, execute, 或cleanup方法,甚至在另一個線程中異步發(fā)出。這里prepare方法只是保存OutputCollector為實例變量,后面execute方法會用到。

execute方法接收一個tuple,ExclamationBolt抓取tuple的第一個字段并在后面加上"!!!"。如果bolt訂閱了多個輸入源,可以通過Tuple#getSourceComponent方法查詢tuple來源。
輸入tuple被作為了emit的第一個參數(shù),最后一行ack了輸入tuple。這些是Storm可靠性API的一部分,用以保證沒有數(shù)據(jù)丟失,本教程后續(xù)會進一步介紹。

cleanup方法在bolt停止的時候調(diào)用,在這里應(yīng)該關(guān)閉所有打開的資源。Storm集群不保證一定會調(diào)用這個方法:例如,if the machine the task is running on blows up, there's no way to invoke the method. cleanup方法適用于local模式,你可以運行和停止許多topologies,不必擔(dān)心資源泄露。

declareOutputFields方法聲明了ExclamationBolt發(fā)出帶一個名為"word"字段的1-tuples。

getComponentConfiguration方法允許你配置組件運行的參數(shù)。這是一個更高級的主題Configuration

在一個bolt的實現(xiàn)中,像cleanup 和 getComponentConfiguration這樣的方法常常不需要。你可以通過繼承提供了默認實現(xiàn)的基類來更簡潔的實現(xiàn)bolt。例如:

public static class ExclamationBolt extends BaseRichBolt {
    OutputCollector _collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
        _collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }    
}

在local模式下運行ExclamationTopology

在local模式中,Storm完全在一個進程內(nèi)運行,worker通過線程模擬,主要用于測試和開發(fā)場景。當你運行storm-starte中的topologies,他們將在local模式下運行,你能夠看到每個組件正在發(fā)出的消息。
更多l(xiāng)ocal模式信息請閱Local mode
更多分布式模式信息請閱running topologies in local mode on Local mode

如下是local模式下運行ExclamationTopology的代碼:

Config conf = new Config();
conf.setDebug(true);
conf.setNumWorkers(2);

LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
Utils.sleep(10000);
cluster.killTopology("test");
cluster.shutdown();

首先,通過創(chuàng)建LocalCluster對象定義了一個進程內(nèi)的集群。提交topologies到這個虛擬集群和提交到分布式集群是完全一樣的操作。submitTopology方法的第一個參數(shù)是topology的名字,第二個參數(shù)是topology的配置,第三個參數(shù)是topology對象。

這里topology的配置很常見:
TOPOLOGY_WORKERS (由setNumWorkers設(shè)置) 指定了你想分配多少進程來執(zhí)行這個topology。 topology中的每個組件將以線程的形式執(zhí)行,線程的數(shù)量由setBolt 和 setSpout配置。
TOPOLOGY_DEBUG (由setDebug設(shè)置) 當設(shè)置為true時, Storm記錄組件發(fā)出的每個消息。

更多配置信息請閱the Javadoc for Config.

Stream groupings

stream grouping用于描述組件之間如何發(fā)送tuple。在集群中,spouts 和 bolts總是并行執(zhí)行任務(wù),在執(zhí)行任務(wù)層面上,一個topology看起來如下圖所示:


當Bolt A的一個任務(wù)發(fā)出tuple到Bolt B時,應(yīng)該發(fā)到Bolt B的哪個任務(wù)呢?
stream groupings就是用來解決這個問題。在深入了解不同種類的stream groupings之前,我們先看看storm-starter中的另一個topology。WordCountTopology從spout中讀取句子,WordCountBolt統(tǒng)計單詞出現(xiàn)的次數(shù)。

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("sentences", new RandomSentenceSpout(), 5);        
builder.setBolt("split", new SplitSentence(), 8)
        .shuffleGrouping("sentences");
builder.setBolt("count", new WordCount(), 12)
        .fieldsGrouping("split", new Fields("word"));

SplitSentence發(fā)出句子中的單詞作為tuple,WordCount以map的形式存儲單詞出現(xiàn)的次數(shù),每次WordCount收到單詞,就更新map并發(fā)出新的單詞數(shù)目。

最簡單的grouping方式是"shuffle grouping",也就是隨機發(fā)送tuple給任務(wù)。"fields grouping"是一個更有趣的grouping方式,這里用在了SplitSentence bolt 和 WordCount bolt之間。對WordCount bolt來說,相同的單詞應(yīng)該發(fā)送到相同的任務(wù),否則多個任務(wù)都會收到同樣的單詞,由于每個任務(wù)的信息都不完全,他們可能會發(fā)出錯誤的單詞數(shù)目。fields grouping用字段的子集來分組,字段值相同的tuple被發(fā)送到相同的任務(wù)。

Fields groupings是實現(xiàn)streaming joins 和 streaming aggregations,它的底層實現(xiàn)利用了mod hashing。還有一些其他的分組方式,請閱Concepts。

使用其他編程語言定義Bolts

Bolts可以使用任意語言定義,用JVM-based語言以外定義的Bolts以子進程的方式運行,Storm以stdin/stdout之上的JSON格式消息與子進程通信。通信協(xié)議用到了一個100行左右的適配器庫,支持Ruby, Python, Fancy。

WordCountTopology的SplitSentence定義如下:

public static class SplitSentence extends ShellBolt implements IRichBolt {
    public SplitSentence() {
        super("python", "splitsentence.py");
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

SplitSentence重寫了ShellBolt,聲明它使用python,參數(shù)是splitsentence.py。splitsentence.py實現(xiàn)如下:

import storm

class SplitSentenceBolt(storm.BasicBolt):
    def process(self, tup):
        words = tup.values[0].split(" ")
        for word in words:
          storm.emit([word])

SplitSentenceBolt().run()

更多多語言信息,請閱Using non-JVM languages with Storm

確保消息被處理

這部分屬于Storm's reliability API的內(nèi)容:Storm如何保證spout發(fā)出的消息都能被處理,請閱Guaranteeing Message Processing

Transactional topologies

Storm保證每個消息都至少被處理一次。一個常見的問題是:Storm會不會overcount?Storm提供了一種機制,確保消息只被傳遞一次。transactional topologies。

Distributed RPC

除了本教程已經(jīng)展示的功能,Storm還可以做許多事。其中最有趣的應(yīng)用之一是Distributed RPC

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Date: Nov 17-24, 2017 1. 目的 積累Storm為主的流式大數(shù)據(jù)處理平臺對實時數(shù)據(jù)處理的相關(guān)...
    一只很努力爬樹的貓閱讀 2,333評論 0 4
  • 參考文章: Apache Storm 官方文檔中文版 storm Tutorial 的解讀 + 個人理解 官方文檔...
    louisliaoxh閱讀 1,238評論 0 1
  • Storm入門系列之一:storm核心概念及特性 本文的將介紹一些 storm 入門的基礎(chǔ)知識,包括 storm ...
    zhaif閱讀 3,404評論 0 17
  • 什么是實時流計算? 主要的處理模式可以分為:流處理,批處理 流處理是直接處理,有時也分為在線,離線,近線(st...
    Bloo_m閱讀 5,217評論 1 1
  • 是什么時候開始,我喜歡上了寫點文字,表達自己的心情。這個日子大概要追述到青春期的時候,那時候的我們正走...
    周周writing閱讀 573評論 1 1

友情鏈接更多精彩內(nèi)容