詳解Python中的多進(jìn)程、進(jìn)程間通信(隊(duì)列和管道) - multiprocessing、Process、Pool、Queue、Pipe詳解

其他關(guān)于Python的總結(jié)文章請?jiān)L問:http://m.itdecent.cn/nb/47435944

詳解Python中的多進(jìn)程、進(jìn)程間通信(隊(duì)列和管道) - multiprocessing、Process、Pool、Queue、Pipe詳解

計(jì)算機(jī)執(zhí)行任務(wù)都是多任務(wù)同步執(zhí)行的,比如使用瀏覽器瀏覽網(wǎng)頁是一個任務(wù),同時使用編輯器寫代碼又是一個任務(wù),計(jì)算機(jī)還有好多的后臺任務(wù),這些任務(wù)都是同時進(jìn)行的。對于多核CUP來說,不同的任務(wù)可以被調(diào)度到多個核上平行進(jìn)行,但是一個核上也同時進(jìn)行著多個任務(wù),在同一個核上的多個任務(wù)實(shí)機(jī)上是交替進(jìn)行的,比如計(jì)算機(jī)使用0.0001秒執(zhí)行一個任務(wù),然后切換到另一個任務(wù)執(zhí)行0.0001秒,如此交替運(yùn)行多個任務(wù),但是由于運(yùn)算速度很快,所以讓我們覺得這些任務(wù)是同時進(jìn)行的。我們可以乘這些任務(wù)為進(jìn)程process)。一個任務(wù)其實(shí)通常還有多個子任務(wù)同時進(jìn)行,這些子任務(wù)被稱之為線程thread),一個進(jìn)程必定至少包括一個進(jìn)程,也可以有多個線程,Python中可以使用多進(jìn)程、多線程、多進(jìn)程同時多線程等方式處理并行程序。

首先,每一個進(jìn)程都可以使用 os.getpid() 的方法獲取此進(jìn)程的pid(要引入os模塊),pid是進(jìn)程的唯一性標(biāo)識。

Python中可以使用 multiprocessing 包來處理多進(jìn)程:

import multiprocessing

使用Process處理多進(jìn)程

使用 Process 可以創(chuàng)建一個進(jìn)程,Process創(chuàng)建進(jìn)程時可以傳入?yún)?shù),target參數(shù)接收一個函數(shù)作為該進(jìn)程要執(zhí)行的內(nèi)容(函數(shù)可以帶參數(shù)),然后使用args參數(shù)為作為target的函數(shù)傳入?yún)?shù),使用元組傳入,最后要留有一個逗號。

一個Process的實(shí)例有如下的一些方法:

  • start: 開始該進(jìn)程
  • join:等待進(jìn)程結(jié)束
  • terminate:強(qiáng)制結(jié)束該進(jìn)程

一個例子:

from multiprocessing import Process
import os


def print_process(text):
    print("Process \n\tPID: {}\n\tTEXT: {}\n".format(os.getpid(), text))


if __name__ == "__main__":
    print("Processes Starting...")
    process1 = Process(target=print_process, args=('process 1',))
    process2 = Process(target=print_process('process 2'))
    process1.start()
    process2.start()
    process1.join()
    process2.terminate()
    print("Processes Ended")

得到的結(jié)果:

Processes Starting...
Process 
    PID: 17152
    TEXT: process 2
Process 
    PID: 16884
    TEXT: process 1
Processes Ended

使用Pool處理多進(jìn)程

Pool可以用于處理大量分進(jìn)程,批量管理,實(shí)例化一個Pool對象可以傳入整數(shù)指定Pool中最大同時執(zhí)行的進(jìn)程數(shù)量,默認(rèn)值為你計(jì)算機(jī)CPU的核數(shù)。如果傳入的參數(shù)大于計(jì)算機(jī)的核數(shù),也就是默認(rèn)值,則需要先跑完核數(shù)個進(jìn)程后才會繼續(xù)執(zhí)行后邊的進(jìn)程(比如計(jì)算機(jī)核數(shù)為8,傳入9作為參數(shù),則會先執(zhí)行8個進(jìn)程,有一個進(jìn)程執(zhí)行結(jié)束后,才會有第九個進(jìn)程開始執(zhí)行)

一個Pool實(shí)例的常用方法有:

  • apply_async:向進(jìn)程池中添加一個進(jìn)程并設(shè)置要執(zhí)行的內(nèi)容,可以傳入一個函數(shù)做參數(shù),然后使用args指定函數(shù)的參數(shù),或者直接將參數(shù)寫在函數(shù)后邊,同理于Process的參數(shù)
  • close:關(guān)閉池,表示不會再給該進(jìn)程池中添加新的進(jìn)程
  • join:等待池中所有進(jìn)程結(jié)束

一個例子:

from multiprocessing import Pool
import os


def print_process(text):
    print("Process \n\tPID: {}\n\tTEXT: {}".format(os.getpid(), text))


if __name__ == "__main__":
    print("Processes Pool Starting...")
    pool = Pool(2)
    for i in range(3):
        pool.apply_async(print_process(i + 1))
    print("All processes have been added into the pool")
    pool.close()
    pool.join()
    print("All processes in the pool have ended")

得到的結(jié)果:

Processes Pool Starting...
Process 
    PID: 17544
    TEXT: 1
Process 
    PID: 17544
    TEXT: 2
Process 
    PID: 17544
    TEXT: 3
All processes have been added into the pool
All processes in the pool have ended

進(jìn)程間通信

進(jìn)程之間進(jìn)行通信依賴于操作系統(tǒng)提供的多種機(jī)制,這里主要詳述Python中常使用的兩種機(jī)制:QueuePipes,它們的區(qū)別在于:

  • Queue:隊(duì)列,是先進(jìn)先出的數(shù)據(jù)結(jié)構(gòu),可以供給多個寫入者和多個讀取者,寫入者向隊(duì)列的隊(duì)尾寫入數(shù)據(jù),讀取者從隊(duì)列的隊(duì)頭讀取數(shù)據(jù)
  • Pipes:管道,只供給于一對進(jìn)程通信,兩個進(jìn)程在管道的兩端,兩個進(jìn)程都可以發(fā)送給對方消息以及接受對方的消息,管道可以理解為一根網(wǎng)線

使用Queue通信

使用Queue創(chuàng)建一個隊(duì)列實(shí)例,一個實(shí)例的主要方法:

  • put:向隊(duì)列的隊(duì)尾寫入數(shù)據(jù)
  • get:從隊(duì)列的隊(duì)頭讀取數(shù)據(jù)

直接看一個例子,這個例子中我們定義兩個進(jìn)程,一個用來向隊(duì)列中寫入數(shù)據(jù),一個用于從隊(duì)列中讀取數(shù)據(jù):

from multiprocessing import Process, Queue
import os, time, random


def writer(queue):
    print("Writing process with PID {}".format(os.getpid()))
    for i in range(5):
        print("Putting {} into the queue".format(i + 1))
        queue.put(i + 1)
        time.sleep(random.random())


def reader(queue):
    print("Reading process with PID {}".format(os.getpid()))
    while True:
        get_value = queue.get()
        print("Getting {} from the queue".format(get_value))


if __name__ == "__main__":
    queue = Queue()
    writer_process = Process(target=writer(queue))
    reader_process = Process(target=reader(queue))

    reader_process.start()
    writer_process.start()

    writer_process.join()
    reader_process.terminate()  # while True, 因此不能等到其結(jié)束,只能使用terminate

使用Pipes通信

使用Pipes創(chuàng)建一個管道實(shí)例,一個管道實(shí)例的主要方法:

  • send:向管道中發(fā)送信息
  • recv:從管道中接收信息
  • close:關(guān)閉管道

直接看一個例子,管道兩端的進(jìn)程互相給對方發(fā)消息:

from multiprocessing import Pipe

if __name__ == "__main__":
    (pipe_left, pipe_right) = Pipe()

    pipe_left.send("Hello, I'm LEFT!")
    pipe_right.send("Hello, I'm RIGHT!")

    print("pipe_left received {}".format(pipe_left.recv()))
    print("pipe_right received {}".format(pipe_right.recv()))

    pipe_left.close()
    pipe_right.close()

得到的結(jié)果為:

pipe_left received Hello, I'm RIGHT!
pipe_right received Hello, I'm LEFT!
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容