數(shù)據(jù)傳輸事務(wù)分析

本文基于ThriftSource,MemoryChannel,HdfsSink三個(gè)組件,對(duì)Flume數(shù)據(jù)傳輸?shù)氖聞?wù)進(jìn)行分析,如果使用的是其他組件,F(xiàn)lume事務(wù)具體的處理方式將會(huì)不同。

一般情況下,用MemoryChannel就好了,我們公司用的就是這個(gè),F(xiàn)ileChannel速度慢,雖然提供日志級(jí)別的數(shù)據(jù)恢復(fù),但是一般情況下,不斷電MemoryChannel是不會(huì)丟數(shù)據(jù)的。

Flume提供事物操作,保證用戶的數(shù)據(jù)的可靠性,主要體現(xiàn)在:
數(shù)據(jù)在傳輸?shù)较聜€(gè)節(jié)點(diǎn)時(shí)(通常是批量數(shù)據(jù)),如果接收節(jié)點(diǎn)出現(xiàn)異常,比如網(wǎng)絡(luò)異常,則回滾這一批數(shù)據(jù)。因此有可能導(dǎo)致數(shù)據(jù)重發(fā)
同個(gè)節(jié)點(diǎn)內(nèi),Source寫(xiě)入數(shù)據(jù)到Channel,數(shù)據(jù)在一個(gè)批次內(nèi)的數(shù)據(jù)出現(xiàn)異常,則不寫(xiě)入到Channel。已接收到的部分?jǐn)?shù)據(jù)直接拋棄,靠上一個(gè)節(jié)點(diǎn)重發(fā)數(shù)據(jù)。

編程模型

Flume在對(duì)Channel進(jìn)行Put和Take操作的時(shí)候,必須要用事物包住,比如:

Channel ch = new MemoryChannel();
Transaction txn = ch.getTransaction();
//事物開(kāi)始
txn.begin();
try {

  Event eventToStage = EventBuilder.withBody("Hello Flume!",
                       Charset.forName("UTF-8"));
  //往臨時(shí)緩沖區(qū)Put數(shù)據(jù)
  ch.put(eventToStage);
  //或者ch.take()

  //將這些數(shù)據(jù)提交到channel中
  txn.commit();
} catch (Throwable t) {
  txn.rollback();

  if (t instanceof Error) {
    throw (Error)t;
  }
} finally {
  txn.close();
}

Put事務(wù)流程

Put事務(wù)可以分為以下階段:

  • doPut:將批數(shù)據(jù)先寫(xiě)入臨時(shí)緩沖區(qū)putList
  • doCommit:檢查channel內(nèi)存隊(duì)列是否足夠合并。
  • doRollback:channel內(nèi)存隊(duì)列空間不足,拋棄數(shù)據(jù)

我們從Source數(shù)據(jù)接收到寫(xiě)入Channel這個(gè)過(guò)程對(duì)Put事物進(jìn)行分析。

ThriftSource會(huì)spawn多個(gè)Worker線程(ThriftSourceHandler)去處理數(shù)據(jù),Worker處理數(shù)據(jù)的接口,我們只看batch批量處理這個(gè)接口:

@Override
    public Status appendBatch(List<ThriftFlumeEvent> events) throws TException {

      List<Event> flumeEvents = Lists.newArrayList();
      for(ThriftFlumeEvent event : events) {
        flumeEvents.add(EventBuilder.withBody(event.getBody(),
          event.getHeaders()));
      }

        //ChannelProcessor,在Source初始化的時(shí)候傳進(jìn)來(lái).將數(shù)據(jù)寫(xiě)入對(duì)應(yīng)的Channel
        getChannelProcessor().processEventBatch(flumeEvents);
        ...

      return Status.OK;
    }

事務(wù)邏輯都在processEventBatch這個(gè)方法里:

public void processEventBatch(List<Event> events) {
    ...
    //預(yù)處理每行數(shù)據(jù),有人用來(lái)做ETL嘛
    events = interceptorChain.intercept(events);
    ...
    //分類(lèi)數(shù)據(jù),劃分不同的channel集合對(duì)應(yīng)的數(shù)據(jù)

    // Process required channels
    Transaction tx = reqChannel.getTransaction();
    ...
        //事務(wù)開(kāi)始,tx即MemoryTransaction類(lèi)實(shí)例
        tx.begin();
        List<Event> batch = reqChannelQueue.get(reqChannel);
        for (Event event : batch) {
          // 這個(gè)put操作實(shí)際調(diào)用的是transaction.doPut
          reqChannel.put(event);
        }
        //提交,將數(shù)據(jù)寫(xiě)入Channel的隊(duì)列中
        tx.commit();
      } catch (Throwable t) {
        //回滾
        tx.rollback();
        ...
      }
    }
    ...
  }

每個(gè)Worker線程都擁有一個(gè)Transaction實(shí)例,保存在Channel(BasicChannelSemantics)里的ThreadLocal變量currentTransaction.
那么,事務(wù)到底做了什么?


實(shí)際上,Transaction實(shí)例包含兩個(gè)雙向阻塞隊(duì)列LinkedBlockingDeque(感覺(jué)沒(méi)必要用雙向隊(duì)列,每個(gè)線程寫(xiě)自己的putList,又不是多個(gè)線程?),分別為:

  • putList
  • takeList

對(duì)于Put事物操作,當(dāng)然是只用到putList了。putList就是一個(gè)臨時(shí)的緩沖區(qū),數(shù)據(jù)會(huì)先put到putList,最后由commit方法會(huì)檢查channel是否有足夠的緩沖區(qū),有則合并到channel的隊(duì)列。

channel.put -> transaction.doPut:

protected void doPut(Event event) throws InterruptedException {
      //計(jì)算數(shù)據(jù)字節(jié)大小
      int eventByteSize = (int)Math.ceil(estimateEventSize(event)/byteCapacitySlotSize);
      //寫(xiě)入臨時(shí)緩沖區(qū)putList
      if (!putList.offer(event)) {
        throw new ChannelException(
          "Put queue for MemoryTransaction of capacity " +
            putList.size() + " full, consider committing more frequently, " +
            "increasing capacity or increasing thread count");
      }
      putByteCounter += eventByteSize;
    }

transaction.commit:

@Override
    protected void doCommit() throws InterruptedException {
      //檢查channel的隊(duì)列剩余大小是否足夠
      ...

      int puts = putList.size();
      ...
      synchronized(queueLock) {
        if(puts > 0 ) {
          while(!putList.isEmpty()) {
            //寫(xiě)入到channel的隊(duì)列
            if(!queue.offer(putList.removeFirst())) {
              throw new RuntimeException("Queue add failed, this shouldn't be able to happen");
            }
          }
        }
        //清除臨時(shí)隊(duì)列
        putList.clear();
        ...
      }
      ...
    }

如果在事務(wù)期間出現(xiàn)異常,比如channel剩余空間不足,則rollback:

@Override
    protected void doRollback() {
    ...
        //拋棄數(shù)據(jù),沒(méi)合并到channel的內(nèi)存隊(duì)列
        putList.clear();
      ...
    }

Take事務(wù)

Take事務(wù)分為以下階段:

  • doTake:先將數(shù)據(jù)取到臨時(shí)緩沖區(qū)takeList,將數(shù)據(jù)發(fā)送到下一個(gè)節(jié)點(diǎn)
  • doCommit:如果數(shù)據(jù)全部發(fā)送成功,則清除臨時(shí)緩沖區(qū)takeList
  • doRollback:數(shù)據(jù)發(fā)送過(guò)程中如果出現(xiàn)異常,rollback將臨時(shí)緩沖區(qū)takeList中的數(shù)據(jù)歸還給channel內(nèi)存隊(duì)列。

Sink其實(shí)是由SinkRunner線程調(diào)用Sink.process方法來(lái)了處理數(shù)據(jù)的。我們從HdfsEventSink的process方法說(shuō)起,Sink類(lèi)都有個(gè)process方法,用來(lái)處理傳輸數(shù)據(jù)的邏輯。:

public Status process() throws EventDeliveryException {
    ...
    Transaction transaction = channel.getTransaction();
    ...
    //事務(wù)開(kāi)始
    transaction.begin();
    ...
      for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) {
        //take數(shù)據(jù)到臨時(shí)緩沖區(qū),實(shí)際調(diào)用的是transaction.doTake
        Event event = channel.take();
        if (event == null) {
          break;
        }
        ...
      //寫(xiě)數(shù)據(jù)到HDFS
      bucketWriter.append(event);
      ...
      // flush all pending buckets before committing the transaction
      for (BucketWriter bucketWriter : writers) {
        bucketWriter.flush();
      }
      //commit
      transaction.commit();
      ...
    } catch (IOException eIO) {
      transaction.rollback();
      ...
    } finally {
      transaction.close();
    }
  }

大致流程圖:


接著看看channel.take,作用是將數(shù)據(jù)放到臨時(shí)緩沖區(qū),實(shí)際調(diào)用的是transaction.doTake:

protected Event doTake() throws InterruptedException {
      ...
      //從channel內(nèi)存隊(duì)列取數(shù)據(jù)
      synchronized(queueLock) {
        event = queue.poll();
      }
      ...
      //將數(shù)據(jù)放到臨時(shí)緩沖區(qū)
      takeList.put(event);
      ...
      return event;
    }

接著,HDFS寫(xiě)線程bucketWriter將take到的數(shù)據(jù)寫(xiě)到HDFS,如果批數(shù)據(jù)都寫(xiě)完了,則要commit了:

protected void doCommit() throws InterruptedException {
    ...
    takeList.clear();
    ...
}

很簡(jiǎn)單,其實(shí)就是清空takeList而已。如果bucketWriter在寫(xiě)數(shù)據(jù)到HDFS的時(shí)候出現(xiàn)異常,則要rollback:

protected void doRollback() {
      int takes = takeList.size();
      //檢查內(nèi)存隊(duì)列空間大小,是否足夠takeList寫(xiě)回去
      synchronized(queueLock) {
        Preconditions.checkState(queue.remainingCapacity() >= takeList.size(), "Not enough space in memory channel " +
            "queue to rollback takes. This should never happen, please report");
        while(!takeList.isEmpty()) {
          queue.addFirst(takeList.removeLast());
        }
        ...
      }
      ...
    }
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容