14_Flume之事務(wù)

Flume使用兩個獨立的事務(wù)分別負(fù)責(zé)從soucrce到channel,以及從channel到sink的event傳遞。一旦事務(wù)中所有的event全部傳遞到channel且提交成功,那么source就將該文件標(biāo)記為完成。同理,事務(wù)以類似的方式處理從channel到sink的傳遞過程,如果因為某種原因使得event無法記錄,那么事務(wù)將會回滾,且所有的event都會保持到channel中,等待重新傳遞。

Flume的事務(wù)機制保證了source產(chǎn)生的每個event都會傳送到sink中(如果失敗會無限重試),flume采用的是At-least-once的提交方式,這樣就造成每個source產(chǎn)生的event至少到達sink一次,這種方式保證了數(shù)據(jù)的可靠性,但數(shù)據(jù)可能重復(fù)。

Transaction接口定義如下:

public void begin();

public void commit();

public void rollback();

public void close();

以MemoryTransaction介紹介紹下事務(wù)機制:

MemoryTransaction是MemoryChannel中的一個內(nèi)部類,內(nèi)部有2個阻塞隊列putList和takeList,MemoryChannel內(nèi)部有個queue阻塞隊列。putList接收Source交給Channel的event數(shù)據(jù),takeList保存Channel交給Sink的event數(shù)據(jù)。

  1. 如果Source交給Channel任務(wù)完成,進行commit時,會把putList中的所有event放到MemoryChannel中的queue。
  2. 如果Source交給Channel任務(wù)失敗,進行rollback時,程序就不會繼續(xù)走下去,比如KafkaSource需要commitOffsets,如果任務(wù)失敗就不會commitOffsets。
  3. 如果Sink處理完Channel帶來的event,進行commit的時,會清空takeList中的event數(shù)據(jù),因為已經(jīng)沒consume。
  4. 如果Sink處理Channel帶來的event失敗,進行rollback的時,會把takeList中的event寫回到queue中。

commit的關(guān)鍵代碼:

@Override
protected void doCommit() throws InterruptedException {
  int puts = putList.size();
  int takes = takeList.size();
  synchronized(queueLock) {
    if(puts > 0 ) {
      // 清空putList,丟到外部類MemoryChannel中的queue隊列里
      while(!putList.isEmpty()) {
        // MemoryChannel中的queue隊列
        if(!queue.offer(putList.removeFirst())) {
          throw new RuntimeException("Queue add failed, this shouldn't be able to happen");
        }
      }
    }
    putList.clear();
    takeList.clear();
  }
}

rollback的關(guān)鍵代碼

@Override
protected void doRollback() {
  int takes = takeList.size();
  synchronized(queueLock) {
    Preconditions.checkState(queue.remainingCapacity() >= takeList.size(), "Not enough space in memory channel " +
        "queue to rollback takes. This should never happen, please report");
    // 把takeList中的數(shù)據(jù)放回到queue中
    while(!takeList.isEmpty()) {
      queue.addFirst(takeList.removeLast());
    }
    putList.clear();
  }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容