話不多說直接上代碼
/**
* Created by PhpStorm.
* User: lyb
* Date: 2022/2/11
* Time: 13:28
*/
require_once __DIR__ . '/vendor/autoload.php'; //composer install?php-amqplib
class OrderMq
{
? ? public function __construct() {
? ? ? ? $this->serv = new swoole_http_server(
? ? ? ? ? ? "0.0.0.0",
? ? ? ? ? ? "9502"
? ? ? ? );
? ? ? ? $this->serv->set(array(
? ? ? ? ? ? 'worker_num' => 2,? //工作進程數(shù)量
? ? ? ? ? ? 'max_conn' => 1000,
? ? ? ? ? ? 'max_request' => 10000,
? ? ? ? ? // 'debug_mode'=> 1,
? ? ? ? ? 'task_worker_num'=>2,
? ? ? ? ? // 'daemonize' => true, //是否作為守護進程
? ? ? ? ));
? ? ? ? //內(nèi)存表
? ? ? ? $time_table = new swoole_table(10);
? ? ? ? $time_table->column( "time", swoole_table::TYPE_STRING, 20);
? ? ? ? $time_table->create();
? ? ? ? $this->serv->tb = $time_table;
? ? ? ? $this->serv->tb->set('time-1', ['time' =>time()]);
? ? ? ? $this->serv->on('Start', array($this, 'onStart'));
? ? ? ? $this->serv->on('WorkerStart', array($this, 'onWorkerStart'));
? ? ? ? $this->serv->on('connect', array($this, 'onConnect'));
? ? ? ? $this->serv->on('request' , array( $this , 'onRequest'));
? ? ? ? $this->serv->on('Receive', array($this, 'onReceive'));
? ? ? ? $this->serv->on('Close', array($this, 'onClose'));
? ? ? ? $this->serv->on('Task', array($this, 'onTask'));
? ? ? ? $this->serv->on('Finish', array($this, 'onFinish'));
? ? ? ? $this->serv->start();
}
? ? public function onStart( $serv ) {
? ? ? echo "Start\n";
? ? ? ? swoole_timer_after(3000, function () {
? ? ? ? ? ? echo "after 3000ms.\n";
});
}
? ? public function onRequest($request, $response){
? ? ? /// $this->serv->task("");
? ? ? ? $response->end("hello world");
}
? ? public function onConnect($serv, $fd){
? ? ? ? //echo "Connected\n";
? ? }
? ? public function onWorkerStart( $serv , $workerId) {
? ? ? ? swoole_set_process_name("swoole_mq_9502 master {$workerId}");
? ? ? ? echo "Worker#{$workerId} is started\n";
? ? ? ? $tainfo = $serv->tb->get("time-1");
? ? ? ? var_dump($tainfo['time']);
? ? ? ? try {
? ? ? ? ? ? $AMQConfig = array(
? ? ? ? ? ? ? ? 'host' => '*',
? ? ? ? ? ? ? ? 'port' => '5672',
? ? ? ? ? ? ? ? 'login' => '*',
? ? ? ? ? ? ? ? 'password' => '*',
? ? ? ? ? ? ? ? 'vhost'=>'*
? ? ? ? ? ? );
? ? ? ? ? ? // 建立連接
? ? ? ? ? ? $conn =? new \PhpAmqpLib\Connection\AMQPStreamConnection($AMQConfig['host'],$AMQConfig['port'],$AMQConfig['login'],$AMQConfig['password'],$AMQConfig['vhost'],
? ? ? ? ? ? ? ? false,
? ? ? ? ? ? ? ? 'AMQPLAIN',
? ? ? ? ? ? ? ? null,
? ? ? ? ? ? ? ? 'en_US',
? ? ? ? ? ? ? ? 3.0,
? ? ? ? ? ? ? ? 3.0,
? ? ? ? ? ? ? ? null,
? ? ? ? ? ? ? ? false,
? ? ? ? ? ? ? ? 10,//這里一定要配置自動心態(tài)檢測,不加時間長會斷開
? ? ? ? ? ? ? ? 0.0,
? ? ? ? ? ? ? ? null
? ? ? ? ? ? ? ? );
? ? ? ? ? ? // 創(chuàng)建通道
? ? ? ? ? ? $channel = $conn->channel();
? ? ? ? ? ? // 創(chuàng)建交換機
/**
* name:xxx? ? ? ? ? ? 交換機名稱
* type:direct? ? ? ? ? 類型 fanut,direct,topic,headers
* passive:false? ? ? ? 不存在自動創(chuàng)建,如果設置true的話,返回OK,否則失敗
* durable:false? ? ? ? 是否持久化
* auto_delete:false? ? 自動刪除,最后一個
*/
//$channel->exchange_declare($AMQConfig['vhost'],AMQP_EX_TYPE_DIRECT,false,true,false);
// 創(chuàng)建隊列
/**
* name:xxx? ? ? ? ? ? 隊列名稱
* passive:false? ? ? ? 不存在自動創(chuàng)建,如果設置true的話,返回OK,否則失敗
* durable:false? ? ? ? 是否持久化
* exclusive:false? ? ? 是否排他,如果為true的話,只對當前連接有效,連接斷開后自動刪除
* auto_delete:false? ? 自動刪除,最后一個
*/
? ? ? ? ? ? $queueName = "*";
? ? ? ? ? $channel->queue_declare($queueName,false,true,false,false);
? ? ? ? ? ? // 綁定
/**
* $queue? ? ? ? ? 隊列名稱
* $exchange? ? ? ? 交換機名稱
* $routing_key? ? 路由名稱
*/
? ? ? ? ? ? $routeKey = "*";
? ? ? ? ? ? //? ? $channel->queue_bind($queueName,'rrd',$routeKey);
// 消費
/**
* $queue = '',? ? ? ? 被消費隊列名稱
* $consumer_tag = '',? 消費者客戶端標識,用于區(qū)分客戶端
* $no_local = false,? 這個功能屬于amqp的標準,但是rabbitmq未實現(xiàn)
* $no_ack = false,? ? 收到消息后,是否要ack應答才算被消費
* $exclusive = false,? 是否排他,即為這個隊列只能由一個消費者消費,適用于任務不允許并發(fā)處理
* $nowait = false,? ? 不返回直接結果,但是如果排他開啟的話,則必須需要等待結果的,如果二個都開啟會報錯
* $callback = null,? ? 回調(diào)函數(shù)處理邏輯
*/
// 回調(diào)
? ? ? ? ? ? $callback = function ($msg) use ($workerId) {
? ? ? ? ? ? ? ? var_dump($workerId."-----{$msg->body}");
? ? ? ? ? ? ? // echo "tag:".$msg->delivery_info["delivery_tag"]."
";
?$msg->delivery_info["channel"]->basic_ack($msg->delivery_info["delivery_tag"]);
};
? ? ? ? ? ? $channel->basic_qos(null, 1, null);
? ? ? ? ? ? $channel->basic_consume($queueName,"",false,false,false,false,$callback);
? ? ? ? ? ? if($channel->is_consuming())
{
? ? ? ? ? ? ? ? echo 'mq鏈接成功';
}
? ? ? ? ? ? else
? ? ? ? ? ? {
? ? ? ? ? ? ? ? $this->write_log("mq斷開連接");
}
? ? ? ? ? ? // 監(jiān)聽
? ? ? ? ? ? while ($channel->is_consuming()) {
? ? ? ? ? ? ? ? $channel->wait();
}
? ? ? ? } catch (\Throwable $e) {
? ? ? ? ? $this->write_log("錯誤捕獲:".$e->getMessage());
}
? ? }
public function onReceive(swoole_server $serv, $fd, $from_id, $data){
? ? ? ? $serv->task("");
}
? ? public function onClose( $serv, $fd, $from_id ) {
? ? ? ? echo "Client {$fd} close connection\n";
}
? ? function write_log($txt){
? ? ? $date=date("Ymd");
? ? ? ? //日志內(nèi)容
? ? ? ? $content = [
? ? ? ? ? ? 'date:' => date("Ymd H:i:s"),
? ? ? ? ? ? 'txt:' => $txt,
];
? ? //存入文件
? ? ? ? swoole_async_writefile(__DIR__."/{$date}.txt", json_encode($content,JSON_UNESCAPED_UNICODE).PHP_EOL, function($filename){
? ? ? ? }, FILE_APPEND);
}
? ? public function onTask($serv,$task_id,$from_id, $data) {
? ? ? ? //準備在這里把消息發(fā)送到rabbitmq隊列里
? ? }
? ? public function onFinish($serv,$task_id, $data) {
}
? ? //是否為json
? ? function is_json($data = '', $assoc = false) {
? ? ? ? $data = json_decode($data, $assoc);
? ? ? ? if ($data && (is_object($data)) || (is_array($data) && !empty(current($data)))) {
? ? ? ? ? ? return $data;
}
? ? ? ? return false;
}
}
new OrderMq();