Storm Trident 之 Trident API介紹

Trident是什么

Trident是Storm上的高層次抽象,它能夠在提供高吞吐量的能力同時(每秒幾百萬消息),也提供了有狀態(tài)的流式處理和低延遲分布式查詢的能力。它類似Pig這種高級批處理工具,Trident提供了joins、aggregations、grouping、functions以及filters等功能函數(shù)。

Trident主要提供了以下功能:

  • 常見的流分析操作,比如join、aggregation等。具體就是Trident提供的API操作。
  • 一次性處理語義(exactly-once)。
  • 事務(wù)數(shù)據(jù)存儲(transaction)。

Trident核心數(shù)據(jù)模型是一系列批處理(Batch)的流,也就是說雖然Storm Trident處理的是Stream,但是處理過程中Trident將Stream分隔成Batch來進(jìn)行處理。

Batch

所以Stream會被切分成一個個Batch分布到集群中,所有應(yīng)用在Stream上的函數(shù)都會具體應(yīng)用到每個節(jié)點(diǎn)的Batch上中,來實(shí)現(xiàn)并行計算。

為什么使用Trident

Storm Topology適合一些無統(tǒng)計、不需要Transaction(事務(wù))的應(yīng)用,比如過濾、清洗數(shù)據(jù)等場景。Topology在開啟Ack的情況下,能夠保證數(shù)據(jù)不丟失但可能重復(fù)。
而Trident適合需要嚴(yán)格不丟不重復(fù)消息的場景,比如交易額統(tǒng)計。Trident通過事務(wù)來實(shí)現(xiàn)eactly-once,保證數(shù)據(jù)不丟不重復(fù)。但同時,使用Trident會使其性能有所下降。

Triden API

Trident API可以分為Spout操作和Bolt操作,對于Bolt操作提供常見的流數(shù)據(jù)分析操作。
Bolt Trident提供了五種類型的操作:

  • 本地分區(qū)操作(Partition-local operations),操作應(yīng)用到本地每個分區(qū)上,這部分操作不會產(chǎn)生網(wǎng)絡(luò)傳輸。
  • 重分區(qū)操作,對數(shù)據(jù)流進(jìn)行重新分區(qū),但是不會改變數(shù)據(jù)內(nèi)容,這部分操作會有網(wǎng)絡(luò)傳輸。
  • 聚合操作,這部分操作會有網(wǎng)絡(luò)傳輸。
  • 流分組操作。
  • 合并(meger)和連接(join)操作。

Trident Spout

Trident與Storm topology一樣也是使用Spout作為Trident拓?fù)涞臄?shù)據(jù)源。Trident Spout提供了更復(fù)雜的API,因?yàn)樗瓤梢垣@取事務(wù)數(shù)據(jù)源,也可以獲取非事務(wù)數(shù)據(jù)源。

對于非事務(wù)的Spout,可以使用普通的Storm IRichSpout接口:

TridentTopology topology = new TridentTopology();
topology.newStream("myspoutid",new RichSpout());

Trident拓?fù)渲?,所有Spout都需要指定一個唯一的流的標(biāo)識,比如這里的“myspoutid”(整個集群級別的唯一標(biāo)識)。Trident使用該唯一標(biāo)識存儲Spout元數(shù)據(jù),比如txId(事務(wù)ID)以及其它Spout相關(guān)的信息。
可以通過如下配置,來配置Zookeeper保存Spout的元數(shù)據(jù)。

transactional.zookeeper.servers:Zookeeper主機(jī)列表
transactional.zookeeper.port:Zookeeper集群端口
transactional.zookeeper.root:在Zookeeper存儲元數(shù)據(jù)的根目錄

下面是Trident Spout一些類型:

  • ITridentSpout:最通用API接口,可以支持事務(wù)和不透明事務(wù)語義。一般會用這個API分區(qū)的特性,而不是直接使用該接口。
  • IBatchSpout:非事務(wù)Spout,每次發(fā)射一個Batch的元組。
  • IPartitionedTridentSpout:事務(wù)Spout,從分區(qū)數(shù)據(jù)源讀取數(shù)據(jù),比如Kafka集群。
  • IOpaquePartitionedTridentSpout:不透明事務(wù)Spout,從分區(qū)數(shù)據(jù)源中讀取數(shù)據(jù)。

本地分區(qū)操作

本地分區(qū)操作不會產(chǎn)生網(wǎng)絡(luò)傳輸,并且會獨(dú)立的應(yīng)用到batch的每個分區(qū)上。

函數(shù)操作(Functions)

函數(shù)用于接受一個tuple,并且指定接收這個tuple的哪些field,它會發(fā)射(emit)0個或多個tuple。輸出的tuple feild會被追加到原始tuple的后面,如果不輸出tuple就意味著這個tuple被過濾掉了。比如下面的實(shí)例:

class MyFunction extends BaseFunction {
        /**
         * 在每個元組上面執(zhí)行該邏輯函數(shù),并且發(fā)射0個或多個元組
         *
         * @param tuple     傳入的元組
         * @param collector 用于發(fā)射元組的收集器實(shí)例
         */
        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            //tuple.getInteger(0)接收第一個Field
            for(int i=0; i< tuple.getInteger(0); i++) {
                collector.emit(new Values(i));
            }
        }
    }

假設(shè)我們有一個“mystream”流,其中每個tuple包含以下filed ['a','b','c'],比如有以下元組。

[1,2,3]
[2,1,2]
[10,0,1]

我們將每個元組都經(jīng)過以下MyFunction操作:

//將每個tuple的字段"b"應(yīng)用于MyFunction,并且產(chǎn)生新字段"d"追加到原tuple的字段中
mystream.each(new Fields("b"),new MyFunction(),new Fields('d'))

得到數(shù)據(jù)為:

//[1,2,3]的emit,其中0和1是新字段“d”
[1,2,3,0]
[1,2,3,1]
//[2,1,2]的emit,其中0是新字段“d”
[2,1,2,0]
//[10,0,1]不滿足需求,過濾掉了

過濾操作(Filters)

接收一個tuple,并決定這個tuple是否應(yīng)該被保留。比如我們有以下Filter操作:

class MyFilter extends BaseFilter {
        /**
         * 確定是否應(yīng)該從流中過濾元組
         *
         * @param tuple 被評估的元組
         * @return 返回"false"則該元組被拋棄,返回"true"則該元組被保留
         */
        @Override
        public boolean isKeep(TridentTuple tuple) {
            //每個tuple中的第一個Field
            return tuple.getInteger(0) > 1;
        }
    }

同樣以Function中的實(shí)例進(jìn)行操作:

mystream.filter(new MyFilter())。

輸出數(shù)據(jù):

#[1,2,3]中的第一個字段不大于1,所以被過濾掉
[2,1,2]
[10,0,1]

map和flatMap操作

map接收一個tuple,將其作用在map函數(shù)上,并且返回經(jīng)map函數(shù)處理過的tuple字段值。
比如下面的實(shí)例:

class UpperMap implements MapFunction {
        /**
         * 流中的每個trident元組調(diào)用
         *
         * @param input 接受trident tuple
         * @return 返回轉(zhuǎn)換之后的值
         */
        @Override
        public Values execute(TridentTuple input) {
            //只返回原tuple中第一個Filed的大寫字符串(其它filed被丟棄了)
            return new Values(input.getString(0).toUpperCase());
        }
    }

flatMap類似map,但是它會分兩步執(zhí)行:執(zhí)行flat將所有元素展開,然后每個元素使用map函數(shù)。比如[[1,2],3,4]經(jīng)過flat操作后得到元素集合為[1,2,3,4]。比如我們有以下實(shí)例:

class SplitFlatMap implements FlatMapFunction {
        /**
         * 流中的每個trident元組調(diào)用
         *
         * @param input 接收的trident tuple
         * @return 一個可迭代的結(jié)果集
         */
        @Override
        public Iterable<Values> execute(TridentTuple input) {
            List<Values> resultValues = new ArrayList<>();
            //獲取一個Filed并將其以空格作為切割
            for(String word : input.getString(0).split(" ")){
                resultValues.add(new Values(word));
            }
            return resultValues;
        }
    }

我們通過上面的flatMap和Map就可以得到一個流的所有大寫詞組流了。

mystream.flatMap(new SplitFlatMap()).map(new UpperMap())。

通常我們也可以將map或flatMap的輸出結(jié)果命名一個新字段:

mystream.flatMap(new SplitFlatMap(),new Fields("word"))

peek操作

peek操作一般用來debug,比如查看上一步的操作結(jié)果。假如我們有以下peek操作。

class PrintPeek implements Consumer {
        /**
         * 對于輸入的每個trident元組應(yīng)用以下操作
         *
         * @param input 接收的trident 元組
         */
        @Override
        public void accept(TridentTuple input) {
            System.out.println(input.getString(0));
        }
    }

以下處理操作,能把轉(zhuǎn)換大寫之后的tuple打印打出來:

mystream.flatMap(new SplitFlatMap()).map(new UpperMap()).peek(new PrintPeek());

min和minBy操作

返回一批(Batch)元組中的每個分區(qū)的最小值。
比如一批(Batch)元組有以下三個partition,它們對應(yīng)的Field為['device-id','count']。

Partiton 0:
[213,15]
[125,21]
[100,10]

Partition 1:
[123,20]
[215,32]
[183,25]

針對以上數(shù)據(jù)統(tǒng)計count最小的device-id:

mystream.minBy(new Fields("count"));

返回結(jié)果:

Partition 0:
[100,10]

Partition 1:
[123,20]

除了以上使用方式,我們還可以通過傳入比較器來使用min和minBy:

public <T> Stream minBy(String inputFieldName, Comparator<T> comparator) 
public Stream min(Comparator<TridentTuple> comparator)
max和maxBy操作

max和maxBy操作同min/minBy操作,只不過返回最大值。

mystream.maxBy(new Fields("count"));

上面實(shí)例輸出結(jié)果為:

Partition 0:
[125,21]
Partition 1:
[215,32]

max和maxBy也提供了自定義比較器的方法:

public <T> Stream maxBy(String inputFieldName, Comparator<T> comparator) 
public Stream max(Comparator<TridentTuple> comparator) 

窗口操作(Window)

Trident流能夠處理具有相同窗口的元素,對它們進(jìn)行聚合操作,然后將聚合結(jié)果向下發(fā)送。Storm支持兩種窗口操作:翻滾窗口(Tumbing window)和滑動窗口(Sliding window)。

Tumbing window

元組根據(jù)處理時間或計數(shù)分組到一個窗口中,任何元組只屬于其中一個窗口。

//返回一個元組流的聚合結(jié)果,它是滾動窗口內(nèi)每windowCount個數(shù)的聚合結(jié)果
public Stream tumblingWindow(int windowCount, WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields);
//返回一個元組流的聚合結(jié)果,這些元組是一個窗口的聚合結(jié)果,該窗口在windowDuration的持續(xù)時間內(nèi)滾動
public Stream tumblingWindow(BaseWindowedBolt.Duration windowDuration, WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields);
Sliding window

元組在每個滑動間隔的窗口內(nèi)分組,一個元組可能屬于多個窗口。

//返回一個元組流的聚合結(jié)果,它是滑動窗口每windowCount個元組樹的聚合結(jié)果,并在slideCount之后滑動窗口
public Stream slidingWindow(int windowCount, int slideCount, WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields);
//返回一個元組流的聚合結(jié)果,該窗口在slidingInterval持續(xù)滑動,并在windowDuration處完成一個窗口
public Stream slidingWindow(BaseWindowedBolt.Duration windowDuration, BaseWindowedBolt.Duration slidingInterval, WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields);
Common window

除了上面提供的滾動窗口api和滑動窗口api,Trident還提供了公用窗口api,通過windowConfig可以支持任意窗口。

public Stream window(WindowConfig windowConfig, WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields)

Trident window api需要使用WindowsStoreFactor存儲接收到的元組和聚合值。目前,Trident提供了HBaseWindowsStoreFactor的HBase實(shí)現(xiàn)。

partitionAggregate操作

partitionAggregate對一批(batch)元組的每個分區(qū)進(jìn)行聚合,與前面Function在元組后面追加不同F(xiàn)ield不同,partitionAggregate會使用發(fā)射出去的元組替換接收進(jìn)來的元組。比如以下實(shí)例:
比如有以下數(shù)據(jù),對應(yīng)的Field分別為["a","b"]:

Partition 0:
["a":1]
["b":2]

Partition 1:
["c":2]
["d":2]

使用partitionAggregate進(jìn)行求和:

mystream.partitionAggregate(new Fields("b"),new Sum(),new Fields("sum"))

經(jīng)過partitionAggregate函數(shù)之后的結(jié)果為:

Partition 0:
["sum":3]

Partition 1:
["sum":4]

Trident API提供了三個聚合器接口:CombinerAggregator、ReducerAggregator和Aggregator。

CombinerAggregator操作

CombinerAggregator只返回單個tuple,并且這個tuple只包含一個Field。每個元組首先都經(jīng)過init函數(shù)進(jìn)行預(yù)處理,然后在執(zhí)行combine函數(shù)來計算接受到的tuple,直到最后一個tuple到達(dá)。如果分區(qū)內(nèi)沒有tuple,則會通過zero函數(shù)發(fā)射結(jié)果。

public interface CombinerAggregator<T> extends Serializable {
    T init(TridentTuple tuple);
    T combine(T val1, T val2);
    T zero();
}

比如以下實(shí)例:

class Count implements CombinerAggregator<Long> {
        @Override
        public Long init(TridentTuple tuple) {
            //計數(shù),每個tuple代表一個數(shù)
            return 1L;
        }
        @Override
        public Long combine(Long val1, Long val2) {
            return val1 + val2;
        }
        @Override
        public Long zero() {
            return 0L;
        }
    }
ReducerAggregator操作

ReducerAggregator通過init方法提供一個初始值,然后每個輸入的tuple迭代這個值,最后產(chǎn)生一個唯一的tuple輸出。

public interface ReducerAggregator<T> extends Serializable {
    T init();
    T reduce(T curr, TridentTuple tuple);
}

比如同樣使用RecuerAggregator來實(shí)現(xiàn)計數(shù)器:

 class  Count implements ReducerAggregator<Long> {
        @Override
        public Long init() {
            return 0L;
        }

        @Override
        public Long reduce(Long curr, TridentTuple tuple) {
            return curr + 1;
        }
    }
Aggregator

執(zhí)行聚合操作最通用的接口就是Aggregator了,它能夠發(fā)射任意數(shù)量的元組,每個元組可以包含任意數(shù)量的字段。

public interface Aggregator<T> extends Operation {
    T init(Object batchId, TridentCollector collector);
    void aggregate(T state, TridentTuple tuple, TridentCollector collector);
    void complete(T state, TridentCollector collector);
}

它的執(zhí)行流程是:

  1. 在處理Batch之前調(diào)用init方法,它返回一個聚合的狀態(tài)值,傳遞給aggregate和complete方法。
  2. 為批處理分區(qū)中的每個tuple調(diào)用aggregate方法,此方法可以更新狀態(tài)值,也可以發(fā)射元組。
  3. 當(dāng)aggregator處理完Batch分區(qū)的所有元組后調(diào)用complete方法。

使用Aggregator來實(shí)現(xiàn)計數(shù)器:

class CountAgg extends BaseAggregator<CountAgg.CountState> {
        class CountState{
            long count = 0;
        }

        @Override
        public CountState init(Object batchId, TridentCollector collector) {
            return new CountState();
        }

        @Override
        public void aggregate(CountState state, TridentTuple tuple, TridentCollector collector) {
            state.count += 1;
        }

        @Override
        public void complete(CountState state, TridentCollector collector) {
            collector.emit(new Values(state.count));
        }
    }

狀態(tài)查詢(stateQuery)和分區(qū)持久化(partitionPersist)

stateQuery用于查詢狀態(tài)源,partitionPersist用于更新狀態(tài)源。具體使用方式可查看:http://storm.apache.org/releases/1.2.2/Trident-state.html

投影(projection)操作

projection操作用于只保留指定的字段,比如元組有字段["a","b","c","d"],通過以下投影操作,輸出流只會包含["c","d"]。

mystream.projection(new Fields("c","d"));

重分區(qū)操作

Repartition操作運(yùn)行一個函數(shù)來改變元組在任務(wù)之間的分布,調(diào)整分區(qū)數(shù)也可能會導(dǎo)致Repartition操作。重分區(qū)操作會引發(fā)網(wǎng)絡(luò)傳輸。下面是重分區(qū)的相關(guān)函數(shù):

  • shuffle:使用隨機(jī)算法來均衡tuple到每個分區(qū)。
  • broadcast:每個tuple被廣播到所有分區(qū)上,使用DRPC時使用這種方法比較多,比如每個分區(qū)上做stateQuery。
  • global:所有tuple都發(fā)送到一個分區(qū)上,這個分區(qū)用來處理stream。
  • batchGlobal:一個batch中的所有tuple會發(fā)送到一個分區(qū)中,不同batch的元組會被發(fā)送到不同分區(qū)上。
  • partition:通過一個自定義的分區(qū)函數(shù)來進(jìn)行分區(qū),這個自定義函數(shù)需要實(shí)現(xiàn)org.apache.storm.grouping.CustomStreamGrouping

聚合操作

Trident提供了aggregate和persistentAggregate方法,aggregate運(yùn)行在每個batch中,而persistentAggregate將聚合所有Batch,并將結(jié)果保存在一個狀態(tài)源上。
我們前面講的aggregate、CombinerAggregator和ReducerAggregator運(yùn)行在patitionAggregation上是本地分區(qū)操作。如果直接作用于流上,則是對全局進(jìn)行聚合。
在對全局流進(jìn)行聚合時,Aggregator和ReducerAggregator會首先重分區(qū)到一個單分區(qū),然后在該分區(qū)上執(zhí)行聚合函數(shù)。而CombinerAggregator則會首先聚合每個分區(qū),然后重分區(qū)到單個分區(qū),在網(wǎng)絡(luò)傳輸中完成聚合操作。所以我們應(yīng)該盡量用CombinerAggregator,因?yàn)樗痈咝А?/p>

mystream.aggregate(new Count(),new Fields("count"));

流分組操作

groupBy操作會重新分區(qū)流,對指定字段執(zhí)行partitionBy操作,指定字段相同的元組被劃分到相同的分區(qū)。goupBy操作如下圖:

Group

如果在流分組中運(yùn)行聚合器,聚合會在每個group中運(yùn)行,而不是對整個Batch操作。

合并和連接

Trident可以允許我們將不同流組合在一起,通過TridentTopology.merge()方法操作。

//合并流會以第一個流的輸出字段來命名
topology.mege(stream1,stream2,stream3);

另一種合并流的方式是連接,類似于SQL那樣的連接,要求輸入是有限的。所以Trident的join只適用于來自Spout的每個小Bath之間。
比如有一個流包含["key1","val1","val2"],另一個流包含["key2","val1","val2"],通過以下連接操作:

//Trident需要join之后的流重新命名,因?yàn)檩斎肓骺赡艽嬖谥貜?fù) 字段。
mystream.join(stream1,new Fields("key1"),stream2,new Fields("key2"),new Fields("key","a","b","c","d"))
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容