繼上篇我們談?wù)摿薈elery的基本知識后,本篇繼續(xù)講解如何一步步使用Celery構(gòu)建分布式爬蟲。這次我們抓取的對象定為celery官方文檔。
首先,我們新建目錄distributedspider,然后再在其中新建文件workers.py,里面內(nèi)容如下
from celery import Celery
app = Celery('crawl_task', include=['tasks'], broker='redis://223.129.0.190:6379/1', backend='redis://223.129.0.190:6379/2')
# 官方推薦使用json作為消息序列化方式
app.conf.update(
CELERY_TIMEZONE='Asia/Shanghai',
CELERY_ENABLE_UTC=True,
CELERY_ACCEPT_CONTENT=['json'],
CELERY_TASK_SERIALIZER='json',
CELERY_RESULT_SERIALIZER='json',
)
上述代碼主要是做Celery實例的初始化工作,include是在初始化celery app的時候需要引入的內(nèi)容,主要就是注冊為網(wǎng)絡(luò)調(diào)用的函數(shù)所在的文件。然后我們再編寫任務(wù)函數(shù),新建文件tasks.py,內(nèi)容如下
import requests
from bs4 import BeautifulSoup
from workers import app
@app.task
def crawl(url):
print('正在抓取鏈接{}'.format(url))
resp_text = requests.get(url).text
soup = BeautifulSoup(resp_text, 'html.parser')
return soup.find('h1').text
它的作用很簡單,就是抓取指定的url,并且把標簽為h1的元素提取出來
最后,我們新建文件task_dispatcher.py,內(nèi)容如下
from workers import app
url_list = [
'http://docs.celeryproject.org/en/latest/getting-started/introduction.html',
'http://docs.celeryproject.org/en/latest/getting-started/brokers/index.html',
'http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html',
'http://docs.celeryproject.org/en/latest/getting-started/next-steps.html',
'http://docs.celeryproject.org/en/latest/getting-started/resources.html',
'http://docs.celeryproject.org/en/latest/userguide/application.html',
'http://docs.celeryproject.org/en/latest/userguide/tasks.html',
'http://docs.celeryproject.org/en/latest/userguide/canvas.html',
'http://docs.celeryproject.org/en/latest/userguide/workers.html',
'http://docs.celeryproject.org/en/latest/userguide/daemonizing.html',
'http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html'
]
def manage_crawl_task(urls):
for url in urls:
app.send_task('tasks.crawl', args=(url,))
if __name__ == '__main__':
manage_crawl_task(url_list)
這段代碼的作用主要就是給worker發(fā)送任務(wù),任務(wù)是tasks.crawl,參數(shù)是url(元祖的形式)
現(xiàn)在,讓我們在節(jié)點A(hostname為resolvewang的主機)上啟動worker
celery -A workers worker -c 2 -l info
這里 -c指定了線程數(shù)為2, -l表示日志等級是info。我們把代碼拷貝到節(jié)點B(節(jié)點名為wpm的主機),同樣以相同命令啟動worker,便可以看到以下輸出

可以看到左邊節(jié)點(A)先是all alone,表示只有一個節(jié)點;后來再節(jié)點B啟動后,它便和B同步了
sync with celery@wpm
這個時候,我們運行給這兩個worker節(jié)點發(fā)送抓取任務(wù)
python task_dispatcher.py
可以看到如下輸出

可以看到兩個節(jié)點都在執(zhí)行抓取任務(wù),并且它們的任務(wù)不會重復(fù)。我們再在redis里看看結(jié)果

可以看到一共有11條結(jié)果,說明 tasks.crawl中返回的數(shù)據(jù)都在db2(backend)中了,并且以json的形式存儲了起來,除了返回的結(jié)果,還有執(zhí)行是否成功等信息。
到此,我們就實現(xiàn)了一個很基礎(chǔ)的分布式網(wǎng)絡(luò)爬蟲,但是它還不具有很好的擴展性,而且貌似太簡單了...下一篇我將以微博數(shù)據(jù)采集為例來演示如何構(gòu)建一個穩(wěn)健的分布式網(wǎng)絡(luò)爬蟲。
對微博大規(guī)模數(shù)據(jù)采集感興趣的同學(xué)可以關(guān)注一下分布式微博爬蟲,用用也是極好的