import datetime
import json
import warnings
from jsonpath import jsonpath
from sqlalchemy.exc import ResourceClosedError
warnings.filterwarnings('ignore')
import random
import time
import requests
import pandas as pd
import records
from urllib import parse
import traceback
from pymysql.converters import escape_string
from sqlalchemy import create_engine
from fake_useragent import UserAgent
from concurrent.futures import ThreadPoolExecutor, as_completed
from collections import deque
# from my_proxies import proxy
"""
1. Read the main_page field from the douyin_account_all table.
2. Build the request URL from the main_page value.
3. Send the request.
4. Parse the data.
5. Store the data.
"""
# Proxy mappings handed to ``requests``; one entry is chosen at random for
# each crawl request.
# requests picks a proxy by looking up the bare URL scheme ("http" / "https")
# in this mapping. Keys written as "http://" / "https://" never match, which
# makes requests silently bypass the proxy, so the bare scheme names are used.
# NOTE(review): the https entry points at an https:// proxy URL (TLS to the
# proxy itself) — confirm the proxy really speaks TLS; otherwise use http://.
proxy = [
    {
        'http': 'http://192.168.241.62:9976',
        'https': 'https://192.168.241.62:9976',
    },
]
class CrawlAima:
    """Fetch one page of posts per account and upsert them into MySQL.

    Typical use: create an instance, call ``get_main_page()`` to load the
    accounts, then call ``crawl_handler(sec_uid)`` for every entry of
    ``main_page_list``. Failed accounts are pushed onto ``dq`` and their
    failure count is kept in ``counter`` so the caller can retry them.
    """

    def __init__(self):
        """Open the database, prepare crawl state and cache known post times."""
        self.ua = UserAgent()
        # NOTE(review): credentials and host are hard-coded in the URL below;
        # consider loading them from configuration or the environment.
        self.db = records.Database(
            f"mysql+pymysql://user:{parse.quote_plus('abc')}@0.0.0.0:3306/db?charset=utf8mb4")
        # Kept for compatibility; nothing in this class uses the engine.
        self.conn = create_engine(self.db.db_url)
        self.dq = deque()  # sec_uids waiting for a retry
        self.counter = {}  # sec_uid -> number of failed attempts
        self.url = 'https://www.iesdouyin.com/post/'
        self.get_time_by_vid_list = []
        # Account look-ups are filled by get_main_page(); they start empty so
        # the other methods never hit a missing attribute.
        self.main_page_list = []
        self.main_page_dict = {}
        self.userid_list = []
        self.userid_dict = {}
        self.uid_oid_map = {}
        self.video_sql = self.get_video_sql()
        self.vid_dict = self.get_all_video()

    def get_all_video(self):
        """Return ``{video_id: post_time string}`` for the stored videos.

        Rows whose ``post_time`` is NULL are left out; otherwise the string
        ``'None'`` would be cached and later returned as a real timestamp.
        """
        rows = self.db.query("select video_id,post_time from douyin_video_extend_main").as_dict()
        return {
            row['video_id']: str(row['post_time'])
            for row in rows
            if row['post_time'] is not None
        }

    def get_video_sql(self):
        """Return the parameterised upsert statement for the video table."""
        video_sql = """
INSERT INTO douyin_video_extend_main (
video_id,
userid,
comment_num,
like_num,
collect_num,
share_num,
url,
content,
cover_pic,
post_time
)
VALUES
(
:video_id,
:userid,
:comment_num,
:like_num,
:collect_num,
:share_num,
:url,
:content,
:cover_pic,
:post_time
)
ON DUPLICATE KEY UPDATE
video_id = :video_id,
userid = :userid,
comment_num = :comment_num,
like_num = :like_num,
collect_num = :collect_num,
share_num = :share_num,
url = :url,
content = :content,
cover_pic = :cover_pic
"""
        return video_sql

    def get_main_page(self):
        """Load the accounts and rebuild every account look-up.

        Sets ``main_page_list`` (last path segment of each ``main_page``),
        ``main_page_dict`` (segment -> 1-based position), ``userid_list``,
        ``userid_dict`` (``str(userid)`` -> 1) and ``uid_oid_map``
        (``str(userid)`` -> ``open_id``). All of them are reset to empty
        containers when no account has a ``main_page``.
        """
        main_page = self.db.query("select userid, main_page,open_id from douyin_account_all;")
        df = main_page.export("df")
        df = df.dropna(subset=['main_page'])
        self.main_page_list = []
        self.main_page_dict = {}
        self.userid_list = []
        self.userid_dict = {}
        self.uid_oid_map = {}
        if df.empty:
            return
        # Keep only the last path segment; a trailing slash is dropped first
        # so it cannot produce an empty identifier.
        df.main_page = df.main_page.map(lambda x: x.rstrip('/').rsplit('/', 1)[-1])
        self.main_page_list = df.main_page.to_list()
        self.main_page_dict = {
            page: position
            for position, page in enumerate(self.main_page_list, start=1)
        }
        self.userid_list = df.userid.to_list()
        self.userid_dict = {str(userid): 1 for userid in self.userid_list}
        df['userid'] = df['userid'].astype('str')
        self.uid_oid_map = dict(zip(df['userid'].to_list(), df['open_id'].to_list()))

    def counter_sec_uid(self, sec_uid):
        """Increase the failure count recorded for ``sec_uid`` by one."""
        self.counter[sec_uid] = self.counter.get(sec_uid, 0) + 1

    def _requeue(self, sec_uid):
        """Record one failure for ``sec_uid`` and queue it for a retry."""
        self.counter_sec_uid(sec_uid)
        self.dq.appendleft(sec_uid)

    def _remaining(self, sec_uid):
        """Return how many accounts come after ``sec_uid`` (progress logs only)."""
        return len(self.main_page_dict) - self.main_page_dict.get(sec_uid, 0)

    def _handle_payload(self, sec_uid, data):
        """Store the posts found in a decoded response, or queue a retry."""
        if isinstance(data, dict) and isinstance(data.get("aweme_list"), list):
            print(f"還剩:{self._remaining(sec_uid)} {sec_uid} success")
            aweme_list = data["aweme_list"]
            if aweme_list:
                self.data_process(aweme_list)
            else:
                # An empty list is treated as a failed attempt.
                self._requeue(sec_uid)
            return
        # The message is compared with the text the service actually sends
        # ("no access permission"); this case is reported, not retried.
        if isinstance(data, dict) and data.get('code') == 100 and data.get('msg') == '没有访问权限':
            print("報(bào)錯(cuò):", data)
            return
        self._requeue(sec_uid)

    def crawl_handler(self, sec_uid, max_cursor=0):
        """Request one page of posts for ``sec_uid`` and store the result.

        Args:
            sec_uid: account identifier sent as the ``sec_uid`` parameter.
            max_cursor: paging cursor sent with the request.

        No exception escapes: every failure (non-200 status, empty body,
        unexpected payload, exception) increases ``counter[sec_uid]`` and
        pushes ``sec_uid`` onto ``dq`` for a later retry.
        """
        params = {
            "sec_uid": sec_uid,
            "max_cursor": max_cursor,
            "count": 21,
            "key": "188"
        }
        headers = {
            "user-agent": self.ua.random,
            "Connection": "keep-alive",
        }
        try:
            response = requests.get(self.url,
                                    headers=headers,
                                    params=params,
                                    timeout=(100, 100),
                                    verify=False,
                                    proxies=random.choice(proxy))
            if response.status_code != 200:
                self._requeue(sec_uid)
            elif not response.text:
                self._requeue(sec_uid)
                print(sec_uid,'沒(méi)有獲取到數(shù)據(jù)')
            else:
                self._handle_payload(sec_uid, response.json())
        except Exception as e:
            # Boundary of one crawl task: log, queue a retry, keep going.
            self._requeue(sec_uid)
            print(f"還剩:{self._remaining(sec_uid)} {sec_uid} fail")
            if 'timed out' not in str(e):
                # Timeouts are expected noise; anything else gets a traceback.
                traceback.print_exc()
                print(sec_uid, '報(bào)錯(cuò):',e,'\n')
        # Short randomised pause after every attempt to pace the requests.
        time.sleep(random.choice([1, 0.8]))

    def get_header(self, vids):
        """Return the request headers used for the item-info lookup of ``vids``."""
        headers = {
            'authority': 'www.douyin.com',
            'accept': 'application/json',
            'accept-language': 'zh-CN,zh;q=0.9',
            'cache-control': 'max-age=0',
            'sec-ch-ua': '".Not/A)Brand";v="99", "Google Chrome";v="103", "Chromium";v="103"',
            'path': f'web/api/v2/aweme/iteminfo/?item_ids={vids}',
            'sec-ch-ua-mobile': '?0',
            'sec-ch-ua-platform': '"Windows"',
            'sec-fetch-dest': 'document',
            'sec-fetch-mode': 'navigate',
            'sec-fetch-site': 'none',
            'sec-fetch-user': '?1',
            'upgrade-insecure-requests': '1',
            'user-agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36',
        }
        return headers

    @staticmethod
    def _format_timestamp(seconds):
        """Format epoch seconds as a local ``YYYY-mm-dd HH:MM:SS`` string."""
        return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(seconds))

    def convert_time(self, t, vid):
        """Return the post time of video ``vid`` as ``YYYY-mm-dd HH:MM:SS``.

        Lookup order:
        1. the value cached in ``vid_dict``;
        2. the digits after the last ``_`` of ``t``, read as epoch seconds;
        3. the ``create_time`` returned by the item-info endpoint.

        Args:
            t: cover URI whose last ``_``-separated part may be a timestamp;
                may be ``None``.
            vid: video id.

        Raises:
            ValueError: the item-info endpoint returned no usable item.
        """
        post_time = self.vid_dict.get(vid)
        if post_time:
            return post_time
        suffix = t.rsplit('_', 1)[-1] if t else ''
        # isdecimal() only accepts characters that int() can parse.
        if suffix.isdecimal():
            return self._format_timestamp(int(suffix))
        get_post_time_url = "https://www.douyin.com/web/api/v2/aweme/iteminfo/?item_ids={}"
        # Same timeout as the main crawl request, so a stalled connection
        # cannot block a worker forever.
        res = requests.get(get_post_time_url.format(vid),
                           headers=self.get_header(vid),
                           timeout=(100, 100),
                           verify=False)
        if not res.text:
            raise ValueError(f"empty item-info response for video {vid}")
        item_list = res.json().get('item_list') or []
        if not item_list:
            raise ValueError(f"item-info response has no items for video {vid}")
        post_time = self._format_timestamp(int(item_list[0]["create_time"]))
        print(f"post_time:{post_time} vid:{vid}")
        return post_time

    def _build_record(self, item):
        """Turn one post dict from the response into the row dict to store."""
        stats = item.get('statistics') or {}
        video = item.get('video') or {}
        vid = stats.get('aweme_id', 0)
        uid = (item.get('author') or {}).get('uid')
        # Missing or empty url_list falls back to an empty cover instead of
        # raising on [0].
        cover_urls = (video.get('cover') or {}).get('url_list') or ['']
        open_id = self.uid_oid_map.get(str(uid))
        return {
            "userid": uid,
            "cover_pic": cover_urls[0],
            # Bound as a query parameter, so it must not be escaped by hand:
            # manual escaping would store literal backslashes.
            "content": item.get('desc') or '',
            "video_id": vid,
            "comment_num": stats.get('comment_count', 0),
            "like_num": stats.get('digg_count', 0),
            "share_num": stats.get('share_count', 0),
            "play_num": stats.get('play_count', 0),
            "collect_num": stats.get('collect_count', 0),
            "post_time": self.convert_time((video.get('dynamic_cover') or {}).get('uri'), vid),
            "url": "https://www.douyin.com/video/" + str(vid),
            # Quoted-literal form kept as before; the upsert statement does
            # not bind this value.
            "open_id": f"'{open_id}'" if open_id else 'null',
        }

    def data_process(self, aweme_list):
        """Upsert every post of ``aweme_list`` whose author is a known account.

        Posts from authors missing in ``userid_dict`` are reported and skipped.
        Exceptions raised while resolving a post time or writing to the
        database propagate to the caller.
        """
        save_list = []
        for item in aweme_list:
            uid = (item.get('author') or {}).get('uid')
            if not self.userid_dict.get(str(uid)):
                print("用戶uid:", uid, '不存在--------------')
                continue
            save_list.append(self._build_record(item))
        for record in save_list:
            self.db.query(self.video_sql, **record)
def loop(max_workers=5, max_attempts=2):
    """Run one crawl pass over every account and report the failures.

    Every ``main_page_list`` entry is crawled once on a thread pool. The
    entries that ``crawl_handler`` pushed onto the retry queue are then
    retried one at a time until they succeed or give up.

    Args:
        max_workers: size of the thread pool used for the first pass.
        max_attempts: an account is abandoned once this many failed attempts
            are recorded for it. The default keeps the previous cutoff
            (``counter > 1``); the log lines used to claim "3" regardless of
            the real cutoff and now print this value.
    """
    start_time = time.time()
    print("開(kāi)始時(shí)間:", time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time)))
    res = CrawlAima()
    res.get_main_page()
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(res.crawl_handler, page) for page in res.main_page_list]
        for future in as_completed(futures):
            # result() re-raises anything that escaped the task.
            future.result()
    crawl_fail_list = []
    while res.dq:
        page = res.dq.pop()
        attempts = res.counter.get(page, 0)
        print(f"剩余總數(shù):{len(res.dq)} 當(dāng)前page:{page}, 第 {attempts} 抓取")
        if attempts >= max_attempts:
            print(page, f"請(qǐng)求次數(shù)超過(guò){max_attempts}次 放棄抓取", attempts)
            crawl_fail_list.append(page)
            continue
        # A failing retry puts the page back on the queue with a higher count.
        res.crawl_handler(page)
    print(f'------------{max_attempts}次 抓取失敗----------------')
    print('抓取失敗:',json.dumps(crawl_fail_list))
    print('----------------------------')
    end_time = time.time()
    print('finish', int(end_time - start_time) / 60, ' min',
          time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
if __name__ == '__main__':
    # Script entry point: run a single crawl pass.
    # The commented-out lines sketch an endless mode that would repeat the
    # pass every 10 minutes.
    # while 1:
    loop()
    # time.sleep(60*10)
#
# 线程池抓取
# 最后编辑于:
# ©著作权归作者所有,转载或内容合作请联系作者
# 【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
# 平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。
# 【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
# 平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。
# 相关阅读更多精彩内容
# - 用一段代码测试线程、进程、协程的抓取速度: 我的15款MacBook Pro跑出来的结果,办公室的网一般:
# - 说明 线程池是多线程的处理机制,线程池一般用于需要大量线程完成任务,并且完成时间较短时使用,大量用于并发框架和异步...
# - 这几天在写js脚本,突然想写一个抓取小说的脚本,于是磕磕碰碰,慢慢写了一个比较完善的脚本,同时对于自身所学进一步巩...