Trident是什么
Trident是Storm上的高層次抽象,它能夠在提供高吞吐量的能力同時(每秒幾百萬消息),也提供了有狀態(tài)的流式處理和低延遲分布式查詢的能力。它類似Pig這種高級批處理工具,Trident提供了joins、aggregations、grouping、functions以及filters等功能函數(shù)。
Trident主要提供了以下功能:
- 常見的流分析操作,比如join、aggregation等。具體就是Trident提供的API操作。
- 一次性處理語義(exactly-once)。
- 事務(wù)數(shù)據(jù)存儲(transaction)。
Trident核心數(shù)據(jù)模型是一系列批處理(Batch)的流,也就是說雖然Storm Trident處理的是Stream,但是處理過程中Trident將Stream分隔成Batch來進(jìn)行處理。

所以Stream會被切分成一個個Batch分布到集群中,所有應(yīng)用在Stream上的函數(shù)都會具體應(yīng)用到每個節(jié)點(diǎn)的Batch上中,來實(shí)現(xiàn)并行計算。
為什么使用Trident
Storm Topology適合一些無統(tǒng)計、不需要Transaction(事務(wù))的應(yīng)用,比如過濾、清洗數(shù)據(jù)等場景。Topology在開啟Ack的情況下,能夠保證數(shù)據(jù)不丟失但可能重復(fù)。
而Trident適合需要嚴(yán)格不丟不重復(fù)消息的場景,比如交易額統(tǒng)計。Trident通過事務(wù)來實(shí)現(xiàn)eactly-once,保證數(shù)據(jù)不丟不重復(fù)。但同時,使用Trident會使其性能有所下降。
Triden API
Trident API可以分為Spout操作和Bolt操作,對于Bolt操作提供常見的流數(shù)據(jù)分析操作。
Bolt Trident提供了五種類型的操作:
- 本地分區(qū)操作(Partition-local operations),操作應(yīng)用到本地每個分區(qū)上,這部分操作不會產(chǎn)生網(wǎng)絡(luò)傳輸。
- 重分區(qū)操作,對數(shù)據(jù)流進(jìn)行重新分區(qū),但是不會改變數(shù)據(jù)內(nèi)容,這部分操作會有網(wǎng)絡(luò)傳輸。
- 聚合操作,這部分操作會有網(wǎng)絡(luò)傳輸。
- 流分組操作。
- 合并(meger)和連接(join)操作。
Trident Spout
Trident與Storm topology一樣也是使用Spout作為Trident拓?fù)涞臄?shù)據(jù)源。Trident Spout提供了更復(fù)雜的API,因?yàn)樗瓤梢垣@取事務(wù)數(shù)據(jù)源,也可以獲取非事務(wù)數(shù)據(jù)源。
對于非事務(wù)的Spout,可以使用普通的Storm IRichSpout接口:
TridentTopology topology = new TridentTopology();
topology.newStream("myspoutid",new RichSpout());
Trident拓?fù)渲?,所有Spout都需要指定一個唯一的流的標(biāo)識,比如這里的“myspoutid”(整個集群級別的唯一標(biāo)識)。Trident使用該唯一標(biāo)識存儲Spout元數(shù)據(jù),比如txId(事務(wù)ID)以及其它Spout相關(guān)的信息。
可以通過如下配置,來配置Zookeeper保存Spout的元數(shù)據(jù)。
transactional.zookeeper.servers:Zookeeper主機(jī)列表
transactional.zookeeper.port:Zookeeper集群端口
transactional.zookeeper.root:在Zookeeper存儲元數(shù)據(jù)的根目錄
下面是Trident Spout一些類型:
- ITridentSpout:最通用API接口,可以支持事務(wù)和不透明事務(wù)語義。一般會用這個API分區(qū)的特性,而不是直接使用該接口。
- IBatchSpout:非事務(wù)Spout,每次發(fā)射一個Batch的元組。
- IPartitionedTridentSpout:事務(wù)Spout,從分區(qū)數(shù)據(jù)源讀取數(shù)據(jù),比如Kafka集群。
- IOpaquePartitionedTridentSpout:不透明事務(wù)Spout,從分區(qū)數(shù)據(jù)源中讀取數(shù)據(jù)。
本地分區(qū)操作
本地分區(qū)操作不會產(chǎn)生網(wǎng)絡(luò)傳輸,并且會獨(dú)立的應(yīng)用到batch的每個分區(qū)上。
函數(shù)操作(Functions)
函數(shù)用于接受一個tuple,并且指定接收這個tuple的哪些field,它會發(fā)射(emit)0個或多個tuple。輸出的tuple feild會被追加到原始tuple的后面,如果不輸出tuple就意味著這個tuple被過濾掉了。比如下面的實(shí)例:
class MyFunction extends BaseFunction {
/**
* 在每個元組上面執(zhí)行該邏輯函數(shù),并且發(fā)射0個或多個元組
*
* @param tuple 傳入的元組
* @param collector 用于發(fā)射元組的收集器實(shí)例
*/
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
//tuple.getInteger(0)接收第一個Field
for(int i=0; i< tuple.getInteger(0); i++) {
collector.emit(new Values(i));
}
}
}
假設(shè)我們有一個“mystream”流,其中每個tuple包含以下filed ['a','b','c'],比如有以下元組。
[1,2,3]
[2,1,2]
[10,0,1]
我們將每個元組都經(jīng)過以下MyFunction操作:
//將每個tuple的字段"b"應(yīng)用于MyFunction,并且產(chǎn)生新字段"d"追加到原tuple的字段中
mystream.each(new Fields("b"),new MyFunction(),new Fields('d'))
得到數(shù)據(jù)為:
//[1,2,3]的emit,其中0和1是新字段“d”
[1,2,3,0]
[1,2,3,1]
//[2,1,2]的emit,其中0是新字段“d”
[2,1,2,0]
//[10,0,1]不滿足需求,過濾掉了
過濾操作(Filters)
接收一個tuple,并決定這個tuple是否應(yīng)該被保留。比如我們有以下Filter操作:
class MyFilter extends BaseFilter {
/**
* 確定是否應(yīng)該從流中過濾元組
*
* @param tuple 被評估的元組
* @return 返回"false"則該元組被拋棄,返回"true"則該元組被保留
*/
@Override
public boolean isKeep(TridentTuple tuple) {
//每個tuple中的第一個Field
return tuple.getInteger(0) > 1;
}
}
同樣以Function中的實(shí)例進(jìn)行操作:
mystream.filter(new MyFilter())。
輸出數(shù)據(jù):
#[1,2,3]中的第一個字段不大于1,所以被過濾掉
[2,1,2]
[10,0,1]
map和flatMap操作
map接收一個tuple,將其作用在map函數(shù)上,并且返回經(jīng)map函數(shù)處理過的tuple字段值。
比如下面的實(shí)例:
class UpperMap implements MapFunction {
/**
* 流中的每個trident元組調(diào)用
*
* @param input 接受trident tuple
* @return 返回轉(zhuǎn)換之后的值
*/
@Override
public Values execute(TridentTuple input) {
//只返回原tuple中第一個Filed的大寫字符串(其它filed被丟棄了)
return new Values(input.getString(0).toUpperCase());
}
}
flatMap類似map,但是它會分兩步執(zhí)行:執(zhí)行flat將所有元素展開,然后每個元素使用map函數(shù)。比如[[1,2],3,4]經(jīng)過flat操作后得到元素集合為[1,2,3,4]。比如我們有以下實(shí)例:
class SplitFlatMap implements FlatMapFunction {
/**
* 流中的每個trident元組調(diào)用
*
* @param input 接收的trident tuple
* @return 一個可迭代的結(jié)果集
*/
@Override
public Iterable<Values> execute(TridentTuple input) {
List<Values> resultValues = new ArrayList<>();
//獲取一個Filed并將其以空格作為切割
for(String word : input.getString(0).split(" ")){
resultValues.add(new Values(word));
}
return resultValues;
}
}
我們通過上面的flatMap和Map就可以得到一個流的所有大寫詞組流了。
mystream.flatMap(new SplitFlatMap()).map(new UpperMap())。
通常我們也可以將map或flatMap的輸出結(jié)果命名一個新字段:
mystream.flatMap(new SplitFlatMap(),new Fields("word"))
peek操作
peek操作一般用來debug,比如查看上一步的操作結(jié)果。假如我們有以下peek操作。
class PrintPeek implements Consumer {
/**
* 對于輸入的每個trident元組應(yīng)用以下操作
*
* @param input 接收的trident 元組
*/
@Override
public void accept(TridentTuple input) {
System.out.println(input.getString(0));
}
}
以下處理操作,能把轉(zhuǎn)換大寫之后的tuple打印打出來:
mystream.flatMap(new SplitFlatMap()).map(new UpperMap()).peek(new PrintPeek());
min和minBy操作
返回一批(Batch)元組中的每個分區(qū)的最小值。
比如一批(Batch)元組有以下三個partition,它們對應(yīng)的Field為['device-id','count']。
Partiton 0:
[213,15]
[125,21]
[100,10]
Partition 1:
[123,20]
[215,32]
[183,25]
針對以上數(shù)據(jù)統(tǒng)計count最小的device-id:
mystream.minBy(new Fields("count"));
返回結(jié)果:
Partition 0:
[100,10]
Partition 1:
[123,20]
除了以上使用方式,我們還可以通過傳入比較器來使用min和minBy:
public <T> Stream minBy(String inputFieldName, Comparator<T> comparator)
public Stream min(Comparator<TridentTuple> comparator)
max和maxBy操作
max和maxBy操作同min/minBy操作,只不過返回最大值。
mystream.maxBy(new Fields("count"));
上面實(shí)例輸出結(jié)果為:
Partition 0:
[125,21]
Partition 1:
[215,32]
max和maxBy也提供了自定義比較器的方法:
public <T> Stream maxBy(String inputFieldName, Comparator<T> comparator)
public Stream max(Comparator<TridentTuple> comparator)
窗口操作(Window)
Trident流能夠處理具有相同窗口的元素,對它們進(jìn)行聚合操作,然后將聚合結(jié)果向下發(fā)送。Storm支持兩種窗口操作:翻滾窗口(Tumbing window)和滑動窗口(Sliding window)。
Tumbing window
元組根據(jù)處理時間或計數(shù)分組到一個窗口中,任何元組只屬于其中一個窗口。
//返回一個元組流的聚合結(jié)果,它是滾動窗口內(nèi)每windowCount個數(shù)的聚合結(jié)果
public Stream tumblingWindow(int windowCount, WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields);
//返回一個元組流的聚合結(jié)果,這些元組是一個窗口的聚合結(jié)果,該窗口在windowDuration的持續(xù)時間內(nèi)滾動
public Stream tumblingWindow(BaseWindowedBolt.Duration windowDuration, WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields);
Sliding window
元組在每個滑動間隔的窗口內(nèi)分組,一個元組可能屬于多個窗口。
//返回一個元組流的聚合結(jié)果,它是滑動窗口每windowCount個元組樹的聚合結(jié)果,并在slideCount之后滑動窗口
public Stream slidingWindow(int windowCount, int slideCount, WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields);
//返回一個元組流的聚合結(jié)果,該窗口在slidingInterval持續(xù)滑動,并在windowDuration處完成一個窗口
public Stream slidingWindow(BaseWindowedBolt.Duration windowDuration, BaseWindowedBolt.Duration slidingInterval, WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields);
Common window
除了上面提供的滾動窗口api和滑動窗口api,Trident還提供了公用窗口api,通過windowConfig可以支持任意窗口。
public Stream window(WindowConfig windowConfig, WindowsStoreFactory windowStoreFactory, Fields inputFields, Aggregator aggregator, Fields functionFields)
Trident window api需要使用WindowsStoreFactor存儲接收到的元組和聚合值。目前,Trident提供了HBaseWindowsStoreFactor的HBase實(shí)現(xiàn)。
partitionAggregate操作
partitionAggregate對一批(batch)元組的每個分區(qū)進(jìn)行聚合,與前面Function在元組后面追加不同F(xiàn)ield不同,partitionAggregate會使用發(fā)射出去的元組替換接收進(jìn)來的元組。比如以下實(shí)例:
比如有以下數(shù)據(jù),對應(yīng)的Field分別為["a","b"]:
Partition 0:
["a":1]
["b":2]
Partition 1:
["c":2]
["d":2]
使用partitionAggregate進(jìn)行求和:
mystream.partitionAggregate(new Fields("b"),new Sum(),new Fields("sum"))
經(jīng)過partitionAggregate函數(shù)之后的結(jié)果為:
Partition 0:
["sum":3]
Partition 1:
["sum":4]
Trident API提供了三個聚合器接口:CombinerAggregator、ReducerAggregator和Aggregator。
CombinerAggregator操作
CombinerAggregator只返回單個tuple,并且這個tuple只包含一個Field。每個元組首先都經(jīng)過init函數(shù)進(jìn)行預(yù)處理,然后在執(zhí)行combine函數(shù)來計算接受到的tuple,直到最后一個tuple到達(dá)。如果分區(qū)內(nèi)沒有tuple,則會通過zero函數(shù)發(fā)射結(jié)果。
public interface CombinerAggregator<T> extends Serializable {
T init(TridentTuple tuple);
T combine(T val1, T val2);
T zero();
}
比如以下實(shí)例:
class Count implements CombinerAggregator<Long> {
@Override
public Long init(TridentTuple tuple) {
//計數(shù),每個tuple代表一個數(shù)
return 1L;
}
@Override
public Long combine(Long val1, Long val2) {
return val1 + val2;
}
@Override
public Long zero() {
return 0L;
}
}
ReducerAggregator操作
ReducerAggregator通過init方法提供一個初始值,然后每個輸入的tuple迭代這個值,最后產(chǎn)生一個唯一的tuple輸出。
public interface ReducerAggregator<T> extends Serializable {
T init();
T reduce(T curr, TridentTuple tuple);
}
比如同樣使用RecuerAggregator來實(shí)現(xiàn)計數(shù)器:
class Count implements ReducerAggregator<Long> {
@Override
public Long init() {
return 0L;
}
@Override
public Long reduce(Long curr, TridentTuple tuple) {
return curr + 1;
}
}
Aggregator
執(zhí)行聚合操作最通用的接口就是Aggregator了,它能夠發(fā)射任意數(shù)量的元組,每個元組可以包含任意數(shù)量的字段。
public interface Aggregator<T> extends Operation {
T init(Object batchId, TridentCollector collector);
void aggregate(T state, TridentTuple tuple, TridentCollector collector);
void complete(T state, TridentCollector collector);
}
它的執(zhí)行流程是:
- 在處理Batch之前調(diào)用init方法,它返回一個聚合的狀態(tài)值,傳遞給aggregate和complete方法。
- 為批處理分區(qū)中的每個tuple調(diào)用aggregate方法,此方法可以更新狀態(tài)值,也可以發(fā)射元組。
- 當(dāng)aggregator處理完Batch分區(qū)的所有元組后調(diào)用complete方法。
使用Aggregator來實(shí)現(xiàn)計數(shù)器:
class CountAgg extends BaseAggregator<CountAgg.CountState> {
class CountState{
long count = 0;
}
@Override
public CountState init(Object batchId, TridentCollector collector) {
return new CountState();
}
@Override
public void aggregate(CountState state, TridentTuple tuple, TridentCollector collector) {
state.count += 1;
}
@Override
public void complete(CountState state, TridentCollector collector) {
collector.emit(new Values(state.count));
}
}
狀態(tài)查詢(stateQuery)和分區(qū)持久化(partitionPersist)
stateQuery用于查詢狀態(tài)源,partitionPersist用于更新狀態(tài)源。具體使用方式可查看:http://storm.apache.org/releases/1.2.2/Trident-state.html
投影(projection)操作
projection操作用于只保留指定的字段,比如元組有字段["a","b","c","d"],通過以下投影操作,輸出流只會包含["c","d"]。
mystream.projection(new Fields("c","d"));
重分區(qū)操作
Repartition操作運(yùn)行一個函數(shù)來改變元組在任務(wù)之間的分布,調(diào)整分區(qū)數(shù)也可能會導(dǎo)致Repartition操作。重分區(qū)操作會引發(fā)網(wǎng)絡(luò)傳輸。下面是重分區(qū)的相關(guān)函數(shù):
- shuffle:使用隨機(jī)算法來均衡tuple到每個分區(qū)。
- broadcast:每個tuple被廣播到所有分區(qū)上,使用DRPC時使用這種方法比較多,比如每個分區(qū)上做stateQuery。
- global:所有tuple都發(fā)送到一個分區(qū)上,這個分區(qū)用來處理stream。
- batchGlobal:一個batch中的所有tuple會發(fā)送到一個分區(qū)中,不同batch的元組會被發(fā)送到不同分區(qū)上。
- partition:通過一個自定義的分區(qū)函數(shù)來進(jìn)行分區(qū),這個自定義函數(shù)需要實(shí)現(xiàn)
org.apache.storm.grouping.CustomStreamGrouping。
聚合操作
Trident提供了aggregate和persistentAggregate方法,aggregate運(yùn)行在每個batch中,而persistentAggregate將聚合所有Batch,并將結(jié)果保存在一個狀態(tài)源上。
我們前面講的aggregate、CombinerAggregator和ReducerAggregator運(yùn)行在patitionAggregation上是本地分區(qū)操作。如果直接作用于流上,則是對全局進(jìn)行聚合。
在對全局流進(jìn)行聚合時,Aggregator和ReducerAggregator會首先重分區(qū)到一個單分區(qū),然后在該分區(qū)上執(zhí)行聚合函數(shù)。而CombinerAggregator則會首先聚合每個分區(qū),然后重分區(qū)到單個分區(qū),在網(wǎng)絡(luò)傳輸中完成聚合操作。所以我們應(yīng)該盡量用CombinerAggregator,因?yàn)樗痈咝А?/p>
mystream.aggregate(new Count(),new Fields("count"));
流分組操作
groupBy操作會重新分區(qū)流,對指定字段執(zhí)行partitionBy操作,指定字段相同的元組被劃分到相同的分區(qū)。goupBy操作如下圖:

如果在流分組中運(yùn)行聚合器,聚合會在每個group中運(yùn)行,而不是對整個Batch操作。
合并和連接
Trident可以允許我們將不同流組合在一起,通過TridentTopology.merge()方法操作。
//合并流會以第一個流的輸出字段來命名
topology.mege(stream1,stream2,stream3);
另一種合并流的方式是連接,類似于SQL那樣的連接,要求輸入是有限的。所以Trident的join只適用于來自Spout的每個小Bath之間。
比如有一個流包含["key1","val1","val2"],另一個流包含["key2","val1","val2"],通過以下連接操作:
//Trident需要join之后的流重新命名,因?yàn)檩斎肓骺赡艽嬖谥貜?fù) 字段。
mystream.join(stream1,new Fields("key1"),stream2,new Fields("key2"),new Fields("key","a","b","c","d"))