多進(jìn)程

本文是我在學(xué)習(xí) Python 多進(jìn)程過(guò)程中的一些總結(jié),主要介紹多進(jìn)程的實(shí)現(xiàn)方式以及進(jìn)程間的通信,大體有如下這么幾點(diǎn)知識(shí):

  • fork 創(chuàng)建進(jìn)程
  • Process 創(chuàng)建進(jìn)程
  • 進(jìn)程池
  • 進(jìn)程間通信
  • 進(jìn)程池的進(jìn)程間通信

fork 創(chuàng)建進(jìn)程

針對(duì)于類 UNIX 操作系統(tǒng),os 模塊下有一個(gè) fork 方法,可以創(chuàng)建多進(jìn)程。使用方法如下:

pid = os.fork()

調(diào)用 fork 方法會(huì)得到一個(gè)返回值,該返回值是一個(gè)標(biāo)志值:如果是主進(jìn)程,那么該返回值就是主進(jìn)程的 pid,如果是創(chuàng)建出來(lái)的子進(jìn)程,該返回值就是0,我們可以通過(guò)這個(gè)返回值來(lái)區(qū)分主進(jìn)程和子進(jìn)程:

# 導(dǎo)入 fork 方法
from os import fork
# 使用 fork 創(chuàng)建進(jìn)程
pid = fork()
# 根據(jù) pid 判斷是主進(jìn)程還是子進(jìn)程
if not pid:
    print("我是子進(jìn)程,pid 是 %s"%pid)
else:
    print("我是主進(jìn)程,pid 是 %s"%pid)

執(zhí)行該程序:

charley@charley-ubuntu:~/桌面/Py$ python3 mutliprocess01.py 
我是主進(jìn)程,pid 是 9667
charley@charley-ubuntu:~/桌面/Py$ 我是子進(jìn)程,pid 是 0

通過(guò) pid 標(biāo)志值群分主進(jìn)程和子進(jìn)程,我們同時(shí)可以看到,在主進(jìn)程執(zhí)行完成后其會(huì)終止,并不影響子進(jìn)程的執(zhí)行。也就是說(shuō):使用 fork 創(chuàng)建的進(jìn)程,主進(jìn)程并不會(huì)等待子進(jìn)程執(zhí)行完成,二者是獨(dú)立的。

getpid 和 getppid 方法

主進(jìn)程和子進(jìn)程都是獨(dú)立的進(jìn)程,因此二者都有獨(dú)立的 pid,前面我們?cè)谑褂?fork 創(chuàng)建進(jìn)程的時(shí)候,如果是主進(jìn)程,那么 fork 函數(shù)的返回值就是主進(jìn)程的 pid,而如果是子進(jìn)程的話,fork 函數(shù)的返回值并不是其的 pid,而是 0,以此來(lái)和主進(jìn)程進(jìn)行區(qū)分。
同時(shí),我們可以使用 getpid 來(lái)獲取進(jìn)程的 pid,使用 getppid 獲取父進(jìn)程的 pid

# 導(dǎo)入 os 模塊
import os
# 創(chuàng)建進(jìn)程
pid = os.fork()
# 獲取進(jìn)程的 pid
if not pid:
    print("子進(jìn)程的 pid 是 %s,子進(jìn)程的 ppid 是 %s"%(os.getpid(),os.getppid()))
else:
    print("父進(jìn)程的 pid 是 %s,父進(jìn)程的 ppid 是 %s"%(os.getpid(),os.getppid()))

運(yùn)行結(jié)果:

charley@charley-ubuntu:~/桌面/Py$ python3 mutliprocess02.py 
父進(jìn)程的 pid 是 10431,父進(jìn)程的 ppid 是 9576
子進(jìn)程的 pid 是 10432,子進(jìn)程的 ppid 是 10431

父進(jìn)程也是有 ppid 的,這里它的 ppid 就是 bash 的 pid

進(jìn)程是彼此獨(dú)立的

一旦創(chuàng)建進(jìn)程,他們就是彼此獨(dú)立的,因此我們不能讓兩個(gè)進(jìn)程修改同一個(gè)變量:

from os import fork
pid = fork()
# 創(chuàng)建一個(gè)列表
testList = [1,2,3]

if not pid:
    # 子進(jìn)程向列表中添加元素
    testList.append(4)
    print("子進(jìn)程:",testList)
else:
    # 主進(jìn)程從列表中移除元素
    testList.remove(3)
    print("主進(jìn)程:",testList)

運(yùn)行結(jié)果:

charley@charley-ubuntu:~/桌面/Py$ python3 mutliprocess3.py 
主進(jìn)程: [1, 2]
子進(jìn)程: [1, 2, 3, 4]

可見(jiàn),兩個(gè)進(jìn)程是不能修改同一份數(shù)據(jù)的,可以這樣理解:一旦創(chuàng)建了一個(gè)進(jìn)程,就在原始程序的基礎(chǔ)上創(chuàng)建了一份全新的副本,和原始的代碼沒(méi)有了關(guān)聯(lián),因此他們是無(wú)法修改同一份數(shù)據(jù)的,只可修改其所在的代碼副本中的數(shù)據(jù)。

Process 創(chuàng)建進(jìn)程

上面的 fork 可以用來(lái)在類 UNIX 操作系統(tǒng)中創(chuàng)建進(jìn)程,而在 Windows 環(huán)境下就不能再使用 fork 方法,需要使用其他的方式。針對(duì)這種情況,Python 為我們提供了一個(gè) multiprocessing 模塊,通過(guò)該模塊中的 Process 類也可以創(chuàng)建進(jìn)程,并兼容各個(gè)平臺(tái)。
下面是 Process 類的用法:

subprocess = Process([ target ],[ args ],[ kwargs ])

Process 類在創(chuàng)建進(jìn)程對(duì)象時(shí),可以接收一個(gè)函數(shù)作為參數(shù),該函數(shù)就是我們要在進(jìn)程中執(zhí)行的函數(shù),后面的 argskwargs 分別是一個(gè)元組和字典,作為目標(biāo)函數(shù)的參數(shù)傳入。如果不傳入目標(biāo)函數(shù),將會(huì)默認(rèn)執(zhí)行進(jìn)程對(duì)象中的 run 方法。

# 導(dǎo)入 Process 類
from multiprocessing import Process
from time import sleep

# 定義目標(biāo)函數(shù)
def getNum(num,delay):
    for i in range(num):
        print(i)
        sleep(delay)

# 在入口程序中執(zhí)行多進(jìn)程操作
if __name__ == "__main__":
    p = Process(target = getNum,args = (5,1))
    # 開(kāi)始執(zhí)行進(jìn)程
    p.start()

運(yùn)行結(jié)果:

0
1
2
3
4

使用 Process 創(chuàng)建的進(jìn)程,主進(jìn)程會(huì)等待子進(jìn)程執(zhí)行完成嗎?我們可以進(jìn)行驗(yàn)證:

# 導(dǎo)入 Process 類
from multiprocessing import Process
from time import sleep

# 定義目標(biāo)函數(shù)
def getNum(num,delay):
    for i in range(num):
        print(i)
        sleep(delay)

# 在入口程序中執(zhí)行多進(jìn)程操作
if __name__ == "__main__":
    p = Process(target = getNum,args = (5,1))
    # 開(kāi)始執(zhí)行進(jìn)程
    p.start()
    print("-----我是主進(jìn)程中的代碼-----")

運(yùn)行結(jié)果:

PS C:\Users\Charley\Desktop\py> python .\py.py
-----我是主進(jìn)程中的代碼-----
0
1
2
3
4
PS C:\Users\Charley\Desktop\py>

可見(jiàn),主進(jìn)程是會(huì)等待子進(jìn)程執(zhí)行完成再退出的。

Process 進(jìn)程對(duì)象的常用方法

下面是 Process 進(jìn)程對(duì)象的一些常用屬性和方法:

  • pid 屬性:獲取當(dāng)前進(jìn)程的 pid
  • name 屬性:獲取當(dāng)前進(jìn)程的 name
  • is_alive 方法:判斷進(jìn)程是否在運(yùn)行
  • join 方法,主進(jìn)程是否等待子進(jìn)程結(jié)束再執(zhí)行,默認(rèn)為等子進(jìn)程結(jié)束后再執(zhí)行主進(jìn)程,也可以接受一個(gè)參數(shù),表示最多等待多少時(shí)間
  • start 方法:?jiǎn)?dòng)子進(jìn)程
  • run 方法:如果創(chuàng)建子進(jìn)程對(duì)象時(shí)沒(méi)有指定 target 參數(shù),則會(huì)在 start 時(shí)執(zhí)行子進(jìn)程中的 run 方法
  • terminate 方法:立即終止子進(jìn)程,不論任務(wù)是否完成

擴(kuò)展 Process 類

除了使用 Process 類直接創(chuàng)建進(jìn)程,我們也可以自定義一個(gè)繼承于 Process 的類來(lái)進(jìn)行創(chuàng)建,提高了擴(kuò)展性:

# 導(dǎo)入 Process 類
from multiprocessing import Process
from time import sleep

# 定義 Process 類的子類
class CreateSubProcess(Process):
    def __init__(self,name,maxRange):
        Process.__init__(self)
        self.name = name
        self.__maxRange = maxRange
    def run(self):
        for i in range(self.__maxRange):
            print("%s 正在輸出 %s"%(self.name,i))
            sleep(1)

if __name__ == '__main__':
    p = CreateSubProcess("子進(jìn)程01",5)
    p.start()

運(yùn)行結(jié)果:

PS C:\Users\Charley\Desktop\py> python .\py.py
子進(jìn)程01 正在輸出 0
子進(jìn)程01 正在輸出 1
子進(jìn)程01 正在輸出 2
子進(jìn)程01 正在輸出 3
子進(jìn)程01 正在輸出 4
PS C:\Users\Charley\Desktop\py>

我們定義了 Process 的子類,實(shí)現(xiàn)了在其基礎(chǔ)上的擴(kuò)展,可以把某個(gè)獨(dú)立進(jìn)程相關(guān)的方法都放在子類里面,起到了很好的封裝作用。

進(jìn)程池

除了使用 Process 類創(chuàng)建進(jìn)程,也可以運(yùn)用進(jìn)程池來(lái)實(shí)現(xiàn)多進(jìn)程,更加節(jié)省資源和方便。進(jìn)程池方便了資源管理和調(diào)度,要使用進(jìn)程池,需要首先創(chuàng)建一個(gè)進(jìn)程池對(duì)象,創(chuàng)建該對(duì)象時(shí)需要傳入一個(gè)參數(shù),表示池子中的最大進(jìn)程數(shù),我們所有的多任務(wù)都可以通過(guò)進(jìn)程池中的進(jìn)程來(lái)完成,而不是每個(gè)任務(wù)都創(chuàng)建一個(gè)進(jìn)程,節(jié)省了資源,并且進(jìn)程池會(huì)自動(dòng)幫我們調(diào)度資源。

創(chuàng)建進(jìn)程池

創(chuàng)建進(jìn)程池很簡(jiǎn)單,只需使用 multiprocessing 模塊中的 Pool 類:

pool = Pool([ maxValue ])

在創(chuàng)建進(jìn)程池對(duì)象時(shí)可以接受一個(gè)參數(shù),表示池子中的最大進(jìn)程數(shù),如果不傳入?yún)?shù),表示無(wú)進(jìn)程數(shù)限制。

進(jìn)程池的幾個(gè)常用方法

下面是幾個(gè)進(jìn)程池中的常用方法:

  • apply_async( target,args,kwargs ):接受一個(gè)目標(biāo)函數(shù)作為池子中某個(gè)進(jìn)程的執(zhí)行函數(shù),異步添加
  • apply( target,args,kwargs ):同步添加,只有在當(dāng)前進(jìn)程執(zhí)行完成后再添加目標(biāo)目標(biāo)函數(shù),會(huì)造成進(jìn)程池中其他進(jìn)程的阻塞,少用
  • close:關(guān)閉進(jìn)程池,關(guān)閉后不再接受任務(wù)
  • join:主進(jìn)程等待進(jìn)程池中的進(jìn)程執(zhí)行完成后再執(zhí)行,因?yàn)?strong>使用進(jìn)程池創(chuàng)建進(jìn)程時(shí),主進(jìn)程默認(rèn)不會(huì)等待子進(jìn)程執(zhí)行完成,因此該方法較常用,但只能放在 close 方法后執(zhí)行。

下面來(lái)看一個(gè)進(jìn)程池的例子:

from multiprocessing import Pool
from time import sleep
import os

def getNum(maxRange,delay):
    for i in range(maxRange):
        print("進(jìn)程 %s 正在輸出 %d"%(os.getpid(),i))
        sleep(delay)

if __name__ == '__main__':
    # 定義進(jìn)程池
    pool = Pool(3)
    # 向進(jìn)程池中添加任務(wù)
    for i in range(5):
        pool.apply_async(getNum,(3,1))
    # 關(guān)閉進(jìn)程池,不在接受任務(wù)
    pool.close()
    pool.join()

運(yùn)行結(jié)果:

PS C:\Users\Charley\Desktop\py> python .\py.py
進(jìn)程 2584 正在輸出 0
進(jìn)程 12780 正在輸出 0
進(jìn)程 6072 正在輸出 0
進(jìn)程 12780 正在輸出 1
進(jìn)程 2584 正在輸出 1
進(jìn)程 6072 正在輸出 1
進(jìn)程 2584 正在輸出 2
進(jìn)程 12780 正在輸出 2
進(jìn)程 6072 正在輸出 2
進(jìn)程 2584 正在輸出 0
進(jìn)程 12780 正在輸出 0
進(jìn)程 12780 正在輸出 1
進(jìn)程 2584 正在輸出 1
進(jìn)程 2584 正在輸出 2
進(jìn)程 12780 正在輸出 2
PS C:\Users\Charley\Desktop\py>

上面我們創(chuàng)建了容量為 3 的進(jìn)程池,并向其中仍進(jìn)去了 5 個(gè)任務(wù),一開(kāi)始進(jìn)程池中進(jìn)程數(shù)是小于任務(wù)數(shù)的,所以會(huì)先執(zhí)行 3 個(gè)任務(wù),當(dāng)池子中有進(jìn)程完成任務(wù)后,再執(zhí)行后面的任務(wù),直到所有任務(wù)執(zhí)行完成為止。

進(jìn)程間通信

前面我們說(shuō)到過(guò),多個(gè)進(jìn)程間是彼此獨(dú)立的,無(wú)法直接修改同一份數(shù)據(jù),但實(shí)際情況中往往是有這樣的需求的,于是就要使用進(jìn)程間通信。進(jìn)程間通信的方式有很多,比如共享內(nèi)存、socket、網(wǎng)絡(luò)、Queue 等,我們這里討論的是使用 Queue 進(jìn)行進(jìn)程間通信。

Queue

Queue 也是 multiprocessing 模塊中的一個(gè)類,意為“隊(duì)列”,創(chuàng)建一個(gè) Queue 對(duì)象:

queue = Queue( [maxVal] )

在創(chuàng)建 Queue 對(duì)象時(shí),可以傳入一個(gè)參數(shù),表示隊(duì)列中的最大消息數(shù),如果不傳,表示無(wú)限制。Queue 對(duì)象中的常用方法有:

  • qsize:返回當(dāng)前隊(duì)列中包含的消息數(shù)量
  • empty:判斷隊(duì)列是否為空
  • full:判斷隊(duì)列是否充滿
  • get([ block, [timeout]]):從隊(duì)列中取出消息,如果當(dāng)前隊(duì)列為空,則進(jìn)入阻塞狀態(tài),直到隊(duì)列中有消息可以被取用為止。如果 block 設(shè)置為 False,則在隊(duì)列為空時(shí)拋出 Queue.Empty 異常。該方法也可以接受一個(gè)超時(shí)參數(shù),表示最多等待多長(zhǎng)時(shí)間,如果超過(guò)等待時(shí)間仍然沒(méi)有取出消息,也會(huì)拋出 Queue.Empty 異常。
  • get_nowait:相當(dāng)于 get(False)
  • put(item, [block]):向隊(duì)列中放入消息,當(dāng)隊(duì)列充滿時(shí),會(huì)進(jìn)入等待狀態(tài),直到隊(duì)列中可以放入消息為止。如果 block 設(shè)置為 False,則在隊(duì)列充滿時(shí)拋出 Queue.Full 異常。
  • put_nowait:相當(dāng)于 put(item,False)

使用 Queue 進(jìn)行進(jìn)程間通信

我們可以利用 Queue 進(jìn)行進(jìn)程間通信,只需將 Queue 對(duì)象傳入相應(yīng)的任務(wù)函數(shù)中:

from multiprocessing import Queue,Process
from time import sleep
import os

def getVal(queue):
    while True:
        if queue.empty():
            print("進(jìn)程 %s 已經(jīng)取完了~"%os.getpid())
            break
        else:
            print("進(jìn)程 %s 獲取到了數(shù)據(jù) %d"%(os.getpid(),queue.get()))
            sleep(1)

def putVal(queue,maxRange):
    for i in range(maxRange):
        queue.put(i)
        print("進(jìn)程 %s 放入了數(shù)據(jù) %d"%(os.getpid(),i))
        sleep(1)

# 創(chuàng)建隊(duì)列
queue = Queue()
# 創(chuàng)建兩個(gè)進(jìn)程
gv = Process(target = getVal,args = (queue,))
pv = Process(target = putVal,args = (queue,10))

# 啟動(dòng)進(jìn)程
if __name__ == "__main__":
    pv.start()
    gv.start()

運(yùn)行結(jié)果:

PS C:\Users\Charley\Desktop\py> python .\py.py
進(jìn)程 22208 放入了數(shù)據(jù) 0
進(jìn)程 20656 獲取到了數(shù)據(jù) 0
進(jìn)程 22208 放入了數(shù)據(jù) 1
進(jìn)程 20656 獲取到了數(shù)據(jù) 1
進(jìn)程 22208 放入了數(shù)據(jù) 2
進(jìn)程 20656 獲取到了數(shù)據(jù) 2
進(jìn)程 22208 放入了數(shù)據(jù) 3
進(jìn)程 20656 獲取到了數(shù)據(jù) 3
進(jìn)程 22208 放入了數(shù)據(jù) 4
進(jìn)程 20656 獲取到了數(shù)據(jù) 4
進(jìn)程 22208 放入了數(shù)據(jù) 5
進(jìn)程 20656 獲取到了數(shù)據(jù) 5
進(jìn)程 22208 放入了數(shù)據(jù) 6
進(jìn)程 20656 獲取到了數(shù)據(jù) 6
進(jìn)程 22208 放入了數(shù)據(jù) 7
進(jìn)程 20656 獲取到了數(shù)據(jù) 7
進(jìn)程 22208 放入了數(shù)據(jù) 8
進(jìn)程 20656 獲取到了數(shù)據(jù) 8
進(jìn)程 22208 放入了數(shù)據(jù) 9
進(jìn)程 20656 獲取到了數(shù)據(jù) 9
進(jìn)程 20656 已經(jīng)取完了~
PS C:\Users\Charley\Desktop\py>

進(jìn)程池中的通信

上面的進(jìn)程間通信是基于 Process類(或者其子類)創(chuàng)建出的進(jìn)程,可以直接使用 multiprocessing 中的 Queue隊(duì)列,但對(duì)于進(jìn)程池,需要使用 multiprocessing 中的 Manager 類,除此之外,其他操作并沒(méi)有變化:

# 導(dǎo)入 Manager 和 Pool 類
from multiprocessing import Manager,Pool
from time import sleep
import os

def getVal(queue):
    while True:
        if queue.empty():
            print("進(jìn)程 %s 已經(jīng)取完了~"%os.getpid())
            break
        else:
            print("進(jìn)程 %s 獲取到了數(shù)據(jù) %d"%(os.getpid(),queue.get()))
            sleep(1)

def putVal(queue,maxRange):
    for i in range(maxRange):
        queue.put(i)
        print("進(jìn)程 %s 放入了數(shù)據(jù) %d"%(os.getpid(),i))
        sleep(1)

# 入口方法
def main():
    # 創(chuàng)建隊(duì)列
    queue = Manager().Queue()
    # 創(chuàng)建進(jìn)程池
    pool = Pool(2)
    pool.apply_async(putVal,(queue,5))
    pool.apply_async(getVal,(queue,))

    # 關(guān)閉進(jìn)程池并使主進(jìn)程等待
    pool.close()
    pool.join()

if __name__ == '__main__':
    main()

運(yùn)行結(jié)果如下:

PS C:\Users\Charley\Desktop\py> python .\py.py
進(jìn)程 15304 放入了數(shù)據(jù) 0
進(jìn)程 17264 獲取到了數(shù)據(jù) 0
進(jìn)程 15304 放入了數(shù)據(jù) 1
進(jìn)程 17264 獲取到了數(shù)據(jù) 1
進(jìn)程 15304 放入了數(shù)據(jù) 2
進(jìn)程 17264 獲取到了數(shù)據(jù) 2
進(jìn)程 15304 放入了數(shù)據(jù) 3
進(jìn)程 17264 獲取到了數(shù)據(jù) 3
進(jìn)程 15304 放入了數(shù)據(jù) 4
進(jìn)程 17264 獲取到了數(shù)據(jù) 4
進(jìn)程 17264 已經(jīng)取完了~
PS C:\Users\Charley\Desktop\py>

針對(duì)于進(jìn)程池中的通信,只需改變創(chuàng)建隊(duì)列對(duì)象的方式即可,其他的操作都不變。

總結(jié)

本文主要介紹了 Python 中的多進(jìn)程,下面是一些要點(diǎn):

  • 使用 fork 在類 UNIX 操作系統(tǒng)創(chuàng)建進(jìn)程
  • 使用兼容各平臺(tái)的 Process
  • 擴(kuò)展 Process 類,封裝進(jìn)程模塊
  • 使用 Pool 創(chuàng)建進(jìn)程池
  • 幾種創(chuàng)建方式下,主進(jìn)程是否會(huì)等待子進(jìn)程執(zhí)行完成
  • 進(jìn)程對(duì)象的幾個(gè)常用方法
  • 進(jìn)程池對(duì)象的常用方法
  • 隊(duì)列的常用方法
  • 進(jìn)程間通信
  • 使用進(jìn)程池時(shí)如何通信

完。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 一、進(jìn)程的概念 相信很多同學(xué)都聽(tīng)說(shuō)過(guò)windows、linux,MacOS都是多任務(wù),多用戶的操作系統(tǒng)。那什么是多...
    轉(zhuǎn)身后的那一回眸閱讀 1,093評(píng)論 0 1
  • 現(xiàn)在, 多核CPU已經(jīng)非常普及了, 但是, 即使過(guò)去的單核CPU, 也可以執(zhí)行多任務(wù)。 CPU執(zhí)行代碼都是順序執(zhí)行...
    LittlePy閱讀 4,932評(píng)論 0 3
  • 本文是筆者學(xué)習(xí)廖雪峰Python3教程的筆記,在此感謝廖老師的教程讓我們這些初學(xué)者能夠一步一步的進(jìn)行下去.如果讀者...
    相關(guān)函數(shù)閱讀 5,616評(píng)論 1 8
  • fork方式創(chuàng)建進(jìn)程 簡(jiǎn)單的fork 主進(jìn)程fork時(shí)返回值大于0,子進(jìn)程fork時(shí)返回值等于0 os.getpi...
    cHl0aG9u閱讀 836評(píng)論 0 1
  • 進(jìn)程的基本概念 進(jìn)程是程序的一次執(zhí)行,每個(gè)進(jìn)程都有自己的地址空間,內(nèi)存,數(shù)據(jù)棧以及其他記錄其運(yùn)行軌跡的輔助數(shù)據(jù)。多...
    XYZeroing閱讀 1,258評(píng)論 0 13

友情鏈接更多精彩內(nèi)容