大家好,我是劍南。
本篇文章,為大家?guī)淼氖莙ueue模塊的詳解!
初識queue模塊
queue模塊實現(xiàn)了多生產(chǎn)者、多消費者隊列。這特別適用于消息必須安全地在多線程交換的線程編程。模塊中的Queue類實現(xiàn)了所需要的鎖定語義。
該模塊實現(xiàn)了三種類型的隊列,它們的區(qū)別是任務(wù)取回的順序。在FIFO隊列中,先添加任務(wù)的先取回。在LIFO隊列中,最后添加的任務(wù)先取回(該操作類似于堆棧)。在優(yōu)先級隊列中,條目將保持排序(使用heapq模塊)并且最小值的任務(wù)第一個返回。
創(chuàng)建“隊列”對象
import queue
q = queue.Queue(maxsize=5)
maxsize是一個整數(shù),用于設(shè)置可以放入隊列中的任務(wù)數(shù)的上限,當(dāng)達(dá)到這個大小的時候,插入操作將阻塞至隊列中的任務(wù)被消除掉。如果maxsize小于等于0,任務(wù)數(shù)量為無限大。
隊列添加數(shù)據(jù)
q.put(1)
q.put(2)
q.put(3)
q.put(4)
q.put(5)
print(q.full())
運行結(jié)果為:True。
Queue.full():表示當(dāng)隊列任務(wù)已滿時,返回的結(jié)果為True。如果full()返回True不保證后續(xù)調(diào)用get()不被阻塞,同樣的道理,如果full()返回False也不保證后續(xù)調(diào)用put()不被阻塞。
Queue.put(item, block=True, timeout=None):將Item放入隊列,如果可選參數(shù)block是True并且timeout是None,則在必要時阻塞至有空閑插槽可用,如果timeout是正數(shù),將最多阻塞timeout秒,如果這段時間沒有可用的空閑插槽,則引發(fā)full異常。反之block為False,如果插槽空閑,則立即使用,把item放入隊列,否則引發(fā)Full異常。
判斷隊列是否為空
Queue.empty():如果隊列為空,則返回True,否則返回False。如果empty()返回True,不保證后續(xù)調(diào)用put()會被阻塞。類似的,如果empty()返回False,也不保證后續(xù)調(diào)用get()會被阻塞。
獲取隊列的大小
Queue.qsize():返回隊列的大小。注意qsize>0不保證后續(xù)的get()有可能被阻塞,qsize<maxsize也不保證put()有可能被阻塞。
獲取隊列中數(shù)據(jù)
Queue.get(block=True, timeout=None):從對列中移除并返回一個數(shù)據(jù)。當(dāng)隊列為空值,將一直等待。
其他的Queue對象
Queue.task_done():表示前面的排隊任務(wù)已經(jīng)完成,被隊列的消費者線程使用。每個get()被用于獲取一個任務(wù),后續(xù)調(diào)用task_done()告訴隊列,該任務(wù)的處理已經(jīng)完成。如果join()當(dāng)前正在阻塞,在所有條目都被處理后,將解除阻塞(意味著每個put()進(jìn)隊列的條目task_done()都被收到)。
Queue.join():阻塞至對列的所有數(shù)據(jù)都被接收和處理完畢。當(dāng)數(shù)據(jù)被添加到隊列時,未完成的任務(wù)的計數(shù)就會增加。每當(dāng)消費者線程調(diào)用task_done()表示這個條目已經(jīng)被收回,未完成的計數(shù)就會減少,當(dāng)完成計數(shù)降到0的時候,阻塞就會解除。
簡單示例
下面的例子要展示的是,我們應(yīng)該如何使用代碼將等待的任務(wù)完成。
具體代碼,如下所示:
import threading, queue
q = queue.Queue()
def worker():
while True:
item = q.get()
print(f'Working on {item}')
print(f'Finished {item}')
q.task_done()
# turn-on the worker thread
threading.Thread(target=worker, daemon=True).start()
# send thirty task requests to the worker
for item in range(30):
q.put(item)
print('All task requests sent\n', end='')
# block until all tasks are done
q.join()
print('All work completed')
實戰(zhàn)-豆瓣短評
對于本次實戰(zhàn),我采用的網(wǎng)站是豆瓣電影。小伙伴們可以自己去找一部電影,爬取里面的短評。
這次,我爬取的電影是《我不是藥神》的短評,采用的便是隊列的技術(shù)。
爬取思路

在上圖中,框出來的數(shù)據(jù),就是我要獲取的數(shù)據(jù),并下得到的數(shù)據(jù)保存到csv文件中。
之所以選擇上面的圖片,其實是有原因的,不知道你發(fā)現(xiàn)沒有,在上面的圖片中第一條評論是沒有給評價的,因此,當(dāng)我們按照相同的規(guī)則去獲取數(shù)據(jù)時,便容易出現(xiàn)異常。
其次,短評的數(shù)據(jù)量一共有52萬條,每頁20條,并且只能獲取到前25頁的數(shù)據(jù)。再加上,如果沒有給出評價的用戶,我直接過濾,因此,最后獲取下來的數(shù)據(jù)應(yīng)該是不足500條的。
再這里,要做的事情就是要完成翻頁的操作。
在本次編碼中,我的思路是采用兩個線程與兩個隊列來完成。一個線程用于獲取數(shù)據(jù),一個線程用于保存數(shù)據(jù);其中的一個隊列用于保存25頁的URL地址,另一個隊列用于保存獲取的數(shù)據(jù)。
獲取數(shù)據(jù)
主線程
首先在主線程中,創(chuàng)建兩個隊列,并將URL添加進(jìn)保存URL的隊列中。
具體代碼,如下所示:
def main():
p_queue = Queue() # 保存URL
d_queue = Queue() # 保存數(shù)據(jù)
for page in range(25):
url = f'https://movie.douban.com/subject/26752088/comments?start={page*20}&limit=20&status=P&sort=new_score'
p_queue.put(url)
獲取數(shù)據(jù)
先說一下前提,這里我才用的解析庫是lxml,因此,小伙伴們需要自行熟悉xpath語法。
這里創(chuàng)建一個獲取數(shù)據(jù)的類,這個類繼承thread,方便接下來開啟線程。
具體代碼,如下所示:
class GetData(threading.Thread):
def __init__(self, page_queue, data_queue):
super(GetData, self).__init__()
self.page_queue = page_queue
self.data_queue = data_queue
self.headers = {
'User-Agent': 你的user-agent,
'Cookie': '你的cookie'
}
def run(self):
while True:
if self.data_queue.empty() and self.page_queue.empty():
break
url = self.page_queue.get()
self.parse_page(url)
def parse_page(self, url):
html = etree.HTML(requests.get(url, headers=self.headers).content.decode('utf-8'))
comment_items = html.xpath('//div[@class="comment-item "]')
for comment_item in comment_items:
try:
user = comment_item.xpath('.//span[2]/a/text()')[0]
comment_time = comment_item.xpath('.//span[2]//span[3]/@title')[0]
star = comment_item.xpath('.//span[2]//span[2]/@title')[0]
content = comment_item.xpath('.//span[@class="short"]/text()')[0]
self.data_queue.put((user, comment_time, star, content))
except:
continue
保存數(shù)據(jù)
同樣的,這里創(chuàng)建一個保存數(shù)據(jù)的類,這個類也是繼承thread,也是方便開啟線程。
具體代碼,如下所示:
class SaveData(threading.Thread):
def __init__(self, page_queue, data_queue):
super(SaveData, self).__init__()
self.data_queue = data_queue
self.page_queue =page_queue
def run(self):
with open('data.csv', 'w', newline='', encoding='utf-8-sig') as csvfile:
writer = csv.writer(csvfile)
writer.writerow(['user', 'comment_time', 'star', 'content'])
while True:
if self.data_queue.empty() and self.page_queue.empty():
break
user, comment_time, star, content = self.data_queue.get()
print(self.data_queue.get())
with open('data.csv', 'a', newline='', encoding='utf-8-sig') as csvfile:
writer = csv.writer(csvfile)
writer.writerow([user, comment_time, star, content])
開啟多線程
多線程實在主線程中開啟,具體代碼,如下所示:
def main():
p_queue = Queue()
d_queue = Queue()
for page in range(25):
url = f'https://movie.douban.com/subject/26752088/comments?start={page*20}&limit=20&status=P&sort=new_score'
p_queue.put(url)
for x in range(5):
t1 = GetData(p_queue, d_queue)
# t1.daemon = True
t1.start()
t2 = SaveData(p_queue, d_queue)
# t2.daemon = True
t2.start()
數(shù)據(jù)展示

不到4秒鐘,便將短評數(shù)據(jù)都抓取下來了,多線程的效率是不是要比單線程要高很多呀!
最后
在本次的分享中,大家要熟悉與了解queue的使用方法,在后期分享中經(jīng)常要用到,希望小伙伴們能夠掌握。
我是劍南,如果文章給到了你幫助,請你點個【贊】與【再看】。
文章的每一個字都是我用心敲出來的,點個【再看】,讓我知道,你也是陪著我一起努力的人。