上一次的抓取豆瓣高分計(jì)算機(jī)書(shū)籍的案例,采用的是完全同步的方式。即單個(gè)線程依次執(zhí)行完所有的邏輯,這樣存在的問(wèn)題就是我們的爬蟲(chóng)程序會(huì)非常的慢。
所以本文作為上一次案例的升級(jí)版本,通過(guò)循序漸進(jìn)、動(dòng)手實(shí)踐的方式來(lái)達(dá)到更好的學(xué)習(xí)效果。
相對(duì)于上次的案例,本次主要采用多線程+隊(duì)列的方式來(lái)實(shí)現(xiàn)。
用到的包:
import requests
from bs4 import BeautifulSoup
import re
import numpy as np
import csv
import time
import threading
import queue
本次新增了兩個(gè)包,threading 和 queue。threading 是用來(lái)進(jìn)行多線程編程的,queue 也就是用來(lái)創(chuàng)建隊(duì)列。至于更詳細(xì)的使用方法,可以上網(wǎng)自行學(xué)習(xí)。這里就不多做介紹了。
主要流程:
- 生成 URL
- 創(chuàng)建兩個(gè)隊(duì)列,一個(gè)用保存生成的URL(隊(duì)列1),一個(gè)保存HTML文檔(隊(duì)列2)
- 創(chuàng)建若干個(gè)線程來(lái)下載 HTML,并且保存到隊(duì)列2
- 創(chuàng)建若干個(gè)線程解析文檔
- 排序并保存
代碼:
生成分頁(yè)URL地址
def make_url(page)
根據(jù)評(píng)分排序
def _sort(result)
保存到csv
def save(data)
請(qǐng)求url,下載html
def req_page()
解析html,獲取評(píng)分
def get_content()
以上前三個(gè)方法都沒(méi)有改動(dòng),主要是第四個(gè)和第五個(gè)。
req_page(): 用來(lái)請(qǐng)求url。
def req_page():
while True:
try:
url = url_task.get(block=False)
resp = requests.get(url)
html = resp.text
task_html.put(html)
time.sleep(1)
except:
break
以上代碼會(huì)被若干個(gè)線程執(zhí)行,每一個(gè)線程的流程都是不段的從 url_task 也就是我們創(chuàng)建的隊(duì)列1中取出一個(gè)URL,然后執(zhí)行請(qǐng)求,并把下載到的 HTML 放入隊(duì)列2。這里有兩點(diǎn)要注意的。第一個(gè)點(diǎn)就是通過(guò) url_task.get() 方法從隊(duì)列里拿出任務(wù)的時(shí)候,由于我們的隊(duì)列1是提前設(shè)定好的,也就是說(shuō)當(dāng)下載線程取任務(wù)的時(shí)候并不會(huì)發(fā)生 queue.Empty 的異常。只有當(dāng)隊(duì)列中的數(shù)據(jù)被處理完的時(shí)候才會(huì)執(zhí)行 except,那么線程就可以通過(guò)這個(gè)來(lái)退出。第二點(diǎn)是sleep這塊 ,因?yàn)檎?qǐng)求太頻繁會(huì)被豆瓣封掉IP。
get_content():
def get_content():
if task_html.qsize() > 10:
while True:
try:
html = task_html.get(block=False)
bs4 = BeautifulSoup(html, "lxml")
book_info_list = bs4.find_all('li', class_='subject-item')
if book_info_list is not None:
for book_info in book_info_list:
list_ = []
try:
star = book_info.find('span', class_='rating_nums').get_text()
if float(star) < 9.0:
continue
title = book_info.find('h2').get_text().replace(' ', '').replace('\n', '')
comment = book_info.find('span', class_='pl').get_text()
comment = re.sub("\D", "", comment)
list_.append(title)
list_.append(comment)
list_.append(star)
task_res.append(list_)
except:
continue
except:
break
這個(gè)函數(shù)首先判斷一下 HTML 文檔隊(duì)列(隊(duì)列2)的大小是不是大于10,目的是防止解析線程比下載線程執(zhí)行的快,如果解析線程快于下載線程,那么再還沒(méi)有下載完所有的URL時(shí),就觸發(fā)隊(duì)列的 queue.Empty異常,從而過(guò)早退出線程。中間的代碼也是上次案例中的代碼,不同之處也就是以前是從列表中讀取,現(xiàn)在是從隊(duì)列中讀取。同時(shí)這個(gè)函數(shù)也是由多個(gè)解析線程執(zhí)行。
主函數(shù):
# 生成分頁(yè)url
url_list = make_url(50)
# url 隊(duì)列 (隊(duì)列1)
url_task = queue.Queue()
for url in url_list:
url_task.put(url)
# 下載好的html隊(duì)列 (隊(duì)列2)
task_html = queue.Queue()
# 最終結(jié)果列表
task_res = []
threads = []
# 獲取html線程
for i in range(5):
threads.append(threading.Thread(target=req_page))
# 解析html線程
threads.append(threading.Thread(target=get_content))
threads.append(threading.Thread(target=get_content))
for i in threads:
i.start()
i.join()
# 主線程排序保存
save(_sort(task_res))
主函數(shù)的流程也就是最開(kāi)始寫(xiě)的五個(gè)流程。因?yàn)槲覀儎?chuàng)建的所有線程都調(diào)用了 join() 方法,那么在最后執(zhí)行排序和保存操作的時(shí)候,所有的子線程都已經(jīng)執(zhí)行完畢了。