diff --git a/doc/api/stream.md b/doc/api/stream.md
index cc18b643b7158e..10b02e2ea1d5ef 100644
--- a/doc/api/stream.md
+++ b/doc/api/stream.md
@@ -45,8 +45,8 @@ There are four fundamental stream types within Node.js:
   is written and read (for example, [`zlib.createDeflate()`][]).

 Additionally, this module includes the utility functions
-[`stream.pipeline()`][], [`stream.finished()`][], [`stream.Readable.from()`][]
-and [`stream.addAbortSignal()`][].
+[`stream.pipeline()`][], [`stream.finished()`][], [`stream.Readable.from()`][],
+[`stream.addAbortSignal()`][] and [`stream.Transform.by()`][].

 ### Streams Promises API

+### stream.Transform.by(asyncGeneratorFunction[, options])
+
+* `asyncGeneratorFunction` {AsyncGeneratorFunction} A mapping function that
+  accepts a `source` async iterable, which is used to read incoming data,
+  while transformed data is pushed to the stream with the `yield` keyword.
+* `options` {Object} Options provided to `new stream.Transform([options])`.
+  `Transform.by()` sets `options.objectMode` to `true` by default, unless
+  this is explicitly opted out of by setting `options.objectMode` to `false`.
+* Returns: {stream.Transform}
+
+A utility method for creating `Transform` streams with async generator
+functions. The async generator is supplied a single argument, `source`,
+which is used to read incoming chunks.
+
+Yielded values become the data chunks emitted from the stream.
+
+```js
+const { Readable, Transform } = require('stream');
+
+const readable = Readable.from(['hello', 'streams']);
+async function * mapper(source) {
+  for await (const { chunk } of source) {
+    // If objectMode were set to false, chunk would be a buffer that had to
+    // be converted to a string here, but since objectMode is true by default
+    // for both Readable.from() and Transform.by(), each chunk is already a
+    // string.
+    yield chunk.toUpperCase();
+  }
+}
+const transform = Transform.by(mapper);
+readable.pipe(transform);
+transform.on('data', (chunk) => {
+  console.log(chunk);
+});
+```
+
+Each value read from `source` is an object with `chunk` and `encoding`
+properties. The `encoding` property represents the encoding of the writable
+side of the transform. It is the same `encoding` value that would be passed
+as the second argument to the `transform()` function option (or the
+`_transform()` method) supplied to `stream.Transform`.
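+
+As a sketch of how the per-chunk `encoding` can be used (this example is
+adapted from this change's test suite), a mapper can observe the encoding
+set with `writable.setDefaultEncoding()` on the transform:
+
+```js
+const { Readable, Transform } = require('stream');
+
+async function * tagEncoding(source) {
+  for await (const { chunk, encoding } of source) {
+    // With setDefaultEncoding('ascii') below, every item read from source
+    // reports 'ascii' as its encoding.
+    yield `${chunk.toUpperCase()} (${encoding})`;
+  }
+}
+
+const transform = Transform.by(tagEncoding);
+transform.setDefaultEncoding('ascii');
+Readable.from('test'.split('')).pipe(transform);
+transform.on('data', console.log);
+// Prints: T (ascii), E (ascii), S (ascii), T (ascii)
+```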
+
 ## API for Stream Implementers
@@ -1928,7 +1972,7 @@ on the type of stream being created, as detailed in the chart below:
 | Reading only | [`Readable`][] | [`_read()`][stream-_read] |
 | Writing only | [`Writable`][] | [`_write()`][stream-_write], [`_writev()`][stream-_writev], [`_final()`][stream-_final] |
 | Reading and writing | [`Duplex`][] | [`_read()`][stream-_read], [`_write()`][stream-_write], [`_writev()`][stream-_writev], [`_final()`][stream-_final] |
-| Operate on written data, then read the result | [`Transform`][] | [`_transform()`][stream-_transform], [`_flush()`][stream-_flush], [`_final()`][stream-_final] |
+| Operate on written data, then read the result | [`Transform`][] | [`_transform()`][], [`_flush()`][stream-_flush], [`_final()`][stream-_final] |

 The implementation code for a stream should *never* call the "public" methods
 of a stream that are intended for use by consumers (as described in the
@@ -3095,6 +3139,35 @@ readable.on('data', (chunk) => {
 });
 ```

+#### Creating transform streams with async generator functions
+
+A Node.js transform stream can be constructed from an asynchronous generator
+function with the `Transform.by()` utility method.
+
+```js
+const { Readable, Transform } = require('stream');
+
+async function * toUpperCase(source) {
+  for await (const { chunk } of source) {
+    yield chunk.toUpperCase();
+  }
+}
+const transform = Transform.by(toUpperCase);
+
+async function * generate() {
+  yield 'a';
+  yield 'b';
+  yield 'c';
+}
+
+const readable = Readable.from(generate());
+
+readable.pipe(transform);
+transform.on('data', (chunk) => {
+  console.log(chunk);
+});
+```
+
 #### Piping to writable streams from async iterators

 When writing to a writable stream from an async iterator, ensure correct
@@ -3259,6 +3332,7 @@ contain multi-byte characters.
 [`readable.push('')`]: #stream_readable_push
 [`readable.setEncoding()`]: #stream_readable_setencoding_encoding
 [`stream.Readable.from()`]: #stream_stream_readable_from_iterable_options
+[`stream.Transform.by()`]: #stream_stream_transform_by_asyncgeneratorfunction_options
 [`stream.cork()`]: #stream_writable_cork
 [`stream.finished()`]: #stream_stream_finished_stream_options_callback
 [`stream.pipe()`]: #stream_readable_pipe_destination_options
diff --git a/lib/internal/streams/transform.js b/lib/internal/streams/transform.js
index 26e0b07c2956c8..b262d9a29e0052 100644
--- a/lib/internal/streams/transform.js
+++ b/lib/internal/streams/transform.js
@@ -73,6 +73,8 @@ const {
   ERR_METHOD_NOT_IMPLEMENTED
 } = require('internal/errors').codes;
 const Duplex = require('internal/streams/duplex');
+const from = require('internal/streams/from');
+const { createDeferredPromise } = require('internal/util');

 ObjectSetPrototypeOf(Transform.prototype, Duplex.prototype);
 ObjectSetPrototypeOf(Transform, Duplex);
@@ -244,3 +246,30 @@ Transform.prototype._read = function() {
     callback();
   }
 };
+
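+// Transform.by() bridges the writable side of the stream and the
+// user-supplied async generator with a deferred promise: each call to
+// write() resolves the promise the inner generator is awaiting with a
+// { chunk, encoding, cb } record, while final() resolves it with
+// done: true so the generator, and thus the readable side, can end.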
+Transform.by = function by(asyncGeneratorFn, options) {
+  let { promise, resolve } = createDeferredPromise();
+  const asyncGenerator = async function*() {
+    while (true) {
+      const { chunk, done, encoding, cb } = await promise;
+      process.nextTick(cb);
+      if (done) return;
+      yield { chunk, encoding };
+      ({ promise, resolve } = createDeferredPromise());
+    }
+  }();
+  return from(Duplex, asyncGeneratorFn(asyncGenerator), {
+    objectMode: true,
+    autoDestroy: true,
+    ...options,
+    write: (chunk, encoding, cb) => {
+      resolve({ chunk, done: false, encoding, cb });
+    },
+    final: (cb) => resolve({ done: true, cb })
+  });
+};
diff --git a/test/parallel/test-stream-readable-async-iterators.js b/test/parallel/test-stream-readable-async-iterators.js
index 7c457fdc3da24b..e92e515fac65bd 100644
--- a/test/parallel/test-stream-readable-async-iterators.js
+++ b/test/parallel/test-stream-readable-async-iterators.js
@@ -497,6 +497,30 @@ async function tests() {
     mustReach[1]();
   }

+  {
+    console.log('readable side of a transform stream pushes null');
+    const transform = new Transform({
+      objectMode: true,
+      transform: (chunk, enc, cb) => { cb(null, chunk); }
+    });
+    transform.push(0);
+    transform.push(1);
+    process.nextTick(() => {
+      transform.push(null);
+    });
+
+    const mustReach = [ common.mustCall(), common.mustCall() ];
+
+    const iter = transform[Symbol.asyncIterator]();
+    assert.strictEqual((await iter.next()).value, 0);
+
+    for await (const d of iter) {
+      assert.strictEqual(d, 1);
+      mustReach[0]();
+    }
+    mustReach[1]();
+  }
+
   {
     console.log('all next promises must be resolved on end');
     const r = new Readable({
diff --git a/test/parallel/test-stream-transform-by.js b/test/parallel/test-stream-transform-by.js
new file mode 100644
index 00000000000000..2affdff759723f
--- /dev/null
+++ b/test/parallel/test-stream-transform-by.js
@@ -0,0 +1,255 @@
+'use strict';
+const { mustCall } = require('../common');
+const { once } = require('events');
+const { Readable, Transform } = require('stream');
+const { strictEqual } = require('assert');
+
+async function transformBy() {
+  const readable = Readable.from('test'.split(''));
+  async function * mapper(source) {
+    for await (const { chunk } of source) {
+      yield chunk.toUpperCase();
+    }
+  }
+
+  const stream = Transform.by(mapper);
+  readable.pipe(stream);
+  const expected = ['T', 'E', 'S', 'T'];
+  for await (const chunk of stream) {
+    strictEqual(chunk, expected.shift());
+  }
+}
+
+async function transformByFuncReturnsObjectWithSymbolAsyncIterator() {
+  const readable = Readable.from('test'.split(''));
+  const mapper = (source) => ({
+    [Symbol.asyncIterator]() {
+      return {
+        async next() {
+          const { done, value } = await source.next();
+          return { done, value: value ? value.chunk.toUpperCase() : value };
+        }
+      };
+    }
+  });
+
+  const stream = Transform.by(mapper);
+  readable.pipe(stream);
+  const expected = ['T', 'E', 'S', 'T'];
+  for await (const chunk of stream) {
+    strictEqual(chunk, expected.shift());
+  }
+}
+
+async function transformByEncoding() {
+  const readable = Readable.from('test'.split(''));
+  async function * mapper(source) {
+    for await (const { chunk, encoding } of source) {
+      strictEqual(encoding, 'ascii');
+      yield chunk.toUpperCase();
+    }
+  }
+  const stream = Transform.by(mapper);
+  stream.setDefaultEncoding('ascii');
+  readable.pipe(stream);
+
+  const expected = ['T', 'E', 'S', 'T'];
+  for await (const chunk of stream) {
+    strictEqual(chunk, expected.shift());
+  }
+}
+
+async function transformBySourceIteratorCompletes() {
+  const readable = Readable.from('test'.split(''));
+  const mustReach = mustCall();
+  async function * mapper(source) {
+    for await (const { chunk } of source) {
+      yield chunk.toUpperCase();
+    }
+    mustReach();
+  }
+
+  const stream = Transform.by(mapper);
+  readable.pipe(stream);
+  const expected = ['T', 'E', 'S', 'T'];
+  for await (const chunk of stream) {
+    strictEqual(chunk, expected.shift());
+  }
+}
+
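+// A value returned (rather than yielded) by the mapper is emitted as one
+// last chunk, and returning before the source is exhausted ends the stream
+// early, as the next two tests verify.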
+async function transformByYieldPlusReturn() {
+  const readable = Readable.from('test'.split(''));
+  async function * mapper(source) {
+    for await (const { chunk } of source) {
+      yield chunk.toUpperCase();
+    }
+    return 'final chunk';
+  }
+
+  const stream = Transform.by(mapper);
+  readable.pipe(stream);
+  const expected = ['T', 'E', 'S', 'T', 'final chunk'];
+  for await (const chunk of stream) {
+    strictEqual(chunk, expected.shift());
+  }
+}
+
+async function transformByReturnEndsStream() {
+  const readable = Readable.from('test'.split(''));
+  async function * mapper(source) {
+    for await (const { chunk } of source) {
+      yield chunk.toUpperCase();
+      return 'stop';
+    }
+  }
+
+  const stream = Transform.by(mapper);
+  readable.pipe(stream);
+  const expected = ['T', 'stop'];
+  const mustReach = mustCall();
+  for await (const chunk of stream) {
+    strictEqual(chunk, expected.shift());
+  }
+  mustReach();
+}
+
+async function transformByOnData() {
+  const readable = Readable.from('test'.split(''));
+  async function * mapper(source) {
+    for await (const { chunk } of source) {
+      yield chunk.toUpperCase();
+    }
+  }
+
+  const stream = Transform.by(mapper);
+  readable.pipe(stream);
+  const expected = ['T', 'E', 'S', 'T'];
+  let iterations = 0;
+  stream.on('data', (chunk) => {
+    iterations++;
+    strictEqual(chunk, expected.shift());
+  });
+
+  await once(stream, 'end');
+  strictEqual(iterations, 4);
+}
+
+async function transformByOnDataNonObject() {
+  const readable = Readable.from('test'.split(''), { objectMode: false });
+  async function * mapper(source) {
+    for await (const { chunk } of source) {
+      yield chunk.toString().toUpperCase();
+    }
+  }
+  const stream = Transform.by(mapper, { objectMode: false });
+  readable.pipe(stream);
+  const expected = ['T', 'E', 'S', 'T'];
+  let iterations = 0;
+  stream.on('data', (chunk) => {
+    iterations++;
+    strictEqual(chunk instanceof Buffer, true);
+    strictEqual(chunk.toString(), expected.shift());
+  });
+
+  await once(stream, 'end');
+  strictEqual(iterations, 4);
+}
+
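+// An error thrown inside the mapper destroys the stream and is emitted as
+// an 'error' event, whether the failure is observed via that event, via a
+// try/catch around for await...of, or via both.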
+async function transformByOnErrorAndDestroyed() {
+  const stream = Readable.from('test'.split('')).pipe(Transform.by(
+    async function * mapper(source) {
+      for await (const { chunk } of source) {
+        if (chunk === 'e') throw new Error('kaboom');
+        yield chunk.toUpperCase();
+      }
+    }
+  ));
+  stream.on('data', (chunk) => {
+    strictEqual(chunk.toString(), 'T');
+  });
+  strictEqual(stream.destroyed, false);
+  const [ err ] = await once(stream, 'error');
+  strictEqual(err.message, 'kaboom');
+  strictEqual(stream.destroyed, true);
+}
+
+async function transformByErrorTryCatchAndDestroyed() {
+  const stream = Readable.from('test'.split('')).pipe(Transform.by(
+    async function * mapper(source) {
+      for await (const { chunk } of source) {
+        if (chunk === 'e') throw new Error('kaboom');
+        yield chunk.toUpperCase();
+      }
+    }
+  ));
+  strictEqual(stream.destroyed, false);
+  try {
+    for await (const chunk of stream) {
+      strictEqual(chunk.toString(), 'T');
+    }
+  } catch (err) {
+    strictEqual(err.message, 'kaboom');
+    strictEqual(stream.destroyed, true);
+  }
+}
+
+async function transformByOnErrorAndTryCatchAndDestroyed() {
+  const stream = Readable.from('test'.split('')).pipe(Transform.by(
+    async function * mapper(source) {
+      for await (const { chunk } of source) {
+        if (chunk === 'e') throw new Error('kaboom');
+        yield chunk.toUpperCase();
+      }
+    }
+  ));
+  strictEqual(stream.destroyed, false);
+  stream.once('error', mustCall((err) => {
+    strictEqual(err.message, 'kaboom');
+  }));
+  try {
+    for await (const chunk of stream) {
+      strictEqual(chunk.toString(), 'T');
+    }
+  } catch (err) {
+    strictEqual(err.message, 'kaboom');
+    strictEqual(stream.destroyed, true);
+  }
+}
+
+async function transformByThrowPriorToForAwait() {
+  async function * generate() {
+    yield 'a';
+    yield 'b';
+    yield 'c';
+  }
+  const read = Readable.from(generate());
+  const stream = Transform.by(async function * transformA(source) {
+    throw new Error('kaboom');
+  });
+  stream.on('error', mustCall((err) => {
+    strictEqual(err.message, 'kaboom');
+  }));
+
+  read.pipe(stream);
+}
+
+Promise.all([
+  transformBy(),
+  transformByFuncReturnsObjectWithSymbolAsyncIterator(),
+  transformByEncoding(),
+  transformBySourceIteratorCompletes(),
+  transformByYieldPlusReturn(),
+  transformByReturnEndsStream(),
+  transformByOnData(),
+  transformByOnDataNonObject(),
+  transformByOnErrorAndDestroyed(),
+  transformByErrorTryCatchAndDestroyed(),
+  transformByOnErrorAndTryCatchAndDestroyed(),
+  transformByThrowPriorToForAwait(),
+]).then(mustCall());
diff --git a/tools/doc/type-parser.js b/tools/doc/type-parser.js
index 4fd91f4b478149..3e8ba274368d20 100644
--- a/tools/doc/type-parser.js
+++ b/tools/doc/type-parser.js
@@ -32,6 +32,8 @@ const customTypesMap = {
   'ArrayBufferView':
     'https://developer.mozilla.org/en-US/docs/Web/API/ArrayBufferView',
+  'AsyncGeneratorFunction': 'https://tc39.es/ecma262/#sec-async-generator-function-definitions',
+  'AsyncIterator': 'https://tc39.github.io/ecma262/#sec-asynciterator-interface',
   'AsyncIterable': 'https://tc39.github.io/ecma262/#sec-asynciterable-interface'