Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 32 additions & 43 deletions src/execution/IncrementalGraph.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { BoxedPromiseOrValue } from '../jsutils/BoxedPromiseOrValue.js';
import { invariant } from '../jsutils/invariant.js';
import { isPromise } from '../jsutils/isPromise.js';
import { promiseWithResolvers } from '../jsutils/promiseWithResolvers.js';

import type { GraphQLError } from '../error/GraphQLError.js';

import { Queue } from './Queue.js';
import type {
DeferredFragmentRecord,
DeliveryGroup,
Expand All @@ -22,16 +22,18 @@ import { isDeferredFragmentRecord, isPendingExecutionGroup } from './types.js';
*/
export class IncrementalGraph {
private _rootNodes: Set<DeliveryGroup>;

private _completedQueue: Array<IncrementalDataRecordResult>;
private _nextQueue: Array<
(iterable: Iterable<IncrementalDataRecordResult> | undefined) => void
>;
private _completed: Queue<IncrementalDataRecordResult>;
// _push and _stop are assigned in the executor which is executed
// synchronously by the Queue constructor.
private _push!: (item: IncrementalDataRecordResult) => void;
private _stop!: () => void;

constructor() {
this._rootNodes = new Set();
this._completedQueue = [];
this._nextQueue = [];
this._completed = new Queue<IncrementalDataRecordResult>((push, stop) => {
this._push = push;
this._stop = stop;
});
}

getNewRootNodes(
Expand Down Expand Up @@ -92,29 +94,6 @@ export class IncrementalGraph {
}
}

*currentCompletedBatch(): Generator<IncrementalDataRecordResult> {
let completed;
while ((completed = this._completedQueue.shift()) !== undefined) {
yield completed;
}
}

nextCompletedBatch(): Promise<
Iterable<IncrementalDataRecordResult> | undefined
> {
const { promise, resolve } = promiseWithResolvers<
Iterable<IncrementalDataRecordResult> | undefined
>();
this._nextQueue.push(resolve);
return promise;
}

abort(): void {
for (const resolve of this._nextQueue) {
resolve(undefined);
}
}

hasNext(): boolean {
return this._rootNodes.size > 0;
}
Expand Down Expand Up @@ -146,17 +125,30 @@ export class IncrementalGraph {
const newRootNodes = this._promoteNonEmptyToRoot(
deferredFragmentRecord.children,
);
this._maybeStop();
return { newRootNodes, successfulExecutionGroups };
}

removeDeferredFragment(
deferredFragmentRecord: DeferredFragmentRecord,
): boolean {
return this._rootNodes.delete(deferredFragmentRecord);
const deleted = this._rootNodes.delete(deferredFragmentRecord);
if (!deleted) {
return false;
}
this._maybeStop();
return true;
}

removeStream(streamRecord: StreamRecord): void {
this._rootNodes.delete(streamRecord);
this._maybeStop();
}

subscribe<U>(
mapFn: (generator: Generator<IncrementalDataRecordResult>) => U | undefined,
): AsyncGenerator<U, void, void> {
return this._completed.subscribe(mapFn);
}

private _addIncrementalDataRecords(
Expand Down Expand Up @@ -246,9 +238,9 @@ export class IncrementalGraph {
const value = completedExecutionGroup.value;
if (isPromise(value)) {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
value.then((resolved) => this._enqueue(resolved));
value.then((resolved) => this._push(resolved));
} else {
this._enqueue(value);
this._push(value);
}
}

Expand All @@ -266,7 +258,7 @@ export class IncrementalGraph {
: streamItemRecord().value;
if (isPromise(result)) {
if (items.length > 0) {
this._enqueue({
this._push({
streamRecord,
result:
// TODO add additional test case or rework for coverage
Expand All @@ -290,14 +282,14 @@ export class IncrementalGraph {
}
if (result.item === undefined) {
if (items.length > 0) {
this._enqueue({
this._push({
streamRecord,
result: errors.length > 0 ? { items, errors } : { items },
newDeferredFragmentRecords,
incrementalDataRecords,
});
}
this._enqueue(
this._push(
result.errors === undefined
? { streamRecord }
: {
Expand All @@ -320,12 +312,9 @@ export class IncrementalGraph {
}
}

private _enqueue(completed: IncrementalDataRecordResult): void {
this._completedQueue.push(completed);
const next = this._nextQueue.shift();
if (next === undefined) {
return;
private _maybeStop(): void {
if (!this.hasNext()) {
this._stop();
}
next(this.currentCompletedBatch());
}
}
91 changes: 38 additions & 53 deletions src/execution/IncrementalPublisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,11 @@ interface SubsequentIncrementalExecutionResultContext {
* @internal
*/
class IncrementalPublisher {
private _isDone: boolean;
private _context: IncrementalPublisherContext;
private _nextId: number;
private _incrementalGraph: IncrementalGraph;

constructor(context: IncrementalPublisherContext) {
this._isDone = false;
this._context = context;
this._nextId = 0;
this._incrementalGraph = new IncrementalGraph();
Expand All @@ -95,14 +93,17 @@ class IncrementalPublisher {
? { errors, data, pending, hasNext: true }
: { data, pending, hasNext: true };

const subsequentResults = withCleanup(this._subscribe(), async () => {
this._isDone = true;
this._context.abortSignalListener?.disconnect();
this._incrementalGraph.abort();
await this._returnAsyncIteratorsIgnoringErrors();
});
const subsequentResults = this._incrementalGraph.subscribe((batch) =>
this._handleCompletedBatch(batch),
);

return { initialResult, subsequentResults };
return {
initialResult,
subsequentResults: withCleanup(subsequentResults, async () => {
this._context.abortSignalListener?.disconnect();
await this._returnAsyncIteratorsIgnoringErrors();
}),
};
}

private _toPendingResults(
Expand All @@ -128,55 +129,39 @@ class IncrementalPublisher {
return String(this._nextId++);
}

private async *_subscribe(): AsyncGenerator<
SubsequentIncrementalExecutionResult,
void,
void
> {
while (!this._isDone) {
const context: SubsequentIncrementalExecutionResultContext = {
pending: [],
incremental: [],
completed: [],
};

let batch: Iterable<IncrementalDataRecordResult> | undefined =
this._incrementalGraph.currentCompletedBatch();
do {
for (const completedResult of batch) {
this._handleCompletedIncrementalData(completedResult, context);
}

const { incremental, completed } = context;
if (incremental.length > 0 || completed.length > 0) {
const hasNext = this._incrementalGraph.hasNext();

if (!hasNext) {
this._isDone = true;
}
private _handleCompletedBatch(
batch: Iterable<IncrementalDataRecordResult>,
): SubsequentIncrementalExecutionResult | undefined {
const context: SubsequentIncrementalExecutionResultContext = {
pending: [],
incremental: [],
completed: [],
};

const subsequentIncrementalExecutionResult: SubsequentIncrementalExecutionResult =
{ hasNext };
for (const completedResult of batch) {
this._handleCompletedIncrementalData(completedResult, context);
}

const pending = context.pending;
if (pending.length > 0) {
subsequentIncrementalExecutionResult.pending = pending;
}
if (incremental.length > 0) {
subsequentIncrementalExecutionResult.incremental = incremental;
}
if (completed.length > 0) {
subsequentIncrementalExecutionResult.completed = completed;
}
const { incremental, completed } = context;
if (incremental.length === 0 && completed.length === 0) {
return;
}

yield subsequentIncrementalExecutionResult;
break;
}
const hasNext = this._incrementalGraph.hasNext();

// eslint-disable-next-line no-await-in-loop
batch = await this._incrementalGraph.nextCompletedBatch();
} while (batch !== undefined);
const subsequentIncrementalExecutionResult: SubsequentIncrementalExecutionResult =
{ hasNext };
const pending = context.pending;
if (pending.length > 0) {
subsequentIncrementalExecutionResult.pending = pending;
}
if (incremental.length > 0) {
subsequentIncrementalExecutionResult.incremental = incremental;
}
if (completed.length > 0) {
subsequentIncrementalExecutionResult.completed = completed;
}
return subsequentIncrementalExecutionResult;
}

private _handleCompletedIncrementalData(
Expand Down
108 changes: 108 additions & 0 deletions src/execution/Queue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
import { isPromise } from '../jsutils/isPromise.js';
import type { PromiseOrValue } from '../jsutils/PromiseOrValue.js';
import { promiseWithResolvers } from '../jsutils/promiseWithResolvers.js';

import { withCleanup } from './withCleanup.js';

/**
* @internal
*/
export class Queue<T> {
private _items: Array<T>;
private _stopped: boolean;
private _resolvers: Array<(iterable: Generator<T> | undefined) => void>;

constructor(
executor: (
push: (item: T) => void,
stop: () => void,
) => PromiseOrValue<void>,
) {
this._items = [];
this._stopped = false;
this._resolvers = [];
let result;
try {
result = executor(this._push.bind(this), this.stop.bind(this));
} catch {
// ignore sync executor errors
}
if (isPromise(result)) {
result.catch(() => {
/* ignore async executor errors */
});
}
}

stop(): void {
this._stopped = true;
this._resolve(undefined);
}

subscribe<U>(
mapFn: (generator: Generator<T>) => U | undefined,
): AsyncGenerator<U, void, void> {
return withCleanup(this.subscribeImpl(mapFn), () => this.stop());
}

private async *subscribeImpl<U>(
mapFn: (generator: Generator<T, void, void>) => U | undefined,
): AsyncGenerator<U> {
while (true) {
if (this._stopped) {
return;
}

let mapped;
// drain any items pushed prior to or between .next() calls
while (
this._items.length > 0 &&
(mapped = mapFn(this.batch())) !== undefined
) {
yield mapped;
if (this._stopped) {
return;
}
}

// wait for a yield-able batch
do {
// eslint-disable-next-line no-await-in-loop
const nextBatch = await this._nextBatch();
if (nextBatch === undefined || this._stopped) {
return;
}
mapped = mapFn(nextBatch);
} while (mapped === undefined);

yield mapped;
}
}

private _nextBatch(): Promise<Generator<T> | undefined> {
const { promise, resolve } = promiseWithResolvers<
Generator<T> | undefined
>();
this._resolvers.push(resolve);
return promise;
}

private _push(item: T): void {
this._items.push(item);
this._resolve(this.batch());
}

private _resolve(maybeIterable: Generator<T> | undefined): void {
for (const resolve of this._resolvers) {
resolve(maybeIterable);
}
this._resolvers = [];
}

private *batch(): Generator<T> {
let item: T | undefined;
while ((item = this._items.shift()) !== undefined) {
yield item;
}
}
}
Loading
Loading