python 多進(jìn)程和多線程

多進(jìn)程

要讓python程序?qū)崿F(xiàn)多進(jìn)程,我們先了解操作系統(tǒng)的相關(guān)知識(shí)。

Unix、Linux操作系統(tǒng)提供了一個(gè)fork()系統(tǒng)調(diào)用,它非常特殊。普通的函數(shù)調(diào)用,調(diào)用一次,返回一次,但是fork()調(diào)用一次,返回兩次,因?yàn)椴僮飨到y(tǒng)自動(dòng)把當(dāng)前進(jìn)程(稱為父進(jìn)程)復(fù)制了一份(稱為子進(jìn)程),然后,分別在父進(jìn)程和子進(jìn)程內(nèi)返回。

子進(jìn)程永遠(yuǎn)返回0,而父進(jìn)程返回子進(jìn)程的ID。這樣做的理由是,一個(gè)父進(jìn)程可以fork出很多子進(jìn)程,所以,父進(jìn)程要記下每個(gè)子進(jìn)程的ID,而子進(jìn)程只需要調(diào)用getppid() 就可以拿到父進(jìn)程的ID

python的os模塊封裝了常見的系統(tǒng)調(diào)用,其中就包括fork,可以再python程序中輕松創(chuàng)建子進(jìn)程:

import os

print("Process (%s)? start..." % os.getpid())

pid = os.fork()

if pid == 0:

? ? ? ? print("i am child process (%s) and my parent is %s." % (os.getpid(),os.getppid()))

else:

? ? ? ? print("I (%s) just created a child process (%s)" % (os.getpid(),pid))

運(yùn)行結(jié)果如下:

Process (876) start...

I (876) just created a child process (877).

I am child process (877) and my parent is 876.

由于Windows沒有fork調(diào)用,上面的代碼在windows上無(wú)法運(yùn)行。由于Mac系統(tǒng)是基于BSD(unix的一種)內(nèi)核,所以在Mac下運(yùn)行是沒有問題的,推薦大家用Mac學(xué)python

有了fork調(diào)用,一個(gè)進(jìn)程在接到新任務(wù)時(shí)就可以復(fù)制出一個(gè)子進(jìn)程來(lái)處理新任務(wù),常見的Apache服務(wù)器就是由父進(jìn)程監(jiān)聽端口,每當(dāng)有新的http請(qǐng)求時(shí),就fork出子進(jìn)程來(lái)處理新的http請(qǐng)求。

multiprocessing

如果你打算編寫多進(jìn)程的服務(wù)程序,Unix/Linux無(wú)疑是正確的選擇。由于windows沒有fork調(diào)用,難道在windows上無(wú)法用python編寫多進(jìn)程的程序?

由于python是跨平臺(tái)的,自然也應(yīng)該提供一個(gè)跨平臺(tái)的多進(jìn)程支持。multiprocessing模塊就是跨平臺(tái)版本的多進(jìn)程模塊。

multiprocessing模塊提供了一個(gè)Process類來(lái)代表一個(gè)進(jìn)程對(duì)象,下面的例子演示了一個(gè)子進(jìn)程并等待其結(jié)束:

from multiprocessing import Process

import os

# 子進(jìn)程要執(zhí)行的代碼

def run_proc(name):

? ? ? ? print('Run child process %s (%s)...' % (name,os.getpid()))

if __name__ == "__main__":

? ? ? ? print("Parent process %s." % os.getpid())

? ? ? ? p = Process(target = run_proc,args = ("test",))

? ? ? ? print("Child process will start.")

? ? ? ? p.start()

? ? ? ? p.join()

? ? ? ? print("Child process end")

執(zhí)行結(jié)果如下:

Parent process 928.

Process will start

Run child process test (929)...

Process end.

創(chuàng)建子進(jìn)程時(shí),只需要傳入一個(gè)執(zhí)行函數(shù)和函數(shù)的參數(shù),創(chuàng)建一個(gè)Process實(shí)例,用start()方法啟動(dòng),這樣創(chuàng)建進(jìn)程比f(wàn)ork()還要簡(jiǎn)單。

join()方法可以等待子進(jìn)程結(jié)束后再繼續(xù)往下運(yùn)行,通常用于進(jìn)程間的同步。

Pool

如果要啟動(dòng)大量的子進(jìn)程,可以用進(jìn)程池的方式批量創(chuàng)建子進(jìn)程:

from multiprocessing import Pool

import os,time,random

def long_time_task(name):

? ? ? ? print("Run task %s (%s)..." % (name,os.getpid()))

? ? ? ? start = time.time()

? ? ? ? time.sleep(random.random() * 3)

? ? ? ? end = time.time()

? ? ? ? print("Task %s runs %0.2f seconds." % (name,(end - start)))

if __name__ == "__main__":

? ? ? ? print('Parent process %s.'% os.getpid())?

? ? ? ? p = Pool(4)

????????foriinrange(5):?

? ? ? ? ????????p.apply_async(long_time_task, args=(i,))?

? ? ? ? ? ? ? ? print('Waiting for all subprocesses done...')

? ? ? ? ? ? ? ? p.close()?

? ? ? ? ? ? ? ? p.join()?

? ? ? ? ? ? ? ? print('All subprocesses done.')

執(zhí)行結(jié)果如下:

Parentprocess669.

Waitingforall subprocesses done...

Run task0(671)...

Run task1(672)...

Run task2(673)...

Run task3(674)...

Task2runs0.14seconds.

Runtask4(673)...

Task1runs0.27seconds.

Task3runs0.86seconds.

Task0runs1.41seconds.

Task4runs1.91seconds.

All subprocesses done.

代碼解讀:

對(duì)Pool對(duì)象調(diào)用join()方法會(huì)等待所有子進(jìn)程執(zhí)行完畢,調(diào)用join()之前必須先調(diào)用close(),調(diào)用close()之后就不能繼續(xù)添加新的Process了。

請(qǐng)注意輸出的結(jié)果,task0,1,2,3是立刻執(zhí)行的,而task4要等待前面某個(gè)task完成后才執(zhí)行,這是因?yàn)镻ool的默認(rèn)大小在我的電腦上是4,因此,最多同時(shí)執(zhí)行4個(gè)進(jìn)程。這是Pool有意設(shè)計(jì)的限制,并不是操作系統(tǒng)的限制。如果改成:

p = Pool(5)

就可以同時(shí)跑5個(gè)進(jìn)程。

由于Pool的默認(rèn)大小是CPU的核數(shù),如果你不幸擁有8核CPU,你要提交至少9個(gè)子進(jìn)程才能看到上面的等待效果

子進(jìn)程

很多時(shí)候,子進(jìn)程并不是自身,而是一個(gè)外部進(jìn)程。我們創(chuàng)建了子進(jìn)程后,還需要控制子進(jìn)程的輸入和輸出。

subprocess模塊可以讓我們非常方便地啟動(dòng)一個(gè)子進(jìn)程,然后控制其輸入和輸出。

下面的例子演示了如何在Python代碼中運(yùn)行命令nslookup www.python.org,這和命令行直接運(yùn)行的效果是一樣的:

import subprocess

print('$ nslookup www.python.org')

r = subprocess.call(['nslookup','www.python.org'])

print('Exit code:', r)

運(yùn)行結(jié)果:

$ nslookup www.python.org

Server:192.168.19.4

Address:192.168.19.4#53

Non-authoritativeanswer:

www.python.org \? ? canonical name = python.map.fastly.net.?

Name:python.map.fastly.net

Address:199.27.79.223

Exitcode:0

如果子進(jìn)程還需要輸入,則可以通過communicate()方法輸入:

importsubprocess

print('$ nslookup')

p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=sub

process.PIPE, stderr=subprocess.PIPE)

output, err = p.communicate(b'set q=mx\npython.org\nexit\n')

print(output.decode('utf-8'))

print('Exit code:', p.returncode)

上面的代碼相當(dāng)于命令行執(zhí)行命令nslookup,然后手動(dòng)輸入:

set q=mx

python.org

exit

運(yùn)行結(jié)果如下:

$ nslookup

Server:192.168.19.4

Address:192.168.19.4#53

Non-authoritativeanswer:

python.org?

mail exchanger =50mail.python.org.

Authoritativeanswers can be foundfrom:

mail.python.org internet address =82.94.164.166

mail.python.org hasAAAAaddress2001:888:2000:d::a6

Exitcode:0

進(jìn)程間通信

Process之間肯定是需要通信的,操作系統(tǒng)提供了很多機(jī)制來(lái)實(shí)現(xiàn)進(jìn)程間的通信。Python的multiprocessing模塊包裝了底層的機(jī)制,提供了Queue、Pipes等多種方式來(lái)交換數(shù)據(jù)。

我們以Queue為例,在父進(jìn)程中創(chuàng)建兩個(gè)子進(jìn)程,一個(gè)往Queue里寫數(shù)據(jù),一個(gè)從Queue里讀數(shù)據(jù):

frommultiprocessingimportProcess, Queue

importos, time, random

# 寫數(shù)據(jù)進(jìn)程執(zhí)行的代碼:

defwrite(q):

????????print('Process to write: %s'% os.getpid())

????????forvaluein['A','B','C']:?

? ? ? ? print('Put %s to queue...'% value)

? ? ? ? q.put(value)

????????time.sleep(random.random())

# 讀數(shù)據(jù)進(jìn)程執(zhí)行的代碼:

defread(q):

????????print('Process to read: %s'% os.getpid())

????????whileTrue:?

? ? ? ? ????????value = q.get(True)

????????????????print('Get %s from queue.'% value)

if__name__=='__main__'

????????:# 父進(jìn)程創(chuàng)建Queue,并傳給各個(gè)子進(jìn)程:

????????q = Queue()

????????pw = Process(target=write, args=(q,))

????????pr = Process(target=read, args=(q,))

????????# 啟動(dòng)子進(jìn)程pw,寫入:

????????pw.start()

????????# 啟動(dòng)子進(jìn)程pr,讀取:

????????pr.start()

????????# 等待pw結(jié)束:

????????pw.join()

????????# pr進(jìn)程里是死循環(huán),無(wú)法等待其結(jié)束,只能強(qiáng)行終止:

????????pr.terminate()

運(yùn)行結(jié)果如下:

Process to write:50563

Put A to queue...

Process to read:50564

Get Afromqueue.

Put B to queue...

Get Bfromqueue.

Put C to queue...

Get Cfromqueue.

在Unix/Linux下,multiprocessing模塊封裝了fork()調(diào)用,使我們不需要關(guān)注fork()的細(xì)節(jié)。由于Windows沒有fork調(diào)用,因此,multiprocessing需要“模擬”出fork的效果,父進(jìn)程所有Python對(duì)象都必須通過pickle序列化再傳到子進(jìn)程去,所有,如果multiprocessing在Windows下調(diào)用失敗了,要先考慮是不是pickle失敗了。

小結(jié)

在Unix/Linux下,可以使用fork()調(diào)用實(shí)現(xiàn)多進(jìn)程。

要實(shí)現(xiàn)跨平臺(tái)的多進(jìn)程,可以使用multiprocessing模塊。

進(jìn)程間通信是通過Queue、Pipes等實(shí)現(xiàn)的。

多線程

多任務(wù)可以由多進(jìn)程完成,也可以由一個(gè)進(jìn)程內(nèi)的多線程完成。

我們前面提到了進(jìn)程是由若干線程組成的,一個(gè)進(jìn)程至少有一個(gè)線程。

由于線程是操作系統(tǒng)直接支持的執(zhí)行單元,因此,高級(jí)語(yǔ)言通常都內(nèi)置多線程的支持,Python也不例外,并且,Python的線程是真正的Posix Thread,而不是模擬出來(lái)的線程。

Python的標(biāo)準(zhǔn)庫(kù)提供了兩個(gè)模塊:_thread和threading,_thread是低級(jí)模塊,threading是高級(jí)模塊,對(duì)_thread進(jìn)行了封裝。絕大多數(shù)情況下,我們只需要使用threading這個(gè)高級(jí)模塊。

啟動(dòng)一個(gè)線程就是把一個(gè)函數(shù)傳入并創(chuàng)建Thread實(shí)例,然后調(diào)用start()開始執(zhí)行:

importtime, threading

# 新線程執(zhí)行的代碼:

defloop():

????????print('thread %s is running...'% threading.current_thread().name)?

? ? ? ? n =0

????????while n <5:?

? ? ? ? ????????n = n +1

????????????????print('thread %s >>> %s'% (threading.current_thread().name, n))?

????????????????time.sleep(1)

????????print('thread %s ended.'% threading.current_thread().name)

print('thread %s is running...'% threading.current_thread().name)

t = threading.Thread(target=loop, name='LoopThread')

t.start()

t.join()

print('thread %s ended.'% threading.current_thread().name)

執(zhí)行結(jié)果如下:

thread MainThreadisrunning...

thread LoopThreadisrunning...

thread LoopThread >>>1

thread LoopThread >>>2

thread LoopThread >>>3

thread LoopThread >>>4

thread LoopThread >>>5

thread LoopThread ended.

thread MainThread ended.

由于任何進(jìn)程默認(rèn)就會(huì)啟動(dòng)一個(gè)線程,我們把該線程稱為主線程,主線程又可以啟動(dòng)新的線程,Python的threading模塊有個(gè)current_thread()函數(shù),它永遠(yuǎn)返回當(dāng)前線程的實(shí)例。主線程實(shí)例的名字叫MainThread,子線程的名字在創(chuàng)建時(shí)指定,我們用LoopThread命名子線程。名字僅僅在打印時(shí)用來(lái)顯示,完全沒有其他意義,如果不起名字Python就自動(dòng)給線程命名為Thread-1,Thread-2……

Lock

多線程和多進(jìn)程最大的不同在于,多進(jìn)程中,同一個(gè)變量,各自有一份拷貝存在于每個(gè)進(jìn)程中,互不影響,而多線程中,所有變量都由所有線程共享,所以,任何一個(gè)變量都可以被任何一個(gè)線程修改,因此,線程之間共享數(shù)據(jù)最大的危險(xiǎn)在于多個(gè)線程同時(shí)改一個(gè)變量,把內(nèi)容給改亂了。

來(lái)看看多個(gè)線程同時(shí)操作一個(gè)變量怎么把內(nèi)容給改亂了:

importtime, threading

# 假定這是你的銀行存款:

balance =0

defchange_it(n):

????????# 先存后取,結(jié)果應(yīng)該為0:

????????globalbalance?

? ? ? ? balance = balance + n

????????balance = balance - n

defrun_thread(n):

????????for i in range(100000):

????????change_it(n)

t1 = threading.Thread(target=run_thread, args=(5,))

t2 = threading.Thread(target=run_thread, args=(8,))

t1.start()

t2.start()

t1.join()

t2.join()

print(balance)

我們定義了一個(gè)共享變量balance,初始值為0,并且啟動(dòng)兩個(gè)線程,先存后取,理論上結(jié)果應(yīng)該為0,但是,由于線程的調(diào)度是由操作系統(tǒng)決定的,當(dāng)t1、t2交替執(zhí)行時(shí),只要循環(huán)次數(shù)足夠多,balance的結(jié)果就不一定是0了。

原因是因?yàn)楦呒?jí)語(yǔ)言的一條語(yǔ)句在CPU執(zhí)行時(shí)是若干條語(yǔ)句,即使一個(gè)簡(jiǎn)單的計(jì)算:

balance = balance + n

也分兩步:

? ? 1. 計(jì)算 balance + n ,存入臨時(shí)變量中;

? ? 2. 將臨時(shí)變量的值賦值給balance.

也就是可以看成:

x = balance + n

balance = x

由于x是局部變量,兩個(gè)線程各自都有自己的x,當(dāng)代碼正常執(zhí)行時(shí):

初始值 balance = 0

t1:x1 = balance +5# x1 = 0 + 5 = 5

t1:balance = x1# balance = 5

t1:x1 = balance -5# x1 = 5 - 5 = 0

t1:balance = x1# balance = 0

t2:x2 = balance +8# x2 = 0 + 8 = 8

t2:balance = x2# balance = 8

t2:x2 = balance -8# x2 = 8 - 8 = 0

t2:balance = x2# balance = 0

結(jié)果 balance =0

但是t1和t2是交替運(yùn)行的,如果操作系統(tǒng)以下面的順序執(zhí)行t1、t2:

初始值 balance =0

t1:x1 = balance +5# x1 = 0 + 5 = 5

t2:x2 = balance +8# x2 = 0 + 8 = 8

t2:balance = x2# balance = 8

t1:balance = x1# balance = 5

t1:x1 = balance -5# x1 = 5 - 5 = 0

t1:balance = x1# balance = 0

t2:x2 = balance -8# x2 = 0 - 8 = -8

t2:balance = x2# balance = -8

結(jié)果 balance = -8

究其原因,是因?yàn)樾薷腷alance需要多條語(yǔ)句,而執(zhí)行這幾條語(yǔ)句時(shí),線程可能中斷,從而導(dǎo)致多個(gè)線程把同一個(gè)對(duì)象的內(nèi)容改亂了。

兩個(gè)線程同時(shí)一存一取,就可能導(dǎo)致余額不對(duì),你肯定不希望你的銀行存款莫名其妙地變成了負(fù)數(shù),所以,我們必須確保一個(gè)線程在修改balance的時(shí)候,別的線程一定不能改。

如果我們要確保balance計(jì)算正確,就要給change_it()上一把鎖,當(dāng)某個(gè)線程開始執(zhí)行change_it()時(shí),我們說(shuō),該線程因?yàn)楂@得了鎖,因此其他線程不能同時(shí)執(zhí)行change_it(),只能等待,直到鎖被釋放后,獲得該鎖以后才能改。由于鎖只有一個(gè),無(wú)論多少線程,同一時(shí)刻最多只有一個(gè)線程持有該鎖,所以,不會(huì)造成修改的沖突。創(chuàng)建一個(gè)鎖就是通過threading.Lock()來(lái)實(shí)現(xiàn):

balance =0

lock = threading.Lock()

defrun_thread(n):

????????foriinrange(100000):

????????# 先要獲取鎖:lock.acquire()

????????try:

????????????????# 放心地改吧:

????????????????change_it(n)

????????finally:

????????????????# 改完了一定要釋放鎖:

????????????????lock.release()

當(dāng)多個(gè)線程同時(shí)執(zhí)行l(wèi)ock.acquire()時(shí),只有一個(gè)線程能成功地獲取鎖,然后繼續(xù)執(zhí)行代碼,其他線程就繼續(xù)等待直到獲得鎖為止。

獲得鎖的線程用完后一定要釋放鎖,否則那些苦苦等待鎖的線程將永遠(yuǎn)等待下去,成為死線程。所以我們用try...finally來(lái)確保鎖一定會(huì)被釋放。

鎖的好處就是確保了某段關(guān)鍵代碼只能由一個(gè)線程從頭到尾完整地執(zhí)行,壞處當(dāng)然也很多,首先是阻止了多線程并發(fā)執(zhí)行,包含鎖的某段代碼實(shí)際上只能以單線程模式執(zhí)行,效率就大大地下降了。其次,由于可以存在多個(gè)鎖,不同的線程持有不同的鎖,并試圖獲取對(duì)方持有的鎖時(shí),可能會(huì)造成死鎖,導(dǎo)致多個(gè)線程全部掛起,既不能執(zhí)行,也無(wú)法結(jié)束,只能靠操作系統(tǒng)強(qiáng)制終止。

多核CPU

如果你不幸擁有一個(gè)多核CPU,你肯定在想,多核應(yīng)該可以同時(shí)執(zhí)行多個(gè)線程。

如果寫一個(gè)死循環(huán)的話,會(huì)出現(xiàn)什么情況呢?

打開Mac OS X的Activity Monitor,或者Windows的Task Manager,都可以監(jiān)控某個(gè)進(jìn)程的CPU使用率。

我們可以監(jiān)控到一個(gè)死循環(huán)線程會(huì)100%占用一個(gè)CPU。

如果有兩個(gè)死循環(huán)線程,在多核CPU中,可以監(jiān)控到會(huì)占用200%的CPU,也就是占用兩個(gè)CPU核心。

要想把N核CPU的核心全部跑滿,就必須啟動(dòng)N個(gè)死循環(huán)線程。

試試用Python寫個(gè)死循環(huán):

import threading, multiprocessing

defloop():

????????x =0

????????whileTrue:

????????????????x = x ^1

for i in range(multiprocessing.cpu_count()):

????????t = threading.Thread(target=loop)

????????t.start()

啟動(dòng)與CPU核心數(shù)量相同的N個(gè)線程,在4核CPU上可以監(jiān)控到CPU占用率僅有102%,也就是僅使用了一核。

但是用C、C++或Java來(lái)改寫相同的死循環(huán),直接可以把全部核心跑滿,4核就跑到400%,8核就跑到800%,為什么Python不行呢?

因?yàn)镻ython的線程雖然是真正的線程,但解釋器執(zhí)行代碼時(shí),有一個(gè)GIL鎖:Global Interpreter Lock,任何Python線程執(zhí)行前,必須先獲得GIL鎖,然后,每執(zhí)行100條字節(jié)碼,解釋器就自動(dòng)釋放GIL鎖,讓別的線程有機(jī)會(huì)執(zhí)行。這個(gè)GIL全局鎖實(shí)際上把所有線程的執(zhí)行代碼都給上了鎖,所以,多線程在Python中只能交替執(zhí)行,即使100個(gè)線程跑在100核CPU上,也只能用到1個(gè)核。

GIL是Python解釋器設(shè)計(jì)的歷史遺留問題,通常我們用的解釋器是官方實(shí)現(xiàn)的CPython,要真正利用多核,除非重寫一個(gè)不帶GIL的解釋器。

所以,在Python中,可以使用多線程,但不要指望能有效利用多核。如果一定要通過多線程利用多核,那只能通過C擴(kuò)展來(lái)實(shí)現(xiàn),不過這樣就失去了Python簡(jiǎn)單易用的特點(diǎn)。

不過,也不用過于擔(dān)心,Python雖然不能利用多線程實(shí)現(xiàn)多核任務(wù),但可以通過多進(jìn)程實(shí)現(xiàn)多核任務(wù)。多個(gè)Python進(jìn)程有各自獨(dú)立的GIL鎖,互不影響。

小結(jié)

多線程編程,模型復(fù)雜,容易發(fā)生沖突,必須用鎖加以隔離,同時(shí),又要小心死鎖的發(fā)生。

Python解釋器由于設(shè)計(jì)時(shí)有GIL全局鎖,導(dǎo)致了多線程無(wú)法利用多核。多線程的并發(fā)在Python中就是一個(gè)美麗的夢(mèng)。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容