From d957541a46c9ec651491f13e9973517914746a02 Mon Sep 17 00:00:00 2001 From: David Mark Clements Date: Tue, 25 Jun 2019 13:51:54 -0700 Subject: [PATCH 01/42] stream: add Transform.by utility function Analogous to Readable.from, Transform.by creates transform streams from async function generators --- doc/api/stream.md | 42 +++- lib/_stream_transform.js | 124 ++++++++++- .../test-stream-readable-async-iterators.js | 22 ++ test/parallel/test-transform-by.js | 209 ++++++++++++++++++ tools/doc/type-parser.js | 2 + 5 files changed, 394 insertions(+), 5 deletions(-) create mode 100644 test/parallel/test-transform-by.js diff --git a/doc/api/stream.md b/doc/api/stream.md index ad13e8ee6ade38..3b16bbf8c09562 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -43,8 +43,8 @@ There are four fundamental stream types within Node.js: is written and read (for example, [`zlib.createDeflate()`][]). Additionally, this module includes the utility functions -[`stream.pipeline()`][], [`stream.finished()`][] and -[`stream.Readable.from()`][]. +[`stream.pipeline()`][], [`stream.finished()`][], +[`stream.Readable.from()`][] and [`stream.Transform.by()`][]. ### Object Mode @@ -1646,6 +1646,43 @@ Calling `Readable.from(string)` or `Readable.from(buffer)` will not have the strings or buffers be iterated to match the other streams semantics for performance reasons. +### stream.Transform.by(asyncGeneratorFunction, [options]) + +* `asyncGeneratorFunction` {AsyncGeneratorFunction} A mapping function which +accepts a `source` iterable which can be used to read incoming data, while +transformed data is pushed to the stream with the `yield` keyword. +* `options` {Object} Options provided to `new stream.Transform([options])`. +By default, `Readable.transform()` will set `options.objectMode` to `true`, +unless this is explicitly opted out by setting `options.objectMode` to `false`. + +A utility method for creating Transform Streams with async generator functions. +The async generator is supplied a single argument, `source`, which is used to +read incoming chunks. + +Yielded values become the data chunks emitted from the stream. + +```js +const { Readable, Transform } = require('stream'); + +const readable = Readable.from(['hello', 'streams']); +async function * mapper(source) { + for await (const chunk of source) { + yield chunk.toUpperCase(); + } +} +const transform = Transform.by(mapper); +readable.pipe(transform); +transform.on('data', (chunk) => { + console.log(chunk); +}); +``` + +The `source` parameter has an `encoding` property which represents the encoding +of the `WriteableStream` side of the transform. This is the same `encoding` +value that would be passed as the second parameter to the `transform` function +option (or `_transform` method) supplied to `stream.Transform`. + + ## API for Stream Implementers @@ -2819,6 +2856,7 @@ contain multi-byte characters. 
[`readable.push('')`]: #stream_readable_push [`readable.setEncoding()`]: #stream_readable_setencoding_encoding [`stream.Readable.from()`]: #stream_stream_readable_from_iterable_options +[`stream.Transform.by()`]: #stream_stream_transform_by_asyncgeneratorfunction_options [`stream.cork()`]: #stream_writable_cork [`stream.finished()`]: #stream_stream_finished_stream_options_callback [`stream.pipe()`]: #stream_readable_pipe_destination_options diff --git a/lib/_stream_transform.js b/lib/_stream_transform.js index cb4aae2e6d18f4..c26f3d585531ab 100644 --- a/lib/_stream_transform.js +++ b/lib/_stream_transform.js @@ -69,14 +69,27 @@ const { module.exports = Transform; const { + ERR_INVALID_ARG_TYPE, ERR_METHOD_NOT_IMPLEMENTED, ERR_MULTIPLE_CALLBACK, ERR_TRANSFORM_ALREADY_TRANSFORMING, ERR_TRANSFORM_WITH_LENGTH_0 } = require('internal/errors').codes; const Duplex = require('_stream_duplex'); -ObjectSetPrototypeOf(Transform.prototype, Duplex.prototype); -ObjectSetPrototypeOf(Transform, Duplex); +// eslint-disable-next-line func-style +const { constructor: AsyncGeneratorFunction } = async function * () {}; +const AsyncIteratorPrototype = Object.getPrototypeOf( + Object.getPrototypeOf(async function* () {}).prototype); + +const kSourceIteratorPull = Symbol('SourceIteratorPull'); +const kSourceIteratorResolve = Symbol('SourceIteratorResolve'); +const kSourceIteratorChunk = Symbol('SourceIteratorChunk'); +const kSourceIteratorStream = Symbol('SourceIteratorStream'); +const kSourceIteratorPump = Symbol('SourceIteratorPump'); +const kSourceIteratorGenerator = Symbol('SourceIteratorGenerator'); + +Object.setPrototypeOf(Transform.prototype, Duplex.prototype); +Object.setPrototypeOf(Transform, Duplex); function afterTransform(er, data) { @@ -203,7 +216,6 @@ Transform.prototype._destroy = function(err, cb) { }); }; - function done(stream, er, data) { if (er) return stream.emit('error', er); @@ -219,3 +231,109 @@ function done(stream, er, data) { throw new ERR_TRANSFORM_ALREADY_TRANSFORMING(); return stream.push(null); } + +function SourceIterator(asyncGeneratorFn, opts) { + const source = this; + this[kSourceIteratorPull] = null; + this[kSourceIteratorChunk] = null; + this[kSourceIteratorResolve] = null; + this[kSourceIteratorGenerator] = asyncGeneratorFn; + this[kSourceIteratorStream] = new Transform({ + objectMode: true, + ...opts, + transform(chunk, encoding, cb) { + source.encoding = encoding || + source[kSourceIteratorStream]._transformState.writeencoding; + if (source[kSourceIteratorResolve] === null) { + source[kSourceIteratorChunk] = chunk; + source[kSourceIteratorPull] = cb; + return; + } + source[kSourceIteratorResolve]({ value: chunk, done: false }); + source[kSourceIteratorResolve] = null; + cb(null); + } + }); + this.encoding = this[kSourceIteratorStream]._transformState.writeencoding; + + this[kSourceIteratorPump](); +} + +SourceIterator.prototype[Symbol.asyncIterator] = function() { + return this; +}; + +Object.setPrototypeOf(SourceIterator.prototype, + AsyncIteratorPrototype); + +SourceIterator.prototype.next = function next() { + if (this[kSourceIteratorPull] === null || + this[kSourceIteratorChunk] === null) return new Promise((resolve) => { + this[kSourceIteratorResolve] = resolve; + }); + this[kSourceIteratorPull](null); + const result = Promise.resolve({ + value: this[kSourceIteratorChunk], + done: false }); + this[kSourceIteratorChunk] = null; + this[kSourceIteratorPull] = null; + return result; +}; + +SourceIterator.prototype[kSourceIteratorPump] = async function pump() { + const 
stream = this[kSourceIteratorStream]; + try { + const asyncGeneratorFn = this[kSourceIteratorGenerator]; + const transformIter = asyncGeneratorFn(this); + stream.removeListener('prefinish', prefinish); + stream.on('prefinish', () => { + if (this[kSourceIteratorResolve] !== null) { + this[kSourceIteratorResolve]({ value: undefined, done: true }); + } + }); + let next = await transformIter.next(); + do { + if (next.done) { + if (next.value !== undefined) stream.push(next.value); + + // In the event of an early return we explicitly + // discard any buffered state + if (stream._writableState.length > 0) { + const { length } = stream._writableState; + const { transforming } = stream._transformState; + stream._writableState.length = 0; + stream._transformState.transforming = false; + prefinish.call(stream); + stream._writableState.length = length; + stream._transformState.transforming = transforming; + } else { + prefinish.call(stream); + } + break; + } + stream.push(next.value); + } while (next = await transformIter.next()); + } catch (err) { + stream.destroy(err); + } finally { + this[kSourceIteratorPull] = null; + this[kSourceIteratorChunk] = null; + this[kSourceIteratorResolve] = null; + this[kSourceIteratorGenerator] = null; + this[kSourceIteratorStream] = null; + } +}; + + +Transform.by = function by(asyncGeneratorFn, opts) { + if (!(asyncGeneratorFn instanceof AsyncGeneratorFunction)) + throw new ERR_INVALID_ARG_TYPE( + 'asyncGeneratorFn', + ['AsyncGeneratorFunction'], + asyncGeneratorFn); + + const source = new SourceIterator(asyncGeneratorFn, opts); + const stream = source[kSourceIteratorStream]; + + return stream; +}; diff --git a/test/parallel/test-stream-readable-async-iterators.js b/test/parallel/test-stream-readable-async-iterators.js index 4a63e9fd3022e6..b08586e7a279b3 100644 --- a/test/parallel/test-stream-readable-async-iterators.js +++ b/test/parallel/test-stream-readable-async-iterators.js @@ -396,6 +396,28 @@ async function tests() { } } + console.log('readable side of a transform stream pushes null'); + { + const transform = new Transform({ + objectMode: true, + transform(chunk, enc, cb) { + cb(null, chunk); + } + }); + transform.push(0); + transform.push(1); + transform.push(null); + const mustReach = common.mustCall(); + const iter = transform[Symbol.asyncIterator](); + assert.strictEqual((await iter.next()).value, 0); + + for await (const d of iter) { + assert.strictEqual(d, 1); + } + + mustReach(); + } + { console.log('readable side of a transform stream pushes null'); const transform = new Transform({ diff --git a/test/parallel/test-transform-by.js b/test/parallel/test-transform-by.js new file mode 100644 index 00000000000000..e009996beb7e8a --- /dev/null +++ b/test/parallel/test-transform-by.js @@ -0,0 +1,209 @@ +'use strict'; +const { mustCall } = require('../common'); +const { once } = require('events'); +const { Readable, Transform } = require('stream'); +const { strictEqual } = require('assert'); + +async function transformBy() { + const readable = Readable.from('test'); + async function * mapper(source) { + for await (const chunk of source) { + yield chunk.toUpperCase(); + } + } + + const stream = Transform.by(mapper); + readable.pipe(stream); + const expected = ['T', 'E', 'S', 'T']; + for await (const chunk of stream) { + strictEqual(chunk, expected.shift()); + } +} + +async function transformByEncoding() { + const readable = Readable.from('test'); + async function * mapper(source) { + for await (const chunk of source) { + strictEqual(source.encoding, 'ascii'); 
+ yield chunk.toUpperCase(); + } + } + const stream = Transform.by(mapper); + stream.setDefaultEncoding('ascii'); + readable.pipe(stream); + + const expected = ['T', 'E', 'S', 'T']; + for await (const chunk of stream) { + strictEqual(chunk, expected.shift()); + } +} + +async function transformBySourceIteratorCompletes() { + const readable = Readable.from('test'); + const mustReach = mustCall(); + async function * mapper(source) { + for await (const chunk of source) { + yield chunk.toUpperCase(); + } + mustReach(); + } + + const stream = Transform.by(mapper); + readable.pipe(stream); + const expected = ['T', 'E', 'S', 'T']; + for await (const chunk of stream) { + strictEqual(chunk, expected.shift()); + } +} + +async function transformByYieldPlusReturn() { + const readable = Readable.from('test'); + async function * mapper(source) { + for await (const chunk of source) { + yield chunk.toUpperCase(); + } + return 'final chunk'; + } + + const stream = Transform.by(mapper); + readable.pipe(stream); + const expected = ['T', 'E', 'S', 'T', 'final chunk']; + for await (const chunk of stream) { + strictEqual(chunk, expected.shift()); + } +} + +async function transformByReturnEndsStream() { + const readable = Readable.from('test'); + async function * mapper(source) { + for await (const chunk of source) { + yield chunk.toUpperCase(); + return 'stop'; + } + } + + const stream = Transform.by(mapper); + readable.pipe(stream); + const expected = ['T', 'stop']; + const mustReach = mustCall(); + for await (const chunk of stream) { + strictEqual(chunk, expected.shift()); + } + mustReach(); +} + +async function transformByOnData() { + const readable = Readable.from('test'); + async function * mapper(source) { + for await (const chunk of source) { + yield chunk.toUpperCase(); + } + } + + const stream = Transform.by(mapper); + readable.pipe(stream); + const expected = ['T', 'E', 'S', 'T']; + let iterations = 0; + stream.on('data', (chunk) => { + iterations++; + strictEqual(chunk, expected.shift()); + }); + + await once(stream, 'end'); + strictEqual(iterations, 4); +} + +async function transformByOnDataNonObject() { + const readable = Readable.from('test', { objectMode: false }); + async function * mapper(source) { + for await (const chunk of source) { + yield chunk.toString().toUpperCase(); + } + } + const stream = Transform.by(mapper, { objectMode: false }); + readable.pipe(stream); + const expected = ['T', 'E', 'S', 'T']; + let iterations = 0; + stream.on('data', (chunk) => { + iterations++; + strictEqual(chunk instanceof Buffer, true); + strictEqual(chunk.toString(), expected.shift()); + }); + + await once(stream, 'end'); + strictEqual(iterations, 4); +} + +async function transformByOnErrorAndDestroyed() { + const stream = Readable.from('test').pipe(Transform.by( + async function * mapper(source) { + for await (const chunk of source) { + if (chunk === 'e') throw new Error('kaboom'); + yield chunk.toUpperCase(); + } + } + )); + stream.on('data', (chunk) => { + strictEqual(chunk.toString(), 'T'); + }); + strictEqual(stream.destroyed, false); + const [ err ] = await once(stream, 'error'); + strictEqual(err.message, 'kaboom'); + strictEqual(stream.destroyed, true); +} + +async function transformByErrorTryCatchAndDestroyed() { + const stream = Readable.from('test').pipe(Transform.by( + async function * mapper(source) { + for await (const chunk of source) { + if (chunk === 'e') throw new Error('kaboom'); + yield chunk.toUpperCase(); + } + } + )); + strictEqual(stream.destroyed, false); + try { + for await (const 
chunk of stream) { + strictEqual(chunk.toString(), 'T'); + } + } catch (err) { + strictEqual(err.message, 'kaboom'); + strictEqual(stream.destroyed, true); + } +} + +async function transformByOnErrorAndTryCatchAndDestroyed() { + const stream = Readable.from('test').pipe(Transform.by( + async function * mapper(source) { + for await (const chunk of source) { + if (chunk === 'e') throw new Error('kaboom'); + yield chunk.toUpperCase(); + } + } + )); + strictEqual(stream.destroyed, false); + stream.once('error', mustCall((err) => { + strictEqual(err.message, 'kaboom'); + })); + try { + for await (const chunk of stream) { + strictEqual(chunk.toString(), 'T'); + } + } catch (err) { + strictEqual(err.message, 'kaboom'); + strictEqual(stream.destroyed, true); + } +} + +Promise.all([ + transformBy(), + transformByEncoding(), + transformBySourceIteratorCompletes(), + transformByYieldPlusReturn(), + transformByReturnEndsStream(), + transformByOnData(), + transformByOnDataNonObject(), + transformByOnErrorAndDestroyed(), + transformByErrorTryCatchAndDestroyed(), + transformByOnErrorAndTryCatchAndDestroyed() +]).then(mustCall()); diff --git a/tools/doc/type-parser.js b/tools/doc/type-parser.js index ef4499e50ff35a..16b80a36d3d50d 100644 --- a/tools/doc/type-parser.js +++ b/tools/doc/type-parser.js @@ -28,6 +28,8 @@ const customTypesMap = { 'AsyncIterator': 'https://tc39.github.io/ecma262/#sec-asynciterator-interface', + 'AsyncGeneratorFunction': 'https://tc39.es/ecma262/#sec-async-generator-function-definitions', + 'bigint': `${jsDocPrefix}Reference/Global_Objects/BigInt`, 'Iterable': From 9005e46658af53718305af2d2f3d5af7086b0efd Mon Sep 17 00:00:00 2001 From: David Mark Clements Date: Tue, 2 Jul 2019 00:52:47 +0200 Subject: [PATCH 02/42] docs: stream.Transform.by typo --- doc/api/stream.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index 3b16bbf8c09562..cc9e693a8d95c8 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1652,7 +1652,7 @@ for performance reasons. accepts a `source` iterable which can be used to read incoming data, while transformed data is pushed to the stream with the `yield` keyword. * `options` {Object} Options provided to `new stream.Transform([options])`. -By default, `Readable.transform()` will set `options.objectMode` to `true`, +By default, `Tranfrom.by()` will set `options.objectMode` to `true`, unless this is explicitly opted out by setting `options.objectMode` to `false`. A utility method for creating Transform Streams with async generator functions. From 944f228fd26d4627176dc907d97c34fd34f626b9 Mon Sep 17 00:00:00 2001 From: David Mark Clements Date: Tue, 2 Jul 2019 01:21:02 +0200 Subject: [PATCH 03/42] stream: Transform.by SourceIterator next optimization instead of allocating a new function for the Promise constructor per call to next (e.g. 
via for await) allocate once on init and reuse --- lib/_stream_transform.js | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/lib/_stream_transform.js b/lib/_stream_transform.js index c26f3d585531ab..2644a6733a598c 100644 --- a/lib/_stream_transform.js +++ b/lib/_stream_transform.js @@ -87,6 +87,7 @@ const kSourceIteratorChunk = Symbol('SourceIteratorChunk'); const kSourceIteratorStream = Symbol('SourceIteratorStream'); const kSourceIteratorPump = Symbol('SourceIteratorPump'); const kSourceIteratorGenerator = Symbol('SourceIteratorGenerator'); +const kSourceIteratorGrabResolve = Symbol('SourceIteratorGrabResolve'); Object.setPrototypeOf(Transform.prototype, Duplex.prototype); Object.setPrototypeOf(Transform, Duplex); @@ -255,7 +256,9 @@ function SourceIterator(asyncGeneratorFn, opts) { } }); this.encoding = this[kSourceIteratorStream]._transformState.writeencoding; - + this[kSourceIteratorGrabResolve] = (resolve) => { + this[kSourceIteratorResolve] = resolve; + }; this[kSourceIteratorPump](); } @@ -267,10 +270,9 @@ Object.setPrototypeOf(SourceIterator.prototype, AsyncIteratorPrototype); SourceIterator.prototype.next = function next() { - if (this[kSourceIteratorPull] === null || - this[kSourceIteratorChunk] === null) return new Promise((resolve) => { - this[kSourceIteratorResolve] = resolve; - }); + if (this[kSourceIteratorPull] === null || this[kSourceIteratorChunk] === null) + return new Promise(this[kSourceIteratorGrabResolve]); + this[kSourceIteratorPull](null); const result = Promise.resolve({ value: this[kSourceIteratorChunk], From 84c84c9fdf00d0594031925cfb46ad12a84f6aaf Mon Sep 17 00:00:00 2001 From: David Mark Clements Date: Tue, 2 Jul 2019 20:44:51 +0200 Subject: [PATCH 04/42] docs: Transform.by doc tweaks Co-Authored-By: Vse Mozhet Byt --- doc/api/stream.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index cc9e693a8d95c8..bc99e2f1040333 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1679,8 +1679,8 @@ transform.on('data', (chunk) => { The `source` parameter has an `encoding` property which represents the encoding of the `WriteableStream` side of the transform. This is the same `encoding` -value that would be passed as the second parameter to the `transform` function -option (or `_transform` method) supplied to `stream.Transform`. +value that would be passed as the second parameter to the `transform()` function +option (or `_transform()` method) supplied to `stream.Transform`. 
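A minimal sketch of the `source.encoding` behavior documented above, assuming the `Transform.by()` API proposed in this series; the `upper` mapper name and the `'ascii'` encoding are illustrative, mirroring the `transformByEncoding` test added in patch 01:

```js
const { Readable, Transform } = require('stream');

// Illustrative mapper: inspects source.encoding while transforming chunks.
async function * upper(source) {
  for await (const chunk of source) {
    // Mirrors the encoding that transform(chunk, encoding, cb) would receive.
    console.log(source.encoding); // 'ascii', set via setDefaultEncoding() below
    yield chunk.toUpperCase();
  }
}

const transform = Transform.by(upper);
transform.setDefaultEncoding('ascii');
Readable.from('test').pipe(transform);
transform.on('data', (chunk) => console.log(chunk)); // 'T', 'E', 'S', 'T'
```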
## API for Stream Implementers From a1937c7aa1afd904530ddecf71e9c4527209dbb5 Mon Sep 17 00:00:00 2001 From: David Mark Clements Date: Tue, 2 Jul 2019 20:47:02 +0200 Subject: [PATCH 05/42] docs: sort type-parser types alphabetically within their groups --- tools/doc/type-parser.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/doc/type-parser.js b/tools/doc/type-parser.js index 16b80a36d3d50d..082cf039c07d81 100644 --- a/tools/doc/type-parser.js +++ b/tools/doc/type-parser.js @@ -28,7 +28,7 @@ const customTypesMap = { 'AsyncIterator': 'https://tc39.github.io/ecma262/#sec-asynciterator-interface', - 'AsyncGeneratorFunction': 'https://tc39.es/ecma262/#sec-async-generator-function-definitions', + 'AsyncIterator': 'https://tc39.github.io/ecma262/#sec-asynciterator-interface', 'bigint': `${jsDocPrefix}Reference/Global_Objects/BigInt`, From ad35382afcde499d48c48061954dda1d63c2b9b3 Mon Sep 17 00:00:00 2001 From: David Mark Clements Date: Tue, 2 Jul 2019 20:48:11 +0200 Subject: [PATCH 06/42] docs: sort type-parser types alphabetically within their groups --- tools/doc/type-parser.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/doc/type-parser.js b/tools/doc/type-parser.js index 082cf039c07d81..680a8aef0bd689 100644 --- a/tools/doc/type-parser.js +++ b/tools/doc/type-parser.js @@ -26,7 +26,7 @@ const customTypesMap = { 'this': `${jsDocPrefix}Reference/Operators/this`, - 'AsyncIterator': 'https://tc39.github.io/ecma262/#sec-asynciterator-interface', + 'AsyncGeneratorFunction': 'https://tc39.es/ecma262/#sec-async-generator-function-definitions', 'AsyncIterator': 'https://tc39.github.io/ecma262/#sec-asynciterator-interface', From 8d82c5b93395de435bed75ae7b6c469cad68ee0d Mon Sep 17 00:00:00 2001 From: David Mark Clements Date: Tue, 2 Jul 2019 20:59:06 +0200 Subject: [PATCH 07/42] docs: typo --- doc/api/stream.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index bc99e2f1040333..5d0796c1cd8f8b 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1652,7 +1652,7 @@ for performance reasons. accepts a `source` iterable which can be used to read incoming data, while transformed data is pushed to the stream with the `yield` keyword. * `options` {Object} Options provided to `new stream.Transform([options])`. -By default, `Tranfrom.by()` will set `options.objectMode` to `true`, +By default, `Transform.by()` will set `options.objectMode` to `true`, unless this is explicitly opted out by setting `options.objectMode` to `false`. A utility method for creating Transform Streams with async generator functions. From 7a1ef775ca88c24fef87fa312c41956adf59f976 Mon Sep 17 00:00:00 2001 From: David Mark Clements Date: Wed, 3 Jul 2019 00:04:49 +0200 Subject: [PATCH 08/42] docs: Transform.by clarify as async iterable Co-Authored-By: Anna Henningsen --- doc/api/stream.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index 5d0796c1cd8f8b..9b2c1d78285372 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1649,7 +1649,7 @@ for performance reasons. ### stream.Transform.by(asyncGeneratorFunction, [options]) * `asyncGeneratorFunction` {AsyncGeneratorFunction} A mapping function which -accepts a `source` iterable which can be used to read incoming data, while +accepts a `source` async iterable which can be used to read incoming data, while transformed data is pushed to the stream with the `yield` keyword. 
* `options` {Object} Options provided to `new stream.Transform([options])`. By default, `Transform.by()` will set `options.objectMode` to `true`, From 57074456c1a64282ff5bf2692045f2719af4a449 Mon Sep 17 00:00:00 2001 From: David Mark Clements Date: Wed, 3 Jul 2019 00:10:01 +0200 Subject: [PATCH 09/42] stream: Transform. by remove unnecessary defensive code --- lib/_stream_transform.js | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lib/_stream_transform.js b/lib/_stream_transform.js index 2644a6733a598c..2e1d2be6dc11b3 100644 --- a/lib/_stream_transform.js +++ b/lib/_stream_transform.js @@ -243,8 +243,7 @@ function SourceIterator(asyncGeneratorFn, opts) { objectMode: true, ...opts, transform(chunk, encoding, cb) { - source.encoding = encoding || - source[kSourceIteratorStream]._transformState.writeencoding; + source.encoding = encoding if (source[kSourceIteratorResolve] === null) { source[kSourceIteratorChunk] = chunk; source[kSourceIteratorPull] = cb; From 970ed3d69edf8a5b4e3ed4ee13ecc7ad63fdbf82 Mon Sep 17 00:00:00 2001 From: David Mark Clements Date: Wed, 3 Jul 2019 00:10:45 +0200 Subject: [PATCH 10/42] stream: Transform.by code style Co-Authored-By: Anna Henningsen --- lib/_stream_transform.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/_stream_transform.js b/lib/_stream_transform.js index 2e1d2be6dc11b3..92a421cb5e0136 100644 --- a/lib/_stream_transform.js +++ b/lib/_stream_transform.js @@ -275,7 +275,8 @@ SourceIterator.prototype.next = function next() { this[kSourceIteratorPull](null); const result = Promise.resolve({ value: this[kSourceIteratorChunk], - done: false }); + done: false + }); this[kSourceIteratorChunk] = null; this[kSourceIteratorPull] = null; return result; From 6e67a8584c0e0e19476280d32bb8c1b2d70410d4 Mon Sep 17 00:00:00 2001 From: David Mark Clements Date: Wed, 3 Jul 2019 13:32:19 +0200 Subject: [PATCH 11/42] stream: Transform.by minor refactoring --- lib/_stream_transform.js | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/lib/_stream_transform.js b/lib/_stream_transform.js index 92a421cb5e0136..d501c26cbaac30 100644 --- a/lib/_stream_transform.js +++ b/lib/_stream_transform.js @@ -81,13 +81,13 @@ const { constructor: AsyncGeneratorFunction } = async function * () {}; const AsyncIteratorPrototype = Object.getPrototypeOf( Object.getPrototypeOf(async function* () {}).prototype); -const kSourceIteratorPull = Symbol('SourceIteratorPull'); -const kSourceIteratorResolve = Symbol('SourceIteratorResolve'); -const kSourceIteratorChunk = Symbol('SourceIteratorChunk'); -const kSourceIteratorStream = Symbol('SourceIteratorStream'); -const kSourceIteratorPump = Symbol('SourceIteratorPump'); -const kSourceIteratorGenerator = Symbol('SourceIteratorGenerator'); -const kSourceIteratorGrabResolve = Symbol('SourceIteratorGrabResolve'); +const kSourceIteratorPull = Symbol('kSourceIteratorPull'); +const kSourceIteratorResolve = Symbol('kSourceIteratorResolve'); +const kSourceIteratorChunk = Symbol('kSourceIteratorChunk'); +const kSourceIteratorStream = Symbol('kSourceIteratorStream'); +const kSourceIteratorPump = Symbol('kSourceIteratorPump'); +const kSourceIteratorGenerator = Symbol('kSourceIteratorGenerator'); +const kSourceIteratorGrabResolve = Symbol('kSourceIteratorGrabResolve'); Object.setPrototypeOf(Transform.prototype, Duplex.prototype); Object.setPrototypeOf(Transform, Duplex); @@ -243,7 +243,7 @@ function SourceIterator(asyncGeneratorFn, opts) { objectMode: true, ...opts, 
transform(chunk, encoding, cb) { - source.encoding = encoding + source.encoding = encoding; if (source[kSourceIteratorResolve] === null) { source[kSourceIteratorChunk] = chunk; source[kSourceIteratorPull] = cb; @@ -293,10 +293,10 @@ SourceIterator.prototype[kSourceIteratorPump] = async function pump() { this[kSourceIteratorResolve]({ value: undefined, done: true }); } }); - let next = await transformIter.next(); - do { - if (next.done) { - if (next.value !== undefined) stream.push(next.value); + while (true) { + const { done, value } = await transformIter.next(); + if (done) { + if (value !== undefined) stream.push(value); // In the event of an early return we explicitly // discard any buffered state @@ -313,8 +313,8 @@ SourceIterator.prototype[kSourceIteratorPump] = async function pump() { } break; } - stream.push(next.value); - } while (next = await transformIter.next()); + stream.push(value); + } } catch (err) { stream.destroy(err); } finally { From 98aebc5dabc7fa6afb38c14ae700387959639a34 Mon Sep 17 00:00:00 2001 From: David Mark Clements Date: Fri, 5 Jul 2019 19:52:33 +0200 Subject: [PATCH 12/42] streams: Transform.by check fn return value instead of fn instance --- doc/api/errors.md | 6 +++ lib/_stream_transform.js | 36 +++++++------ lib/internal/errors.js | 2 + test/parallel/test-transform-by.js | 85 +++++++++++++++++++++++++++++- 4 files changed, 111 insertions(+), 18 deletions(-) diff --git a/doc/api/errors.md b/doc/api/errors.md index cf21c142d6dd97..278e3386923e76 100644 --- a/doc/api/errors.md +++ b/doc/api/errors.md @@ -630,6 +630,12 @@ display if `block` does not throw. An iterable argument (i.e. a value that works with `for...of` loops) was required, but not provided to a Node.js API. + +### ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE + +A function argument that returns an async iterable (i.e. a value that works +with `for await...of` loops) was required, but not provided to a Node.js API. 
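Under the validation added in this patch (checking the mapper's return value rather than its constructor), a mapper whose return value is not async iterable is rejected synchronously with the new error defined just below. A hedged sketch; `notAsyncIterable` is an illustrative name, and the expected message and code are taken from the accompanying tests:

```js
const { Transform } = require('stream');

// Returns a plain object with no Symbol.asyncIterator, so under this patch
// Transform.by() throws rather than producing a broken stream.
const notAsyncIterable = (source) => ({});

Transform.by(notAsyncIterable);
// TypeError [ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE]:
//   asyncGeneratorFn must return an async iterable
```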
+ ### ERR_ASSERTION diff --git a/lib/_stream_transform.js b/lib/_stream_transform.js index d501c26cbaac30..a3a55af7f09493 100644 --- a/lib/_stream_transform.js +++ b/lib/_stream_transform.js @@ -69,15 +69,13 @@ const { module.exports = Transform; const { - ERR_INVALID_ARG_TYPE, + ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE, ERR_METHOD_NOT_IMPLEMENTED, ERR_MULTIPLE_CALLBACK, ERR_TRANSFORM_ALREADY_TRANSFORMING, ERR_TRANSFORM_WITH_LENGTH_0 } = require('internal/errors').codes; const Duplex = require('_stream_duplex'); -// eslint-disable-next-line func-style -const { constructor: AsyncGeneratorFunction } = async function * () {}; const AsyncIteratorPrototype = Object.getPrototypeOf( Object.getPrototypeOf(async function* () {}).prototype); @@ -86,7 +84,6 @@ const kSourceIteratorResolve = Symbol('kSourceIteratorResolve'); const kSourceIteratorChunk = Symbol('kSourceIteratorChunk'); const kSourceIteratorStream = Symbol('kSourceIteratorStream'); const kSourceIteratorPump = Symbol('kSourceIteratorPump'); -const kSourceIteratorGenerator = Symbol('kSourceIteratorGenerator'); const kSourceIteratorGrabResolve = Symbol('kSourceIteratorGrabResolve'); Object.setPrototypeOf(Transform.prototype, Duplex.prototype); @@ -235,10 +232,18 @@ function done(stream, er, data) { function SourceIterator(asyncGeneratorFn, opts) { const source = this; + const result = asyncGeneratorFn(this); + if (typeof result[Symbol.asyncIterator] !== 'function') { + throw new ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE('asyncGeneratorFn'); + } + const iter = result[Symbol.asyncIterator](); + if (typeof iter.next !== 'function') { + throw new ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE('asyncGeneratorFn'); + } + this[kSourceIteratorPull] = null; this[kSourceIteratorChunk] = null; this[kSourceIteratorResolve] = null; - this[kSourceIteratorGenerator] = asyncGeneratorFn; this[kSourceIteratorStream] = new Transform({ objectMode: true, ...opts, @@ -258,7 +263,11 @@ function SourceIterator(asyncGeneratorFn, opts) { this[kSourceIteratorGrabResolve] = (resolve) => { this[kSourceIteratorResolve] = resolve; }; - this[kSourceIteratorPump](); + const first = iter.next(); + if (!first || typeof first.then !== 'function') { + throw new ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE('asyncGeneratorFn'); + } + this[kSourceIteratorPump](iter, first); } SourceIterator.prototype[Symbol.asyncIterator] = function() { @@ -282,19 +291,18 @@ SourceIterator.prototype.next = function next() { return result; }; -SourceIterator.prototype[kSourceIteratorPump] = async function pump() { +SourceIterator.prototype[kSourceIteratorPump] = async function pump(iter, p) { const stream = this[kSourceIteratorStream]; try { - const asyncGeneratorFn = this[kSourceIteratorGenerator]; - const transformIter = asyncGeneratorFn(this); stream.removeListener('prefinish', prefinish); stream.on('prefinish', () => { if (this[kSourceIteratorResolve] !== null) { this[kSourceIteratorResolve]({ value: undefined, done: true }); } }); + let next = await p; while (true) { - const { done, value } = await transformIter.next(); + const { done, value } = next; if (done) { if (value !== undefined) stream.push(value); @@ -314,6 +322,7 @@ SourceIterator.prototype[kSourceIteratorPump] = async function pump() { break; } stream.push(value); + next = await iter.next(); } } catch (err) { stream.destroy(err); @@ -321,19 +330,12 @@ SourceIterator.prototype[kSourceIteratorPump] = async function pump() { this[kSourceIteratorPull] = null; this[kSourceIteratorChunk] = null; this[kSourceIteratorResolve] = null; - 
this[kSourceIteratorGenerator] = null; this[kSourceIteratorStream] = null; } }; Transform.by = function by(asyncGeneratorFn, opts) { - if (!(asyncGeneratorFn instanceof AsyncGeneratorFunction)) - throw new ERR_INVALID_ARG_TYPE( - 'asyncGeneratorFn', - ['AsyncGeneratorFunction'], - asyncGeneratorFn); - const source = new SourceIterator(asyncGeneratorFn, opts); const stream = source[kSourceIteratorStream]; diff --git a/lib/internal/errors.js b/lib/internal/errors.js index ad12d99c7cc49c..39f580a609cdef 100644 --- a/lib/internal/errors.js +++ b/lib/internal/errors.js @@ -720,6 +720,8 @@ module.exports = { // Note: Node.js specific errors must begin with the prefix ERR_ E('ERR_AMBIGUOUS_ARGUMENT', 'The "%s" argument is ambiguous. %s', TypeError); E('ERR_ARG_NOT_ITERABLE', '%s must be iterable', TypeError); +E('ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE', '%s must return an async iterable', + TypeError); E('ERR_ASSERTION', '%s', Error); E('ERR_ASYNC_CALLBACK', '%s must be a function', TypeError); E('ERR_ASYNC_TYPE', 'Invalid name for async "type": %s', TypeError); diff --git a/test/parallel/test-transform-by.js b/test/parallel/test-transform-by.js index e009996beb7e8a..4ac20ea424cebd 100644 --- a/test/parallel/test-transform-by.js +++ b/test/parallel/test-transform-by.js @@ -1,5 +1,5 @@ 'use strict'; -const { mustCall } = require('../common'); +const { mustCall, expectsError } = require('../common'); const { once } = require('events'); const { Readable, Transform } = require('stream'); const { strictEqual } = require('assert'); @@ -20,6 +20,83 @@ async function transformBy() { } } +async function transformByFuncReturnsObjectWithSymbolAsyncIterator() { + const readable = Readable.from('test'); + const mapper = (source) => ({ + [Symbol.asyncIterator]() { + return { + async next() { + const { done, value } = await source.next(); + return { done, value: value ? value.toUpperCase() : value }; + } + }; + } + }); + + const stream = Transform.by(mapper); + readable.pipe(stream); + const expected = ['T', 'E', 'S', 'T']; + for await (const chunk of stream) { + strictEqual(chunk, expected.shift()); + } +} + +async function +transformByObjReturnedWSymbolAsyncIteratorWithNonPromiseReturningNext() { + const mapper = (source) => ({ + [Symbol.asyncIterator]() { + return { + next() { + const { done, value } = source.next(); + return { done, value: value ? 
value.toUpperCase() : value }; + } + }; + } + }); + + expectsError(() => Transform.by(mapper), { + message: 'asyncGeneratorFn must return an async iterable', + code: 'ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE', + type: TypeError + }); +} + +async function transformByObjReturnedWSymbolAsyncIteratorWithNoNext() { + const mapper = () => ({ + [Symbol.asyncIterator]() { + return {}; + } + }); + + expectsError(() => Transform.by(mapper), { + message: 'asyncGeneratorFn must return an async iterable', + code: 'ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE', + type: TypeError + }); +} + +async function transformByObjReturnedWSymbolAsyncIteratorThatIsNotFunction() { + const mapper = () => ({ + [Symbol.asyncIterator]: 'wrong' + }); + + expectsError(() => Transform.by(mapper), { + message: 'asyncGeneratorFn must return an async iterable', + code: 'ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE', + type: TypeError + }); +} + +async function transformByFuncReturnsObjectWithoutSymbolAsyncIterator() { + const mapper = () => ({}); + + expectsError(() => Transform.by(mapper), { + message: 'asyncGeneratorFn must return an async iterable', + code: 'ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE', + type: TypeError + }); +} + async function transformByEncoding() { const readable = Readable.from('test'); async function * mapper(source) { @@ -197,6 +274,12 @@ async function transformByOnErrorAndTryCatchAndDestroyed() { Promise.all([ transformBy(), + transformByFuncReturnsObjectWithSymbolAsyncIterator(), + transformByObjReturnedWSymbolAsyncIteratorWithNonPromiseReturningNext(), + + transformByObjReturnedWSymbolAsyncIteratorWithNoNext(), + transformByObjReturnedWSymbolAsyncIteratorThatIsNotFunction(), + transformByFuncReturnsObjectWithoutSymbolAsyncIterator(), transformByEncoding(), transformBySourceIteratorCompletes(), transformByYieldPlusReturn(), From eee19c591ecf4195912e280a0f3dd59eb6781de3 Mon Sep 17 00:00:00 2001 From: David Mark Clements Date: Sun, 7 Jul 2019 15:50:56 +0200 Subject: [PATCH 13/42] docs: emphasize Transform.by objectMode default behaviour --- doc/api/stream.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/doc/api/stream.md b/doc/api/stream.md index 9b2c1d78285372..e8449f64634301 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1667,6 +1667,9 @@ const { Readable, Transform } = require('stream'); const readable = Readable.from(['hello', 'streams']); async function * mapper(source) { for await (const chunk of source) { + // If objectMode was set to false, the buffer would have to be converted + // to a string here but since it is true by default for both Readable.from + // and Transform.by each chunk is already a string yield chunk.toUpperCase(); } } From 26e96be1ea86a33c3878eace10d2aebc7f8f2848 Mon Sep 17 00:00:00 2001 From: David Mark Clements Date: Sun, 7 Jul 2019 17:48:40 +0200 Subject: [PATCH 14/42] docs: Transform.by function naming convention Co-Authored-By: Vse Mozhet Byt --- doc/api/stream.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index e8449f64634301..11310b06be7bfc 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1668,8 +1668,8 @@ const readable = Readable.from(['hello', 'streams']); async function * mapper(source) { for await (const chunk of source) { // If objectMode was set to false, the buffer would have to be converted - // to a string here but since it is true by default for both Readable.from - // and Transform.by each chunk is already a string + // to a string here but since it is true by default for 
both Readable.from() + // and Transform.by() each chunk is already a string. yield chunk.toUpperCase(); } } From 185a6f481a02cff0eeebb2f4acbbed80e4b4648d Mon Sep 17 00:00:00 2001 From: David Mark Clements Date: Sun, 7 Jul 2019 21:55:47 +0200 Subject: [PATCH 15/42] docs: add transform content to streams <-> async generators compatibility --- doc/api/stream.md | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/doc/api/stream.md b/doc/api/stream.md index 11310b06be7bfc..cb9079c0959075 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -2670,6 +2670,36 @@ readable.on('data', (chunk) => { }); ``` +#### Creating Transform Streams with Async Generator Functions + +We can construct a Node.js Transform stream with an asynchronous +generator function using the `Transform.by` utility method. + + +```js +const { Readable, Transform } = require('stream'); + +async function * toUpperCase(source) { + for await (const chunk of source) { + yield chunk.toUpperCase(); + } +} +const transform = Transform.by(toUpperCase); + +async function * generate() { + yield 'a'; + yield 'b'; + yield 'c'; +} + +const readable = Readable.from(generate()); + +readable.pipe(transform); +transform.on('data', (chunk) => { + console.log(chunk); +}); +``` + #### Piping to Writable Streams from Async Iterators In the scenario of writing to a writable stream from an async iterator, ensure From 8e73c519c6b380b800966e356546ba180fedef5e Mon Sep 17 00:00:00 2001 From: David Mark Clements Date: Sun, 7 Jul 2019 21:56:38 +0200 Subject: [PATCH 16/42] docs: includemissing parens --- doc/api/stream.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index cb9079c0959075..b6b9fd15817cb9 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -2673,7 +2673,7 @@ readable.on('data', (chunk) => { #### Creating Transform Streams with Async Generator Functions We can construct a Node.js Transform stream with an asynchronous -generator function using the `Transform.by` utility method. +generator function using the `Transform.by()` utility method. 
```js From b73347f5d5467197d0e5890bcd1f9201a818b342 Mon Sep 17 00:00:00 2001 From: David Mark Clements Date: Sun, 7 Jul 2019 21:59:48 +0200 Subject: [PATCH 17/42] tests: preempt conflict with #28566 --- test/parallel/test-stream-readable-async-iterators.js | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/test/parallel/test-stream-readable-async-iterators.js b/test/parallel/test-stream-readable-async-iterators.js index b08586e7a279b3..1ec13aa06046b4 100644 --- a/test/parallel/test-stream-readable-async-iterators.js +++ b/test/parallel/test-stream-readable-async-iterators.js @@ -400,9 +400,7 @@ async function tests() { { const transform = new Transform({ objectMode: true, - transform(chunk, enc, cb) { - cb(null, chunk); - } + transform: common.mustNotCall() }); transform.push(0); transform.push(1); From 9789a5b1e64c05ccbd06bf70f7ca6aa8a718a5d8 Mon Sep 17 00:00:00 2001 From: David Mark Clements Date: Fri, 12 Jul 2019 14:48:38 +0200 Subject: [PATCH 18/42] tests: fix transform async iterator test --- .../test-stream-readable-async-iterators.js | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/test/parallel/test-stream-readable-async-iterators.js b/test/parallel/test-stream-readable-async-iterators.js index 1ec13aa06046b4..3bad0875522d03 100644 --- a/test/parallel/test-stream-readable-async-iterators.js +++ b/test/parallel/test-stream-readable-async-iterators.js @@ -396,24 +396,28 @@ async function tests() { } } - console.log('readable side of a transform stream pushes null'); { + console.log('readable side of a transform stream pushes null'); const transform = new Transform({ objectMode: true, - transform: common.mustNotCall() + transform: (chunk, enc, cb) => { cb(null, chunk); } }); transform.push(0); transform.push(1); - transform.push(null); - const mustReach = common.mustCall(); + process.nextTick(() => { + transform.push(null); + }); + + const mustReach = [ common.mustCall(), common.mustCall() ]; + const iter = transform[Symbol.asyncIterator](); assert.strictEqual((await iter.next()).value, 0); for await (const d of iter) { assert.strictEqual(d, 1); + mustReach[0](); } - - mustReach(); + mustReach[1](); } { From bad0bfdd491562a5c85307a9e36ad32ae0b0d9a3 Mon Sep 17 00:00:00 2001 From: David Mark Clements Date: Mon, 15 Jul 2019 12:02:22 +0200 Subject: [PATCH 19/42] docs: add meta data to Transform.by Co-Authored-By: Anna Henningsen --- doc/api/stream.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/doc/api/stream.md b/doc/api/stream.md index b6b9fd15817cb9..bb4d2d39c3cb5f 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1647,6 +1647,9 @@ the strings or buffers be iterated to match the other streams semantics for performance reasons. 
### stream.Transform.by(asyncGeneratorFunction, [options]) + * `asyncGeneratorFunction` {AsyncGeneratorFunction} A mapping function which accepts a `source` async iterable which can be used to read incoming data, while From f2c8b229ba06fc047536156c0e1969a950f8d3cb Mon Sep 17 00:00:00 2001 From: David Mark Clements Date: Mon, 15 Jul 2019 22:05:57 +0200 Subject: [PATCH 20/42] stream: error handling bug fix, ensure stream is destroyed after process.nextTick --- lib/_stream_transform.js | 2 +- test/parallel/test-transform-by.js | 21 +++++++++++++++++++-- 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/lib/_stream_transform.js b/lib/_stream_transform.js index a3a55af7f09493..bb1d207cade0cd 100644 --- a/lib/_stream_transform.js +++ b/lib/_stream_transform.js @@ -325,7 +325,7 @@ SourceIterator.prototype[kSourceIteratorPump] = async function pump(iter, p) { next = await iter.next(); } } catch (err) { - stream.destroy(err); + process.nextTick(() => stream.destroy(err)); } finally { this[kSourceIteratorPull] = null; this[kSourceIteratorChunk] = null; diff --git a/test/parallel/test-transform-by.js b/test/parallel/test-transform-by.js index 4ac20ea424cebd..fe01986b8d1a0b 100644 --- a/test/parallel/test-transform-by.js +++ b/test/parallel/test-transform-by.js @@ -272,11 +272,27 @@ async function transformByOnErrorAndTryCatchAndDestroyed() { } } +async function transformByThrowPriorToForAwait() { + async function * generate() { + yield 'a'; + yield 'b'; + yield 'c'; + } + const read = Readable.from(generate()); + const stream = Transform.by(async function * transformA(source) { + throw new Error('kaboom'); + }); + stream.on('error', mustCall((err) => { + strictEqual(err.message, 'kaboom'); + })); + + read.pipe(stream); +} + Promise.all([ transformBy(), transformByFuncReturnsObjectWithSymbolAsyncIterator(), transformByObjReturnedWSymbolAsyncIteratorWithNonPromiseReturningNext(), - transformByObjReturnedWSymbolAsyncIteratorWithNoNext(), transformByObjReturnedWSymbolAsyncIteratorThatIsNotFunction(), transformByFuncReturnsObjectWithoutSymbolAsyncIterator(), @@ -288,5 +304,6 @@ Promise.all([ transformByOnDataNonObject(), transformByOnErrorAndDestroyed(), transformByErrorTryCatchAndDestroyed(), - transformByOnErrorAndTryCatchAndDestroyed() + transformByOnErrorAndTryCatchAndDestroyed(), + transformByThrowPriorToForAwait() ]).then(mustCall()); From 001fe014a2de99dbf4db6b47ad0748b9211717db Mon Sep 17 00:00:00 2001 From: David Mark Clements Date: Mon, 5 Aug 2019 15:57:06 +0200 Subject: [PATCH 21/42] Update doc/api/stream.md Co-Authored-By: Vse Mozhet Byt --- doc/api/stream.md | 1 - 1 file changed, 1 deletion(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index bb4d2d39c3cb5f..1906b42e279f97 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1688,7 +1688,6 @@ of the `WriteableStream` side of the transform. This is the same `encoding` value that would be passed as the second parameter to the `transform()` function option (or `_transform()` method) supplied to `stream.Transform`. 
- ## API for Stream Implementers From 9545ee02105474c300f5001a8371aaf8cc01d461 Mon Sep 17 00:00:00 2001 From: David Mark Clements Date: Mon, 5 Aug 2019 15:57:16 +0200 Subject: [PATCH 22/42] Update doc/api/stream.md Co-Authored-By: Vse Mozhet Byt --- doc/api/stream.md | 1 - 1 file changed, 1 deletion(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index 1906b42e279f97..118c0fe55f72ca 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -2677,7 +2677,6 @@ readable.on('data', (chunk) => { We can construct a Node.js Transform stream with an asynchronous generator function using the `Transform.by()` utility method. - ```js const { Readable, Transform } = require('stream'); From f0fa8b61952751cd14a22d1d9db885f4cc994c13 Mon Sep 17 00:00:00 2001 From: David Mark Clements Date: Mon, 5 Aug 2019 15:58:31 +0200 Subject: [PATCH 23/42] Update doc/api/stream.md Co-Authored-By: Vse Mozhet Byt --- doc/api/stream.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index 118c0fe55f72ca..d9df009580dcac 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1646,7 +1646,7 @@ Calling `Readable.from(string)` or `Readable.from(buffer)` will not have the strings or buffers be iterated to match the other streams semantics for performance reasons. -### stream.Transform.by(asyncGeneratorFunction, [options]) +### stream.Transform.by(asyncGeneratorFunction[, options]) From b504b15eaa1844fdadd5942d190f22cbcd7d9b77 Mon Sep 17 00:00:00 2001 From: David Mark Clements Date: Mon, 5 Aug 2019 15:59:36 +0200 Subject: [PATCH 24/42] Update doc/api/stream.md Co-Authored-By: Vse Mozhet Byt --- doc/api/stream.md | 1 + 1 file changed, 1 insertion(+) diff --git a/doc/api/stream.md b/doc/api/stream.md index d9df009580dcac..54c62199fe0c47 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1657,6 +1657,7 @@ transformed data is pushed to the stream with the `yield` keyword. * `options` {Object} Options provided to `new stream.Transform([options])`. By default, `Transform.by()` will set `options.objectMode` to `true`, unless this is explicitly opted out by setting `options.objectMode` to `false`. +* Returns: {stream.Transform} A utility method for creating Transform Streams with async generator functions. 
The async generator is supplied a single argument, `source`, which is used to From 447a895818606d73d0e7f6d09bc528bc07eab8d1 Mon Sep 17 00:00:00 2001 From: David Mark Clements Date: Mon, 9 Sep 2019 19:42:00 +0200 Subject: [PATCH 25/42] streams: Transform.by, rm unncessary check --- lib/_stream_transform.js | 3 --- 1 file changed, 3 deletions(-) diff --git a/lib/_stream_transform.js b/lib/_stream_transform.js index bb1d207cade0cd..bb4ca0878c1841 100644 --- a/lib/_stream_transform.js +++ b/lib/_stream_transform.js @@ -264,9 +264,6 @@ function SourceIterator(asyncGeneratorFn, opts) { this[kSourceIteratorResolve] = resolve; }; const first = iter.next(); - if (!first || typeof first.then !== 'function') { - throw new ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE('asyncGeneratorFn'); - } this[kSourceIteratorPump](iter, first); } From 2fe96e46d186b6be83b0eaeb6ae6769521487670 Mon Sep 17 00:00:00 2001 From: David Mark Clements Date: Fri, 13 Dec 2019 18:02:04 +0100 Subject: [PATCH 26/42] lint fixes --- doc/api/stream.md | 10 +++++----- lib/_stream_transform.js | 13 +++++++------ 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index 54c62199fe0c47..3526f869578c8e 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1732,7 +1732,7 @@ on the type of stream being created, as detailed in the chart below: | Reading only | [`Readable`][] | [`_read()`][stream-_read] | | Writing only | [`Writable`][] | [`_write()`][stream-_write], [`_writev()`][stream-_writev], [`_final()`][stream-_final] | | Reading and writing | [`Duplex`][] | [`_read()`][stream-_read], [`_write()`][stream-_write], [`_writev()`][stream-_writev], [`_final()`][stream-_final] | -| Operate on written data, then read the result | [`Transform`][] | [`_transform()`][stream-_transform], [`_flush()`][stream-_flush], [`_final()`][stream-_final] | +| Operate on written data, then read the result | [`Transform`][] | [`_transform()`][], [`_flush()`][stream-_flush], [`_final()`][stream-_final] | The implementation code for a stream should *never* call the "public" methods of a stream that are intended for use by consumers (as described in the @@ -2473,7 +2473,7 @@ The `stream.Transform` class is extended to implement a [`Transform`][] stream. The `stream.Transform` class prototypically inherits from `stream.Duplex` and implements its own versions of the `writable._write()` and `readable._read()` methods. Custom `Transform` implementations *must* implement the -[`transform._transform()`][stream-_transform] method and *may* also implement +[`transform._transform()`][] method and *may* also implement the [`transform._flush()`][stream-_flush] method. Care must be taken when using `Transform` streams in that data written to the @@ -2485,7 +2485,7 @@ output on the `Readable` side is not consumed. * `options` {Object} Passed to both `Writable` and `Readable` constructors. Also has the following fields: * `transform` {Function} Implementation for the - [`stream._transform()`][stream-_transform] method. + [`stream._transform()`][] method. * `flush` {Function} Implementation for the [`stream._flush()`][stream-_flush] method. @@ -2532,7 +2532,7 @@ const myTransform = new Transform({ The [`'finish'`][] and [`'end'`][] events are from the `stream.Writable` and `stream.Readable` classes, respectively. The `'finish'` event is emitted after [`stream.end()`][stream-end] is called and all chunks have been processed -by [`stream._transform()`][stream-_transform]. 
The `'end'` event is emitted +by [`stream._transform()`][]. The `'end'` event is emitted after all data has been output, which occurs after the callback in [`transform._flush()`][stream-_flush] has been called. In the case of an error, neither `'finish'` nor `'end'` should be emitted. @@ -2926,7 +2926,7 @@ contain multi-byte characters. [stream-_final]: #stream_writable_final_callback [stream-_flush]: #stream_transform_flush_callback [stream-_read]: #stream_readable_read_size_1 -[stream-_transform]: #stream_transform_transform_chunk_encoding_callback +[]: #stream_transform_transform_chunk_encoding_callback [stream-_write]: #stream_writable_write_chunk_encoding_callback_1 [stream-_writev]: #stream_writable_writev_chunks_callback [stream-end]: #stream_writable_end_chunk_encoding_callback diff --git a/lib/_stream_transform.js b/lib/_stream_transform.js index bb4ca0878c1841..0eee13369be307 100644 --- a/lib/_stream_transform.js +++ b/lib/_stream_transform.js @@ -65,6 +65,8 @@ const { ObjectSetPrototypeOf, + ObjectGetPrototypeOf, + Symbol } = primordials; module.exports = Transform; @@ -76,8 +78,8 @@ const { ERR_TRANSFORM_WITH_LENGTH_0 } = require('internal/errors').codes; const Duplex = require('_stream_duplex'); -const AsyncIteratorPrototype = Object.getPrototypeOf( - Object.getPrototypeOf(async function* () {}).prototype); +const AsyncIteratorPrototype = ObjectGetPrototypeOf( + ObjectGetPrototypeOf(async function* () {}).prototype); const kSourceIteratorPull = Symbol('kSourceIteratorPull'); const kSourceIteratorResolve = Symbol('kSourceIteratorResolve'); @@ -86,8 +88,8 @@ const kSourceIteratorStream = Symbol('kSourceIteratorStream'); const kSourceIteratorPump = Symbol('kSourceIteratorPump'); const kSourceIteratorGrabResolve = Symbol('kSourceIteratorGrabResolve'); -Object.setPrototypeOf(Transform.prototype, Duplex.prototype); -Object.setPrototypeOf(Transform, Duplex); +ObjectSetPrototypeOf(Transform.prototype, Duplex.prototype); +ObjectSetPrototypeOf(Transform, Duplex); function afterTransform(er, data) { @@ -271,8 +273,7 @@ SourceIterator.prototype[Symbol.asyncIterator] = function() { return this; }; -Object.setPrototypeOf(SourceIterator.prototype, - AsyncIteratorPrototype); +ObjectSetPrototypeOf(SourceIterator.prototype, AsyncIteratorPrototype); SourceIterator.prototype.next = function next() { if (this[kSourceIteratorPull] === null || this[kSourceIteratorChunk] === null) From a8151fe98c37ae15bd7eb06466dcb4e870c636c1 Mon Sep 17 00:00:00 2001 From: Irakli Safareli Date: Sat, 15 May 2021 16:09:46 +0400 Subject: [PATCH 27/42] use implementation from @ronag --- lib/internal/streams/transform.js | 129 ++++-------------------------- 1 file changed, 16 insertions(+), 113 deletions(-) diff --git a/lib/internal/streams/transform.js b/lib/internal/streams/transform.js index 9efd3d3770f934..9ba4057824a33b 100644 --- a/lib/internal/streams/transform.js +++ b/lib/internal/streams/transform.js @@ -73,16 +73,7 @@ const { ERR_METHOD_NOT_IMPLEMENTED } = require('internal/errors').codes; const Duplex = require('internal/streams/duplex'); -const AsyncIteratorPrototype = ObjectGetPrototypeOf( - ObjectGetPrototypeOf(async function* () {}).prototype); - -const kSourceIteratorPull = Symbol('kSourceIteratorPull'); -const kSourceIteratorResolve = Symbol('kSourceIteratorResolve'); -const kSourceIteratorChunk = Symbol('kSourceIteratorChunk'); -const kSourceIteratorStream = Symbol('kSourceIteratorStream'); -const kSourceIteratorPump = Symbol('kSourceIteratorPump'); -const kSourceIteratorGrabResolve = 
Symbol('kSourceIteratorGrabResolve'); - +const from = require('internal/streams/from'); ObjectSetPrototypeOf(Transform.prototype, Duplex.prototype); ObjectSetPrototypeOf(Transform, Duplex); @@ -255,110 +246,22 @@ Transform.prototype._read = function() { } }; -function SourceIterator(asyncGeneratorFn, opts) { - const source = this; - const result = asyncGeneratorFn(this); - if (typeof result[Symbol.asyncIterator] !== 'function') { - throw new ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE('asyncGeneratorFn'); - } - const iter = result[Symbol.asyncIterator](); - if (typeof iter.next !== 'function') { - throw new ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE('asyncGeneratorFn'); - } - - this[kSourceIteratorPull] = null; - this[kSourceIteratorChunk] = null; - this[kSourceIteratorResolve] = null; - this[kSourceIteratorStream] = new Transform({ +Transform.by = function by(asyncGeneratorFn, opts) { + let _resolve; + let _promise = new Promise((resolve) => _resolve = resolve); + return from(Duplex, asyncGeneratorFn(async function*() { + while (true) { + const { chunk, done, cb } = await _promise; + if (done) return cb(); + yield chunk; + _promise = new Promise((resolve) => _resolve = resolve); + cb(); + } + }()), { objectMode: true, + autoDestroy: true, ...opts, - transform(chunk, encoding, cb) { - source.encoding = encoding; - if (source[kSourceIteratorResolve] === null) { - source[kSourceIteratorChunk] = chunk; - source[kSourceIteratorPull] = cb; - return; - } - source[kSourceIteratorResolve]({ value: chunk, done: false }); - source[kSourceIteratorResolve] = null; - cb(null); - } + write: (chunk, encoding, cb) => _resolve({ chunk, done: false, cb }), + final: (cb) => _resolve({ done: true, cb }) }); - this.encoding = this[kSourceIteratorStream]._transformState.writeencoding; - this[kSourceIteratorGrabResolve] = (resolve) => { - this[kSourceIteratorResolve] = resolve; - }; - const first = iter.next(); - this[kSourceIteratorPump](iter, first); -} - -SourceIterator.prototype[Symbol.asyncIterator] = function() { - return this; -}; - -ObjectSetPrototypeOf(SourceIterator.prototype, AsyncIteratorPrototype); - -SourceIterator.prototype.next = function next() { - if (this[kSourceIteratorPull] === null || this[kSourceIteratorChunk] === null) - return new Promise(this[kSourceIteratorGrabResolve]); - - this[kSourceIteratorPull](null); - const result = Promise.resolve({ - value: this[kSourceIteratorChunk], - done: false - }); - this[kSourceIteratorChunk] = null; - this[kSourceIteratorPull] = null; - return result; -}; - -SourceIterator.prototype[kSourceIteratorPump] = async function pump(iter, p) { - const stream = this[kSourceIteratorStream]; - try { - stream.removeListener('prefinish', prefinish); - stream.on('prefinish', () => { - if (this[kSourceIteratorResolve] !== null) { - this[kSourceIteratorResolve]({ value: undefined, done: true }); - } - }); - let next = await p; - while (true) { - const { done, value } = next; - if (done) { - if (value !== undefined) stream.push(value); - - // In the event of an early return we explicitly - // discard any buffered state - if (stream._writableState.length > 0) { - const { length } = stream._writableState; - const { transforming } = stream._transformState; - stream._writableState.length = 0; - stream._transformState.transforming = false; - prefinish.call(stream); - stream._writableState.length = length; - stream._transformState.transforming = transforming; - } else { - prefinish.call(stream); - } - break; - } - stream.push(value); - next = await iter.next(); - } - } catch (err) 
{ - process.nextTick(() => stream.destroy(err)); - } finally { - this[kSourceIteratorPull] = null; - this[kSourceIteratorChunk] = null; - this[kSourceIteratorResolve] = null; - this[kSourceIteratorStream] = null; - } -}; - - -Transform.by = function by(asyncGeneratorFn, opts) { - const source = new SourceIterator(asyncGeneratorFn, opts); - const stream = source[kSourceIteratorStream]; - - return stream; }; From 3049db336a1fe3e02ba0f01b210eb854e944d8f9 Mon Sep 17 00:00:00 2001 From: Irakli Safareli Date: Sat, 15 May 2021 16:17:12 +0400 Subject: [PATCH 28/42] remove ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE --- doc/api/errors.md | 6 ---- lib/internal/errors.js | 2 -- test/parallel/test-transform-by.js | 56 ------------------------------ 3 files changed, 64 deletions(-) diff --git a/doc/api/errors.md b/doc/api/errors.md index 4e5c2c03af8f24..f199689f2dc432 100644 --- a/doc/api/errors.md +++ b/doc/api/errors.md @@ -640,12 +640,6 @@ display if `block` does not throw. An iterable argument (i.e. a value that works with `for...of` loops) was required, but not provided to a Node.js API. - -### ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE - -A function argument that returns an async iterable (i.e. a value that works -with `for await...of` loops) was required, but not provided to a Node.js API. - ### `ERR_ASSERTION` diff --git a/lib/internal/errors.js b/lib/internal/errors.js index 93d9076a5a28de..352f8d49ef5f20 100644 --- a/lib/internal/errors.js +++ b/lib/internal/errors.js @@ -836,8 +836,6 @@ module.exports = { // Note: Node.js specific errors must begin with the prefix ERR_ E('ERR_AMBIGUOUS_ARGUMENT', 'The "%s" argument is ambiguous. %s', TypeError); E('ERR_ARG_NOT_ITERABLE', '%s must be iterable', TypeError); -E('ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE', '%s must return an async iterable', - TypeError); E('ERR_ASSERTION', '%s', Error); E('ERR_ASYNC_CALLBACK', '%s must be a function', TypeError); E('ERR_ASYNC_TYPE', 'Invalid name for async "type": %s', TypeError); diff --git a/test/parallel/test-transform-by.js b/test/parallel/test-transform-by.js index fe01986b8d1a0b..72a30279606130 100644 --- a/test/parallel/test-transform-by.js +++ b/test/parallel/test-transform-by.js @@ -41,62 +41,6 @@ async function transformByFuncReturnsObjectWithSymbolAsyncIterator() { } } -async function -transformByObjReturnedWSymbolAsyncIteratorWithNonPromiseReturningNext() { - const mapper = (source) => ({ - [Symbol.asyncIterator]() { - return { - next() { - const { done, value } = source.next(); - return { done, value: value ? 
value.toUpperCase() : value }; - } - }; - } - }); - - expectsError(() => Transform.by(mapper), { - message: 'asyncGeneratorFn must return an async iterable', - code: 'ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE', - type: TypeError - }); -} - -async function transformByObjReturnedWSymbolAsyncIteratorWithNoNext() { - const mapper = () => ({ - [Symbol.asyncIterator]() { - return {}; - } - }); - - expectsError(() => Transform.by(mapper), { - message: 'asyncGeneratorFn must return an async iterable', - code: 'ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE', - type: TypeError - }); -} - -async function transformByObjReturnedWSymbolAsyncIteratorThatIsNotFunction() { - const mapper = () => ({ - [Symbol.asyncIterator]: 'wrong' - }); - - expectsError(() => Transform.by(mapper), { - message: 'asyncGeneratorFn must return an async iterable', - code: 'ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE', - type: TypeError - }); -} - -async function transformByFuncReturnsObjectWithoutSymbolAsyncIterator() { - const mapper = () => ({}); - - expectsError(() => Transform.by(mapper), { - message: 'asyncGeneratorFn must return an async iterable', - code: 'ERR_ARG_RETURN_VALUE_NOT_ASYNC_ITERABLE', - type: TypeError - }); -} - async function transformByEncoding() { const readable = Readable.from('test'); async function * mapper(source) { From 3ce77acc485c4d20297316781f6aeda8be5d7c2b Mon Sep 17 00:00:00 2001 From: Irakli Safareli Date: Sat, 15 May 2021 22:00:31 +0400 Subject: [PATCH 29/42] fix review comment --- lib/internal/streams/transform.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/internal/streams/transform.js b/lib/internal/streams/transform.js index 9ba4057824a33b..a95c52566f31e2 100644 --- a/lib/internal/streams/transform.js +++ b/lib/internal/streams/transform.js @@ -255,7 +255,7 @@ Transform.by = function by(asyncGeneratorFn, opts) { if (done) return cb(); yield chunk; _promise = new Promise((resolve) => _resolve = resolve); - cb(); + process.nextTick(cb); } }()), { objectMode: true, From 59791d0ef53f7e20bb7d0bad1b891182ceb776f6 Mon Sep 17 00:00:00 2001 From: Irakli Safareli Date: Sat, 15 May 2021 22:03:57 +0400 Subject: [PATCH 30/42] fix lint errors --- lib/internal/streams/transform.js | 1 + test/parallel/test-transform-by.js | 8 ++------ 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/lib/internal/streams/transform.js b/lib/internal/streams/transform.js index a95c52566f31e2..5a59a7ee44feb5 100644 --- a/lib/internal/streams/transform.js +++ b/lib/internal/streams/transform.js @@ -64,6 +64,7 @@ 'use strict'; const { + Promise, ObjectSetPrototypeOf, Symbol } = primordials; diff --git a/test/parallel/test-transform-by.js b/test/parallel/test-transform-by.js index 72a30279606130..ce240896dce5f7 100644 --- a/test/parallel/test-transform-by.js +++ b/test/parallel/test-transform-by.js @@ -1,5 +1,5 @@ 'use strict'; -const { mustCall, expectsError } = require('../common'); +const { mustCall } = require('../common'); const { once } = require('events'); const { Readable, Transform } = require('stream'); const { strictEqual } = require('assert'); @@ -236,10 +236,6 @@ async function transformByThrowPriorToForAwait() { Promise.all([ transformBy(), transformByFuncReturnsObjectWithSymbolAsyncIterator(), - transformByObjReturnedWSymbolAsyncIteratorWithNonPromiseReturningNext(), - transformByObjReturnedWSymbolAsyncIteratorWithNoNext(), - transformByObjReturnedWSymbolAsyncIteratorThatIsNotFunction(), - transformByFuncReturnsObjectWithoutSymbolAsyncIterator(), transformByEncoding(), 
transformBySourceIteratorCompletes(), transformByYieldPlusReturn(), @@ -249,5 +245,5 @@ Promise.all([ transformByOnErrorAndDestroyed(), transformByErrorTryCatchAndDestroyed(), transformByOnErrorAndTryCatchAndDestroyed(), - transformByThrowPriorToForAwait() + transformByThrowPriorToForAwait(), ]).then(mustCall()); From 6480563812c663e9854108f05df80e95ad6f9f2d Mon Sep 17 00:00:00 2001 From: Irakli Safareli Date: Sat, 15 May 2021 22:05:59 +0400 Subject: [PATCH 31/42] fix md lint errors --- doc/api/stream.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index 8debb62e546393..fd9d52c3f52304 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -3363,7 +3363,7 @@ contain multi-byte characters. [stream-_final]: #stream_writable_final_callback [stream-_flush]: #stream_transform_flush_callback [stream-_read]: #stream_readable_read_size_1 -[]: #stream_transform_transform_chunk_encoding_callback +[stream-_transform]: #stream_transform_transform_chunk_encoding_callback [stream-_write]: #stream_writable_write_chunk_encoding_callback_1 [stream-_writev]: #stream_writable_writev_chunks_callback [stream-end]: #stream_writable_end_chunk_encoding_callback From 307e926fac9624b8a7b3ea5e173d4e4c6a21871b Mon Sep 17 00:00:00 2001 From: Irakli Safareli Date: Mon, 17 May 2021 11:14:15 +0400 Subject: [PATCH 32/42] fix review comment --- lib/internal/streams/transform.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/internal/streams/transform.js b/lib/internal/streams/transform.js index 5a59a7ee44feb5..5158254417542e 100644 --- a/lib/internal/streams/transform.js +++ b/lib/internal/streams/transform.js @@ -253,7 +253,7 @@ Transform.by = function by(asyncGeneratorFn, opts) { return from(Duplex, asyncGeneratorFn(async function*() { while (true) { const { chunk, done, cb } = await _promise; - if (done) return cb(); + if (done) return process.nextTick(cb); yield chunk; _promise = new Promise((resolve) => _resolve = resolve); process.nextTick(cb); From 382b9b511bb44d40cbe0572ed6a9f5bfbbf51b73 Mon Sep 17 00:00:00 2001 From: Irakli Safareli Date: Mon, 17 May 2021 11:34:56 +0400 Subject: [PATCH 33/42] rename test file --- .../{test-transform-by.js => test-stream-transform-by.js} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename test/parallel/{test-transform-by.js => test-stream-transform-by.js} (100%) diff --git a/test/parallel/test-transform-by.js b/test/parallel/test-stream-transform-by.js similarity index 100% rename from test/parallel/test-transform-by.js rename to test/parallel/test-stream-transform-by.js From 58dc92d7712e679b545723eab36e18ea399298da Mon Sep 17 00:00:00 2001 From: Irakli Safareli Date: Mon, 17 May 2021 11:43:51 +0400 Subject: [PATCH 34/42] try fix doc build error I see an error like ``` Error: Invalid param "options" > null > stream.Transform.by(asyncGeneratorFunction[, options]) ``` I guess this should fix the error --- lib/internal/streams/transform.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/internal/streams/transform.js b/lib/internal/streams/transform.js index 5158254417542e..9748d055c92eb1 100644 --- a/lib/internal/streams/transform.js +++ b/lib/internal/streams/transform.js @@ -247,7 +247,7 @@ Transform.prototype._read = function() { } }; -Transform.by = function by(asyncGeneratorFn, opts) { +Transform.by = function by(asyncGeneratorFn, options) { let _resolve; let _promise = new Promise((resolve) => _resolve = resolve); return from(Duplex, asyncGeneratorFn(async function*()
{ @@ -261,7 +261,7 @@ Transform.by = function by(asyncGeneratorFn, opts) { }()), { objectMode: true, autoDestroy: true, - ...opts, + ...options, write: (chunk, encoding, cb) => _resolve({ chunk, done: false, cb }), final: (cb) => _resolve({ done: true, cb }) }); From b4b5441804e5955f10b49e42a71a9e77993995f5 Mon Sep 17 00:00:00 2001 From: Irakli Safareli Date: Mon, 17 May 2021 11:44:17 +0400 Subject: [PATCH 35/42] fix other doc error I think this should fix the: Warning: 2950:5-2950:30 warning Found reference to undefined definition no-undefined-references remark-lint --- doc/api/stream.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index fd9d52c3f52304..6cc9a6651443f7 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -2947,7 +2947,7 @@ output on the `Readable` side is not consumed. * `options` {Object} Passed to both `Writable` and `Readable` constructors. Also has the following fields: * `transform` {Function} Implementation for the - [`stream._transform()`][] method. + [`stream._transform()`][stream-_transform] method. * `flush` {Function} Implementation for the [`stream._flush()`][stream-_flush] method. From a98544111d453fabc59f3010a522bef7d0eb8f02 Mon Sep 17 00:00:00 2001 From: Irakli Safareli Date: Fri, 4 Jun 2021 10:26:47 +0400 Subject: [PATCH 36/42] Apply stream.md suggestions from code review Co-authored-by: Rich Trott --- doc/api/stream.md | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index 6cc9a6651443f7..894997a015410b 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1890,7 +1890,7 @@ const stream = addAbortSignal( })(); ``` -### stream.Transform.by(asyncGeneratorFunction[, options]) +### `stream.Transform.by(asyncGeneratorFunction[, options])` @@ -3139,9 +3139,9 @@ readable.on('data', (chunk) => { }); ``` -#### Creating Transform Streams with Async Generator Functions +#### Creating transform streams with async generator functions -We can construct a Node.js Transform stream with an asynchronous +We can construct a Node.js transform stream with an asynchronous generator function using the `Transform.by()` utility method. ```js @@ -3168,7 +3168,8 @@ transform.on('data', (chunk) => { }); ``` -#### Piping to Writable Streams from Async Iterators +#### Piping to writable streams from async iterators +``` When writing to a writable stream from an async iterator, ensure correct handling of backpressure and errors. 
[`stream.pipeline()`][] abstracts away From e0c1f0e222fdf1e98104409b4f96974d46721921 Mon Sep 17 00:00:00 2001 From: Irakli Safareli Date: Sun, 6 Jun 2021 15:50:56 +0400 Subject: [PATCH 37/42] apply @aduh95 suggestions --- lib/internal/streams/transform.js | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/lib/internal/streams/transform.js b/lib/internal/streams/transform.js index 9748d055c92eb1..daa3029812785a 100644 --- a/lib/internal/streams/transform.js +++ b/lib/internal/streams/transform.js @@ -75,6 +75,7 @@ const { } = require('internal/errors').codes; const Duplex = require('internal/streams/duplex'); const from = require('internal/streams/from'); +const { createDeferredPromise } = require('internal/util'); ObjectSetPrototypeOf(Transform.prototype, Duplex.prototype); ObjectSetPrototypeOf(Transform, Duplex); @@ -248,21 +249,20 @@ Transform.prototype._read = function() { }; Transform.by = function by(asyncGeneratorFn, options) { - let _resolve; - let _promise = new Promise((resolve) => _resolve = resolve); + let { promise, resolve } = createDeferredPromise(); return from(Duplex, asyncGeneratorFn(async function*() { while (true) { - const { chunk, done, cb } = await _promise; - if (done) return process.nextTick(cb); - yield chunk; - _promise = new Promise((resolve) => _resolve = resolve); + const { chunk, done, cb } = await promise; process.nextTick(cb); + if (done) return; + yield chunk; + ({ promise, resolve }) = createDeferredPromise(); } }()), { objectMode: true, autoDestroy: true, ...options, - write: (chunk, encoding, cb) => _resolve({ chunk, done: false, cb }), - final: (cb) => _resolve({ done: true, cb }) + write: (chunk, encoding, cb) => resolve({ chunk, done: false, cb }), + final: (cb) => resolve({ done: true, cb }) }); }; From c5c862d13cc93eb4ca1336c4b0a800f52508f698 Mon Sep 17 00:00:00 2001 From: Irakli Safareli Date: Sun, 6 Jun 2021 17:04:54 +0400 Subject: [PATCH 38/42] Apply suggestions from code review Co-authored-by: Antoine du Hamel --- lib/internal/streams/transform.js | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lib/internal/streams/transform.js b/lib/internal/streams/transform.js index daa3029812785a..d9ffe01c97b3ed 100644 --- a/lib/internal/streams/transform.js +++ b/lib/internal/streams/transform.js @@ -64,7 +64,6 @@ 'use strict'; const { - Promise, ObjectSetPrototypeOf, Symbol } = primordials; @@ -256,7 +255,7 @@ Transform.by = function by(asyncGeneratorFn, options) { process.nextTick(cb); if (done) return; yield chunk; - ({ promise, resolve }) = createDeferredPromise(); + ({ promise, resolve } = createDeferredPromise()); } }()), { objectMode: true, From adc6059c9c700018854f03f15cd6d48ebe1a44c1 Mon Sep 17 00:00:00 2001 From: Irakli Safareli Date: Sun, 6 Jun 2021 17:09:32 +0400 Subject: [PATCH 39/42] Update doc/api/stream.md Co-authored-by: Antoine du Hamel --- doc/api/stream.md | 1 - 1 file changed, 1 deletion(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index 894997a015410b..10b02e2ea1d5ef 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -3169,7 +3169,6 @@ transform.on('data', (chunk) => { ``` #### Piping to writable streams from async iterators -``` When writing to a writable stream from an async iterator, ensure correct handling of backpressure and errors. 
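A hand-rolled write loop is sketched below purely as an illustration (the `writeIterableToStream()` helper and its arguments are hypothetical, not part of the stream API); it has to wait for the `'drain'` event whenever `writable.write()` returns `false`:

```js
const { once } = require('events');

async function writeIterableToStream(iterable, writable) {
  for await (const chunk of iterable) {
    // write() returns false once the destination's internal buffer is full;
    // wait for 'drain' before writing more so backpressure is respected.
    if (!writable.write(chunk)) {
      await once(writable, 'drain');
    }
  }
  writable.end();
}
```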
[`stream.pipeline()`][] abstracts away From 967989efa319aed035f6512cab867d3726688086 Mon Sep 17 00:00:00 2001 From: Irakli Safareli Date: Tue, 8 Jun 2021 21:08:38 +0400 Subject: [PATCH 40/42] assign `encoding` to the async generator --- lib/internal/streams/transform.js | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/lib/internal/streams/transform.js b/lib/internal/streams/transform.js index d9ffe01c97b3ed..d8175ef6708305 100644 --- a/lib/internal/streams/transform.js +++ b/lib/internal/streams/transform.js @@ -65,6 +65,7 @@ const { ObjectSetPrototypeOf, + ObjectAssign, Symbol } = primordials; @@ -249,7 +250,7 @@ Transform.prototype._read = function() { Transform.by = function by(asyncGeneratorFn, options) { let { promise, resolve } = createDeferredPromise(); - return from(Duplex, asyncGeneratorFn(async function*() { + const asyncGenerator = async function*() { while (true) { const { chunk, done, cb } = await promise; process.nextTick(cb); @@ -257,11 +258,15 @@ Transform.by = function by(asyncGeneratorFn, options) { yield chunk; ({ promise, resolve } = createDeferredPromise()); } - }()), { + }(); + return from(Duplex, asyncGeneratorFn(asyncGenerator), { objectMode: true, autoDestroy: true, ...options, - write: (chunk, encoding, cb) => resolve({ chunk, done: false, cb }), + write: (chunk, encoding, cb) => { + ObjectAssign(asyncGenerator, { encoding }); + resolve({ chunk, done: false, cb }); + }, final: (cb) => resolve({ done: true, cb }) }); }; From 6ccafaba0e4e92ba7ed3582711bc29a9042c2030 Mon Sep 17 00:00:00 2001 From: Irakli Safareli Date: Wed, 9 Jun 2021 20:32:39 +0400 Subject: [PATCH 41/42] add encoding to yield --- lib/internal/streams/transform.js | 8 ++--- test/parallel/test-stream-transform-by.js | 38 +++++++++++------------ 2 files changed, 22 insertions(+), 24 deletions(-) diff --git a/lib/internal/streams/transform.js b/lib/internal/streams/transform.js index d8175ef6708305..b262d9a29e0052 100644 --- a/lib/internal/streams/transform.js +++ b/lib/internal/streams/transform.js @@ -65,7 +65,6 @@ const { ObjectSetPrototypeOf, - ObjectAssign, Symbol } = primordials; @@ -252,10 +251,10 @@ Transform.by = function by(asyncGeneratorFn, options) { let { promise, resolve } = createDeferredPromise(); const asyncGenerator = async function*() { while (true) { - const { chunk, done, cb } = await promise; + const { chunk, done, encoding, cb } = await promise; process.nextTick(cb); if (done) return; - yield chunk; + yield { chunk, encoding }; ({ promise, resolve } = createDeferredPromise()); } }(); @@ -264,8 +263,7 @@ Transform.by = function by(asyncGeneratorFn, options) { autoDestroy: true, ...options, write: (chunk, encoding, cb) => { - ObjectAssign(asyncGenerator, { encoding }); - resolve({ chunk, done: false, cb }); + resolve({ chunk, done: false, encoding, cb }); }, final: (cb) => resolve({ done: true, cb }) }); diff --git a/test/parallel/test-stream-transform-by.js b/test/parallel/test-stream-transform-by.js index ce240896dce5f7..83daed453e86b8 100644 --- a/test/parallel/test-stream-transform-by.js +++ b/test/parallel/test-stream-transform-by.js @@ -7,7 +7,7 @@ const { strictEqual } = require('assert'); async function transformBy() { const readable = Readable.from('test'); async function * mapper(source) { - for await (const chunk of source) { + for await (const { chunk } of source) { yield chunk.toUpperCase(); } } @@ -15,7 +15,7 @@ async function transformBy() { const stream = Transform.by(mapper); readable.pipe(stream); const expected = ['T', 'E', 'S', 'T']; 
- for await (const chunk of stream) { + for await (const { chunk } of stream) { strictEqual(chunk, expected.shift()); } } @@ -36,7 +36,7 @@ async function transformByFuncReturnsObjectWithSymbolAsyncIterator() { const stream = Transform.by(mapper); readable.pipe(stream); const expected = ['T', 'E', 'S', 'T']; - for await (const chunk of stream) { + for await (const { chunk } of stream) { strictEqual(chunk, expected.shift()); } } @@ -44,8 +44,8 @@ async function transformByFuncReturnsObjectWithSymbolAsyncIterator() { async function transformByEncoding() { const readable = Readable.from('test'); async function * mapper(source) { - for await (const chunk of source) { - strictEqual(source.encoding, 'ascii'); + for await (const { chunk, encoding } of source) { + strictEqual(encoding, 'ascii'); yield chunk.toUpperCase(); } } @@ -54,7 +54,7 @@ async function transformByEncoding() { readable.pipe(stream); const expected = ['T', 'E', 'S', 'T']; - for await (const chunk of stream) { + for await (const { chunk } of stream) { strictEqual(chunk, expected.shift()); } } @@ -63,7 +63,7 @@ async function transformBySourceIteratorCompletes() { const readable = Readable.from('test'); const mustReach = mustCall(); async function * mapper(source) { - for await (const chunk of source) { + for await (const { chunk } of source) { yield chunk.toUpperCase(); } mustReach(); @@ -72,7 +72,7 @@ async function transformBySourceIteratorCompletes() { const stream = Transform.by(mapper); readable.pipe(stream); const expected = ['T', 'E', 'S', 'T']; - for await (const chunk of stream) { + for await (const { chunk } of stream) { strictEqual(chunk, expected.shift()); } } @@ -80,7 +80,7 @@ async function transformBySourceIteratorCompletes() { async function transformByYieldPlusReturn() { const readable = Readable.from('test'); async function * mapper(source) { - for await (const chunk of source) { + for await (const { chunk } of source) { yield chunk.toUpperCase(); } return 'final chunk'; @@ -89,7 +89,7 @@ async function transformByYieldPlusReturn() { const stream = Transform.by(mapper); readable.pipe(stream); const expected = ['T', 'E', 'S', 'T', 'final chunk']; - for await (const chunk of stream) { + for await (const { chunk } of stream) { strictEqual(chunk, expected.shift()); } } @@ -97,7 +97,7 @@ async function transformByYieldPlusReturn() { async function transformByReturnEndsStream() { const readable = Readable.from('test'); async function * mapper(source) { - for await (const chunk of source) { + for await (const { chunk } of source) { yield chunk.toUpperCase(); return 'stop'; } @@ -107,7 +107,7 @@ async function transformByReturnEndsStream() { readable.pipe(stream); const expected = ['T', 'stop']; const mustReach = mustCall(); - for await (const chunk of stream) { + for await (const { chunk } of stream) { strictEqual(chunk, expected.shift()); } mustReach(); @@ -116,7 +116,7 @@ async function transformByReturnEndsStream() { async function transformByOnData() { const readable = Readable.from('test'); async function * mapper(source) { - for await (const chunk of source) { + for await (const { chunk } of source) { yield chunk.toUpperCase(); } } @@ -137,7 +137,7 @@ async function transformByOnData() { async function transformByOnDataNonObject() { const readable = Readable.from('test', { objectMode: false }); async function * mapper(source) { - for await (const chunk of source) { + for await (const { chunk } of source) { yield chunk.toString().toUpperCase(); } } @@ -158,7 +158,7 @@ async function transformByOnDataNonObject() 
{ async function transformByOnErrorAndDestroyed() { const stream = Readable.from('test').pipe(Transform.by( async function * mapper(source) { - for await (const chunk of source) { + for await (const { chunk } of source) { if (chunk === 'e') throw new Error('kaboom'); yield chunk.toUpperCase(); } @@ -176,7 +176,7 @@ async function transformByOnErrorAndDestroyed() { async function transformByErrorTryCatchAndDestroyed() { const stream = Readable.from('test').pipe(Transform.by( async function * mapper(source) { - for await (const chunk of source) { + for await (const { chunk } of source) { if (chunk === 'e') throw new Error('kaboom'); yield chunk.toUpperCase(); } @@ -184,7 +184,7 @@ async function transformByErrorTryCatchAndDestroyed() { )); strictEqual(stream.destroyed, false); try { - for await (const chunk of stream) { + for await (const { chunk } of stream) { strictEqual(chunk.toString(), 'T'); } } catch (err) { @@ -196,7 +196,7 @@ async function transformByErrorTryCatchAndDestroyed() { async function transformByOnErrorAndTryCatchAndDestroyed() { const stream = Readable.from('test').pipe(Transform.by( async function * mapper(source) { - for await (const chunk of source) { + for await (const { chunk } of source) { if (chunk === 'e') throw new Error('kaboom'); yield chunk.toUpperCase(); } @@ -207,7 +207,7 @@ async function transformByOnErrorAndTryCatchAndDestroyed() { strictEqual(err.message, 'kaboom'); })); try { - for await (const chunk of stream) { + for await (const { chunk } of stream) { strictEqual(chunk.toString(), 'T'); } } catch (err) { From 9bccc233a7b59577c137a6b83193456d6fbdbbcb Mon Sep 17 00:00:00 2001 From: Irakli Safareli Date: Tue, 15 Jun 2021 14:03:22 +0400 Subject: [PATCH 42/42] try fix some tests --- test/parallel/test-stream-transform-by.js | 40 +++++++++++------------ 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/test/parallel/test-stream-transform-by.js b/test/parallel/test-stream-transform-by.js index 83daed453e86b8..2affdff759723f 100644 --- a/test/parallel/test-stream-transform-by.js +++ b/test/parallel/test-stream-transform-by.js @@ -5,7 +5,7 @@ const { Readable, Transform } = require('stream'); const { strictEqual } = require('assert'); async function transformBy() { - const readable = Readable.from('test'); + const readable = Readable.from('test'.split('')); async function * mapper(source) { for await (const { chunk } of source) { yield chunk.toUpperCase(); @@ -15,19 +15,19 @@ async function transformBy() { const stream = Transform.by(mapper); readable.pipe(stream); const expected = ['T', 'E', 'S', 'T']; - for await (const { chunk } of stream) { + for await (const chunk of stream) { strictEqual(chunk, expected.shift()); } } async function transformByFuncReturnsObjectWithSymbolAsyncIterator() { - const readable = Readable.from('test'); + const readable = Readable.from('test'.split('')); const mapper = (source) => ({ [Symbol.asyncIterator]() { return { async next() { const { done, value } = await source.next(); - return { done, value: value ? value.toUpperCase() : value }; + return { done, value: value ? 
value.chunk.toUpperCase() : value }; } }; } @@ -36,13 +36,13 @@ async function transformByFuncReturnsObjectWithSymbolAsyncIterator() { const stream = Transform.by(mapper); readable.pipe(stream); const expected = ['T', 'E', 'S', 'T']; - for await (const { chunk } of stream) { + for await (const chunk of stream) { strictEqual(chunk, expected.shift()); } } async function transformByEncoding() { - const readable = Readable.from('test'); + const readable = Readable.from('test'.split('')); async function * mapper(source) { for await (const { chunk, encoding } of source) { strictEqual(encoding, 'ascii'); @@ -54,13 +54,13 @@ async function transformByEncoding() { readable.pipe(stream); const expected = ['T', 'E', 'S', 'T']; - for await (const { chunk } of stream) { + for await (const chunk of stream) { strictEqual(chunk, expected.shift()); } } async function transformBySourceIteratorCompletes() { - const readable = Readable.from('test'); + const readable = Readable.from('test'.split('')); const mustReach = mustCall(); async function * mapper(source) { for await (const { chunk } of source) { @@ -72,13 +72,13 @@ async function transformBySourceIteratorCompletes() { const stream = Transform.by(mapper); readable.pipe(stream); const expected = ['T', 'E', 'S', 'T']; - for await (const { chunk } of stream) { + for await (const chunk of stream) { strictEqual(chunk, expected.shift()); } } async function transformByYieldPlusReturn() { - const readable = Readable.from('test'); + const readable = Readable.from('test'.split('')); async function * mapper(source) { for await (const { chunk } of source) { yield chunk.toUpperCase(); @@ -89,13 +89,13 @@ async function transformByYieldPlusReturn() { const stream = Transform.by(mapper); readable.pipe(stream); const expected = ['T', 'E', 'S', 'T', 'final chunk']; - for await (const { chunk } of stream) { + for await (const chunk of stream) { strictEqual(chunk, expected.shift()); } } async function transformByReturnEndsStream() { - const readable = Readable.from('test'); + const readable = Readable.from('test'.split('')); async function * mapper(source) { for await (const { chunk } of source) { yield chunk.toUpperCase(); @@ -107,14 +107,14 @@ async function transformByReturnEndsStream() { readable.pipe(stream); const expected = ['T', 'stop']; const mustReach = mustCall(); - for await (const { chunk } of stream) { + for await (const chunk of stream) { strictEqual(chunk, expected.shift()); } mustReach(); } async function transformByOnData() { - const readable = Readable.from('test'); + const readable = Readable.from('test'.split('')); async function * mapper(source) { for await (const { chunk } of source) { yield chunk.toUpperCase(); @@ -135,7 +135,7 @@ async function transformByOnData() { } async function transformByOnDataNonObject() { - const readable = Readable.from('test', { objectMode: false }); + const readable = Readable.from('test'.split(''), { objectMode: false }); async function * mapper(source) { for await (const { chunk } of source) { yield chunk.toString().toUpperCase(); @@ -156,7 +156,7 @@ async function transformByOnDataNonObject() { } async function transformByOnErrorAndDestroyed() { - const stream = Readable.from('test').pipe(Transform.by( + const stream = Readable.from('test'.split('')).pipe(Transform.by( async function * mapper(source) { for await (const { chunk } of source) { if (chunk === 'e') throw new Error('kaboom'); @@ -174,7 +174,7 @@ async function transformByOnErrorAndDestroyed() { } async function transformByErrorTryCatchAndDestroyed() { - 
const stream = Readable.from('test').pipe(Transform.by( + const stream = Readable.from('test'.split('')).pipe(Transform.by( async function * mapper(source) { for await (const { chunk } of source) { if (chunk === 'e') throw new Error('kaboom'); @@ -184,7 +184,7 @@ async function transformByErrorTryCatchAndDestroyed() { )); strictEqual(stream.destroyed, false); try { - for await (const { chunk } of stream) { + for await (const chunk of stream) { strictEqual(chunk.toString(), 'T'); } } catch (err) { @@ -194,7 +194,7 @@ async function transformByErrorTryCatchAndDestroyed() { } async function transformByOnErrorAndTryCatchAndDestroyed() { - const stream = Readable.from('test').pipe(Transform.by( + const stream = Readable.from('test'.split('')).pipe(Transform.by( async function * mapper(source) { for await (const { chunk } of source) { if (chunk === 'e') throw new Error('kaboom'); @@ -207,7 +207,7 @@ async function transformByOnErrorAndTryCatchAndDestroyed() { strictEqual(err.message, 'kaboom'); })); try { - for await (const { chunk } of stream) { + for await (const chunk of stream) { strictEqual(chunk.toString(), 'T'); } } catch (err) {