swoole RabbitMQ長鏈接,預防假死或斷開鏈接

話不多說直接上代碼

/**

* Created by PhpStorm.

* User: lyb

* Date: 2022/2/11

* Time: 13:28

*/

require_once __DIR__ . '/vendor/autoload.php'; //composer install?php-amqplib

class OrderMq

{

? ? public function __construct() {

? ? ? ? $this->serv = new swoole_http_server(

? ? ? ? ? ? "0.0.0.0",

? ? ? ? ? ? "9502"

? ? ? ? );

? ? ? ? $this->serv->set(array(

? ? ? ? ? ? 'worker_num' => 2,? //工作進程數(shù)量

? ? ? ? ? ? 'max_conn' => 1000,

? ? ? ? ? ? 'max_request' => 10000,

? ? ? ? ? // 'debug_mode'=> 1,

? ? ? ? ? 'task_worker_num'=>2,

? ? ? ? ? // 'daemonize' => true, //是否作為守護進程

? ? ? ? ));

? ? ? ? //內(nèi)存表

? ? ? ? $time_table = new swoole_table(10);

? ? ? ? $time_table->column( "time", swoole_table::TYPE_STRING, 20);

? ? ? ? $time_table->create();

? ? ? ? $this->serv->tb = $time_table;

? ? ? ? $this->serv->tb->set('time-1', ['time' =>time()]);

? ? ? ? $this->serv->on('Start', array($this, 'onStart'));

? ? ? ? $this->serv->on('WorkerStart', array($this, 'onWorkerStart'));

? ? ? ? $this->serv->on('connect', array($this, 'onConnect'));

? ? ? ? $this->serv->on('request' , array( $this , 'onRequest'));

? ? ? ? $this->serv->on('Receive', array($this, 'onReceive'));

? ? ? ? $this->serv->on('Close', array($this, 'onClose'));

? ? ? ? $this->serv->on('Task', array($this, 'onTask'));

? ? ? ? $this->serv->on('Finish', array($this, 'onFinish'));

? ? ? ? $this->serv->start();

}

? ? public function onStart( $serv ) {

? ? ? echo "Start\n";

? ? ? ? swoole_timer_after(3000, function () {

? ? ? ? ? ? echo "after 3000ms.\n";

});

}

? ? public function onRequest($request, $response){

? ? ? /// $this->serv->task("");

? ? ? ? $response->end("hello world");

}

? ? public function onConnect($serv, $fd){

? ? ? ? //echo "Connected\n";

? ? }

? ? public function onWorkerStart( $serv , $workerId) {

? ? ? ? swoole_set_process_name("swoole_mq_9502 master {$workerId}");

? ? ? ? echo "Worker#{$workerId} is started\n";

? ? ? ? $tainfo = $serv->tb->get("time-1");

? ? ? ? var_dump($tainfo['time']);

? ? ? ? try {

? ? ? ? ? ? $AMQConfig = array(

? ? ? ? ? ? ? ? 'host' => '*',

? ? ? ? ? ? ? ? 'port' => '5672',

? ? ? ? ? ? ? ? 'login' => '*',

? ? ? ? ? ? ? ? 'password' => '*',

? ? ? ? ? ? ? ? 'vhost'=>'*

? ? ? ? ? ? );

? ? ? ? ? ? // 建立連接

? ? ? ? ? ? $conn =? new \PhpAmqpLib\Connection\AMQPStreamConnection($AMQConfig['host'],$AMQConfig['port'],$AMQConfig['login'],$AMQConfig['password'],$AMQConfig['vhost'],

? ? ? ? ? ? ? ? false,

? ? ? ? ? ? ? ? 'AMQPLAIN',

? ? ? ? ? ? ? ? null,

? ? ? ? ? ? ? ? 'en_US',

? ? ? ? ? ? ? ? 3.0,

? ? ? ? ? ? ? ? 3.0,

? ? ? ? ? ? ? ? null,

? ? ? ? ? ? ? ? false,

? ? ? ? ? ? ? ? 10,//這里一定要配置自動心態(tài)檢測,不加時間長會斷開

? ? ? ? ? ? ? ? 0.0,

? ? ? ? ? ? ? ? null

? ? ? ? ? ? ? ? );

? ? ? ? ? ? // 創(chuàng)建通道

? ? ? ? ? ? $channel = $conn->channel();

? ? ? ? ? ? // 創(chuàng)建交換機

/**

* name:xxx? ? ? ? ? ? 交換機名稱

* type:direct? ? ? ? ? 類型 fanut,direct,topic,headers

* passive:false? ? ? ? 不存在自動創(chuàng)建,如果設置true的話,返回OK,否則失敗

* durable:false? ? ? ? 是否持久化

* auto_delete:false? ? 自動刪除,最后一個

*/

//$channel->exchange_declare($AMQConfig['vhost'],AMQP_EX_TYPE_DIRECT,false,true,false);

// 創(chuàng)建隊列

/**

* name:xxx? ? ? ? ? ? 隊列名稱

* passive:false? ? ? ? 不存在自動創(chuàng)建,如果設置true的話,返回OK,否則失敗

* durable:false? ? ? ? 是否持久化

* exclusive:false? ? ? 是否排他,如果為true的話,只對當前連接有效,連接斷開后自動刪除

* auto_delete:false? ? 自動刪除,最后一個

*/

? ? ? ? ? ? $queueName = "*";

? ? ? ? ? $channel->queue_declare($queueName,false,true,false,false);

? ? ? ? ? ? // 綁定

/**

* $queue? ? ? ? ? 隊列名稱

* $exchange? ? ? ? 交換機名稱

* $routing_key? ? 路由名稱

*/

? ? ? ? ? ? $routeKey = "*";

? ? ? ? ? ? //? ? $channel->queue_bind($queueName,'rrd',$routeKey);

// 消費

/**

* $queue = '',? ? ? ? 被消費隊列名稱

* $consumer_tag = '',? 消費者客戶端標識,用于區(qū)分客戶端

* $no_local = false,? 這個功能屬于amqp的標準,但是rabbitmq未實現(xiàn)

* $no_ack = false,? ? 收到消息后,是否要ack應答才算被消費

* $exclusive = false,? 是否排他,即為這個隊列只能由一個消費者消費,適用于任務不允許并發(fā)處理

* $nowait = false,? ? 不返回直接結果,但是如果排他開啟的話,則必須需要等待結果的,如果二個都開啟會報錯

* $callback = null,? ? 回調(diào)函數(shù)處理邏輯

*/

// 回調(diào)

? ? ? ? ? ? $callback = function ($msg) use ($workerId) {

? ? ? ? ? ? ? ? var_dump($workerId."-----{$msg->body}");

? ? ? ? ? ? ? // echo "tag:".$msg->delivery_info["delivery_tag"]."
";

?$msg->delivery_info["channel"]->basic_ack($msg->delivery_info["delivery_tag"]);

};

? ? ? ? ? ? $channel->basic_qos(null, 1, null);

? ? ? ? ? ? $channel->basic_consume($queueName,"",false,false,false,false,$callback);

? ? ? ? ? ? if($channel->is_consuming())

{

? ? ? ? ? ? ? ? echo 'mq鏈接成功';

}

? ? ? ? ? ? else

? ? ? ? ? ? {

? ? ? ? ? ? ? ? $this->write_log("mq斷開連接");

}

? ? ? ? ? ? // 監(jiān)聽

? ? ? ? ? ? while ($channel->is_consuming()) {

? ? ? ? ? ? ? ? $channel->wait();

}

? ? ? ? } catch (\Throwable $e) {

? ? ? ? ? $this->write_log("錯誤捕獲:".$e->getMessage());

}

? ? }


public function onReceive(swoole_server $serv, $fd, $from_id, $data){

? ? ? ? $serv->task("");

}

? ? public function onClose( $serv, $fd, $from_id ) {

? ? ? ? echo "Client {$fd} close connection\n";

}

? ? function write_log($txt){

? ? ? $date=date("Ymd");

? ? ? ? //日志內(nèi)容

? ? ? ? $content = [

? ? ? ? ? ? 'date:' => date("Ymd H:i:s"),

? ? ? ? ? ? 'txt:' => $txt,

];

? ? //存入文件

? ? ? ? swoole_async_writefile(__DIR__."/{$date}.txt", json_encode($content,JSON_UNESCAPED_UNICODE).PHP_EOL, function($filename){

? ? ? ? }, FILE_APPEND);

}

? ? public function onTask($serv,$task_id,$from_id, $data) {

? ? ? ? //準備在這里把消息發(fā)送到rabbitmq隊列里

? ? }

? ? public function onFinish($serv,$task_id, $data) {

}

? ? //是否為json

? ? function is_json($data = '', $assoc = false) {

? ? ? ? $data = json_decode($data, $assoc);

? ? ? ? if ($data && (is_object($data)) || (is_array($data) && !empty(current($data)))) {

? ? ? ? ? ? return $data;

}

? ? ? ? return false;

}

}

new OrderMq();

最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容