diff --git a/src/execution/IncrementalGraph.ts b/src/execution/IncrementalGraph.ts index 98dcdb3fec..971e08a271 100644 --- a/src/execution/IncrementalGraph.ts +++ b/src/execution/IncrementalGraph.ts @@ -1,10 +1,10 @@ import { BoxedPromiseOrValue } from '../jsutils/BoxedPromiseOrValue.js'; import { invariant } from '../jsutils/invariant.js'; import { isPromise } from '../jsutils/isPromise.js'; -import { promiseWithResolvers } from '../jsutils/promiseWithResolvers.js'; import type { GraphQLError } from '../error/GraphQLError.js'; +import { Queue } from './Queue.js'; import type { DeferredFragmentRecord, DeliveryGroup, @@ -22,16 +22,18 @@ import { isDeferredFragmentRecord, isPendingExecutionGroup } from './types.js'; */ export class IncrementalGraph { private _rootNodes: Set; - - private _completedQueue: Array; - private _nextQueue: Array< - (iterable: Iterable | undefined) => void - >; + private _completed: Queue; + // _push and _stop are assigned in the executor which is executed + // synchronously by the Queue constructor. + private _push!: (item: IncrementalDataRecordResult) => void; + private _stop!: () => void; constructor() { this._rootNodes = new Set(); - this._completedQueue = []; - this._nextQueue = []; + this._completed = new Queue((push, stop) => { + this._push = push; + this._stop = stop; + }); } getNewRootNodes( @@ -92,29 +94,6 @@ export class IncrementalGraph { } } - *currentCompletedBatch(): Generator { - let completed; - while ((completed = this._completedQueue.shift()) !== undefined) { - yield completed; - } - } - - nextCompletedBatch(): Promise< - Iterable | undefined - > { - const { promise, resolve } = promiseWithResolvers< - Iterable | undefined - >(); - this._nextQueue.push(resolve); - return promise; - } - - abort(): void { - for (const resolve of this._nextQueue) { - resolve(undefined); - } - } - hasNext(): boolean { return this._rootNodes.size > 0; } @@ -146,17 +125,30 @@ export class IncrementalGraph { const newRootNodes = this._promoteNonEmptyToRoot( deferredFragmentRecord.children, ); + this._maybeStop(); return { newRootNodes, successfulExecutionGroups }; } removeDeferredFragment( deferredFragmentRecord: DeferredFragmentRecord, ): boolean { - return this._rootNodes.delete(deferredFragmentRecord); + const deleted = this._rootNodes.delete(deferredFragmentRecord); + if (!deleted) { + return false; + } + this._maybeStop(); + return true; } removeStream(streamRecord: StreamRecord): void { this._rootNodes.delete(streamRecord); + this._maybeStop(); + } + + subscribe( + mapFn: (generator: Generator) => U | undefined, + ): AsyncGenerator { + return this._completed.subscribe(mapFn); } private _addIncrementalDataRecords( @@ -246,9 +238,9 @@ export class IncrementalGraph { const value = completedExecutionGroup.value; if (isPromise(value)) { // eslint-disable-next-line @typescript-eslint/no-floating-promises - value.then((resolved) => this._enqueue(resolved)); + value.then((resolved) => this._push(resolved)); } else { - this._enqueue(value); + this._push(value); } } @@ -266,7 +258,7 @@ export class IncrementalGraph { : streamItemRecord().value; if (isPromise(result)) { if (items.length > 0) { - this._enqueue({ + this._push({ streamRecord, result: // TODO add additional test case or rework for coverage @@ -290,14 +282,14 @@ export class IncrementalGraph { } if (result.item === undefined) { if (items.length > 0) { - this._enqueue({ + this._push({ streamRecord, result: errors.length > 0 ? { items, errors } : { items }, newDeferredFragmentRecords, incrementalDataRecords, }); } - this._enqueue( + this._push( result.errors === undefined ? { streamRecord } : { @@ -320,12 +312,9 @@ export class IncrementalGraph { } } - private _enqueue(completed: IncrementalDataRecordResult): void { - this._completedQueue.push(completed); - const next = this._nextQueue.shift(); - if (next === undefined) { - return; + private _maybeStop(): void { + if (!this.hasNext()) { + this._stop(); } - next(this.currentCompletedBatch()); } } diff --git a/src/execution/IncrementalPublisher.ts b/src/execution/IncrementalPublisher.ts index 1bf83bb65f..201179874f 100644 --- a/src/execution/IncrementalPublisher.ts +++ b/src/execution/IncrementalPublisher.ts @@ -64,13 +64,11 @@ interface SubsequentIncrementalExecutionResultContext { * @internal */ class IncrementalPublisher { - private _isDone: boolean; private _context: IncrementalPublisherContext; private _nextId: number; private _incrementalGraph: IncrementalGraph; constructor(context: IncrementalPublisherContext) { - this._isDone = false; this._context = context; this._nextId = 0; this._incrementalGraph = new IncrementalGraph(); @@ -95,14 +93,17 @@ class IncrementalPublisher { ? { errors, data, pending, hasNext: true } : { data, pending, hasNext: true }; - const subsequentResults = withCleanup(this._subscribe(), async () => { - this._isDone = true; - this._context.abortSignalListener?.disconnect(); - this._incrementalGraph.abort(); - await this._returnAsyncIteratorsIgnoringErrors(); - }); + const subsequentResults = this._incrementalGraph.subscribe((batch) => + this._handleCompletedBatch(batch), + ); - return { initialResult, subsequentResults }; + return { + initialResult, + subsequentResults: withCleanup(subsequentResults, async () => { + this._context.abortSignalListener?.disconnect(); + await this._returnAsyncIteratorsIgnoringErrors(); + }), + }; } private _toPendingResults( @@ -128,55 +129,39 @@ class IncrementalPublisher { return String(this._nextId++); } - private async *_subscribe(): AsyncGenerator< - SubsequentIncrementalExecutionResult, - void, - void - > { - while (!this._isDone) { - const context: SubsequentIncrementalExecutionResultContext = { - pending: [], - incremental: [], - completed: [], - }; - - let batch: Iterable | undefined = - this._incrementalGraph.currentCompletedBatch(); - do { - for (const completedResult of batch) { - this._handleCompletedIncrementalData(completedResult, context); - } - - const { incremental, completed } = context; - if (incremental.length > 0 || completed.length > 0) { - const hasNext = this._incrementalGraph.hasNext(); - - if (!hasNext) { - this._isDone = true; - } + private _handleCompletedBatch( + batch: Iterable, + ): SubsequentIncrementalExecutionResult | undefined { + const context: SubsequentIncrementalExecutionResultContext = { + pending: [], + incremental: [], + completed: [], + }; - const subsequentIncrementalExecutionResult: SubsequentIncrementalExecutionResult = - { hasNext }; + for (const completedResult of batch) { + this._handleCompletedIncrementalData(completedResult, context); + } - const pending = context.pending; - if (pending.length > 0) { - subsequentIncrementalExecutionResult.pending = pending; - } - if (incremental.length > 0) { - subsequentIncrementalExecutionResult.incremental = incremental; - } - if (completed.length > 0) { - subsequentIncrementalExecutionResult.completed = completed; - } + const { incremental, completed } = context; + if (incremental.length === 0 && completed.length === 0) { + return; + } - yield subsequentIncrementalExecutionResult; - break; - } + const hasNext = this._incrementalGraph.hasNext(); - // eslint-disable-next-line no-await-in-loop - batch = await this._incrementalGraph.nextCompletedBatch(); - } while (batch !== undefined); + const subsequentIncrementalExecutionResult: SubsequentIncrementalExecutionResult = + { hasNext }; + const pending = context.pending; + if (pending.length > 0) { + subsequentIncrementalExecutionResult.pending = pending; + } + if (incremental.length > 0) { + subsequentIncrementalExecutionResult.incremental = incremental; + } + if (completed.length > 0) { + subsequentIncrementalExecutionResult.completed = completed; } + return subsequentIncrementalExecutionResult; } private _handleCompletedIncrementalData( diff --git a/src/execution/Queue.ts b/src/execution/Queue.ts new file mode 100644 index 0000000000..08047b3cd3 --- /dev/null +++ b/src/execution/Queue.ts @@ -0,0 +1,108 @@ +import { isPromise } from '../jsutils/isPromise.js'; +import type { PromiseOrValue } from '../jsutils/PromiseOrValue.js'; +import { promiseWithResolvers } from '../jsutils/promiseWithResolvers.js'; + +import { withCleanup } from './withCleanup.js'; + +/** + * @internal + */ +export class Queue { + private _items: Array; + private _stopped: boolean; + private _resolvers: Array<(iterable: Generator | undefined) => void>; + + constructor( + executor: ( + push: (item: T) => void, + stop: () => void, + ) => PromiseOrValue, + ) { + this._items = []; + this._stopped = false; + this._resolvers = []; + let result; + try { + result = executor(this._push.bind(this), this.stop.bind(this)); + } catch { + // ignore sync executor errors + } + if (isPromise(result)) { + result.catch(() => { + /* ignore async executor errors */ + }); + } + } + + stop(): void { + this._stopped = true; + this._resolve(undefined); + } + + subscribe( + mapFn: (generator: Generator) => U | undefined, + ): AsyncGenerator { + return withCleanup(this.subscribeImpl(mapFn), () => this.stop()); + } + + private async *subscribeImpl( + mapFn: (generator: Generator) => U | undefined, + ): AsyncGenerator { + while (true) { + if (this._stopped) { + return; + } + + let mapped; + // drain any items pushed prior to or between .next() calls + while ( + this._items.length > 0 && + (mapped = mapFn(this.batch())) !== undefined + ) { + yield mapped; + if (this._stopped) { + return; + } + } + + // wait for a yield-able batch + do { + // eslint-disable-next-line no-await-in-loop + const nextBatch = await this._nextBatch(); + if (nextBatch === undefined || this._stopped) { + return; + } + mapped = mapFn(nextBatch); + } while (mapped === undefined); + + yield mapped; + } + } + + private _nextBatch(): Promise | undefined> { + const { promise, resolve } = promiseWithResolvers< + Generator | undefined + >(); + this._resolvers.push(resolve); + return promise; + } + + private _push(item: T): void { + this._items.push(item); + this._resolve(this.batch()); + } + + private _resolve(maybeIterable: Generator | undefined): void { + for (const resolve of this._resolvers) { + resolve(maybeIterable); + } + this._resolvers = []; + } + + private *batch(): Generator { + let item: T | undefined; + while ((item = this._items.shift()) !== undefined) { + yield item; + } + } +} diff --git a/src/execution/__tests__/Queue-test.ts b/src/execution/__tests__/Queue-test.ts new file mode 100644 index 0000000000..02c7533e4e --- /dev/null +++ b/src/execution/__tests__/Queue-test.ts @@ -0,0 +1,174 @@ +import { expect } from 'chai'; +import { describe, it } from 'mocha'; + +import { resolveOnNextTick } from '../../__testUtils__/resolveOnNextTick.js'; + +import { promiseWithResolvers } from '../../jsutils/promiseWithResolvers.js'; + +import { Queue } from '../Queue.js'; + +describe('Queue', () => { + it('should yield sync pushed items in order', async () => { + const queue = new Queue((push) => { + push(1); + push(2); + push(3); + }); + + const sub = queue.subscribe((batch) => Array.from(batch)); + expect(await sub.next()).to.deep.equal({ done: false, value: [1, 2, 3] }); + }); + + it('should yield async pushed items in order', async () => { + const queue = new Queue(async (push) => { + await resolveOnNextTick(); + push(1); + push(2); + push(3); + }); + + const sub = queue.subscribe((batch) => Array.from(batch)); + expect(await sub.next()).to.deep.equal({ done: false, value: [1, 2, 3] }); + }); + + it('should yield multiple batches', async () => { + const queue = new Queue(async (push) => { + await resolveOnNextTick(); + push(1); + push(2); + push(3); + await resolveOnNextTick(); + push(4); + push(5); + push(6); + }); + + const sub = queue.subscribe((batch) => Array.from(batch)); + expect(await sub.next()).to.deep.equal({ done: false, value: [1, 2, 3] }); + expect(await sub.next()).to.deep.equal({ done: false, value: [4, 5, 6] }); + }); + + it('should allow the executor to indicate completion', async () => { + const queue = new Queue((push, stop) => { + stop(); + push(1); // should be ignored + }); + + const sub = queue.subscribe((batch) => batch); + expect(await sub.next()).to.deep.equal({ done: true, value: undefined }); + }); + + it('should allow a consumer to abort a pending call to next', async () => { + const queue = new Queue(async () => { + const { promise } = promiseWithResolvers(); + // wait forever + await promise; + }); + + const sub = queue.subscribe((batch) => batch); + const nextPromise = sub.next(); + queue.stop(); + expect(await nextPromise).to.deep.equal({ done: true, value: undefined }); + }); + + it('should allow saving the push function', async () => { + let push!: (item: number) => void; + const queue = new Queue((_push) => { + push = _push; + }); + + const sub = queue.subscribe((batch) => Array.from(batch)); + + await resolveOnNextTick(); + push(1); + push(2); + push(3); + + expect(await sub.next()).to.deep.equal({ done: false, value: [1, 2, 3] }); + }); + + it('should ignore sync error in the executor', async () => { + let push!: (item: number) => void; + const queue = new Queue((_push) => { + push = _push; + throw new Error('Oops'); + }); + + push(1); + + const sub = queue.subscribe((batch) => Array.from(batch)); + expect(await sub.next()).to.deep.equal({ done: false, value: [1] }); + }); + + it('should ignore async error in the executor', async () => { + let push!: (item: number) => void; + const queue = new Queue(async (_push) => { + push = _push; + await resolveOnNextTick(); + throw new Error('Oops'); + }); + + await resolveOnNextTick(); + push(1); + + const sub = queue.subscribe((batch) => Array.from(batch)); + expect(await sub.next()).to.deep.equal({ done: false, value: [1] }); + }); + + it('should skip payloads when mapped to undefined, skipping first async payload', async () => { + const queue = new Queue(async (push) => { + await resolveOnNextTick(); + push(1); + await resolveOnNextTick(); + push(2); + await resolveOnNextTick(); + push(3); + await resolveOnNextTick(); + push(4); + await resolveOnNextTick(); + push(5); + await resolveOnNextTick(); + push(6); + }); + + const sub = queue.subscribe((batch) => { + const arr = Array.from(batch); + if (arr[0] % 2 === 0) { + return arr; + } + }); + expect(await sub.next()).to.deep.equal({ done: false, value: [2] }); + // [3, 4, 5] are batched as we await 2: + // - one tick for the [AsyncGeneratorResumeNext] job + // - one tick for the await within the withCleanUp next() + expect(await sub.next()).to.deep.equal({ done: false, value: [6] }); + }); + + it('should condense pushes during map into the same batch', async () => { + let push!: (item: number) => void; + const queue = new Queue((_push) => { + push = _push; + }); + + await resolveOnNextTick(); + push(1); + push(2); + + const itemsToAdd = [3, 4]; + const items: Array = []; + const sub = queue.subscribe((batch) => { + for (const item of batch) { + const itemToAdd = itemsToAdd.shift(); + if (itemToAdd !== undefined) { + push(itemToAdd); + } + items.push(item); + } + return items; + }); + expect(await sub.next()).to.deep.equal({ + done: false, + value: [1, 2, 3, 4], + }); + }); +}); diff --git a/src/execution/__tests__/stream-test.ts b/src/execution/__tests__/stream-test.ts index ada96d9c44..305dd74d43 100644 --- a/src/execution/__tests__/stream-test.ts +++ b/src/execution/__tests__/stream-test.ts @@ -2432,14 +2432,13 @@ describe('Execute: stream directive', () => { hasNext: true, }); - const returnPromise = iterator.return(); + await iterator.return(); const result2 = await iterator.next(); expectJSON(result2).toDeepEqual({ done: true, value: undefined, }); - await returnPromise; }); it('Returns underlying async iterables when returned generator is thrown', async () => { let index = 0; @@ -2495,14 +2494,14 @@ describe('Execute: stream directive', () => { hasNext: true, }); - const throwPromise = iterator.throw(new Error('bad')); + await expectPromise(iterator.throw(new Error('bad'))).toRejectWith('bad'); const result2 = await iterator.next(); expectJSON(result2).toDeepEqual({ done: true, value: undefined, }); - await expectPromise(throwPromise).toRejectWith('bad'); + assert(returned); }); it('Returns underlying async iterables when uses resource is disposed', async () => {