使用 swoole_process 實現(xiàn) PHP 進程池

swoole_process 主要是用來代替 PHP 的 pcntl 擴展。我們知道 pcntl 是用來進行多進程編程的,而 pcntl 只提供了 fork 這樣原始的接口,容易使用錯誤,并且沒有提供進程間通信以及重定向標準輸入輸出的功能。

而 swoole_process 則提供了比 pcntl 更強大的功能,更易用的API,使PHP在多進程編程方面更加輕松。

本文使用 swoole_process 與 EventLoop 完成一個 php 的進程池,并且支持動態(tài)創(chuàng)建新進程。

EventLoop

swoole 有一個 Reactor 線程,這個線程可以說是對 epoll 模型的封裝,可以設(shè)置 read 事件和 write 事件的監(jiān)聽回調(diào)函數(shù)。

下面會用到一個函數(shù):

bool swoole_event_add(mixed $sock, mixed $read_callback, mixed $write_callback = null, int $flags = null);
  • 參數(shù)1為一個文件描述符,包括swoole_client->$sock、swoole_process->$pipe或者其他 fd(socket_create 創(chuàng)建的資源 , stream_socket_client/fsockopen創(chuàng)建的資源)
  • 參數(shù)2為可讀事件回調(diào)函數(shù)
  • 參數(shù)3為可寫事件回調(diào)函數(shù)

多進程編程少不了進程之間的通訊,swoole 的進程之間有兩種通信方式,一種是消息隊列(queue),另一種是管道(pipe)。那么本文使用的是 pipe 的方式。

下面是一個定時向進程池投遞任務(wù)的例子。

代碼:

<?php
class ProcessPool{

    private $process;

    /**
     * Worker 進程數(shù)組
     * @var array
     */
    private $process_list = [];

    /**
     * 正在被使用的進程
     * @var array
     */
    private $process_use = [];

    /**
     * 最少進程數(shù)量
     * @var int
     */
    private $min_worker_num = 3;

    /**
     * 最多進程數(shù)量
     * @var int
     */
    private $max_worker_num = 6;

    /**
     * 當前進程數(shù)量
     * @var int
     */
    private $current_num;


    public function __construct()
    {
        $this->process = new swoole_process(array($this, 'run'), false, 2);
        $this->process->start();
        swoole_process::wait();
    }

    public function run()
    {
        $this->current_num = $this->min_worker_num;
        //創(chuàng)建所有的worker進程
        for($i = 0; $i < $this->current_num; $i++){
            $process = new swoole_process(array($this, 'task_run'), false, 2);
            $pid = $process->start();
            $this->process_list[$pid] = $process;
            $this->process_use[$pid] = 0;
        }

        foreach($this->process_list as $process){
            swoole_event_add($process->pipe, function ($pipe) use ($process){
                $data = $process->read();
                var_dump($data . '空閑');
                //接收子進程處理完成的信息,并且重置為空閑
                $this->process_use[$data] = 0;
            });
        }

        //每秒定時向worker管道投遞任務(wù)
        swoole_timer_tick(1000 ,function ($timer_id){
            static $index = 0;
            $index = $index + 1;
            $flag = true; //是否新建worker
            foreach ($this->process_use as $pid => $used){
                if($used == 0){
                    $flag = false;
                    //標記為正在使用
                    $this->process_use[$pid] = 1;
                    // 在父進程內(nèi)調(diào)用write,子進程可以調(diào)用read接收此數(shù)據(jù)
                    $this->process_list[$pid]->write($index. "hello");
                    break;
                }
            }

            if($flag && $this->current_num < $this->max_worker_num){
                //沒有閑置worker,新建worker來處理
                $process = new swoole_process(array($this, 'task_run'), false, 2);
                $pid = $process->start();
                $this->process_list[$pid] = $process;
                $this->process_use[$pid] = 1;
                $this->process_list[$pid]->write($index. "hello");
                $this->current_num++;
            }
            var_dump('第' .$index. '個任務(wù)');
            if($index == 10){
                foreach($this->process_list as $process){
                    $process->write("exit");
                }
                swoole_timer_clear($timer_id);
                $this->process->exit();
            }

        });
    }

    /**
     * 子進程處理
     * @param $worker
     */
    public function task_run($worker)
    {
        swoole_event_add($worker->pipe, function($pipe)use($worker){
            $data = $worker->read();
            var_dump($worker->pid . ':' . $data);
            if($data == 'exit'){
                $worker->exit();
                exit;
            }
            //模擬耗時任務(wù)
            sleep(5);
            //告訴主進程處理完成
            //在子進程內(nèi)調(diào)用write,父進程可以調(diào)用read接收此數(shù)據(jù)
            $worker->write($worker->pid);
        });
    }

}

new ProcessPool();

首先定義幾個重要的屬性:

  • $process_list :Worker 進程數(shù)組
  • $process_use:正在被使用的進程
  • $min_worker_num :最少進程數(shù)量
  • $max_worker_num :最多進程數(shù)量
  • $current_num :當前進程數(shù)量
  • $process : 主進程

在實例化的時候創(chuàng)建主進程,并且運行 run 方法,在 run 方法里面先創(chuàng)建所有的 worker 進程,并且設(shè)置為空閑狀態(tài)。

接著遍歷所有的 worker 進程,并且加入 EventLoop 中,設(shè)置可讀事件,用于接收子進程的空閑信號。

最后每隔一秒向 worker 進程投遞任務(wù)。動態(tài)擴充進程池則在這里實現(xiàn),如果沒有閑置的進程,而此時又有新的任務(wù),則需要動態(tài)創(chuàng)建一個新的進程并且置為繁忙狀態(tài)。由于只模擬了十次任務(wù),則第十個任務(wù)完成之后在父進程中發(fā)送 exit 使所有子進程退出。

運行效果與圖解:
Snipaste_2018-10-27_18-36-11.png
參考鏈接:

https://wiki.swoole.com/wiki/page/p-process.html
https://opso.coding.me/2018/07/07/swoole-process/

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • swoole 安裝用的是centOSphp安裝目錄:/usr/local/php php.ini配置文件路徑:/u...
    a十二_4765閱讀 12,083評論 3 9
  • 前文再續(xù),就書接上一回,隨著與Server、TCP、Protocol的邂逅,Swoole終于迎來了自己的故事,今天...
    蝸牛淋雨閱讀 1,911評論 1 14
  • date: 2018-1-8 20:56:08title: Swoole| Swoole 中 Process 這篇...
    daydaygo閱讀 10,235評論 11 21
  • 12月1日晚:燈紅酒綠的晚宴(綠洲酒店)可能會有令你驚喜的嘉賓。請參加的伙伴接龍 1 2 3 4 5 6 7
    嘉怡寶貝閱讀 166評論 0 0
  • 李雪焦點21期2018年7月5日 堅持讀書打卡第656天 成功就是和時間死磕 有些等待一開始并不被人理解,...
    窗邊的小米豆閱讀 982評論 0 0

友情鏈接更多精彩內(nèi)容