
什么是流
流是為 Node.js 應(yīng)用程序提供動(dòng)力的基本概念之一。
它們是一種以高效的方式處理讀/寫文件、網(wǎng)絡(luò)通信、或任何類型的端到端的信息交換。
流不是 Node.js 特有的概念。 它們是幾十年前在 Unix 操作系統(tǒng)中引入的,程序可以通過管道運(yùn)算符(|)對(duì)流進(jìn)行相互交互。
例如,在傳統(tǒng)的方式中,當(dāng)告訴程序讀取文件時(shí),這會(huì)將文件從頭到尾讀入內(nèi)存,然后進(jìn)行處理。
使用流,則可以逐個(gè)片段地讀取并處理(而無需全部保存在內(nèi)存中)。
Node.js 的 stream 模塊 提供了構(gòu)建所有流 API 的基礎(chǔ)。 所有的流都是 EventEmitter 的實(shí)例。
為什么是流
相對(duì)于使用其他的數(shù)據(jù)處理方法,流基本上提供了兩個(gè)主要優(yōu)點(diǎn):
- 內(nèi)存效率: 無需加載大量的數(shù)據(jù)到內(nèi)存中即可進(jìn)行處理。
- 時(shí)間效率: 當(dāng)獲得數(shù)據(jù)之后即可立即開始處理數(shù)據(jù),這樣所需的時(shí)間更少,而不必等到整個(gè)數(shù)據(jù)有效負(fù)載可用才開始。
流的示例
一個(gè)典型的例子是從磁盤讀取文件。
使用 Node.js 的 fs 模塊,可以讀取文件,并在與 HTTP 服務(wù)器建立新連接時(shí)通過 HTTP 提供文件:
const http = require('http')
const fs = require('fs')
const server = http.createServer(function(req, res) {
fs.readFile(__dirname + '/data.txt', (err, data) => {
res.end(data)
})
})
server.listen(3000)
readFile() 讀取文件的全部內(nèi)容,并在完成時(shí)調(diào)用回調(diào)函數(shù)。
回調(diào)中的 res.end(data) 會(huì)返回文件的內(nèi)容給 HTTP 客戶端。
如果文件很大,則該操作會(huì)花費(fèi)較多的時(shí)間。 以下是使用流編寫的相同內(nèi)容:
const http = require('http')
const fs = require('fs')
const server = http.createServer((req, res) => {
const stream = fs.createReadStream(__dirname + '/data.txt')
stream.pipe(res)
})
server.listen(3000)
當(dāng)要發(fā)送的數(shù)據(jù)塊已獲得時(shí)就立即開始將其流式傳輸?shù)?HTTP 客戶端,而不是等待直到文件被完全讀取。
pipe()
上面的示例使用了 stream.pipe(res) 這行代碼:在文件流上調(diào)用 pipe() 方法。
該代碼的作用是什么? 它獲取來源流,并將其通過管道傳輸?shù)侥繕?biāo)流。
在來源流上調(diào)用它,在該示例中,文件流通過管道傳輸?shù)?HTTP 響應(yīng)。
pipe() 方法的返回值是目標(biāo)流,這是非常方便的事情,它使得可以鏈接多個(gè) pipe() 調(diào)用,如下所示:
src.pipe(dest1).pipe(dest2)
此構(gòu)造相對(duì)于:
src.pipe(dest1)
dest1.pipe(dest2)
流驅(qū)動(dòng)的 Node.js API
由于它們的優(yōu)點(diǎn),許多 Node.js 核心模塊提供了原生的流處理功能,最值得注意的有:
- process.stdin 返回連接到 stdin 的流。
- process.stdout 返回連接到 stdout 的流。
- process.stderr 返回連接到 stderr 的流。
- fs.createReadStream() 創(chuàng)建文件的可讀流。
- fs.createWriteStream() 創(chuàng)建到文件的可寫流。
- net.connect() 啟動(dòng)基于流的連接。
- http.request() 返回 http.ClientRequest 類的實(shí)例,該實(shí)例是可寫流。
- zlib.createGzip() 使用 gzip(壓縮算法)將數(shù)據(jù)壓縮到流中。
- zlib.createGunzip() 解壓縮 gzip 流。
- zlib.createDeflate() 使用 deflate(壓縮算法)將數(shù)據(jù)壓縮到流中。
- zlib.createInflate() 解壓縮 deflate 流。
不同類型的流
流分為四類:
- Readable: 可以通過管道讀取、但不能通過管道寫入的流(可以接收數(shù)據(jù),但不能向其發(fā)送數(shù)據(jù))。 當(dāng)推送數(shù)據(jù)到可讀流中時(shí),會(huì)對(duì)其進(jìn)行緩沖,直到使用者開始讀取數(shù)據(jù)為止。
- Writable: 可以通過管道寫入、但不能通過管道讀取的流(可以發(fā)送數(shù)據(jù),但不能從中接收數(shù)據(jù))。
- Duplex: 可以通過管道寫入和讀取的流,基本上相對(duì)于是可讀流和可寫流的組合。
- Transform: 類似于雙工流、但其輸出是其輸入的轉(zhuǎn)換的轉(zhuǎn)換流。
如何創(chuàng)建可讀流
從 stream 模塊獲取可讀流,對(duì)其進(jìn)行初始化并實(shí)現(xiàn) readable._read() 方法。
首先創(chuàng)建流對(duì)象:
const Stream = require('stream')
const readableStream = new Stream.Readable()
然后實(shí)現(xiàn) _read:
readableStream._read = () => {}
也可以使用 read 選項(xiàng)實(shí)現(xiàn) _read:
const readableStream = new Stream.Readable({
read() {}
})
現(xiàn)在,流已初始化,可以向其發(fā)送數(shù)據(jù)了:
readableStream.push('hi!')
readableStream.push('ho!')
如何創(chuàng)建可寫流
若要?jiǎng)?chuàng)建可寫流,需要繼承基本的 Writable 對(duì)象,并實(shí)現(xiàn)其 _write() 方法。
首先創(chuàng)建流對(duì)象:
const Stream = require('stream')
const writableStream = new Stream.Writable()
然后實(shí)現(xiàn) _write:
writableStream._write = (chunk, encoding, next) => {
console.log(chunk.toString())
next()
}
現(xiàn)在,可以通過以下方式傳輸可讀流:
process.stdin.pipe(writableStream)
如何從可讀流中獲取數(shù)據(jù)
如何從可讀流中讀取數(shù)據(jù)? 使用可寫流:
const Stream = require('stream')
const readableStream = new Stream.Readable({
read() {}
})
const writableStream = new Stream.Writable()
writableStream._write = (chunk, encoding, next) => {
console.log(chunk.toString())
next()
}
readableStream.pipe(writableStream)
readableStream.push('hi!')
readableStream.push('ho!')
也可以使用 readable 事件直接地消費(fèi)可讀流:
readableStream.on('readable', () => {
console.log(readableStream.read())
})
如何發(fā)送數(shù)據(jù)到可寫流
使用流的 write() 方法:
writableStream.write('hey!\n')
使用信號(hào)通知已結(jié)束寫入的可寫流
使用 end() 方法:
const Stream = require('stream')
const readableStream = new Stream.Readable({
read() {}
})
const writableStream = new Stream.Writable()
writableStream._write = (chunk, encoding, next) => {
console.log(chunk.toString())
next()
}
readableStream.pipe(writableStream)
readableStream.push('hi!')
readableStream.push('ho!')
writableStream.end()
文章來源 node中文官方 http://nodejs.cn/
更多知識(shí)點(diǎn) 請(qǐng)關(guān)注:筆墨是小舟