本文基于ThriftSource,MemoryChannel,HdfsSink三個(gè)組件,對(duì)Flume數(shù)據(jù)傳輸?shù)氖聞?wù)進(jìn)行分析,如果使用的是其他組件,F(xiàn)lume事務(wù)具體的處理方式將會(huì)不同。
一般情況下,用MemoryChannel就好了,我們公司用的就是這個(gè),F(xiàn)ileChannel速度慢,雖然提供日志級(jí)別的數(shù)據(jù)恢復(fù),但是一般情況下,不斷電MemoryChannel是不會(huì)丟數(shù)據(jù)的。
Flume提供事物操作,保證用戶的數(shù)據(jù)的可靠性,主要體現(xiàn)在:
數(shù)據(jù)在傳輸?shù)较聜€(gè)節(jié)點(diǎn)時(shí)(通常是批量數(shù)據(jù)),如果接收節(jié)點(diǎn)出現(xiàn)異常,比如網(wǎng)絡(luò)異常,則回滾這一批數(shù)據(jù)。因此有可能導(dǎo)致數(shù)據(jù)重發(fā)
同個(gè)節(jié)點(diǎn)內(nèi),Source寫(xiě)入數(shù)據(jù)到Channel,數(shù)據(jù)在一個(gè)批次內(nèi)的數(shù)據(jù)出現(xiàn)異常,則不寫(xiě)入到Channel。已接收到的部分?jǐn)?shù)據(jù)直接拋棄,靠上一個(gè)節(jié)點(diǎn)重發(fā)數(shù)據(jù)。
編程模型
Flume在對(duì)Channel進(jìn)行Put和Take操作的時(shí)候,必須要用事物包住,比如:
Channel ch = new MemoryChannel();
Transaction txn = ch.getTransaction();
//事物開(kāi)始
txn.begin();
try {
Event eventToStage = EventBuilder.withBody("Hello Flume!",
Charset.forName("UTF-8"));
//往臨時(shí)緩沖區(qū)Put數(shù)據(jù)
ch.put(eventToStage);
//或者ch.take()
//將這些數(shù)據(jù)提交到channel中
txn.commit();
} catch (Throwable t) {
txn.rollback();
if (t instanceof Error) {
throw (Error)t;
}
} finally {
txn.close();
}
Put事務(wù)流程
Put事務(wù)可以分為以下階段:
- doPut:將批數(shù)據(jù)先寫(xiě)入臨時(shí)緩沖區(qū)putList
- doCommit:檢查channel內(nèi)存隊(duì)列是否足夠合并。
- doRollback:channel內(nèi)存隊(duì)列空間不足,拋棄數(shù)據(jù)
我們從Source數(shù)據(jù)接收到寫(xiě)入Channel這個(gè)過(guò)程對(duì)Put事物進(jìn)行分析。
ThriftSource會(huì)spawn多個(gè)Worker線程(ThriftSourceHandler)去處理數(shù)據(jù),Worker處理數(shù)據(jù)的接口,我們只看batch批量處理這個(gè)接口:
@Override
public Status appendBatch(List<ThriftFlumeEvent> events) throws TException {
List<Event> flumeEvents = Lists.newArrayList();
for(ThriftFlumeEvent event : events) {
flumeEvents.add(EventBuilder.withBody(event.getBody(),
event.getHeaders()));
}
//ChannelProcessor,在Source初始化的時(shí)候傳進(jìn)來(lái).將數(shù)據(jù)寫(xiě)入對(duì)應(yīng)的Channel
getChannelProcessor().processEventBatch(flumeEvents);
...
return Status.OK;
}
事務(wù)邏輯都在processEventBatch這個(gè)方法里:
public void processEventBatch(List<Event> events) {
...
//預(yù)處理每行數(shù)據(jù),有人用來(lái)做ETL嘛
events = interceptorChain.intercept(events);
...
//分類(lèi)數(shù)據(jù),劃分不同的channel集合對(duì)應(yīng)的數(shù)據(jù)
// Process required channels
Transaction tx = reqChannel.getTransaction();
...
//事務(wù)開(kāi)始,tx即MemoryTransaction類(lèi)實(shí)例
tx.begin();
List<Event> batch = reqChannelQueue.get(reqChannel);
for (Event event : batch) {
// 這個(gè)put操作實(shí)際調(diào)用的是transaction.doPut
reqChannel.put(event);
}
//提交,將數(shù)據(jù)寫(xiě)入Channel的隊(duì)列中
tx.commit();
} catch (Throwable t) {
//回滾
tx.rollback();
...
}
}
...
}
每個(gè)Worker線程都擁有一個(gè)Transaction實(shí)例,保存在Channel(BasicChannelSemantics)里的ThreadLocal變量currentTransaction.
那么,事務(wù)到底做了什么?
實(shí)際上,Transaction實(shí)例包含兩個(gè)雙向阻塞隊(duì)列LinkedBlockingDeque(感覺(jué)沒(méi)必要用雙向隊(duì)列,每個(gè)線程寫(xiě)自己的putList,又不是多個(gè)線程?),分別為:
- putList
- takeList
對(duì)于Put事物操作,當(dāng)然是只用到putList了。putList就是一個(gè)臨時(shí)的緩沖區(qū),數(shù)據(jù)會(huì)先put到putList,最后由commit方法會(huì)檢查channel是否有足夠的緩沖區(qū),有則合并到channel的隊(duì)列。
channel.put -> transaction.doPut:
protected void doPut(Event event) throws InterruptedException {
//計(jì)算數(shù)據(jù)字節(jié)大小
int eventByteSize = (int)Math.ceil(estimateEventSize(event)/byteCapacitySlotSize);
//寫(xiě)入臨時(shí)緩沖區(qū)putList
if (!putList.offer(event)) {
throw new ChannelException(
"Put queue for MemoryTransaction of capacity " +
putList.size() + " full, consider committing more frequently, " +
"increasing capacity or increasing thread count");
}
putByteCounter += eventByteSize;
}
transaction.commit:
@Override
protected void doCommit() throws InterruptedException {
//檢查channel的隊(duì)列剩余大小是否足夠
...
int puts = putList.size();
...
synchronized(queueLock) {
if(puts > 0 ) {
while(!putList.isEmpty()) {
//寫(xiě)入到channel的隊(duì)列
if(!queue.offer(putList.removeFirst())) {
throw new RuntimeException("Queue add failed, this shouldn't be able to happen");
}
}
}
//清除臨時(shí)隊(duì)列
putList.clear();
...
}
...
}
如果在事務(wù)期間出現(xiàn)異常,比如channel剩余空間不足,則rollback:
@Override
protected void doRollback() {
...
//拋棄數(shù)據(jù),沒(méi)合并到channel的內(nèi)存隊(duì)列
putList.clear();
...
}
Take事務(wù)
Take事務(wù)分為以下階段:
- doTake:先將數(shù)據(jù)取到臨時(shí)緩沖區(qū)takeList,將數(shù)據(jù)發(fā)送到下一個(gè)節(jié)點(diǎn)
- doCommit:如果數(shù)據(jù)全部發(fā)送成功,則清除臨時(shí)緩沖區(qū)takeList
- doRollback:數(shù)據(jù)發(fā)送過(guò)程中如果出現(xiàn)異常,rollback將臨時(shí)緩沖區(qū)takeList中的數(shù)據(jù)歸還給channel內(nèi)存隊(duì)列。
Sink其實(shí)是由SinkRunner線程調(diào)用Sink.process方法來(lái)了處理數(shù)據(jù)的。我們從HdfsEventSink的process方法說(shuō)起,Sink類(lèi)都有個(gè)process方法,用來(lái)處理傳輸數(shù)據(jù)的邏輯。:
public Status process() throws EventDeliveryException {
...
Transaction transaction = channel.getTransaction();
...
//事務(wù)開(kāi)始
transaction.begin();
...
for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) {
//take數(shù)據(jù)到臨時(shí)緩沖區(qū),實(shí)際調(diào)用的是transaction.doTake
Event event = channel.take();
if (event == null) {
break;
}
...
//寫(xiě)數(shù)據(jù)到HDFS
bucketWriter.append(event);
...
// flush all pending buckets before committing the transaction
for (BucketWriter bucketWriter : writers) {
bucketWriter.flush();
}
//commit
transaction.commit();
...
} catch (IOException eIO) {
transaction.rollback();
...
} finally {
transaction.close();
}
}
大致流程圖:
接著看看channel.take,作用是將數(shù)據(jù)放到臨時(shí)緩沖區(qū),實(shí)際調(diào)用的是transaction.doTake:
protected Event doTake() throws InterruptedException {
...
//從channel內(nèi)存隊(duì)列取數(shù)據(jù)
synchronized(queueLock) {
event = queue.poll();
}
...
//將數(shù)據(jù)放到臨時(shí)緩沖區(qū)
takeList.put(event);
...
return event;
}
接著,HDFS寫(xiě)線程bucketWriter將take到的數(shù)據(jù)寫(xiě)到HDFS,如果批數(shù)據(jù)都寫(xiě)完了,則要commit了:
protected void doCommit() throws InterruptedException {
...
takeList.clear();
...
}
很簡(jiǎn)單,其實(shí)就是清空takeList而已。如果bucketWriter在寫(xiě)數(shù)據(jù)到HDFS的時(shí)候出現(xiàn)異常,則要rollback:
protected void doRollback() {
int takes = takeList.size();
//檢查內(nèi)存隊(duì)列空間大小,是否足夠takeList寫(xiě)回去
synchronized(queueLock) {
Preconditions.checkState(queue.remainingCapacity() >= takeList.size(), "Not enough space in memory channel " +
"queue to rollback takes. This should never happen, please report");
while(!takeList.isEmpty()) {
queue.addFirst(takeList.removeLast());
}
...
}
...
}