From 181979476a6f95b733a08e6e63f5b2975dc2a0b7 Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Tue, 28 Oct 2025 10:00:00 +0200 Subject: [PATCH 1/2] polish(Queue): further simplify algorithm further simplify Queue algorithm at the expense of potentially a few extra micro-tasks (which leads to further coalescing/batching of results, actually a side-benefit). --- src/execution/Queue.ts | 37 ++++---------- src/execution/__tests__/Queue-test.ts | 73 +++++++++++++++++++-------- 2 files changed, 64 insertions(+), 46 deletions(-) diff --git a/src/execution/Queue.ts b/src/execution/Queue.ts index 08047b3cd3..c582bd52e7 100644 --- a/src/execution/Queue.ts +++ b/src/execution/Queue.ts @@ -48,38 +48,23 @@ export class Queue { private async *subscribeImpl( mapFn: (generator: Generator) => U | undefined, ): AsyncGenerator { - while (true) { - if (this._stopped) { - return; - } - - let mapped; - // drain any items pushed prior to or between .next() calls - while ( - this._items.length > 0 && - (mapped = mapFn(this.batch())) !== undefined - ) { + let nextBatch: Generator | undefined; + // eslint-disable-next-line no-await-in-loop + while ((nextBatch = await this._nextBatch()) !== undefined) { + const mapped = mapFn(nextBatch); + if (mapped !== undefined) { yield mapped; - if (this._stopped) { - return; - } } - - // wait for a yield-able batch - do { - // eslint-disable-next-line no-await-in-loop - const nextBatch = await this._nextBatch(); - if (nextBatch === undefined || this._stopped) { - return; - } - mapped = mapFn(nextBatch); - } while (mapped === undefined); - - yield mapped; } } private _nextBatch(): Promise | undefined> { + if (this._stopped) { + return Promise.resolve(undefined); + } + if (this._items.length) { + return Promise.resolve(this.batch()); + } const { promise, resolve } = promiseWithResolvers< Generator | undefined >(); diff --git a/src/execution/__tests__/Queue-test.ts b/src/execution/__tests__/Queue-test.ts index 02c7533e4e..aa040ee6c1 100644 --- a/src/execution/__tests__/Queue-test.ts +++ b/src/execution/__tests__/Queue-test.ts @@ -31,9 +31,8 @@ describe('Queue', () => { expect(await sub.next()).to.deep.equal({ done: false, value: [1, 2, 3] }); }); - it('should yield multiple batches', async () => { + it('should yield sync and async pushed items in order', async () => { const queue = new Queue(async (push) => { - await resolveOnNextTick(); push(1); push(2); push(3); @@ -44,8 +43,51 @@ describe('Queue', () => { }); const sub = queue.subscribe((batch) => Array.from(batch)); - expect(await sub.next()).to.deep.equal({ done: false, value: [1, 2, 3] }); - expect(await sub.next()).to.deep.equal({ done: false, value: [4, 5, 6] }); + expect(await sub.next()).to.deep.equal({ + done: false, + value: [1, 2, 3, 4, 5, 6], + }); + }); + + it('should yield sync and async pushed items in order', async () => { + const queue = new Queue(async (push) => { + push(1); + push(2); + push(3); + // awaiting macro-task delay + await new Promise((r) => setTimeout(r)); + push(4); + push(5); + push(6); + }); + + const sub = queue.subscribe((batch) => Array.from(batch)); + expect(await sub.next()).to.deep.equal({ + done: false, + value: [1, 2, 3], + }); + expect(await sub.next()).to.deep.equal({ + done: false, + value: [4, 5, 6], + }); + }); + + it('should yield multiple async batches', async () => { + const queue = new Queue(async (push) => { + for (let i = 1; i <= 28; i += 3) { + // eslint-disable-next-line no-await-in-loop + await resolveOnNextTick(); + push(i); + push(i + 1); + push(i + 2); + } + }); + + const sub = queue.subscribe((batch) => Array.from(batch)[0]); + expect(await sub.next()).to.deep.equal({ done: false, value: 1 }); + expect(await sub.next()).to.deep.equal({ done: false, value: 4 }); + expect(await sub.next()).to.deep.equal({ done: false, value: 16 }); + expect(await sub.next()).to.deep.equal({ done: false, value: 28 }); }); it('should allow the executor to indicate completion', async () => { @@ -117,18 +159,11 @@ describe('Queue', () => { it('should skip payloads when mapped to undefined, skipping first async payload', async () => { const queue = new Queue(async (push) => { - await resolveOnNextTick(); - push(1); - await resolveOnNextTick(); - push(2); - await resolveOnNextTick(); - push(3); - await resolveOnNextTick(); - push(4); - await resolveOnNextTick(); - push(5); - await resolveOnNextTick(); - push(6); + for (let i = 1; i <= 14; i += 1) { + // eslint-disable-next-line no-await-in-loop + await resolveOnNextTick(); + push(i); + } }); const sub = queue.subscribe((batch) => { @@ -138,10 +173,8 @@ describe('Queue', () => { } }); expect(await sub.next()).to.deep.equal({ done: false, value: [2] }); - // [3, 4, 5] are batched as we await 2: - // - one tick for the [AsyncGeneratorResumeNext] job - // - one tick for the await within the withCleanUp next() - expect(await sub.next()).to.deep.equal({ done: false, value: [6] }); + expect(await sub.next()).to.deep.equal({ done: false, value: [8] }); + expect(await sub.next()).to.deep.equal({ done: false, value: [14] }); }); it('should condense pushes during map into the same batch', async () => { From c909d1dd04730453ea217ac7c1d7885439cc208b Mon Sep 17 00:00:00 2001 From: Yaacov Rydzinski Date: Tue, 28 Oct 2025 10:04:01 +0200 Subject: [PATCH 2/2] rename test --- src/execution/__tests__/Queue-test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/execution/__tests__/Queue-test.ts b/src/execution/__tests__/Queue-test.ts index aa040ee6c1..2096c9f28e 100644 --- a/src/execution/__tests__/Queue-test.ts +++ b/src/execution/__tests__/Queue-test.ts @@ -49,7 +49,7 @@ describe('Queue', () => { }); }); - it('should yield sync and async pushed items in order', async () => { + it('should yield sync and async pushed items in order, separated by macro-task boundary', async () => { const queue = new Queue(async (push) => { push(1); push(2);