Spark 任務(wù)輸出目錄_temporary目錄未刪除問(wèn)題排查

https://blog.csdn.net/u013332124/article/details/90677530

一、問(wèn)題描述

在hdfs上看到有個(gè)輸出目錄有_temporary目錄,但任務(wù)實(shí)際已經(jīng)結(jié)束了。

在這里插入圖片描述

有_SUCCESS文件表示這個(gè)任務(wù)已經(jīng)結(jié)束了。

二、問(wèn)題定位

Spark 輸出數(shù)據(jù)到 HDFS 時(shí),需要解決如下問(wèn)題:

  • 由于多個(gè) Task 同時(shí)寫(xiě)數(shù)據(jù)到 HDFS,如何保證要么所有 Task 寫(xiě)的所有文件要么同時(shí)對(duì)外可見(jiàn),要么同時(shí)對(duì)外不可見(jiàn),即保證數(shù)據(jù)一致性
  • 同一 Task 可能因?yàn)?Speculation 而存在兩個(gè)完全相同的 Task 實(shí)例寫(xiě)相同的數(shù)據(jù)到 HDFS中,如何保證只有一個(gè) commit 成功
  • 對(duì)于大 Job(如具有幾萬(wàn)甚至幾十萬(wàn) Task),如何高效管理所有文件

所以spark任務(wù)在輸出時(shí)不會(huì)立即輸出到目標(biāo)目錄,而且在目標(biāo)目錄下創(chuàng)建一個(gè)_temporary文件用于輸出數(shù)據(jù),每個(gè)task都有自己?jiǎn)为?dú)的輸出目錄。等最終job結(jié)束后,才真正的將_temporary目錄下的文件move到目標(biāo)目錄下,并且刪除_temporary目錄。

通過(guò)查看spark代碼我們發(fā)現(xiàn),spark判斷job結(jié)束的方式是每個(gè)partiton的task都執(zhí)行結(jié)束,而不管speculation的task是否已經(jīng)結(jié)束。比如有個(gè)task因?yàn)閳?zhí)行慢開(kāi)啟了推測(cè)執(zhí)行,那么它就會(huì)有兩個(gè)attempt,這時(shí)只要有一個(gè)attempt運(yùn)行成功,那么就判定為這個(gè)partition的task運(yùn)行成功了。所以在job運(yùn)行成功時(shí),可能還有一些speculation還在執(zhí)行。(雖然task運(yùn)行成功后,會(huì)去主動(dòng)kill其他的attempt,但是這是一個(gè)異步的過(guò)程,可能剛好在kill的過(guò)程中job已經(jīng)結(jié)束了

job結(jié)束后會(huì)去刪除_temporary目錄,但是因?yàn)檫€有幾個(gè)speculation task還在運(yùn)行,這幾個(gè)speculation task可能又創(chuàng)建了_temporary作為輸出。這樣,我們就觀察到這個(gè)目錄運(yùn)行結(jié)束了還有_temporary目錄存在。

通過(guò)相關(guān)日志,我們也可以判斷確實(shí)是這么一回事。

通過(guò)_temporary目錄下的文件,我們可以判斷出是task 20和task682出現(xiàn)了問(wèn)題:

在這里插入圖片描述

因此我們可以去ApplicationMaster中搜索相關(guān)日志:

在這里插入圖片描述
在這里插入圖片描述

上面那張圖說(shuō)明stage 667在05:19:44,717的時(shí)候已經(jīng)運(yùn)行結(jié)束了,但是在這之后還陸陸續(xù)續(xù)有task上報(bào)結(jié)果上來(lái),其中最晚的兩個(gè)就是task 20和task 682。倒數(shù)第三個(gè)上報(bào)信息的task是task 351,這個(gè)task沒(méi)有出現(xiàn)在_temporary,說(shuō)明_temporary目錄應(yīng)該是在05:19:45,865左右刪除的。之后task 20和task 682又創(chuàng)建了這個(gè)目錄。

三、driver kill task的時(shí)間

我們以task 20來(lái)分析。

通過(guò)日志觀察,task 20在05:19:43,546就已經(jīng)運(yùn)行成功了,并且主動(dòng)去kill了task 20 attempt 0的那個(gè)運(yùn)行記錄。

2019-05-09 05:19:43,546 - INFO  [task-result-getter-3 : org.apache.spark.scheduler.TaskSetManager @ 54] - Killing attempt 0 for task 20.0 in stage 667.0 (TID 605422) on dn143144.hadoop.harbinger2.data.m.com as the attempt 1 succeeded on dn143115.hadoop.harbinger2.data.m.c
om
2019-05-09 05:19:43,546 - INFO  [task-result-getter-3 : org.apache.spark.scheduler.TaskSetManager @ 54] - Finished task 20.1 in stage 667.0 (TID 605893) in 233 ms on dn143115.hadoop.harbinger2.data.m.com (executor 2161) (951/1000)

task 20.0到05:19:46,010才真正上報(bào)了fail的結(jié)果上來(lái)。期間經(jīng)過(guò)了2.5s。

看了下executor的日志,發(fā)現(xiàn)05:19:43,548 就收到了kill的請(qǐng)求,整個(gè)請(qǐng)求傳輸只用了2ms

2019-05-09 05:19:43,548 - INFO  [dispatcher-event-loop-10 : org.apache.spark.executor.Executor @ 54] - Executor is trying to kill task 20.0 in stage 667.0 (TID 605422)

在05:19:46,006才真正kill了task 20.0。

2019-05-09 05:19:46,006 - INFO  [Executor task launch worker-0 : org.apache.spark.executor.Executor @ 54] - Executor killed task 20.0 in stage 667.0 (TID 605422)

看了下kill task的相關(guān)代碼,發(fā)現(xiàn)它是以設(shè)置task運(yùn)行線程interrupt標(biāo)志位的方式來(lái)kill task的。因此,kill的過(guò)程和task的代碼是否能正確響應(yīng)中斷有關(guān)。2.5s 的時(shí)間就可以理解了

//Task.scala
/**
 * Kills a task by setting the interrupted flag to true. This relies on the upper level Spark
 * code and user code to properly handle the flag. This function should be idempotent so it can
 * be called multiple times.
 * If interruptThread is true, we will also call Thread.interrupt() on the Task's executor thread.
 */
def kill(interruptThread: Boolean, reason: String) {
  require(reason != null)
  _reasonIfKilled = reason
  if (context != null) {
    context.markInterrupted(reason)
  }
  if (interruptThread && taskThread != null) {
    taskThread.interrupt()
  }
}

從方法的注釋可以看出,用戶代碼要處理好中斷,否則可能導(dǎo)致task kill時(shí)間很長(zhǎng)的問(wèn)題

四、解決方案

從spark的某處代碼可以看出,spark方面是知道這個(gè)問(wèn)題的,但是到目前版本都沒(méi)有修改這個(gè)問(wèn)題。

// Sometimes (e.g., when speculative task is enabled), temporary directories may be left
// uncleaned. Here we simply ignore them.
if (currentPath.getName.toLowerCase == "_temporary") {
  return (None, None)
}

如果要解決這個(gè)問(wèn)題,可以修改stage的判斷規(guī)則,確保所有的task都上報(bào)結(jié)果上來(lái)了再結(jié)束job(包括那些被kill的task)。但是這樣就會(huì)有一個(gè)問(wèn)題,就像上面說(shuō)的,kill task是以比較溫和的方式進(jìn)行,如果用戶代碼沒(méi)處理好中斷,很可能會(huì)導(dǎo)致kill時(shí)間很久的問(wèn)題。如果以這種方式來(lái)判斷job結(jié)束,任務(wù)的運(yùn)行時(shí)間會(huì)更長(zhǎng)。同時(shí)很可能導(dǎo)致推測(cè)執(zhí)行沒(méi)有任何意義。

因此,這可能也是spark明知道有這個(gè)問(wèn)題卻一直沒(méi)修復(fù)的原因。

參考資料

https://zhuanlan.zhihu.com/p/45351972

https://issues.apache.org/jira/browse/SPARK-8513

https://issues.apache.org/jira/browse/SPARK-4879

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容