在上一篇中我們介紹了 mpi4py 中的 profiling,下面我們將介紹 mpi4py 中的 futures 模塊。
mpi4py.futures 提供了一個由多個工作進程使用 MPI 進程間通信來異步執(zhí)行任務的高級別接口。mpi4py.futures 是建立在 Python 標準庫中的 concurrent.futures 的基礎(chǔ)之上的。這里先簡要介紹一下 concurrent.futures 模塊。
concurrent.futures
concurrent.futures 提供了一個異步并行執(zhí)行任務的高級別接口。異步執(zhí)行的任務可以通過線程來完成(使用 ThreadPoolExecutor),也可以通過不同的進程來完成(使用 ProcessPoolExecutor),它們都繼承自抽象的 Executor 類。
為了有助于理解和使用 moi4py.futures,下面簡要給出 concurrent.futures 中的主要類,方法和函數(shù),但是不做詳細的講解,想要了解更多的讀者可以參見其文檔。
Executor 類
class concurrent.futures.Executor
提供異步執(zhí)行任務的抽象類,不能直接使用,而是使用其具體的子類。下面是其幾個主要的方法接口:
submit(fn, *args, **kwargs)
提交執(zhí)行 fn(*args **kwargs),返回一個 Future 對象(將在下面介紹)表示提交執(zhí)行的結(jié)果。
map(func, *iterables, timeout=None, chunksize=1)
類似于 Python 中的 map(func, *iterables),不同的是 func 是異步并發(fā)執(zhí)行的。
shutdown(wait=True)
通知任務執(zhí)行器在當前掛起的 future 任務完成后釋放所占用的資源。
ThreadPoolExecutor 類
ThreadPoolExecutor 是 Executor 的一個子類,使用一個線程池來異步地執(zhí)行任務,下面是其類原型:
class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())
ProcessPoolExecutor 類
ProcessPoolExecutor 是 Executor 的一個子類,使用一個進程池來異步地執(zhí)行任務,使用 multiprocessing 模塊,可以避開 Python 的 Global Interpreter Lock (GIL),但是只有那些可以被 pickle 系列化的對象才能被執(zhí)行并返回。下面是其類原型:
class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())
Future 對象
Future 類包裝異步執(zhí)行的任務結(jié)果。Future 類對象由 Executor.submit() 創(chuàng)建。
class concurrent.futures.Future()
Future 類原型,下面使其主要方法:
cancel()
嘗試取消調(diào)用。
cancelled()
如果調(diào)用被成功取消則返回 True。
running()
如果調(diào)用正在執(zhí)行而不能被取消則返回 True。
done()
如果調(diào)用被成功取消或已執(zhí)行完成則返回 True。
result(timeout=None)
返回調(diào)用的結(jié)果。
exception(timeout=None)
返回調(diào)用拋出的異常。
模塊函數(shù)
concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)
等待 Future 對象 fs 完成。
concurrent.futures.as_completed(fs, timeout=None)
返回一個由一系列 Future 對象中完成(執(zhí)行完或者被取消)的那些組成的迭代器。
mpi4py.futures
mpi4py.futures 是基于 concurrent.futures 的。具體來說,mpi4py.futures 提供了 Executor 類的子類實現(xiàn) MPIPoolExecutor 和 MPICommExecutor。
MPIPoolExecutor 類
MPIPoolExecutor 使用一個 MPI 進程池來異步執(zhí)行任務。同 ProcessPoolExecutor,MPIPoolExecutor 可以避開 Python 的 Global Interpreter Lock (GIL),但是只有那些可以被 pickle 系列化的對象才能被執(zhí)行并返回。因為工作進程必須導入 __main__ 模塊,因此 MPIPoolExecutor 不能工作在交互的環(huán)境下(如 Ipython shell 中)。
MPIPoolExecutor 使用 MPI-2 標準中引進的動態(tài)進程管理特性。主進程使用 MPI.COMM_SELF 的 Spawn() 方法啟動新的工作進程。主進程會分別使用一個單獨的線程同每個工作進程進行通信。工作進程在其唯一的主線程內(nèi)執(zhí)行所分配到的任務。為了避免工作進程再啟動新的工作進程(會導致無限遞歸),一種簡單的方式是將 MPIPoolExecutor 執(zhí)行代碼放到主腳本的 if __name__ == '__main__': 語句下。
下面是其類原型:
class mpi4py.futures.MPIPoolExecutor(max_workers=None,**kwargs)
Executor 的子類,使用至多 max_workers 個進程組成的進程池來異步執(zhí)行任務。如果 max_workers 為 None (默認值),則其值由環(huán)境變量 MPI4PY_MAX_WORKERS (如果設(shè)置了)決定,或者由 MPI universe 的大小(如果設(shè)置了)決定,否則只會生成單個工作進程。如果 max_workers 的值小于或等于 0,則會拋出 ValueError 異常。其它可設(shè)置參數(shù)有:
-
python_exe:Python 執(zhí)行程序路徑。 -
python_args:列表或者可迭代對象,用來向 Python 執(zhí)行程序傳遞額外的命令行參數(shù)。 -
mpi_info:字典或可產(chǎn)生 (key, value) 對的迭代對象。這些 (key, value) 對會通過 MPI.Info 對象傳遞給 MPI.Intracomm.Spawn() 調(diào)用以啟動工作進程。可以通過此機制告訴 MPI 在什么地方及怎樣啟動新的進程。 -
globals:字典或可產(chǎn)生 (name, value) 對的迭代對象,用來初始化工作進程的主模塊命名空間。 -
main:如果為 False,則不會在工作進程中導入 __main__ 模塊。 -
path:列表或可迭代對象,向 sys.path 中追加一系列工作進程搜尋路徑。 -
wdir:設(shè)置工作進程的當前工作目錄。 -
env:字典可產(chǎn)生 (name, value) 對的迭代對象,用來更新工作進程的 os.environ。
submit(func, *args, **kwargs)
以 func(*args, **kwargs) 提交執(zhí)行任務,返回一個 Future 對象作為提交結(jié)果。簡單的使用例程如下:
executor = MPIPoolExecutor(max_workers=1)
future = executor.submit(pow, 321, 1234)
print(future.result())
map(func, *iterables, timeout=None, chunksize=1, **kwargs)
等價于 map(func, *iterables),不同的是 func 是被異步地執(zhí)行,對 func 的多個調(diào)用可以在多個進程中并發(fā)地無序地執(zhí)行。返回的迭代器會拋出一個 TimeoutError 如果 __next__() 調(diào)用后 timout 秒還沒有得到結(jié)果,timeout 可以為一個整數(shù)或一個浮點數(shù)。如果 timeout 為 None,則等待的時間沒有限制。如果某個調(diào)用拋出了異常,則在獲取返回的迭代器中該值時會重新拋出該異常。該方法會將 iterables 分割成若干塊分別提交到進程池中執(zhí)行,塊的近似大小由 chunksize 設(shè)置。對非常長的 iterables,使用一個大的 chunksize 可以顯著地提高執(zhí)行性能。在默認情況下,返回的迭代器會產(chǎn)生與原 iterables 相同順序的結(jié)果,等待提交的任務依次完成,如果傳遞并設(shè)置關(guān)鍵字參數(shù) unordered 為 True,則返回的迭代器會盡快地返回任意已經(jīng)完成的任務。
starmap(func, iterable, timeout=None, chunksize=1, **kwargs)
等價于 itertools.starmap(func, iterable)。如果 iterable 已經(jīng) "zip" 過了,則使用該方法更方便。map(func, *iterable) 等價于 starmap(func, zip(*iterable))。
shutdown(wait=True)
通知任務執(zhí)行器在當前掛起的 future 任務完成后釋放所占用的資源。在該方法執(zhí)行后的 submit() 和 map() 調(diào)用會拋出 RuntimeError。如果 wait 為 True (默認),則該方法會一直等到所有掛起的 future 任務都執(zhí)行完并且執(zhí)行器的相關(guān)資源都已釋放后才返回。如果 wait 為 False,則該方法會立即返回,但執(zhí)行器的相關(guān)資源則會等到所有掛起的 future 任務都完成后才釋放。不管 wait 的值是什么,整個 Python 程序都會在所有掛起的 future 任務都完成后才會結(jié)束退出。使用 with 語句可以避免顯式地調(diào)用該方法。with 語句相當于設(shè)置 wait 為 True 調(diào)用 shutdown(),舉例如下:
import time
with MPIPoolExecutor(max_workers=1) as executor:
future = executor.submit(time.sleep, 2) assert future.done()
bootup(wait=True)
通知執(zhí)行器盡早分配所需的資源(特別是 MPI 進程)。如果 wait 為 True,則該方法會直到資源已經(jīng)分配好才返回。在第一次調(diào)用 submit() 時會自動分配所需的資源,因此很少需要顯式地調(diào)用該方法。
需要注意的是,因為主進程要使用單獨的線程同每一個工作進程進行 MPI 通信,因此所使用的 MPI 環(huán)境需要提供 MPI.THREAD_MULTIPLE 級別的多線程支持。如果 MPI 環(huán)境所支持的線程級別比 MPI.THREAD_MULTIPLE 低,則 mpi4py.futures 會使用一個全局鎖來系列化 MPI 調(diào)用。如果支持的線程級別比 MPI.THREAD_SERIALIZED 低, 則 mpi4py.futures 會發(fā)出 RuntimeWarning 警告。
MPICommExecutor 類
對只支持 MPI-1 標準的 MPI 實現(xiàn),無法使用 MPI-2 標準中才引入的動態(tài)進程管理特性,另外在一些超算平臺上對調(diào)用 MPI_Comm_spawn() 方法可能有額外的限制或引起額外的復雜度。針對這些情況,mpi4py.futures 支持另一種更加傳統(tǒng)的類似于 SPMD 的使用方式,這種使用方式只用到 MPI-1 的相關(guān)特性。用戶使用 mpiexec 命令來啟動 Python 應用程序,在程序里面集合調(diào)用 MPICommExecutor 上下文管理器將啟動起來的若干 MPI 進程分割成一個主進程和多個工作進程。主進程訪問 MPICommExecutor 實例以提交任務,與此同時,工作進程沿著另一個不同的執(zhí)行路徑執(zhí)行主進程提交的任務。
下面是 MPICommExecutor 的原型:
class mpi4py.futures.MPICommExecutor(comm=None,root=0)
MPICommExecutor 的上下文管理器,將一個 MPI 組內(nèi)通信子 comm (默認值 None 表示 MPI.COMM_WORLD) 分割成兩個無交集的集合:單個主進程(rank 為 root 的進程)和其它的所有進程作為工作進程。這兩個集合通過組間通信子連接在一起。with 語句的目標要么是一個 MPICommExecutor 實例(對主進程),要么是 None (對工作進程)。簡短的使用例程如下:
from mpi4py import MPI
from mpi4py.futures import MPICommExecutor
with MPICommExecutor(MPI.COMM_WORLD, root=0) as executor:
if executor is not None:
future = executor.submit(abs, -42)
assert future.result() == 42
answer = set(executor.map(abs, [-42, 42]))
assert answer == {42}
需要注意的是,如果向 MPICommExecutor 傳遞了一個 size 為 1 的通信子(如 MPI.COMM_SELF),with 語句的目標將會將所有提交的任務在一個單獨的工作線程上完成,用于保證任務會被異步地執(zhí)行。但是,Python 的 Global Interpreter Lock (GIL) 會阻止主線程和工作線程并發(fā)地執(zhí)行,即使是運行在多核處理器上。線程的頻繁切換可能會降低程序的性能,因此一般不建議使用一個 size 為 1 的通信子來執(zhí)行 MPICommExecutor,如果確實要使用的話,可以考慮使用 concurrent.futures 的 ThreadPoolExecutor。
命令行執(zhí)行方法
當所使用的 MPI 實現(xiàn)不支持動態(tài)進程管理特性時,可以用另一種方式來使用 mpi4py.futures:在命令行方式下傳遞 -m mpi4py.futures 給 python 執(zhí)行程序,此外 mpi4py.futures 接受 -m mod 以執(zhí)行一個模塊,-c cmd 以執(zhí)行一條 Python 語句,或者 - 從標準輸入(sys.stdin)讀取 Python 命令語句??偟膩碚f,可以使用下面 4 種命令行方式來運行 mpi4py.futures:
- $ mpiexec -n numprocs python -m mpi4py.futures pyfile [arg] ...
- $ mpiexec -n numprocs python -m mpi4py.futures -m mod [arg] ...
- $ mpiexec -n numprocs python -m mpi4py.futures -c cmd [arg] ...
- $ mpiexec -n numprocs python -m mpi4py.futures - [arg] ...
在開始執(zhí)行主腳本之前,mpi4py.futures 會將 MPI.COMM_WORLD 分割成一個主進程(MPI.COMM_WORLD 中 rank 為 0 的進程)和 numprocs - 1 個工作進程,這些進程會通過一個 MPI 組間通信子連接起來。然后,主進程執(zhí)行用戶腳本代碼,最終會創(chuàng)建 MPIPoolExecutor 實例以提交計算任務,與此同時,工作進程沿著一個不同的執(zhí)行路徑以服務于主進程。當主進程順利地執(zhí)行完主腳本結(jié)束時,整個 MPI 執(zhí)行環(huán)境會合適地退出,但是在遇到?jīng)]有處理的異常情況時,主進程會調(diào)用 MPI.COMM_WORLD.Abort(1) 以避免死鎖并強制整個 MPI 執(zhí)行環(huán)境退出。
例程
下面給出相應的使用例程。
# julia.py
"""
Demonstrates the usage of mpi4py.futures.MPIPoolExecutor.
Run this with 1 processes like:
$ mpiexec -n 1 -usize 17 python julia.py
or 17 processes like:
$ mpiexec -n 17 python -m mpi4py.futures julia.py
"""
from mpi4py.futures import MPIPoolExecutor
x0, x1, w = -2.0, +2.0, 640*2
y0, y1, h = -1.5, +1.5, 480*2
dx = (x1 - x0) / w
dy = (y1 - y0) / h
c = complex(0, 0.65)
def julia(x, y):
z = complex(x, y)
n = 255
while abs(z) < 3 and n > 1:
z = z**2 + c
n -= 1
return n
def julia_line(k):
line = bytearray(w)
y = y1 - k * dy
for j in range(w):
x = x0 + j * dx
line[j] = julia(x, y)
return line
if __name__ == '__main__':
with MPIPoolExecutor() as executor:
image = executor.map(julia_line, range(h))
with open('julia.pgm', 'wb') as f:
f.write(b'P5 %d %d %d\n' % (w, h, 255))
for line in image:
f.write(line)
推薦以 1 個 MPI 進程并設(shè)置所需的 universe size 的方式使用 mpiexec 命令以啟動并執(zhí)行以上腳本:
$ mpiexec -n 1 -usize 17 python julia.py
注意以上 -usize 標志(或者等價的設(shè)置 MPIEXEC_UNIVERSE_SIZE 環(huán)境變量)只適用于 MPICH。對 OPenMPI,則需要設(shè)置 OMPI_UNIVERSE_SIZE 環(huán)境變量來給定 universe size。
在以上執(zhí)行方式中,mpiexec 命令啟動單個 MPI 進程(主進程)以執(zhí)行主腳本,當需要時,mpi4py.futures 生成另外的 16 個 MPI 進程以組成一個工作進程池。主進程提交任務到工作進程池并等待其返回結(jié)果。工作進程接收來自主進程提交的任務,執(zhí)行并返回結(jié)果給主進程。
另外,用戶還可以以一種更加傳統(tǒng)的方式來執(zhí)行以上腳本,即一次啟動所有需要的 MPI 進程。這種執(zhí)行方式類似下面的命令:
$ mpiexec -n 17 python -m mpi4py.futures julia.py
此時,啟動的 17 個進程會被分割成一個主進程和 16 個工作進程。主進程執(zhí)行主腳本并提交任務,工作進程執(zhí)行提交的任務并返回結(jié)果給主進程。
程序執(zhí)行后的結(jié)果如下:

以上介紹了 mpi4py 中的 futures 模塊,在下一篇中我們將介紹 mpi4py 中的 run 模塊。