Skip to content

Commit 3f71627

Browse files
committed
refactor: extract Queue from IncrementalPublisher and IncrementalGraph
Queue is a batching async-aware iterator-like protocol meant to be consumed in the following (and only the following) pattern: > let batch; > if ((batch = queue.currentBatch()) !== undefined) { > doSomethingWithBatch(batch); > } > while ((batch = await queue.nextBatch) !== undefined) { > doSomethingWithBatch(batch); > } A `push()` methods is provided in the Queue constructor (a la repeaters, see https://repeater.js.org/) so that `push()` can be private to the code that constructs the Queue (although it can be saved to be passed to other code). A `stop()` method is also provided as a the second argument to the constructor for convienence, but it is also available on the queue object itself so that it can be called by the executor or by the consumer. So this works: > const queue = new Queue( > async (push, stop) => { > push(1); > push(2) > await Promise.resolve(); > push(3); > stop(); > }, > ); > > const batch1 = Array.from(queue.nextBatch()); // batch1 = [1, 2] > const batch2 = Array.from(await queue.nextBatchAsync()); // batch2 = [3] as does this: > let push; > const queue = new Queue( > (_push) => { > push = _push; > }, > ); > > push(1); > push(2); > > const batch1 = Array.from(queue.nextBatch()); // batch1 = [1, 2] > > const batch2Promise = queue.nextBatchAsync(); > > await Promise.resolve(); > push(3); > queue.stop(); > > const batch2 = await batch2Promise; // batch2 = [3] Note: concurrent calls to `currentBatch()` and `nextBatch` will return the same batch and are not encouraged. A `toAsyncIterable(mapFn)` method transforms coalesces each batch of items into a single value (or undefined if not value is to be emitted for the batch). Using queues, we are able to remove all logic for handling the implicit queue from IncrementalPublisher, retaining only the `_handleCompletedBatch()`, while adding only the required `push()` and `stop()` calls within the IncrementalGraph. Tests do not change, except that `.return()` and `.throw()` (but not `next()` have an extra tick secondary to the additional layers of `withCleanup()`, so that the tests required slight adjustment.
1 parent aea5175 commit 3f71627

File tree

5 files changed

+311
-100
lines changed

5 files changed

+311
-100
lines changed

src/execution/IncrementalGraph.ts

Lines changed: 30 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
import { BoxedPromiseOrValue } from '../jsutils/BoxedPromiseOrValue.js';
22
import { invariant } from '../jsutils/invariant.js';
33
import { isPromise } from '../jsutils/isPromise.js';
4-
import { promiseWithResolvers } from '../jsutils/promiseWithResolvers.js';
54

65
import type { GraphQLError } from '../error/GraphQLError.js';
76

7+
import { Queue } from './Queue.js';
88
import type {
99
DeferredFragmentRecord,
1010
DeliveryGroup,
@@ -22,16 +22,18 @@ import { isDeferredFragmentRecord, isPendingExecutionGroup } from './types.js';
2222
*/
2323
export class IncrementalGraph {
2424
private _rootNodes: Set<DeliveryGroup>;
25-
26-
private _completedQueue: Array<IncrementalDataRecordResult>;
27-
private _nextQueue: Array<
28-
(iterable: Iterable<IncrementalDataRecordResult> | undefined) => void
29-
>;
25+
private _completed: Queue<IncrementalDataRecordResult>;
26+
// _push and _stop are assigned in the executor which is executed
27+
// synchronously by the Queue constructor.
28+
private _push!: (item: IncrementalDataRecordResult) => void;
29+
private _stop!: () => void;
3030

3131
constructor() {
3232
this._rootNodes = new Set();
33-
this._completedQueue = [];
34-
this._nextQueue = [];
33+
this._completed = new Queue<IncrementalDataRecordResult>((push, stop) => {
34+
this._push = push;
35+
this._stop = stop;
36+
});
3537
}
3638

3739
getNewRootNodes(
@@ -92,29 +94,6 @@ export class IncrementalGraph {
9294
}
9395
}
9496

95-
*currentCompletedBatch(): Generator<IncrementalDataRecordResult> {
96-
let completed;
97-
while ((completed = this._completedQueue.shift()) !== undefined) {
98-
yield completed;
99-
}
100-
}
101-
102-
nextCompletedBatch(): Promise<
103-
Iterable<IncrementalDataRecordResult> | undefined
104-
> {
105-
const { promise, resolve } = promiseWithResolvers<
106-
Iterable<IncrementalDataRecordResult> | undefined
107-
>();
108-
this._nextQueue.push(resolve);
109-
return promise;
110-
}
111-
112-
abort(): void {
113-
for (const resolve of this._nextQueue) {
114-
resolve(undefined);
115-
}
116-
}
117-
11897
hasNext(): boolean {
11998
return this._rootNodes.size > 0;
12099
}
@@ -146,17 +125,28 @@ export class IncrementalGraph {
146125
const newRootNodes = this._promoteNonEmptyToRoot(
147126
deferredFragmentRecord.children,
148127
);
128+
this._maybeStop();
149129
return { newRootNodes, successfulExecutionGroups };
150130
}
151131

152132
removeDeferredFragment(
153133
deferredFragmentRecord: DeferredFragmentRecord,
154134
): boolean {
155-
return this._rootNodes.delete(deferredFragmentRecord);
135+
const deleted = this._rootNodes.delete(deferredFragmentRecord);
136+
if (!deleted) {
137+
return false;
138+
}
139+
this._maybeStop();
140+
return true;
156141
}
157142

158143
removeStream(streamRecord: StreamRecord): void {
159144
this._rootNodes.delete(streamRecord);
145+
this._maybeStop();
146+
}
147+
148+
subscribe(): Queue<IncrementalDataRecordResult> {
149+
return this._completed;
160150
}
161151

162152
private _addIncrementalDataRecords(
@@ -246,9 +236,9 @@ export class IncrementalGraph {
246236
const value = completedExecutionGroup.value;
247237
if (isPromise(value)) {
248238
// eslint-disable-next-line @typescript-eslint/no-floating-promises
249-
value.then((resolved) => this._enqueue(resolved));
239+
value.then((resolved) => this._push(resolved));
250240
} else {
251-
this._enqueue(value);
241+
this._push(value);
252242
}
253243
}
254244

@@ -266,7 +256,7 @@ export class IncrementalGraph {
266256
: streamItemRecord().value;
267257
if (isPromise(result)) {
268258
if (items.length > 0) {
269-
this._enqueue({
259+
this._push({
270260
streamRecord,
271261
result:
272262
// TODO add additional test case or rework for coverage
@@ -290,14 +280,14 @@ export class IncrementalGraph {
290280
}
291281
if (result.item === undefined) {
292282
if (items.length > 0) {
293-
this._enqueue({
283+
this._push({
294284
streamRecord,
295285
result: errors.length > 0 ? { items, errors } : { items },
296286
newDeferredFragmentRecords,
297287
incrementalDataRecords,
298288
});
299289
}
300-
this._enqueue(
290+
this._push(
301291
result.errors === undefined
302292
? { streamRecord }
303293
: {
@@ -320,12 +310,9 @@ export class IncrementalGraph {
320310
}
321311
}
322312

323-
private _enqueue(completed: IncrementalDataRecordResult): void {
324-
this._completedQueue.push(completed);
325-
const next = this._nextQueue.shift();
326-
if (next === undefined) {
327-
return;
313+
private _maybeStop(): void {
314+
if (!this.hasNext()) {
315+
this._stop();
328316
}
329-
next(this.currentCompletedBatch());
330317
}
331318
}

src/execution/IncrementalPublisher.ts

Lines changed: 38 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -64,13 +64,11 @@ interface SubsequentIncrementalExecutionResultContext {
6464
* @internal
6565
*/
6666
class IncrementalPublisher {
67-
private _isDone: boolean;
6867
private _context: IncrementalPublisherContext;
6968
private _nextId: number;
7069
private _incrementalGraph: IncrementalGraph;
7170

7271
constructor(context: IncrementalPublisherContext) {
73-
this._isDone = false;
7472
this._context = context;
7573
this._nextId = 0;
7674
this._incrementalGraph = new IncrementalGraph();
@@ -95,14 +93,17 @@ class IncrementalPublisher {
9593
? { errors, data, pending, hasNext: true }
9694
: { data, pending, hasNext: true };
9795

98-
const subsequentResults = withCleanup(this._subscribe(), async () => {
99-
this._isDone = true;
100-
this._context.abortSignalListener?.disconnect();
101-
this._incrementalGraph.abort();
102-
await this._returnAsyncIteratorsIgnoringErrors();
103-
});
96+
const subsequentResults = this._incrementalGraph
97+
.subscribe()
98+
.toAsyncGenerator((batch) => this._handleCompletedBatch(batch));
10499

105-
return { initialResult, subsequentResults };
100+
return {
101+
initialResult,
102+
subsequentResults: withCleanup(subsequentResults, async () => {
103+
this._context.abortSignalListener?.disconnect();
104+
await this._returnAsyncIteratorsIgnoringErrors();
105+
}),
106+
};
106107
}
107108

108109
private _toPendingResults(
@@ -128,55 +129,39 @@ class IncrementalPublisher {
128129
return String(this._nextId++);
129130
}
130131

131-
private async *_subscribe(): AsyncGenerator<
132-
SubsequentIncrementalExecutionResult,
133-
void,
134-
void
135-
> {
136-
while (!this._isDone) {
137-
const context: SubsequentIncrementalExecutionResultContext = {
138-
pending: [],
139-
incremental: [],
140-
completed: [],
141-
};
142-
143-
let batch: Iterable<IncrementalDataRecordResult> | undefined =
144-
this._incrementalGraph.currentCompletedBatch();
145-
do {
146-
for (const completedResult of batch) {
147-
this._handleCompletedIncrementalData(completedResult, context);
148-
}
149-
150-
const { incremental, completed } = context;
151-
if (incremental.length > 0 || completed.length > 0) {
152-
const hasNext = this._incrementalGraph.hasNext();
153-
154-
if (!hasNext) {
155-
this._isDone = true;
156-
}
132+
private _handleCompletedBatch(
133+
batch: Iterable<IncrementalDataRecordResult>,
134+
): SubsequentIncrementalExecutionResult | undefined {
135+
const context: SubsequentIncrementalExecutionResultContext = {
136+
pending: [],
137+
incremental: [],
138+
completed: [],
139+
};
157140

158-
const subsequentIncrementalExecutionResult: SubsequentIncrementalExecutionResult =
159-
{ hasNext };
141+
for (const completedResult of batch) {
142+
this._handleCompletedIncrementalData(completedResult, context);
143+
}
160144

161-
const pending = context.pending;
162-
if (pending.length > 0) {
163-
subsequentIncrementalExecutionResult.pending = pending;
164-
}
165-
if (incremental.length > 0) {
166-
subsequentIncrementalExecutionResult.incremental = incremental;
167-
}
168-
if (completed.length > 0) {
169-
subsequentIncrementalExecutionResult.completed = completed;
170-
}
145+
const { incremental, completed } = context;
146+
if (incremental.length === 0 && completed.length === 0) {
147+
return;
148+
}
171149

172-
yield subsequentIncrementalExecutionResult;
173-
break;
174-
}
150+
const hasNext = this._incrementalGraph.hasNext();
175151

176-
// eslint-disable-next-line no-await-in-loop
177-
batch = await this._incrementalGraph.nextCompletedBatch();
178-
} while (batch !== undefined);
152+
const subsequentIncrementalExecutionResult: SubsequentIncrementalExecutionResult =
153+
{ hasNext };
154+
const pending = context.pending;
155+
if (pending.length > 0) {
156+
subsequentIncrementalExecutionResult.pending = pending;
157+
}
158+
if (incremental.length > 0) {
159+
subsequentIncrementalExecutionResult.incremental = incremental;
160+
}
161+
if (completed.length > 0) {
162+
subsequentIncrementalExecutionResult.completed = completed;
179163
}
164+
return subsequentIncrementalExecutionResult;
180165
}
181166

182167
private _handleCompletedIncrementalData(

src/execution/Queue.ts

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
import { isPromise } from '../jsutils/isPromise.js';
2+
import type { PromiseOrValue } from '../jsutils/PromiseOrValue.js';
3+
import { promiseWithResolvers } from '../jsutils/promiseWithResolvers.js';
4+
5+
import { withCleanup } from './withCleanup.js';
6+
7+
/**
8+
* @internal
9+
*/
10+
export class Queue<T> {
11+
private _items: Array<T>;
12+
private _stopped: boolean;
13+
private _resolvers: Array<(iterable: Generator<T> | undefined) => void>;
14+
15+
constructor(
16+
executor: (
17+
push: (item: T) => void,
18+
stop: () => void,
19+
) => PromiseOrValue<void>,
20+
) {
21+
this._items = [];
22+
this._stopped = false;
23+
this._resolvers = [];
24+
let result;
25+
try {
26+
result = executor(this._push.bind(this), this.stop.bind(this));
27+
} catch {
28+
// Ignore errors
29+
}
30+
if (isPromise(result)) {
31+
result.catch(() => {
32+
// Ignore errors
33+
});
34+
}
35+
}
36+
37+
currentBatch(): Generator<T> | undefined {
38+
if (this._items.length > 0) {
39+
return this.batch();
40+
}
41+
}
42+
43+
nextBatch(): Promise<Generator<T> | undefined> {
44+
if (this._items.length > 0) {
45+
return Promise.resolve(this.batch());
46+
}
47+
if (this._stopped) {
48+
return Promise.resolve(undefined);
49+
}
50+
const { promise, resolve } = promiseWithResolvers<
51+
Generator<T> | undefined
52+
>();
53+
this._resolvers.push(resolve);
54+
return promise;
55+
}
56+
57+
stop(): void {
58+
this._stopped = true;
59+
this._resolve(undefined);
60+
}
61+
62+
toAsyncGenerator<U>(
63+
mapFn: (generator: Generator<T>) => U | undefined,
64+
): AsyncGenerator<U, void, void> {
65+
return withCleanup(this.toAsyncGeneratorImpl(mapFn), () => this.stop());
66+
}
67+
68+
private async *toAsyncGeneratorImpl<U>(
69+
mapFn: (generator: Generator<T, void, void>) => U | undefined,
70+
): AsyncGenerator<U> {
71+
let batch = this.currentBatch();
72+
if (batch !== undefined) {
73+
const maybe = mapFn(batch);
74+
if (maybe !== undefined) {
75+
yield maybe;
76+
}
77+
}
78+
79+
if (this._stopped) {
80+
return;
81+
}
82+
83+
// eslint-disable-next-line no-await-in-loop
84+
while ((batch = await this.nextBatch()) !== undefined) {
85+
const maybe = mapFn(batch);
86+
if (maybe !== undefined) {
87+
yield maybe;
88+
}
89+
}
90+
}
91+
92+
private _push(item: T): void {
93+
if (this._stopped) {
94+
this._resolve(undefined);
95+
}
96+
this._items.push(item);
97+
this._resolve(this.batch());
98+
}
99+
100+
private _resolve(maybeIterable: Generator<T> | undefined): void {
101+
for (const resolve of this._resolvers) {
102+
resolve(maybeIterable);
103+
}
104+
this._resolvers = [];
105+
}
106+
107+
private *batch(): Generator<T> {
108+
let item: T | undefined;
109+
while ((item = this._items.shift()) !== undefined) {
110+
yield item;
111+
}
112+
}
113+
}

0 commit comments

Comments
 (0)