
原文鏈接:Understanding Node.js Event-Driven Architecture
作者:Samer Buna
譯者:李序鍇
本文已獲得作者授權(quán),轉(zhuǎn)載請(qǐng)注明出處。
大多數(shù)的Node對(duì)象(如HTTP requests, responses以及streams)都實(shí)現(xiàn)了EventEmitter模塊,這樣它們就能夠觸發(fā)和監(jiān)聽事件。

事件驅(qū)動(dòng)特性最簡單的形式就是通用的Node.js函數(shù)當(dāng)中的一些回調(diào)風(fēng)格(例如:fs.readFile)。在這個(gè)類比中,事件會(huì)立即啟動(dòng)(當(dāng)Node準(zhǔn)備調(diào)用回調(diào)時(shí))而回調(diào)則充當(dāng)事件處理器的角色。
當(dāng)你準(zhǔn)備好請(qǐng)調(diào)用我,Node!
Node處理異步事件的最初方式就是通過回調(diào)。而這已經(jīng)是JavaScript擁有原生promises支持以及async/await特性之前很就以前的事情了。
回調(diào)基本上都是你傳遞給其它函數(shù)的函數(shù)。這可能是因?yàn)樵贘avaScript當(dāng)中函數(shù)是第一類對(duì)象。
回調(diào)并不會(huì)在代碼當(dāng)中注明它是異步調(diào)用,理解這一點(diǎn)很關(guān)鍵。函數(shù)可以通過同步和異步兩種方式觸發(fā)回調(diào)。例如:如下是一個(gè)宿主函數(shù)fileSize,它接收一個(gè)回調(diào)函數(shù)cb,根據(jù)條件的不同它可以執(zhí)行同步和異步兩種方式的回調(diào)。
function fileSize (fileName, cb) {
if (typeof fileName !== 'string') {
return cb(new TypeError('argument should be string')); // Sync
}
fs.stat(fileName, (err, stats) => {
if (err) { return cb(err); } // Async
cb(null, stats.size); // Async
});
}
但是這是一種會(huì)導(dǎo)致預(yù)料之外錯(cuò)誤的糟糕實(shí)踐。它使得宿主函數(shù)總是以同步或者異步方式執(zhí)行回調(diào)。
我們來探尋一個(gè)使用回調(diào)風(fēng)格書寫的異步Node函數(shù)的典型例子。
const readFileAsArray = function(file, cb) {
fs.readFile(file, function(err, data) {
if (err) {
return cb(err);
}
const lines = data.toString().trim().split('\n');
cb(null, lines);
});
};
readFileAsArray接受文件路徑和回調(diào)函數(shù)作為參數(shù)。其讀取文件內(nèi)容并將文件內(nèi)容分割為數(shù)組,再利用該數(shù)組調(diào)用回調(diào)函數(shù)。
下方是一個(gè)應(yīng)用實(shí)例。假設(shè)我們?cè)谕?jí)目錄下有一個(gè)numbers.txt文件,其內(nèi)容如下:
10
11
12
13
14
15
假設(shè)我們的任務(wù)是統(tǒng)計(jì)該文件當(dāng)中的奇數(shù)個(gè)數(shù),我們可以使用readFileAsArray來簡化代碼:
readFileAsArray('./numbers.txt', (err, lines) => {
if (err) throw err;
const numbers = lines.map(Number);
const oddNumbers = numbers.filter(n => n%2 === 1);
console.log('Odd numbers count:', oddNumbers.length);
});
這段代碼將數(shù)字內(nèi)容讀取成了字符串?dāng)?shù)組,之后解析成數(shù)字,再之后統(tǒng)計(jì)了奇數(shù)的數(shù)量。
Node回調(diào)風(fēng)格在此處獲得了充分應(yīng)用。該回調(diào)的第一個(gè)參數(shù)是可為空的err參數(shù),我們將該回調(diào)作為最后一個(gè)參數(shù)傳遞給宿主函數(shù)。由于用戶的閱讀習(xí)慣問題,因此你應(yīng)該在你的函數(shù)一直按這種形式書寫。讓回調(diào)作為宿主函數(shù)的的最后一個(gè)參數(shù),將error對(duì)象作為回調(diào)函數(shù)的第一個(gè)參數(shù)。
新版JavaScript對(duì)于回調(diào)的替代形式
在新版JavaScript當(dāng)中,我們有了promise對(duì)象。在異步APIs中Promises可以作為異步回調(diào)的一種替代形式。promise對(duì)象允許我們分別處理success和error的cases而非在同一處同時(shí)傳遞callback和error兩個(gè)參數(shù),并且promise也允許我們串聯(lián)多重異步調(diào)用而不是進(jìn)行嵌套。
如果readFileAsArray函數(shù)支持promises,我們可以做如下應(yīng)用:
readFileAsArray('./numbers.txt')
.then(lines => {
const numbers = lines.map(Number);
const oddNumbers = numbers.filter(n => n%2 === 1);
console.log('Odd numbers count:', oddNumbers.length);
})
.catch(console.error);
我們沒有傳遞callback函數(shù),而是調(diào)用了.then函數(shù)作為宿主函數(shù)的的返回值。這個(gè).then函數(shù)通常能讓我們達(dá)到跟利用帶有callback函數(shù)的代碼同樣的效果,而且我們也能夠像之前一樣在其上做處理。為了處理errors,我們?cè)谀┪蔡砑恿?catch代碼塊,當(dāng)發(fā)生錯(cuò)誤時(shí)我們利用.catch代碼塊進(jìn)行處理。
由于新Promise對(duì)象的存在,讓宿主函數(shù)在新版JavaScript支持promise接口變得更加容易。修改如下的readFileAsArray函數(shù)讓其支持promise接口,以及支持之前已經(jīng)支持的callback接口。
const readFileAsArray = function(file, cb = () => {}) {
return new Promise((resolve, reject) => {
fs.readFile(file, function(err, data) {
if (err) {
reject(err);
return cb(err);
}
const lines = data.toString().trim().split('\n');
resolve(lines);
cb(null, lines);
});
});
};
因此我們讓函數(shù)返回一個(gè)Promise對(duì)象,這個(gè)Promise對(duì)象包含著fs.readFile異步回調(diào)。該promise對(duì)象對(duì)外暴露兩個(gè)參數(shù)(一個(gè)resolve函數(shù)以及一個(gè)reject函數(shù))。
我們總是會(huì)運(yùn)用promise的reject函數(shù)執(zhí)行回調(diào)來處理error,同時(shí)也總是利用resolve函數(shù)執(zhí)行回調(diào)來處理data。
另外我們?cè)谶@個(gè)例子當(dāng)中需要為回調(diào)參數(shù)設(shè)置一個(gè)默認(rèn)值,因?yàn)檫@段代碼有可能會(huì)被用于promise接口。我們可以使用一種簡單的空函數(shù)作為默認(rèn)值,如:() => {}。
以async/await方式執(zhí)行promises
當(dāng)需要循環(huán)嵌套異步函數(shù)時(shí),添加promise接口會(huì)讓你的代碼更易于維護(hù)。回調(diào)則會(huì)讓情況變得復(fù)雜。
Promises稍稍改善了一些這種狀況,而函數(shù)生成器則帶來更多的優(yōu)化。即是說處理異步代碼更新近的替代方式是使用async函數(shù),它能讓我們像是以一種同步的方式處理異步的代碼,這回讓代碼更具有可讀性。
我們通過async/await方式執(zhí)行readFileAsArray函數(shù),代碼如下:
async function countOdd () {
try {
const lines = await readFileAsArray('./numbers');
const numbers = lines.map(Number);
const oddCount = numbers.filter(n => n%2 === 1).length;
console.log('Odd numbers count:', oddCount);
} catch(err) {
console.error(err);
}
}
countOdd();
我們先創(chuàng)建了一個(gè)異步函數(shù),就是在普通函數(shù)的function之前加上關(guān)鍵字async。在這個(gè)異步函數(shù)當(dāng)中,我們調(diào)用readFileAsArray函數(shù)(假設(shè)它返回了lines變量),為了讓這種方式生效,我們使用關(guān)鍵字await。之后,我們繼續(xù)執(zhí)行代碼就如同對(duì)readFileAsArray進(jìn)行同步調(diào)用一樣。
為了讓代碼順利運(yùn)行,我們執(zhí)行異步函數(shù)。這讓代碼變得非常簡潔且易于閱讀。而為了能夠處理errors,我們需要將異步調(diào)用包裹在try/catch語句中。
通過這種async/await特性,我們就不必使用其它特殊的API(例如.then和.catch)。我們僅僅需要特別標(biāo)記一下函數(shù)就可以使用純粹的JavaScript進(jìn)行編程。
我們可以在任何支持promise接口的函數(shù)當(dāng)中使用async/await特性,但是我們不能將其用于回調(diào)風(fēng)格的異步函數(shù)之中(例如:setTimeout)。
EventEmitter模塊
EventEmitter是一種支持Node當(dāng)中對(duì)象間通信的模塊。EventEmitter是Node異步事件驅(qū)動(dòng)架構(gòu)的核心。很多Node內(nèi)置模塊都是繼承自EventEmitter。
概念很簡單:emitter對(duì)象發(fā)出已經(jīng)命名好的事件,該事件會(huì)觸發(fā)之前注冊(cè)好的監(jiān)聽器。因此,一個(gè)emitter對(duì)象通常有兩個(gè)主要特性:
- 發(fā)出命名事件
- 注冊(cè)和注銷監(jiān)聽函數(shù)
為了理解EventEmitter,我們創(chuàng)建一個(gè)繼承自EventEmitter的類(class)
class MyEmitter extends EventEmitter {
}
Emitter對(duì)象是我們通過MyEmitter類(繼承自類EventEmitter)實(shí)例化生成的。
const myEmitter = new MyEmitter();
在這些emitter對(duì)象生命周期的任何階段,我們都可以通過emit函數(shù)發(fā)射我們想要發(fā)射的任何命名事件。
myEmitter.emit('something-happened');
單次事件的觸發(fā)表明已經(jīng)滿足某些條件。該條件在觸發(fā)對(duì)象中通常是一種狀態(tài)變化。
我們可以通過on方法添加監(jiān)聽函數(shù),每當(dāng)發(fā)射器對(duì)象觸發(fā)相關(guān)聯(lián)的命名事件時(shí)這些監(jiān)聽函數(shù)都會(huì)執(zhí)行。
事件!==異步
我們來看一個(gè)例子:
const EventEmitter = require('events');
class WithLog extends EventEmitter {
execute(taskFunc) {
console.log('Before executing');
this.emit('begin');
taskFunc();
this.emit('end');
console.log('After executing');
}
}
const withLog = new WithLog();
withLog.on('begin', () => console.log('About to execute'));
withLog.on('end', () => console.log('Done with execute'));
withLog.execute(() => console.log('*** Executing task ***'));
WithLog類是一個(gè)事件發(fā)射器。它定義了一個(gè)函數(shù)執(zhí)行的實(shí)例。該執(zhí)行函數(shù)接受一個(gè)參數(shù)(一個(gè)任務(wù)函數(shù))并用日志聲明包裹該執(zhí)行函數(shù)。這些日志聲明會(huì)在函數(shù)執(zhí)行前后觸發(fā)。
為了查看函數(shù)的執(zhí)行順序,我們?cè)趦蓚€(gè)命名事件上添加了監(jiān)聽器,最后會(huì)執(zhí)行一個(gè)樣本任務(wù)以啟動(dòng)其它函數(shù)。
如下是函數(shù)的輸出結(jié)果:
Before executing
About to execute
*** Executing task ***
Done with execute
After executing
對(duì)于這些輸出結(jié)果我希望你注意到它們都是同步執(zhí)行的。這段代碼當(dāng)中沒有異步操作。
- 首先輸出了"Before executing"
- 以begin命名的事件輸出了"About to execute"
- 實(shí)際執(zhí)行行(hang)之后輸出了"Executing task"
- 以end命名的事件輸出了"Done with execute"
- 最后我們得到了"After executing"
如同plain-old回調(diào)一樣,事件跟代碼是同步還是異步執(zhí)行沒有什么關(guān)聯(lián)。
這點(diǎn)很重要,因?yàn)槿绻覀儌魅氘惒絫askFunc函數(shù)去執(zhí)行的話,事件的觸發(fā)將會(huì)變得不夠精準(zhǔn)。
我們可以用帶有setImmediate的調(diào)用來模擬上面的例子:
// ...
withLog.execute(() => {
setImmediate(() => {
console.log('*** Executing task ***')
});
});
輸出就會(huì)變成像下面這樣:
Before executing
About to execute
Done with execute
After executing
*** Executing task ***
這是錯(cuò)誤的。異步調(diào)用(其調(diào)用了"Done with execute"和"After executing")之后的那幾行已經(jīng)不再準(zhǔn)確。
為了在一個(gè)異步函數(shù)執(zhí)行完成之后觸發(fā)事件,我們需要結(jié)合基于事件通信的回調(diào)機(jī)制。下面的例子會(huì)進(jìn)行說明。
使用事件而非回調(diào)的好處在于我們可以通過定義多個(gè)監(jiān)聽器對(duì)同一信號(hào)響應(yīng)多次。用回調(diào)實(shí)現(xiàn)相同功能的話,我們需要在單個(gè)回調(diào)當(dāng)中書寫更多的邏輯。事件是一種很好的實(shí)現(xiàn)方式,它讓應(yīng)用程序能夠通過多個(gè)外部插件在其核心之上構(gòu)建功能。你可以將它們當(dāng)做hook points,它們會(huì)為狀態(tài)變化作特定的記錄。
異步事件
我們現(xiàn)在將這個(gè)同步的簡單例子改寫成更加實(shí)用的異步代碼。
const fs = require('fs');
const EventEmitter = require('events');
class WithTime extends EventEmitter {
execute(asyncFunc, ...args) {
this.emit('begin');
console.time('execute');
asyncFunc(...args, (err, data) => {
if (err) {
return this.emit('error', err);
}
this.emit('data', data);
console.timeEnd('execute');
this.emit('end');
});
}
}
const withTime = new WithTime();
withTime.on('begin', () => console.log('About to execute'));
withTime.on('end', () => console.log('Done with execute'));
withTime.execute(fs.readFile, __filename);
WithTime類執(zhí)行了asyncFunc并通過使用console.time以及console.timeEnd記錄了asyncFunc的執(zhí)行時(shí)間。在程序執(zhí)行前后,它觸發(fā)了事件執(zhí)行的正確順序。同時(shí)利用error/data事件去處理異步調(diào)用當(dāng)中的常見信號(hào)。
我們傳入fs.readFile函數(shù)(它是異步函數(shù))來測試withTime發(fā)射器。而非用回調(diào)來處理文件,如此我便能夠監(jiān)聽數(shù)據(jù)對(duì)象了。
當(dāng)執(zhí)行這段代碼時(shí),我們得到事件的正確執(zhí)行順序。不出所料,我們獲得了指定代碼的執(zhí)行時(shí)間,這很有用處:
About to execute
execute: 4.507ms
Done with execute
我們?cè)鯓咏Y(jié)合帶有事件監(jiān)聽器的回調(diào)來實(shí)現(xiàn)呢?如果asynFunc函數(shù)也支持promises的話,我們可以使用async/await特性來實(shí)現(xiàn)相同的功能:
class WithTime extends EventEmitter {
async execute(asyncFunc, ...args) {
this.emit('begin');
try {
console.time('execute');
const data = await asyncFunc(...args);
this.emit('data', data);
console.timeEnd('execute');
this.emit('end');
} catch(err) {
this.emit('error', err);
}
}
}
我不太清楚你的情況,但是相較于基于回調(diào)和帶有.then/.catch的代碼段我覺得以上代碼更容易理解。async/await特性讓我們更接近于JavaScript語言本身,我認(rèn)為這是一個(gè)重大進(jìn)展。
事件參數(shù)和錯(cuò)誤
在前面的例子當(dāng)中,有兩個(gè)事件都是由額外的參數(shù)來觸發(fā)。
error事件是通過一個(gè)error對(duì)象觸發(fā)。
this.emit('error', err);
data事件則是由一個(gè)data對(duì)象觸發(fā)。
this.emit('data', data);
我們可以根據(jù)需要在命名事件之后使用任意數(shù)量的參數(shù),所有參數(shù)在我們?yōu)檫@些命名事件注冊(cè)的監(jiān)聽函數(shù)當(dāng)中都可以使用。
例如,為了處理data事件,我們注冊(cè)的監(jiān)聽事件將能夠獲得我們傳遞給被發(fā)射事件的數(shù)據(jù)參數(shù),而其就是asyncFunc函數(shù)暴露的數(shù)據(jù)對(duì)象。
withTime.on('data', (data) => {
// do something with data
});
error事件通常是一種特殊情況。在基于回調(diào)的例子當(dāng)中,如果我們不使用監(jiān)聽器處理error事件的話,node進(jìn)程實(shí)際上會(huì)退出。
為了說明這種情況,我們用一個(gè)惡性參數(shù)對(duì)執(zhí)行方法做再一次調(diào)用:
class WithTime extends EventEmitter {
execute(asyncFunc, ...args) {
console.time('execute');
asyncFunc(...args, (err, data) => {
if (err) {
return this.emit('error', err); // Not Handled
}
console.timeEnd('execute');
});
}
}
const withTime = new WithTime();
withTime.execute(fs.readFile, ''); // BAD CALL
withTime.execute(fs.readFile, __filename);
如上代碼的第一次執(zhí)行會(huì)引起錯(cuò)誤。node進(jìn)程將會(huì)崩潰并退出:
events.js:163
throw er; // Unhandled 'error' event
^
Error: ENOENT: no such file or directory, open ''
第二次執(zhí)行會(huì)受到崩潰影響根本不會(huì)繼續(xù)往下執(zhí)行。
如果為該特殊error事件注冊(cè)監(jiān)聽器的話,node進(jìn)程的行為將會(huì)發(fā)生變化。例如:
withTime.on('error', (err) => {
// do something with err, for example log it somewhere
console.log(err)
});
如果執(zhí)行以上代碼,第一次執(zhí)行的錯(cuò)誤會(huì)被播報(bào),但是node進(jìn)程不會(huì)崩潰和退出。其它的執(zhí)行調(diào)用會(huì)正常結(jié)束:
{ Error: ENOENT: no such file or directory, open '' errno: -2, code: 'ENOENT', syscall: 'open', path: '' }
execute: 4.276ms
現(xiàn)在基于promise的函數(shù)會(huì)有不同的行為并且只是輸出警告,但是最終會(huì)發(fā)生變化:
UnhandledPromiseRejectionWarning: Unhandled promise rejection (rejection id: 1): Error: ENOENT: no such file or directory, open ''
DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.
處理已觸發(fā)錯(cuò)誤當(dāng)中異常的另一種方式是為全局uncaughtException進(jìn)程事件注冊(cè)一個(gè)監(jiān)聽器。然而,全部捕捉errors是一種糟糕的想法。
對(duì)uncaughtException的通常建議是避免使用它,但是如果非用不可的話(播報(bào)發(fā)生的情況或者直接清除),你應(yīng)該直接讓進(jìn)程退出。
process.on('uncaughtException', (err) => {
// something went unhandled.
// Do any cleanup and exit anyway!
console.error(err); // don't do just that.
// FORCE exit the process too.
process.exit(1);
});
然而,想象一下多個(gè)error事件在同一時(shí)間發(fā)生。這意味著上面的uncaughtException監(jiān)聽器會(huì)啟動(dòng)多次,對(duì)于cleanup代碼這會(huì)是個(gè)問題。一個(gè)典型的例子就是有多次調(diào)用用于數(shù)據(jù)庫關(guān)閉操作。
EventEmitter模塊對(duì)外暴露一個(gè)一次性的方法。該方法只會(huì)啟動(dòng)一次監(jiān)聽器,不會(huì)每次都進(jìn)行響應(yīng)。因此,這是一個(gè)使用uncaughtException的實(shí)際用例,因?yàn)橥ㄟ^第一個(gè)未捕捉的異常我們將會(huì)進(jìn)行清理操作,而且無論如何我們都將退出進(jìn)程。
監(jiān)聽器的順序
如果我們?yōu)橥皇录?cè)了多個(gè)監(jiān)聽器,這些監(jiān)聽器會(huì)按照順序執(zhí)行。注冊(cè)的第一個(gè)監(jiān)聽器將會(huì)第一個(gè)執(zhí)行。
// ?????
withTime.on('data', (data) => {
console.log(`Length: ${data.length}`);
});
// ?????
withTime.on('data', (data) => {
console.log(`Characters: ${data.toString().length}`);
});
withTime.execute(fs.readFile, __filename);
上方代碼中帶有"Length"的那一行會(huì)先于帶有"Characters"的那一行執(zhí)行,因?yàn)檫@是我們定義監(jiān)聽器的順序。
如果你需要定義一個(gè)新監(jiān)聽器但是需要該監(jiān)聽器第一個(gè)執(zhí)行,你可以使用prependListener方法:
// ?????
withTime.on('data', (data) => {
console.log(`Length: ${data.length}`);
});
// ?????
withTime.prependListener('data', (data) => {
console.log(`Characters: ${data.toString().length}`);
});
withTime.execute(fs.readFile, __filename);
上方代碼會(huì)先執(zhí)行帶有"Character"的那一行。
最后,如果需要移除一個(gè)監(jiān)聽器,你可以使用removeListener方法。