From 49e4a710de16bae3f66d2a1e172418841224a7db Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Mon, 20 Oct 2025 21:57:20 +0300 Subject: [PATCH 1/8] refactor: extract Queue from IncrementalPublisher and IncrementalGraph Queue is a batching async-aware iterator-like protocol meant to be consumed in the following (and only the following) pattern: > let batch; > if ((batch = queue.currentBatch()) !== undefined) { > doSomethingWithBatch(batch); > } > while ((batch = await queue.nextBatch) !== undefined) { > doSomethingWithBatch(batch); > } A `push()` methods is provided in the Queue constructor (a la repeaters, see https://repeater.js.org/) so that `push()` can be private to the code that constructs the Queue (although it can be saved to be passed to other code). A `stop()` method is also provided as a the second argument to the constructor for convienence, but it is also available on the queue object itself so that it can be called by the executor or by the consumer. So this works: > const queue = new Queue( > async (push, stop) => { > push(1); > push(2) > await Promise.resolve(); > push(3); > stop(); > }, > ); > > const batch1 = Array.from(queue.nextBatch()); // batch1 = [1, 2] > const batch2 = Array.from(await queue.nextBatchAsync()); // batch2 = [3] as does this: > let push; > const queue = new Queue( > (_push) => { > push = _push; > }, > ); > > push(1); > push(2); > > const batch1 = Array.from(queue.nextBatch()); // batch1 = [1, 2] > > const batch2Promise = queue.nextBatchAsync(); > > await Promise.resolve(); > push(3); > queue.stop(); > > const batch2 = await batch2Promise; // batch2 = [3] Note: concurrent calls to `currentBatch()` and `nextBatch` will return the same batch and are not encouraged. A `toAsyncIterable(mapFn)` method transforms coalesces each batch of items into a single value (or undefined if not value is to be emitted for the batch). Using queues, we are able to remove all logic for handling the implicit queue from IncrementalPublisher, retaining only the `_handleCompletedBatch()`, while adding only the required `push()` and `stop()` calls within the IncrementalGraph. Tests do not change, except that `.return()` and `.throw()` (but not `next()` have an extra tick secondary to the additional layers of `withCleanup()`, so that the tests required slight adjustment. --- src/execution/IncrementalGraph.ts | 73 ++++++-------- src/execution/IncrementalPublisher.ts | 91 ++++++++---------- src/execution/Queue.ts | 113 ++++++++++++++++++++++ src/execution/__tests__/Queue-test.ts | 127 +++++++++++++++++++++++++ src/execution/__tests__/stream-test.ts | 7 +- 5 files changed, 311 insertions(+), 100 deletions(-) create mode 100644 src/execution/Queue.ts create mode 100644 src/execution/__tests__/Queue-test.ts diff --git a/src/execution/IncrementalGraph.ts b/src/execution/IncrementalGraph.ts index 98dcdb3fec..da0d1c7ba1 100644 --- a/src/execution/IncrementalGraph.ts +++ b/src/execution/IncrementalGraph.ts @@ -1,10 +1,10 @@ import { BoxedPromiseOrValue } from '../jsutils/BoxedPromiseOrValue.js'; import { invariant } from '../jsutils/invariant.js'; import { isPromise } from '../jsutils/isPromise.js'; -import { promiseWithResolvers } from '../jsutils/promiseWithResolvers.js'; import type { GraphQLError } from '../error/GraphQLError.js'; +import { Queue } from './Queue.js'; import type { DeferredFragmentRecord, DeliveryGroup, @@ -22,16 +22,18 @@ import { isDeferredFragmentRecord, isPendingExecutionGroup } from './types.js'; */ export class IncrementalGraph { private _rootNodes: Set; - - private _completedQueue: Array; - private _nextQueue: Array< - (iterable: Iterable | undefined) => void - >; + private _completed: Queue; + // _push and _stop are assigned in the executor which is executed + // synchronously by the Queue constructor. + private _push!: (item: IncrementalDataRecordResult) => void; + private _stop!: () => void; constructor() { this._rootNodes = new Set(); - this._completedQueue = []; - this._nextQueue = []; + this._completed = new Queue((push, stop) => { + this._push = push; + this._stop = stop; + }); } getNewRootNodes( @@ -92,29 +94,6 @@ export class IncrementalGraph { } } - *currentCompletedBatch(): Generator { - let completed; - while ((completed = this._completedQueue.shift()) !== undefined) { - yield completed; - } - } - - nextCompletedBatch(): Promise< - Iterable | undefined - > { - const { promise, resolve } = promiseWithResolvers< - Iterable | undefined - >(); - this._nextQueue.push(resolve); - return promise; - } - - abort(): void { - for (const resolve of this._nextQueue) { - resolve(undefined); - } - } - hasNext(): boolean { return this._rootNodes.size > 0; } @@ -146,17 +125,28 @@ export class IncrementalGraph { const newRootNodes = this._promoteNonEmptyToRoot( deferredFragmentRecord.children, ); + this._maybeStop(); return { newRootNodes, successfulExecutionGroups }; } removeDeferredFragment( deferredFragmentRecord: DeferredFragmentRecord, ): boolean { - return this._rootNodes.delete(deferredFragmentRecord); + const deleted = this._rootNodes.delete(deferredFragmentRecord); + if (!deleted) { + return false; + } + this._maybeStop(); + return true; } removeStream(streamRecord: StreamRecord): void { this._rootNodes.delete(streamRecord); + this._maybeStop(); + } + + subscribe(): Queue { + return this._completed; } private _addIncrementalDataRecords( @@ -246,9 +236,9 @@ export class IncrementalGraph { const value = completedExecutionGroup.value; if (isPromise(value)) { // eslint-disable-next-line @typescript-eslint/no-floating-promises - value.then((resolved) => this._enqueue(resolved)); + value.then((resolved) => this._push(resolved)); } else { - this._enqueue(value); + this._push(value); } } @@ -266,7 +256,7 @@ export class IncrementalGraph { : streamItemRecord().value; if (isPromise(result)) { if (items.length > 0) { - this._enqueue({ + this._push({ streamRecord, result: // TODO add additional test case or rework for coverage @@ -290,14 +280,14 @@ export class IncrementalGraph { } if (result.item === undefined) { if (items.length > 0) { - this._enqueue({ + this._push({ streamRecord, result: errors.length > 0 ? { items, errors } : { items }, newDeferredFragmentRecords, incrementalDataRecords, }); } - this._enqueue( + this._push( result.errors === undefined ? { streamRecord } : { @@ -320,12 +310,9 @@ export class IncrementalGraph { } } - private _enqueue(completed: IncrementalDataRecordResult): void { - this._completedQueue.push(completed); - const next = this._nextQueue.shift(); - if (next === undefined) { - return; + private _maybeStop(): void { + if (!this.hasNext()) { + this._stop(); } - next(this.currentCompletedBatch()); } } diff --git a/src/execution/IncrementalPublisher.ts b/src/execution/IncrementalPublisher.ts index 1bf83bb65f..99d48b265d 100644 --- a/src/execution/IncrementalPublisher.ts +++ b/src/execution/IncrementalPublisher.ts @@ -64,13 +64,11 @@ interface SubsequentIncrementalExecutionResultContext { * @internal */ class IncrementalPublisher { - private _isDone: boolean; private _context: IncrementalPublisherContext; private _nextId: number; private _incrementalGraph: IncrementalGraph; constructor(context: IncrementalPublisherContext) { - this._isDone = false; this._context = context; this._nextId = 0; this._incrementalGraph = new IncrementalGraph(); @@ -95,14 +93,17 @@ class IncrementalPublisher { ? { errors, data, pending, hasNext: true } : { data, pending, hasNext: true }; - const subsequentResults = withCleanup(this._subscribe(), async () => { - this._isDone = true; - this._context.abortSignalListener?.disconnect(); - this._incrementalGraph.abort(); - await this._returnAsyncIteratorsIgnoringErrors(); - }); + const subsequentResults = this._incrementalGraph + .subscribe() + .toAsyncGenerator((batch) => this._handleCompletedBatch(batch)); - return { initialResult, subsequentResults }; + return { + initialResult, + subsequentResults: withCleanup(subsequentResults, async () => { + this._context.abortSignalListener?.disconnect(); + await this._returnAsyncIteratorsIgnoringErrors(); + }), + }; } private _toPendingResults( @@ -128,55 +129,39 @@ class IncrementalPublisher { return String(this._nextId++); } - private async *_subscribe(): AsyncGenerator< - SubsequentIncrementalExecutionResult, - void, - void - > { - while (!this._isDone) { - const context: SubsequentIncrementalExecutionResultContext = { - pending: [], - incremental: [], - completed: [], - }; - - let batch: Iterable | undefined = - this._incrementalGraph.currentCompletedBatch(); - do { - for (const completedResult of batch) { - this._handleCompletedIncrementalData(completedResult, context); - } - - const { incremental, completed } = context; - if (incremental.length > 0 || completed.length > 0) { - const hasNext = this._incrementalGraph.hasNext(); - - if (!hasNext) { - this._isDone = true; - } + private _handleCompletedBatch( + batch: Iterable, + ): SubsequentIncrementalExecutionResult | undefined { + const context: SubsequentIncrementalExecutionResultContext = { + pending: [], + incremental: [], + completed: [], + }; - const subsequentIncrementalExecutionResult: SubsequentIncrementalExecutionResult = - { hasNext }; + for (const completedResult of batch) { + this._handleCompletedIncrementalData(completedResult, context); + } - const pending = context.pending; - if (pending.length > 0) { - subsequentIncrementalExecutionResult.pending = pending; - } - if (incremental.length > 0) { - subsequentIncrementalExecutionResult.incremental = incremental; - } - if (completed.length > 0) { - subsequentIncrementalExecutionResult.completed = completed; - } + const { incremental, completed } = context; + if (incremental.length === 0 && completed.length === 0) { + return; + } - yield subsequentIncrementalExecutionResult; - break; - } + const hasNext = this._incrementalGraph.hasNext(); - // eslint-disable-next-line no-await-in-loop - batch = await this._incrementalGraph.nextCompletedBatch(); - } while (batch !== undefined); + const subsequentIncrementalExecutionResult: SubsequentIncrementalExecutionResult = + { hasNext }; + const pending = context.pending; + if (pending.length > 0) { + subsequentIncrementalExecutionResult.pending = pending; + } + if (incremental.length > 0) { + subsequentIncrementalExecutionResult.incremental = incremental; + } + if (completed.length > 0) { + subsequentIncrementalExecutionResult.completed = completed; } + return subsequentIncrementalExecutionResult; } private _handleCompletedIncrementalData( diff --git a/src/execution/Queue.ts b/src/execution/Queue.ts new file mode 100644 index 0000000000..456c09d5f0 --- /dev/null +++ b/src/execution/Queue.ts @@ -0,0 +1,113 @@ +import { isPromise } from '../jsutils/isPromise.js'; +import type { PromiseOrValue } from '../jsutils/PromiseOrValue.js'; +import { promiseWithResolvers } from '../jsutils/promiseWithResolvers.js'; + +import { withCleanup } from './withCleanup.js'; + +/** + * @internal + */ +export class Queue { + private _items: Array; + private _stopped: boolean; + private _resolvers: Array<(iterable: Generator | undefined) => void>; + + constructor( + executor: ( + push: (item: T) => void, + stop: () => void, + ) => PromiseOrValue, + ) { + this._items = []; + this._stopped = false; + this._resolvers = []; + let result; + try { + result = executor(this._push.bind(this), this.stop.bind(this)); + } catch { + // Ignore errors + } + if (isPromise(result)) { + result.catch(() => { + // Ignore errors + }); + } + } + + currentBatch(): Generator | undefined { + if (this._items.length > 0) { + return this.batch(); + } + } + + nextBatch(): Promise | undefined> { + if (this._items.length > 0) { + return Promise.resolve(this.batch()); + } + if (this._stopped) { + return Promise.resolve(undefined); + } + const { promise, resolve } = promiseWithResolvers< + Generator | undefined + >(); + this._resolvers.push(resolve); + return promise; + } + + stop(): void { + this._stopped = true; + this._resolve(undefined); + } + + toAsyncGenerator( + mapFn: (generator: Generator) => U | undefined, + ): AsyncGenerator { + return withCleanup(this.toAsyncGeneratorImpl(mapFn), () => this.stop()); + } + + private async *toAsyncGeneratorImpl( + mapFn: (generator: Generator) => U | undefined, + ): AsyncGenerator { + let batch = this.currentBatch(); + if (batch !== undefined) { + const maybe = mapFn(batch); + if (maybe !== undefined) { + yield maybe; + } + } + + if (this._stopped) { + return; + } + + // eslint-disable-next-line no-await-in-loop + while ((batch = await this.nextBatch()) !== undefined) { + const maybe = mapFn(batch); + if (maybe !== undefined) { + yield maybe; + } + } + } + + private _push(item: T): void { + if (this._stopped) { + this._resolve(undefined); + } + this._items.push(item); + this._resolve(this.batch()); + } + + private _resolve(maybeIterable: Generator | undefined): void { + for (const resolve of this._resolvers) { + resolve(maybeIterable); + } + this._resolvers = []; + } + + private *batch(): Generator { + let item: T | undefined; + while ((item = this._items.shift()) !== undefined) { + yield item; + } + } +} diff --git a/src/execution/__tests__/Queue-test.ts b/src/execution/__tests__/Queue-test.ts new file mode 100644 index 0000000000..9dfc58be4c --- /dev/null +++ b/src/execution/__tests__/Queue-test.ts @@ -0,0 +1,127 @@ +import { assert, expect } from 'chai'; +import { describe, it } from 'mocha'; + +import { promiseWithResolvers } from '../../jsutils/promiseWithResolvers.js'; + +import { Queue } from '../Queue.js'; + +describe('Queue', () => { + it('should yield sync pushed items in order', () => { + const queue = new Queue((push) => { + push(1); + push(2); + push(3); + }); + + const batch1 = queue.currentBatch(); + assert(batch1 !== undefined); + expect(Array.from(batch1)).to.deep.equal([1, 2, 3]); + }); + + it('should yield async pushed items in order', async () => { + const queue = new Queue(async (push) => { + await Promise.resolve(); + push(1); + push(2); + push(3); + }); + + const batch1 = queue.currentBatch(); + expect(batch1).to.deep.equal(undefined); + const batch2 = await queue.nextBatch(); + assert(batch2 !== undefined); + expect(Array.from(batch2)).to.deep.equal([1, 2, 3]); + }); + + it('should yield multiple batches', async () => { + const queue = new Queue(async (push) => { + await Promise.resolve(); + push(1); + push(2); + push(3); + await Promise.resolve(); + push(4); + push(5); + push(6); + }); + + const batch1 = queue.currentBatch(); + expect(batch1).to.equal(undefined); + const batch2 = await queue.nextBatch(); + assert(batch2 !== undefined); + expect(Array.from(batch2)).to.deep.equal([1, 2, 3]); + const batch3 = await queue.nextBatch(); + assert(batch3 !== undefined); + expect(Array.from(batch3)).to.deep.equal([4, 5, 6]); + }); + + it('should allow the executor to indicate completion', async () => { + const queue = new Queue(async (_push, stop) => { + await Promise.resolve(); + stop(); + }); + + const batch1 = queue.currentBatch(); + expect(batch1).to.equal(undefined); + const batch2 = await queue.nextBatch(); + expect(batch2).to.equal(undefined); + }); + + it('should allow a consumer to abort a pending call to nextBatch', async () => { + const queue = new Queue(async () => { + const { promise } = promiseWithResolvers(); + // wait forever + await promise; + }); + + const batch1 = queue.currentBatch(); + expect(batch1).to.equal(undefined); + const batch2Promise = queue.nextBatch(); + queue.stop(); + expect(await batch2Promise).to.equal(undefined); + }); + + it('should allow saving the push function', async () => { + let push!: (item: number) => void; + const queue = new Queue((_push) => { + push = _push; + }); + + const batch1 = queue.currentBatch(); + expect(batch1).to.equal(undefined); + + const batch2Promise = queue.nextBatch(); + + await Promise.resolve(); + push(1); + push(2); + push(3); + + const batch2 = await batch2Promise; + assert(batch2 !== undefined); + expect(Array.from(batch2)).to.deep.equal([1, 2, 3]); + }); + + it('should ignore sync errors in the executor', async () => { + const queue = new Queue(() => { + throw new Error('Oops'); + }); + const batch1 = queue.currentBatch(); + expect(batch1).to.equal(undefined); + const batch2Promise = queue.nextBatch(); + queue.stop(); + expect(await batch2Promise).to.equal(undefined); + }); + + it('should ignore async errors in the executor', async () => { + const queue = new Queue(async () => { + await Promise.resolve(); + throw new Error('Oops'); + }); + const batch1 = queue.currentBatch(); + expect(batch1).to.equal(undefined); + const batch2Promise = queue.nextBatch(); + queue.stop(); + expect(await batch2Promise).to.equal(undefined); + }); +}); diff --git a/src/execution/__tests__/stream-test.ts b/src/execution/__tests__/stream-test.ts index ada96d9c44..305dd74d43 100644 --- a/src/execution/__tests__/stream-test.ts +++ b/src/execution/__tests__/stream-test.ts @@ -2432,14 +2432,13 @@ describe('Execute: stream directive', () => { hasNext: true, }); - const returnPromise = iterator.return(); + await iterator.return(); const result2 = await iterator.next(); expectJSON(result2).toDeepEqual({ done: true, value: undefined, }); - await returnPromise; }); it('Returns underlying async iterables when returned generator is thrown', async () => { let index = 0; @@ -2495,14 +2494,14 @@ describe('Execute: stream directive', () => { hasNext: true, }); - const throwPromise = iterator.throw(new Error('bad')); + await expectPromise(iterator.throw(new Error('bad'))).toRejectWith('bad'); const result2 = await iterator.next(); expectJSON(result2).toDeepEqual({ done: true, value: undefined, }); - await expectPromise(throwPromise).toRejectWith('bad'); + assert(returned); }); it('Returns underlying async iterables when uses resource is disposed', async () => { From f21d0e7e3208c0d12e455db674abbc9b8f0e0906 Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Mon, 27 Oct 2025 15:00:14 +0200 Subject: [PATCH 2/8] polish(queue): only subscribe needs to be public --- src/execution/IncrementalGraph.ts | 6 +- src/execution/IncrementalPublisher.ts | 6 +- src/execution/Queue.ts | 80 ++++++++------- src/execution/__tests__/Queue-test.ts | 139 ++++++++++++++++---------- 4 files changed, 139 insertions(+), 92 deletions(-) diff --git a/src/execution/IncrementalGraph.ts b/src/execution/IncrementalGraph.ts index da0d1c7ba1..971e08a271 100644 --- a/src/execution/IncrementalGraph.ts +++ b/src/execution/IncrementalGraph.ts @@ -145,8 +145,10 @@ export class IncrementalGraph { this._maybeStop(); } - subscribe(): Queue { - return this._completed; + subscribe( + mapFn: (generator: Generator) => U | undefined, + ): AsyncGenerator { + return this._completed.subscribe(mapFn); } private _addIncrementalDataRecords( diff --git a/src/execution/IncrementalPublisher.ts b/src/execution/IncrementalPublisher.ts index 99d48b265d..201179874f 100644 --- a/src/execution/IncrementalPublisher.ts +++ b/src/execution/IncrementalPublisher.ts @@ -93,9 +93,9 @@ class IncrementalPublisher { ? { errors, data, pending, hasNext: true } : { data, pending, hasNext: true }; - const subsequentResults = this._incrementalGraph - .subscribe() - .toAsyncGenerator((batch) => this._handleCompletedBatch(batch)); + const subsequentResults = this._incrementalGraph.subscribe((batch) => + this._handleCompletedBatch(batch), + ); return { initialResult, diff --git a/src/execution/Queue.ts b/src/execution/Queue.ts index 456c09d5f0..7a4b93a482 100644 --- a/src/execution/Queue.ts +++ b/src/execution/Queue.ts @@ -25,70 +25,78 @@ export class Queue { try { result = executor(this._push.bind(this), this.stop.bind(this)); } catch { - // Ignore errors + // Ignore actual error + this.stop(); } if (isPromise(result)) { result.catch(() => { - // Ignore errors + // Ignore actual error + this.stop(); }); } } - currentBatch(): Generator | undefined { - if (this._items.length > 0) { - return this.batch(); - } - } - - nextBatch(): Promise | undefined> { - if (this._items.length > 0) { - return Promise.resolve(this.batch()); - } - if (this._stopped) { - return Promise.resolve(undefined); - } - const { promise, resolve } = promiseWithResolvers< - Generator | undefined - >(); - this._resolvers.push(resolve); - return promise; - } - stop(): void { this._stopped = true; this._resolve(undefined); } - toAsyncGenerator( + subscribe( mapFn: (generator: Generator) => U | undefined, ): AsyncGenerator { - return withCleanup(this.toAsyncGeneratorImpl(mapFn), () => this.stop()); + return withCleanup(this.subscribeImpl(mapFn), () => this.stop()); } - private async *toAsyncGeneratorImpl( + private async *subscribeImpl( mapFn: (generator: Generator) => U | undefined, ): AsyncGenerator { - let batch = this.currentBatch(); - if (batch !== undefined) { - const maybe = mapFn(batch); - if (maybe !== undefined) { - yield maybe; - } - } - if (this._stopped) { return; } + while (this._items.length > 0) { + const maybe = mapFn(this.batch()); + if (maybe === undefined) { + continue; + } + yield maybe; + if (this._stopped) { + return; + } + } + + let batch: Generator | undefined; // eslint-disable-next-line no-await-in-loop - while ((batch = await this.nextBatch()) !== undefined) { - const maybe = mapFn(batch); - if (maybe !== undefined) { + while ((batch = await this._nextBatch()) !== undefined) { + let maybe = mapFn(batch); + if (maybe === undefined) { + continue; + } + yield maybe; + if (this._stopped) { + return; + } + while (this._items.length > 0) { + maybe = mapFn(this.batch()); + if (maybe === undefined) { + continue; + } yield maybe; + if (this._stopped) { + return; + } } } } + private _nextBatch(): Promise | undefined> { + const { promise, resolve } = promiseWithResolvers< + Generator | undefined + >(); + this._resolvers.push(resolve); + return promise; + } + private _push(item: T): void { if (this._stopped) { this._resolve(undefined); diff --git a/src/execution/__tests__/Queue-test.ts b/src/execution/__tests__/Queue-test.ts index 9dfc58be4c..dc815dc72c 100644 --- a/src/execution/__tests__/Queue-test.ts +++ b/src/execution/__tests__/Queue-test.ts @@ -1,70 +1,61 @@ -import { assert, expect } from 'chai'; +import { expect } from 'chai'; import { describe, it } from 'mocha'; +import { resolveOnNextTick } from '../../__testUtils__/resolveOnNextTick.js'; + import { promiseWithResolvers } from '../../jsutils/promiseWithResolvers.js'; import { Queue } from '../Queue.js'; describe('Queue', () => { - it('should yield sync pushed items in order', () => { + it('should yield sync pushed items in order', async () => { const queue = new Queue((push) => { push(1); push(2); push(3); }); - const batch1 = queue.currentBatch(); - assert(batch1 !== undefined); - expect(Array.from(batch1)).to.deep.equal([1, 2, 3]); + const sub = queue.subscribe((batch) => Array.from(batch)); + expect(await sub.next()).to.deep.equal({ done: false, value: [1, 2, 3] }); }); it('should yield async pushed items in order', async () => { const queue = new Queue(async (push) => { - await Promise.resolve(); + await resolveOnNextTick(); push(1); push(2); push(3); }); - const batch1 = queue.currentBatch(); - expect(batch1).to.deep.equal(undefined); - const batch2 = await queue.nextBatch(); - assert(batch2 !== undefined); - expect(Array.from(batch2)).to.deep.equal([1, 2, 3]); + const sub = queue.subscribe((batch) => Array.from(batch)); + expect(await sub.next()).to.deep.equal({ done: false, value: [1, 2, 3] }); }); it('should yield multiple batches', async () => { const queue = new Queue(async (push) => { - await Promise.resolve(); + await resolveOnNextTick(); push(1); push(2); push(3); - await Promise.resolve(); + await resolveOnNextTick(); push(4); push(5); push(6); }); - const batch1 = queue.currentBatch(); - expect(batch1).to.equal(undefined); - const batch2 = await queue.nextBatch(); - assert(batch2 !== undefined); - expect(Array.from(batch2)).to.deep.equal([1, 2, 3]); - const batch3 = await queue.nextBatch(); - assert(batch3 !== undefined); - expect(Array.from(batch3)).to.deep.equal([4, 5, 6]); + const sub = queue.subscribe((batch) => Array.from(batch)); + expect(await sub.next()).to.deep.equal({ done: false, value: [1, 2, 3] }); + expect(await sub.next()).to.deep.equal({ done: false, value: [4, 5, 6] }); }); it('should allow the executor to indicate completion', async () => { const queue = new Queue(async (_push, stop) => { - await Promise.resolve(); + await resolveOnNextTick(); stop(); }); - const batch1 = queue.currentBatch(); - expect(batch1).to.equal(undefined); - const batch2 = await queue.nextBatch(); - expect(batch2).to.equal(undefined); + const sub = queue.subscribe((batch) => batch); + expect(await sub.next()).to.deep.equal({ done: true, value: undefined }); }); it('should allow a consumer to abort a pending call to nextBatch', async () => { @@ -74,11 +65,10 @@ describe('Queue', () => { await promise; }); - const batch1 = queue.currentBatch(); - expect(batch1).to.equal(undefined); - const batch2Promise = queue.nextBatch(); + const sub = queue.subscribe((batch) => batch); + const nextPromise = sub.next(); queue.stop(); - expect(await batch2Promise).to.equal(undefined); + expect(await nextPromise).to.deep.equal({ done: true, value: undefined }); }); it('should allow saving the push function', async () => { @@ -87,41 +77,88 @@ describe('Queue', () => { push = _push; }); - const batch1 = queue.currentBatch(); - expect(batch1).to.equal(undefined); - - const batch2Promise = queue.nextBatch(); + const sub = queue.subscribe((batch) => Array.from(batch)); - await Promise.resolve(); + await resolveOnNextTick(); push(1); push(2); push(3); - const batch2 = await batch2Promise; - assert(batch2 !== undefined); - expect(Array.from(batch2)).to.deep.equal([1, 2, 3]); + expect(await sub.next()).to.deep.equal({ done: false, value: [1, 2, 3] }); }); - it('should ignore sync errors in the executor', async () => { + it('should stop on sync error in the executor', async () => { const queue = new Queue(() => { throw new Error('Oops'); }); - const batch1 = queue.currentBatch(); - expect(batch1).to.equal(undefined); - const batch2Promise = queue.nextBatch(); - queue.stop(); - expect(await batch2Promise).to.equal(undefined); + + const sub = queue.subscribe((batch) => Array.from(batch)); + expect(await sub.next()).to.deep.equal({ done: true, value: undefined }); }); - it('should ignore async errors in the executor', async () => { + it('should stop on async errors in the executor', async () => { const queue = new Queue(async () => { - await Promise.resolve(); + await resolveOnNextTick(); throw new Error('Oops'); }); - const batch1 = queue.currentBatch(); - expect(batch1).to.equal(undefined); - const batch2Promise = queue.nextBatch(); - queue.stop(); - expect(await batch2Promise).to.equal(undefined); + + const sub = queue.subscribe((batch) => Array.from(batch)); + expect(await sub.next()).to.deep.equal({ done: true, value: undefined }); + }); + + it('should skip payloads when mapped to undefined, skipping first async payload', async () => { + const queue = new Queue(async (push) => { + await resolveOnNextTick(); + push(1); + await resolveOnNextTick(); + push(2); + await resolveOnNextTick(); + push(3); + await resolveOnNextTick(); + push(4); + await resolveOnNextTick(); + push(5); + await resolveOnNextTick(); + push(6); + }); + + const sub = queue.subscribe((batch) => { + const arr = Array.from(batch); + if (arr[0] % 2 === 0) { + return arr; + } + }); + expect(await sub.next()).to.deep.equal({ done: false, value: [2] }); + // [3, 4, 5] are batched as we await 2: + // - one tick for the [AsyncGeneratorResumeNext] job + // - one tick for the await within the withCleanUp next() + expect(await sub.next()).to.deep.equal({ done: false, value: [6] }); + }); + + it('should skip payloads when mapped to undefined, skipping second async payload', async () => { + const queue = new Queue(async (push) => { + await resolveOnNextTick(); + push(0); + await resolveOnNextTick(); + push(1); + await resolveOnNextTick(); + push(2); + await resolveOnNextTick(); + push(3); + await resolveOnNextTick(); + push(4); + }); + + const sub = queue.subscribe((batch) => { + const arr = Array.from(batch); + if (arr[0] % 2 === 0) { + return arr; + } + }); + expect(await sub.next()).to.deep.equal({ done: false, value: [0] }); + // [1, 2, 3] are batched as we await 0 + // - one tick for the [AsyncGeneratorResumeNext] job + // - one tick for the await within the withCleanUp next() + expect(await sub.next()).to.deep.equal({ done: false, value: [4] }); }); }); From f94ae8173868a71f56171f1f5727fd922cc81bfa Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Mon, 27 Oct 2025 20:26:07 +0200 Subject: [PATCH 3/8] add test --- src/execution/__tests__/Queue-test.ts | 28 +++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/src/execution/__tests__/Queue-test.ts b/src/execution/__tests__/Queue-test.ts index dc815dc72c..168c268ab5 100644 --- a/src/execution/__tests__/Queue-test.ts +++ b/src/execution/__tests__/Queue-test.ts @@ -161,4 +161,32 @@ describe('Queue', () => { // - one tick for the await within the withCleanUp next() expect(await sub.next()).to.deep.equal({ done: false, value: [4] }); }); + + it('should condense pushes during map into the same batch', async () => { + let push!: (item: number) => void; + const queue = new Queue((_push) => { + push = _push; + }); + + await resolveOnNextTick(); + push(1); + push(2); + + const itemsToAdd = [3, 4]; + const items: Array = []; + const sub = queue.subscribe((batch) => { + for (const item of batch) { + const itemToAdd = itemsToAdd.shift(); + if (itemToAdd !== undefined) { + push(itemToAdd); + } + items.push(item); + } + return items; + }); + expect(await sub.next()).to.deep.equal({ + done: false, + value: [1, 2, 3, 4], + }); + }); }); From 2cc397853522f3294959d2ed364ee3a4ee3e9dc4 Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Mon, 27 Oct 2025 20:41:56 +0200 Subject: [PATCH 4/8] condense loop --- src/execution/Queue.ts | 48 +++++++++++---------------- src/execution/__tests__/Queue-test.ts | 27 --------------- 2 files changed, 20 insertions(+), 55 deletions(-) diff --git a/src/execution/Queue.ts b/src/execution/Queue.ts index 7a4b93a482..a2dea2f03f 100644 --- a/src/execution/Queue.ts +++ b/src/execution/Queue.ts @@ -50,42 +50,34 @@ export class Queue { private async *subscribeImpl( mapFn: (generator: Generator) => U | undefined, ): AsyncGenerator { - if (this._stopped) { - return; - } - - while (this._items.length > 0) { - const maybe = mapFn(this.batch()); - if (maybe === undefined) { - continue; - } - yield maybe; + while (true) { if (this._stopped) { return; } - } - let batch: Generator | undefined; - // eslint-disable-next-line no-await-in-loop - while ((batch = await this._nextBatch()) !== undefined) { - let maybe = mapFn(batch); - if (maybe === undefined) { - continue; - } - yield maybe; - if (this._stopped) { - return; - } - while (this._items.length > 0) { - maybe = mapFn(this.batch()); - if (maybe === undefined) { - continue; - } - yield maybe; + let mapped; + // drain any items pushed prior to or between .next() calls + while ( + this._items.length > 0 && + (mapped = mapFn(this.batch())) !== undefined + ) { + yield mapped; if (this._stopped) { return; } } + + // wait for a yield-able batch + do { + // eslint-disable-next-line no-await-in-loop + const nextBatch = await this._nextBatch(); + if (nextBatch === undefined || this._stopped) { + return; + } + mapped = mapFn(nextBatch); + } while (mapped === undefined); + + yield mapped; } } diff --git a/src/execution/__tests__/Queue-test.ts b/src/execution/__tests__/Queue-test.ts index 168c268ab5..423531c682 100644 --- a/src/execution/__tests__/Queue-test.ts +++ b/src/execution/__tests__/Queue-test.ts @@ -135,33 +135,6 @@ describe('Queue', () => { expect(await sub.next()).to.deep.equal({ done: false, value: [6] }); }); - it('should skip payloads when mapped to undefined, skipping second async payload', async () => { - const queue = new Queue(async (push) => { - await resolveOnNextTick(); - push(0); - await resolveOnNextTick(); - push(1); - await resolveOnNextTick(); - push(2); - await resolveOnNextTick(); - push(3); - await resolveOnNextTick(); - push(4); - }); - - const sub = queue.subscribe((batch) => { - const arr = Array.from(batch); - if (arr[0] % 2 === 0) { - return arr; - } - }); - expect(await sub.next()).to.deep.equal({ done: false, value: [0] }); - // [1, 2, 3] are batched as we await 0 - // - one tick for the [AsyncGeneratorResumeNext] job - // - one tick for the await within the withCleanUp next() - expect(await sub.next()).to.deep.equal({ done: false, value: [4] }); - }); - it('should condense pushes during map into the same batch', async () => { let push!: (item: number) => void; const queue = new Queue((_push) => { From cb1c006dfb41403cd5b5772294d001923b99dfcb Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Mon, 27 Oct 2025 21:14:29 +0200 Subject: [PATCH 5/8] remove unnecessary check --- src/execution/Queue.ts | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/execution/Queue.ts b/src/execution/Queue.ts index a2dea2f03f..2f7fb0866d 100644 --- a/src/execution/Queue.ts +++ b/src/execution/Queue.ts @@ -90,9 +90,6 @@ export class Queue { } private _push(item: T): void { - if (this._stopped) { - this._resolve(undefined); - } this._items.push(item); this._resolve(this.batch()); } From 896db1bc5f9b98b99dda20d480f269cbfff1cdc4 Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Mon, 27 Oct 2025 21:15:05 +0200 Subject: [PATCH 6/8] remove unnecessary stop on error for our purposes, we can just halt the executor --- src/execution/Queue.ts | 6 ++---- src/execution/__tests__/Queue-test.ts | 25 +++---------------------- 2 files changed, 5 insertions(+), 26 deletions(-) diff --git a/src/execution/Queue.ts b/src/execution/Queue.ts index 2f7fb0866d..b088050c0f 100644 --- a/src/execution/Queue.ts +++ b/src/execution/Queue.ts @@ -25,13 +25,11 @@ export class Queue { try { result = executor(this._push.bind(this), this.stop.bind(this)); } catch { - // Ignore actual error - this.stop(); + // ignore errors } if (isPromise(result)) { result.catch(() => { - // Ignore actual error - this.stop(); + // ignore errors }); } } diff --git a/src/execution/__tests__/Queue-test.ts b/src/execution/__tests__/Queue-test.ts index 423531c682..d18a188d20 100644 --- a/src/execution/__tests__/Queue-test.ts +++ b/src/execution/__tests__/Queue-test.ts @@ -49,16 +49,16 @@ describe('Queue', () => { }); it('should allow the executor to indicate completion', async () => { - const queue = new Queue(async (_push, stop) => { - await resolveOnNextTick(); + const queue = new Queue((push, stop) => { stop(); + push(1); // should be ignored }); const sub = queue.subscribe((batch) => batch); expect(await sub.next()).to.deep.equal({ done: true, value: undefined }); }); - it('should allow a consumer to abort a pending call to nextBatch', async () => { + it('should allow a consumer to abort a pending call to next', async () => { const queue = new Queue(async () => { const { promise } = promiseWithResolvers(); // wait forever @@ -87,25 +87,6 @@ describe('Queue', () => { expect(await sub.next()).to.deep.equal({ done: false, value: [1, 2, 3] }); }); - it('should stop on sync error in the executor', async () => { - const queue = new Queue(() => { - throw new Error('Oops'); - }); - - const sub = queue.subscribe((batch) => Array.from(batch)); - expect(await sub.next()).to.deep.equal({ done: true, value: undefined }); - }); - - it('should stop on async errors in the executor', async () => { - const queue = new Queue(async () => { - await resolveOnNextTick(); - throw new Error('Oops'); - }); - - const sub = queue.subscribe((batch) => Array.from(batch)); - expect(await sub.next()).to.deep.equal({ done: true, value: undefined }); - }); - it('should skip payloads when mapped to undefined, skipping first async payload', async () => { const queue = new Queue(async (push) => { await resolveOnNextTick(); From d80c2a1782128073b1cc82c7a5e0df9609e8c7a0 Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Mon, 27 Oct 2025 21:18:23 +0200 Subject: [PATCH 7/8] add a bit of comments --- src/execution/Queue.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/execution/Queue.ts b/src/execution/Queue.ts index b088050c0f..08047b3cd3 100644 --- a/src/execution/Queue.ts +++ b/src/execution/Queue.ts @@ -25,11 +25,11 @@ export class Queue { try { result = executor(this._push.bind(this), this.stop.bind(this)); } catch { - // ignore errors + // ignore sync executor errors } if (isPromise(result)) { result.catch(() => { - // ignore errors + /* ignore async executor errors */ }); } } From 2f43ceac775f4fed627033c3630696dbe8d32093 Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Mon, 27 Oct 2025 21:56:35 +0200 Subject: [PATCH 8/8] add back some tests --- src/execution/__tests__/Queue-test.ts | 28 +++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/src/execution/__tests__/Queue-test.ts b/src/execution/__tests__/Queue-test.ts index d18a188d20..02c7533e4e 100644 --- a/src/execution/__tests__/Queue-test.ts +++ b/src/execution/__tests__/Queue-test.ts @@ -87,6 +87,34 @@ describe('Queue', () => { expect(await sub.next()).to.deep.equal({ done: false, value: [1, 2, 3] }); }); + it('should ignore sync error in the executor', async () => { + let push!: (item: number) => void; + const queue = new Queue((_push) => { + push = _push; + throw new Error('Oops'); + }); + + push(1); + + const sub = queue.subscribe((batch) => Array.from(batch)); + expect(await sub.next()).to.deep.equal({ done: false, value: [1] }); + }); + + it('should ignore async error in the executor', async () => { + let push!: (item: number) => void; + const queue = new Queue(async (_push) => { + push = _push; + await resolveOnNextTick(); + throw new Error('Oops'); + }); + + await resolveOnNextTick(); + push(1); + + const sub = queue.subscribe((batch) => Array.from(batch)); + expect(await sub.next()).to.deep.equal({ done: false, value: [1] }); + }); + it('should skip payloads when mapped to undefined, skipping first async payload', async () => { const queue = new Queue(async (push) => { await resolveOnNextTick();