Nodejs cluster 模塊

cluster 和 child_process 模塊子進程的區(qū)別

child_process 執(zhí)行 shell 命令、利用多進程執(zhí)行代碼
cluster 通過多進程 master、worker 實現多個 HTTP 應用服務器架構

總結寫前面

cluster 模塊是 node 利用多進程處理網絡連接的應用架構
cluster 通過進程 IPC 通道共享主進程的 server handle 句柄創(chuàng)建 socket 文件描述符 實現子進程共同監(jiān)聽同一端口
cluster 在 http 網絡請求中采用 RoundRobin 輪詢的負載均衡方式對 woker 進行調度

框架圖

http createServer 時 child 通過 IPC 通道獲取 master 的 server.handle 流程

多進程通信,子進程監(jiān)聽同一端口為什么不沖突

不同進程之間的 server 通過 IPC 通道共享監(jiān)聽某個端口的 socket 連接句柄來解決沖突。

// master.js
const { createServer } = require('net')
const { fork } = require('child_process')
const cpus = require('os').cpus()

const netServer = createServer().listen(3000) // create TCP server
for (let i = 0; i < cpus.length; i++) {
  const worker = fork('worker.js')
  worker.send('server', netServer)
  console.log('worker process created, pid: %s ppid: %s', worker.pid, process.pid);
}

// worker.js
const http = require('http')

const server = http.createServer((req, res) => { //   this.on('connection', connectionListener);
  res.end('I am worker, pid: ' + process.pid + ', ppid: ' + process.ppid);
})

let _handle
process.on('message', (msg, handle) => {
  if (msg !== 'server') return
  _handle = handle
  _handle.on('connection', (socket) => { // _http_server.js 中實現, this.on('connection', connectionListener)
    server.emit('connection', socket); // 與子進程 server 共享 socket 處理連接后執(zhí)行子進程 http.createServer 的 callback
  })
})

server 共享 socket 過程

看下 createServer 的處理過程就可以知道 server.emit('connection', socket); 是如何共享 socket 并何時觸發(fā) createServer 回調對用戶進行響應的

// Server 構造函數
function Server {
 ... 
  if (requestListener) {
    this.on('request', requestListener);
  }

  this.on('connection', connectionListener);
}
function connectionListener(socket) {
  defaultTriggerAsyncIdScope(
    getOrSetAsyncId(socket), connectionListenerInternal, this, socket
  );
}
function connectionListenerInternal(server, socket) {
// ...
  parser.onIncoming = parserOnIncoming.bind(undefined, server, socket, state)
}
function resOnFinish(req, res, socket, state, server) {
  // ...
  res.detachSocket(socket); // 關閉 socket
  // ...
}

function parserOnIncoming(server, socket, state, req, keepAlive) {
  // ...
  res.on('finish', resOnFinish.bind(undefined, req, res, socket, state, server));

  if (req.headers.expect !== undefined &&
      (req.httpVersionMajor === 1 && req.httpVersionMinor === 1)) {
  // ...
  } else {
    server.emit('request', req, res);
  }
}

cluster 源碼

// cluster.js
const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
  console.log(`Master 進程 ${process.pid} 正在運行`);

  for (let i = 0; i < 1; i++) { // 衍生工作進程。
    cluster.fork();
  }

  cluster.on('exit', (worker, code, signal) => { console.log(`Worker ${worker.process.pid} 已退出`) });
} else {
  http.createServer((req, res) => res.end(`你好世界 ${process.pid}`)).listen(8000);
  console.log(`Worker 進程 ${process.pid} 已啟動`);
}

主進程創(chuàng)建子進程 cluster fork

createWorkerProcess 通過 child_process fork 創(chuàng)建子進程源碼
在主進程中 cluster.fork 通過 child_process fork 創(chuàng)建子進程

function createWorkerProcess(id, env) {
// ...
  return fork(cluster.settings.exec, cluster.settings.args, {
    cwd: cluster.settings.cwd,
    env: workerEnv,
    serialization: cluster.settings.serialization,
    silent: cluster.settings.silent,
    windowsHide: cluster.settings.windowsHide,
    execArgv: execArgv,
    stdio: cluster.settings.stdio,
    gid: cluster.settings.gid,
    uid: cluster.settings.uid
  });
}

在子進程中的 http.createServer

當在子進程通過 createServer 并 listen 端口時,net 模塊會根據 isMaster 來當前進程是主進程直接監(jiān)聽端口,當前進程是子進程則通過 IPC 通道獲取 master 進程的服務器(server)句柄(handle),并監(jiān)聽(listen)它。

http createServer 時 child 通過 IPC 通道獲取 master 的 server.handle 流程

listenInCluster

http.createServer().listen(port) 會執(zhí)行 net 模塊的 Server.prototype.listen 方法 調用 listenInCluster,在此方法中根據 isMaster 判斷,子進程時通過 cluster 模塊 cluster._getServer 方法與 master 建立 IPC 通道獲取 master 中 創(chuàng)建 server 的 handle 并在子進程代碼中進行 listen。
listenInCluster 源碼

function listenInCluster(server, address, port, addressType, backlog, fd, exclusive, flags) {
  if (cluster === undefined) cluster = require('cluster');

  if (cluster.isMaster || exclusive) {
    server._listen2(address, port, addressType, backlog, fd, flags);
    return;
  }

  const serverQuery = {
    address: address,
    port: port,
    addressType: addressType,
    fd: fd,
    flags,
  };

  // 通過 IPC 通道獲取 master server 的 handle 進行監(jiān)聽
  cluster._getServer(server, serverQuery, listenOnMasterHandle);

  function listenOnMasterHandle(err, handle) {
    err = checkBindError(err, port, handle);

    if (err) {
      const ex = exceptionWithHostPort(err, 'bind', address, port);
      return server.emit('error', ex);
    }

    server._handle = handle; // 重用 master handle

    server._listen2(address, port, addressType, backlog, fd, flags);
  }
}

子進程 cluster._getServer

子進程中 _getServer 向 master 通過 IPC 發(fā)送 node 內部消息為 act: 'queryServer ' 的通信獲取 master handle
子進程 cluster._getServer

// lib/internal/cluster/master.js
// obj 在 http 請求 TCP 連接中是 net 的 Server 實例,UDP 連接是 dgram 的 Socket 實例
cluster._getServer = function(obj, options, cb) {
  ...
  const message = {
    act: 'queryServer',
    index,
    data: null,
    ...options
  };

  message.address = address;

  if (obj._getServerData)
    message.data = obj._getServerData();
  // 向 master 發(fā)送 querServer 消息
  send(message, (reply, handle) => {
    if (typeof obj._setServerData === 'function')
      obj._setServerData(reply.data);

    if (handle)
      shared(reply, handle, indexesKey, cb);  // Shared listen socket. UDP 連接處理方式
    else
      rr(reply, indexesKey, cb);              // Round-robin. TCP 連接 rr 模式
  });

  obj.once('listening', () => {
    ...
  }
}

主進程 master 中 queryServer

// master 監(jiān)聽內部消息
function onmessage(message, handle) {
  const worker = this;

  if (message.act === 'online')
    online(worker);
  else if (message.act === 'queryServer')
    queryServer(worker, message); // queryServer
  else if (message.act === 'listening')
    listening(worker, message);
  else if (message.act === 'exitedAfterDisconnect')
    exitedAfterDisconnect(worker, message);
  else if (message.act === 'close')
    close(worker, message);
}

queryServer 當不存在 RoundRobinHandle 實例時會創(chuàng)建一個, 通過 RoundRobinHandle 原型 add 方法添加 woker 到 實例 this.all 屬性中,用來進行 master 對 worker 的負載均衡策略。

function queryServer(worker, message) {
  ...
  let handle = handles.get(key);
  // 創(chuàng)建 TCP RoundRobinHandle rr 實例, master 邏輯
  if (handle === undefined) {
    ...
    let constructor = RoundRobinHandle;
    handle = new constructor(key, address, message);
    handles.set(key, handle);
  }
  ...
  handle.add(worker, (errno, reply, handle) => {
    const { data } = handles.get(key);

    if (errno)
      handles.delete(key);  // Gives other workers a chance to retry.
     // handle.add 后去執(zhí)行子進程 queryServe 的 cb,告知采用 UDP 或 TCP
    send(worker, {
      errno,
      key,
      ack: message.seq,
      data,
      ...reply
    }, handle);
  });
}

RoundRobinHandle 實例創(chuàng)建

創(chuàng)建 server,重寫 server._handle 句柄的 onconnection 改用輪詢的方式分發(fā)句柄給子進程處理

在 master 進程中會接收、傳遞請求給 worker 處理,RoundRobinHandle 的作用就是用來對 woker 進行分發(fā)、任務交接、調度的負載均衡策略,同時進程間共享的 TCP server handle 是在 RoundRobinHandle 實例創(chuàng)建時生成的。

  • RoundRobinHandle 實例創(chuàng)建,重寫 server._handle.onconnection 處理請求
    通過傳入無操作的回調用 net.createServer 創(chuàng)建 server 并監(jiān)聽端口,通過對 server.once('listening') 的監(jiān)聽重寫 this.server._handle 的 onconnection,當 server 的 handle 遇到 connection 事件時將會使用 RoundRobinHandle 實例的 distribute 進行 handle 的分發(fā)
// lib/internal/cluster/round_robin_handle.js
function RoundRobinHandle(key, address, { port, fd, flags }) {
  this.key = key;
  this.all = new Map(); // 所有的 woker 
  this.free = new Map(); // 空閑可用的 woker
  this.handles = [];
  this.handle = null;
  this.server = net.createServer(assert.fail); // assert.fail typeof Function, 這里給了個沒用的 onconnection 回調用來生成 server

  if (fd >= 0)
    this.server.listen({ fd });
  else if (port >= 0) { // fd: undefined, port: 9000
    this.server.listen({ // 監(jiān)聽 address port, 觸發(fā) listening 事件
      port,
      host: address,
      // Currently, net module only supports `ipv6Only` option in `flags`.
      ipv6Only: Boolean(flags & constants.UV_TCP_IPV6ONLY),
    });
  } else
    this.server.listen(address);  // UNIX socket path.
  // 在調用 server.listen() 后綁定服務器時觸發(fā)。
  this.server.once('listening', () => {
    this.handle = this.server._handle;
    this.handle.onconnection = (err, handle) => this.distribute(err, handle); // 改寫 net.Server onconnection
    this.server._handle = null;
    this.server = null;
  });
}

RoundRobinHandle 輪詢分配策略

RoundRobinHandle 通過輪詢分配 handle 給 woker 的負載策略共享 handle 的 socket 解決子進程共同監(jiān)聽一個端口處理請求。



最后就回到文章中最開始 server 共享 socket 過程 中觸發(fā) createServer((req, res) => {}) 回調的內容。

參考

源碼分析
cluster-base
cluster 模塊的主要功能實現

egg-cluster 模塊的實現

cluster 模塊是用來處理網絡連接的多進程模塊,egg-cluster 通過 cluster 模塊對 egg 進行多進程管理的基礎模塊
在 egg-cluster 中:
master 主進程類似守護進程在后臺執(zhí)行
agent 是由 child_process 模塊 fork 創(chuàng)建,當 master 退出時會優(yōu)雅的退出 agent 進程(防止變?yōu)楣聝哼M程被系統(tǒng) init 收養(yǎng) parentId: 0)
woker 是由 cluster 模塊 fork 創(chuàng)建,用來處理 http 請求
可以參考文章 egg-cluster

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容