序言
第1章 Scrapy介紹
第2章 理解HTML和XPath
第3章 爬蟲基礎(chǔ)
第4章 從Scrapy到移動應(yīng)用
第5章 快速構(gòu)建爬蟲
第6章 Scrapinghub部署
第7章 配置和管理
第8章 Scrapy編程
第9章 使用Pipeline
第10章 理解Scrapy的性能
第11章(完) Scrapyd分布式抓取和實(shí)時(shí)分析
我們已經(jīng)學(xué)了很多東西。我們先學(xué)習(xí)了兩種基礎(chǔ)的網(wǎng)絡(luò)技術(shù),HTML和XPath,然后我們學(xué)習(xí)了使用Scrapy抓取復(fù)雜的網(wǎng)站。接著,我們深入學(xué)習(xí)了Scrapy的設(shè)置,然后又進(jìn)一步深入學(xué)習(xí)了Scrapy和Python的內(nèi)部架構(gòu)和Twisted引擎的異步特征。在上一章中,我們學(xué)習(xí)了Scrapy的性能和以及處理復(fù)雜的問題以提高性能。
在本章中,我將展示如何在多臺服務(wù)器上進(jìn)一步提高性能。我們會發(fā)現(xiàn)抓取通常是一個并行問題;因此,我們可以水平延展至多臺服務(wù)器。為了這么做,我們會使用一個Scrapy中間件,我們還會使用Scrapyd,一個用來管理遠(yuǎn)程服務(wù)器爬蟲的應(yīng)用。它可以讓我們像第6章那樣進(jìn)行抓取。
我們最后用Apache Spark對提取的數(shù)據(jù)進(jìn)行實(shí)時(shí)分析。Spark一個非常流行的大數(shù)據(jù)處理框架。收集的數(shù)據(jù)越多、結(jié)果就變得越準(zhǔn)確,我們使用Spark Streaming API展示結(jié)果。最后的結(jié)果展示了Python的強(qiáng)大和成熟,單單用Python的簡明代碼就全棧開發(fā)了從抓取到分析的全過程。
房子的標(biāo)題如何影響價(jià)格?
我們要研究個問題是房子的標(biāo)題和價(jià)格有什么關(guān)系。我們預(yù)計(jì)像“按摩浴缸”和“游泳池”可能和高價(jià)相關(guān),而“打折”會和低價(jià)有關(guān)。將標(biāo)題與地點(diǎn)結(jié)合,例如,可以根據(jù)地點(diǎn)和描述,實(shí)時(shí)判斷哪個房子最劃算。
我們想計(jì)算的就是特定名詞對價(jià)格造成的偏移:

例如,如果平均租金是$1000,我們觀察到帶有按摩浴缸的房子的平均價(jià)格是$1300,沒有的價(jià)格是$995,因此按摩浴缸的偏移值為shiftjacuzzi=(1300-995)/1000=30.5%。如果一個帶有按摩浴缸的房子的價(jià)格直逼平均價(jià)格高5%,那么它的價(jià)格就很劃算。
因?yàn)槊~效應(yīng)會有累加,所以這個指標(biāo)并不繁瑣。例如,標(biāo)題同時(shí)含有按摩浴缸和打折會有一個混合效果。我們收集分析的數(shù)據(jù)越多,估算就會越準(zhǔn)確。稍后回到這個問題,接下來講一個流媒體解決方案。
Scrapyd
現(xiàn)在,我們來介紹Scrapyd。Scrapyd是一個應(yīng)用,使用它,我們可以將爬蟲附屬到服務(wù)器上,并對抓取進(jìn)行規(guī)劃。我們來看看它的使用是多么容易,我們用第3章的代碼,只做一點(diǎn)修改。
我們先來看Scrapyd的界面,在http://localhost:6800/。

你可以看到,它有幾個部分,有Jobs、Items、Logs和Documentation。它還給出了如何規(guī)劃抓取工作的API方法。
為了這么做,我們必須首先將爬蟲部署到服務(wù)器上。第一步是修改scrapy.cfg,如下所示:
$ pwd
/root/book/ch03/properties
$ cat scrapy.cfg
...
[settings]
default = properties.settings
[deploy]
url = http://localhost:6800/
project = properties
我們要做的就是取消url的注釋。默認(rèn)的設(shè)置適合我們。現(xiàn)在,為了部署爬蟲,我們使用scrapyd-client提供的工具scrapyd-deploy。scrapyd-client以前是Scrapy的一部分,但現(xiàn)在是一個獨(dú)立的模塊,可以用pip install scrapyd-client進(jìn)行安裝(開發(fā)機(jī)中已經(jīng)安裝了):
$ scrapyd-deploy
Packing version 1450044699
Deploying to project "properties" in http://localhost:6800/addversion.
json
Server response (200):
{"status": "ok", "project": "properties", "version": "1450044699",
"spiders": 3, "node_name": "dev"}
部署好之后,就可以在Scrapyd的界面的Available projects看到。我們現(xiàn)在可以根據(jù)提示,在當(dāng)前頁提交一個任務(wù):
$ curl http://localhost:6800/schedule.json -d project=properties -d spider=easy
{"status": "ok", "jobid": " d4df...", "node_name": "dev"}
如果我們返回Jobs,我們可以使用jobid schedule.json,它可以在之后用cancel.json取消任務(wù):
$ curl http://localhost:6800/cancel.json -d project=properties -d job=d4df...
{"status": "ok", "prevstate": "running", "node_name": "dev"}
一定要取消進(jìn)程,否則會浪費(fèi)計(jì)算資源。
完畢之后,訪問Logs,我們可以看到日志,在Items我們可以看到抓取過的items。這些數(shù)據(jù)會被周期清空以節(jié)省空間,所以一段時(shí)間后就會失效。
如果發(fā)生沖突或有其它理由的話,我們可以通過http_port修改端口,它是Scrapyd的諸多設(shè)置之一。最好閱讀文檔http://scrapyd.readthedocs.org/,多了解下。我們的部署必須要設(shè)置的是max_proc。如果使用默認(rèn)值0,任務(wù)的并行數(shù)量最多可以是CPU核心的四位。因?yàn)槲覀兛赡軙谔摂M機(jī)中運(yùn)行多個Scrapyd服務(wù)器,我們將max_proc設(shè)為4,可以允許4個任務(wù)同時(shí)進(jìn)行。在真實(shí)環(huán)境中,使用默認(rèn)值就可以。
分布式系統(tǒng)概述
設(shè)計(jì)這個系統(tǒng)對我是個挑戰(zhàn)。我一開始添加了許多特性,導(dǎo)致復(fù)雜度升高,只有高性能的機(jī)器才能完成工作。然后,又不得不進(jìn)行簡化,既對硬件性能要求不那么高,也可以讓本章的重點(diǎn)仍然是Scrapy。

最后,系統(tǒng)中會包括我們的開發(fā)機(jī)和幾臺服務(wù)器。我們用開發(fā)機(jī)進(jìn)行首頁的水平抓取,提取幾批URL。然后用輪訓(xùn)機(jī)制將URL分發(fā)到Scrapyd的結(jié)點(diǎn),并進(jìn)行抓取。最后,通過FTP傳遞.jl文件和Items到運(yùn)行Spark的服務(wù)器上。我選擇FTP和本地文件系統(tǒng),而不是HDFS或Apache Kafka,是因?yàn)镕TP內(nèi)存需求少,并且作為FEED_URI被Scrapy支持。請記住,只要簡單設(shè)置Scrapyd和Spark的配置,我們就可以使用亞馬遜S3存儲這些文件,獲得冗余度和可伸縮性等便利,而不用再使用其它技術(shù)。
筆記:FTP的缺點(diǎn)之一是,上傳過程可能會造成文件不完整。為了避免這點(diǎn),一旦上傳完成,我們便使用Pure-FTPd和調(diào)回腳本將文件上傳到/root/items。
每過幾秒,Spark都讀一下目錄/root/items,讀取任何新文件,取一個小批次進(jìn)行分析。我們使用Spark是因?yàn)樗С諴ython作為編程語言,也支持流分析。到現(xiàn)在,我們使用的爬蟲都比較短,實(shí)際中有的爬蟲是24小時(shí)運(yùn)行的,不斷發(fā)出數(shù)據(jù)流并進(jìn)行分析,數(shù)據(jù)越多,分析的結(jié)果越準(zhǔn)確。我們就是要用Spark進(jìn)行這樣的演示。
筆記:除了Spark和Scrapy,你還可以使用MapReduce,Apache Storm或其它框架。
在本章中,我們不向數(shù)據(jù)庫中插入items。我們在第9章中用的方法也可以在這里使用,但是性能很糟。很少有數(shù)據(jù)庫喜歡每秒被pipelines寫入幾千個文件。如果想進(jìn)行寫入的話,應(yīng)該用Spark專用的方法,即批次導(dǎo)入Items。你可以修改我們Spark的例子,向任何數(shù)據(jù)庫進(jìn)行批次導(dǎo)入。
還有,這個系統(tǒng)的彈性很差。我們假設(shè)每個結(jié)點(diǎn)都是健康的,任何一個損壞的話,也不會對總系統(tǒng)造成影響。Spark提供高可用性的彈性配置。Scrapy不提供此類內(nèi)建的功能,除了Scrapyd的“持續(xù)排隊(duì)”功能,即當(dāng)結(jié)點(diǎn)恢復(fù)時(shí),可以繼續(xù)失敗的任務(wù)。這個功能不一定對你有用。如果彈性對你十分重要,你就得搭建一個監(jiān)督系統(tǒng)和分布式排隊(duì)方案(例如,基于Kafka或RabbitMQ),可以重啟失敗的任務(wù)。
修改爬蟲和中間件
為了搭建這個系統(tǒng),我們要稍稍修改爬蟲和中間件。更具體地,我們要做如下工作:
- 微調(diào)爬蟲,使抓取索引頁的速度達(dá)到最大
- 寫一個中間件,可以將URL批次發(fā)送給scrapyd服務(wù)器。
- 使用相同的中間件,使系統(tǒng)啟動時(shí)就可以將URL分批
我們盡量用簡明的方式來完成這些工作。理想狀態(tài)下,整個過程應(yīng)該對底層的爬蟲代碼簡潔易懂。這是一個底層層面的要求,通過破解爬蟲達(dá)到相同目的不是好主意。
抓取共享首頁
第一步是優(yōu)化抓取首頁的速度,速度越快越好。開始之前,先明確一下目的。假設(shè)爬蟲的并發(fā)數(shù)是16,源網(wǎng)站的延遲大概是0.25秒。這樣,最大吞吐量是16/0.25=64頁/秒。首頁有5000O個子頁,每個索引頁有30個子頁,那就有1667個索引頁。預(yù)計(jì)下載整個首頁需要,1667/64=26秒。
將第3章中的爬蟲重命名為easy。我們將首先進(jìn)行垂直抓取的Rule(含有callback='parse_item'的一項(xiàng))注釋掉,因?yàn)楝F(xiàn)在只想抓取索引頁。
提示:本章的代碼位于目錄ch11。
在進(jìn)行優(yōu)化之前,我們讓scrapy crawl只抓取10個頁面,結(jié)果如下:
$ ls
properties scrapy.cfg
$ pwd
/root/book/ch11/properties
$ time scrapy crawl easy -s CLOSESPIDER_PAGECOUNT=10
...
DEBUG: Crawled (200) <GET ...index_00000.html> (referer: None)
DEBUG: Crawled (200) <GET ...index_00001.html> (referer: ...index_00000.
html)
...
real 0m4.099s
如果10個頁面用時(shí)4秒,26秒內(nèi)是不可能完成1700個頁面的。通過查看日志,我們看到每個索引頁都來自前一個頁面,也就是說,任何時(shí)候最多是在處理一個頁面。實(shí)際上,并發(fā)數(shù)是1。我們需要將其并行化,使達(dá)到并發(fā)數(shù)16。我們將索引頁相互共享,即URL互相連接,再加入一些其他的鏈接,以免爬蟲中沒有URL。我們將首頁分廠20個部分。實(shí)際上,任何大于16的數(shù),都可以提速,但是一旦超過20,速度反而會下降。我們用下面的方法計(jì)算每個部分的起始索引頁:
>>> map(lambda x: 1667 * x / 20, range(20))
[0, 83, 166, 250, 333, 416, 500, ... 1166, 1250, 1333, 1416, 1500, 1583]
據(jù)此,設(shè)置start_URL如下:
start_URL = ['http://web:9312/properties/index_%05d.html' % id
for id in map(lambda x: 1667 * x / 20, range(20))]
這可能會和你的情況不同,所以就不做美化了。將并發(fā)數(shù)(CONCURRENT_REQUESTS, CONCURRENT_REQUESTS_PER_DOMAIN)設(shè)為16,再次運(yùn)行爬蟲,運(yùn)行如下:
$ time scrapy crawl easy -s CONCURRENT_REQUESTS=16 -s CONCURRENT_
REQUESTS_PER_DOMAIN=16
...
real 0m32.344s
結(jié)果接近了我們的目標(biāo)。下載速度是1667頁面/32秒=52頁面/秒,也就是說,每秒可以產(chǎn)生52*30=1560個子頁面。我們現(xiàn)在可以注釋掉垂直抓取的Rule,將文件保存成一個爬蟲。我們不需要進(jìn)一步修改爬蟲代碼,而是用一個功能強(qiáng)大的中間件繼續(xù)來做。如果只用開發(fā)機(jī)運(yùn)行爬蟲,假設(shè)可以像抓取索引頁一樣抓取子頁,可以在50000/52=16分鐘內(nèi)完成抓取。
這里有兩個要點(diǎn)。在學(xué)習(xí)完第10章之后,我們在做的都是工程項(xiàng)目。我們可以想方設(shè)法計(jì)算出系統(tǒng)確切的性能。第二點(diǎn)是,抓取索引頁會產(chǎn)生子頁,但實(shí)際的吞吐量不大。如果產(chǎn)生URL的速度快過scrapyd處理URL的速度,URL就會在scrapyd排隊(duì)?;蛘撸绻a(chǎn)生URL的速度太慢,scrapyd就會空閑。
批次抓取URL
現(xiàn)在來處理子頁面的URL,并把它們分批,然后直接發(fā)送給scrapyds,而不是繼續(xù)抓取。
如果檢查Scrapy的架構(gòu),我們可以明白這么做就是為了做一個中間件,它可以執(zhí)行process_spider_output(),在Requests到達(dá)下載器之前就可以進(jìn)行處理或取消。我們限定中間件只支持CrawlSpider的爬蟲,并且只支持簡單的GET請求。如果要提高復(fù)雜度,例如,POST或認(rèn)證請求,我們必須開發(fā)更多的功能,以傳遞參數(shù)、頭文件、每個批次進(jìn)行重新登陸。
打開Scrapy的GitHub,查看SPIDER_MIDDLEWARES_BASE設(shè)置,看看能否重利用哪個程序。Scrapy 1.0有以下中間件:HttpErrorMiddleware、OffsiteMiddleware、RefererMiddleware、UrlLengthMiddleware和DepthMiddleware。我們看到OffsiteMiddleware(只有60行)好像使我們需要的。它根據(jù)爬蟲屬性allowed_domains限定URL。我們可以用相同的方法嗎?不是丟棄URL,我們轉(zhuǎn)而將它們分批,發(fā)送給scrapyds。我們確實(shí)可以這么做,部分代碼如下:
def __init__(self, crawler):
settings = crawler.settings
self._target = settings.getint('DISTRIBUTED_TARGET_RULE', -1)
self._seen = set()
self._URL = []
self._batch_size = settings.getint('DISTRIBUTED_BATCH_SIZE', 1000)
...
def process_spider_output(self, response, result, spider):
for x in result:
if not isinstance(x, Request):
yield x
else:
rule = x.meta.get('rule')
if rule == self._target:
self._add_to_batch(spider, x)
else:
yield x
def _add_to_batch(self, spider, request):
url = request.url
if not url in self._seen:
self._seen.add(url)
self._URL.append(url)
if len(self._URL) >= self._batch_size:
self._flush_URL(spider)
process_spider_output()處理Item和Request。我們只需要Request,其它就不考慮了。如果查看CrawlSpider的源代碼,我們看到將Request/Response映射到Rule的方式是用一個meta dict中的名為“rule”的整數(shù)字段。我們檢查這個數(shù)字,如果它指向我們想要的Rule(DISTRIBUTED_TARGET_RULE設(shè)置),我們調(diào)用_add_to_batch(),將它的URL添加到這個批次里面。我們?nèi)缓笕∠@個Request。我們接著產(chǎn)生出其他的請求,例如下一頁的鏈接,不進(jìn)行改動。The _add_to_batch()方法起到去重的作用。但是,我們前面描述的碎片化過程,意味著有的URL可能要提取兩次。我們使用_seen set檢測并去除重復(fù)項(xiàng)。然后將這些URL添加到_URL列表,如果它的大小超過了_batch_size(根據(jù)DISTRIBUTED_BATCH_SIZE設(shè)置),就會調(diào)用_flush_URL()。這個方法提供了一下功能:
def __init__(self, crawler):
...
self._targets = settings.get("DISTRIBUTED_TARGET_HOSTS")
self._batch = 1
self._project = settings.get('BOT_NAME')
self._feed_uri = settings.get('DISTRIBUTED_TARGET_FEED_URL', None)
self._scrapyd_submits_to_wait = []
def _flush_URL(self, spider):
if not self._URL:
return
target = self._targets[(self._batch-1) % len(self._targets)]
data = [
("project", self._project),
("spider", spider.name),
("setting", "FEED_URI=%s" % self._feed_uri),
("batch", str(self._batch)),
]
json_URL = json.dumps(self._URL)
data.append(("setting", "DISTRIBUTED_START_URL=%s" % json_URL))
d = treq.post("http://%s/schedule.json" % target,
data=data, timeout=5, persistent=False)
self._scrapyd_submits_to_wait.append(d)
self._URL = []
self._batch += 1
首先,它使用了批次計(jì)數(shù)器(_batch)來決定發(fā)向哪個scrapyd服務(wù)器??捎梅?wù)器保存在_targets(見DISTRIBUTED_TARGET_HOSTS設(shè)置)。我們?nèi)缓笙騭crapyd的schedule.json做一個POST請求。這比之前用過的curl方法高級,因?yàn)樗鼈鬟f了經(jīng)過仔細(xì)選擇的參數(shù)?;谶@些常熟,scrapyd就規(guī)劃了一次抓取,如下所示:
scrapy crawl distr \
-s DISTRIBUTED_START_URL='[".../property_000000.html", ... ]' \
-s FEED_URI='ftp://anonymous@spark/%(batch)s_%(name)s_%(time)s.jl' \
-a batch=1
除了項(xiàng)目和爬蟲的名字,我們想爬蟲傳遞了一個FEED_URI設(shè)置。它的值是從DISTRIBUTED_TARGET_FEED_URL得到的。
因?yàn)镾crapy支持FTP,我們可以讓scrapyds用一個匿名FTP將抓取的Item文件上傳到Spark服務(wù)器。它的格式包括爬蟲的名字(%(name)s和時(shí)間(%(time)s)。如果只有這兩項(xiàng)的話,那么同一時(shí)間創(chuàng)建出來的兩個文件就會有沖突。為了避免覆蓋,我們加入一個參數(shù)%(batch)。Scrapy默認(rèn)是不知道批次的,所以我們必須給設(shè)定一個值。scrapyd的schedule.json API的特點(diǎn)之一是,每個不是設(shè)置的參數(shù)或已知的參數(shù)都被傳遞給了爬蟲。默認(rèn)時(shí),爬蟲的參數(shù)成為了爬蟲的屬性,然后在爬蟲的屬性中尋找未知的FEED_URI參數(shù)。因此,將一批參數(shù)傳遞給schedule.json,我們就可以在FEED_URI中使用它,以避免沖突。
最后是將DISTRIBUTED_START_URL和這一批次的子頁URL編譯為JSON,因?yàn)镴SON是最簡潔的文本格式。
筆記:用命令行將大量數(shù)據(jù)傳遞到Scrapy并不可取。如果你想將參數(shù)存儲到數(shù)據(jù)庫(例如Redis),只傳遞給Scrapy一個ID。這么做的話,需要小幅修改_flush_URL()和process_start_requests()。
我們用treq.post()來做POST請求。Scrapyd處理持續(xù)連接并不好,因此我們用persistent=False取消它。我們還設(shè)置了一個5秒的暫停。這個請求的的延遲項(xiàng)被保存在_scrapyd_submits_to_wait列表。要關(guān)閉這個函數(shù),我們重置_URL列表,并加大當(dāng)前的_batch。
奇怪的是,關(guān)閉操作中會出現(xiàn)許多方法,如下所示:
def __init__(self, crawler):
...
crawler.signals.connect(self._closed, signal=signals.spider_
closed)
@defer.inlineCallbacks
def _closed(self, spider, reason, signal, sender):
# Submit any remaining URL
self._flush_URL(spider)
yield defer.DeferredList(self._scrapyd_submits_to_wait)
調(diào)用_closed()可能是因?yàn)槲覀儼聪铝薈trl + C或因?yàn)樽ト〗Y(jié)束。兩種情況下,我們不想失去任何最后批次的還未發(fā)送的URL。這就是為什么在_closed()中,第一件事是調(diào)用_flush_URL(spider)加載最后的批次。第二個問題是,因?yàn)槭欠亲枞?,停止抓取時(shí),treq.post()可能結(jié)束也可能沒結(jié)束。為了避免丟失最后批次,我們要使用前面提到過的scrapyd_submits_to_wait列表,它包括所有的treq.post()延遲項(xiàng)。我們使用defer.DeferredList()等待,直到全部完成。因?yàn)開closed()使用了@defer.inlineCallbacks,當(dāng)所有請求完成時(shí),我們只yield它并繼續(xù)。
總結(jié)一下,DISTRIBUTED_START_URL設(shè)置中的批次URL會被發(fā)送到scrapyds,scrapyds上面運(yùn)行著相同的爬蟲。很明顯,我們需要使用這個設(shè)置以啟動start_URL。
從settings啟動URL
中間件還提供了一個process_start_requests()方法,使用它可以處理爬蟲提供的start_requests。檢測是否設(shè)定了DISTRIBUTED_START_URL,設(shè)定了的話,用JSON解碼,并使用它的URL產(chǎn)生相關(guān)的請求。對于這些請求,我們設(shè)定CrawlSpider的_response_downloaded()方法作為回調(diào)函數(shù),再設(shè)定參數(shù)meta['rule'],以讓恰當(dāng)?shù)腞ule處理響應(yīng)。我們查看Scrapy的源碼,找到CrawlSpider創(chuàng)建請求的方法,并依法而做:
def __init__(self, crawler):
...
self._start_URL = settings.get('DISTRIBUTED_START_URL', None)
self.is_worker = self._start_URL is not None
def process_start_requests(self, start_requests, spider):
if not self.is_worker:
for x in start_requests:
yield x
else:
for url in json.loads(self._start_URL):
yield Request(url, spider._response_downloaded,
meta={'rule': self._target})
中間件就準(zhǔn)備好了。我們在settings.py進(jìn)行設(shè)置以啟動它:
SPIDER_MIDDLEWARES = {
'properties.middlewares.Distributed': 100,
}
DISTRIBUTED_TARGET_RULE = 1
DISTRIBUTED_BATCH_SIZE = 2000
DISTRIBUTED_TARGET_FEED_URL = ("ftp://anonymous@spark/"
"%(batch)s_%(name)s_%(time)s.jl")
DISTRIBUTED_TARGET_HOSTS = [
"scrapyd1:6800",
"scrapyd2:6800",
"scrapyd3:6800",
]
有人可能認(rèn)為DISTRIBUTED_TARGET_RULE不應(yīng)該作為設(shè)置,因?yàn)樗鼤古老x差異化。你可以認(rèn)為這是個默認(rèn)值,你可以在你的爬蟲中使用屬性custom_settings覆蓋它,例如:
custom_settings = {
'DISTRIBUTED_TARGET_RULE': 3
}
我們的例子并不需要這么做。我們可以做一個測試運(yùn)行,只抓取一個頁面:
$ scrapy crawl distr -s \
DISTRIBUTED_START_URL='["http://web:9312/properties/property_000000.html"]'
這個成功之后,我們進(jìn)一步,抓取一個頁面之后,用FTP將它傳送到Spark服務(wù)器:
scrapy crawl distr -s \
DISTRIBUTED_START_URL='["http://web:9312/properties/property_000000.html"]' \
-s FEED_URI='ftp://anonymous@spark/%(batch)s_%(name)s_%(time)s.jl' -a batch=12
用ssh連接Spark服務(wù)器,你可以看到一個文件,例如/root/items下的12_distr_date_time.jl。
這個中間件的例子可以讓你完成scrapyd的分布式抓取。你可以將它當(dāng)做起點(diǎn),進(jìn)行改造。你可能要做如下修改:
- 爬蟲的類型。除了CrawlSpider,你必須讓爬蟲用恰當(dāng)?shù)膍eta標(biāo)記分布式的請求,用慣用命名法執(zhí)行調(diào)回。
- 向scrapyds傳遞URL的方式。你可能想限定域名,減少傳遞的量。例如,你只想傳遞IDs。
- 你可以用分布式排隊(duì)方案,讓爬蟲可以從失敗恢復(fù),讓scrapyds執(zhí)行更多的URL批次。
- 你可以動態(tài)擴(kuò)展服務(wù)器的規(guī)模,以適應(yīng)需求。
將項(xiàng)目部署到scrapyd服務(wù)器
為了將爬蟲附屬到三臺scrapyd服務(wù)器上,我們必須將它們添加到scrapy.cfg文件。文件上的每個[deploy:target-name]定義了一個新的部署目標(biāo):
$ pwd
/root/book/ch11/properties
$ cat scrapy.cfg
...
[deploy:scrapyd1]
url = http://scrapyd1:6800/
[deploy:scrapyd2]
url = http://scrapyd2:6800/
[deploy:scrapyd3]
url = http://scrapyd3:6800/
你可以用scrapyd-deploy -l查詢可用的服務(wù)器:
$ scrapyd-deploy -l
scrapyd1 http://scrapyd1:6800/
scrapyd2 http://scrapyd2:6800/
scrapyd3 http://scrapyd3:6800/
用scrapyd-deploy <target name>進(jìn)行部署:
$ scrapyd-deploy scrapyd1
Packing version 1449991257
Deploying to project "properties" in http://scrapyd1:6800/addversion.json
Server response (200):
{"status": "ok", "project": "properties", "version": "1449991257",
"spiders": 2, "node_name": "scrapyd1"}
這個過程會產(chǎn)生一些新的目錄和文件(build、project.egg-info、setup.py),可以刪掉。其實(shí),scrapyd-deploy做的就是打包你的項(xiàng)目,并用addversion.json,傳遞到目標(biāo)服務(wù)器上。
之后,如果我們用scrapyd-deploy –L查詢服務(wù)器,我們可以確認(rèn)項(xiàng)目被成功部署了:
$ scrapyd-deploy -L scrapyd1
properties
我還用touch在項(xiàng)目的目錄創(chuàng)建了三個空文件夾,scrapyd1-3。這樣可以將scrapyd的名字傳遞給下面的文件,同時(shí)也是服務(wù)器的名字。然后可以用bash loop將其部署服務(wù)器: for i in scrapyd*; do scrapyd-deploy $i; done。
創(chuàng)建自定義監(jiān)視命令
如果你想在多臺scrapyd服務(wù)器上監(jiān)視抓取的進(jìn)程,你必須親自編寫程序。這是一個練習(xí)所學(xué)知識的好機(jī)會,寫一個原生的Scrapy命令,scrapy monitor,用它監(jiān)視一組scrapyd服務(wù)器。文件命名為monitor.py,在settings.py中添加COMMANDS_MODULE = 'properties.monitor'??焖俨榭磗crapyd的文檔,listjobs.json API給我們提供了關(guān)于任務(wù)的信息。如果我們想找到給定目標(biāo)的根URL,我們可以斷定,它只能是在scrapyd-deploy的代碼中。如果查看https://github.com/scrapy/scrapyd-client/blob/master/scrapyd-client/scrapyd-deploy,我們可以發(fā)現(xiàn)一個_get_targets()函數(shù)(執(zhí)行它不會添加許多值,所以略去了),它可以給出目標(biāo)的名字和根URL。我們現(xiàn)在就可以執(zhí)行命令的第一部分了,如下所示:
class Command(ScrapyCommand):
requires_project = True
def run(self, args, opts):
self._to_monitor = {}
for name, target in self._get_targets().iteritems():
if name in args:
project = self.settings.get('BOT_NAME')
url = target['url'] + "listjobs.json?project=" + project
self._to_monitor[name] = url
l = task.LoopingCall(self._monitor)
l.start(5) # call every 5 seconds
reactor.run()
這段代碼將名字和想要監(jiān)視的API的終點(diǎn)提交給dict _to_monitor。我們?nèi)缓笫褂胻ask.LoopingCall()規(guī)劃向_monitor()方法發(fā)起遞歸調(diào)用。_monitor()使用treq和deferred,我們使用@defer.inlineCallbacks對它進(jìn)行簡化。方法如下(省略了一些錯誤處理和代碼美化):
@defer.inlineCallbacks
def _monitor(self):
all_deferreds = []
for name, url in self._to_monitor.iteritems():
d = treq.get(url, timeout=5, persistent=False)
d.addBoth(lambda resp, name: (name, resp), name)
all_deferreds.append(d)
all_resp = yield defer.DeferredList(all_deferreds)
for (success, (name, resp)) in all_resp:
json_resp = yield resp.json()
print "%-20s running: %d, finished: %d, pending: %d" %
(name, len(json_resp['running']),
len(json_resp['finished']), len(json_resp['pending']))
這幾行代碼包括了目前我們學(xué)過的所有Twisted方法。我們使用treq調(diào)用scrapyd的API和defer.DeferredList,立即處理所有的響應(yīng)。當(dāng)all_resp有了所有結(jié)果之后,我們重復(fù)這個過程,取回它們的JSON對象。treq Response'json()方法返回延遲項(xiàng),而不是實(shí)際值,以與后續(xù)的實(shí)際值繼續(xù)任務(wù)。我們最后打印出結(jié)果。JSON響應(yīng)的列表信息包括懸掛、運(yùn)行中、結(jié)束的任務(wù),我們打印出它的長度。
用Apache Spark streaming計(jì)算偏移值
我們的Scrapy系統(tǒng)現(xiàn)在就功能完備了。讓我們來看看Apache Spark的使用。

讓我來看如何執(zhí)行。請記住這不是Scrapy代碼,所以看起來會覺得陌生,但是是可以看懂的。你可以在 boostwords.py文件找到這個應(yīng)用,這個文件包括了復(fù)雜的測試代碼,可以忽略。它的主要代碼如下:
# Monitor the files and give us a DStream of term-price pairs
raw_data = raw_data = ssc.textFileStream(args[1])
word_prices = preprocess(raw_data)
# Update the counters using Spark's updateStateByKey
running_word_prices = word_prices.updateStateByKey(update_state_
function)
# Calculate shifts out of the counters
shifts = running_word_prices.transform(to_shifts)
# Print the results
shifts.foreachRDD(print_shifts)
Spark使用DStream代表數(shù)據(jù)流。textFileStream()方法監(jiān)督文件系統(tǒng)的一個目錄,當(dāng)檢測到新文件時(shí),就傳出來。我們的preprocess()函數(shù)將它們轉(zhuǎn)化為term/price對。我們用update_state_function()函數(shù)和Spark的updateStateByKey()方法累加這些term/price對。我們最后通過運(yùn)行to_shifts()計(jì)算偏移值,并用print_shifts()函數(shù)打印出極值。大多我們的函數(shù)修改不大,只是高效重塑了數(shù)例據(jù)。例外的是shifts()函數(shù):
def to_shifts(word_prices):
(sum0, cnt0) = word_prices.values().reduce(add_tuples)
avg0 = sum0 / cnt0
def calculate_shift((isum, icnt)):
avg_with = isum / icnt
avg_without = (sum0 - isum) / (cnt0 - icnt)
return (avg_with - avg_without) / avg0
return word_prices.mapValues(calculate_shift)
這段代碼完全是按照公式做的。盡管很簡單,Spark的mapValues()可以讓calculate_shift在Spark服務(wù)器上用最小開銷高效運(yùn)行。
進(jìn)行分布式抓取
我進(jìn)行四臺終端進(jìn)行抓取。我想讓這部分盡量獨(dú)立,所以我還提供了vagrant ssh命令,可以在終端使用。

用終端1來檢測集群的CPU和內(nèi)存的使用。這可以確認(rèn)和修復(fù)問題。設(shè)置方法如下:
$ alias provider_id="vagrant global-status --prune | grep 'docker-
provider' | awk '{print \$1}'"
$ vagrant ssh $(provider_id)
$ docker ps --format "{{.Names}}" | xargs docker stats
前兩行可以讓我們用ssh打開docker provider VM。如果沒有使用VM,只在docker Linux運(yùn)行,我們只需要最后一行。
終端2用作診斷,如下運(yùn)行 scrapy monitor:
$ vagrant ssh
$ cd book/ch11/properties
$ scrapy monitor scrapyd*
使用scrapyd*和空文件夾,空文件夾名字是scrapy monitor,這會擴(kuò)展到scrapy monitor scrapyd1 scrapyd2 scrapyd3。
終端3,是我們啟動抓取的終端。除此之外,它基本是閑置的。開始一個新的抓取,我們操作如下:
$ vagrant ssh
$ cd book/ch11/properties
$ for i in scrapyd*; do scrapyd-deploy $i; done
$ scrapy crawl distr
最后兩行很重要。首先,我們使用一個for循環(huán)和scrapyd-deploy,將爬蟲部署到服務(wù)器上。然后我們用scrapy crawl distr開始抓取。我們隨時(shí)可以運(yùn)行小的抓取,例如,scrapy crawl distr -s CLOSESPIDER_PAGECOUNT=100,來抓取100個索引頁,它會產(chǎn)生大概3000個子頁。
終端4用來連接Spark服務(wù)器,我們用它進(jìn)行實(shí)時(shí)分析:
$ vagrant ssh spark
$ pwd
/root
$ ls
book items
$ spark-submit book/ch11/boostwords.py items
只有最后一行重要,它運(yùn)行了boostwords.py,將本地items目錄傳給了監(jiān)視器。有時(shí),我還使用watch ls -1 items來監(jiān)視item文件。
到底哪個詞對價(jià)格的影響最大呢?這個問題留給讀者。
系統(tǒng)性能
系統(tǒng)的性能極大地依賴于硬件、CPU的數(shù)量、虛擬機(jī)分配內(nèi)存的大小。在真實(shí)情況下,我們可以進(jìn)行水平擴(kuò)展,使抓取提速。
理論最大吞吐量是3臺服務(wù)器4個CPU16并發(fā)數(shù)*4頁/秒=768頁/秒。
實(shí)際中,使用分配了4G內(nèi)存、8CPU的虛擬機(jī)的Macbook Pro,2分40秒內(nèi)下載了50000條URL,即315頁/秒。在一臺亞馬遜EC2 m4.large,它有2個vCPUs、8G內(nèi)存,因?yàn)镃PU頻率低,用時(shí)6分12秒,即134頁/秒。在一臺臺亞馬遜EC2 m4.4xlarge,它有16個vCPUs、64G內(nèi)存,用時(shí)1分44秒,即480頁/秒。在同一臺機(jī)器上,我將scrapyd的數(shù)量提高到6(修改Vagrantfile、scrapy.cfg和settings.py),用時(shí)1分15秒,即667頁/秒。在最后的例子中,網(wǎng)絡(luò)服務(wù)器似乎是瓶頸。
實(shí)際和理論計(jì)算存在差距是合理的。我們的粗略計(jì)算中沒有考慮許多小延遲。盡管我們聲明了每個頁有250ms的延遲,我們在前幾章已經(jīng)看到,實(shí)際延遲要更高,這是因?yàn)槲覀冞€有額外的Twisted和操作系統(tǒng)延遲。還有開發(fā)機(jī)向scrapyds傳遞URL的時(shí)間,F(xiàn)TP向Spark傳遞Items的時(shí)間,還有scrapyd發(fā)現(xiàn)新文件和規(guī)劃任務(wù)的時(shí)間(平均要2.5秒,根據(jù)scrapyd的poll_interval設(shè)置)。還沒計(jì)算開發(fā)機(jī)和scrapyd的啟動時(shí)間。如果不能確定可以提高吞吐量的話,我是不會試圖改進(jìn)這些延遲的。我的下一步是擴(kuò)大抓取的規(guī)模,比如500000個頁面、網(wǎng)絡(luò)服務(wù)器的負(fù)載均衡,在擴(kuò)大的過程中發(fā)現(xiàn)新的挑戰(zhàn)。
要點(diǎn)
本章的要點(diǎn)是,如果要進(jìn)行分布式抓取,一定要使用大小合適的批次。
取決于源網(wǎng)站的響應(yīng)速度,你可能有數(shù)百、數(shù)千、上萬個URL。你希望它們越大越好(在幾分鐘的水平),這樣就可以分?jǐn)倖拥馁M(fèi)用。另一方面,你也不希望它們太大,以免造成機(jī)器故障。在一個有容錯的分布式系統(tǒng)中,你需要重試失敗的批次,而且重試不要浪費(fèi)太多時(shí)間。
總結(jié)
希望你能喜歡這本關(guān)于Scrapy的書?,F(xiàn)在你對Scrapy應(yīng)該已經(jīng)有深入的了解了,并可以解決簡單或復(fù)雜的問題了。你還學(xué)到了Scrapy復(fù)雜的結(jié)構(gòu),以及如何發(fā)揮出它的最大性能。通過抓取,你可以在應(yīng)用中使用龐大的數(shù)據(jù)資源。我們已經(jīng)看到了如何在移動應(yīng)用中使用Scrapy抓取的數(shù)據(jù)并進(jìn)行分析。希望你能用Scrapy做出更多強(qiáng)大的應(yīng)用,為世界做出貢獻(xiàn)。祝你好運(yùn)!
序言
第1章 Scrapy介紹
第2章 理解HTML和XPath
第3章 爬蟲基礎(chǔ)
第4章 從Scrapy到移動應(yīng)用
第5章 快速構(gòu)建爬蟲
第6章 Scrapinghub部署
第7章 配置和管理
第8章 Scrapy編程
第9章 使用Pipeline
第10章 理解Scrapy的性能
第11章(完) Scrapyd分布式抓取和實(shí)時(shí)分析