python3比多線程和多進(jìn)程還好的新模塊 —— 協(xié)程Coroutine

引子

最近把所有系統(tǒng)的python3 版本都更新到了python3.7,然后更新了一下代碼,發(fā)現(xiàn)這個(gè)版本改動(dòng)還是很大的,之前更多還是在使用python2.7做ETL或者操作一些API,沒(méi)想到python的變化如此之大,看來(lái)自己還是太落伍了。于是在知乎和官網(wǎng)上找資料學(xué)習(xí)了下,看到一篇講協(xié)程的文章很受啟發(fā),以后應(yīng)該會(huì)較多使用這個(gè)功能,之前使用的多進(jìn)程多線程效果都不明顯,而協(xié)程應(yīng)該是一個(gè)python的效率利器。

前言

多進(jìn)程和多線程除了創(chuàng)建的開(kāi)銷(xiāo)大之外還有一個(gè)難以根治的缺陷,就是處理進(jìn)程之間或線程之間的協(xié)作問(wèn)題,因?yàn)槭且蕾?lài)多進(jìn)程和多線程的程序在不加鎖的情況下通常是不可控的,而協(xié)程則可以完美地解決協(xié)作問(wèn)題,由用戶(hù)來(lái)決定協(xié)程之間的調(diào)度。

總所周知,Python因?yàn)橛蠫IL(全局解釋鎖)這玩意,不可能有真正的多線程的存在,因此很多情況下都會(huì)用multiprocessing實(shí)現(xiàn)并發(fā),而且在Python中應(yīng)用多線程還要注意關(guān)鍵地方的同步,不太方便,用協(xié)程代替多線程和多進(jìn)程是一個(gè)很好的選擇,因?yàn)樗说奶匦裕?em>主動(dòng)調(diào)用/退出,狀態(tài)保存,避免cpu上下文切換等…

協(xié)程

基本概念

協(xié)程,又稱(chēng)作Coroutine,通過(guò) async/await 語(yǔ)法進(jìn)行聲明,是編寫(xiě)異步應(yīng)用的推薦方式。

從字面上來(lái)理解,即協(xié)同運(yùn)行的例程,它是比是線程(thread)更細(xì)量級(jí)的用戶(hù)態(tài)線程,特點(diǎn)是允許用戶(hù)的主動(dòng)調(diào)用和主動(dòng)退出,掛起當(dāng)前的例程然后返回值或去執(zhí)行其他任務(wù),接著返回原來(lái)停下的點(diǎn)繼續(xù)執(zhí)行。等下,這是否有點(diǎn)奇怪?我們都知道一般函數(shù)都是線性執(zhí)行的,不可能說(shuō)執(zhí)行到一半返回,等會(huì)兒又跑到原來(lái)的地方繼續(xù)執(zhí)行。但一些熟悉python(or其他動(dòng)態(tài)語(yǔ)言)的童鞋都知道這可以做到,答案是用yield語(yǔ)句。其實(shí)這里我們要感謝操作系統(tǒng)(OS)為我們做的工作,因?yàn)樗哂術(shù)etcontext和swapcontext這些特性,通過(guò)系統(tǒng)調(diào)用,我們可以把上下文和狀態(tài)保存起來(lái),切換到其他的上下文,這些特性為coroutine的實(shí)現(xiàn)提供了底層的基礎(chǔ)。操作系統(tǒng)的Interrupts和Traps機(jī)制則為這種實(shí)現(xiàn)提供了可能性,因此它看起來(lái)可能是下面這樣的:

image
>>> import asyncio

>>> async def main():
...     print('hello')
...     await asyncio.sleep(1)
...     print('world')

>>> asyncio.run(main())
hello
world

理解生成器(generator)

學(xué)過(guò)生成器和迭代器的同學(xué)應(yīng)該都知道python有yield這個(gè)關(guān)鍵字,yield能把一個(gè)函數(shù)變成一個(gè)generator,與return不同,yield在函數(shù)中返回值時(shí)會(huì)保存函數(shù)的狀態(tài),使下一次調(diào)用函數(shù)時(shí)會(huì)從上一次的狀態(tài)繼續(xù)執(zhí)行,即從yield的下一條語(yǔ)句開(kāi)始執(zhí)行,這樣做有許多好處,比如我們想要生成一個(gè)數(shù)列,若該數(shù)列的存儲(chǔ)空間太大,而我們僅僅需要訪問(wèn)前面幾個(gè)元素,那么yield就派上用場(chǎng)了,它實(shí)現(xiàn)了這種一邊循環(huán)一邊計(jì)算的機(jī)制,節(jié)省了存儲(chǔ)空間,提高了運(yùn)行效率。

運(yùn)行協(xié)程

  1. asyncio.run() 函數(shù)用來(lái)運(yùn)行最高層級(jí)的入口點(diǎn) "main()" 函數(shù)

  2. 等待一個(gè)協(xié)程。以下代碼段會(huì)在等待 1 秒后打印 "hello",然后 再次 等待 2 秒后打印 "world":

    import asyncio
    import time
    
    async def say_after(delay, what):
        await asyncio.sleep(delay)
        print(what)
    
    async def main():
        print(f"started at {time.strftime('%X')}")
    
        await say_after(1, 'hello')
        await say_after(2, 'world')
    
        print(f"finished at {time.strftime('%X')}")
    
    asyncio.run(main())
    
  1. asyncio.create_task() 函數(shù)用來(lái)并發(fā)運(yùn)行作為 asyncio 任務(wù) 的多個(gè)協(xié)程。

    async def main():
        task1 = asyncio.create_task(
            say_after(1, 'hello'))
    
        task2 = asyncio.create_task(
            say_after(2, 'world'))
    
        print(f"started at {time.strftime('%X')}")
    
        # Wait until both tasks are completed (should take
        # around 2 seconds.)
        await task1
        await task2
    
        print(f"finished at {time.strftime('%X')}")
    

可等待對(duì)象

如果一個(gè)對(duì)象可以在 await 語(yǔ)句中使用,那么它就是 可等待 對(duì)象。許多 asyncio API 都被設(shè)計(jì)為接受可等待對(duì)象。

可等待 對(duì)象有三種主要類(lèi)型: 協(xié)程, 任務(wù) 和 Future.

協(xié)程

Python 協(xié)程屬于 可等待 對(duì)象,因此可以在其他協(xié)程中被等待:

import asyncio

async def nested():
    return 42

async def main():
    # Nothing happens if we just call "nested()".
    # A coroutine object is created but not awaited,
    # so it *won't run at all*.
    nested()

    # Let's do it differently now and await it:
    print(await nested())  # will print "42".

asyncio.run(main())

重要

在本文檔中 "協(xié)程" 可用來(lái)表示兩個(gè)緊密關(guān)聯(lián)的概念:

  • 協(xié)程函數(shù): 定義形式為 async def 的函數(shù);
  • 協(xié)程對(duì)象: 調(diào)用 協(xié)程函數(shù) 所返回的對(duì)象。

asyncio 也支持舊式的 基于生成器的 協(xié)程。

任務(wù)

任務(wù) 被用來(lái)設(shè)置日程以便 并發(fā) 執(zhí)行協(xié)程。

當(dāng)一個(gè)協(xié)程通過(guò) asyncio.create_task() 等函數(shù)被打包為一個(gè) 任務(wù),該協(xié)程將自動(dòng)排入日程準(zhǔn)備立即運(yùn)行:

import asyncio

async def nested():
    return 42

async def main():
    # Schedule nested() to run soon concurrently
    # with "main()".
    task = asyncio.create_task(nested())

    # "task" can now be used to cancel "nested()", or
    # can simply be awaited to wait until it is complete:
    await task

asyncio.run(main())

Future 對(duì)象

Future 是一種特殊的 低層級(jí) 可等待對(duì)象,表示一個(gè)異步操作的 最終結(jié)果。

當(dāng)一個(gè) Future 對(duì)象 被等待,這意味著協(xié)程將保持等待直到該 Future 對(duì)象在其他地方操作完畢。

在 asyncio 中需要 Future 對(duì)象以便允許通過(guò) async/await 使用基于回調(diào)的代碼。

通常情況下 沒(méi)有必要 在應(yīng)用層級(jí)的代碼中創(chuàng)建 Future 對(duì)象。

Future 對(duì)象有時(shí)會(huì)由庫(kù)和某些 asyncio API 暴露給用戶(hù),用作可等待對(duì)象:

async def main():
    await function_that_returns_a_future_object()

    # this is also valid:
    await asyncio.gather(
        function_that_returns_a_future_object(),
        some_python_coroutine()
    )

一個(gè)很好的返回對(duì)象的低層級(jí)函數(shù)的示例是 loop.run_in_executor()。

并發(fā)運(yùn)行任務(wù)

awaitable asyncio.gather(*aws, loop=None, return_exceptions=False)

并發(fā) 運(yùn)行 aws 序列中的 可等待對(duì)象

如果 aws 中的某個(gè)可等待對(duì)象為協(xié)程,它將自動(dòng)作為一個(gè)任務(wù)加入日程。

如果所有可等待對(duì)象都成功完成,結(jié)果將是一個(gè)由所有返回值聚合而成的列表。結(jié)果值的順序與 aws 中可等待對(duì)象的順序一致。

如果 return_exceptionsFalse (默認(rèn)),所引發(fā)的首個(gè)異常會(huì)立即傳播給等待 gather() 的任務(wù)。aws序列中的其他可等待對(duì)象 不會(huì)被取消 并將繼續(xù)運(yùn)行。

如果 return_exceptionsTrue,異常會(huì)和成功的結(jié)果一樣處理,并聚合至結(jié)果列表。

如果 gather() 被取消,所有被提交 (尚未完成) 的可等待對(duì)象也會(huì) 被取消。

如果 aws 序列中的任一 Task 或 Future 對(duì)象 被取消,它將被當(dāng)作引發(fā)了 CancelledError 一樣處理 -- 在此情況下 gather() 調(diào)用 不會(huì) 被取消。這是為了防止一個(gè)已提交的 Task/Future 被取消導(dǎo)致其他 Tasks/Future 也被取消。

import asyncio

async def factorial(name, number):
    f = 1
    for i in range(2, number + 1):
        print(f"Task {name}: Compute factorial({i})...")
        await asyncio.sleep(1)
        f *= i
    print(f"Task {name}: factorial({number}) = {f}")

async def main():
    # Schedule three calls *concurrently*:
    await asyncio.gather(
        factorial("A", 2),
        factorial("B", 3),
        factorial("C", 4),
    )

asyncio.run(main())

# Expected output:
#
#     Task A: Compute factorial(2)...
#     Task B: Compute factorial(2)...
#     Task C: Compute factorial(2)...
#     Task A: factorial(2) = 2
#     Task B: Compute factorial(3)...
#     Task C: Compute factorial(3)...
#     Task B: factorial(3) = 6
#     Task C: Compute factorial(4)...
#     Task C: factorial(4) = 24

爬蟲(chóng)例子

使用爬蟲(chóng)爬取豆瓣top250

from lxml import etree
from time import time
import asyncio
import aiohttp

url = "https://movie.douban.com/top250"
header = {
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.86 Safari/537.36",
    "content-type": "text/plain;charset=UTF-8",
}


async def fetch_content(url):
    # await asyncio.sleep(1) # 防止請(qǐng)求過(guò)快 等待1秒
    async with aiohttp.ClientSession(
        headers=header, connector=aiohttp.TCPConnector(ssl=False)
    ) as session:
        async with session.get(url) as response:
            return await response.text()


async def parse(url):
    page = await fetch_content(url)
    html = etree.HTML(page)

    xpath_movie = '//*[@id="content"]/div/div[1]/ol/li'
    xpath_title = './/span[@class="title"]'
    xpath_pages = '//*[@id="content"]/div/div[1]/div[2]/a'
    xpath_descs = './/span[@class="inq"]'
    xpath_links = './/div[@class="info"]/div[@class="hd"]/a'

    pages = html.xpath(xpath_pages)  # 所有頁(yè)面的鏈接都在底部獲取
    fetch_list = []
    result = []

    for element_movie in html.xpath(xpath_movie):
        result.append(element_movie)

    for p in pages:
        fetch_list.append(url + p.get("href"))  # 解析翻頁(yè)按鈕對(duì)應(yīng)的鏈接 組成完整后邊頁(yè)面鏈接

    tasks = [fetch_content(url) for url in fetch_list]  # 并行處理所有翻頁(yè)的頁(yè)面
    pages = await asyncio.gather(*tasks)
    # 并發(fā) 運(yùn)行 aws 序列中的 可等待對(duì)象。
    # 如果 aws 中的某個(gè)可等待對(duì)象為協(xié)程,它將自動(dòng)作為一個(gè)任務(wù)加入日程。
    # 如果所有可等待對(duì)象都成功完成,結(jié)果將是一個(gè)由所有返回值聚合而成的列表。結(jié)果值的順序與 aws 中可等待對(duì)象的順序一致。
    for page in pages:
        html = etree.HTML(page)
        for element_movie in html.xpath(xpath_movie):
            result.append(element_movie)

    for i, movie in enumerate(result, 1):
        title = movie.find(xpath_title).text
        desc = (
            "<" + movie.find(xpath_descs).text + ">"
            if movie.find(xpath_descs) is not None
            else None
        )
        link = movie.find(xpath_links).get("href")
        print(i, title, desc, link)


async def main():
    start = time()
    for i in range(5):
        await parse(url)
    end = time()
    print("Cost {} seconds".format((end - start) / 5))


if __name__ == "__main__":
    asyncio.run(main())

參考文章

  1. 從0到1,Python異步編程的演進(jìn)之路
  2. Python3 Async/Await解釋
  3. 官方文檔 協(xié)程與任務(wù)
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容