diff --git a/doc/api/errors.md b/doc/api/errors.md
index cf21c142d6dd97..278e3386923e76 100644
--- a/doc/api/errors.md
+++ b/doc/api/errors.md
@@ -630,6 +630,12 @@ display if `block` does not throw.
 An iterable argument (i.e. a value that works with `for...of` loops) was
 required, but not provided to a Node.js API.
+
+### ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE
+
+A function argument that returns an async iterable (i.e. a value that works
+with `for await...of` loops) was required, but not provided to a Node.js API.
+
 ### ERR_ASSERTION
diff --git a/doc/api/stream.md b/doc/api/stream.md
index ad13e8ee6ade38..3526f869578c8e 100644
--- a/doc/api/stream.md
+++ b/doc/api/stream.md
@@ -43,8 +43,8 @@ There are four fundamental stream types within Node.js:
   is written and read (for example, [`zlib.createDeflate()`][]).
 
 Additionally, this module includes the utility functions
-[`stream.pipeline()`][], [`stream.finished()`][] and
-[`stream.Readable.from()`][].
+[`stream.pipeline()`][], [`stream.finished()`][],
+[`stream.Readable.from()`][] and [`stream.Transform.by()`][].
 
 ### Object Mode
@@ -1646,6 +1646,49 @@ Calling `Readable.from(string)` or `Readable.from(buffer)` will not have
 the strings or buffers be iterated to match the other streams semantics
 for performance reasons.
 
+### stream.Transform.by(asyncGeneratorFunction[, options])
+<!-- YAML
+added: REPLACEME
+-->
+
+* `asyncGeneratorFunction` {AsyncGeneratorFunction} A mapping function that
+  accepts a `source` async iterable, which can be used to read incoming data.
+  Transformed data is pushed to the stream with the `yield` keyword.
+* `options` {Object} Options provided to `new stream.Transform([options])`.
+  By default, `Transform.by()` will set `options.objectMode` to `true`, unless
+  `options.objectMode` is explicitly set to `false`.
+* Returns: {stream.Transform}
+
+A utility method for creating `Transform` streams with async generator
+functions. The async generator is supplied a single argument, `source`, which
+is used to read incoming chunks.
+
+Yielded values become the data chunks emitted from the stream.
+
+```js
+const { Readable, Transform } = require('stream');
+
+const readable = Readable.from(['hello', 'streams']);
+async function * mapper(source) {
+  for await (const chunk of source) {
+    // If objectMode were set to false, the buffer would have to be converted
+    // to a string here. Since objectMode is true by default for both
+    // Readable.from() and Transform.by(), each chunk is already a string.
+    yield chunk.toUpperCase();
+  }
+}
+const transform = Transform.by(mapper);
+readable.pipe(transform);
+transform.on('data', (chunk) => {
+  console.log(chunk);
+});
+```
+
+The `source` parameter has an `encoding` property which represents the
+encoding of the `Writable` side of the transform. This is the same `encoding`
+value that would be passed as the second parameter to the `transform()`
+function option (or `_transform()` method) supplied to `stream.Transform`.
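+
+For example, the incoming encoding can be observed from within the mapping
+function (a minimal sketch mirroring the behavior described above;
+`tagEncoding` is an illustrative name and not part of the API):
+
+```js
+const { Readable, Transform } = require('stream');
+
+async function * tagEncoding(source) {
+  for await (const chunk of source) {
+    // source.encoding reflects the encoding of the Writable side.
+    yield `${source.encoding}:${chunk}`;
+  }
+}
+
+const readable = Readable.from(['hello']);
+const transform = Transform.by(tagEncoding);
+transform.setDefaultEncoding('ascii');
+readable.pipe(transform);
+transform.on('data', (chunk) => {
+  console.log(chunk); // Prints: ascii:hello
+});
+```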
+
 ## API for Stream Implementers
 
@@ -2630,6 +2673,35 @@ readable.on('data', (chunk) => {
 });
 ```
 
+#### Creating Transform Streams with Async Generator Functions
+
+A Node.js `Transform` stream can be constructed from an asynchronous
+generator function using the `Transform.by()` utility method.
+
+```js
+const { Readable, Transform } = require('stream');
+
+async function * toUpperCase(source) {
+  for await (const chunk of source) {
+    yield chunk.toUpperCase();
+  }
+}
+const transform = Transform.by(toUpperCase);
+
+async function * generate() {
+  yield 'a';
+  yield 'b';
+  yield 'c';
+}
+
+const readable = Readable.from(generate());
+
+readable.pipe(transform);
+transform.on('data', (chunk) => {
+  console.log(chunk);
+});
+```
+
 #### Piping to Writable Streams from Async Iterators
 
 In the scenario of writing to a writable stream from an async iterator, ensure
@@ -2819,6 +2891,7 @@ contain multi-byte characters.
 [`readable.push('')`]: #stream_readable_push
 [`readable.setEncoding()`]: #stream_readable_setencoding_encoding
 [`stream.Readable.from()`]: #stream_stream_readable_from_iterable_options
+[`stream.Transform.by()`]: #stream_stream_transform_by_asyncgeneratorfunction_options
 [`stream.cork()`]: #stream_writable_cork
 [`stream.finished()`]: #stream_stream_finished_stream_options_callback
 [`stream.pipe()`]: #stream_readable_pipe_destination_options
diff --git a/lib/_stream_transform.js b/lib/_stream_transform.js
index cb4aae2e6d18f4..0eee13369be307 100644
--- a/lib/_stream_transform.js
+++ b/lib/_stream_transform.js
@@ -65,16 +65,29 @@ const {
   ObjectSetPrototypeOf,
+  ObjectGetPrototypeOf,
+  Symbol
 } = primordials;
 
 module.exports = Transform;
 const {
+  ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE,
   ERR_METHOD_NOT_IMPLEMENTED,
   ERR_MULTIPLE_CALLBACK,
   ERR_TRANSFORM_ALREADY_TRANSFORMING,
   ERR_TRANSFORM_WITH_LENGTH_0
 } = require('internal/errors').codes;
 const Duplex = require('_stream_duplex');
+const AsyncIteratorPrototype = ObjectGetPrototypeOf(
+  ObjectGetPrototypeOf(async function* () {}).prototype);
+
+const kSourceIteratorPull = Symbol('kSourceIteratorPull');
+const kSourceIteratorResolve = Symbol('kSourceIteratorResolve');
+const kSourceIteratorChunk = Symbol('kSourceIteratorChunk');
+const kSourceIteratorStream = Symbol('kSourceIteratorStream');
+const kSourceIteratorPump = Symbol('kSourceIteratorPump');
+const kSourceIteratorGrabResolve = Symbol('kSourceIteratorGrabResolve');
+
 ObjectSetPrototypeOf(Transform.prototype, Duplex.prototype);
 ObjectSetPrototypeOf(Transform, Duplex);
 
@@ -203,7 +216,6 @@ Transform.prototype._destroy = function(err, cb) {
   });
 };
 
-
 function done(stream, er, data) {
   if (er)
     return stream.emit('error', er);
@@ -219,3 +231,111 @@ function done(stream, er, data) {
     throw new ERR_TRANSFORM_ALREADY_TRANSFORMING();
   return stream.push(null);
 }
+
+function SourceIterator(asyncGeneratorFn, opts) {
+  const source = this;
+  const result = asyncGeneratorFn(this);
+  if (typeof result[Symbol.asyncIterator] !== 'function') {
+    throw new ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE('asyncGeneratorFn');
+  }
+  const iter = result[Symbol.asyncIterator]();
+  if (typeof iter.next !== 'function') {
+    throw new ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE('asyncGeneratorFn');
+  }
+
+  this[kSourceIteratorPull] = null;
+  this[kSourceIteratorChunk] = null;
+  this[kSourceIteratorResolve] = null;
+  this[kSourceIteratorStream] = new Transform({
+    objectMode: true,
+    ...opts,
+    transform(chunk, encoding, cb) {
+      source.encoding = encoding;
+      // If the async generator is not currently awaiting a chunk, park the
+      // chunk and its callback until next() is called.
+      if (source[kSourceIteratorResolve] === null) {
+        source[kSourceIteratorChunk] = chunk;
+        source[kSourceIteratorPull] = cb;
+        return;
+      }
+      source[kSourceIteratorResolve]({ value: chunk, done: false });
+      source[kSourceIteratorResolve] = null;
+      cb(null);
+    }
+  });
+  this.encoding = this[kSourceIteratorStream]._transformState.writeencoding;
+  this[kSourceIteratorGrabResolve] = (resolve) => {
+    this[kSourceIteratorResolve] = resolve;
+  };
+  const first = iter.next();
+  // The iterator is only async if next() returns a promise.
+  if (first == null || typeof first.then !== 'function') {
+    throw new ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE('asyncGeneratorFn');
+  }
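+  // Start pumping immediately: values yielded by the async generator are
+  // pushed to the stream as they are produced, independent of next() calls.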
+  this[kSourceIteratorPump](iter, first);
+}
+
+SourceIterator.prototype[Symbol.asyncIterator] = function() {
+  return this;
+};
+
+ObjectSetPrototypeOf(SourceIterator.prototype, AsyncIteratorPrototype);
+
+SourceIterator.prototype.next = function next() {
+  // No chunk is parked: return a promise and let transform() resolve it
+  // when the next chunk arrives.
+  if (this[kSourceIteratorPull] === null ||
+      this[kSourceIteratorChunk] === null) {
+    return new Promise(this[kSourceIteratorGrabResolve]);
+  }
+
+  // A chunk is already parked: release the pending transform() callback
+  // and hand the chunk to the async generator.
+  this[kSourceIteratorPull](null);
+  const result = Promise.resolve({
+    value: this[kSourceIteratorChunk],
+    done: false
+  });
+  this[kSourceIteratorChunk] = null;
+  this[kSourceIteratorPull] = null;
+  return result;
+};
+
+SourceIterator.prototype[kSourceIteratorPump] = async function pump(iter, p) {
+  const stream = this[kSourceIteratorStream];
+  try {
+    // Replace the default prefinish handler: when the writable side ends,
+    // signal completion to a generator that is awaiting source.next().
+    stream.removeListener('prefinish', prefinish);
+    stream.on('prefinish', () => {
+      if (this[kSourceIteratorResolve] !== null) {
+        this[kSourceIteratorResolve]({ value: undefined, done: true });
+      }
+    });
+    let next = await p;
+    while (true) {
+      const { done, value } = next;
+      if (done) {
+        if (value !== undefined) stream.push(value);
+
+        // In the event of an early return, explicitly discard any buffered
+        // state so that done() does not throw, then restore it.
+        if (stream._writableState.length > 0) {
+          const { length } = stream._writableState;
+          const { transforming } = stream._transformState;
+          stream._writableState.length = 0;
+          stream._transformState.transforming = false;
+          prefinish.call(stream);
+          stream._writableState.length = length;
+          stream._transformState.transforming = transforming;
+        } else {
+          prefinish.call(stream);
+        }
+        break;
+      }
+      stream.push(value);
+      next = await iter.next();
+    }
+  } catch (err) {
+    process.nextTick(() => stream.destroy(err));
+  } finally {
+    this[kSourceIteratorPull] = null;
+    this[kSourceIteratorChunk] = null;
+    this[kSourceIteratorResolve] = null;
+    this[kSourceIteratorStream] = null;
+  }
+};
+
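+// Returns the stream created by SourceIterator; the async generator reads
+// from it through `source`, and its yielded values are pushed to it by the
+// pump started in the SourceIterator constructor.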
+Transform.by = function by(asyncGeneratorFn, opts) {
+  const source = new SourceIterator(asyncGeneratorFn, opts);
+  const stream = source[kSourceIteratorStream];
+
+  return stream;
+};
diff --git a/lib/internal/errors.js b/lib/internal/errors.js
index ad12d99c7cc49c..39f580a609cdef 100644
--- a/lib/internal/errors.js
+++ b/lib/internal/errors.js
@@ -720,6 +720,8 @@ module.exports = {
 // Note: Node.js specific errors must begin with the prefix ERR_
 E('ERR_AMBIGUOUS_ARGUMENT', 'The "%s" argument is ambiguous. %s', TypeError);
 E('ERR_ARG_NOT_ITERABLE', '%s must be iterable', TypeError);
+E('ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE', '%s must return an async iterable',
+  TypeError);
 E('ERR_ASSERTION', '%s', Error);
 E('ERR_ASYNC_CALLBACK', '%s must be a function', TypeError);
 E('ERR_ASYNC_TYPE', 'Invalid name for async "type": %s', TypeError);
diff --git a/test/parallel/test-stream-readable-async-iterators.js b/test/parallel/test-stream-readable-async-iterators.js
index 4a63e9fd3022e6..3bad0875522d03 100644
--- a/test/parallel/test-stream-readable-async-iterators.js
+++ b/test/parallel/test-stream-readable-async-iterators.js
@@ -420,6 +420,30 @@ async function tests() {
     mustReach[1]();
   }
 
+  {
+    console.log('readable side of a transform stream pushes null');
+    const transform = new Transform({
+      objectMode: true,
+      transform: (chunk, enc, cb) => { cb(null, chunk); }
+    });
+    transform.push(0);
+    transform.push(1);
+    process.nextTick(() => {
+      transform.push(null);
+    });
+
+    const mustReach = [ common.mustCall(), common.mustCall() ];
+
+    const iter = transform[Symbol.asyncIterator]();
+    assert.strictEqual((await iter.next()).value, 0);
+
+    for await (const d of iter) {
+      assert.strictEqual(d, 1);
+      mustReach[0]();
+    }
+    mustReach[1]();
+  }
+
   {
     console.log('all next promises must be resolved on end');
     const r = new Readable({
diff --git a/test/parallel/test-transform-by.js b/test/parallel/test-transform-by.js
new file mode 100644
index 00000000000000..fe01986b8d1a0b
--- /dev/null
+++ b/test/parallel/test-transform-by.js
@@ -0,0 +1,309 @@
+'use strict';
+const { mustCall, expectsError } = require('../common');
+const { once } = require('events');
+const { Readable, Transform } = require('stream');
+const { strictEqual } = require('assert');
+
+async function transformBy() {
+  const readable = Readable.from('test');
+  async function * mapper(source) {
+    for await (const chunk of source) {
+      yield chunk.toUpperCase();
+    }
+  }
+
+  const stream = Transform.by(mapper);
+  readable.pipe(stream);
+  const expected = ['T', 'E', 'S', 'T'];
+  for await (const chunk of stream) {
+    strictEqual(chunk, expected.shift());
+  }
+}
+
+async function transformByFuncReturnsObjectWithSymbolAsyncIterator() {
+  const readable = Readable.from('test');
+  const mapper = (source) => ({
+    [Symbol.asyncIterator]() {
+      return {
+        async next() {
+          const { done, value } = await source.next();
+          return { done, value: value ? value.toUpperCase() : value };
+        }
+      };
+    }
+  });
+
+  const stream = Transform.by(mapper);
+  readable.pipe(stream);
+  const expected = ['T', 'E', 'S', 'T'];
+  for await (const chunk of stream) {
+    strictEqual(chunk, expected.shift());
+  }
+}
+
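+// The following tests cover mappers whose return value is not a usable
+// async iterable; Transform.by() must reject these synchronously.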
+async function
+transformByObjReturnedWSymbolAsyncIteratorWithNonPromiseReturningNext() {
+  const mapper = (source) => ({
+    [Symbol.asyncIterator]() {
+      return {
+        next() {
+          const { done, value } = source.next();
+          return { done, value: value ? value.toUpperCase() : value };
+        }
+      };
+    }
+  });
+
+  expectsError(() => Transform.by(mapper), {
+    message: 'asyncGeneratorFn must return an async iterable',
+    code: 'ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE',
+    type: TypeError
+  });
+}
+
+async function transformByObjReturnedWSymbolAsyncIteratorWithNoNext() {
+  const mapper = () => ({
+    [Symbol.asyncIterator]() {
+      return {};
+    }
+  });
+
+  expectsError(() => Transform.by(mapper), {
+    message: 'asyncGeneratorFn must return an async iterable',
+    code: 'ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE',
+    type: TypeError
+  });
+}
+
+async function transformByObjReturnedWSymbolAsyncIteratorThatIsNotFunction() {
+  const mapper = () => ({
+    [Symbol.asyncIterator]: 'wrong'
+  });
+
+  expectsError(() => Transform.by(mapper), {
+    message: 'asyncGeneratorFn must return an async iterable',
+    code: 'ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE',
+    type: TypeError
+  });
+}
+
+async function transformByFuncReturnsObjectWithoutSymbolAsyncIterator() {
+  const mapper = () => ({});
+
+  expectsError(() => Transform.by(mapper), {
+    message: 'asyncGeneratorFn must return an async iterable',
+    code: 'ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE',
+    type: TypeError
+  });
+}
+
+async function transformByEncoding() {
+  const readable = Readable.from('test');
+  async function * mapper(source) {
+    for await (const chunk of source) {
+      strictEqual(source.encoding, 'ascii');
+      yield chunk.toUpperCase();
+    }
+  }
+  const stream = Transform.by(mapper);
+  stream.setDefaultEncoding('ascii');
+  readable.pipe(stream);
+
+  const expected = ['T', 'E', 'S', 'T'];
+  for await (const chunk of stream) {
+    strictEqual(chunk, expected.shift());
+  }
+}
+
+async function transformBySourceIteratorCompletes() {
+  const readable = Readable.from('test');
+  const mustReach = mustCall();
+  async function * mapper(source) {
+    for await (const chunk of source) {
+      yield chunk.toUpperCase();
+    }
+    mustReach();
+  }
+
+  const stream = Transform.by(mapper);
+  readable.pipe(stream);
+  const expected = ['T', 'E', 'S', 'T'];
+  for await (const chunk of stream) {
+    strictEqual(chunk, expected.shift());
+  }
+}
+
+async function transformByYieldPlusReturn() {
+  const readable = Readable.from('test');
+  async function * mapper(source) {
+    for await (const chunk of source) {
+      yield chunk.toUpperCase();
+    }
+    return 'final chunk';
+  }
+
+  const stream = Transform.by(mapper);
+  readable.pipe(stream);
+  const expected = ['T', 'E', 'S', 'T', 'final chunk'];
+  for await (const chunk of stream) {
+    strictEqual(chunk, expected.shift());
+  }
+}
+
+async function transformByReturnEndsStream() {
+  const readable = Readable.from('test');
+  async function * mapper(source) {
+    for await (const chunk of source) {
+      yield chunk.toUpperCase();
+      return 'stop';
+    }
+  }
+
+  const stream = Transform.by(mapper);
+  readable.pipe(stream);
+  const expected = ['T', 'stop'];
+  const mustReach = mustCall();
+  for await (const chunk of stream) {
+    strictEqual(chunk, expected.shift());
+  }
+  mustReach();
+}
+
+async function transformByOnData() {
+  const readable = Readable.from('test');
+  async function * mapper(source) {
+    for await (const chunk of source) {
+      yield chunk.toUpperCase();
+    }
+  }
+
+  const stream = Transform.by(mapper);
+  readable.pipe(stream);
+  const expected = ['T', 'E', 'S', 'T'];
+  let iterations = 0;
+  stream.on('data', (chunk) => {
+    iterations++;
+    strictEqual(chunk, expected.shift());
+  });
+
+  await once(stream, 'end');
+  strictEqual(iterations, 4);
+}
+
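+// With objectMode explicitly disabled on both streams, chunks surface as
+// Buffers rather than strings.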
+async function transformByOnDataNonObject() {
+  const readable = Readable.from('test', { objectMode: false });
+  async function * mapper(source) {
+    for await (const chunk of source) {
+      yield chunk.toString().toUpperCase();
+    }
+  }
+  const stream = Transform.by(mapper, { objectMode: false });
+  readable.pipe(stream);
+  const expected = ['T', 'E', 'S', 'T'];
+  let iterations = 0;
+  stream.on('data', (chunk) => {
+    iterations++;
+    strictEqual(chunk instanceof Buffer, true);
+    strictEqual(chunk.toString(), expected.shift());
+  });
+
+  await once(stream, 'end');
+  strictEqual(iterations, 4);
+}
+
+async function transformByOnErrorAndDestroyed() {
+  const stream = Readable.from('test').pipe(Transform.by(
+    async function * mapper(source) {
+      for await (const chunk of source) {
+        if (chunk === 'e') throw new Error('kaboom');
+        yield chunk.toUpperCase();
+      }
+    }
+  ));
+  stream.on('data', (chunk) => {
+    strictEqual(chunk.toString(), 'T');
+  });
+  strictEqual(stream.destroyed, false);
+  const [ err ] = await once(stream, 'error');
+  strictEqual(err.message, 'kaboom');
+  strictEqual(stream.destroyed, true);
+}
+
+async function transformByErrorTryCatchAndDestroyed() {
+  const stream = Readable.from('test').pipe(Transform.by(
+    async function * mapper(source) {
+      for await (const chunk of source) {
+        if (chunk === 'e') throw new Error('kaboom');
+        yield chunk.toUpperCase();
+      }
+    }
+  ));
+  strictEqual(stream.destroyed, false);
+  try {
+    for await (const chunk of stream) {
+      strictEqual(chunk.toString(), 'T');
+    }
+  } catch (err) {
+    strictEqual(err.message, 'kaboom');
+    strictEqual(stream.destroyed, true);
+  }
+}
+
+async function transformByOnErrorAndTryCatchAndDestroyed() {
+  const stream = Readable.from('test').pipe(Transform.by(
+    async function * mapper(source) {
+      for await (const chunk of source) {
+        if (chunk === 'e') throw new Error('kaboom');
+        yield chunk.toUpperCase();
+      }
+    }
+  ));
+  strictEqual(stream.destroyed, false);
+  stream.once('error', mustCall((err) => {
+    strictEqual(err.message, 'kaboom');
+  }));
+  try {
+    for await (const chunk of stream) {
+      strictEqual(chunk.toString(), 'T');
+    }
+  } catch (err) {
+    strictEqual(err.message, 'kaboom');
+    strictEqual(stream.destroyed, true);
+  }
+}
+
+async function transformByThrowPriorToForAwait() {
+  async function * generate() {
+    yield 'a';
+    yield 'b';
+    yield 'c';
+  }
+  const read = Readable.from(generate());
+  const stream = Transform.by(async function * transformA(source) {
+    throw new Error('kaboom');
+  });
+  stream.on('error', mustCall((err) => {
+    strictEqual(err.message, 'kaboom');
+  }));
+
+  read.pipe(stream);
+}
+
+Promise.all([
+  transformBy(),
+  transformByFuncReturnsObjectWithSymbolAsyncIterator(),
+  transformByObjReturnedWSymbolAsyncIteratorWithNonPromiseReturningNext(),
+  transformByObjReturnedWSymbolAsyncIteratorWithNoNext(),
+  transformByObjReturnedWSymbolAsyncIteratorThatIsNotFunction(),
+  transformByFuncReturnsObjectWithoutSymbolAsyncIterator(),
+  transformByEncoding(),
+  transformBySourceIteratorCompletes(),
+  transformByYieldPlusReturn(),
+  transformByReturnEndsStream(),
+  transformByOnData(),
+  transformByOnDataNonObject(),
+  transformByOnErrorAndDestroyed(),
+  transformByErrorTryCatchAndDestroyed(),
+  transformByOnErrorAndTryCatchAndDestroyed(),
+  transformByThrowPriorToForAwait()
+]).then(mustCall());
diff --git a/tools/doc/type-parser.js b/tools/doc/type-parser.js
index ef4499e50ff35a..680a8aef0bd689 100644
--- a/tools/doc/type-parser.js
+++ b/tools/doc/type-parser.js
@@ -26,6 +26,8 @@ const customTypesMap = {
   'this': `${jsDocPrefix}Reference/Operators/this`,
+  'AsyncGeneratorFunction': 'https://tc39.es/ecma262/#sec-async-generator-function-definitions',
+  'AsyncIterator': 'https://tc39.github.io/ecma262/#sec-asynciterator-interface',
   'bigint': `${jsDocPrefix}Reference/Global_Objects/BigInt`,