Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 11 additions & 26 deletions src/execution/Queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,38 +48,23 @@ export class Queue<T> {
private async *subscribeImpl<U>(
mapFn: (generator: Generator<T, void, void>) => U | undefined,
): AsyncGenerator<U> {
while (true) {
if (this._stopped) {
return;
}

let mapped;
// drain any items pushed prior to or between .next() calls
while (
this._items.length > 0 &&
(mapped = mapFn(this.batch())) !== undefined
) {
let nextBatch: Generator<T> | undefined;
// eslint-disable-next-line no-await-in-loop
while ((nextBatch = await this._nextBatch()) !== undefined) {
const mapped = mapFn(nextBatch);
if (mapped !== undefined) {
yield mapped;
if (this._stopped) {
return;
}
}

// wait for a yield-able batch
do {
// eslint-disable-next-line no-await-in-loop
const nextBatch = await this._nextBatch();
if (nextBatch === undefined || this._stopped) {
return;
}
mapped = mapFn(nextBatch);
} while (mapped === undefined);

yield mapped;
}
}

private _nextBatch(): Promise<Generator<T> | undefined> {
if (this._stopped) {
return Promise.resolve(undefined);
}
if (this._items.length) {
return Promise.resolve(this.batch());
}
const { promise, resolve } = promiseWithResolvers<
Generator<T> | undefined
>();
Expand Down
73 changes: 53 additions & 20 deletions src/execution/__tests__/Queue-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,8 @@ describe('Queue', () => {
expect(await sub.next()).to.deep.equal({ done: false, value: [1, 2, 3] });
});

it('should yield multiple batches', async () => {
it('should yield sync and async pushed items in order', async () => {
const queue = new Queue<number>(async (push) => {
await resolveOnNextTick();
push(1);
push(2);
push(3);
Expand All @@ -44,8 +43,51 @@ describe('Queue', () => {
});

const sub = queue.subscribe((batch) => Array.from(batch));
expect(await sub.next()).to.deep.equal({ done: false, value: [1, 2, 3] });
expect(await sub.next()).to.deep.equal({ done: false, value: [4, 5, 6] });
expect(await sub.next()).to.deep.equal({
done: false,
value: [1, 2, 3, 4, 5, 6],
});
});

it('should yield sync and async pushed items in order, separated by macro-task boundary', async () => {
const queue = new Queue<number>(async (push) => {
push(1);
push(2);
push(3);
// awaiting macro-task delay
await new Promise((r) => setTimeout(r));
push(4);
push(5);
push(6);
});

const sub = queue.subscribe((batch) => Array.from(batch));
expect(await sub.next()).to.deep.equal({
done: false,
value: [1, 2, 3],
});
expect(await sub.next()).to.deep.equal({
done: false,
value: [4, 5, 6],
});
});

it('should yield multiple async batches', async () => {
const queue = new Queue<number>(async (push) => {
for (let i = 1; i <= 28; i += 3) {
// eslint-disable-next-line no-await-in-loop
await resolveOnNextTick();
push(i);
push(i + 1);
push(i + 2);
}
});

const sub = queue.subscribe((batch) => Array.from(batch)[0]);
expect(await sub.next()).to.deep.equal({ done: false, value: 1 });
expect(await sub.next()).to.deep.equal({ done: false, value: 4 });
expect(await sub.next()).to.deep.equal({ done: false, value: 16 });
expect(await sub.next()).to.deep.equal({ done: false, value: 28 });
});

it('should allow the executor to indicate completion', async () => {
Expand Down Expand Up @@ -117,18 +159,11 @@ describe('Queue', () => {

it('should skip payloads when mapped to undefined, skipping first async payload', async () => {
const queue = new Queue<number>(async (push) => {
await resolveOnNextTick();
push(1);
await resolveOnNextTick();
push(2);
await resolveOnNextTick();
push(3);
await resolveOnNextTick();
push(4);
await resolveOnNextTick();
push(5);
await resolveOnNextTick();
push(6);
for (let i = 1; i <= 14; i += 1) {
// eslint-disable-next-line no-await-in-loop
await resolveOnNextTick();
push(i);
}
});

const sub = queue.subscribe((batch) => {
Expand All @@ -138,10 +173,8 @@ describe('Queue', () => {
}
});
expect(await sub.next()).to.deep.equal({ done: false, value: [2] });
// [3, 4, 5] are batched as we await 2:
// - one tick for the [AsyncGeneratorResumeNext] job
// - one tick for the await within the withCleanUp next()
expect(await sub.next()).to.deep.equal({ done: false, value: [6] });
expect(await sub.next()).to.deep.equal({ done: false, value: [8] });
expect(await sub.next()).to.deep.equal({ done: false, value: [14] });
});

it('should condense pushes during map into the same batch', async () => {
Expand Down
Loading