とあるNodeJSのサーバー実装を読んでて、頻出するこいつらに目が慣れてなかったせいで、読み進めるのに時間がかかってた。
ので、思い出しながら、おさらいがてらメモを残してたので記事にしておく。
考え方やデザインがどうっていうよりも、コードを読む時に思い出せるようにって感じの切り口になってます。
よくみるAPI
- Readable: ソース
- Writable: 目的地
- Duplex: ReadableでありWritable
- Transform: Duplexと同じだが、その名の通り加工してつなぐ用
- finished()
- pipeline()
すべてビルトインの`stream`から。
`finished`と`pipeline`はv10から追加されたもので、それまで同等のことをするには、`readable-stream`を別途インストールして、そこから使ってたぽい。
ちなみにTypeScriptの型はこちら。
DefinitelyTyped/stream.d.ts at master · DefinitelyTyped/DefinitelyTyped · GitHub
更新されてずれてそう。
使われ方の基本
Streamはつなげてなんぼなので、まずそのあたりから。
const { pipeline } = require('stream'); // simple readable.pipe(writable); // pipe chaining readable .pipe(transform) .pipe(writable); // pipeline pipeline(readable, transform, writable, callback); pipeline(readable, transform1, transform2, writable, callback);
こうすると上流である`readable`からデータが流れてく。
`readable`は`implements Readable`な感じで、`Duplex`でもいい。`writable`も同じく。
なのでこういうコードもある。
pipeline(readable, duplex, callback); // duplex as writable pipeline(duplex, writable, callback); // duplex as readable
readable → duplex → writableという流れ。
流れるきっかけ
`Readable`なストリームも用意しただけじゃデータは流れない。
- `on('data')`する
- `pipe()`する
- 内部的に`on('data')`してる
- `pipeline()`も同じ
- `resume()`する
こうすることで流れ出す。
内部的にも`readableFlowing`というGetterから取れるフラグがある。
const r = getReadableStreamSomehow(); console.log(r.readableFlowing); // null r.on('data', console.log); console.log(r.readableFlowing); // true r.pause(); console.log(r.readableFlowing); // false r.resume(); console.log(r.readableFlowing); // true
基本的には`pipe()`だけ使ってれば良いはず。
ただし、流れるデータを特定の単位で扱いたいとかの要件があるなら、手動で`on('data')`したり`pause/resume()`したりする。
`on('readable')` / `readable.read()`についてはいったん割愛。
さて、接続はこれでできるけど、データを流すためにはそれぞれちゃんと実装されてる必要があり・・。
というわけで実装する側のポイントへ。
Readable
いわゆるデータのEmitter。
パターンに応じて、`_read()`もしくは`read()`を実装する必要がある。
実装のパターンはいくつかあるけど、結果的には同じ。
const { Readable } = require('stream'); // new はあってもなくても const r1 = new Readable({ // _read ではない read() { this.push(`${Date.now()}\n`); } }) const r2 = new Readable(); // read ではない r2._read = () => { r2.push(`${Date.now()}\n`); }; const r3 = new class extends Readable { // read ではない _read() { this.push(`${Date.now()}\n`); } }();
`new Readable(options)`が一番シュッと書けるけど、`constructor()`でいろいろやりたいのでだいたい`class`で書くことになりそう。
`Readable`なストリームは、`push()`を使って任意のタイミングでバッファにデータを貯める。
`push(null)`は特別で、そのストリームのデータの終端を表す。
`read`と`_read`は空の関数を実装しておいて、任意のタイミングで`push()`する実装例もある。
この例だと、ものすごい勢いで`Date.now()`が流れるストリームができる。
Writable
いわゆるデータのReceiver。
パターンに応じて、`_write()`もしくは`write()`を実装する必要がある。
const { Writable } = require('stream'); const w = new Writable({ write(data, _encoding, callback) { console.log(data); callback(); } });
コードの構造は`Readable`と一緒。
上層からデータが流れてくると、`write()`(か`_write()`)が呼ばれるので、そこでやりたい処理を書く。
この例だとコンソールに出力するだけのストリーム。
Duplex
`Readable`であり`Writable`でもある。
なのでパターンに応じて、`read()` or `_read()`および`write()` or `_write()`を実装しないといけない。
それぞれにバッファがあるのがポイント。
なのでオプションも`(readable|writable)ObjectMode`と2つあって、`(readable|writable)HighWaterMark`と2つある。
1人2役できるので便利やけど、コードを読むのがちょっと大変ではある・・。
Transform
特殊な`Duplex`ストリーム。
パターンに応じて、`_transform()`か`transform()`を実装する必要がある。
class extends Transform { _transform(data, encoding, callback) { // transform w/ data // そして次へ送る this.push(transformedData); callback(); // こう書いても同じ // callback(null, transformedData); } }
もちろん`push()`をサボってもいい。
ただサボるなら`Transform`である必要はない・・。
そのほか
実装すべきメソッド
`_final()`とか`_flush()`とか、ストリームの種類によってはまだいろいろある。
流れるデータの型
基本的に`String`か`Buffer`だが、`{ objectMode: true }`をストリーム初期化時に渡すと、`null`以外の値はそのまま流せるようになる。
ただし`pipe()`または`pipeline()`でつなぐストリームが同じオプションになってないと、思わぬ挙動になるので注意とのこと。
finished() / pipeline()
ストリームのエラー検知、後始末に使えるやつ。
const { finished } = require('stream'); const rs = getReadableStreamSomehow(); finished(rs, err => { if (err) { console.error('Stream failed.', err); } else { console.log('Stream is done reading.'); } });
`util.promisify(finished)`すれば、PromiseなコードとStreamのコードを混ぜて使える。
`pipeline()`もそうで、最後に渡したコールバックでまとめてエラー処理ができる。
同じく`util.promisify(pipeline)`できる。
おわりに
いざ自分が実装するとなると、もう少し知っておくべきメソッドやらイベントやらがある。
ただ、なんとなくコードを読むだけならコレで十分なのではないかなーと思う。
より詳しい記事みつけた。