為什么需要同步
同樣舉之前的例子,兩個(gè)線程分別對(duì)同一個(gè)全局變量進(jìn)行加減,得不到預(yù)期結(jié)果,代碼如下:
total = 0
def add():
global total
for i in range(1000000):
total += 1
def desc():
global total
for i in range(1000000):
total -= 1
import threading
thread1 = threading.Thread(target=add)
thread2 = threading.Thread(target=desc)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print(total)
原因就是因?yàn)?+= 和 -=并不是原子操作??梢允褂?code>dis模塊查看字節(jié)碼:
import dis
def add(total):
total += 1
def desc(total):
total -= 1
total = 0
print(dis.dis(add))
print(dis.dis(desc))
# 運(yùn)行結(jié)果如下:
# 3 0 LOAD_FAST 0 (total)
# 3 LOAD_CONST 1 (1)
# 6 INPLACE_ADD
# 7 STORE_FAST 0 (total)
# 10 LOAD_CONST 0 (None)
# 13 RETURN_VALUE
# None
# 5 0 LOAD_FAST 0 (total)
# 3 LOAD_CONST 1 (1)
# 6 INPLACE_SUBTRACT
# 7 STORE_FAST 0 (total)
# 10 LOAD_CONST 0 (None)
# 13 RETURN_VALUE
# None
可以看到 add()函數(shù)雖然其中只有一行代碼,但是字節(jié)碼主要分為四個(gè)步驟:
- load 變量total
- load 常量 1
- 執(zhí)行加法操作
- 對(duì)total進(jìn)行賦值
同理,desc()函數(shù)的步驟相同,只是第三步改為執(zhí)行減法。
假設(shè)一種極端情況,開始total = 0,首先線程1 load 變量total,得到值為0,切換到線程2,同樣的到total為0,再次切換線程1 load常量1,執(zhí)行加法,給total賦值得到1;然后線程2也 laod常量1,執(zhí)行減法,給total賦值為-1。最終total為-1,而不是預(yù)期的0。
期望中,必須在+=操作結(jié)束后,才能執(zhí)行-=,所以線程同步的需求就出來(lái)了。
互斥鎖Lock
threading模塊中提供了threading.Lock類(互斥鎖),基本用法如下:
import threading
lock = threading.Lock()
lock.acquire() # 獲取鎖
# dosomething…… # 臨界區(qū)的代碼只能被同時(shí)只能被一個(gè)線程運(yùn)行
lock.release() # 釋放鎖
將上面的代碼修改,即可得到正確結(jié)果:
import threading
total = 0
lock = threading.Lock()
def add(lock):
global total
for i in range(1000000):
lock.acquire()
total += 1
lock.release()
def desc(lock):
global total
for i in range(1000000):
lock.acquire()
total -= 1
lock.release()
thread1 = threading.Thread(target=add, args=(lock,))
thread2 = threading.Thread(target=desc, args=(lock,))
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print(total)
# 執(zhí)行結(jié)果 0
可以看到,添加互斥鎖之后,程序執(zhí)行結(jié)果是正確的,但是用了互斥鎖之后,同樣有一些缺陷:
- 添加鎖之后,會(huì)影響程序性能。
- 可能引起"死鎖"。
死鎖主要在兩種情況下發(fā)生:
-
迭代死鎖
一個(gè)線程“迭代”請(qǐng)求同一個(gè)資源 ,會(huì)造成死鎖。
lock = threading.Lock() lock.acquire() lock.acquire() total += 1 lock.release() lock.release()上例種,第一次請(qǐng)求資源后還未 release ,再次acquire,最終無(wú)法釋放,造成死鎖 。(可通過(guò)可重入鎖解決這個(gè)問(wèn)題)。
-
互相調(diào)用死鎖
兩個(gè)線程中都會(huì)調(diào)用相同的資源,互相等待對(duì)方結(jié)束的情況 。假設(shè)A線程需要資源a,b,B線程也需要資源a,b,而線程A先獲取資源a后,再獲取資源b, B線程先獲取資源b,再獲取資源a。在A線程獲取資源a,B線程獲取資源b后,A線程在等待B線程釋放資源b,而B線程在等待A線程釋放資源a,從而死鎖就發(fā)生了。
import threading import time lock_a = threading.Lock() lock_b = threading.Lock() def func1(): global lock_a global lock_b lock_a.acquire() time.sleep(1) lock_b.acquire() time.sleep(1) lock_b.release() lock_a.release() def func2(): global lock_a global lock_b lock_b.acquire() time.sleep(1) lock_a.acquire() time.sleep(1) lock_a.release() lock_b.release() thread1 = threading.Thread(target=func1) thread2 = threading.Thread(target=func2) thread1.start() thread2.start() thread1.join() thread2.join() print("program finished") # 程序會(huì)陷入死循環(huán)這個(gè)例子比較重要,開始理解錯(cuò)了,如果B線程獲取了資源b,然后釋放之后再獲取資源a,這樣是不會(huì)發(fā)生死鎖的。只有在B線程獲取了資源b,還沒(méi)有釋放的時(shí)候,獲取了資源a,才會(huì)發(fā)生死鎖。
可重入鎖RLock
為解決同一線程種不能多次請(qǐng)求同一資源的問(wèn)題,python提供了“可重入鎖”:threading.RLock,RLock內(nèi)部維護(hù)著一個(gè)Lock和一個(gè)counter變量,counter記錄了acquire的次數(shù),從而使得資源可以被多次require。直到一個(gè)線程所有的acquire都被release,其他的線程才能獲得資源 。用法和threading.Lock類相同。
將上面迭代死鎖的代碼改寫一下,就不會(huì)發(fā)生死鎖,但注意,調(diào)用acquire和release的次數(shù)必須相等。
lock = threading.RLock()
lock.acquire()
lock.acquire()
total += 1
lock.release()
lock.release()
一般不會(huì)寫這么無(wú)聊的代碼,但是有一種情況是可能發(fā)生的,在加鎖區(qū)域調(diào)用了某個(gè)函數(shù),而這個(gè)函數(shù)內(nèi)部又申請(qǐng)了同樣的資源。
lock = threading.RLock()
def dosomething(lock):
lock.acquire()
# do something
lock.release()
lock.acquire()
dosomething(lock)
lock.release()
總結(jié)
- 線程間訪問(wèn)同一變量需要同步。
- 線程間加鎖會(huì)導(dǎo)致性能損失。
- 加鎖可能產(chǎn)生死鎖,迭代死鎖和互相調(diào)用死鎖。
- 可重入鎖可以避免迭代死鎖。