swoole_process 主要是用來代替 PHP 的 pcntl 擴展。我們知道 pcntl 是用來進行多進程編程的,而 pcntl 只提供了 fork 這樣原始的接口,容易使用錯誤,并且沒有提供進程間通信以及重定向標準輸入輸出的功能。
而 swoole_process 則提供了比 pcntl 更強大的功能,更易用的API,使PHP在多進程編程方面更加輕松。
本文使用 swoole_process 與 EventLoop 完成一個 php 的進程池,并且支持動態(tài)創(chuàng)建新進程。
EventLoop
swoole 有一個 Reactor 線程,這個線程可以說是對 epoll 模型的封裝,可以設(shè)置 read 事件和 write 事件的監(jiān)聽回調(diào)函數(shù)。
下面會用到一個函數(shù):
bool swoole_event_add(mixed $sock, mixed $read_callback, mixed $write_callback = null, int $flags = null);
- 參數(shù)1為一個文件描述符,包括
swoole_client->$sock、swoole_process->$pipe或者其他 fd(socket_create創(chuàng)建的資源 ,stream_socket_client/fsockopen創(chuàng)建的資源) - 參數(shù)2為可讀事件回調(diào)函數(shù)
- 參數(shù)3為可寫事件回調(diào)函數(shù)
多進程編程少不了進程之間的通訊,swoole 的進程之間有兩種通信方式,一種是消息隊列(queue),另一種是管道(pipe)。那么本文使用的是 pipe 的方式。
下面是一個定時向進程池投遞任務(wù)的例子。
代碼:
<?php
class ProcessPool{
private $process;
/**
* Worker 進程數(shù)組
* @var array
*/
private $process_list = [];
/**
* 正在被使用的進程
* @var array
*/
private $process_use = [];
/**
* 最少進程數(shù)量
* @var int
*/
private $min_worker_num = 3;
/**
* 最多進程數(shù)量
* @var int
*/
private $max_worker_num = 6;
/**
* 當前進程數(shù)量
* @var int
*/
private $current_num;
public function __construct()
{
$this->process = new swoole_process(array($this, 'run'), false, 2);
$this->process->start();
swoole_process::wait();
}
public function run()
{
$this->current_num = $this->min_worker_num;
//創(chuàng)建所有的worker進程
for($i = 0; $i < $this->current_num; $i++){
$process = new swoole_process(array($this, 'task_run'), false, 2);
$pid = $process->start();
$this->process_list[$pid] = $process;
$this->process_use[$pid] = 0;
}
foreach($this->process_list as $process){
swoole_event_add($process->pipe, function ($pipe) use ($process){
$data = $process->read();
var_dump($data . '空閑');
//接收子進程處理完成的信息,并且重置為空閑
$this->process_use[$data] = 0;
});
}
//每秒定時向worker管道投遞任務(wù)
swoole_timer_tick(1000 ,function ($timer_id){
static $index = 0;
$index = $index + 1;
$flag = true; //是否新建worker
foreach ($this->process_use as $pid => $used){
if($used == 0){
$flag = false;
//標記為正在使用
$this->process_use[$pid] = 1;
// 在父進程內(nèi)調(diào)用write,子進程可以調(diào)用read接收此數(shù)據(jù)
$this->process_list[$pid]->write($index. "hello");
break;
}
}
if($flag && $this->current_num < $this->max_worker_num){
//沒有閑置worker,新建worker來處理
$process = new swoole_process(array($this, 'task_run'), false, 2);
$pid = $process->start();
$this->process_list[$pid] = $process;
$this->process_use[$pid] = 1;
$this->process_list[$pid]->write($index. "hello");
$this->current_num++;
}
var_dump('第' .$index. '個任務(wù)');
if($index == 10){
foreach($this->process_list as $process){
$process->write("exit");
}
swoole_timer_clear($timer_id);
$this->process->exit();
}
});
}
/**
* 子進程處理
* @param $worker
*/
public function task_run($worker)
{
swoole_event_add($worker->pipe, function($pipe)use($worker){
$data = $worker->read();
var_dump($worker->pid . ':' . $data);
if($data == 'exit'){
$worker->exit();
exit;
}
//模擬耗時任務(wù)
sleep(5);
//告訴主進程處理完成
//在子進程內(nèi)調(diào)用write,父進程可以調(diào)用read接收此數(shù)據(jù)
$worker->write($worker->pid);
});
}
}
new ProcessPool();
首先定義幾個重要的屬性:
- $process_list :Worker 進程數(shù)組
- $process_use:正在被使用的進程
- $min_worker_num :最少進程數(shù)量
- $max_worker_num :最多進程數(shù)量
- $current_num :當前進程數(shù)量
- $process : 主進程
在實例化的時候創(chuàng)建主進程,并且運行 run 方法,在 run 方法里面先創(chuàng)建所有的 worker 進程,并且設(shè)置為空閑狀態(tài)。
接著遍歷所有的 worker 進程,并且加入 EventLoop 中,設(shè)置可讀事件,用于接收子進程的空閑信號。
最后每隔一秒向 worker 進程投遞任務(wù)。動態(tài)擴充進程池則在這里實現(xiàn),如果沒有閑置的進程,而此時又有新的任務(wù),則需要動態(tài)創(chuàng)建一個新的進程并且置為繁忙狀態(tài)。由于只模擬了十次任務(wù),則第十個任務(wù)完成之后在父進程中發(fā)送 exit 使所有子進程退出。
運行效果與圖解:

參考鏈接:
https://wiki.swoole.com/wiki/page/p-process.html
https://opso.coding.me/2018/07/07/swoole-process/