本文將介紹 Airflow 這一款優(yōu)秀的調度工具。主要包括 Airflow 的服務構成、Airflow 的 Web 界面、DAG 配置、常用配置以及 Airflow DAG Creation Manager Plugin 這一款 Airflow 插件。
一、什么是 Airflow
Airflow 是 Airbnb 開源的一個用 Python 編寫的調度工具。于 2014 年啟動,2015 年春季開源,2016 年加入 Apache 軟件基金會的孵化計劃。
Airflow 通過 DAG 也即是有向非循環(huán)圖來定義整個工作流,因而具有非常強大的表達能力。

如上圖所示,一個工作流可以用一個 DAG 來表示,在 DAG 中將完整得記錄整個工作流中每個作業(yè)之間的依賴關系、條件分支等內容,并可以記錄運行狀態(tài)。通過 DAG,我們可以精準的得到各個作業(yè)之間的依賴關系。
在進一步介紹 Airflow 之前,我想先介紹一些在 Airflow 中常見的名詞概念:
-
DAG
DAG 意為有向無循環(huán)圖,在 Airflow 中則定義了整個完整的作業(yè)。同一個 DAG 中的所有 Task 擁有相同的調度時間。
-
Task
Task 為 DAG 中具體的作業(yè)任務,它必須存在于某一個 DAG 之中。Task 在 DAG 中配置依賴關系,跨 DAG 的依賴是可行的,但是并不推薦。跨 DAG 依賴會導致 DAG 圖的直觀性降低,并給依賴管理帶來麻煩。
-
DAG Run
當一個 DAG 滿足它的調度時間,或者被外部觸發(fā)時,就會產(chǎn)生一個 DAG Run??梢岳斫鉃橛?DAG 實例化的實例。
-
Task Instance
當一個 Task 被調度啟動時,就會產(chǎn)生一個 Task Instance??梢岳斫鉃橛?Task 實例化的實例。
二、Airflow 的服務構成
一個正常運行的 Airflow 系統(tǒng)一般由以下幾個服務構成
-
WebServer
Airflow 提供了一個可視化的 Web 界面。啟動 WebServer 后,就可以在 Web 界面上查看定義好的 DAG 并監(jiān)控及改變運行狀況。也可以在 Web 界面中對一些變量進行配置。
-
Worker
一般來說我們用 Celery Worker 來執(zhí)行具體的作業(yè)。Worker 可以部署在多臺機器上,并可以分別設置接收的隊列。當接收的隊列中有作業(yè)任務時,Worker 就會接收這個作業(yè)任務,并開始執(zhí)行。Airflow 會自動在每個部署 Worker 的機器上同時部署一個 Serve Logs 服務,這樣我們就可以在 Web 界面上方便的瀏覽分散在不同機器上的作業(yè)日志了。
-
Scheduler
整個 Airflow 的調度由 Scheduler 負責發(fā)起,每隔一段時間 Scheduler 就會檢查所有定義完成的 DAG 和定義在其中的作業(yè),如果有符合運行條件的作業(yè),Scheduler 就會發(fā)起相應的作業(yè)任務以供 Worker 接收。
-
Flower
Flower 提供了一個可視化界面以監(jiān)控所有 Celery Worker 的運行狀況。這個服務并不是必要的。
三、Airflow 的 Web 界面
下面簡單介紹一下 Airflow 的 Web 操作界面,從而可以對 Airflow 有一個更直觀的了解。
1、DAG 列表

左側 On/Off 按鈕控制 DAG 的運行狀態(tài),Off 為暫停狀態(tài),On 為運行狀態(tài)。注意:所有 DAG 腳本初次部署完成時均為 Off 狀態(tài)。
若 DAG 名稱處于不可點擊狀態(tài),可能為 DAG 被刪除或未載入。若 DAG 未載入,可點擊右側刷新按鈕進行刷新。注意:由于可以部署若干 WebServer,所以單次刷新可能無法刷新所有 WebServer 緩存,可以嘗試多次刷新。
Recent Tasks 會顯示最近一次 DAG Run(可以理解為 DAG 的執(zhí)行記錄)中 Task Instances(可以理解為作業(yè)的執(zhí)行記錄)的運行狀態(tài),如果 DAG Run 的狀態(tài)為 running,此時顯示最近完成的一次以及正在運行的 DAG Run 中所有 Task Instances 的狀態(tài)。
Last Run 顯示最近一次的 execution date。注意:execution date 并不是真實執(zhí)行時間,具體細節(jié)在下文 DAG 配置中詳述。將鼠標移至 execution date 右側 info 標記上,會顯示 start date,start date 為真實運行時間。start date 一般為 execution date 所對應的下次執(zhí)行時間。
2、作業(yè)操作框
在 DAG 的樹狀圖和 DAG 圖中都可以點擊對應的 Task Instance 以彈出 Task Instance 模態(tài)框,以進行 Task Instance 的相關操作。注意:選擇的 Task Instance 為對應 DAG Run 中的 Task Instance。

在作業(yè)名字的右邊有一個漏斗符號,點擊后整個 DAG 的界面將只顯示該作業(yè)及該作業(yè)的依賴作業(yè)。當該作業(yè)所處的 DAG 較大時,此功能有較大的幫助。
Task Instance Details 顯示該 Task Instance 的詳情,可以從中得知該 Task Instance 的當前狀態(tài),以及處于當前狀態(tài)的原因。例如,若該 Task Instance 為 no status 狀態(tài),遲遲不進入 queued 及 running 狀態(tài),此時就可通過 Task Instance Details 中的 Dependency 及 Reason 得知原因。
Rendered 顯示該 Task Instance 被渲染后的命令。
Run 指令可以直接執(zhí)行當前作業(yè)。
-
Clear 指令為清除當前 Task Instance 狀態(tài),清除任意一個 Task Instance 都會使當前 DAG Run 的狀態(tài)變更為 running。注意:如果被清除的 Task Instance 的狀態(tài)為 running,則會嘗試 kill 該 Task Instance 所執(zhí)行指令,并進入 shutdown 狀態(tài),并在 kill 完成后將此次執(zhí)行標記為 failed(如果 retry 次數(shù)沒有用完,將標記為 up_for_retry)。Clear 有額外的5個選項,均為多選,這些選項從左到右依次為:
- Past: 同時清除所有過去的 DAG Run 中此 Task Instance 所對應的 Task Instance。
- Future: 同時清除所有未來的 DAG Run 中此 Task Instance 所對應的 Task Instance。注意:僅清除已生成的 DAG Run 中的 Task Instance。
- Upstream: 同時清除該 DAG Run 中所有此 Task Instance 上游的 Task Instance。
- Downstream: 同時清除該 DAG Run 中所有此 Task Instance 下游的 Task Instance。
- Recursive: 當此 Task Instance 為 sub DAG 時,循環(huán)清除所有該 sub DAG 中的 Task Instance。注意:若當此 Task Instance 不是 sub DAG 則忽略此選項。
Mark Success 指令為講當前 Task Instance 狀態(tài)標記為 success。注意:如果該 Task Instance 的狀態(tài)為 running,則會嘗試 kill 該 Task Instance 所執(zhí)行指令,并進入 shutdown 狀態(tài),并在 kill 完成后將此次執(zhí)行標記為 failed(如果 retry 次數(shù)沒有用完,將標記為 up_for_retry)。
四、DAG 配置
Airflow 中的 DAG 是由 Python 腳本來配置的,因而可擴展性非常強。Airflow 提供了一些 DAG 例子,我們可以通過一個例子來簡單得了解一下。
# -*- coding: utf-8 -*-
import airflow
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.models import DAG
args = {
'owner': 'airflow',
'start_date': airflow.utils.dates.days_ago(2)
}
dag = DAG(
dag_id='example_bash_operator', default_args=args,
schedule_interval='0 0 * * *')
cmd = 'ls -l'
run_this_last = DummyOperator(task_id='run_this_last', dag=dag)
run_this = BashOperator(
task_id='run_after_loop', bash_command='echo 1', dag=dag)
run_this.set_downstream(run_this_last)
for i in range(3):
i = str(i)
task = BashOperator(
task_id='runme_'+i,
bash_command='echo "{{ task_instance_key_str }}" && sleep 1',
dag=dag)
task.set_downstream(run_this)
task = BashOperator(
task_id='also_run_this',
bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',
dag=dag)
task.set_downstream(run_this_last)
我們可以看到,整個 DAG 的配置就是一份完整的 Python 代碼,在代碼中實例化 DAG,實例化適合的 Operator,并通過 set_downstream 等方法配置上下游依賴關系。下面我們簡單看一下在 DAG 配置中的幾個重要概念。
-
DAG
要配置一個 DAG 自然需要一個 DAG 實例。在同一個 DAG 下的所有作業(yè),都需要將它的 dag 屬性設置為這個 DAG 實例。在實例化 DAG 時,通過傳參數(shù)可以給這個 DAG 實例做一些必要的配置。
-
dag_id
給 DAG 取一個名字,方便日后維護。
-
default_args
默認參數(shù),當屬于這個 DAG 實例的作業(yè)沒有配置相應參數(shù)時,將使用 DAG 實例的 default_args 中的相應參數(shù)。
-
schedule_interval
配置 DAG 的執(zhí)行周期,語法和 crontab 的一致。
-
-
作業(yè) (Task)
Airflow 提供了很多 Operator,我們也可以自行編寫新的 Operator。在本例中使用了 2 種 Operator,DummyOperator 什么都不會做, BashOperator 則會執(zhí)行 bash_command 參數(shù)所指定的 bash 指令,并且使用 jinja2 模版引擎,對該指令進行渲染,因而在本例的 bash_command 中,可以看到一些需要渲染的變量。當 Operator 被實例化后,我們稱之為相應 DAG 的一個作業(yè)(Task)。在實例化 Operator 時,同樣可以通過穿參數(shù)進行必要的配置,值得注意的是,如果在 DAG 中有設置 default_args 而在 Operator 中沒有覆蓋相應配置,則會使用 default_args 中的配置。
-
dag
傳遞一個 DAG 實例,以使當前作業(yè)屬于相應 DAG。
-
task_id
給作業(yè)去一個名字,方便日后維護。
-
owner
作業(yè)的擁有者,方便作業(yè)維護。另外有些 Operator 會根據(jù)該參數(shù)實現(xiàn)相應的權限控制。
-
start_date
作業(yè)的開始時間,即作業(yè)將在這個時間點以后開始調度。
-
-
依賴
配置以來的方法有兩種,除了可以使用作業(yè)實例的 set_upstream 和 set_downstream 方法外,還可以使用類似
task1 << task2 << task3 task3 >> task4這樣更直觀的語法來設置。
這里我們要特別注意一個關于調度執(zhí)行時間的問題。在談這個問題前,我們先確定幾個名詞:
- start date: 在配置中,它是作業(yè)開始調度時間。而在談論執(zhí)行狀況時,它是調度開始時間。
- schedule interval: 調度執(zhí)行周期。
- execution date: 執(zhí)行時間,在 Airflow 中稱之為執(zhí)行時間,但其實它并不是真實的執(zhí)行時間。
那么現(xiàn)在,讓我們看一下當一個新配置的 DAG 生效后第一次調度會在什么時候。很多人會很自然的認為,第一次的調度時間當然是在作業(yè)中配置的 start date,但其實并不是。第一次調度時間是在作業(yè)中配置的 start date 的第二個滿足 schedule interval 的時間點,并且記錄的 execution date 為作業(yè)中配置的 start date 的第一個滿足 schedule interval 的時間點。聽起來很繞,讓我們來舉個例子。
假設我們配置了一個作業(yè)的 start date 為 2017年10月1日,配置的 schedule interval 為 **00 12 * * *** 那么第一次執(zhí)行的時間將是 2017年10月2日 12點 而此時記錄的 execution date 為 2017年10月1日 12點。因此 execution date 并不是如其字面說的表示執(zhí)行時間,真正的執(zhí)行時間是 execution date 所顯示的時間的下一個滿足 schedule interval 的時間點。
另外,當作業(yè)已經(jīng)執(zhí)行過之后,start date 的配置將不會再生效,這個作業(yè)的調度開始時間將直接按照上次調度所對應的 execution date 來計算。
這個例子只是簡要的介紹了一下 DAG 的配置,也只介紹了非常少量的配置參數(shù)。Airflow 為 DAG 和作業(yè)提供了大量的可配置參數(shù),詳情可以參考 Airflow 官方文檔。
五、常用配置
在日常工作中,有時候僅僅靠配置作業(yè)依賴和調度執(zhí)行周期并不能滿足一些復雜的需求。接下來將介紹一些常用的作業(yè)配置。
1、跳過非最新 DAG Run
假如有一個每小時調度的 DAG 出錯了,我們把它的調度暫停,之后花了3個小時修復了它,修復完成后重新啟動這個作業(yè)的調度。于是 Airflow 一下子創(chuàng)建了 3 個 DAG Run 并同時執(zhí)行,這顯然不是我們希望的,我們希望它只執(zhí)行最新的 DAG Run。
我們可以創(chuàng)建一個 Short Circuit Operator,并且讓 DAG 中所有沒有依賴的作業(yè)都依賴這個作業(yè),然后在這個作業(yè)中進行判斷,檢測當前 DAG Run 是否為最新,不是最新的直接跳過整個 DAG。
def skip_dag_not_latest_worker(ds, **context):
if context['dag_run'] and context['dag_run'].external_trigger:
logging.info('Externally triggered DAG_Run: allowing execution to proceed.')
return True
skip = False
now = datetime.now()
left_window = context['dag'].following_schedule(context['execution_date'])
right_window = context['dag'].following_schedule(left_window)
logging.info('Checking latest only with left_window: %s right_window: %s now: %s', left_window, right_window, now)
if not left_window < now <= right_window:
skip = True
return not skip
ShortCircuitOperator(
task_id='skip_dag_not_latest',
provide_context=True,
python_callable=skip_dag_not_latest_worker,
dag=dag
)
2、當存在正在執(zhí)行的 DAG Run 時跳過當前 DAG Run
依舊是之前提到的每小時調度的 DAG,假設它這次沒有出錯而是由于資源、網(wǎng)絡或者其他問題導致執(zhí)行時間變長,當下一個調度時間開始時 Airflow 依舊會啟動一次新的 DAG Run,這樣就會同時出現(xiàn) 2 個 DAG Run。如果我們想要避免這種情況,一個簡單的方法是直接將 DAG 的 max_active_runs 設置為 1。但這樣會導致 DAG Run 堆積的問題,如果你配置的調度是早上 9 點至晚上 9 點,直至晚上 9 點之后 Airflow 可能依舊在處理堆積的 DAG Run。這樣就可能影響到我們原本安排在晚上 9 點之后的任務。
我們可以創(chuàng)建一個 Short Circuit Operator,并且讓 DAG 中所有沒有依賴的作業(yè)都依賴這個作業(yè),然后在這個作業(yè)中進行判斷,檢測當前是否存在正在執(zhí)行的 DAG Run,存在時則直接跳過整個 DAG。
def skip_dag_when_previous_running_worker(ds, **context):
if context['dag_run'] and context['dag_run'].external_trigger:
logging.info('Externally triggered DAG_Run: allowing execution to proceed.')
return True
skip = False
session = settings.Session()
count = session.query(DagRun).filter(
DagRun.dag_id == context['dag'].dag_id,
DagRun.state.in_(['running']),
).count()
session.close()
logging.info('Checking running DAG count: %s' % count)
skip = count > 1
return not skip
ShortCircuitOperator(
task_id='skip_dag_when_previous_running',
provide_context=True,
python_callable=skip_dag_when_previous_running_worker,
dag=dag
)
3、Sensor 的替代方案
Airflow 中有一類 Operator 被稱為 Sensor,Sensor 可以感應預先設定的條件是否滿足(如:某個時間點是否達到、某條 MySQL 記錄是否被更新、某個 DAG 是否完成),當滿足條件后 Sensor 作業(yè)變?yōu)?Success 使得下游的作業(yè)能夠執(zhí)行。Sensor 的功能很強大但卻帶來一個問題,假如我們有一個 Sensor 用于檢測某個 MySQL 記錄是否被更新,在 Sensor 作業(yè)啟動后 3 個小時這個 MySQL 記錄才被更新。于是我們的這個 Sensor 占用了一個 Worker 整整 3 小時,這顯然是一個極大的浪費。
因此我們需要一個 Sensor 的替代方案,既能滿足 Sensor 原來的功能,又能節(jié)省 Worker 資源。有一個辦法是不使用 Sensor,直接使用 Python Operator 判斷預先設定的條件是否滿足,如果不滿足直接 raise Exception,然后將這個作業(yè)的 retry_delay(重試間隔時間) 設為每次檢測的間隔時間,retries(重試次數(shù)) 設為最長檢測時間除以 retry_delay,即滿足:最長檢測時間 = retries * retry_delay。這樣既不會長時間占用 Worker 資源,又可以滿足 Sensor 原來的功能。
六、Airflow DAG Creation Manager Plugin
正如上兩章所描述的,Airflow 雖然具有強大的功能,但是配置 DAG 并不是簡單的工作,也有一些較為繁瑣的概念,對于業(yè)務人員來說可能略顯復雜。因此,筆者編寫了 Airflow DAG Creation Manager Plugin(https://github.com/lattebank/airflow-dag-creation-manager-plugin)以提供一個 Web界面來讓業(yè)務人員可視化的編寫及管理 DAG。具體的安裝及使用方法請查看插件的README。

如上圖所示,插件的 Web 界面中可以直接所見即所得的編寫 DAG 圖。
插件中盡量簡化了一些繁瑣的諸如上文所述的作業(yè)開始調度時間等一系列的概念,并提供了一些在實際工作中常常會用到的一些額外的功能(如上文提到的跳過非最新 DAG Run、當存在正在執(zhí)行的 DAG Run 時跳過當前 DAG Run 等),以及版本控制和權限管理。如果大家在使用 Airflow 的過程中也有類似的問題,歡迎嘗試使用 Airflow DAG Creation Manager Plugin。
七、總結
Airflow 適用于調度作業(yè)較為復雜,特別是各作業(yè)之間的依賴關系復雜的情況。
希望本文能讓大家對 Airflow 有所了解,并能將 Airflow 運用到適合它使用的場景中。
更多閱讀:
基于Redis的限流系統(tǒng)的設計
Presto+Hive統(tǒng)一賬戶體系及查詢監(jiān)控輕型解決方案
談談基于 OpenResty 的接口網(wǎng)關設計
