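This article is neither rigorous nor complete. A reorganized, more detailed and more rigorous introduction is linked here,
and reading the text below is strongly discouraged.
There is very little material online about writing crawlers with aiohttp, and the official documentation is in English and tedious to read, so this is my own partly translated summary.
A function that fetches HTML with requests basically looks like this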
import requests
def func(url: str) -> str:
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.100 Safari/537.36'}
    cookies = {'Cookie': ''}
    # No session for now; verify=False skips SSL certificate verification for https pages
    r = requests.get(url, headers=headers, timeout=10, cookies=cookies, verify=False)
    r.encoding = r.apparent_encoding  # auto-detect the page encoding to avoid garbled Chinese text, at the cost of speed
    return r.text  # or r.content
func('https://www.sina.com')  # the URL needs a scheme, otherwise requests raises MissingSchema
Rewritten with aiohttp
import asyncio
import aiohttp
async def html(url: str) -> str:
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.100 Safari/537.36'}
    async with aiohttp.ClientSession() as session:
        # Older aiohttp releases lack the verify_ssl parameter; if this raises, uninstall and reinstall the latest version
        # (aiohttp 3.x deprecates verify_ssl in favor of ssl=False)
        async with session.get(url, headers=headers, timeout=10, verify_ssl=False) as r:
            # r.text() corresponds to r.text in requests, r.read() to r.content
            return await r.text()
loop = asyncio.get_event_loop()
loop.run_until_complete(html('https://www.sina.com'))  # the URL needs a scheme
# For pages served over SSL, wait about 250 ms for the underlying connections to close
loop.run_until_complete(asyncio.sleep(0.25))
loop.close()
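That is the basic rewrite. The concept of coroutines is not the focus here, and I will skip their advantages such as low single-threaded overhead; this only covers a few pitfalls and things to watch for. Reference documentation
- Returning both text and content:
# requests
return r.text, r.content
# aiohttp
return await r.text(), await r.read()  # do not drop the second await; every coroutine call needs its own await
- r.text() raises an encoding error
return await r.text(errors='ignore')  # ignore the bad bytes; the default 'strict' mode raises an exception and aborts the program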
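This raises a question: how does r.encoding = r.apparent_encoding work, and why does the aiohttp version have no equivalent line?
First, look at the source of r.apparent_encoding

As it shows, it boils down to
import chardet  # shipped as a dependency of older requests releases; newer ones may install charset_normalizer instead
code = chardet.detect(content)['encoding']  # content is the raw response body (bytes)
In other words, carried over to the aiohttp code, it would have to be written like this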
import asyncio
import aiohttp
import chardet
async def html(url: str) -> str:
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.100 Safari/537.36'}
    async with aiohttp.ClientSession() as session:
        # Older aiohttp releases lack the verify_ssl parameter; if this raises, uninstall and reinstall the latest version
        async with session.get(url, headers=headers, timeout=10, verify_ssl=False) as r:
            content = await r.read()
            code = chardet.detect(content)['encoding']  # may be None when detection fails
            # r.text() corresponds to r.text in requests; without an encoding argument it detects
            # the page encoding itself, which also slows things down. r.read() corresponds to r.content
            return await r.text(encoding=code, errors='ignore')
In practice, though, r.text() already performs this step when encoding=None (the default), so there is no need to bother with chardet at all. When an encoding error shows up, ignore it first and then investigate the individual page, or simply leave it.
The documentation covers this part:
If encoding is None content encoding is autocalculated using Content-Type HTTP header and chardet tool if the header is not provided by server.
cchardet is used with fallback to chardet if cchardet is not available.
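The lookup order quoted above (header first, detection second) can be sketched with the standard library alone. This is only an illustration of the idea, not aiohttp's actual implementation: `charset_from_header` and `decode_body` are hypothetical helper names, chardet is treated as an optional dependency, and the final UTF-8 fallback is my own assumption.

```python
from email.message import Message
from typing import Optional


def charset_from_header(content_type: Optional[str]) -> Optional[str]:
    """Return the charset parameter of a Content-Type value, or None."""
    if not content_type:
        return None
    msg = Message()
    msg["Content-Type"] = content_type
    return msg.get_content_charset()  # lower-cased; None when absent


def decode_body(body: bytes, content_type: Optional[str] = None) -> str:
    """Decode using the header charset, then chardet if installed, then UTF-8."""
    encoding = charset_from_header(content_type)
    if encoding is None:
        try:
            import chardet  # optional third-party package
            encoding = chardet.detect(body)["encoding"]
        except ImportError:
            encoding = None
    try:
        # errors="ignore" drops undecodable bytes instead of raising
        return body.decode(encoding or "utf-8", errors="ignore")
    except LookupError:
        # Unknown codec name reported by the server or the detector
        return body.decode("utf-8", errors="ignore")


if __name__ == "__main__":
    print(decode_body("中文".encode("gbk"), "text/html; charset=GBK"))
```

When the server sends a charset, no detection runs at all, which is why the header path is the cheap one.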
- Handling timeouts and other exceptions
Just catch them. These are the exceptions I have run into most often
asyncio.TimeoutError
aiohttp.client_exceptions.ServerDisconnectedError
aiohttp.client_exceptions.InvalidURL
aiohttp.client_exceptions.ClientConnectorError
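From the documentation
import async_timeout
# recent async_timeout releases require "async with"; older ones also accepted a plain "with"
async with async_timeout.timeout(0.001):
    async with session.get('https://github.com') as r:
        await r.text()
Even inside this context manager a timeout exception is still raised, so set the limit somewhat longer, for example 10 s, and catch the timeout exception. This style also avoids the concurrent.futures._base.CancelledError exception, which means that a task still unfinished when the timeout is reached gets cancelled by the event loop.
The event loop will ensure to cancel the waiting task when that timeout is reached and the task hasn't completed yet.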
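To make the "set a limit and catch the exception" advice concrete, here is a minimal sketch that needs no network. It swaps in the standard library's `asyncio.wait_for` in place of async_timeout, and `slow_fetch` is a made-up stand-in for a real request, so treat it as an illustration of the pattern only.

```python
import asyncio
from typing import Optional


async def slow_fetch(delay: float) -> str:
    """Hypothetical stand-in for a network request; it only sleeps."""
    await asyncio.sleep(delay)
    return "body"


async def fetch_with_timeout(delay: float, limit: float) -> Optional[str]:
    """Return the body, or None when the time limit is exceeded."""
    try:
        return await asyncio.wait_for(slow_fetch(delay), timeout=limit)
    except asyncio.TimeoutError:
        # wait_for cancels the unfinished slow_fetch before raising
        return None


if __name__ == "__main__":
    print(asyncio.run(fetch_with_timeout(0.01, 1.0)))  # finishes in time
    print(asyncio.run(fetch_with_timeout(1.0, 0.01)))  # exceeds the limit
```

Returning None on timeout keeps the caller simple; logging the URL in the except branch would be the obvious next step in a real crawler.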
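Below are two pieces of code that do exactly the same job (heavily simplified, only guaranteed to run), comparing aiohttp with multithreading.
They read the title and main text of each web page.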
aiohttp
import asyncio
from random import random
import aiohttp
# install with: pip install readability-lxml
from readability import Document
def title_summary(content: bytes, url: str):
    doc = Document(content, url)
    print(doc.short_title(), doc.summary())
async def read_one(id_: int, url: str):
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.100 Safari/537.36'}
    async with aiohttp.ClientSession() as session:
        try:
            async with session.get(
                    url, headers=headers, timeout=1, verify_ssl=False) as r:
                await asyncio.sleep(1 + random())
                content, text = await r.read(), await r.text(
                    encoding=None, errors='ignore')
                if text:
                    title_summary(content, url)
        except Exception:
            # simplified: swallow every error and move on
            pass
def read_many(links: list):
    loop = asyncio.get_event_loop()
    to_do = [read_one(id_, url) for id_, url in links]
    loop.run_until_complete(asyncio.gather(*to_do))
    # asyncio.wait(...) seems to do the same job here, but since Python 3.11 it only accepts tasks, not bare coroutines
    loop.close()
def main():
    links = [...]  # all links to crawl, as (id, url) pairs
    read_many(links)
if __name__ == '__main__':
    main()
Multithreading
from concurrent import futures
import requests
from readability import Document
def title_summary(content: bytes, url: str):
    doc = Document(content, url)
    print(doc.short_title(), doc.summary())
def read_one(url: str):
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.100 Safari/537.36'}
    try:
        r = requests.get(url, headers=headers, timeout=1, verify=False)
        r.encoding = r.apparent_encoding
        content, text = r.content, r.text  # no await here: requests is synchronous
        if text:
            title_summary(content, url)
    except Exception:
        # simplified: swallow every error and move on
        pass
def read_many(links: list) -> int:
    workers = min(100, len(links))  # number of threads
    with futures.ThreadPoolExecutor(workers) as e:
        res = e.map(read_one, links)
    return len(list(res))
def main():
    links = [...]  # all links to crawl
    read_many(links)
if __name__ == '__main__':
    main()
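The aiohttp version remarks that `asyncio.wait` and `asyncio.gather` seem interchangeable. They do both run everything to completion, but they hand back different things, which a small network-free sketch can show. `work`, `with_gather` and `with_wait` are names invented for this illustration.

```python
import asyncio


async def work(n: int) -> int:
    """Hypothetical job; larger n finishes sooner."""
    await asyncio.sleep(0.01 * (3 - n))
    return n * 10


async def with_gather() -> list:
    # gather returns a list of results in submission order,
    # regardless of which job finished first
    return await asyncio.gather(*(work(n) for n in range(3)))


async def with_wait():
    # wait needs tasks/futures and returns two sets of tasks:
    # (done, pending); results are read via task.result()
    tasks = [asyncio.ensure_future(work(n)) for n in range(3)]
    done, pending = await asyncio.wait(tasks)
    return {t.result() for t in done}, pending


if __name__ == "__main__":
    print(asyncio.run(with_gather()))
    print(asyncio.run(with_wait()))
```

For a crawler that only needs "run all of these and give me the results", gather is usually the more convenient of the two; wait is more useful when a timeout or `return_when` condition matters.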
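That is basically how coroutines and threads are used. However, when the tasks number in the thousands, asyncio may raise: ValueError: too many file descriptors in select()
This happens because asyncio calls select internally, and select can only watch a limited number of file descriptors at once; the details are worth revisiting in the book Computer Systems: A Programmer's Perspective.
In that situation the following version does not work. Callbacks may come into play, although they are not strictly necessary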
def read_many(links: list):
    loop = asyncio.get_event_loop()
    to_do = [read_one(id_, url) for id_, url in links]
    loop.run_until_complete(asyncio.gather(*to_do))
    # asyncio.wait(...) seems to do the same job here, but since Python 3.11 it only accepts tasks, not bare coroutines
    loop.close()
Change the code above as follows
def read_many(links: list):
    loop = asyncio.get_event_loop()
    for id_, url in links:
        task = asyncio.ensure_future(read_one(id_, url))
        loop.run_until_complete(task)
    loop.close()
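and that is all.
Note that after this change the tasks no longer run concurrently but one after another; the correct approach is described in the callback section of the article linked at the top.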
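One commonly used way to keep concurrency while staying under the descriptor limit is to cap the number of requests in flight with `asyncio.Semaphore`. This is not the callback approach the linked article describes; it is a different technique, sketched here under the assumption that a fixed cap is acceptable. `fake_fetch` stands in for the real aiohttp request, and the `stats` dictionary exists only to show that the cap holds.

```python
import asyncio


async def fake_fetch(url: str, sem: asyncio.Semaphore, stats: dict) -> str:
    """Hypothetical request; holds one semaphore slot while 'downloading'."""
    async with sem:
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
        await asyncio.sleep(0.01)  # placeholder for the network round trip
        stats["active"] -= 1
        return url


async def crawl(urls: list, limit: int = 5):
    """Run all fetches concurrently, at most `limit` at a time."""
    sem = asyncio.Semaphore(limit)
    stats = {"active": 0, "peak": 0}
    results = await asyncio.gather(*(fake_fetch(u, sem, stats) for u in urls))
    return results, stats["peak"]


if __name__ == "__main__":
    pages, peak = asyncio.run(crawl([f"url-{i}" for i in range(20)], limit=5))
    print(len(pages), "pages fetched; peak concurrency:", peak)
```

All coroutines are still created up front, but only `limit` of them hold a connection at any moment, so the number of open sockets stays bounded.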
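Using callbacks is more work and requires changes in quite a few places, shown below; the main thing to watch is how arguments are passed.
Callbacks are not really necessary. Splitting the code this way looks cleaner and lets the pieces be reused when other pages have to be requested, but it comes with many restrictions.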
import asyncio
from random import random
import aiohttp
# install with: pip install readability-lxml
from readability import Document
def title_summary(fut):
    res = fut.result()  # inside a callback, result() gives the real return value of the coroutine
    if res:
        content, url = res
        doc = Document(content, url)
        print(doc.short_title(), doc.summary())
async def read_one(id_: int, url: str):
    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.100 Safari/537.36'}
    async with aiohttp.ClientSession() as session:
        try:
            async with session.get(
                    url, headers=headers, timeout=1, verify_ssl=False) as r:
                await asyncio.sleep(1 + random())
                # return (content, url) so that it matches what title_summary unpacks
                return await r.read(), url
        except Exception:
            # simplified: on any error the coroutine returns None
            pass
def read_many(links: list):
    loop = asyncio.get_event_loop()
    for id_, url in links:
        task = asyncio.ensure_future(read_one(id_, url))
        # Mind the arguments: a done-callback receives only the future, so extra arguments need
        # functools.partial, a tuple that is unpacked inside, or a lambda; the official docs
        # favor functools.partial, which is not written out here
        task.add_done_callback(title_summary)
        loop.run_until_complete(task)
    loop.close()
def main():
    links = [...]  # all links to crawl, as (id, url) pairs
    read_many(links)
if __name__ == '__main__':
    main()
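For the argument-passing issue mentioned in the comment, a `functools.partial` variant might look like the sketch below. It runs without any network access: `fetch` is a made-up coroutine returning fixed bytes, and `on_done` and `run` are illustrative names, so this shows the binding pattern rather than a drop-in replacement for the crawler above.

```python
import asyncio
from functools import partial


async def fetch(url: str) -> bytes:
    """Hypothetical request returning a fixed body."""
    await asyncio.sleep(0)
    return b"<html></html>"


def on_done(url: str, collected: list, fut: asyncio.Future) -> None:
    # partial binds url and collected up front; asyncio supplies
    # the finished future as the last positional argument
    collected.append((url, fut.result()))


async def run(urls: list) -> list:
    collected = []
    tasks = []
    for url in urls:
        task = asyncio.ensure_future(fetch(url))
        task.add_done_callback(partial(on_done, url, collected))
        tasks.append(task)
    await asyncio.gather(*tasks)  # tasks run concurrently, not one by one
    return collected


if __name__ == "__main__":
    print(asyncio.run(run(["https://example.com/a", "https://example.com/b"])))
```

Because the URL travels through `partial`, the coroutine itself no longer has to return it just so the callback can see it.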