大綱
- 操作系統(tǒng)任務(wù)調(diào)度
- 進程、線程
- 協(xié)程
- Asyncio
4.1 定義一個協(xié)程(Coroutine)
4.2 定義一個任務(wù)(Task / Future)
4.3 綁定回調(diào) / 獲取任務(wù)返回結(jié)果
4.4 并發(fā)、并發(fā)控制
4.5 協(xié)程停止 - 結(jié)語
- 參考
簡書的錨點已經(jīng)對我造成了成噸的傷害...
操作系統(tǒng)任務(wù)調(diào)度
操作系統(tǒng)執(zhí)行的任務(wù)基本可以分為:CPU 密集型、I/O 密集型。CPU 密集型任務(wù)會消耗大量的 CPU 計算資源,因此讓操作系統(tǒng)調(diào)度任務(wù)的執(zhí)行即可。而 I/O 密集型任務(wù)一般會涉及到硬盤 I/O、網(wǎng)絡(luò)傳輸,大部分的時間在等待 I/O 的完成,因此出現(xiàn)了基于多任務(wù)系統(tǒng)的 CPU 任務(wù)調(diào)度。參考:IBM/調(diào)整 Linux I/O 調(diào)度器優(yōu)化系統(tǒng)性能
在多任務(wù)系統(tǒng)中,操作系統(tǒng)接管了所有硬件資源并持有對硬件控制的最高權(quán)限。在操作系統(tǒng)中執(zhí)行的程序,都以進程的方式運行在低的權(quán)限中。所有的硬件資源,由操作系統(tǒng)根據(jù)進程的優(yōu)先級以及進程的運行狀況進行統(tǒng)一的調(diào)度。
常見 Linux 操作系統(tǒng)搶占式任務(wù)處理(現(xiàn)代操作系統(tǒng)都支持搶占式多任務(wù),包括 Windows、macOS、Linux(包括Android)和 iOS)
進程、線程
程序是一組指令的集合,程序運行時操作系統(tǒng)會將程序載入內(nèi)存空間,在邏輯上產(chǎn)生一個單獨的實例叫做進程(Process)。
隨著多核 CPU 的發(fā)展,為了充分利用多核資源,需要進程內(nèi)能并行地執(zhí)行任務(wù),因此產(chǎn)生了線程(Thread)的概念。
線程是操作系統(tǒng)進行任務(wù)調(diào)度的最小單元,線程存活于進程之中;同一個進程中的線程,共享一個虛擬內(nèi)存空間;線程之間各自持有自己的線程 ID、當前指令的指針(PC)、寄存器集合以及棧。
線程和進程均由操作系統(tǒng)調(diào)度。

多線程的優(yōu)勢:
- 充分利用多核 CPU 資源(在 Python 中是不存在的);
- 將等待 I/O 操作的時間,調(diào)度到其他線程執(zhí)行,提高 CPU 利用率;
- 將計算密集型的操作留給工作線程,預(yù)留線程保持與用戶的交互;
- 同進程內(nèi)多線程之間更加容易實現(xiàn)內(nèi)存共享;
多線程從一定程度上提升了 CPU 資源的利用率,然而類似 C10K 等問題又開始讓程序員對內(nèi)核級別的上下文切換開銷重視起來。
協(xié)程
協(xié)程讓用戶可以自主調(diào)度協(xié)程的運行狀態(tài)(運行,掛起),協(xié)程可以看做是用戶態(tài)線程,協(xié)程的目的在于讓阻塞的 I/O 操作異步化。
一般子程序/函數(shù)的調(diào)用是按照順序執(zhí)行的,一個入口,一次返回。而協(xié)程可以在子程序 A 的調(diào)用過程中中斷執(zhí)行,轉(zhuǎn)而調(diào)用另外一個子程序 B,在適當?shù)臅r機再切回到子程序 A 繼續(xù)執(zhí)行,因此協(xié)程節(jié)省了多線程切換帶來的開銷問題,實現(xiàn)了在單線程中多線程的效果(當然,前提是各個子程序都是非阻塞的)。
協(xié)程擁有自己的寄存器上下文和棧,協(xié)程調(diào)度切換時,將寄存器上下文和棧保存起來,在切回來的時候,恢復(fù)之前保存的寄存器上下文和棧,這種直接切換操作棧的方式(context上下文切換),避開了內(nèi)核切換的開銷,可以不加鎖的訪問全局變量,切換速度快。
協(xié)程的優(yōu)勢:
- 比線程開銷?。?/li>
- 單線程模型,線程安全避免了資源競爭;
- 代碼邏輯清晰,同步的方式編寫異步邏輯代碼;
Asyncio
Python 在 3.4 中引入了協(xié)程的概念,3.5 確定了協(xié)程的語法,Asyncio 基本概念:
- Event Loop 事件循環(huán):程序開啟一個 While True 循環(huán),用戶將一些函數(shù)注冊到事件循環(huán)上,當滿足事件執(zhí)行條件時,調(diào)用的協(xié)程函數(shù);
- Coroutine 協(xié)程對象:使用 asnc關(guān)鍵字定義的函數(shù),它的調(diào)用不會立即執(zhí)行函數(shù),而是返回一個協(xié)程對象,協(xié)程對象需要注冊到事件循環(huán)中,由事件循環(huán)負責(zé)調(diào)用;
- Task:對協(xié)程對象的進一步封裝,包括任務(wù)的各種狀態(tài);
- Future:代表將來執(zhí)行或沒有執(zhí)行的任務(wù)的結(jié)果,和 Task 沒有本質(zhì)的區(qū)別;
- async:定義一個協(xié)程對象;
- await:掛起阻塞的異步調(diào)用接口;
tips : 使用 Cython + libuv 實現(xiàn)的 uvloop 可以提升事件循環(huán)更多的性能:
import asyncio
import uvloop
# 聲明使用 uvloop 事件循環(huán)
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
...
...
loop = uvloop.new_event_loop()
asyncio.set_event_loop(loop)
import asyncio
async def compute(x, y):
print("Compute %s + %s ..." % (x, y))
# 阻塞
await asyncio.sleep(1.0)
return x + y
async def print_sum(x, y):
result = await compute(x, y)
print("%s + %s = %s" % (x, y, result))
loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()
流程:

使用 async 關(guān)鍵字定義一個協(xié)程(coroutine),協(xié)程是一個對象,直接調(diào)用并不會運行??梢酝ㄟ^在協(xié)程內(nèi)部 await coroutine 或 yield from coroutine 運行,或者將協(xié)程加入到事件循環(huán)中讓 EventLoop 調(diào)度執(zhí)行。
Calling a coroutine does not start its code running – the coroutine object returned by the call doesn’t do anything until you schedule its execution. There are two basic ways to start it running: call await coroutine
or yield from coroutine
from another coroutine (assuming the other coroutine is already running!), or schedule its execution using the ensure_future()
function or the AbstractEventLoop.create_task()
method.
Coroutines (and tasks) can only run when the event loop is running.
定義一個協(xié)程(Coroutine)
import time
import asyncio
# 定義協(xié)程
async def test(x):
print("wait:", x)
await asyncio.sleep(x)
start = time.time()
coroutine = test(1)
# 獲取事件循環(huán)
loop = asyncio.get_event_loop()
loop.run_until_complete(coroutine)
print("time:", time.time() - start)
# 輸出:
# wait: 1
# time: 1.0050649642944336
定義一個任務(wù)(Task / Future)
Future 對象保存了協(xié)程的狀態(tài),可以用來獲取協(xié)程的執(zhí)行返回結(jié)果。
asyncio.ensure_future(coroutine) 和 loop.create_task(coroutine) 都可以創(chuàng)建任務(wù),run_until_complete 的參數(shù)是一個 futrue 對象。當傳入一個協(xié)程方法時,其內(nèi)部會自動封裝成task,task是Future的子類。
import time
import asyncio
# 定義協(xié)程
async def test(x):
print("wait:", x)
await asyncio.sleep(x)
start = time.time()
coroutine = test(1)
loop = asyncio.get_event_loop()
# future
# task = asyncio.ensure_future(coroutine)
# 顯式創(chuàng)建任務(wù):task 是 future 的子類
task = loop.create_task(coroutine)
print(task)
loop.run_until_complete(task)
print(task)
print("time:", time.time() - start)
# <Task pending coro=<test() running at xxx>>
# wait: 1
# <Task finished coro=<test() done, defined at xxx> result=None>
# time: 1.006286859512329
綁定回調(diào) / 獲取任務(wù)返回結(jié)果
import time
import asyncio
# 定義協(xié)程
async def test(x):
print("wait:", x)
await asyncio.sleep(x)
return "done of {}".format(x)
def callback(future):
print("callback:", future.result())
start = time.time()
coroutine = test(1)
loop = asyncio.get_event_loop()
task = loop.create_task(coroutine)
# 回調(diào)
task.add_done_callback(callback)
loop.run_until_complete(task)
# 直接獲取
print("result:", task.result())
print("time:", time.time() - start)
# wait: 1
# callback: done of 1
# result: done of 1
# time: 1.0015690326690674
并發(fā)、并發(fā)控制
多個協(xié)程注冊到事件循環(huán)中,當執(zhí)行某一個協(xié)程時在任務(wù)阻塞的時候用 await 掛起,其他協(xié)程繼續(xù)工作。
import time
import asyncio
async def test(x):
print("wait:", x)
await asyncio.sleep(x)
return "done of {}".format(x)
start = time.time()
# sleep 1s 2s 3s
coroutine1 = test(1)
coroutine2 = test(2)
coroutine3 = test(3)
loop = asyncio.get_event_loop()
task = [
loop.create_task(coroutine1),
loop.create_task(coroutine2),
loop.create_task(coroutine3)
]
# wait方式
# run_task = asyncio.wait(task)
# gather 能保證有序的結(jié)果返回
run_task = asyncio.gather(*task)
loop.run_until_complete(run_task)
for t in task:
print("task result:", t.result())
print("time:", time.time() - start)
# 輸出:
wait: 1
wait: 2
wait: 3
task result: done of 1
task result: done of 2
task result: done of 3
time: 3.0037271976470947
通過 Semaphore 信號量機制控制并發(fā)數(shù)量
通過 await 再調(diào)用另外一個協(xié)程,這樣可以實現(xiàn)協(xié)程的嵌套
- await asyncio.gather(*task)
- await asyncio.wait(task)
- asyncio.as_completed(task)
import time
import asyncio
import aiohttp
URL = "https://www.baidu.com"
# 設(shè)置并發(fā)數(shù):3
sema = asyncio.Semaphore(3)
cookie_jar = aiohttp.CookieJar(unsafe=True)
session = None
async def fetcher(url, index):
"""
通過 aiohttp 非阻塞的方式訪問 URL 資源
"""
async with session.get(url) as resp:
print("start fetch index:{}".format(index))
# 假裝多卡1秒
await asyncio.sleep(1)
return await resp.text()
async def worker(url, index):
"""
Semaphore信號量機制控制并發(fā)
"""
with (await sema):
resp = await fetcher(url, index)
return ("index:", index, len(resp), time.time())
async def dispatch(task_list):
"""
派發(fā)下載任務(wù)
"""
# init session
global session
session = aiohttp.ClientSession(cookie_jar=cookie_jar)
# send task
tasks = [asyncio.ensure_future(worker(URL, t)) for t in task_list]
for task in asyncio.as_completed(tasks):
resp = await task
print(resp)
# release session
session.close()
start = time.time()
loop = asyncio.get_event_loop()
coroutine = dispatch(range(5))
loop.run_until_complete(coroutine)
print("total time:", time.time() - start)
# 輸出:
start fetch index:2
start fetch index:1
start fetch index:0
('index:', 2, 227, 1508508870.628295)
('index:', 1, 227, 1508508870.642124)
('index:', 0, 227, 1508508870.6424)
start fetch index:4
start fetch index:3
('index:', 4, 227, 1508508871.736131)
('index:', 3, 227, 1508508871.737195)
total time: 2.2324538230895996
協(xié)程停止
Future 對象狀態(tài):
- pending
- running
- waiting (瞎蒙的)
- done
- canceled
Future 對象在協(xié)程創(chuàng)建之后狀態(tài)為 pending,事件循環(huán)調(diào)度執(zhí)行協(xié)程時狀態(tài)變?yōu)?running,想要停止協(xié)程,調(diào)用 future.cancel() 即可。
import time
import asyncio
async def test(x):
print("wait:", x)
await asyncio.sleep(x)
return "done of {}".format(x)
coroutine1 = test(1)
coroutine2 = test(10)
coroutine3 = test(15)
loop = asyncio.get_event_loop()
task = [
loop.create_task(coroutine1),
loop.create_task(coroutine2),
loop.create_task(coroutine3)
]
start = time.time()
try:
loop.run_until_complete(asyncio.wait(task))
except KeyboardInterrupt:
print(asyncio.Task.all_tasks())
print(asyncio.gather(*asyncio.Task.all_tasks()).cancel())
loop.stop()
loop.run_forever()
finally:
loop.close()
print("time:", time.time() - start)
# 輸出:
wait: 1
wait: 10
wait: 15
^C{<Task pending coro=<test() running at a.py:6> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at /usr/local/var/pyenv/versions/3.5.3/Python.framework/Versions/3.5/lib/python3.5/asyncio/tasks.py:422]>, <Task pending coro=<test() running at a.py:6> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at /usr/local/var/pyenv/versions/3.5.3/Python.framework/Versions/3.5/lib/python3.5/asyncio/tasks.py:422]>, <Task pending coro=<wait() running at /usr/local/var/pyenv/versions/3.5.3/Python.framework/Versions/3.5/lib/python3.5/asyncio/tasks.py:355> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_run_until_complete_cb() at /usr/local/var/pyenv/versions/3.5.3/Python.framework/Versions/3.5/lib/python3.5/asyncio/base_events.py:176]>, <Task finished coro=<test() done, defined at a.py:4> result='done of 1'>}
True
time: 1.4758961200714111
結(jié)語
Asyncio 對于熟悉 Tornado 或 Twisted 等異步框架的同學(xué)上手起來會很快,編程風(fēng)格也可以很"同步化"。目前我們僅在生產(chǎn)環(huán)境嘗試了 asyncio + aiohttp 作為網(wǎng)絡(luò)采集的解決方案,初步使用下來感覺還是挺穩(wěn)定的,并且避免了之前使用 Gevent Monkey Patch 的侵入式改動,Aysncio 還有更多的場景等待我們?nèi)グl(fā)掘(比如 aiohttp 作為 Web 服務(wù))。
目前 Github 開源的部分支持異步非阻塞的 aio 庫,鏈接:https://github.com/aio-libs
對于新事物,永遠保持一顆探索的心,共勉。
參考
https://docs.python.org/3/library/asyncio.html
https://liam0205.me/2017/01/17/layers-and-operation-system/
https://segmentfault.com/a/1190000003063859