29. Node.js 流

圖片來源網(wǎng)絡(luò), 侵刪

什么是流

流是為 Node.js 應(yīng)用程序提供動(dòng)力的基本概念之一。

它們是一種以高效的方式處理讀/寫文件、網(wǎng)絡(luò)通信、或任何類型的端到端的信息交換。

流不是 Node.js 特有的概念。 它們是幾十年前在 Unix 操作系統(tǒng)中引入的,程序可以通過管道運(yùn)算符(|)對(duì)流進(jìn)行相互交互。

例如,在傳統(tǒng)的方式中,當(dāng)告訴程序讀取文件時(shí),這會(huì)將文件從頭到尾讀入內(nèi)存,然后進(jìn)行處理。

使用流,則可以逐個(gè)片段地讀取并處理(而無需全部保存在內(nèi)存中)。

Node.js 的 stream 模塊 提供了構(gòu)建所有流 API 的基礎(chǔ)。 所有的流都是 EventEmitter 的實(shí)例。

為什么是流

相對(duì)于使用其他的數(shù)據(jù)處理方法,流基本上提供了兩個(gè)主要優(yōu)點(diǎn):

  • 內(nèi)存效率: 無需加載大量的數(shù)據(jù)到內(nèi)存中即可進(jìn)行處理。
  • 時(shí)間效率: 當(dāng)獲得數(shù)據(jù)之后即可立即開始處理數(shù)據(jù),這樣所需的時(shí)間更少,而不必等到整個(gè)數(shù)據(jù)有效負(fù)載可用才開始。

流的示例

一個(gè)典型的例子是從磁盤讀取文件。

使用 Node.js 的 fs 模塊,可以讀取文件,并在與 HTTP 服務(wù)器建立新連接時(shí)通過 HTTP 提供文件:

const http = require('http')
const fs = require('fs')

const server = http.createServer(function(req, res) {
  fs.readFile(__dirname + '/data.txt', (err, data) => {
    res.end(data)
  })
})
server.listen(3000)

readFile() 讀取文件的全部內(nèi)容,并在完成時(shí)調(diào)用回調(diào)函數(shù)。

回調(diào)中的 res.end(data) 會(huì)返回文件的內(nèi)容給 HTTP 客戶端。

如果文件很大,則該操作會(huì)花費(fèi)較多的時(shí)間。 以下是使用流編寫的相同內(nèi)容:

const http = require('http')
const fs = require('fs')

const server = http.createServer((req, res) => {
  const stream = fs.createReadStream(__dirname + '/data.txt')
  stream.pipe(res)
})
server.listen(3000)

當(dāng)要發(fā)送的數(shù)據(jù)塊已獲得時(shí)就立即開始將其流式傳輸?shù)?HTTP 客戶端,而不是等待直到文件被完全讀取。

pipe()

上面的示例使用了 stream.pipe(res) 這行代碼:在文件流上調(diào)用 pipe() 方法。

該代碼的作用是什么? 它獲取來源流,并將其通過管道傳輸?shù)侥繕?biāo)流。

在來源流上調(diào)用它,在該示例中,文件流通過管道傳輸?shù)?HTTP 響應(yīng)。

pipe() 方法的返回值是目標(biāo)流,這是非常方便的事情,它使得可以鏈接多個(gè) pipe() 調(diào)用,如下所示:

src.pipe(dest1).pipe(dest2)

此構(gòu)造相對(duì)于:

src.pipe(dest1)
dest1.pipe(dest2)

流驅(qū)動(dòng)的 Node.js API

由于它們的優(yōu)點(diǎn),許多 Node.js 核心模塊提供了原生的流處理功能,最值得注意的有:

  • process.stdin 返回連接到 stdin 的流。
  • process.stdout 返回連接到 stdout 的流。
  • process.stderr 返回連接到 stderr 的流。
  • fs.createReadStream() 創(chuàng)建文件的可讀流。
  • fs.createWriteStream() 創(chuàng)建到文件的可寫流。
  • net.connect() 啟動(dòng)基于流的連接。
  • http.request() 返回 http.ClientRequest 類的實(shí)例,該實(shí)例是可寫流。
  • zlib.createGzip() 使用 gzip(壓縮算法)將數(shù)據(jù)壓縮到流中。
  • zlib.createGunzip() 解壓縮 gzip 流。
  • zlib.createDeflate() 使用 deflate(壓縮算法)將數(shù)據(jù)壓縮到流中。
  • zlib.createInflate() 解壓縮 deflate 流。

不同類型的流

流分為四類:

  • Readable: 可以通過管道讀取、但不能通過管道寫入的流(可以接收數(shù)據(jù),但不能向其發(fā)送數(shù)據(jù))。 當(dāng)推送數(shù)據(jù)到可讀流中時(shí),會(huì)對(duì)其進(jìn)行緩沖,直到使用者開始讀取數(shù)據(jù)為止。
  • Writable: 可以通過管道寫入、但不能通過管道讀取的流(可以發(fā)送數(shù)據(jù),但不能從中接收數(shù)據(jù))。
  • Duplex: 可以通過管道寫入和讀取的流,基本上相對(duì)于是可讀流和可寫流的組合。
  • Transform: 類似于雙工流、但其輸出是其輸入的轉(zhuǎn)換的轉(zhuǎn)換流。

如何創(chuàng)建可讀流

stream 模塊獲取可讀流,對(duì)其進(jìn)行初始化并實(shí)現(xiàn) readable._read() 方法。

首先創(chuàng)建流對(duì)象:

const Stream = require('stream')
const readableStream = new Stream.Readable()

然后實(shí)現(xiàn) _read:

readableStream._read = () => {}

也可以使用 read 選項(xiàng)實(shí)現(xiàn) _read:

const readableStream = new Stream.Readable({
  read() {}
})

現(xiàn)在,流已初始化,可以向其發(fā)送數(shù)據(jù)了:

readableStream.push('hi!')
readableStream.push('ho!')

如何創(chuàng)建可寫流

若要?jiǎng)?chuàng)建可寫流,需要繼承基本的 Writable 對(duì)象,并實(shí)現(xiàn)其 _write() 方法。

首先創(chuàng)建流對(duì)象:

const Stream = require('stream')
const writableStream = new Stream.Writable()

然后實(shí)現(xiàn) _write:

writableStream._write = (chunk, encoding, next) => {
  console.log(chunk.toString())
  next()
}

現(xiàn)在,可以通過以下方式傳輸可讀流:

process.stdin.pipe(writableStream)

如何從可讀流中獲取數(shù)據(jù)

如何從可讀流中讀取數(shù)據(jù)? 使用可寫流:

const Stream = require('stream')

const readableStream = new Stream.Readable({
  read() {}
})
const writableStream = new Stream.Writable()

writableStream._write = (chunk, encoding, next) => {
  console.log(chunk.toString())
  next()
}

readableStream.pipe(writableStream)

readableStream.push('hi!')
readableStream.push('ho!')

也可以使用 readable 事件直接地消費(fèi)可讀流:

readableStream.on('readable', () => {
  console.log(readableStream.read())
})

如何發(fā)送數(shù)據(jù)到可寫流

使用流的 write() 方法:

writableStream.write('hey!\n')

使用信號(hào)通知已結(jié)束寫入的可寫流

使用 end() 方法:

const Stream = require('stream')

const readableStream = new Stream.Readable({
  read() {}
})
const writableStream = new Stream.Writable()

writableStream._write = (chunk, encoding, next) => {
  console.log(chunk.toString())
  next()
}

readableStream.pipe(writableStream)

readableStream.push('hi!')
readableStream.push('ho!')

writableStream.end()

文章來源 node中文官方 http://nodejs.cn/

更多知識(shí)點(diǎn) 請(qǐng)關(guān)注:筆墨是小舟

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容