Skip to content

Commit 19684da

Browse files
committed
polish(queue): only subscribe needs to be public
1 parent 3f71627 commit 19684da

File tree

4 files changed

+139
-92
lines changed

4 files changed

+139
-92
lines changed

src/execution/IncrementalGraph.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -145,8 +145,10 @@ export class IncrementalGraph {
145145
this._maybeStop();
146146
}
147147

148-
subscribe(): Queue<IncrementalDataRecordResult> {
149-
return this._completed;
148+
subscribe<U>(
149+
mapFn: (generator: Generator<IncrementalDataRecordResult>) => U | undefined,
150+
): AsyncGenerator<U, void, void> {
151+
return this._completed.subscribe(mapFn);
150152
}
151153

152154
private _addIncrementalDataRecords(

src/execution/IncrementalPublisher.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -93,9 +93,9 @@ class IncrementalPublisher {
9393
? { errors, data, pending, hasNext: true }
9494
: { data, pending, hasNext: true };
9595

96-
const subsequentResults = this._incrementalGraph
97-
.subscribe()
98-
.toAsyncGenerator((batch) => this._handleCompletedBatch(batch));
96+
const subsequentResults = this._incrementalGraph.subscribe((batch) =>
97+
this._handleCompletedBatch(batch),
98+
);
9999

100100
return {
101101
initialResult,

src/execution/Queue.ts

Lines changed: 44 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -25,70 +25,78 @@ export class Queue<T> {
2525
try {
2626
result = executor(this._push.bind(this), this.stop.bind(this));
2727
} catch {
28-
// Ignore errors
28+
// Ignore actual error
29+
this.stop();
2930
}
3031
if (isPromise(result)) {
3132
result.catch(() => {
32-
// Ignore errors
33+
// Ignore actual error
34+
this.stop();
3335
});
3436
}
3537
}
3638

37-
currentBatch(): Generator<T> | undefined {
38-
if (this._items.length > 0) {
39-
return this.batch();
40-
}
41-
}
42-
43-
nextBatch(): Promise<Generator<T> | undefined> {
44-
if (this._items.length > 0) {
45-
return Promise.resolve(this.batch());
46-
}
47-
if (this._stopped) {
48-
return Promise.resolve(undefined);
49-
}
50-
const { promise, resolve } = promiseWithResolvers<
51-
Generator<T> | undefined
52-
>();
53-
this._resolvers.push(resolve);
54-
return promise;
55-
}
56-
5739
stop(): void {
5840
this._stopped = true;
5941
this._resolve(undefined);
6042
}
6143

62-
toAsyncGenerator<U>(
44+
subscribe<U>(
6345
mapFn: (generator: Generator<T>) => U | undefined,
6446
): AsyncGenerator<U, void, void> {
65-
return withCleanup(this.toAsyncGeneratorImpl(mapFn), () => this.stop());
47+
return withCleanup(this.subscribeImpl(mapFn), () => this.stop());
6648
}
6749

68-
private async *toAsyncGeneratorImpl<U>(
50+
private async *subscribeImpl<U>(
6951
mapFn: (generator: Generator<T, void, void>) => U | undefined,
7052
): AsyncGenerator<U> {
71-
let batch = this.currentBatch();
72-
if (batch !== undefined) {
73-
const maybe = mapFn(batch);
74-
if (maybe !== undefined) {
75-
yield maybe;
76-
}
77-
}
78-
7953
if (this._stopped) {
8054
return;
8155
}
8256

57+
while (this._items.length > 0) {
58+
const maybe = mapFn(this.batch());
59+
if (maybe === undefined) {
60+
continue;
61+
}
62+
yield maybe;
63+
if (this._stopped) {
64+
return;
65+
}
66+
}
67+
68+
let batch: Generator<T> | undefined;
8369
// eslint-disable-next-line no-await-in-loop
84-
while ((batch = await this.nextBatch()) !== undefined) {
85-
const maybe = mapFn(batch);
86-
if (maybe !== undefined) {
70+
while ((batch = await this._nextBatch()) !== undefined) {
71+
let maybe = mapFn(batch);
72+
if (maybe === undefined) {
73+
continue;
74+
}
75+
yield maybe;
76+
if (this._stopped) {
77+
return;
78+
}
79+
while (this._items.length > 0) {
80+
maybe = mapFn(this.batch());
81+
if (maybe === undefined) {
82+
continue;
83+
}
8784
yield maybe;
85+
if (this._stopped) {
86+
return;
87+
}
8888
}
8989
}
9090
}
9191

92+
private _nextBatch(): Promise<Generator<T> | undefined> {
93+
const { promise, resolve } = promiseWithResolvers<
94+
Generator<T> | undefined
95+
>();
96+
this._resolvers.push(resolve);
97+
return promise;
98+
}
99+
92100
private _push(item: T): void {
93101
if (this._stopped) {
94102
this._resolve(undefined);
Lines changed: 88 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -1,70 +1,61 @@
1-
import { assert, expect } from 'chai';
1+
import { expect } from 'chai';
22
import { describe, it } from 'mocha';
33

4+
import { resolveOnNextTick } from '../../__testUtils__/resolveOnNextTick.js';
5+
46
import { promiseWithResolvers } from '../../jsutils/promiseWithResolvers.js';
57

68
import { Queue } from '../Queue.js';
79

810
describe('Queue', () => {
9-
it('should yield sync pushed items in order', () => {
11+
it('should yield sync pushed items in order', async () => {
1012
const queue = new Queue<number>((push) => {
1113
push(1);
1214
push(2);
1315
push(3);
1416
});
1517

16-
const batch1 = queue.currentBatch();
17-
assert(batch1 !== undefined);
18-
expect(Array.from(batch1)).to.deep.equal([1, 2, 3]);
18+
const sub = queue.subscribe((batch) => Array.from(batch));
19+
expect(await sub.next()).to.deep.equal({ done: false, value: [1, 2, 3] });
1920
});
2021

2122
it('should yield async pushed items in order', async () => {
2223
const queue = new Queue<number>(async (push) => {
23-
await Promise.resolve();
24+
await resolveOnNextTick();
2425
push(1);
2526
push(2);
2627
push(3);
2728
});
2829

29-
const batch1 = queue.currentBatch();
30-
expect(batch1).to.deep.equal(undefined);
31-
const batch2 = await queue.nextBatch();
32-
assert(batch2 !== undefined);
33-
expect(Array.from(batch2)).to.deep.equal([1, 2, 3]);
30+
const sub = queue.subscribe((batch) => Array.from(batch));
31+
expect(await sub.next()).to.deep.equal({ done: false, value: [1, 2, 3] });
3432
});
3533

3634
it('should yield multiple batches', async () => {
3735
const queue = new Queue<number>(async (push) => {
38-
await Promise.resolve();
36+
await resolveOnNextTick();
3937
push(1);
4038
push(2);
4139
push(3);
42-
await Promise.resolve();
40+
await resolveOnNextTick();
4341
push(4);
4442
push(5);
4543
push(6);
4644
});
4745

48-
const batch1 = queue.currentBatch();
49-
expect(batch1).to.equal(undefined);
50-
const batch2 = await queue.nextBatch();
51-
assert(batch2 !== undefined);
52-
expect(Array.from(batch2)).to.deep.equal([1, 2, 3]);
53-
const batch3 = await queue.nextBatch();
54-
assert(batch3 !== undefined);
55-
expect(Array.from(batch3)).to.deep.equal([4, 5, 6]);
46+
const sub = queue.subscribe((batch) => Array.from(batch));
47+
expect(await sub.next()).to.deep.equal({ done: false, value: [1, 2, 3] });
48+
expect(await sub.next()).to.deep.equal({ done: false, value: [4, 5, 6] });
5649
});
5750

5851
it('should allow the executor to indicate completion', async () => {
5952
const queue = new Queue<number>(async (_push, stop) => {
60-
await Promise.resolve();
53+
await resolveOnNextTick();
6154
stop();
6255
});
6356

64-
const batch1 = queue.currentBatch();
65-
expect(batch1).to.equal(undefined);
66-
const batch2 = await queue.nextBatch();
67-
expect(batch2).to.equal(undefined);
57+
const sub = queue.subscribe((batch) => batch);
58+
expect(await sub.next()).to.deep.equal({ done: true, value: undefined });
6859
});
6960

7061
it('should allow a consumer to abort a pending call to nextBatch', async () => {
@@ -74,11 +65,10 @@ describe('Queue', () => {
7465
await promise;
7566
});
7667

77-
const batch1 = queue.currentBatch();
78-
expect(batch1).to.equal(undefined);
79-
const batch2Promise = queue.nextBatch();
68+
const sub = queue.subscribe((batch) => batch);
69+
const nextPromise = sub.next();
8070
queue.stop();
81-
expect(await batch2Promise).to.equal(undefined);
71+
expect(await nextPromise).to.deep.equal({ done: true, value: undefined });
8272
});
8373

8474
it('should allow saving the push function', async () => {
@@ -87,41 +77,88 @@ describe('Queue', () => {
8777
push = _push;
8878
});
8979

90-
const batch1 = queue.currentBatch();
91-
expect(batch1).to.equal(undefined);
92-
93-
const batch2Promise = queue.nextBatch();
80+
const sub = queue.subscribe((batch) => Array.from(batch));
9481

95-
await Promise.resolve();
82+
await resolveOnNextTick();
9683
push(1);
9784
push(2);
9885
push(3);
9986

100-
const batch2 = await batch2Promise;
101-
assert(batch2 !== undefined);
102-
expect(Array.from(batch2)).to.deep.equal([1, 2, 3]);
87+
expect(await sub.next()).to.deep.equal({ done: false, value: [1, 2, 3] });
10388
});
10489

105-
it('should ignore sync errors in the executor', async () => {
90+
it('should stop on sync error in the executor', async () => {
10691
const queue = new Queue<number>(() => {
10792
throw new Error('Oops');
10893
});
109-
const batch1 = queue.currentBatch();
110-
expect(batch1).to.equal(undefined);
111-
const batch2Promise = queue.nextBatch();
112-
queue.stop();
113-
expect(await batch2Promise).to.equal(undefined);
94+
95+
const sub = queue.subscribe((batch) => Array.from(batch));
96+
expect(await sub.next()).to.deep.equal({ done: true, value: undefined });
11497
});
11598

116-
it('should ignore async errors in the executor', async () => {
99+
it('should stop on async errors in the executor', async () => {
117100
const queue = new Queue<number>(async () => {
118-
await Promise.resolve();
101+
await resolveOnNextTick();
119102
throw new Error('Oops');
120103
});
121-
const batch1 = queue.currentBatch();
122-
expect(batch1).to.equal(undefined);
123-
const batch2Promise = queue.nextBatch();
124-
queue.stop();
125-
expect(await batch2Promise).to.equal(undefined);
104+
105+
const sub = queue.subscribe((batch) => Array.from(batch));
106+
expect(await sub.next()).to.deep.equal({ done: true, value: undefined });
107+
});
108+
109+
it('should skip payloads when mapped to undefined, skipping first async payload', async () => {
110+
const queue = new Queue<number>(async (push) => {
111+
await resolveOnNextTick();
112+
push(1);
113+
await resolveOnNextTick();
114+
push(2);
115+
await resolveOnNextTick();
116+
push(3);
117+
await resolveOnNextTick();
118+
push(4);
119+
await resolveOnNextTick();
120+
push(5);
121+
await resolveOnNextTick();
122+
push(6);
123+
});
124+
125+
const sub = queue.subscribe((batch) => {
126+
const arr = Array.from(batch);
127+
if (arr[0] % 2 === 0) {
128+
return arr;
129+
}
130+
});
131+
expect(await sub.next()).to.deep.equal({ done: false, value: [2] });
132+
// [3, 4, 5] are batched as we await 2:
133+
// - one tick for the [AsyncGeneratorResumeNext] job
134+
// - one tick for the await within the withCleanUp next()
135+
expect(await sub.next()).to.deep.equal({ done: false, value: [6] });
136+
});
137+
138+
it('should skip payloads when mapped to undefined, skipping second async payload', async () => {
139+
const queue = new Queue<number>(async (push) => {
140+
await resolveOnNextTick();
141+
push(0);
142+
await resolveOnNextTick();
143+
push(1);
144+
await resolveOnNextTick();
145+
push(2);
146+
await resolveOnNextTick();
147+
push(3);
148+
await resolveOnNextTick();
149+
push(4);
150+
});
151+
152+
const sub = queue.subscribe((batch) => {
153+
const arr = Array.from(batch);
154+
if (arr[0] % 2 === 0) {
155+
return arr;
156+
}
157+
});
158+
expect(await sub.next()).to.deep.equal({ done: false, value: [0] });
159+
// [1, 2, 3] are batched as we await 0
160+
// - one tick for the [AsyncGeneratorResumeNext] job
161+
// - one tick for the await within the withCleanUp next()
162+
expect(await sub.next()).to.deep.equal({ done: false, value: [4] });
126163
});
127164
});

0 commit comments

Comments
 (0)