Python Asyncio

大綱

  1. 操作系統(tǒng)任務(wù)調(diào)度
  2. 進程、線程
  3. 協(xié)程
  4. Asyncio
    4.1 定義一個協(xié)程(Coroutine)
    4.2 定義一個任務(wù)(Task / Future)
    4.3 綁定回調(diào) / 獲取任務(wù)返回結(jié)果
    4.4 并發(fā)、并發(fā)控制
    4.5 協(xié)程停止
  5. 結(jié)語
  6. 參考

簡書的錨點已經(jīng)對我造成了成噸的傷害...

操作系統(tǒng)任務(wù)調(diào)度

操作系統(tǒng)執(zhí)行的任務(wù)基本可以分為:CPU 密集型、I/O 密集型。CPU 密集型任務(wù)會消耗大量的 CPU 計算資源,因此讓操作系統(tǒng)調(diào)度任務(wù)的執(zhí)行即可。而 I/O 密集型任務(wù)一般會涉及到硬盤 I/O、網(wǎng)絡(luò)傳輸,大部分的時間在等待 I/O 的完成,因此出現(xiàn)了基于多任務(wù)系統(tǒng)的 CPU 任務(wù)調(diào)度。參考:IBM/調(diào)整 Linux I/O 調(diào)度器優(yōu)化系統(tǒng)性能

在多任務(wù)系統(tǒng)中,操作系統(tǒng)接管了所有硬件資源并持有對硬件控制的最高權(quán)限。在操作系統(tǒng)中執(zhí)行的程序,都以進程的方式運行在低的權(quán)限中。所有的硬件資源,由操作系統(tǒng)根據(jù)進程的優(yōu)先級以及進程的運行狀況進行統(tǒng)一的調(diào)度。

常見 Linux 操作系統(tǒng)搶占式任務(wù)處理(現(xiàn)代操作系統(tǒng)都支持搶占式多任務(wù),包括 Windows、macOS、Linux(包括Android)和 iOS)

進程、線程

程序是一組指令的集合,程序運行時操作系統(tǒng)會將程序載入內(nèi)存空間,在邏輯上產(chǎn)生一個單獨的實例叫做進程(Process)。

隨著多核 CPU 的發(fā)展,為了充分利用多核資源,需要進程內(nèi)能并行地執(zhí)行任務(wù),因此產(chǎn)生了線程(Thread)的概念。

線程是操作系統(tǒng)進行任務(wù)調(diào)度的最小單元,線程存活于進程之中;同一個進程中的線程,共享一個虛擬內(nèi)存空間;線程之間各自持有自己的線程 ID、當前指令的指針(PC)、寄存器集合以及棧。

線程和進程均由操作系統(tǒng)調(diào)度。

image.png

多線程的優(yōu)勢:

  1. 充分利用多核 CPU 資源(在 Python 中是不存在的);
  2. 將等待 I/O 操作的時間,調(diào)度到其他線程執(zhí)行,提高 CPU 利用率;
  3. 將計算密集型的操作留給工作線程,預(yù)留線程保持與用戶的交互;
  4. 同進程內(nèi)多線程之間更加容易實現(xiàn)內(nèi)存共享;

多線程從一定程度上提升了 CPU 資源的利用率,然而類似 C10K 等問題又開始讓程序員對內(nèi)核級別的上下文切換開銷重視起來。

協(xié)程

協(xié)程讓用戶可以自主調(diào)度協(xié)程的運行狀態(tài)(運行,掛起),協(xié)程可以看做是用戶態(tài)線程,協(xié)程的目的在于讓阻塞的 I/O 操作異步化。

一般子程序/函數(shù)的調(diào)用是按照順序執(zhí)行的,一個入口,一次返回。而協(xié)程可以在子程序 A 的調(diào)用過程中中斷執(zhí)行,轉(zhuǎn)而調(diào)用另外一個子程序 B,在適當?shù)臅r機再切回到子程序 A 繼續(xù)執(zhí)行,因此協(xié)程節(jié)省了多線程切換帶來的開銷問題,實現(xiàn)了在單線程中多線程的效果(當然,前提是各個子程序都是非阻塞的)。

協(xié)程擁有自己的寄存器上下文和棧,協(xié)程調(diào)度切換時,將寄存器上下文和棧保存起來,在切回來的時候,恢復(fù)之前保存的寄存器上下文和棧,這種直接切換操作棧的方式(context上下文切換),避開了內(nèi)核切換的開銷,可以不加鎖的訪問全局變量,切換速度快。

協(xié)程的優(yōu)勢:

  1. 比線程開銷?。?/li>
  2. 單線程模型,線程安全避免了資源競爭;
  3. 代碼邏輯清晰,同步的方式編寫異步邏輯代碼;

Asyncio

Python 在 3.4 中引入了協(xié)程的概念,3.5 確定了協(xié)程的語法,Asyncio 基本概念:

  • Event Loop 事件循環(huán):程序開啟一個 While True 循環(huán),用戶將一些函數(shù)注冊到事件循環(huán)上,當滿足事件執(zhí)行條件時,調(diào)用的協(xié)程函數(shù);
  • Coroutine 協(xié)程對象:使用 asnc關(guān)鍵字定義的函數(shù),它的調(diào)用不會立即執(zhí)行函數(shù),而是返回一個協(xié)程對象,協(xié)程對象需要注冊到事件循環(huán)中,由事件循環(huán)負責(zé)調(diào)用;
  • Task:對協(xié)程對象的進一步封裝,包括任務(wù)的各種狀態(tài);
  • Future:代表將來執(zhí)行或沒有執(zhí)行的任務(wù)的結(jié)果,和 Task 沒有本質(zhì)的區(qū)別;
  • async:定義一個協(xié)程對象;
  • await:掛起阻塞的異步調(diào)用接口;

tips : 使用 Cython + libuv 實現(xiàn)的 uvloop 可以提升事件循環(huán)更多的性能:

import asyncio
import uvloop
# 聲明使用 uvloop 事件循環(huán)
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
...
...
loop = uvloop.new_event_loop()
asyncio.set_event_loop(loop)

協(xié)程示例

import asyncio

async def compute(x, y):
    print("Compute %s + %s ..." % (x, y))
    # 阻塞
    await asyncio.sleep(1.0)
    return x + y

async def print_sum(x, y):
    result = await compute(x, y)
    print("%s + %s = %s" % (x, y, result))

loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()

流程

image.png

使用 async 關(guān)鍵字定義一個協(xié)程(coroutine),協(xié)程是一個對象,直接調(diào)用并不會運行??梢酝ㄟ^在協(xié)程內(nèi)部 await coroutine 或 yield from coroutine 運行,或者將協(xié)程加入到事件循環(huán)中讓 EventLoop 調(diào)度執(zhí)行。

Calling a coroutine does not start its code running – the coroutine object returned by the call doesn’t do anything until you schedule its execution. There are two basic ways to start it running: call await coroutine
or yield from coroutine
from another coroutine (assuming the other coroutine is already running!), or schedule its execution using the ensure_future()
function or the AbstractEventLoop.create_task()
method.
Coroutines (and tasks) can only run when the event loop is running.

定義一個協(xié)程(Coroutine)
import time
import asyncio

# 定義協(xié)程
async def test(x):
    print("wait:", x)
    await asyncio.sleep(x)
    
start = time.time()

coroutine = test(1)

# 獲取事件循環(huán)
loop = asyncio.get_event_loop()
loop.run_until_complete(coroutine)

print("time:", time.time() - start)

# 輸出:
# wait: 1
# time: 1.0050649642944336
定義一個任務(wù)(Task / Future)

Future 對象保存了協(xié)程的狀態(tài),可以用來獲取協(xié)程的執(zhí)行返回結(jié)果。
asyncio.ensure_future(coroutine)loop.create_task(coroutine) 都可以創(chuàng)建任務(wù),run_until_complete 的參數(shù)是一個 futrue 對象。當傳入一個協(xié)程方法時,其內(nèi)部會自動封裝成task,task是Future的子類。

import time
import asyncio

# 定義協(xié)程
async def test(x):
    print("wait:", x)
    await asyncio.sleep(x)

start = time.time()

coroutine = test(1)

loop = asyncio.get_event_loop()
# future
# task = asyncio.ensure_future(coroutine)
# 顯式創(chuàng)建任務(wù):task 是 future 的子類
task = loop.create_task(coroutine)

print(task)
loop.run_until_complete(task)
print(task)
print("time:", time.time() - start)

# <Task pending coro=<test() running at xxx>>
# wait: 1
# <Task finished coro=<test() done, defined at xxx> result=None>
# time: 1.006286859512329
綁定回調(diào) / 獲取任務(wù)返回結(jié)果
import time
import asyncio

# 定義協(xié)程
async def test(x):
    print("wait:", x)
    await asyncio.sleep(x)
    return "done of {}".format(x)

def callback(future):
    print("callback:", future.result())
    
start = time.time()    

coroutine = test(1)
loop = asyncio.get_event_loop()
task = loop.create_task(coroutine)
# 回調(diào)
task.add_done_callback(callback)
loop.run_until_complete(task)

# 直接獲取
print("result:", task.result())
print("time:", time.time() - start)

# wait: 1
# callback: done of 1
# result: done of 1
# time: 1.0015690326690674
并發(fā)、并發(fā)控制

多個協(xié)程注冊到事件循環(huán)中,當執(zhí)行某一個協(xié)程時在任務(wù)阻塞的時候用 await 掛起,其他協(xié)程繼續(xù)工作。

import time
import asyncio

async def test(x):
    print("wait:", x)
    await asyncio.sleep(x)
    return "done of {}".format(x)

start = time.time()

# sleep 1s 2s 3s
coroutine1 = test(1)
coroutine2 = test(2)
coroutine3 = test(3)

loop = asyncio.get_event_loop()

task = [
    loop.create_task(coroutine1),
    loop.create_task(coroutine2),
    loop.create_task(coroutine3)
]

# wait方式
# run_task = asyncio.wait(task)
# gather 能保證有序的結(jié)果返回
run_task = asyncio.gather(*task)

loop.run_until_complete(run_task)

for t in task:
    print("task result:", t.result())

print("time:", time.time() - start)

# 輸出:
wait: 1
wait: 2
wait: 3
task result: done of 1
task result: done of 2
task result: done of 3
time: 3.0037271976470947

通過 Semaphore 信號量機制控制并發(fā)數(shù)量
通過 await 再調(diào)用另外一個協(xié)程,這樣可以實現(xiàn)協(xié)程的嵌套

  • await asyncio.gather(*task)
  • await asyncio.wait(task)
  • asyncio.as_completed(task)
import time
import asyncio
import aiohttp

URL = "https://www.baidu.com"

# 設(shè)置并發(fā)數(shù):3
sema = asyncio.Semaphore(3)

cookie_jar = aiohttp.CookieJar(unsafe=True)
session = None

async def fetcher(url, index):
    """
    通過 aiohttp 非阻塞的方式訪問 URL 資源
    """
    async with session.get(url) as resp:
        print("start fetch index:{}".format(index))
        # 假裝多卡1秒
        await asyncio.sleep(1)
        return await resp.text()
    
async def worker(url, index):
    """
    Semaphore信號量機制控制并發(fā)
    """
    with (await sema):
        resp = await fetcher(url, index)
        return ("index:", index, len(resp), time.time())

async def dispatch(task_list):
    """
    派發(fā)下載任務(wù)
    """
    # init session
    global session
    session = aiohttp.ClientSession(cookie_jar=cookie_jar)
    # send task
    tasks = [asyncio.ensure_future(worker(URL, t)) for t in task_list]
    for task in asyncio.as_completed(tasks):
        resp = await task
        print(resp)
    # release session
    session.close()

start = time.time()
        
loop = asyncio.get_event_loop()
coroutine = dispatch(range(5))
loop.run_until_complete(coroutine)
print("total time:", time.time() - start)

# 輸出:
start fetch index:2
start fetch index:1
start fetch index:0
('index:', 2, 227, 1508508870.628295)
('index:', 1, 227, 1508508870.642124)
('index:', 0, 227, 1508508870.6424)
start fetch index:4
start fetch index:3
('index:', 4, 227, 1508508871.736131)
('index:', 3, 227, 1508508871.737195)
total time: 2.2324538230895996
協(xié)程停止

Future 對象狀態(tài):

  • pending
  • running
  • waiting (瞎蒙的)
  • done
  • canceled

Future 對象在協(xié)程創(chuàng)建之后狀態(tài)為 pending,事件循環(huán)調(diào)度執(zhí)行協(xié)程時狀態(tài)變?yōu)?running,想要停止協(xié)程,調(diào)用 future.cancel() 即可。

import time
import asyncio

async def test(x):
    print("wait:", x)
    await asyncio.sleep(x)
    return "done of {}".format(x)

coroutine1 = test(1)
coroutine2 = test(10)
coroutine3 = test(15)

loop = asyncio.get_event_loop()

task = [
    loop.create_task(coroutine1),
    loop.create_task(coroutine2),
    loop.create_task(coroutine3)
]

start = time.time()

try:
    loop.run_until_complete(asyncio.wait(task))
except KeyboardInterrupt:
    print(asyncio.Task.all_tasks())
    print(asyncio.gather(*asyncio.Task.all_tasks()).cancel())
    loop.stop()
    loop.run_forever()
finally:
    loop.close()

print("time:", time.time() - start)

# 輸出:
wait: 1
wait: 10
wait: 15
^C{<Task pending coro=<test() running at a.py:6> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at /usr/local/var/pyenv/versions/3.5.3/Python.framework/Versions/3.5/lib/python3.5/asyncio/tasks.py:422]>, <Task pending coro=<test() running at a.py:6> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_wait.<locals>._on_completion() at /usr/local/var/pyenv/versions/3.5.3/Python.framework/Versions/3.5/lib/python3.5/asyncio/tasks.py:422]>, <Task pending coro=<wait() running at /usr/local/var/pyenv/versions/3.5.3/Python.framework/Versions/3.5/lib/python3.5/asyncio/tasks.py:355> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_run_until_complete_cb() at /usr/local/var/pyenv/versions/3.5.3/Python.framework/Versions/3.5/lib/python3.5/asyncio/base_events.py:176]>, <Task finished coro=<test() done, defined at a.py:4> result='done of 1'>}
True
time: 1.4758961200714111
結(jié)語

Asyncio 對于熟悉 Tornado 或 Twisted 等異步框架的同學(xué)上手起來會很快,編程風(fēng)格也可以很"同步化"。目前我們僅在生產(chǎn)環(huán)境嘗試了 asyncio + aiohttp 作為網(wǎng)絡(luò)采集的解決方案,初步使用下來感覺還是挺穩(wěn)定的,并且避免了之前使用 Gevent Monkey Patch 的侵入式改動,Aysncio 還有更多的場景等待我們?nèi)グl(fā)掘(比如 aiohttp 作為 Web 服務(wù))。

目前 Github 開源的部分支持異步非阻塞的 aio 庫,鏈接:https://github.com/aio-libs

對于新事物,永遠保持一顆探索的心,共勉。

參考

https://docs.python.org/3/library/asyncio.html
https://liam0205.me/2017/01/17/layers-and-operation-system/
https://segmentfault.com/a/1190000003063859

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容