tp5使用curl特性進(jìn)行定時(shí)多線程爬蟲(或任務(wù)),使用redis隊(duì)列

利用php(以及開源工具)實(shí)現(xiàn)爬蟲

流程說明

  1. 從數(shù)據(jù)庫或者循環(huán)構(gòu)建爬蟲的url(包括分頁參數(shù))**
  2. 分段取出使用多線程保存數(shù)據(jù)到redis**
  3. 啟用隊(duì)列把數(shù)據(jù)保存到數(shù)據(jù)庫**

開始

  1. 使用tp5.0的框架,安裝爬蟲擴(kuò)展 QueryList 4.0
composer require jaeger/querylist

GitHub地址
2. 安裝多線程curl擴(kuò)展CurlMulti 插件

composer require jaeger/querylist-curl-multi

GitHub地址
3. 如果需要運(yùn)行js腳本,安裝PhantomJS 插件

composer require jaeger/querylist-phantomjs

GitHub地址
4. 安裝tp5的隊(duì)列擴(kuò)展

composer require topthink/think-queue

GitHub地址
5. 安裝taskPHP擴(kuò)展使用php的cli模式(不超時(shí)也可定時(shí)執(zhí)行程序)

composer require taskphp/taskphp dev-master

GitHub地址

配置使用

queue配置(/extra/queue.php)

<?php
return [
    // 'connector'  => 'Database', // 數(shù)據(jù)庫驅(qū)動(dòng)
    // 'expire'     => null, // 任務(wù)的過期時(shí)間,默認(rèn)為60秒; 若要禁用,則設(shè)置為 null
    // 'default'    => 'default', // 默認(rèn)的隊(duì)列名稱
    // 'table'      => 'prefix_jobs', // 存儲(chǔ)消息的表名,不帶前綴
    // 'dsn'        => [],
    // --------------------
    'connector'  => 'Redis', // Redis 驅(qū)動(dòng)
    'expire'     => null, // 任務(wù)的過期時(shí)間,默認(rèn)為60秒; 若要禁用,則設(shè)置為 null
    'default'    => 'default2', // 默認(rèn)的隊(duì)列名稱
    'host'       => '127.0.0.1', // redis 主機(jī)ip
    'port'       => 6379, // redis 端口
    'password'   => '', // redis 密碼
    'select'     => 0, // 使用哪一個(gè) db,默認(rèn)為 db0
    'timeout'    => 0, // redis連接的超時(shí)時(shí)間
    'persistent' => false, // 是否是長連接s
];

taskphp配置

<?php
namespace app\index\controller;

use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\Output;

// 載入taskphp入口文件
require_once dirname(APP_PATH) . '/vendor/taskphp/taskphp/src/taskphp/base.php';

class Taskphp extends Command
{

    protected function get_config()
    {
        return [
            //任務(wù)列表
            'task_list' => [
                //key為任務(wù)名,多任務(wù)下名稱必須唯一
                // 'all' => [
                //     'callback'        => ['app\\index\\controller\\Demo', 'run'], //任務(wù)調(diào)用:類名和方法
                //     //指定任務(wù)進(jìn)程最大內(nèi)存  系統(tǒng)默認(rèn)為512M
                //     'worker_memory'   => '2048M',
                //     //開啟任務(wù)進(jìn)程的多線程模式
                //     'worker_pthreads' => false,
                //     //任務(wù)的進(jìn)程數(shù) 系統(tǒng)默認(rèn)1
                //     'worker_count'    => 1,
                //     //crontad格式 :秒 分 時(shí) 天 月 年 周 // 大概是6分鐘 執(zhí)行一個(gè)周期
                //     'crontab'         => '1 * * * * * *',
                // ],
                // 'bufen' => [
                //     'callback'        => ['app\\index\\controller\\Demo', 'run2'], //任務(wù)調(diào)用:類名和方法
                //     //指定任務(wù)進(jìn)程最大內(nèi)存  系統(tǒng)默認(rèn)為512M
                //     'worker_memory'   => '1024M',
                //     //開啟任務(wù)進(jìn)程的多線程模式
                //     'worker_pthreads' => false,
                //     //任務(wù)的進(jìn)程數(shù) 系統(tǒng)默認(rèn)1
                //     'worker_count'    => 1,
                //     //crontad格式 :秒 分 時(shí) 天 月 年 周 // 大概是6分鐘 執(zhí)行一個(gè)周期
                //     'crontab'         => '1 * * * * * *',
                // ],
                // 'bufenRedis' => [
                //     'callback'        => ['app\\index\\controller\\Demo', 'run3'], //任務(wù)調(diào)用:類名和方法
                //     //指定任務(wù)進(jìn)程最大內(nèi)存  系統(tǒng)默認(rèn)為512M
                //     'worker_memory'   => '1024M',
                //     //開啟任務(wù)進(jìn)程的多線程模式
                //     'worker_pthreads' => false,
                //     //任務(wù)的進(jìn)程數(shù) 系統(tǒng)默認(rèn)1
                //     'worker_count'    => 1,
                //     //crontad格式 :秒 分 時(shí) 天 月 年 周 // 大概是6分鐘 執(zhí)行一個(gè)周期
                //     'crontab'         => '1 * * * * * *',
                // ],
            ],
        ];
    }
    protected function configure()
    {
        $this->addArgument('param', Argument::OPTIONAL);
        // 設(shè)置命令名稱
        $this->setName($_SERVER['argv'][1])->setDescription('this is a taskphp!');
    }

    protected function execute(Input $input, Output $output)
    {
        //系統(tǒng)配置
        $config = $this->get_config();
        //加載配置信息
        \taskphp\Config::load($config);
        //定義啟動(dòng)文件入口標(biāo)記
        define("START_PATH", dirname(APP_PATH));
        //運(yùn)行框架
        \taskphp\App::run();
    }
}

主邏輯

<?php
namespace app\index\controller;

use QL\Ext\CurlMulti;
use QL\Ext\PhantomJs;
use QL\QueryList;
use think\Db;
use think\Queue;

class Test
{
    # 邏輯
    # 使用定時(shí)任務(wù)執(zhí)行所有的 make_url
    # 段查數(shù)據(jù)形成uri段 make_url
    # 把段查數(shù)據(jù)加入到多線程爬蟲里面 start
    # 得到的所有的數(shù)據(jù)加入到Redis隊(duì)列 push
    # 最后加入數(shù)據(jù)庫 [多線程使用startnojs,js可執(zhí)行文件文件會(huì)遇到進(jìn)程阻塞]

    // php think queue:listen
    // php think queue:work --daemon(不加--daemon為執(zhí)行單個(gè)任務(wù))
    // php think queue:work --queue PaChongShuJu --daemon

    /**
     * 段查門店id
     * @return [type] [description]
     */
    public function make_url()
    {
        set_time_limit(0);
        DB::table('aid')->chunk(200, function ($datas) {
            $url = [];
            foreach ($datas as $data) {
                $url[] = 'http://i.meituan.com/poi/' . $data['aid'];
            }
            $this->startnojs($url);
        });
        // 測(cè)試
        // $url = [
        //     'http://i.meituan.com/poi/71225712',
        //     'http://i.meituan.com/poi/116558576',
        // ];
        // $data['aid'] = 1;
        // $this->start($url, $data['aid']);
    }
    public function startnojs($url = [])
    {
        $ql = QueryList::getInstance();
        $ql->use(CurlMulti::class);
        $ql->rules(['html' => ['html', 'html', '']])
            ->curlMulti($url)
            ->success(function (QueryList $ql, CurlMulti $curl, $r) {
                $p   = $r['info']['url'];
                $pid = substr($p, 25);
                $ret2 = $ql->getHtml();
                preg_match("/這家店不錯(cuò)哦,一起去吧!(.*?)。\"/", $ret2, $m);
                if (!empty($m)) {
                    if (!empty($m[1])) {
                        $pieces = explode(",", $m[1]);
                        $data   = [
                            'name'    => $pieces[0],
                            'address' => $pieces[1],
                            'mobile'  => $pieces[2],
                            'p'       => $pid,
                        ];
                        $count = Db::table('info')->where('address', $pieces[1])->count();
                        if ($count > 0) {
                        } else {
                            $this->push($data);
                        }
                    }
                }
                $ql->destruct();
            })->start([
            'maxThread' => 100,
            'maxTry'    => 3,
        ]);
    }
    /**
     * 多線程+cookie爬蟲
     * @return [type] [description]
     */
    public function start($url = [])
    {
        $ql = QueryList::getInstance();
        $ql->use(CurlMulti::class);
        $ql->use(PhantomJs::class, 'D:/phantomjs-2.1.1-windows/bin/phantomjs.exe');
        $ql->rules(['html' => ['html', 'html', '']])
            ->curlMulti($url)
            ->success(function (QueryList $ql, CurlMulti $curl, $r) {
                $p   = $r['info']['url'];
                $pid = substr($p, 25);
                $ret = $ql->browser(function (\JonnyW\PhantomJs\Http\RequestInterface $r) use ($p, $pid) {
                    $r->setMethod('GET');
                    $r->addHeader('Accept', 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8');
                    // $r->addHeader('Referer', 'http://cq.meituan.com/s/%E5%90%83%E9%A5%AD/');
                    $r->addHeader('User-Agent', 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3325.181 YaBrowser/18.4.0.2080 Yowser/2.5 Safari/537.36');
                    $r->addHeader('Cookie', 'IKUT=9156; BAIDUID=AA818089000D26F1B318D034B442113F:FG=1; BIDUPSID=AA818089000D26F1B318D034B442113F; PSTM=1526021220; BDUSS=h5dVM2Zk56OTIydUk2TFNFZzExVDBHOFNZQXhMOH5yVHFyQTRaaWRycHdBQzViQVFBQUFBJCQAAAAAAAAAAAEAAADXQoYktKi452pjAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAHBzBltwcwZbTk; Hm_lvt_16bc67e4f6394c05d03992ea0a0e9123=1526714879,1527237021,1527237078; BDORZ=B490B5EBF6F3CD402E515D22BCDA1598; Hm_lvt_6859ce5aaf00fb00387e6434e4fcc925=1527583339,1527645854,1527650790,1527651051; PSINO=3; locale=zh; H_PS_PSSID=1449_21111; Hm_lpvt_6859ce5aaf00fb00387e6434e4fcc925=1527652553');
                    $r->setUrl($p);
                    $r->setTimeout(10000); // 10 seconds
                    $r->setDelay(3); // 3 seconds
                    return $r;
                });
                $ret2 = $ret->getHtml();
                preg_match("/這家店不錯(cuò)哦,一起去吧!(.*?)。\"/", $ret2, $m);
                if (!empty($m)) {
                    if (!empty($m[1])) {
                        $pieces = explode(",", $m[1]);
                        $data   = [
                            'name'    => $pieces[0],
                            'address' => $pieces[1],
                            'mobile'  => $pieces[2],
                            'p'       => $pid,
                        ];
                        $count = Db::table('info')->where('address', $pieces[1])->count();
                        if ($count > 0) {
                        } else {
                            $this->push($data);
                        }
                    }
                }
                $ql->destruct();
            })->start([
            'maxThread' => 20,
            'maxTry'    => 3,
        ]);
    }
    /**
     * 推送列隊(duì)
     * @param  array  $data [description]
     * @return [type]       [description]
     */
    public function push($data = [])
    {
        $jobData             = json_encode($data);
        $jobHandlerClassName = 'app\index\controller\Job';
        $jobQueueName        = "PaChongShuJu";
        $isPushed            = Queue::push($jobHandlerClassName, $jobData, $jobQueueName);
        // if ($isPushed) {
        //     echo "ok";
        // }else{
        //     var_dump($isPushed);
        // }
    }
}

job文件

<?php
namespace app\index\controller;

use think\Db;
use think\queue\Job as QueueJob;

class Job
{
    public function fire(QueueJob $job, $data)
    {
        $pieces = json_encode($data);
        $this->add_db($pieces);
        if ($job->attempts() > 3) {
            //通過這個(gè)方法可以檢查這個(gè)任務(wù)已經(jīng)重試了幾次了
            $job->delete();
        }
        //如果任務(wù)執(zhí)行成功后 記得刪除任務(wù),不然這個(gè)任務(wù)會(huì)重復(fù)執(zhí)行,直到達(dá)到最大重試次數(shù)后失敗后,執(zhí)行failed方法
        $job->delete();

        // 也可以重新發(fā)布這個(gè)任務(wù)
        // $job->release($delay); //$delay為延遲時(shí)間

    }

    public function failed($data)
    {
        // ...任務(wù)達(dá)到最大重試次數(shù)后,失敗了
    }
    public function add_db($data = [])
    {
        $data  = (array) json_decode(json_decode($data));
        $count = Db::table('info')->where('address', $data['address'])->count();
        if ($count == 0) {
            Db::table('info')->insert($data);
        }
    }
}

taskPHP任務(wù)入口程序

<?php
namespace app\index\controller;

use taskphp\Utils;
use think\Db;
use QL\Ext\CurlMulti;
use QL\Ext\PhantomJs;
use QL\QueryList;
/**
 * 測(cè)試任務(wù)
 */
class Demo
{
    /**
     * demo任務(wù)入口
     */
    public static function run()
    {
         $papachong = new \app\index\controller\Chong();
         $papachong->once();
    }
    public static function run2()
    {
        $papachong = new \app\index\controller\Chong();
         $papachong->once2();
    }
    public static function run3()
    {
        $papachong = new \app\index\controller\Test();
         $papachong->make_url();
    }

}

tp5命令行配置文件(application/command.php)

<?php

return [
    'app\index\controller\Taskphp',
    'think\queue\command\Listen',
    'think\queue\command\Restart',
    'think\queue\command\Subscribe',
    'think\queue\command\Work',
];

執(zhí)行開始

  1. 開啟redis服務(wù)器
  2. 運(yùn)行監(jiān)聽隊(duì)列
php think queue:work --queue PaChongShuJu
  1. 運(yùn)行taskPHP任務(wù)(win直接運(yùn)行目錄下面的bat文件)開始爬蟲
php think start
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容