如果關(guān)注這個(gè)領(lǐng)域的同學(xué)可能知道,Ray其實(shí)在去年就已經(jīng)在開源社區(qū)正式發(fā)布了,只不過后來就一直沒有什么太大動(dòng)靜,前段時(shí)間也是因?yàn)闄C(jī)緣巧合,我又回頭學(xué)習(xí)了解了一下,順便總結(jié)如下:
Ray是什么?
Ray 是RISELab實(shí)驗(yàn)室(前身也就是開發(fā)Spark/Mesos等的AMPLab實(shí)驗(yàn)室)針對(duì)機(jī)器學(xué)習(xí)領(lǐng)域開發(fā)的一種新的分布式計(jì)算框架。按照官方的定義:“Ray is a flexible, high-performance distributed execution framework”。看起來很明確的定義對(duì)吧,不過所謂的:“靈活的高性能的分布式執(zhí)行框架”,這句話無論是哪個(gè)分布式計(jì)算框架,大概都會(huì)往自己身上套。那么Ray的不同之處在哪里呢?當(dāng)Spark/Flink/TensorFlow等一眾計(jì)算框架在機(jī)器學(xué)習(xí)領(lǐng)域中不斷開疆?dāng)U土,風(fēng)頭正勁的時(shí)候,為什么RISELab的同學(xué)們又要另起爐灶,再發(fā)明一個(gè)新的輪子呢?

這個(gè)問題的答案和其他發(fā)明輪子的同學(xué)的說法也很類似:因?yàn)榧扔械南到y(tǒng)不滿足某種需求。那么這種需求是真需求還是偽需求,如果是真需求,既有的系統(tǒng)不能滿足的原因是暫時(shí)的,可以改進(jìn)的,不關(guān)本質(zhì)的具體實(shí)現(xiàn)問題,還是由于根源上的架構(gòu)和方案的局限性所決定的呢?下面就讓我來現(xiàn)學(xué)現(xiàn)賣,分析討論一下。
目標(biāo)問題
Ray的目標(biāo)問題,主要是在類似增強(qiáng)學(xué)習(xí)這樣的場景中所遇到的工程問題。那么增強(qiáng)學(xué)習(xí)的場景和普通的機(jī)器學(xué)習(xí),深度學(xué)習(xí)的場景又有什么不同呢?簡單來說,就是對(duì)整個(gè)處理鏈路流程的時(shí)效性和靈活性有更高的要求。
增強(qiáng)學(xué)習(xí)的場景,按照原理定義,因?yàn)闆]有預(yù)先可用的靜態(tài)標(biāo)簽信息,所以通常需要引入實(shí)際的目標(biāo)系統(tǒng)(為了加快訓(xùn)練,往往是目標(biāo)系統(tǒng)的模擬環(huán)境)來獲取反饋信息,用做損失/收益判斷,進(jìn)而完成整個(gè)訓(xùn)練過程的閉環(huán)反饋。典型的步驟是通過觀察特定目標(biāo)系統(tǒng)的狀態(tài),收集反饋信息,判斷收益,用這些信息來調(diào)整參數(shù),訓(xùn)練模型,并根據(jù)新的訓(xùn)練結(jié)果產(chǎn)出可用于調(diào)整目標(biāo)系統(tǒng)的行為Action,輸出到目標(biāo)系統(tǒng),進(jìn)而影響目標(biāo)系統(tǒng)狀態(tài)變化,完成閉環(huán),如此反復(fù)迭代,最終目標(biāo)是追求某種收益的最大化(比如對(duì)AlphoGo來說,收益是贏得一盤圍棋的比賽)。

在這個(gè)過程中,一方面,模擬目標(biāo)系統(tǒng),收集狀態(tài)和反饋信息,判斷收益,訓(xùn)練參數(shù),生成Action等等行為可能涉及大量的任務(wù)和計(jì)算(為了選擇最佳Action,可能要并發(fā)模擬眾多可能的行為)。而這些行為本身可能也是千差萬別的異構(gòu)的任務(wù),任務(wù)執(zhí)行的時(shí)間也可能長短不一,執(zhí)行過程有些可能要求同步,也有些可能更適合異步。
另一方面,整個(gè)任務(wù)流程的DAG圖也可能是動(dòng)態(tài)變化的,系統(tǒng)往往可能需要根據(jù)前一個(gè)環(huán)節(jié)的結(jié)果,調(diào)整下一個(gè)環(huán)節(jié)的行為參數(shù)或者流程。這種調(diào)整,可能是目標(biāo)系統(tǒng)的需要(比如在自動(dòng)駕駛過程中遇到行人了,那么我們可能需要模擬計(jì)算剎車的距離來判斷該采取的行動(dòng)是剎車還是拐彎,而平時(shí)可能不需要這個(gè)環(huán)節(jié)),也可能是增強(qiáng)學(xué)習(xí)特定訓(xùn)練算法的需要(比如根據(jù)多個(gè)并行訓(xùn)練的模型的當(dāng)前收益,調(diào)整模型超參數(shù),替換模型等等)。
此外,由于所涉及到的目標(biāo)系統(tǒng)可能是具體的,現(xiàn)實(shí)物理世界中的系統(tǒng),所以對(duì)時(shí)效性也可能是有強(qiáng)要求的。舉個(gè)例子,比如你想要實(shí)現(xiàn)的系統(tǒng)是用來控制機(jī)器人行走,或者是用來打視頻游戲的。那么整個(gè)閉環(huán)反饋流程就需要在特定的時(shí)間限制內(nèi)完成(比如毫秒級(jí)別)。
總結(jié)來說,就是增強(qiáng)學(xué)習(xí)的場景,對(duì)分布式計(jì)算框架的任務(wù)調(diào)度延遲,吞吐量和動(dòng)態(tài)修改DAG圖的能力都可能有很高的要求。按照官方的設(shè)計(jì)目標(biāo),Ray需要支持異構(gòu)計(jì)算任務(wù),動(dòng)態(tài)計(jì)算鏈路,毫秒級(jí)別延遲和每秒調(diào)度百萬級(jí)別任務(wù)的能力。
那么現(xiàn)有的框架,真的不能滿足這種場景的需求么?
從上面提到的目標(biāo)問題來看,需求在對(duì)應(yīng)的場景下是真需求應(yīng)該問題不大。但現(xiàn)有的框架,通過適當(dāng)?shù)母倪M(jìn),真的滿足不了這些需求么?以下我們主要結(jié)合Spark/Flink/TensorFlow等常見的當(dāng)紅分布式計(jì)算框架來討論一下這些問題,也算是為Ray的方案選擇再做一下理論和背景鋪墊。
海量任務(wù)調(diào)度能力
首先來看每秒百萬級(jí)別任務(wù)調(diào)度的能力,上述三種計(jì)算框架,基本采用的是中心集中式的任務(wù)調(diào)度機(jī)制,在作業(yè)任務(wù)調(diào)度的角色集中在單個(gè)中心節(jié)點(diǎn)的情況下,單個(gè)作業(yè)要實(shí)現(xiàn)每秒百萬級(jí)別任務(wù)的調(diào)度能力,的確是不太現(xiàn)實(shí)的。
但問題未必只有一種解決方式,如果不能實(shí)現(xiàn)百萬級(jí)別的任務(wù)調(diào)度能力,那么想辦法降低需要調(diào)度的任務(wù)的數(shù)量就好了。比如像Flink這類以流式計(jì)算模型為基礎(chǔ)的框架,往往可以將任務(wù)的調(diào)度問題轉(zhuǎn)化為數(shù)據(jù)和指令的傳輸問題。所以在很多場景下,換一種處理模型,在同樣的場景下,可能就不需要發(fā)起大量的任務(wù)調(diào)度了。
而對(duì)于Spark這樣的批處理型框架來說,要提高任務(wù)調(diào)度的吞吐率,也可以有多種改進(jìn)方案。比如通過任務(wù)樹拆分,批量統(tǒng)一預(yù)調(diào)度等來分散調(diào)度負(fù)載,提高調(diào)度效率。當(dāng)然這些框架如果要引入類似的改進(jìn),從工程實(shí)現(xiàn)的角度來看,需要一定的時(shí)間。但是本質(zhì)上來說,是完全可以實(shí)現(xiàn)的,和他們的整體框架的核心思想本身并沒有絕對(duì)的沖突。而Spark 2.3的版本中引入的continuous computation的概念,實(shí)際上也是用另一種方式,引入預(yù)調(diào)度的思想,通過建立long run的任務(wù),簡化多個(gè)Epoc批次的任務(wù)的調(diào)度流程,客觀上減少了需要調(diào)度的任務(wù)的數(shù)量,同時(shí),更重要的是對(duì)端到端的計(jì)算延遲也有顯著改進(jìn)。
另外,比如各種改進(jìn)的PS參數(shù)服務(wù)器模型,引入各種PS端UDF或存儲(chǔ)過程的實(shí)現(xiàn),實(shí)際上從某種意義上來說,也是將一些子任務(wù)派發(fā)到PS節(jié)點(diǎn)去執(zhí)行,減少了由于框架限制帶來的數(shù)據(jù)和任務(wù)分發(fā)問題,客觀上也能降低需要中心節(jié)點(diǎn)統(tǒng)一規(guī)劃處理的任務(wù)的數(shù)量和負(fù)載。
毫秒級(jí)別的延遲
延遲其實(shí)有兩個(gè)概念,一個(gè)是任務(wù)調(diào)度自身的延遲,另一個(gè)是整體數(shù)據(jù)計(jì)算的延遲。當(dāng)然,前者是后者的基礎(chǔ),對(duì)后者是強(qiáng)相關(guān)影響的。對(duì)于基于流式計(jì)算模型的框架來說,當(dāng)任務(wù)拓?fù)溥壿嫶_定以后,通常不涉及更多的后續(xù)任務(wù)調(diào)度,所以整體的計(jì)算延遲,從流程上來說,只受數(shù)據(jù)流轉(zhuǎn)效率的影響。在排除了計(jì)算的代價(jià)開銷后,基本上就取決于數(shù)據(jù)傳輸批次Buffer的長度選擇。追求吞吐率就使用較大的buffer來緩沖數(shù)據(jù),追求端到端延遲則采用較小的buffer加快流轉(zhuǎn),比如Flink就允許用戶定義相關(guān)參數(shù)。而此外,如前所述,Spark等框架也引入了連續(xù)計(jì)算模型來規(guī)避流式計(jì)算場景下的超短周期任務(wù)調(diào)度問題,從而改進(jìn)端到端的計(jì)算延遲
異構(gòu)任務(wù)的支持
這里說的異構(gòu)任務(wù),不光指的是不同類型的任務(wù),還包括同類型的一批任務(wù),由于環(huán)境,數(shù)據(jù),參數(shù)的差異和變化,可能導(dǎo)致的計(jì)算時(shí)間上的傾斜問題。不管什么原因,概括起來就是在High level的一次訓(xùn)練迭代過程中,不同的任務(wù)的執(zhí)行時(shí)間可能有很大的差異。
那么不同的任務(wù),執(zhí)行時(shí)間有差異,會(huì)給分布式計(jì)算框架的設(shè)計(jì)帶來什么影響或者要求嗎?
這里,主要的問題是,當(dāng)前多數(shù)系統(tǒng)的調(diào)度邏輯,容錯(cuò)策略和執(zhí)行性能是與同一批次的任務(wù)執(zhí)行時(shí)間接近的假設(shè)相關(guān)的。比如分批次的任務(wù)調(diào)度,各批次之間的同步可能受最慢的Lagger任務(wù)的影響,資源的分配,慢任務(wù)的處理策略,往往也基于同批次任務(wù)執(zhí)行時(shí)間應(yīng)該接近的前提假設(shè)來設(shè)計(jì),當(dāng)這個(gè)前提假設(shè)不成立的時(shí)候,基本的流程策略的合理性和性能的好壞也就可能會(huì)受到比較大的影響。
如果是流式計(jì)算框架,鏈路上各計(jì)算步驟環(huán)節(jié)的算子,往往沒有明確的任務(wù)劃分的概念,所以從模型上來說,其實(shí)不太涉及異構(gòu)任務(wù)的問題。當(dāng)然,從數(shù)據(jù)處理的業(yè)務(wù)流程上來說,流式計(jì)算框架也還是有批次和同步的概念的,比如快照,window,跨流join等環(huán)節(jié),就往往需要在各個(gè)算子乃至同一個(gè)算子的多個(gè)處理節(jié)點(diǎn)之間,達(dá)成一定的同步。但這種同步在具體框架的實(shí)現(xiàn)中,未必是通過時(shí)間域上的所有算子的絕對(duì)同步來實(shí)現(xiàn),也未必涉及框架的調(diào)度性能等問題,所以和Ray的目標(biāo)問題的定義還是有一些區(qū)別的,但這就要具體問題具體分析了。
任務(wù)拓?fù)鋱D動(dòng)態(tài)修改的能力
什么是動(dòng)態(tài)修改任務(wù)拓?fù)鋱D的能力呢?其實(shí)簡單來說,就是你在編寫程序,或者計(jì)算框架在開始啟動(dòng)任務(wù)時(shí),并不能完全確定具體的任務(wù)執(zhí)行流程,需要根據(jù)程序執(zhí)行的中間結(jié)果來判斷任務(wù)流程或者調(diào)整任務(wù)的拓?fù)浣Y(jié)構(gòu)。
所以,對(duì)于預(yù)先定義好計(jì)算邏輯,然后手動(dòng)定義任務(wù)拓?fù)溥壿嫽蛘哂煽蚣軆?yōu)化執(zhí)行計(jì)劃并自動(dòng)生成調(diào)度或任務(wù)拓?fù)溥壿嫷南到y(tǒng)來說,不論Spark,F(xiàn)link,storm還是Tensorflow,在任務(wù)執(zhí)行的過程中,當(dāng)任務(wù)拓?fù)溥壿嬕呀?jīng)生成的情況下,確實(shí)都不太具備嚴(yán)格意義上的動(dòng)態(tài)修改任務(wù)拓?fù)鋱D的能力。
但是不是一旦使用這些計(jì)算框架,就完全不能根據(jù)中途的計(jì)算結(jié)果和狀態(tài)變化來調(diào)整程序的執(zhí)行邏輯了呢?顯然不是。
對(duì)于批處理型的任務(wù),你完全可以根據(jù)上一步的結(jié)算結(jié)果,走不同的流程,選擇生成下一步邏輯所需的任務(wù)拓?fù)浣Y(jié)構(gòu)。比如Hive任務(wù)就可以根據(jù)前一個(gè)Stage的執(zhí)行結(jié)果,對(duì)下一步執(zhí)行計(jì)劃進(jìn)行篩選,是執(zhí)行本地Map Join還是走M(jìn)R任務(wù)做Shuffle等。當(dāng)然,這種修改拓?fù)鋱D的能力是在更粗的粒度意義上的,有一定的局限性。
而對(duì)于流式計(jì)算框架,你也可以通過傳輸指令,或者狀態(tài)變化觸發(fā)的方式,在原有的任務(wù)拓?fù)溥壿嫿Y(jié)構(gòu)內(nèi)調(diào)整程序的執(zhí)行邏輯。如果你能預(yù)先把所有可能的任務(wù)執(zhí)行邏輯都部署好,通過指令或狀態(tài)選擇執(zhí)行具體的邏輯路徑,一定程度上也能滿足動(dòng)態(tài)修改任務(wù)流程的需要。當(dāng)然,這對(duì)拓?fù)溥壿嬕?guī)劃和具體編程也提出了更高的要求。
把上述問題結(jié)合起來完整的來看
如前所訴,上述問題需求如果分開來看,現(xiàn)有的框架或多或少都能通過這樣或那樣的方式在一定程度上滿足需求,又或者可以通過別的手段來規(guī)避問題,迂回解決。但是在一個(gè)場景下,同時(shí)滿足所有的需求,相對(duì)來說就比較困難了。
這其中,海量任務(wù)的調(diào)度能力和毫秒級(jí)別的延遲這兩個(gè)需求的組合同時(shí)滿足是難度之一,現(xiàn)有的框架往往很難同時(shí)兼顧,或者只能在特定的約束條件下部分滿足。而對(duì)于異構(gòu)任務(wù)的處理和任務(wù)拓?fù)鋱D的動(dòng)態(tài)修改能力這兩點(diǎn)要求,從靈活性和性能考量方面來看,現(xiàn)有的框架也有很大的局限性,具體實(shí)現(xiàn)時(shí),往往需要用戶在應(yīng)用邏輯層面自行規(guī)劃,實(shí)現(xiàn)代價(jià)也可能比較高。
總之,好比CAP理論,這些問題要妥善的解決,至少現(xiàn)階段,并沒有面面俱到的完美方案。當(dāng)前已有的方案,實(shí)際上也沒有對(duì)錯(cuò)之分,只是各種計(jì)算框架的側(cè)重點(diǎn)和取舍不同,那么Ray是如何進(jìn)行取舍的呢。
Ray的基本架構(gòu)設(shè)計(jì)思路
從任務(wù)調(diào)度的吞吐率和響應(yīng)速度這兩方面需求的角度來說,Ray的方案就是分而治之,概括來說,Ray沒有采用中心任務(wù)調(diào)度的方案,而是采用了類似層級(jí)(hierarchy)調(diào)度的方案,除了一個(gè)全局的中心調(diào)度服務(wù)節(jié)點(diǎn)(實(shí)際上這個(gè)中心調(diào)度節(jié)點(diǎn)也是可以水平拓展的),任務(wù)的調(diào)度也可以在具體的執(zhí)行任務(wù)的工作節(jié)點(diǎn)上,由本地調(diào)度服務(wù)來管理和執(zhí)行。
與傳統(tǒng)的層級(jí)調(diào)度方案,至上而下分配調(diào)度任務(wù)的方式不同的是,Ray采用了至下而上的調(diào)度策略。也就是說,任務(wù)調(diào)度的發(fā)起,并不是先提交給全局的中心調(diào)度器統(tǒng)籌規(guī)劃以后再分發(fā)給次級(jí)調(diào)度器的。而是由任務(wù)執(zhí)行節(jié)點(diǎn)直接提交給本地的調(diào)度器,本地的調(diào)度器如果能滿足該任務(wù)的調(diào)度需求就直接完成調(diào)度請(qǐng)求,在無法滿足的情況下,才會(huì)提交給全局調(diào)度器,由全局調(diào)度器協(xié)調(diào)轉(zhuǎn)發(fā)給有能力滿足需求的另外一個(gè)節(jié)點(diǎn)上的本地調(diào)度器去調(diào)度執(zhí)行。

這么做的好處,一方面減少了跨節(jié)點(diǎn)的RPC開銷,另一方面也能規(guī)避中心節(jié)點(diǎn)的瓶頸問題。當(dāng)然缺點(diǎn)也不是沒有,由于缺乏全局的任務(wù)視圖,無法進(jìn)行全局規(guī)劃,因此任務(wù)的拓?fù)溥壿嫿Y(jié)構(gòu)也就未必是最優(yōu)的了。
從支持動(dòng)態(tài)任務(wù)拓?fù)鋱D和異構(gòu)任務(wù)的角度來說,Ray的思路基本就是別在編程模型上做太多假定和約束限制,怎么靈活怎么來。問題是這要如何實(shí)現(xiàn)呢?
如果在單機(jī)上,其實(shí)后面兩點(diǎn)要求很簡單。所謂的動(dòng)態(tài)拓?fù)溥壿?,就是各種程序執(zhí)行分支唄,各種函數(shù)調(diào)用和程序判斷邏輯,天然就是根據(jù)當(dāng)前程序的狀態(tài)選擇后續(xù)的執(zhí)行路徑,至于異構(gòu)任務(wù),不外乎就是不同路徑觸發(fā)不同的函數(shù)而已,如果需要并行處理,那么引入多線程,異步調(diào)用等等機(jī)制就好了,對(duì)于單機(jī)程序來說,這些都是再普通不過的標(biāo)準(zhǔn)實(shí)踐,靈活性顯然不是問題。
而Ray的基本設(shè)計(jì)思想,就是在分布式計(jì)算的環(huán)境下,實(shí)現(xiàn)類似單機(jī)執(zhí)行程序的能力。讓用戶能在函數(shù)級(jí)別隨意調(diào)用而不用操心這個(gè)函數(shù)具體執(zhí)行的位置,不論從調(diào)用者的角度還是被調(diào)用者的角度,結(jié)合嵌套調(diào)用和本地任務(wù)調(diào)度的能力,整體上的執(zhí)行流程也就不存在需要預(yù)先在中心節(jié)點(diǎn)進(jìn)行規(guī)劃部署的問題。
所以Ray的理想的實(shí)現(xiàn),相當(dāng)于把單機(jī)程序的執(zhí)行能力在不做大的編程模型改造的前提下,適配到分布式計(jì)算的多節(jié)點(diǎn)環(huán)境中。如果能做到這一點(diǎn),顯然前面提到的任務(wù)拓?fù)涞膭?dòng)態(tài)性和靈活性問題也就不是問題了。
Ray的具體實(shí)現(xiàn)

要實(shí)現(xiàn)上述思想,從工程的角度來說,有幾個(gè)重要的問題需要解決。
首先來看至下而上分而治之的層級(jí)調(diào)度的實(shí)現(xiàn)問題。
本地調(diào)度器要能發(fā)揮最大的作用,就需要盡量減少任務(wù)通過全局調(diào)度器中轉(zhuǎn)的必要性,因此本地調(diào)度器需要具備盡可能完備的獲取系統(tǒng)全局信息的能力。此外,在分布式環(huán)境下,為了增強(qiáng)系統(tǒng)的魯棒性,工作節(jié)點(diǎn)崩潰以后,該節(jié)點(diǎn)上本地調(diào)度器所管理任務(wù)的遷移能力,顯然也是必備的。再有,對(duì)于全局調(diào)度器來說,也要具備HA的能力,而全局調(diào)度器的水平拓展能力則是進(jìn)一步拓展任務(wù)調(diào)度吞吐率的基本要求。
為了應(yīng)對(duì)這些需求,Ray將任務(wù)調(diào)度的執(zhí)行邏輯和任務(wù)調(diào)度的狀態(tài)信息進(jìn)行了分離處理。通過全局的狀態(tài)存儲(chǔ)服務(wù)(Global Control State GCS)來存儲(chǔ)和管理各類任務(wù)控制和狀態(tài)信息,包括任務(wù)拓?fù)浣Y(jié)構(gòu)信息,數(shù)據(jù)和任務(wù)的生產(chǎn)關(guān)系信息,函數(shù)(任務(wù))之間的調(diào)用關(guān)系拓?fù)浣Y(jié)構(gòu)信息等等。將這些狀態(tài)信息剝離出來統(tǒng)一管理以后,調(diào)度器本身就成為了一個(gè)無狀態(tài)的服務(wù),因此也就具備了實(shí)現(xiàn)前面所說的任務(wù)遷移,擴(kuò)展和信息共享的能力。
此外為了能和各種需要維護(hù)狀態(tài)的任務(wù)進(jìn)行交互,比如所模擬的目標(biāo)系統(tǒng)的狀態(tài)變遷,以及其它各種第三方有狀態(tài)任務(wù)或接口邏輯的封裝(比如通過TensorFlow訓(xùn)練一個(gè)神經(jīng)網(wǎng)絡(luò)模型的任務(wù),這些第三方系統(tǒng)可能無法將內(nèi)部狀態(tài)信息暴露出來交給Ray來管理),Ray也定義了名為Actor的抽象封裝。在Ray中,Actor是一種有狀態(tài)的任務(wù),通過暴露特定的方法接口供外部進(jìn)行遠(yuǎn)程調(diào)用。而對(duì)于Actor的調(diào)用歷史,也可以轉(zhuǎn)化成一種自依賴關(guān)系拓?fù)鋱D,保存在GCS中。從而將促成Actor內(nèi)部狀態(tài)變遷的調(diào)用過程也通過任務(wù)圖的方式記錄了下來,從而系統(tǒng)也就具備了Actor狀態(tài)重建的能力。
其次,是實(shí)現(xiàn)函數(shù)能在任何節(jié)點(diǎn)上進(jìn)行遠(yuǎn)程調(diào)用的問題。
Ray讓用戶通過顯示的定義,比如@ray.remote的裝飾器的形式來告知系統(tǒng)需要允許遠(yuǎn)程調(diào)用的函數(shù)。當(dāng)一個(gè)遠(yuǎn)程調(diào)用函數(shù)被定義以后,它就會(huì)被推送到所有的工作節(jié)點(diǎn)上,已備后續(xù)調(diào)用。相關(guān)的函數(shù)代碼也會(huì)被存儲(chǔ)到GCS中。這樣后續(xù)的任務(wù)調(diào)度,容錯(cuò)恢復(fù)等過程都能夠更簡單的實(shí)現(xiàn),
@ray.remote
def f(x):
return x + 1
不過由于遠(yuǎn)程函數(shù)是在定義以后就立刻被推送到工作節(jié)點(diǎn)上去的,所以在遠(yuǎn)程函數(shù)中并不能引用后面的代碼中所定義的函數(shù)/變量(大概是因?yàn)樾枰獙?duì)閉包進(jìn)行序列化的原因,而在運(yùn)行時(shí),無法執(zhí)行編譯時(shí)傳統(tǒng)的二次掃描過程,只能或許截止目前已有的信息),這個(gè)問題,個(gè)人感覺對(duì)于寫代碼的同學(xué)來說應(yīng)該是個(gè)很大的限制,不排除后續(xù)可以有更好的解決辦法。
要使函數(shù)能在任意節(jié)點(diǎn)上遠(yuǎn)程執(zhí)行,代碼分發(fā)部署的問題解決以后,剩下的就是數(shù)據(jù)讀寫的問題。被遠(yuǎn)程調(diào)用的函數(shù)顯然需要能夠獲取它所需要處理的數(shù)據(jù)。與代碼的全局分發(fā)部署不同,數(shù)據(jù)顯然無法也不適合提前在所有節(jié)點(diǎn)上都同步一份。Ray是通過在GCS中保存一份全局?jǐn)?shù)據(jù)對(duì)象列表的方式,來管理各個(gè)工作節(jié)點(diǎn)上的本地?cái)?shù)據(jù)。如果一個(gè)函數(shù)需要處理的數(shù)據(jù)對(duì)象不在工作節(jié)點(diǎn)本地,那么該工作節(jié)點(diǎn)上的對(duì)象存儲(chǔ)服務(wù)(object Store)就會(huì)去GCS尋找該對(duì)象所在的節(jié)點(diǎn)映射信息,然后從對(duì)應(yīng)節(jié)點(diǎn)的Object Store中拷貝一份數(shù)據(jù)到本地供函數(shù)執(zhí)行時(shí)所用。而函數(shù)產(chǎn)出的數(shù)據(jù)對(duì)象也會(huì)由本地Object Store管理和保存,并將相關(guān)映射信息登記到GCS中供后續(xù)函數(shù)調(diào)用查詢。
這么做,整體上看起來是把數(shù)據(jù)往代碼處移動(dòng),和現(xiàn)代大數(shù)據(jù)環(huán)境下,典型的代碼往數(shù)據(jù)移動(dòng)的思想正好相反,貌似又走回到更早期的網(wǎng)格計(jì)算的舊路上去了。但實(shí)際計(jì)算過程中,如果上下游相關(guān)的遠(yuǎn)程函數(shù)調(diào)用最終被本地調(diào)度器調(diào)度到同一個(gè)工作節(jié)點(diǎn)上執(zhí)行的話,數(shù)據(jù)實(shí)際上是在本地節(jié)點(diǎn)的。由于Object Store實(shí)現(xiàn)了數(shù)據(jù)零拷貝的本地共享能力,所以在任務(wù)調(diào)度合理的情況下,這種方案實(shí)際產(chǎn)生的數(shù)據(jù)拷貝動(dòng)作的代價(jià)可能未必很高。加上高速網(wǎng)絡(luò)的應(yīng)用推廣,數(shù)據(jù)拷貝成為瓶頸的可能性也大大降低,當(dāng)然,對(duì)計(jì)算流程的latency還是有一些影響的。此外,因?yàn)閷?shí)際的數(shù)據(jù)拷貝是在object Store之間直接點(diǎn)對(duì)點(diǎn)進(jìn)行的,所以也不存在數(shù)據(jù)中轉(zhuǎn)瓶頸的擔(dān)憂。最后,其實(shí)這種數(shù)據(jù)向代碼移動(dòng)的設(shè)計(jì),我理解和后面要解決的另一個(gè)問題:異構(gòu)任務(wù)和任務(wù)執(zhí)行時(shí)間傾斜問題的解決方案也是相關(guān)的。所以它其實(shí)是各種因素綜合考慮以后取舍的結(jié)果。
最后來看一下計(jì)算框架流程上另一個(gè)重要的問題,就是對(duì)于異構(gòu)任務(wù)和任務(wù)執(zhí)行時(shí)間可能存在傾斜的問題處理。
Ray對(duì)于這個(gè)問題的解決方案是全面引入Future的概念,任務(wù)的執(zhí)行不僅僅是Lazy的,結(jié)果數(shù)據(jù)的處理更是異步的。遠(yuǎn)程函數(shù)的調(diào)用,會(huì)立即返回一個(gè)結(jié)果數(shù)據(jù)的Future對(duì)象,這個(gè)Future對(duì)象可以進(jìn)一步傳遞給下一個(gè)遠(yuǎn)程函數(shù)調(diào)用,當(dāng)真正需要讀取數(shù)據(jù)的時(shí)候,才會(huì)Block等待數(shù)據(jù)的真正計(jì)算完成。
這么做的結(jié)果,自然就是無法(或者不適合)提前根據(jù)數(shù)據(jù)的位置來確定代碼執(zhí)行的位置,因此,客觀的也就導(dǎo)致了數(shù)據(jù)往代碼移動(dòng),而不是代碼往數(shù)據(jù)移動(dòng)的計(jì)算模型。

那么使用Future的好處是什么呢?一方面當(dāng)然是為了盡可能提升并行效率。流程上執(zhí)行快慢進(jìn)度不一的任務(wù)之間,也不需要互相等待,降低各個(gè)任務(wù)之間非必要的進(jìn)度同步的代價(jià)。另一方面,在分布式任務(wù)執(zhí)行場景下,具體的算法策略也可以根據(jù)部分任務(wù)的執(zhí)行結(jié)果來提前結(jié)束一次迭代或著調(diào)整計(jì)算流程,進(jìn)而提高程序整體流轉(zhuǎn)效率。比如,多個(gè)子模型同時(shí)訓(xùn)練的時(shí)候,根據(jù)最先完成的部分模型的結(jié)果來決策下一步的行動(dòng),使用部分計(jì)算結(jié)果先行調(diào)整模型參數(shù)等等。
這種打破全局批量同步(BSP)模型的應(yīng)用場景在其它一些機(jī)器學(xué)習(xí)計(jì)算框架的實(shí)現(xiàn)中也有類似的例子,比如騰訊的Angel參數(shù)服務(wù)器所支持的SSP,ASP等處理模型。不過Angel提供的是框架自身定義好的,固定的同步邏輯實(shí)現(xiàn),而Ray的核心框架層,則是通過Future和wait原語的方式將基礎(chǔ)的語義暴露給用戶,讓用戶自己來構(gòu)建實(shí)現(xiàn)所需要的模型邏輯,相對(duì)來說更加靈活一些,當(dāng)然,具體場景的實(shí)現(xiàn)代價(jià)也就稍微更高一些了。
和當(dāng)前主流熱門計(jì)算框架整體上再比較一下
分布式計(jì)算模型的發(fā)展歷程就是一個(gè)在易用性,靈活性和效率性能之間進(jìn)行平衡的過程。早期,MapReduce模型通過極度簡化編程模型,大大降低了分布式編程的難度,但是為了提高編程效率,提供更加靈活的應(yīng)用模型,社區(qū)在上層又開發(fā)了Hive/Pig等業(yè)務(wù)語義更加豐富的模型,但限于底層MR模型的約束,在性能上就受到了一定的限制,因此反過來,又促成了比如Tez等模型的發(fā)展,出現(xiàn)了Hive on Tez之類的實(shí)現(xiàn)。
Spark/Flink/TensorFlow等框架,從各自不同的角度重新定義或著放寬了編程模型的約束,增加了系統(tǒng)的靈活性,但本身核心的編程模型的使用和實(shí)現(xiàn)難度也就更高,開發(fā)過程中,需要開發(fā)人員對(duì)穩(wěn)定性,性能進(jìn)行的考慮也往往更多,這就要求這些框架的上層API可以通過封裝和抽象,進(jìn)一步簡化和降低開發(fā)代價(jià),又或者需要應(yīng)用開發(fā)人員投入更多的精力和知識(shí)儲(chǔ)備去針對(duì)性的解決問題。
Ray的整體設(shè)計(jì)思想,僅從核心框架的角度來對(duì)比的話,可以看到最突出的優(yōu)勢還是動(dòng)態(tài)構(gòu)建任務(wù)拓?fù)溥壿媹D的能力。因此更加適合一些任務(wù)流程復(fù)雜,需要按需調(diào)整的場景,典型的也就是前文提到的增強(qiáng)學(xué)習(xí)的場景。而其它的計(jì)算框架由于整體編程模型約束相對(duì)更強(qiáng),所以要實(shí)現(xiàn)復(fù)雜的流程場景會(huì)更加困難一些,比如需要自行定制一些分布式的處理邏輯,對(duì)一些流程進(jìn)行粘合等。所以,個(gè)人認(rèn)為,Ray的成敗關(guān)鍵,就在于在這類應(yīng)用場景下,能替用戶降低多少開發(fā)成本,能讓用戶多大程度上在保留靈活性的同時(shí),減少開發(fā)和維護(hù)的成本。簡單來說,易用性的好壞或許決定了Ray最終的價(jià)值。
比如遠(yuǎn)程函數(shù)一方面給了用戶靈活切割代碼邏輯,便于復(fù)用邏輯的好處。另一方面,它也需要用戶明確的定義函數(shù)的邊界,要求用戶能夠清晰的理順自己程序的分布式運(yùn)行邏輯,如何切分代碼邏輯,不同的切分方案對(duì)性能是否有影響,函數(shù)之間的并發(fā)如何設(shè)定,如何交互,哪些邏輯適合用無狀態(tài)的模式實(shí)現(xiàn),哪些需要構(gòu)建有狀態(tài)的Actor等等,都需要用戶自己來考慮,隨之而來的問題就是應(yīng)用構(gòu)建的難度可能更高。如果要降低開發(fā)代價(jià),就需要更智能的解決這個(gè)問題,需要上層API層/應(yīng)用層提供不同層次的抽象,來降低特定場景的應(yīng)用構(gòu)建門檻。
用同類系統(tǒng)做個(gè)類比,比如,Spark的核心編程模型是RDD,它的基本思想就是構(gòu)建RDD對(duì)象,然后按RDD間的依賴關(guān)系切分作業(yè)任務(wù),遞推執(zhí)行,這個(gè)思想本身很聰明,但如果Spark只是單純提供這個(gè)核心思想的基本實(shí)現(xiàn)框架的話,顯然不可能成為一個(gè)熱門的計(jì)算框架,因?yàn)橛脩舻膶W(xué)習(xí)成本和開發(fā)成本太高了。所以,Spark在上層提供了各類基本算子,抽象簡化了常見計(jì)算流程的開發(fā)模式,讓用戶一定程度上無需太多關(guān)心RDD的概念細(xì)節(jié),而是關(guān)注在算子邏輯的串聯(lián)和應(yīng)用上。但僅僅如此還不夠,RDD為編程接口的模型對(duì)于多數(shù)用戶來說還是門檻太高,所以Spark進(jìn)一步抽象封裝了包括Stream/SQL/Graph/ML/DataFrame/Dataset在內(nèi)的各個(gè)層級(jí)的高層語義或算法邏輯來進(jìn)一步降低開發(fā)成本,應(yīng)該說,這些上層API的完善,才是Spark計(jì)算框架能夠更好的推廣應(yīng)用的關(guān)鍵所在。
Flink和TensorFlow也不例外,核心的編程模型(TensowFlow的核心思想再簡單不過了,數(shù)據(jù)用Tensor表達(dá),在節(jié)點(diǎn)間傳遞Tensor,具體算子適配和屏蔽底層硬件細(xì)節(jié))本身并不是決定他們成敗的關(guān)鍵,有太多的類似思想的系統(tǒng)實(shí)現(xiàn),關(guān)鍵的是至下而上整體各個(gè)層面的具體實(shí)現(xiàn)的好壞,完整性和易用性,包括業(yè)務(wù)表達(dá)層的設(shè)計(jì),決定了大量同類思想系統(tǒng)的最終命運(yùn)。
現(xiàn)狀
Ray從一年多前發(fā)布到現(xiàn)在,現(xiàn)狀是API層以上的部分還比較薄弱,Core模塊核心邏輯估計(jì)也需要時(shí)間打磨。這僅從項(xiàng)目的代碼量大致就可以看出來了,目標(biāo)如此宏偉的系統(tǒng),主要模塊目前一共也就兩百多個(gè)python文件和不到一百個(gè)C++文件。
國內(nèi)目前除了螞蟻金服和RISELab有針對(duì)性的合作以外(據(jù)說是投入了不少資源的重點(diǎn)項(xiàng)目),關(guān)注程度還很低,更沒有什么實(shí)際的應(yīng)用實(shí)例看到,整體來說還處于比較早期的框架構(gòu)建階段。
當(dāng)然,Ray在核心Core模塊以上,也開始構(gòu)建類似Ray RLLib這樣的針對(duì)增強(qiáng)學(xué)習(xí)訓(xùn)練算法的上層Library。不過目前看來這些library也是非常基本的概念實(shí)現(xiàn),代碼量都不大。當(dāng)然,也有可能是Core模塊足夠強(qiáng)大,上層算法策略并不需要寫太多代碼。不過,不管怎么說,這塊顯然也是處于早期階段,需要實(shí)踐檢驗(yàn)和打磨,畢竟,能用和好用,中間還有很長的路。類比Spark中圖計(jì)算框架的實(shí)現(xiàn),用于實(shí)現(xiàn)pregel的幾行代碼顯然和后面的graphx沒法同日而語。
至于其它的ML/SQL/Stream/Graph之類的實(shí)現(xiàn),暫時(shí)沒有看到,理論上Ray目標(biāo)定位的“靈活的”編程模型,也是可以用來實(shí)現(xiàn)這些更高層的編程語義模型的。但實(shí)際上,目前現(xiàn)狀一方面的原因可能是為時(shí)尚早,Ray還沒有來得及拓展到這些領(lǐng)域,另一方面,相對(duì)于其它計(jì)算框架,Ray在這些領(lǐng)域可能也未必有優(yōu)勢。相反的由于Ray的分層調(diào)度模型和數(shù)據(jù)向代碼移動(dòng)的計(jì)算模型所帶來的全局任務(wù)的優(yōu)化難度,在任務(wù)拓?fù)溥壿媹D相對(duì)固定的場景下,Ray的整體計(jì)算性能效率很可能長遠(yuǎn)來說,也并不如當(dāng)前這些主流的計(jì)算框架。
所以Ray能否成長成為一個(gè)足夠通用的計(jì)算框架,目前我覺得還無法判斷,但如果你需要標(biāo)準(zhǔn)化,模式化的解決大量類似增強(qiáng)學(xué)習(xí)的這種流程復(fù)雜的大規(guī)模分布式計(jì)算問題,那么Ray至少是一種有益的補(bǔ)充,可能值得關(guān)注一下,將它和TensorFlow等框架進(jìn)行局部的結(jié)合,讓Ray來關(guān)注和整合計(jì)算處理流程,讓其它系統(tǒng)解決各自擅長的問題,可能也是短期內(nèi)可行的應(yīng)用方式,Ray自己目前貌似也是朝著這個(gè)所謂的混合計(jì)算的方向前進(jìn)的。
相關(guān)文檔
- 項(xiàng)目文檔主頁:http://ray.readthedocs.io/en/latest/index.html
- Ray 論文,Ray: A Distributed Framework for Emerging AI Applications (https://arxiv.org/pdf/1712.05889.pdf)
- Ray RLLib 論文:https://arxiv.org/abs/1712.09381
常按掃描下面的二維碼,關(guān)注“大數(shù)據(jù)務(wù)虛雜談”,務(wù)虛,我是認(rèn)真的 ;)