Flume使用兩個獨立的事務(wù)分別負(fù)責(zé)從soucrce到channel,以及從channel到sink的event傳遞。一旦事務(wù)中所有的event全部傳遞到channel且提交成功,那么source就將該文件標(biāo)記為完成。同理,事務(wù)以類似的方式處理從channel到sink的傳遞過程,如果因為某種原因使得event無法記錄,那么事務(wù)將會回滾,且所有的event都會保持到channel中,等待重新傳遞。
Flume的事務(wù)機制保證了source產(chǎn)生的每個event都會傳送到sink中(如果失敗會無限重試),flume采用的是At-least-once的提交方式,這樣就造成每個source產(chǎn)生的event至少到達sink一次,這種方式保證了數(shù)據(jù)的可靠性,但數(shù)據(jù)可能重復(fù)。
Transaction接口定義如下:
public void begin();
public void commit();
public void rollback();
public void close();
以MemoryTransaction介紹介紹下事務(wù)機制:

MemoryTransaction是MemoryChannel中的一個內(nèi)部類,內(nèi)部有2個阻塞隊列putList和takeList,MemoryChannel內(nèi)部有個queue阻塞隊列。putList接收Source交給Channel的event數(shù)據(jù),takeList保存Channel交給Sink的event數(shù)據(jù)。
- 如果Source交給Channel任務(wù)完成,進行commit時,會把putList中的所有event放到MemoryChannel中的queue。
- 如果Source交給Channel任務(wù)失敗,進行rollback時,程序就不會繼續(xù)走下去,比如KafkaSource需要commitOffsets,如果任務(wù)失敗就不會commitOffsets。
- 如果Sink處理完Channel帶來的event,進行commit的時,會清空takeList中的event數(shù)據(jù),因為已經(jīng)沒consume。
- 如果Sink處理Channel帶來的event失敗,進行rollback的時,會把takeList中的event寫回到queue中。
commit的關(guān)鍵代碼:
@Override
protected void doCommit() throws InterruptedException {
int puts = putList.size();
int takes = takeList.size();
synchronized(queueLock) {
if(puts > 0 ) {
// 清空putList,丟到外部類MemoryChannel中的queue隊列里
while(!putList.isEmpty()) {
// MemoryChannel中的queue隊列
if(!queue.offer(putList.removeFirst())) {
throw new RuntimeException("Queue add failed, this shouldn't be able to happen");
}
}
}
putList.clear();
takeList.clear();
}
}
rollback的關(guān)鍵代碼
@Override
protected void doRollback() {
int takes = takeList.size();
synchronized(queueLock) {
Preconditions.checkState(queue.remainingCapacity() >= takeList.size(), "Not enough space in memory channel " +
"queue to rollback takes. This should never happen, please report");
// 把takeList中的數(shù)據(jù)放回到queue中
while(!takeList.isEmpty()) {
queue.addFirst(takeList.removeLast());
}
putList.clear();
}
}