協(xié)程一例:用aiohttp代替requests寫異步爬蟲

這篇文章不規(guī)范也不完整,重新整理的更詳細規(guī)范的介紹見這里,
非常不建議閱讀下文。

網(wǎng)上aiohttp做爬蟲的資料太少,官網(wǎng)文檔是英文的看起來麻煩,所以自己部分半帶翻譯式的總結下

通過requests獲取html的函數(shù)基本上是這樣

import requests


def func(url: str) ->str:
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.100 Safari/537.36'}
    cookies = {'Cookie': ''}
    # 這里暫時懶得用session, verify參數(shù)忽略https網(wǎng)頁的ssl驗證
    r = requests.get(url, headers=headers, timeout=10, cookies=cookies, verify=False)
    r.encoding = r.apparent_encoding  # 自動識別網(wǎng)頁編碼避免中文亂碼,但會拖慢程序
    return r.text  # 或r.content


func('www.sina.com')

aiohttp改寫

import asyncio

import aiohttp


async def html(url: str) ->str:
    code = 'utf-8'
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.100 Safari/537.36'}
    async with aiohttp.ClientSession() as session:
        # 老版本aiohttp沒有verify參數(shù),如果報錯卸載重裝最新版本
        async with session.get(url, headers=headers, timeout=10, verify_ssl=False) as r:
            # text()函數(shù)相當于requests中的r.text,r.read()相當于requests中的r.content
            return await r.text()


loop = asyncio.get_event_loop()
loop.run_until_complete(html('www.sina.com'))
# 對需要ssl驗證的網(wǎng)頁,需要250ms左右等待底層連接關閉
loop.run_until_complete(asyncio.sleep(0.25))
loop.close()

基本上的改寫如上,協(xié)程本身的概念不是重點,優(yōu)越性單線程開銷小啥的也不說了,這里只講幾個坑/注意事項。參考文檔

  • 如果要返回text和content:
# requests
return r.text, r.content
# aiohttp
return await r.text(), await r.read()  # 不要漏后面的await,每個coroutine都要接await
  • r.text()報編碼錯誤
return await r.text(errors='ignore')  # 直接忽略那些錯誤,默認是strict嚴格模式導致出現(xiàn)錯誤時會直接拋異常終止程序。

這里注意到,r.encoding = r.apparent_encoding的原理是什么?為什么aiohttp沒有類似代碼?
首先,看一下r.apparent_encoding的源碼

image.png

可以看出,寫法其實就是

import chardet  # 有requests模塊的話已經(jīng)安裝了這個


code = chardet.detect(content)['encoding']

換句話說,套用到aiohttp的代碼中,本來應該這么寫

import asyncio

import aiohttp
import chardet


async def html(url: str) ->str:
    code = 'utf-8'
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.100 Safari/537.36'}
    async with aiohttp.ClientSession() as session:
        # 老版本aiohttp沒有verify參數(shù),如果報錯卸載重裝最新版本
        async with session.get(url, headers=headers, timeout=10, verify_ssl=False) as r:
            content = await r.read()
            code = chardet.detect(content)['encoding']
            # text()函數(shù)相當于requests中的r.text,不帶參數(shù)則自動識別網(wǎng)頁編碼,同樣會拖慢程序。r.read()相當于requests中的r.content
            return await r.text(encoding=code, errors='ignore')

不過實際上,r.text()encoding=None(默認參數(shù))的時候已經(jīng)包含了這一步,所以其實無需操心什么chardet,出現(xiàn)編碼錯誤先ignore再單個網(wǎng)頁具體分析,或者就不管算了。
這部分見文檔

If encoding is None content encoding is autocalculated using Content-Type HTTP header and chardet tool if the header is not provided by server.
cchardet is used with fallback to chardet if cchardet is not available.

  • 超時異常處理
    捕捉就好了...基本上碰到的有這些異常
    asyncio.TimeoutError
    aiohttp.client_exceptions.ServerDisconnectedError
    aiohttp.client_exceptions.InvalidURL
    aiohttp.client_exceptions.ClientConnectorError

文檔所寫

import async_timeout

with async_timeout.timeout(0.001):
    async with session.get('https://github.com') as r:
        await r.text()

用了with還是會拋timeout異常...這時要把時間設的稍微長一點比如10s,以及捕捉timeout異常。此外,這種寫法會避免concurrent.futures._base.CancelledError異常。這個異常意思是超時的場合還沒完成的任務會被事件循環(huán)取消掉。

The event loop will ensure to cancel the waiting task when that timeout is reached and the task hasn't completed yet.

下面是兩段作用完全一樣的代碼(有比較多的簡化只保證正常運行),對比aiohttp和多線程
作用是讀取網(wǎng)頁內(nèi)容的標題和正文

aiohttp

import asyncio

import aiohttp
# pip install readability-lxml以安裝
from readability import Document


def title_summary(content: bytes, url: str):
    doc = Document(content, url)
    print(doc.short_title(), doc.summary())


async def read_one(id_: int, url: str):
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.100 Safari/537.36'}
    async with aiohttp.ClientSession() as session:
        try:
            async with session.get(
                    url, headers=headers, timeout=1, verify_ssl=False) as r:
                await asyncio.sleep(1 + random())
                content, text = await r.read(), await r.text(
                    encoding=None, errors='ignore')
                if text:
                    title_summary(content, url)
        except:
            pass


def read_many(links: list):
    loop = asyncio.get_event_loop()
    to_do = [read_one(id_, url) for id_, url in links]
    loop.run_until_complete(asyncio.wait(to_do))
    # 或loop.run_until_complete(asyncio.gather(*to_do))這兩行代碼作用似乎沒啥區(qū)別
    loop.close()


def main():
    links = [...]  # 要跑的所有鏈接列表
    read_many(links)


if __name__ == '__main__':
    main()

多線程

from concurrent import futures


import requests
from readability import Document


def title_summary(content: bytes, url: str):
    doc = Document(content, url)
    print(doc.short_title(), doc.summary())


def read_one(url: str):
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.100 Safari/537.36'}
        try:
            r = requests.get(url, headers=headers, timeout=1, verify=False)
            r.encoding = r.apparent_encoding
            content, text = r.content, await r.text
            if text:
                title_summary(content, url)
        except:
            pass


def read_many(links: list) ->int:
    workers = min(100, len(links))  # 線程數(shù)
    with futures.ThreadPoolExecutor(workers) as e:
        res = e.map(read_one, links)
    return len(list(res))


def main():
    links = [...]
    read_many(links)


if __name__ == '__main__':
    main()

基本上,協(xié)程和線程的使用就是這樣。但是,如果,任務數(shù)以千計時,asyncio可能會報錯:ValueError: too many file descriptors in select()
這是因為asyncio內(nèi)部調用select,這個打開文件數(shù)是有限度的,這部分需要復習深入理解計算機系統(tǒng)一書。
這個場合不能這樣寫,有可能用到回調,其實也可以不用

def read_many(links: list):
    loop = asyncio.get_event_loop()
    to_do = [read_one(id_, url) for id_, url in links]
    loop.run_until_complete(asyncio.wait(to_do))
    # 或loop.run_until_complete(asyncio.gather(*to_do))這兩行代碼作用似乎沒啥區(qū)別
    loop.close()

以上代碼這樣改

def read_many(links: list):
    loop = asyncio.get_event_loop()
    for id_, url in links:
        task = asyncio.ensure_future(read_one(id_, url))
        loop.run_until_complete(task)
    loop.close()

即可。

這樣改完不再是并發(fā)而是順序執(zhí)行,正確的寫法見文章開頭鏈接的回調部分。

如果要用回調的話,比較麻煩,不少地方要修改,見下,主要是參數(shù)傳遞上要多多注意。
其實沒有必要用回調,雖然拆開寫似乎更規(guī)范,而且可以在需要請求其他頁面時重用,但是受限很多。

import asyncio

import aiohttp
# pip install readability-lxml以安裝
from readability import Document


def title_summary(fut):
    res = fut.result()  # 回調中調用result()才是上個函數(shù)的真實返回值
    if res:
        content, url = res
        doc = Document(content, url)
        print(doc.short_title(), doc.summary())


async def read_one(id_: int, url: str):
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.100 Safari/537.36'}
    async with aiohttp.ClientSession() as session:
        try:
            async with session.get(
                    url, headers=headers, timeout=1, verify_ssl=False) as r:
                await asyncio.sleep(1 + random())
                return await r.read(), await r.text(encoding=None, errors='ignore')
        except:
            pass


def read_many(links: list):
    loop = asyncio.get_event_loop()
    for id_, url in links:
        task = asyncio.ensure_future(read_one(id_, url))
        # 注意參數(shù)問題,這里不能傳遞多個參數(shù),要么用functool的partial,要么干脆傳遞元組解包,也可以用lambda,官方比較推薦functool這里就不寫了
        task.add_done_callback(title_summary)
        loop.run_until_complete(task)
    loop.close()


def main():
    links = [...]  # 要跑的所有鏈接列表
    read_many(links)


if __name__ == '__main__':
    main()
最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,695評論 19 139
  • 原文:http://m.itdecent.cn/p/4e048726b613 引言 隨著node.js的盛行,...
    jacke121閱讀 2,296評論 1 3
  • 電影院 圖書館 故事長 票根短 借來的書 沒讀完 要么賴著不還 要么買新版替換 可電影院 的電影啊 即使加場 也會...
    段童閱讀 524評論 0 2
  • 如果你會ios,那么對于cocoapods你一定不會陌生,cocoapods是一個方便管理你項目中的一些使用到的第...
    無名lxl閱讀 308評論 0 1
  • 李存勖是后唐的建立者,在前半生(38年),他用熱血與勇氣打造了一個國家;后半生(稱帝后3年),他用樂器和吝嗇摧毀了...
    梁木純閱讀 554評論 6 4

友情鏈接更多精彩內(nèi)容