其實
asyncio的學(xué)習(xí)一點也不快樂
一、python 的多線程和多進(jìn)程
要想理解 asyncio 的異步編程,需要簡單了解一下 python 的多線程和多進(jìn)程知識
-
1、多線程
python有GIL機(jī)制,因此,python的多線程雖然是操作系統(tǒng)的原生線程,但無法完成真正的并行運行,而僅僅在線程處于睡眠或者等待I/O時,才會發(fā)揮真正的多線程作用。
-
1.1、睡眠
time.sleep()threading.Lock- 線程模塊其他同步對象
-
1.2、I/O
aiohttpopen
-
1.3、釋放
GIL- 所謂釋放
GIL是指當(dāng)前線程執(zhí)行一定長度字節(jié)碼或者一段時間后,釋放GIL,由系統(tǒng)將GIL分配給其他線程,當(dāng)前線程進(jìn)入等待狀態(tài) -
py2解釋器每執(zhí)行1000字節(jié)碼釋放GIL -
py3解釋器每執(zhí)行15ms釋放GIL
- 所謂釋放
-
1.4、
GIL全局解釋器鎖- 同一進(jìn)程同一時間只有一個線程在執(zhí)行字節(jié)碼,但睡眠線程或者
I/O操作相關(guān)線程不受GIL鎖限制,允許并發(fā)執(zhí)行。(GIL保證同一時刻只有一個線程對共享資源進(jìn)行存取,省去線程間資源鎖的開銷)
- 同一進(jìn)程同一時間只有一個線程在執(zhí)行字節(jié)碼,但睡眠線程或者
-
1.5、
GIL原理/* s.connect((host, port)) method */ static PyObject * sock_connect(PySocketSockObject *s, PyObject *addro) { sock_addr_t addrbuf; int addrlen; int res; /* convert (host, port) tuple to C address */ getsockaddrarg(s, addro, SAS2SA(&addrbuf), &addrlen); Py_BEGIN_ALLOW_THREADS res = connect(s->sock_fd, addr, addrlen); Py_END_ALLOW_THREADS /* error handling and so on .... */ }-
Py_BEGIN_ALLOW_THREADS放棄GIL -
Py_END_ALLOW_THREADS重新獲取GIL,一個線程會在這個位置阻塞,等待另一個線程釋放鎖;一旦出現(xiàn)這個情況,等待的線程會搶奪回鎖,并恢復(fù)字節(jié)碼的執(zhí)行 - 簡而言之:允許有N個線程在網(wǎng)絡(luò)
I/O堵塞,或等待重新獲取GIL,但只有一個線程運行字節(jié)碼
-
-
1.6、示例
- 睡眠阻塞
import time from threading import Thread from datetime import datetime def write(i): print('{} start write --> {}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), i)) time.sleep(4) print('{} end write --> {}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), i)) def fun(): print('start ...') for i in range(3): Thread(target=write, args=(i,), daemon=False).start() print('end ...') # 輸出結(jié)果 start ... 2018-02-09 23:58:25 start write --> 0 2018-02-09 23:58:25 start write --> 1 2018-02-09 23:58:25 start write --> 2 end ... 2018-02-09 23:58:29 end write --> 0 2018-02-09 23:58:29 end write --> 1 2018-02-09 23:58:29 end write --> 2- CPU 阻塞
import time from threading import Thread from datetime import datetime def write(n): print('{} start write --> {}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), n)) l, sum_ = list(range(100000000)), 0 for i in l: sum_ += i print('{} end write --> {}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), n)) def fun(): print('start ...') for i in range(3): Thread(target=write, args=(i,), daemon=False).start() print('end ...') # 輸出結(jié)果 start ... 2018-02-10 00:13:55 start write --> 0 2018-02-10 00:13:58 start write --> 1 2018-02-10 00:14:02 start write --> 2 end ... 2018-02-10 00:14:27 end write --> 0 2018-02-10 00:14:32 end write --> 1 2018-02-10 00:14:35 end write --> 2- 總結(jié)
- 對于睡眠操作或者
I/O操作,多線程的作用非常明顯,明顯減少所消耗總時間; - 對于
CPU計算型操作,多線程操作反而因為多線程間獲取GIL而增加總的消耗時間。
- 對于睡眠操作或者
-
2、
python多進(jìn)程
python多進(jìn)程即其他語言中的多進(jìn)程概念,不再累述
二、異步編程思想
- 1、協(xié)程
coroutine - 2、任務(wù)
Task - 3、事件循環(huán)
loop
1、Task 對象主要包含 協(xié)程(coro)和輪詢對象(loop)2個屬性;
2、Loop 對象使用隊列和堆數(shù)據(jù)結(jié)構(gòu)存放Handle對象(綁定了回調(diào)函數(shù),如:task的_step方法等)。隊列中存放的是可以立即執(zhí)行的任務(wù),堆中存放的是一定時間后要執(zhí)行的任務(wù)。對于yield from asyncio.sleep() 的任務(wù)則是添加到堆中,到達(dá)指定時間后執(zhí)行。
# 簡單的調(diào)用示例
import asyncio
@asyncio.coroutine
def coro_fun():
yield from range(10)
loop = asyncio.get_event_loop()
loop.run_until_complete(coro_fun())
# or
tasks = [asyncio.ensure_future(coro_fun())]
loop.run_until_complete(asyncio.wait(tasks))
三、什么是協(xié)程
進(jìn)程或線程間的創(chuàng)建依賴于系統(tǒng)底層進(jìn)程或線程庫,其運行也依賴于系統(tǒng)的任務(wù)調(diào)度系統(tǒng),在任務(wù)切換時,
cpu需要進(jìn)行上下文切換。
協(xié)程是運行在單線程上,協(xié)程間的切換是在語言層級實現(xiàn)的,依賴于對應(yīng)協(xié)程庫。
- 在單線程執(zhí)行過程中,如果涉及
sleep,網(wǎng)絡(luò)IO操作時,線程會阻塞住等待任務(wù)完成; - 但如果使用協(xié)程,輪詢對象(
loop)在輪詢事件時,會分別處理就緒對象_ready和調(diào)度對象_scheduled以及select監(jiān)聽對象。每次進(jìn)行輪詢時,會篩選出調(diào)度對象中滿足執(zhí)行條件的對象以及select監(jiān)聽到可讀或可寫的對象,添加到就緒對象中,由loop對象進(jìn)行循環(huán)調(diào)度。_ready-
_ready+= 滿足執(zhí)行條件的_scheduled對象 -
_ready+=select監(jiān)聽到的可讀或可寫對象 -
loop遍歷執(zhí)行_ready中的對象
四、什么是期物
期物對象的設(shè)計初衷是,期物用來追蹤任務(wù)或者協(xié)程的運行狀態(tài)。一般使用中,期物用來追蹤
_ready,_scheduled,select監(jiān)聽的對象,在各對象執(zhí)行完成后設(shè)置期物對象狀態(tài)為FINISHED,并將設(shè)置_loop輪詢對象狀態(tài)為close的函數(shù)注冊到loop對象的_ready隊列中,由loop對象輪詢完成。
五、源代碼分析
- 關(guān)于 _ code _.co_flags
# 每個函數(shù)或方法都有 __code__ 魔法方法 以及其對應(yīng)的 co_flags 值 # 在 Cpython 中, 1、生成器函數(shù)的標(biāo)識符為 CO_GENERATOR 即 0x20, 2、協(xié)程函數(shù)的標(biāo)識符為 CO_COROUTINE 即 0x180 3、CO_ITERABLE_COROUTINE 即 0x100 # 通過對函數(shù)對象的 __code__.co_flags 與 對應(yīng)的標(biāo)識符做位與運算,如果是真值,則表明函數(shù)對象屬于生成器函數(shù)或協(xié)程函數(shù) def gen_fun(): yield from range(10) >>> gen_fun.__code__.co_flags # 99 >>> 99 & 0x20 # 32, True >>> 99 & 0x180 # 0, False async def asy_fun(): await sleep(4) >>> asy_fun.__code__.co_flags # 227 >>> 99 & 0x20 # 32, True >>> 227 & 0x180 # 128, True - 關(guān)于類型判斷
from collections import Iterator, Awaitable # 判斷迭代器 和 Awaitable 對象 class A: def __iter__(self): return iter([1,2,3,4,5]) def __await__(self): return iter([1,2,3,4,5]) a = A() >>> isinstance(a, Iterator) # True >>> isinstance(a, Awaitable) # True # 判斷是否為協(xié)程等 import inspect async def asy_fun(): await a >>> inspect.iscoroutine(asy_fun()) # True - @asyncio.coroutine
def coroutine(func): # 將一個生成器標(biāo)記為協(xié)程,如果在destroyed前沒有調(diào)用,則會記錄錯誤 # 這個方法是使用 inspect.iscoroutinefunction 方法判斷是否為協(xié)程方法,使用 types.coroutine 裝飾的生成器,或 async def 語法定義的函數(shù)都會返回 True if _inspect_iscoroutinefunction(func): return func # 使用 co_flags 判斷是否為生成器 if inspect.isgeneratorfunction(func): coro = func else: @functools.wraps(func) def coro(*args, **kw): res = func(*args, **kw) # 判斷 res 是否為期物,生成器 或 協(xié)程包裝類 實例 if isinstance(res, futures.Future) or inspect.isgenerator(res) or \ isinstance(res, CoroWrapper): res = yield from res elif _AwaitableABC is not None: # py 3.5 才會有 Awaitable 類 try: # 如果有 __await__屬性,__await__屬性只會返回一個不是協(xié)程的迭代器 await_meth = res.__await__ except AttributeError: pass else: # 如果是 Awaitable 對象 if isinstance(res, _AwaitableABC): # 使用 yield from 處理其迭代器 res = yield from await_meth() return res # 使用 types.coroutine 包裝 coro(注意,多層 @types.coroutine 裝飾不會影響,會直接return裝飾的值) if not _DEBUG: if _types_coroutine is None: wrapper = coro else: wrapper = _types_coroutine(coro) else: @functools.wraps(func) def wrapper(*args, **kwds): # 使用協(xié)程包裝器處理 w = CoroWrapper(coro(*args, **kwds), func=func) if w._source_traceback: del w._source_traceback[-1] # 如果是 py 3.5 則包裝增加 協(xié)程 對象的屬性,否則包裝為 生成器 對象的屬性 w.__name__ = getattr(func, '__name__', None) w.__qualname__ = getattr(func, '__qualname__', None) return w # 用以別處使用 asyncio.iscoroutinefunction() 判斷為 True 的作用 wrapper._is_coroutine = True # For iscoroutinefunction(). return wrapper - @types.coroutine
def coroutine(func): # 將一個普通的生成器函數(shù)轉(zhuǎn)化為協(xié)程 if not callable(func): raise TypeError('types.coroutine() expects a callable') if (func.__class__ is FunctionType and getattr(func, '__code__', None).__class__ is CodeType): # 獲取函數(shù)的 co_flags co_flags = func.__code__.co_flags # 檢查是否為協(xié)程函數(shù) if co_flags & 0x180: return func # 檢查是否為生成器函數(shù),此步主要作用是將生成器的 co_flags 同 0x100 做位或運算,將其標(biāo)識變更為協(xié)程標(biāo)識 if co_flags & 0x20: # TODO: Implement this in C. co = func.__code__ func.__code__ = CodeType( co.co_argcount, co.co_kwonlyargcount, co.co_nlocals, co.co_stacksize, co.co_flags | 0x100, # 0x100 == CO_ITERABLE_COROUTINE co.co_code, co.co_consts, co.co_names, co.co_varnames, co.co_filename, co.co_name, co.co_firstlineno, co.co_lnotab, co.co_freevars, co.co_cellvars) return func # 用以支持類似生成器的對象 @_functools.wraps(func) def wrapped(*args, **kwargs): coro = func(*args, **kwargs) # 協(xié)程或 co_flags 大于 256 的生成器對象,直接返回 if (coro.__class__ is CoroutineType or coro.__class__ is GeneratorType and coro.gi_code.co_flags & 0x100): return coro if (isinstance(coro, _collections_abc.Generator) and not isinstance(coro, _collections_abc.Coroutine)): # 實現(xiàn)了生成器抽象類的方法,使用生成器包裝器處理成生成器 return _GeneratorWrapper(coro) # 協(xié)程抽象類實例或其他對象 return coro return wrapped