前面的话
当内存中无法一次装下需要处理的数据时,或者一边读取一边处理更加高效时,我们就需要用到数据流。NodeJS中通过各种Stream来提供对数据流的操作。本文将详细说明NodeJS中的流stream
概述
流(stream)在Nodejs中是处理流数据的抽象接口。stream模块提供了基础的API 。使用这些API可以很容易地来构建实现流接口的对象。Nodejs提供了多种流对象。 例如,HTTP请求和process.stdout都是流的实例
流可以是可读的、可写的,或是可读写的。所有的流都是 EventEmitter 的实例。
尽管所有的 Node.js 用户都应该理解流的工作方式,这点很重要, 但是 stream 模块本身只对于那些需要创建新的流的实例的开发者最有用处。 对于主要是消费流的开发者来说,他们很少(如果有的话)需要直接使用 stream 模块
【类型】
Node.js 中有四种基本的流类型:
Readable - 可读的流 (例如 fs.createReadStream()). Writable - 可写的流 (例如 fs.createWriteStream()). Duplex - 可读写的流 (例如 net.Socket). Transform - 在读写过程中可以修改和变换数据的 Duplex 流 (例如 zlib.createDeflate()).
所有使用 Node.js API 创建的流对象都只能操作 strings 和 Buffer(或 Uint8Array) 对象。但是,通过一些第三方流的实现,依然能够处理其它类型的 JavaScript 值 (除了 null,它在流处理中有特殊意义)。 这些流被认为是工作在 “对象模式”(object mode)
在创建流的实例时,可以通过 objectMode 选项使流的实例切换到对象模式。试图将已经存在的流切换到对象模式是不安全的
【缓冲】
Writable和Readable流都会将数据存储到内部的缓存(buffer)中。这些缓存可以通过相应的writable._writableState.getBuffer()或readable._readableState.buffer来获取
缓存的大小取决于传递给流构造函数的highWaterMark选项。 对于普通的流,highWaterMark选项指定了总共的字节数。对于工作在对象模式的流,highWaterMark指定了对象的总数
当可读流的实现调用stream.push(chunk)方法时,数据被放到缓存中。如果流的消费者没有调用stream.read()方法, 这些数据会始终存在于内部队列中,直到被消费
当内部可读缓存的大小达到highWaterMark指定的阈值时,流会暂停从底层资源读取数据,直到当前缓存的数据被消费(也就是说,流会在内部停止调用readable._read()来填充可读缓存)
可写流通过反复调用writable.write(chunk)方法将数据放到缓存。当内部可写缓存的总大小小于highWaterMark指定的阈值时,调用writable.write()将返true。 一旦内部缓存的大小达到或超过highWaterMark,调用writable.write()将返回false
stream API 的关键目标, 尤其对于 stream.pipe() 方法, 就是限制缓存数据大小,以达到可接受的程度。这样,对于读写速度不匹配的源头和目标,就不会超出可用的内存大小。
Duplex和Transform都是可读写的。 在内部,它们都维护了两 相互独立的缓存用于读和写。 在维持了合理高效的数据流的同时,也使得对于读和写可以独立进行而互不影响。 例如, net.Socket就是Duplex的实例,它的可读端可以消费从套接字(socket)中接收的数据, 可写端则可以将数据写入到套接字。 由于数据写入到套接字中的速度可能比从套接字接收数据的速度快或者慢, 在读写两端使用独立缓存,并进行独立操作就显得很重要了
几乎所有的 Node.js 应用,不管多么简单,都在某种程度上使用了流。 下面是在 Node.js 应用中使用流实现的一个简单的 HTTP 服务器
var http = require('http');var server = http.createServer((req, res) => { // req 是一个 Readable Stream;res 是一个 Writable Stream var body = ''; req.setEncoding('utf8'); req.on('data', (chunk) => { body += chunk; }); req.on('end', () => { try { var data = JSON.parse(body); res.write(typeof data); res.end(); } catch (er) { res.statusCode = 400; return res.end(`error: ${er.message}`); } }); }); server.listen(1337);
Writable 流 (比如例子中的 res) 暴露了一些方法,比如 write() 和 end() 。这些方法可以将数据写入到流中。当流中的数据可以读取时,Readable 流使用 EventEmitter API 来通知应用。 这些数据可以使用多种方法从流中读取。Writable 和 Readable 流都使用了 EventEmitter API ,通过多种方式, 与流的当前状态进行交互。Duplex 和 Transform 都是同时满足 Writable 和 Readable 。对于只是简单写入数据到流和从流中消费数据的应用来说, 不要求直接实现流接口,通常也不需要调用 require('stream')
可写流
可写流是对数据流向设备的抽象,用来消费上游流过来的数据,通过可写流程序可以把数据写入设备,常见的是本地磁盘文件或者 TCP、HTTP 等网络响应
process.stdin.pipe(process.stdout);
process.stdout是一个可写流,程序把可读流 process.stdin 传过来的数据写入的标准输出设备
Writable(可写流)包括:
HTTP requests, on the client HTTP responses, on the server fs write streams [zlib streams][zlib] crypto streams TCP sockets child process stdin process.stdout, process.stderr
[注意]上面的某些例子事实上是 Duplex 流,只是实现了 Writable 接口
所有 Writable 流都实现了 stream.Writable 类定义的接口。尽管特定的 Writable 流的实现可能略有差别, 所有的 Writable streams 都可以按一种基本模式进行使用
var myStream = getWritableStreamSomehow(); myStream.write('some data'); myStream.write('some more data'); myStream.end('done writing data');
【'close' 事件】
'close'事件将在流或其底层资源(比如一个文件)关闭后触发。'close'事件触发后,该流将不会再触发任何事件
[注意]不是所有可写流都会触发 'close' 事件
【'drain' 事件】
如果调用 stream.write(chunk) 方法返回 false,流将在适当的时机触发 'drain' 事件,这时才可以继续向流中写入数据
// 向可写流中写入数据一百万次。// 需要注意背压(back-pressure)function writeOneMillionTimes(writer, data, encoding, callback) { let i = 1000000; write(); function write() { let ok = true; do { i--; if (i === 0) { // 最后 一次 writer.write(data, encoding, callback); } else { // 检查是否可以继续写入。 // 这里不要传递 callback, 因为写入还没有结束! ok = writer.write(data, encoding); } } while (i > 0 && ok); if (i > 0) { // 这里提前停下了, // 'drain' 事件触发后才可以继续写入 writer.once('drain', write); } } }
【'error' 事件】
'error' 事件在写入数据出错或者使用管道出错时触发。事件发生时,回调函数仅会接收到一个 Error 参数
[注意]'error' 事件发生时,流并不会关闭
【'finish' 事件】
在调用了 stream.end() 方法,且缓冲区数据都已经传给底层系统(underlying system)之后, 'finish' 事件将被触发
const writer = getWritableStreamSomehow();for (let i = 0; i < 100; i++) { writer.write(`hello, #${i}!\n`); } writer.end('This is the end\n'); writer.on('finish', () => { console.error('All writes are now complete.'); });
【'pipe' 事件】
src <stream.Readable> 输出到目标可写流(writable)的源流(source stream)
在可读流(readable stream)上调用 stream.pipe() 方法,并在目标流向 (destinations) 中添加当前可写流 ( writable ) 时,将会在可写流上触发 'pipe' 事件
const writer = getWritableStreamSomehow(); const reader = getReadableStreamSomehow(); writer.on('pipe', (src) => { console.error('something is piping into the writer'); assert.equal(src, reader); }); reader.pipe(writer);
【'unpipe' 事件】
src <Readable Stream> unpiped 当前可写流的源流
在 Readable 上调用 stream.unpipe() 方法,从目标流向中移除当前 Writable 时,将会触发 'unpipe' 事件
const writer = getWritableStreamSomehow(); const reader = getReadableStreamSomehow(); writer.on('unpipe', (src) => { console.error('Something has stopped piping into the writer.'); assert.equal(src, reader); }); reader.pipe(writer); reader.unpipe(writer);
【writable.cork()】
调用 writable.cork() 方法将强制所有写入数据都内存中的缓冲区里。 直到调用 stream.uncork() 或 stream.end() 方法时,缓冲区里的数据才会被输出
在向流中写入大量小块数据(small chunks of data)时,内部缓冲区(internal buffer)可能失效,从而导致性能下降。writable.cork() 方法主要就是用来避免这种情况。 对于这种情况, 实现了 writable._writev() 方法的流可以对写入的数据进行缓冲,从而提高写入效率
【writable.end([chunk][, encoding][, callback])】
chunk <string> | <Buffer> | <Uint8Array> | <any>
chunk <string> | <Buffer> | <Uint8Array> | <any> 可选的,需要写入的数据。对于非对象模式下的流, chunk 必须是字符串、或 Buffer、或 Uint8Array。对于对象模式下的流, chunk 可以是任意的 JavaScript 值,除了 null。 encoding <string> 如果 chunk 是字符串,这里指定字符编码。 callback <Function> 可选的,流结束时的回调函数
调用 writable.end() 方法表明接下来没有数据要被写入 Writable。通过传入可选的 chunk 和 encoding 参数,可以在关闭流之前再写入一段数据。如果传入了可选的 callback 函数,它将作为 'finish' 事件的回调函数。
[注意]在调用了 stream.end() 方法之后,再调用 stream.write() 方法将会导致错误
// 写入 'hello, ' ,并用 'world!' 来结束写入const file = fs.createWriteStream('example.txt'); file.write('hello, '); file.end('world!');// 后面不允许再写入数据!
【writable.setDefaultEncoding(encoding)】
encoding <string> 新的默认编码 返回: this
writable.setDefaultEncoding() 用于为 Writable 设置 encoding
【writable.uncork()】
writable.uncork() 将输出在 stream.cork() 方法被调用之后缓冲在内存中的所有数据
如果使用 writable.cork() 和 writable.uncork() 来管理写入缓存,建议使用 process.nextTick() 来延迟调用 writable.uncork() 方法。通过这种方式,可以对单个 Node.js 事件循环中调用的所有 writable.write() 方法进行批处理
stream.cork(); stream.write('some '); stream.write('data '); process.nextTick(() => stream.uncork());
如果一个流多次调用了 writable.cork() 方法,那么也必须调用同样次数的 writable.uncork() 方法以输出缓冲区数据
stream.cork(); stream.write('some '); stream.cork(); stream.write('data '); process.nextTick(() => { stream.uncork(); // 之前的数据只有在 uncork() 被二次调用后才会输出 stream.uncork(); });
【writable.write(chunk[, encoding][, callback])】
chunk <string> | <Buffer> | <Uint8Array> | <any> 要写入的数据。可选的。 For streams not operating in object mode, chunk must be a string, Buffer or Uint8Array. For object mode streams, chunk may be any JavaScript value other than null. encoding <string> 如果 chunk 是字符串,这里指定字符编码 callback <Function> 缓冲数据输出时的回调函数 返回: <boolean> 如果流需要等待 'drain' 事件触发才能继续写入数据,这里将返回 false ; 否则返回 true。
writable.write() 方法向流中写入数据,并在数据处理完成后调用 callback 。如果有错误发生, callback不一定会接收到这个错误作为第一个参数。要确保可靠地检测到写入错误,应该监听 'error' 事件。
在确认了 chunk 后,如果内部缓冲区的大小小于创建流时设定的 highWaterMark 阈值,函数将返回 true 。 如果返回值为 false ,应该停止向流中写入数据,直到 'drain' 事件被触发。
当一个流不处在 drain 的状态, 对 write() 的调用会缓存数据块, 并且返回 false。 一旦所有当前所有缓存的数据块都排空了(被操作系统接受来进行输出), 那么 'drain' 事件就会被触发。 我们建议, 一旦 write() 返回 false, 在 'drain' 事件触发前, 不能写入任何数据块。 然而,当流不处在 'drain' 状态时, 调用 write() 是被允许的, Node.js 会缓存所有已经写入的数据块, 直到达到最大内存占用, 这时它会无条件中止。 甚至在它中止之前, 高内存占用将会导致差的垃圾回收器的性能和高的系统相对敏感性 (即使内存不在需要,也通常不会被释放回系统)。 如果远程的另一端没有读取数据, TCP sockets 可能永远也不会 drain , 所以写入到一个不会drain的socket可能会导致远程可利用的漏洞。
对于一个 Transform, 写入数据到一个不会drain的流尤其成问题, 因为 Transform 流默认被暂停, 直到它们被pipe或者被添加了 'data' 或 'readable' event handler。
如果将要被写入的数据可以根据需要生成或者取得,我们建议将逻辑封装为一个 Readable 流并且使用 stream.pipe()。 但是如果调用 write() 优先, 那么可以使用 'drain' 事件来防止回压并且避免内存问题:
function write(data, cb) { if (!stream.write(data)) { stream.once('drain', cb); } else { process.nextTick(cb); } }// Wait for cb to be called before doing any other write.write('hello', () => { console.log('write completed, do more writes now'); });
[注意]对象模式的写入流将忽略 encoding 参数
【writable.destroy([error])】
销毁流,并释放已传递的错误。在这之后,可写的流已经结束了。实现者不应该覆盖此方法,而是实现writable._destroy
可读流
可读流(Readable streams)是对提供数据的源头(source)的抽象,是生产数据用来供程序消费的流。我们常见的数据生产方式有读取磁盘文件、读取网络请求内容等
const rs = fs.createReadStream(filePath);
rs就是一个可读流,其生产数据的方式是读取磁盘的文件,我们常见的控制台process.stdin也是一个可读流
process.stdin.pipe(process.stdout);
通过简单的一句话可以把控制台的输入打印出来,process.stdin 生产数据的方式是读取用户在控制台的输入
可读流的例子包括:
HTTP responses, on the client HTTP requests, on the server fs read streams [zlib streams][zlib] crypto streams TCP sockets child process stdout and stderr process.stdin
[注意]所有的 Readable 都实现了 stream.Readable 类定义的接口
【两种模式】
可读流事实上工作在下面两种模式之一:flowing 和 paused 。
在flowing模式下,可读流自动从系统底层读取数据,并通过EventEmitter接口的事件尽快将数据提供给应用
在paused模式下,必须显式调用 stream.read() 方法来从流中读取数据片段。
所有初始工作模式为 paused 的 Readable 流,可以通过下面三种途径切换到 flowing 模式:
监听 'data' 事件。 调用 stream.resume() 方法。 调用 stream.pipe() 方法将数据发送到 Writable。
可读流可以通过下面途径切换到 paused 模式:
http://www.cnblogs.com/xiaohuochai/p/6969307.html