本文是我在學(xué)習(xí) Python 多進(jìn)程過(guò)程中的一些總結(jié),主要介紹多進(jìn)程的實(shí)現(xiàn)方式以及進(jìn)程間的通信,大體有如下這么幾點(diǎn)知識(shí):
-
fork創(chuàng)建進(jìn)程 -
Process創(chuàng)建進(jìn)程 - 進(jìn)程池
- 進(jìn)程間通信
- 進(jìn)程池的進(jìn)程間通信
fork 創(chuàng)建進(jìn)程
針對(duì)于類 UNIX 操作系統(tǒng),os 模塊下有一個(gè) fork 方法,可以創(chuàng)建多進(jìn)程。使用方法如下:
pid = os.fork()
調(diào)用 fork 方法會(huì)得到一個(gè)返回值,該返回值是一個(gè)標(biāo)志值:如果是主進(jìn)程,那么該返回值就是主進(jìn)程的 pid,如果是創(chuàng)建出來(lái)的子進(jìn)程,該返回值就是0,我們可以通過(guò)這個(gè)返回值來(lái)區(qū)分主進(jìn)程和子進(jìn)程:
# 導(dǎo)入 fork 方法
from os import fork
# 使用 fork 創(chuàng)建進(jìn)程
pid = fork()
# 根據(jù) pid 判斷是主進(jìn)程還是子進(jìn)程
if not pid:
print("我是子進(jìn)程,pid 是 %s"%pid)
else:
print("我是主進(jìn)程,pid 是 %s"%pid)
執(zhí)行該程序:
charley@charley-ubuntu:~/桌面/Py$ python3 mutliprocess01.py
我是主進(jìn)程,pid 是 9667
charley@charley-ubuntu:~/桌面/Py$ 我是子進(jìn)程,pid 是 0
通過(guò) pid 標(biāo)志值群分主進(jìn)程和子進(jìn)程,我們同時(shí)可以看到,在主進(jìn)程執(zhí)行完成后其會(huì)終止,并不影響子進(jìn)程的執(zhí)行。也就是說(shuō):使用 fork 創(chuàng)建的進(jìn)程,主進(jìn)程并不會(huì)等待子進(jìn)程執(zhí)行完成,二者是獨(dú)立的。
getpid 和 getppid 方法
主進(jìn)程和子進(jìn)程都是獨(dú)立的進(jìn)程,因此二者都有獨(dú)立的 pid,前面我們?cè)谑褂?fork 創(chuàng)建進(jìn)程的時(shí)候,如果是主進(jìn)程,那么 fork 函數(shù)的返回值就是主進(jìn)程的 pid,而如果是子進(jìn)程的話,fork 函數(shù)的返回值并不是其的 pid,而是 0,以此來(lái)和主進(jìn)程進(jìn)行區(qū)分。
同時(shí),我們可以使用 getpid 來(lái)獲取進(jìn)程的 pid,使用 getppid 獲取父進(jìn)程的 pid:
# 導(dǎo)入 os 模塊
import os
# 創(chuàng)建進(jìn)程
pid = os.fork()
# 獲取進(jìn)程的 pid
if not pid:
print("子進(jìn)程的 pid 是 %s,子進(jìn)程的 ppid 是 %s"%(os.getpid(),os.getppid()))
else:
print("父進(jìn)程的 pid 是 %s,父進(jìn)程的 ppid 是 %s"%(os.getpid(),os.getppid()))
運(yùn)行結(jié)果:
charley@charley-ubuntu:~/桌面/Py$ python3 mutliprocess02.py
父進(jìn)程的 pid 是 10431,父進(jìn)程的 ppid 是 9576
子進(jìn)程的 pid 是 10432,子進(jìn)程的 ppid 是 10431
父進(jìn)程也是有 ppid 的,這里它的 ppid 就是 bash 的 pid。
進(jìn)程是彼此獨(dú)立的
一旦創(chuàng)建進(jìn)程,他們就是彼此獨(dú)立的,因此我們不能讓兩個(gè)進(jìn)程修改同一個(gè)變量:
from os import fork
pid = fork()
# 創(chuàng)建一個(gè)列表
testList = [1,2,3]
if not pid:
# 子進(jìn)程向列表中添加元素
testList.append(4)
print("子進(jìn)程:",testList)
else:
# 主進(jìn)程從列表中移除元素
testList.remove(3)
print("主進(jìn)程:",testList)
運(yùn)行結(jié)果:
charley@charley-ubuntu:~/桌面/Py$ python3 mutliprocess3.py
主進(jìn)程: [1, 2]
子進(jìn)程: [1, 2, 3, 4]
可見(jiàn),兩個(gè)進(jìn)程是不能修改同一份數(shù)據(jù)的,可以這樣理解:一旦創(chuàng)建了一個(gè)進(jìn)程,就在原始程序的基礎(chǔ)上創(chuàng)建了一份全新的副本,和原始的代碼沒(méi)有了關(guān)聯(lián),因此他們是無(wú)法修改同一份數(shù)據(jù)的,只可修改其所在的代碼副本中的數(shù)據(jù)。
Process 創(chuàng)建進(jìn)程
上面的 fork 可以用來(lái)在類 UNIX 操作系統(tǒng)中創(chuàng)建進(jìn)程,而在 Windows 環(huán)境下就不能再使用 fork 方法,需要使用其他的方式。針對(duì)這種情況,Python 為我們提供了一個(gè) multiprocessing 模塊,通過(guò)該模塊中的 Process 類也可以創(chuàng)建進(jìn)程,并兼容各個(gè)平臺(tái)。
下面是 Process 類的用法:
subprocess = Process([ target ],[ args ],[ kwargs ])
Process 類在創(chuàng)建進(jìn)程對(duì)象時(shí),可以接收一個(gè)函數(shù)作為參數(shù),該函數(shù)就是我們要在進(jìn)程中執(zhí)行的函數(shù),后面的 args 和 kwargs 分別是一個(gè)元組和字典,作為目標(biāo)函數(shù)的參數(shù)傳入。如果不傳入目標(biāo)函數(shù),將會(huì)默認(rèn)執(zhí)行進(jìn)程對(duì)象中的 run 方法。
# 導(dǎo)入 Process 類
from multiprocessing import Process
from time import sleep
# 定義目標(biāo)函數(shù)
def getNum(num,delay):
for i in range(num):
print(i)
sleep(delay)
# 在入口程序中執(zhí)行多進(jìn)程操作
if __name__ == "__main__":
p = Process(target = getNum,args = (5,1))
# 開(kāi)始執(zhí)行進(jìn)程
p.start()
運(yùn)行結(jié)果:
0
1
2
3
4
使用 Process 創(chuàng)建的進(jìn)程,主進(jìn)程會(huì)等待子進(jìn)程執(zhí)行完成嗎?我們可以進(jìn)行驗(yàn)證:
# 導(dǎo)入 Process 類
from multiprocessing import Process
from time import sleep
# 定義目標(biāo)函數(shù)
def getNum(num,delay):
for i in range(num):
print(i)
sleep(delay)
# 在入口程序中執(zhí)行多進(jìn)程操作
if __name__ == "__main__":
p = Process(target = getNum,args = (5,1))
# 開(kāi)始執(zhí)行進(jìn)程
p.start()
print("-----我是主進(jìn)程中的代碼-----")
運(yùn)行結(jié)果:
PS C:\Users\Charley\Desktop\py> python .\py.py
-----我是主進(jìn)程中的代碼-----
0
1
2
3
4
PS C:\Users\Charley\Desktop\py>
可見(jiàn),主進(jìn)程是會(huì)等待子進(jìn)程執(zhí)行完成再退出的。
Process 進(jìn)程對(duì)象的常用方法
下面是 Process 進(jìn)程對(duì)象的一些常用屬性和方法:
-
pid屬性:獲取當(dāng)前進(jìn)程的 pid -
name屬性:獲取當(dāng)前進(jìn)程的 name -
is_alive方法:判斷進(jìn)程是否在運(yùn)行 -
join方法,主進(jìn)程是否等待子進(jìn)程結(jié)束再執(zhí)行,默認(rèn)為等子進(jìn)程結(jié)束后再執(zhí)行主進(jìn)程,也可以接受一個(gè)參數(shù),表示最多等待多少時(shí)間 -
start方法:?jiǎn)?dòng)子進(jìn)程 -
run方法:如果創(chuàng)建子進(jìn)程對(duì)象時(shí)沒(méi)有指定target參數(shù),則會(huì)在start時(shí)執(zhí)行子進(jìn)程中的run方法 -
terminate方法:立即終止子進(jìn)程,不論任務(wù)是否完成
擴(kuò)展 Process 類
除了使用 Process 類直接創(chuàng)建進(jìn)程,我們也可以自定義一個(gè)繼承于 Process 的類來(lái)進(jìn)行創(chuàng)建,提高了擴(kuò)展性:
# 導(dǎo)入 Process 類
from multiprocessing import Process
from time import sleep
# 定義 Process 類的子類
class CreateSubProcess(Process):
def __init__(self,name,maxRange):
Process.__init__(self)
self.name = name
self.__maxRange = maxRange
def run(self):
for i in range(self.__maxRange):
print("%s 正在輸出 %s"%(self.name,i))
sleep(1)
if __name__ == '__main__':
p = CreateSubProcess("子進(jìn)程01",5)
p.start()
運(yùn)行結(jié)果:
PS C:\Users\Charley\Desktop\py> python .\py.py
子進(jìn)程01 正在輸出 0
子進(jìn)程01 正在輸出 1
子進(jìn)程01 正在輸出 2
子進(jìn)程01 正在輸出 3
子進(jìn)程01 正在輸出 4
PS C:\Users\Charley\Desktop\py>
我們定義了 Process 的子類,實(shí)現(xiàn)了在其基礎(chǔ)上的擴(kuò)展,可以把某個(gè)獨(dú)立進(jìn)程相關(guān)的方法都放在子類里面,起到了很好的封裝作用。
進(jìn)程池
除了使用 Process 類創(chuàng)建進(jìn)程,也可以運(yùn)用進(jìn)程池來(lái)實(shí)現(xiàn)多進(jìn)程,更加節(jié)省資源和方便。進(jìn)程池方便了資源管理和調(diào)度,要使用進(jìn)程池,需要首先創(chuàng)建一個(gè)進(jìn)程池對(duì)象,創(chuàng)建該對(duì)象時(shí)需要傳入一個(gè)參數(shù),表示池子中的最大進(jìn)程數(shù),我們所有的多任務(wù)都可以通過(guò)進(jìn)程池中的進(jìn)程來(lái)完成,而不是每個(gè)任務(wù)都創(chuàng)建一個(gè)進(jìn)程,節(jié)省了資源,并且進(jìn)程池會(huì)自動(dòng)幫我們調(diào)度資源。
創(chuàng)建進(jìn)程池
創(chuàng)建進(jìn)程池很簡(jiǎn)單,只需使用 multiprocessing 模塊中的 Pool 類:
pool = Pool([ maxValue ])
在創(chuàng)建進(jìn)程池對(duì)象時(shí)可以接受一個(gè)參數(shù),表示池子中的最大進(jìn)程數(shù),如果不傳入?yún)?shù),表示無(wú)進(jìn)程數(shù)限制。
進(jìn)程池的幾個(gè)常用方法
下面是幾個(gè)進(jìn)程池中的常用方法:
- apply_async( target,args,kwargs ):接受一個(gè)目標(biāo)函數(shù)作為池子中某個(gè)進(jìn)程的執(zhí)行函數(shù),異步添加
- apply( target,args,kwargs ):同步添加,只有在當(dāng)前進(jìn)程執(zhí)行完成后再添加目標(biāo)目標(biāo)函數(shù),會(huì)造成進(jìn)程池中其他進(jìn)程的阻塞,少用
- close:關(guān)閉進(jìn)程池,關(guān)閉后不再接受任務(wù)
- join:主進(jìn)程等待進(jìn)程池中的進(jìn)程執(zhí)行完成后再執(zhí)行,因?yàn)?strong>使用進(jìn)程池創(chuàng)建進(jìn)程時(shí),主進(jìn)程默認(rèn)不會(huì)等待子進(jìn)程執(zhí)行完成,因此該方法較常用,但只能放在
close方法后執(zhí)行。
下面來(lái)看一個(gè)進(jìn)程池的例子:
from multiprocessing import Pool
from time import sleep
import os
def getNum(maxRange,delay):
for i in range(maxRange):
print("進(jìn)程 %s 正在輸出 %d"%(os.getpid(),i))
sleep(delay)
if __name__ == '__main__':
# 定義進(jìn)程池
pool = Pool(3)
# 向進(jìn)程池中添加任務(wù)
for i in range(5):
pool.apply_async(getNum,(3,1))
# 關(guān)閉進(jìn)程池,不在接受任務(wù)
pool.close()
pool.join()
運(yùn)行結(jié)果:
PS C:\Users\Charley\Desktop\py> python .\py.py
進(jìn)程 2584 正在輸出 0
進(jìn)程 12780 正在輸出 0
進(jìn)程 6072 正在輸出 0
進(jìn)程 12780 正在輸出 1
進(jìn)程 2584 正在輸出 1
進(jìn)程 6072 正在輸出 1
進(jìn)程 2584 正在輸出 2
進(jìn)程 12780 正在輸出 2
進(jìn)程 6072 正在輸出 2
進(jìn)程 2584 正在輸出 0
進(jìn)程 12780 正在輸出 0
進(jìn)程 12780 正在輸出 1
進(jìn)程 2584 正在輸出 1
進(jìn)程 2584 正在輸出 2
進(jìn)程 12780 正在輸出 2
PS C:\Users\Charley\Desktop\py>
上面我們創(chuàng)建了容量為 3 的進(jìn)程池,并向其中仍進(jìn)去了 5 個(gè)任務(wù),一開(kāi)始進(jìn)程池中進(jìn)程數(shù)是小于任務(wù)數(shù)的,所以會(huì)先執(zhí)行 3 個(gè)任務(wù),當(dāng)池子中有進(jìn)程完成任務(wù)后,再執(zhí)行后面的任務(wù),直到所有任務(wù)執(zhí)行完成為止。
進(jìn)程間通信
前面我們說(shuō)到過(guò),多個(gè)進(jìn)程間是彼此獨(dú)立的,無(wú)法直接修改同一份數(shù)據(jù),但實(shí)際情況中往往是有這樣的需求的,于是就要使用進(jìn)程間通信。進(jìn)程間通信的方式有很多,比如共享內(nèi)存、socket、網(wǎng)絡(luò)、Queue 等,我們這里討論的是使用 Queue 進(jìn)行進(jìn)程間通信。
Queue
Queue 也是 multiprocessing 模塊中的一個(gè)類,意為“隊(duì)列”,創(chuàng)建一個(gè) Queue 對(duì)象:
queue = Queue( [maxVal] )
在創(chuàng)建 Queue 對(duì)象時(shí),可以傳入一個(gè)參數(shù),表示隊(duì)列中的最大消息數(shù),如果不傳,表示無(wú)限制。Queue 對(duì)象中的常用方法有:
-
qsize:返回當(dāng)前隊(duì)列中包含的消息數(shù)量 -
empty:判斷隊(duì)列是否為空 -
full:判斷隊(duì)列是否充滿 -
get([ block, [timeout]]):從隊(duì)列中取出消息,如果當(dāng)前隊(duì)列為空,則進(jìn)入阻塞狀態(tài),直到隊(duì)列中有消息可以被取用為止。如果 block 設(shè)置為False,則在隊(duì)列為空時(shí)拋出Queue.Empty異常。該方法也可以接受一個(gè)超時(shí)參數(shù),表示最多等待多長(zhǎng)時(shí)間,如果超過(guò)等待時(shí)間仍然沒(méi)有取出消息,也會(huì)拋出Queue.Empty異常。 -
get_nowait:相當(dāng)于get(False) -
put(item, [block]):向隊(duì)列中放入消息,當(dāng)隊(duì)列充滿時(shí),會(huì)進(jìn)入等待狀態(tài),直到隊(duì)列中可以放入消息為止。如果 block 設(shè)置為False,則在隊(duì)列充滿時(shí)拋出Queue.Full異常。 -
put_nowait:相當(dāng)于put(item,False)
使用 Queue 進(jìn)行進(jìn)程間通信
我們可以利用 Queue 進(jìn)行進(jìn)程間通信,只需將 Queue 對(duì)象傳入相應(yīng)的任務(wù)函數(shù)中:
from multiprocessing import Queue,Process
from time import sleep
import os
def getVal(queue):
while True:
if queue.empty():
print("進(jìn)程 %s 已經(jīng)取完了~"%os.getpid())
break
else:
print("進(jìn)程 %s 獲取到了數(shù)據(jù) %d"%(os.getpid(),queue.get()))
sleep(1)
def putVal(queue,maxRange):
for i in range(maxRange):
queue.put(i)
print("進(jìn)程 %s 放入了數(shù)據(jù) %d"%(os.getpid(),i))
sleep(1)
# 創(chuàng)建隊(duì)列
queue = Queue()
# 創(chuàng)建兩個(gè)進(jìn)程
gv = Process(target = getVal,args = (queue,))
pv = Process(target = putVal,args = (queue,10))
# 啟動(dòng)進(jìn)程
if __name__ == "__main__":
pv.start()
gv.start()
運(yùn)行結(jié)果:
PS C:\Users\Charley\Desktop\py> python .\py.py
進(jìn)程 22208 放入了數(shù)據(jù) 0
進(jìn)程 20656 獲取到了數(shù)據(jù) 0
進(jìn)程 22208 放入了數(shù)據(jù) 1
進(jìn)程 20656 獲取到了數(shù)據(jù) 1
進(jìn)程 22208 放入了數(shù)據(jù) 2
進(jìn)程 20656 獲取到了數(shù)據(jù) 2
進(jìn)程 22208 放入了數(shù)據(jù) 3
進(jìn)程 20656 獲取到了數(shù)據(jù) 3
進(jìn)程 22208 放入了數(shù)據(jù) 4
進(jìn)程 20656 獲取到了數(shù)據(jù) 4
進(jìn)程 22208 放入了數(shù)據(jù) 5
進(jìn)程 20656 獲取到了數(shù)據(jù) 5
進(jìn)程 22208 放入了數(shù)據(jù) 6
進(jìn)程 20656 獲取到了數(shù)據(jù) 6
進(jìn)程 22208 放入了數(shù)據(jù) 7
進(jìn)程 20656 獲取到了數(shù)據(jù) 7
進(jìn)程 22208 放入了數(shù)據(jù) 8
進(jìn)程 20656 獲取到了數(shù)據(jù) 8
進(jìn)程 22208 放入了數(shù)據(jù) 9
進(jìn)程 20656 獲取到了數(shù)據(jù) 9
進(jìn)程 20656 已經(jīng)取完了~
PS C:\Users\Charley\Desktop\py>
進(jìn)程池中的通信
上面的進(jìn)程間通信是基于 Process類(或者其子類)創(chuàng)建出的進(jìn)程,可以直接使用 multiprocessing 中的 Queue隊(duì)列,但對(duì)于進(jìn)程池,需要使用 multiprocessing 中的 Manager 類,除此之外,其他操作并沒(méi)有變化:
# 導(dǎo)入 Manager 和 Pool 類
from multiprocessing import Manager,Pool
from time import sleep
import os
def getVal(queue):
while True:
if queue.empty():
print("進(jìn)程 %s 已經(jīng)取完了~"%os.getpid())
break
else:
print("進(jìn)程 %s 獲取到了數(shù)據(jù) %d"%(os.getpid(),queue.get()))
sleep(1)
def putVal(queue,maxRange):
for i in range(maxRange):
queue.put(i)
print("進(jìn)程 %s 放入了數(shù)據(jù) %d"%(os.getpid(),i))
sleep(1)
# 入口方法
def main():
# 創(chuàng)建隊(duì)列
queue = Manager().Queue()
# 創(chuàng)建進(jìn)程池
pool = Pool(2)
pool.apply_async(putVal,(queue,5))
pool.apply_async(getVal,(queue,))
# 關(guān)閉進(jìn)程池并使主進(jìn)程等待
pool.close()
pool.join()
if __name__ == '__main__':
main()
運(yùn)行結(jié)果如下:
PS C:\Users\Charley\Desktop\py> python .\py.py
進(jìn)程 15304 放入了數(shù)據(jù) 0
進(jìn)程 17264 獲取到了數(shù)據(jù) 0
進(jìn)程 15304 放入了數(shù)據(jù) 1
進(jìn)程 17264 獲取到了數(shù)據(jù) 1
進(jìn)程 15304 放入了數(shù)據(jù) 2
進(jìn)程 17264 獲取到了數(shù)據(jù) 2
進(jìn)程 15304 放入了數(shù)據(jù) 3
進(jìn)程 17264 獲取到了數(shù)據(jù) 3
進(jìn)程 15304 放入了數(shù)據(jù) 4
進(jìn)程 17264 獲取到了數(shù)據(jù) 4
進(jìn)程 17264 已經(jīng)取完了~
PS C:\Users\Charley\Desktop\py>
針對(duì)于進(jìn)程池中的通信,只需改變創(chuàng)建隊(duì)列對(duì)象的方式即可,其他的操作都不變。
總結(jié)
本文主要介紹了 Python 中的多進(jìn)程,下面是一些要點(diǎn):
- 使用
fork在類 UNIX 操作系統(tǒng)創(chuàng)建進(jìn)程 - 使用兼容各平臺(tái)的
Process - 擴(kuò)展
Process類,封裝進(jìn)程模塊 - 使用
Pool創(chuàng)建進(jìn)程池 - 幾種創(chuàng)建方式下,主進(jìn)程是否會(huì)等待子進(jìn)程執(zhí)行完成
- 進(jìn)程對(duì)象的幾個(gè)常用方法
- 進(jìn)程池對(duì)象的常用方法
- 隊(duì)列的常用方法
- 進(jìn)程間通信
- 使用進(jìn)程池時(shí)如何通信
完。