stream

1. 认识Stream

2. node中的stream

node stream中可以看到第一段的描述:

A stream is an abstract interface implemented by various objects in Node. For example a request to an HTTP server is a stream, as is stdout. Streams are readable, writable, or both. All streams are instances of EventEmitter

对上面一段话进行解析,可以得到如下几点:

3.Readable Stream 与 Writeable Stream

3.1 二者的关系

Readable Stream是提供数据的Stream,外部来源的数据均会存储到内部的buffer数组内缓存起来。

writeable Stream是消费数据的Stream,从readable stream中获取数据,然后对得到的chunk块数据进行处理,至于如何处理,就依赖于具体实现(也就是_write的实现)。

首先看看Readdable Streamwriteable stream二者之间的流动关系:

Stream

3.2 pipe的流程解析

stream内部是如何从readable stream流到writeable stream里面呢?有两种方法:

a) pipe 连接两个stream

先看一个简单地demo

var Read = require('stream').Readable;
var Write = require('stream').Writable;
var r = new Read();
var w = new Write();

r.push('hello ');
r.push('world!');
r.push(null)


w._write = function (chunk, ev, cb) {
    console.log(chunk.toString());
    cb();
}

r.pipe(w);

pipe是一种最简单直接的方法连接两个stream,内部实现了数据传递的整个过程,在开发的时候不需要关注内部数据的流动:

Readable.prototype.pipe = function (dest, pipeOpts) {
	var src = this;
	...
	src.on('data', ondata);

	function ondata(chunk) {
		var ret = dest.write(chunk);
    	if (false === ret) {
      		debug('false write response, pause',
        	src._readableState.awaitDrain);
		  	src._readableState.awaitDrain++;
      		src.pause();
    	}
	}
	...
}

b) 事件data + 事件drain联合实现

var Read = require('stream').Readable;
var Write = require('stream').Writable;
var r = new Read();
var w = new Write();

r.push('hello ');
r.push('world!');
r.push(null)


w._write = function (chunk, ev, cb) {
    console.log(chunk.toString());
    cb();
}

r.on('data', function (chunk) {
    if (!w.write(chunk)) {
        r.pause();
    }
})

w.on('drain', function () {
    r.resume();
})

// hello
// world!

4 Readable Stream的模式

4.1 内部模式的实现

Readable Stream 存在两种模式(flowing mode 与 paused mode),这两种模式决定了chunk数据流动的方式—自动流动还是手工流动。那如何触发这两种模式呢:

让我们再深入一些,看看里面具体是如何实现的:

// data事件触发flowing mode
Readable.prototype.on = function(ev, fn) {
	...
	if (ev === 'data' && false !== this._readableState.flowing) {
    	this.resume();
  	}
  	...
}

// resume触发flowing mode
Readable.prototype.resume = function() {
	var state = this._readableState;
	if (!state.flowing) {
   		debug('resume');
   		state.flowing = true;
    resume(this, state);
  }
  return this;
}

// pipe方法触发flowing模式
Readable.prototype.resume = function() {
	if (!state.flowing) {
		this.resume()
	}
}

结论

4.2 两种模式的操作

a. paused mode

在paused mode下,需要手动地读取数据,并且可以直接指定读取数据的长度:

var Read = require('stream').Readable;
var r = new Read();

r.push('hello');
r.push('world');
r.push(null);

console.log('输出结果为: ', r.read(1).toString())
// 输出结果为: 'h'

还可以通过监听事件readable,触发时手工读取chunk数据:

var Read = require('stream').Readable;
var r = new Read();

r.push('hello');
r.push('world');
r.push(null);

r.on('readable', function () {
    var chunk = r.read();
    console.log('get data by readable event: ', chunk.toString())
});

// get data by readable event:  hello world!

需要注意的是,一旦注册了readable事件,必须手工读取read数据,否则数据就会流失,看看内部实现:

function emitReadable_(stream) {
	debug('emit readable');
	stream.emit('readable');
	flow(stream);
}

function flow(stream) {
	var state = stream._readableState;
	debug('flow', state.flowing);
	if (state.flowing) {
   		do {
      		var chunk = stream.read();
	    } while (null !== chunk && state.flowing);
	}
}

Readable.prototype.read = function (n) {
	...
	var res = fromList(n, state);

	if (!util.isNull(ret)) {
		this.emit('data', ret);
	}
	...
}

flow方法直接read数据,将得到的数据通过事件data交付出去,然而此处没有注册data事件监控,因此,得到的chunk数据并没有交付给任何对象,这样数据就白白流失了,所以在触发emit(‘readable’)时,需要提前read数据。

b. flowing mode

通过注册data、pipe、resume可以自动获取所需要的数据,看看内部实现:

// 事件data方式
var Read = require('stream').Readable;

var r = new Read();

r.push('hello ');
r.push('world!');
r.push(null)

r.on('data', function (chunk) {
    console.log('chunk :', chunk.toString())
})
// chunk : hello
// chunk : world!
// 通过pipe方式
var r = new Read();

r.push('hello ');
r.push('world!');
r.push(null)

r.pipe(process.stdout)
// hello world!

c. 两种mode的总结

两种mode的总结图

5. transform stream的实现

用过browserify的人都知道,browserify是一种基于stream的模块打包工具,里面存在browserify.prototype.transform(tr)方法,其中的tr就要求是transform stream,且browserify内部通过through2构建了很多tranform stream。也可以说browserify是建立在transform stream的基础上。那么具备readable、writeablestream的transform stream内部是如何工作的呢?

transform stream

6. 自定义stream

自定义stream很简单,只要实现相应的内部待实现方法就可以了,具体来说:

// 自定义readable stream的实现
var Stream = require('stream');
var Read = Stream.Readable;
var util = require('util');

util.inherits(MyReadStream, Read);

function MyReadStream(data, opt) {
    Read.call(this, opt);
    this.data = data || [];
}
MyReadStream.prototype._read = function () {
    var _this = this;
    this.data.forEach(function (d) {
        _this.push(d);
    })
    this.push(null);
}

var data = ['aa', 'bb', 'cc'];
var r = new MyReadStream(data);

r.on('data', function (chunk) {
    console.log(chunk.toString());
})

7. 参考资料