Skip to content

Commit ba4a829

Browse files
committed
use Queue for streams
1 parent a01f596 commit ba4a829

File tree

6 files changed

+993
-426
lines changed

6 files changed

+993
-426
lines changed

src/execution/IncrementalGraph.ts

Lines changed: 70 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { BoxedPromiseOrValue } from '../jsutils/BoxedPromiseOrValue.js';
22
import { invariant } from '../jsutils/invariant.js';
33
import { isPromise } from '../jsutils/isPromise.js';
4+
import type { PromiseOrValue } from '../jsutils/PromiseOrValue.js';
45

56
import type { GraphQLError } from '../error/GraphQLError.js';
67

@@ -11,7 +12,6 @@ import type {
1112
IncrementalDataRecord,
1213
IncrementalDataRecordResult,
1314
PendingExecutionGroup,
14-
StreamItemRecord,
1515
StreamRecord,
1616
SuccessfulExecutionGroup,
1717
} from './types.js';
@@ -20,20 +20,26 @@ import { isDeferredFragmentRecord, isPendingExecutionGroup } from './types.js';
2020
/**
2121
* @internal
2222
*/
23-
export class IncrementalGraph {
23+
export class IncrementalGraph<U> {
2424
private _rootNodes: Set<DeliveryGroup>;
25-
private _completed: Queue<IncrementalDataRecordResult>;
25+
private _completed: AsyncGenerator<U, void, void>;
2626
// _push and _stop are assigned in the executor which is executed
2727
// synchronously by the Queue constructor.
28-
private _push!: (item: IncrementalDataRecordResult) => void;
28+
private _push!: (item: IncrementalDataRecordResult) => PromiseOrValue<void>;
2929
private _stop!: () => void;
3030

31-
constructor() {
31+
constructor(
32+
reducer: (
33+
generator: Generator<IncrementalDataRecordResult>,
34+
) => U | undefined,
35+
) {
3236
this._rootNodes = new Set();
33-
this._completed = new Queue<IncrementalDataRecordResult>((push, stop) => {
34-
this._push = push;
35-
this._stop = stop;
36-
});
37+
this._completed = new Queue<IncrementalDataRecordResult>(
38+
({ push, stop }) => {
39+
this._push = push;
40+
this._stop = stop;
41+
},
42+
).subscribe(reducer);
3743
}
3844

3945
getNewRootNodes(
@@ -145,10 +151,8 @@ export class IncrementalGraph {
145151
this._maybeStop();
146152
}
147153

148-
subscribe<U>(
149-
mapFn: (generator: Generator<IncrementalDataRecordResult>) => U | undefined,
150-
): AsyncGenerator<U, void, void> {
151-
return this._completed.subscribe(mapFn);
154+
subscribe(): AsyncGenerator<U, void, void> {
155+
return this._completed;
152156
}
153157

154158
private _addIncrementalDataRecords(
@@ -238,77 +242,68 @@ export class IncrementalGraph {
238242
const value = completedExecutionGroup.value;
239243
if (isPromise(value)) {
240244
// eslint-disable-next-line @typescript-eslint/no-floating-promises
241-
value.then((resolved) => this._push(resolved));
245+
value.then((resolved) => {
246+
// eslint-disable-next-line @typescript-eslint/no-floating-promises
247+
this._push(resolved);
248+
});
242249
} else {
250+
// eslint-disable-next-line @typescript-eslint/no-floating-promises
243251
this._push(value);
244252
}
245253
}
246254

247255
private async _onStreamItems(streamRecord: StreamRecord): Promise<void> {
248-
let items: Array<unknown> = [];
249-
let errors: Array<GraphQLError> = [];
250-
let newDeferredFragmentRecords: Array<DeferredFragmentRecord> = [];
251-
let incrementalDataRecords: Array<IncrementalDataRecord> = [];
252256
const streamItemQueue = streamRecord.streamItemQueue;
253-
let streamItemRecord: StreamItemRecord | undefined;
254-
while ((streamItemRecord = streamItemQueue.shift()) !== undefined) {
255-
let result =
256-
streamItemRecord instanceof BoxedPromiseOrValue
257-
? streamItemRecord.value
258-
: streamItemRecord().value;
259-
if (isPromise(result)) {
260-
if (items.length > 0) {
261-
this._push({
262-
streamRecord,
263-
result:
264-
// TODO add additional test case or rework for coverage
265-
errors.length > 0 /* c8 ignore start */
266-
? { items, errors } /* c8 ignore stop */
267-
: { items },
268-
newDeferredFragmentRecords,
269-
incrementalDataRecords,
270-
});
271-
items = [];
272-
errors = [];
273-
newDeferredFragmentRecords = [];
274-
incrementalDataRecords = [];
257+
let closed = false;
258+
try {
259+
await streamItemQueue.forEachBatch((streamItemResults) => {
260+
const items: Array<unknown> = [];
261+
const errors: Array<GraphQLError> = [];
262+
const newDeferredFragmentRecords: Array<DeferredFragmentRecord> = [];
263+
const incrementalDataRecords: Array<IncrementalDataRecord> = [];
264+
265+
for (const result of streamItemResults) {
266+
items.push(result.item);
267+
if (result.errors !== undefined) {
268+
errors.push(...result.errors);
269+
}
270+
if (result.newDeferredFragmentRecords !== undefined) {
271+
newDeferredFragmentRecords.push(
272+
...result.newDeferredFragmentRecords,
273+
);
274+
}
275+
if (result.incrementalDataRecords !== undefined) {
276+
incrementalDataRecords.push(...result.incrementalDataRecords);
277+
}
275278
}
276-
// eslint-disable-next-line no-await-in-loop
277-
result = await result;
278-
// wait an additional tick to coalesce resolving additional promises
279-
// within the queue
280-
// eslint-disable-next-line no-await-in-loop
281-
await Promise.resolve();
282-
}
283-
if (result.item === undefined) {
284-
if (items.length > 0) {
285-
this._push({
286-
streamRecord,
287-
result: errors.length > 0 ? { items, errors } : { items },
288-
newDeferredFragmentRecords,
289-
incrementalDataRecords,
290-
});
279+
280+
// eslint-disable-next-line @typescript-eslint/no-floating-promises
281+
this._push({
282+
streamRecord,
283+
result:
284+
// TODO add additional test case or rework for coverage
285+
errors.length > 0 /* c8 ignore start */
286+
? { items, errors } /* c8 ignore stop */
287+
: { items },
288+
newDeferredFragmentRecords,
289+
incrementalDataRecords,
290+
});
291+
292+
if (streamItemQueue.isStopped()) {
293+
closed = true;
294+
// eslint-disable-next-line @typescript-eslint/no-floating-promises
295+
this._push({ streamRecord });
291296
}
292-
this._push(
293-
result.errors === undefined
294-
? { streamRecord }
295-
: {
296-
streamRecord,
297-
errors: result.errors,
298-
},
299-
);
300-
return;
301-
}
302-
items.push(result.item);
303-
if (result.errors !== undefined) {
304-
errors.push(...result.errors);
305-
}
306-
if (result.newDeferredFragmentRecords !== undefined) {
307-
newDeferredFragmentRecords.push(...result.newDeferredFragmentRecords);
308-
}
309-
if (result.incrementalDataRecords !== undefined) {
310-
incrementalDataRecords.push(...result.incrementalDataRecords);
311-
}
297+
});
298+
} catch (error) {
299+
// eslint-disable-next-line @typescript-eslint/no-floating-promises
300+
this._push({ streamRecord, errors: [error] });
301+
return;
302+
}
303+
304+
if (!closed) {
305+
// eslint-disable-next-line @typescript-eslint/no-floating-promises
306+
this._push({ streamRecord });
312307
}
313308
}
314309

src/execution/IncrementalPublisher.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ class IncrementalPublisher {
6262
private _earlyReturns: Map<StreamRecord, () => Promise<unknown>>;
6363
private _abortSignalListener: AbortSignalListener | undefined;
6464
private _nextId: number;
65-
private _incrementalGraph: IncrementalGraph;
65+
private _incrementalGraph: IncrementalGraph<SubsequentIncrementalExecutionResult>;
6666

6767
constructor(
6868
earlyReturns: Map<StreamRecord, () => Promise<unknown>>,
@@ -71,7 +71,9 @@ class IncrementalPublisher {
7171
this._earlyReturns = earlyReturns;
7272
this._abortSignalListener = abortSignalListener;
7373
this._nextId = 0;
74-
this._incrementalGraph = new IncrementalGraph();
74+
this._incrementalGraph = new IncrementalGraph((batch) =>
75+
this._handleCompletedBatch(batch),
76+
);
7577
}
7678

7779
buildResponse(
@@ -93,9 +95,7 @@ class IncrementalPublisher {
9395
? { errors, data, pending, hasNext: true }
9496
: { data, pending, hasNext: true };
9597

96-
const subsequentResults = this._incrementalGraph.subscribe((batch) =>
97-
this._handleCompletedBatch(batch),
98-
);
98+
const subsequentResults = this._incrementalGraph.subscribe();
9999

100100
return {
101101
initialResult,

0 commit comments

Comments
 (0)