1. 內(nèi)存管理機(jī)制
1.1 介紹
- 概要
賦值語(yǔ)句內(nèi)存分析
垃圾回收機(jī)制
內(nèi)存管理機(jī)制 - 目標(biāo)
掌握賦值語(yǔ)句內(nèi)存分析方法
掌握id()和is的使用
了解Python的垃圾回收機(jī)制
了解Python的內(nèi)存管理機(jī)制 - 內(nèi)存與硬盤
內(nèi)存是電腦的數(shù)據(jù)存儲(chǔ)設(shè)備之一,其特點(diǎn)為容量較小,但數(shù)據(jù)傳送速度較快,用以彌補(bǔ)硬盤雖然容量大但傳送速度慢的缺點(diǎn)。
1.2 內(nèi)存管理機(jī)制
-
is、id()和==
a is b等價(jià)于id(a) == id(b),比較的是id(內(nèi)存標(biāo)識(shí))是否相同。
a == b比較的是value(值)是否相同。
a1 = 5
a2 = 5
print(id(a1)) # 4313714912
print(id(a1) == id(a2), a1 is a2) # True True
b1 = 'abc'
b2 = 'abc'
print(b1 == b2, b1 is b2) # True True
c1 = [1]
c2 = [1]
print(c1 == c2, c1 is c2) # True False
d1 = {'a': 1}
d2 = {'a': 1}
print(d1 == d2, d1 is d2) # True False
- 內(nèi)存管理示例
def extend_list(value, list=[]):
list.append(value)
return list
l1 = extend_list(10)
l2 = extend_list(123, [])
l3 = extend_list('a')
print(l1) # [10, 'a']
print(l2) # [123]
print(l3) # [10, 'a']
- 垃圾回收機(jī)制
以引用計(jì)數(shù)為主,分代收集為輔。
class Cat(object):
def __init__(self):
print('{} init'.format(id(self)))
def __del__(self):
print('{} del'.format(id(self)))
while True:
""" 自動(dòng)回收內(nèi)存 """
c1 = Cat()
l = []
while True:
""" 一直被引用,內(nèi)存不會(huì)釋放 """
c1 = Cat()
l.append(c1)
注意:在引用計(jì)數(shù)垃圾回收機(jī)制中,循環(huán)引用會(huì)導(dǎo)致內(nèi)存泄漏。
- 引用計(jì)數(shù)
每個(gè)對(duì)象都存在指向該對(duì)象的引用計(jì)數(shù)。
查看某個(gè)對(duì)象的引用計(jì)數(shù):sys.getrefcount()
刪除某個(gè)引用:del
import sys
# 基本數(shù)據(jù)類型
i = 1
print(sys.getrefcount(i)) # 181
i1 = 1
i2 = i1
print(sys.getrefcount(i)) # 183
del i1
print(sys.getrefcount(i)) # 182
# 引用數(shù)據(jù)類型
l = []
print(sys.getrefcount(l)) # 2
l1 = l
l2 = l1
print(sys.getrefcount(l)) # 4
del l2
print(sys.getrefcount(l)) # 3
- 分代收集
Python將所有的對(duì)象分為0,1,2三代。
所有新建對(duì)象都是0代對(duì)象。
當(dāng)某一代對(duì)象經(jīng)歷過(guò)垃圾回收,依然存活,那么它就被歸入下一代對(duì)象。 - 垃圾回收時(shí)機(jī)
當(dāng)Python運(yùn)行時(shí),會(huì)記錄其中分配對(duì)象(object allocation)和取消分配對(duì)象(object deallocation)的次數(shù)。當(dāng)兩者的差值高于某一個(gè)閾值時(shí),垃圾回收才會(huì)啟動(dòng)。
查看閾值:gc.get_threshold()
import gc
print(gc.get_threshold()) # (700, 10, 10) 依次表示第0、1、2代垃圾回收閾值
- 手動(dòng)回收
gc.collect()手動(dòng)回收。
objgraph模塊中的count()可以記錄當(dāng)前類產(chǎn)生的未回收實(shí)例對(duì)象個(gè)數(shù)。
class Person(object):
pass
class Cat(object):
pass
p = Person()
c = Cat()
p.pet = c
c.master = p
print(sys.getrefcount(p)) # 3
print(sys.getrefcount(c)) # 3
print(objgraph.count('Person')) # 1
print(objgraph.count('Cat')) # 1
del p
del c
gc.collect() # p與c循環(huán)引用。如果不使用del,則無(wú)法回收。
print(objgraph.count('Person')) # 0
print(objgraph.count('Cat')) # 0
- 內(nèi)存池(
memory pool)機(jī)制
當(dāng)創(chuàng)建大量小內(nèi)存對(duì)象時(shí),頻繁調(diào)用new / malloc會(huì)導(dǎo)致大量?jī)?nèi)存碎片,效率降低。內(nèi)存池就是預(yù)先在內(nèi)存中申請(qǐng)一定數(shù)量的、大小相等的內(nèi)存塊留作備用,當(dāng)有新的內(nèi)存需求時(shí),就先從內(nèi)存池中分配內(nèi)存給這個(gè)需求,不夠了再申請(qǐng)新的內(nèi)存。這樣做可以減少內(nèi)存碎片,提高效率。
Python3中的內(nèi)存管理機(jī)制——Pymalloc
針對(duì)小對(duì)象(<=512bytes),pymalloc會(huì)在內(nèi)存池中申請(qǐng)內(nèi)存空間。
當(dāng)內(nèi)存>512bytes時(shí),則使用PyMen_RawMalloc()和PyMem_RawRealloc()來(lái)申請(qǐng)新的內(nèi)存空間。
注釋:1 Byte = 8 Bits(即1B = 8b)
1.3 總結(jié)
- 賦值語(yǔ)句內(nèi)存分析
使用id()方法訪問(wèn)內(nèi)存地址
使用is比較內(nèi)存地址是否相同 - 垃圾回收機(jī)制
引用計(jì)數(shù)為主,分代收集為輔
引用計(jì)數(shù)的缺陷是循環(huán)引用問(wèn)題 - 內(nèi)存管理機(jī)制
內(nèi)存池(memory pool)機(jī)制
2. 線程、進(jìn)程、協(xié)程
2.1 介紹
- 章節(jié)概要
進(jìn)程、線程與并發(fā)
對(duì)多核的利用
實(shí)現(xiàn)一個(gè)線程
線程之間的通信
線程的調(diào)度和優(yōu)化 - 使用場(chǎng)景
快速高效的爬蟲程序
多用戶同時(shí)訪問(wèn)Web服務(wù)
電商秒殺、搶購(gòu)活動(dòng)
物聯(lián)網(wǎng)傳感器監(jiān)控服務(wù)
2.2 線程
- 線程介紹
在同一個(gè)進(jìn)程下執(zhí)行,并共享相同的上下文。
一個(gè)進(jìn)程中的各個(gè)線程與主線程共享同一片數(shù)據(jù)空間。
線程包括開始、執(zhí)行順序和結(jié)束三部分。
線程可以被強(qiáng)占(中斷)和臨時(shí)掛起(睡眠)——讓步 - 多核的利用
單核CPU系統(tǒng)中,不存在真正的并發(fā)。
GIL全局解釋器鎖 - 強(qiáng)制在任何時(shí)候只有一個(gè)線程可以執(zhí)行Python代碼。
I/O密集型應(yīng)用與CPU密集型應(yīng)用。 -
GIL執(zhí)行順序
(1) 設(shè)置GIL
(2) 切換進(jìn)一個(gè)線程去執(zhí)行
(3) 執(zhí)行下面操作之一
指定數(shù)量的字節(jié)碼指令;
線程主動(dòng)讓出控制權(quán)(可以調(diào)用time.sleep(0)來(lái)完成)。
(4) 把線程設(shè)置回睡眠狀態(tài)(切換出線程)
(5) 解鎖GIL
(6) 重復(fù)上述步驟 -
threading模塊對(duì)象
threading模塊對(duì)象 -
Thread對(duì)象方法
Thread對(duì)象方法 - 線程實(shí)現(xiàn)(面向過(guò)程)
import threading
import time
def loop():
""" 新的線程執(zhí)行的代碼 """
loop_thread = threading.current_thread()
print('loop_thread: {}'.format(loop_thread.name)) # loop_thread: loop_thread
n = 0
while n < 5:
print(n) # 0 1 2 3 4
n += 1
def use_thread():
""" 新的線程執(zhí)行函數(shù) """
now_thread = threading.current_thread()
print('now_thread: {}'.format(now_thread.name)) # now_thread: MainThread
# 創(chuàng)建線程
t = threading.Thread(target=loop, name='loop_thread')
# 啟動(dòng)線程
t.start()
# 掛起線程
t.join()
if __name__ == '__main__':
use_thread()
- 線程實(shí)現(xiàn)(面向?qū)ο?
import threading
class LoopThread(threading.Thread):
""" 自定義線程 """
n = 0
def run(self):
loop_thread = threading.current_thread()
print('loop_thread: {}'.format(loop_thread.name)) # loop_thread: loop_thread
while self.n < 5:
print(self.n) # 0 1 2 3 4
self.n += 1
if __name__ == '__main__':
now_thread = threading.current_thread()
print('now_thread: {}'.format(now_thread.name)) # now_thread: MainThread
# 創(chuàng)建線程
t = LoopThread(name='loop_thread')
# 啟動(dòng)線程
t.start()
# 掛起線程
t.join()
- 多線程及存在的問(wèn)題
import threading
# 我的銀行賬戶
balance = 0
def change(n):
global balance # 局部作用域如果要修改全局變量,則需要global關(guān)鍵字聲明。
balance = balance + n
balance = balance - n
if balance != 0:
print(balance) # balance有可能是-8 -5 0 5 8
class ChangeBalanceThread(threading.Thread):
def __init__(self, num, *args, **kwargs):
super().__init__(*args, **kwargs)
self.num = num
def run(self):
for i in range(100000):
change(self.num)
if __name__ == '__main__':
# 創(chuàng)建線程
t1 = ChangeBalanceThread(5)
t2 = ChangeBalanceThread(8)
# 啟動(dòng)線程
t1.start()
t2.start()
# 掛起線程
t1.join()
t2.join()
注意:局部作用域如果要修改全局變量,則需要global關(guān)鍵字聲明。
- 鎖
import threading
# 鎖
my_lock = threading.Lock()
# 我的銀行賬戶
balance = 0
def change(n):
global balance # 局部作用域如果要修改全局變量,則需要global關(guān)鍵字聲明。
# 添加鎖
my_lock.acquire()
try:
balance = balance + n
balance = balance - n
if balance != 0:
print(balance)
finally:
# 釋放鎖
my_lock.release()
class ChangeBalanceThread(threading.Thread):
def __init__(self, num, *args, **kwargs):
super().__init__(*args, **kwargs)
self.num = num
def run(self):
for i in range(100000):
change(self.num)
if __name__ == '__main__':
# 創(chuàng)建線程
t1 = ChangeBalanceThread(5)
t2 = ChangeBalanceThread(8)
# 啟動(dòng)線程
t1.start()
t2.start()
# 掛起線程
t1.join()
t2.join()
- 鎖的簡(jiǎn)單寫法
# 鎖
my_lock = threading.Lock()
# 我的銀行賬戶
balance = 0
def change(n):
global balance
# 添加鎖
with my_lock:
balance = balance + n
balance = balance - n
if balance != 0:
print(balance)
- 死鎖
# 鎖
my_lock = threading.Lock()
# 我的銀行賬戶
balance = 0
def change(n):
global balance
# 添加鎖
my_lock.acquire()
my_lock.acquire()
try:
balance = balance + n
balance = balance - n
print(balance)
finally:
# 釋放鎖
my_lock.release()
my_lock.release()
-
threading.RLock
我們使用threading.Lock()來(lái)進(jìn)行加鎖。threading中還提供了另外一個(gè)threading.RLock()鎖。在同一線程內(nèi),對(duì)RLock進(jìn)行多次acquire()操作,程序不會(huì)阻塞。
# 鎖
my_lock = threading.RLock()
# 我的銀行賬戶
balance = 0
def change(n):
global balance # 局部作用域如果要修改全局變量,則需要global關(guān)鍵字聲明。
# 添加鎖
my_lock.acquire()
my_lock.acquire()
try:
balance = balance + n
balance = balance - n
print(balance)
finally:
# 釋放鎖
my_lock.release()
my_lock.release()
- 使用線程池
import time
import threading
from concurrent.futures.thread import ThreadPoolExecutor
from multiprocessing.dummy import Pool
def run(n):
""" 線程要做的事情 """
time.sleep(1)
print('thread: {}; n: {}'.format(threading.current_thread().name, n))
def main():
""" 主線程做任務(wù) """
t1 = time.time()
for n in range(100):
run(n)
print(time.time() - t1) # 100.30s
def main_use_thread():
""" 多線程做任務(wù)。10個(gè)線程。"""
t2 = time.time()
ls = []
for i in range(100):
t = threading.Thread(target=run, args=(i,))
ls.append(t)
t.start()
for l in ls:
l.join()
print(time.time() - t2) # 1.01s
def main_use_pool():
""" 線程池 """
t3 = time.time()
pool = Pool(10)
n_list = range(100)
pool.map(run, n_list)
pool.close()
pool.join()
print(time.time() - t3) # 12.08s
def main_use_executor():
""" 使用ThreadPoolExecutor來(lái)優(yōu)化 """
t4 = time.time()
n_list = range(100)
with ThreadPoolExecutor(max_workers=10) as executor:
executor.map(run, n_list)
print(time.time() - t4) # 10.03s
if __name__ == '__main__':
main()
main_use_thread()
main_use_pool()
main_use_executor()
注意:線程傳參的方式threading.Thread(target=run, args=(i,))。其中,(i,)表示元組。
2.3 進(jìn)程
- 進(jìn)程介紹
是一個(gè)執(zhí)行中的程序。
每個(gè)進(jìn)程都擁有自己的地址空間、內(nèi)存、數(shù)據(jù)棧以及其他用于跟蹤執(zhí)行的輔助數(shù)據(jù)。
操作系統(tǒng)管理所有進(jìn)程的執(zhí)行,并為這些進(jìn)程合理地分配時(shí)間。
進(jìn)程也可通過(guò)派生(fork或spawn)新的進(jìn)程來(lái)執(zhí)行其他任務(wù)。 - 進(jìn)程實(shí)現(xiàn)(面向過(guò)程)
import os
import time
from multiprocessing import Process
def do_sth(name):
print("進(jìn)程名稱: {}, pid: {}".format(name, os.getpid())) # 進(jìn)程名稱: my_process, pid: 47634
time.sleep(3)
print("進(jìn)程要做的事情結(jié)束") # 進(jìn)程要做的事情結(jié)束
if __name__ == '__main__':
print('當(dāng)前進(jìn)程pid: {}'.format(os.getpid())) # 當(dāng)前進(jìn)程名稱: 47651
p = Process(target=do_sth, args=('my_process',))
# 啟動(dòng)進(jìn)程
p.start()
# 掛起進(jìn)程
p.join()
- 進(jìn)程實(shí)現(xiàn)(面向?qū)ο?
import os
import time
from multiprocessing import Process
class MyProcess(Process):
def run(self):
print("進(jìn)程名稱: {}, pid: {}".format(self.name, os.getpid())) # 進(jìn)程名稱: my_process, pid: 47758
time.sleep(3)
print("進(jìn)程要做的事情結(jié)束") # 進(jìn)程要做的事情結(jié)束
if __name__ == '__main__':
print('當(dāng)前進(jìn)程pid: {}'.format(os.getpid())) # 當(dāng)前進(jìn)程名稱: 47757
p = MyProcess(name='my_process')
# 啟動(dòng)進(jìn)程
p.start()
# 掛起進(jìn)程
p.join()
- 進(jìn)程之間通信
通過(guò)Queue、Pipes等實(shí)現(xiàn)進(jìn)程之間的通信。
import time
import random
from multiprocessing import Process, Queue, current_process
class WriteProcess(Process):
""" 寫入的進(jìn)程 """
def __init__(self, queue, *args, **kwargs):
self.queue = queue
super().__init__(*args, **kwargs)
def run(self):
ls = [
'第1行內(nèi)容',
'第2行內(nèi)容',
'第3行內(nèi)容',
'第4行內(nèi)容',
'第5行內(nèi)容',
]
for line in ls:
print('進(jìn)程名稱: {}, 寫入內(nèi)容: {}'.format(current_process().name, line))
self.queue.put(line)
# 沒(méi)寫入一次,休息1-3s
time.sleep(random.randint(1, 3))
class ReadProcess(Process):
""" 讀取的進(jìn)程 """
def __init__(self, queue, *args, **kwargs):
self.queue = queue
super().__init__(*args, **kwargs)
def run(self):
while True:
content = self.queue.get()
print('進(jìn)程名稱: {}, 讀取內(nèi)容: {}'.format(self.name, content))
if __name__ == '__main__':
# 通過(guò)Queue共享數(shù)據(jù)
q = Queue()
# 寫入內(nèi)容的進(jìn)程
t_write = WriteProcess(q)
t_write.start()
# 讀取內(nèi)容的進(jìn)程
t_read = ReadProcess(q)
t_read.start()
t_write.join()
# t_read.join()
t_read.terminate()
注意:進(jìn)程類中的self.name等價(jià)于current_process().name。
- 多進(jìn)程中的鎖
import random
import time
from multiprocessing import Process, Lock
class WriteProcess(Process):
""" 寫入文件 """
lock = Lock()
def __init__(self, file_name, index, *args, **kwargs):
self.file_name = file_name
self.index = index
super().__init__(*args, **kwargs)
def run(self):
with self.lock:
for i in range(5):
with open(self.file_name, 'a+', encoding='utf-8') as f:
content = '現(xiàn)在是: {}, pid為: {}, 進(jìn)程序號(hào)為: {}\n'.format(
self.name,
self.pid,
self.index
)
f.write(content)
time.sleep(random.randint(1, 5))
print(content)
if __name__ == '__main__':
file_name = 'test.txt'
for i in range(5):
p = WriteProcess(file_name, i)
p.start()
-
multiprocessing.RLock
我們使用multiprocessing.Lock()來(lái)進(jìn)行加鎖。multiprocessing中還提供了另外一個(gè)multiprocessing.RLock()鎖。在同一進(jìn)程內(nèi),對(duì)RLock進(jìn)行多次acquire()操作,程序不會(huì)阻塞。 - 進(jìn)程池
import os
import random
import time
from multiprocessing import current_process, Pool
def run(file_name, index):
""" 進(jìn)程任務(wù) """
with open(file_name, 'a+', encoding='utf-8') as f:
now_process = current_process()
content = '{} - {} - {} \n'.format(
now_process.name,
now_process.pid,
index
)
f.write(content)
time.sleep(random.randint(1, 3))
print(content)
return 'OK'
if __name__ == '__main__':
# print(os.cpu_count()) # 4核CPU
file_name = 'test_pool.txt'
pool = Pool(2) # 創(chuàng)建有兩個(gè)進(jìn)程的進(jìn)程池
for i in range(20):
# 同步添加任務(wù)
# rest = pool.apply(run, args=(file_name, i))
# 異步添加任務(wù)
rest = pool.apply_async(run, args=(file_name, i))
print('{} {}'.format(i, rest))
pool.close() # 關(guān)閉進(jìn)程池
pool.join()
注釋:異步添加任務(wù),不能保證任務(wù)的執(zhí)行順序。同步添加任務(wù),可以保證任務(wù)的執(zhí)行順序。
2.4 協(xié)程
- 協(xié)程介紹
協(xié)程就是協(xié)同多任務(wù)。
協(xié)程在一個(gè)進(jìn)程或者是一個(gè)線程中執(zhí)行。
協(xié)程不需要鎖機(jī)制。 -
yield生成器
def count_down(n):
""" 倒計(jì)時(shí)效果 """
while n > 0:
yield n
n -= 1
if __name__ == '__main__':
rest = count_down(5)
print(next(rest)) # 5
print(next(rest)) # 4
print(next(rest)) # 3
print(next(rest)) # 2
print(next(rest)) # 1
-
Python3.5以前協(xié)程實(shí)現(xiàn)
使用生成器(yield)實(shí)現(xiàn)
def yield_test():
""" 實(shí)現(xiàn)協(xié)程函數(shù) """
while True:
n = (yield )
print(n)
if __name__ == '__main__':
rest = yield_test()
next(rest)
rest.send('1') # 1
rest.send('2') # 2
-
Python3.5以后協(xié)程實(shí)現(xiàn)
使用async和await關(guān)鍵字實(shí)現(xiàn)
import asyncio
async def do_sth(x):
print("等待中: {}".format(x))
await asyncio.sleep(x)
# 判斷是否是協(xié)程函數(shù)
print(asyncio.iscoroutinefunction(do_sth)) # True
# 創(chuàng)建任務(wù)
coroutine = do_sth(5)
# 創(chuàng)建事件循環(huán)隊(duì)列
loop = asyncio.get_event_loop()
# 注冊(cè)任務(wù)
task = loop.create_task(coroutine)
print(task) # <Task pending coro=<do_sth() running at /Users/nimengwei/Code/mycode/python/Python零基礎(chǔ)入門/Python基礎(chǔ)知識(shí)/chapter09/async_test.py:4>>
# 等待協(xié)程任務(wù)執(zhí)行結(jié)束
loop.run_until_complete(task)
print(task) # <Task finished coro=<do_sth() done, defined at /Users/nimengwei/Code/mycode/python/Python零基礎(chǔ)入門/Python基礎(chǔ)知識(shí)/chapter09/async_test.py:4> result=None>
注意:必須使用asyncio.sleep(),不能使用time.sleep()。只有前者能返回一個(gè)協(xié)程對(duì)象。
- 協(xié)程通信之嵌套調(diào)用
import asyncio
async def compute(x, y):
await asyncio.sleep(1)
return x + y
async def get_sum(x, y):
rest = await compute(x, y)
print("計(jì)算{} + {} = {}".format(x, y, rest))
loop = asyncio.get_event_loop()
# loop.run_until_complete(loop.create_task(get_sum(1, 2)))
loop.run_until_complete(get_sum(1, 2))
loop.close()
- 協(xié)程通信之隊(duì)列
import asyncio
import random
async def add(store):
""" 寫入數(shù)據(jù)到隊(duì)列 """
for i in range(5):
await asyncio.sleep(random.randint(1, 3))
await store.put(i)
print('add {}, queue size: {}'.format(i, store.qsize()))
async def reduce(store):
""" 從隊(duì)列中刪除數(shù)據(jù) """
for i in range(10):
rest = await store.get()
print('reduce {}, queue size: {}'.format(rest, store.qsize()))
if __name__ == '__main__':
# 準(zhǔn)備一個(gè)隊(duì)列
store = asyncio.Queue(maxsize=5)
a1 = add(store)
a2 = add(store)
r1 = reduce(store)
# 添加到事件隊(duì)列
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(a1, a2, r1))
loop.close()

