Skip to content

Commit 359b1cb

Browse files
committed
Replace the single “idlePromise” and “resolveIdle” mechanism with a small notification system that supports multiple waiters.
1 parent 4348706 commit 359b1cb

File tree

2 files changed

+84
-53
lines changed

2 files changed

+84
-53
lines changed

biome.jsonc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@
3333
"style":{
3434
"useConsistentBuiltinInstantiation": "error",
3535
"useThrowNewError": "error",
36-
"useThrowOnlyError": "error"
36+
"useThrowOnlyError": "error",
37+
"noNonNullAssertion": "off"
3738
}
3839
}
3940
},

src/bounded-queue.ts

Lines changed: 82 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,16 @@
1-
export type Producer<ItemType> = () => Promise<ItemType| null>;
1+
export type Producer<ItemType> = () => Promise<ItemType | null>;
22
export type Consumer<ItemType> = (item: ItemType) => Promise<void>;
33

4+
/**
5+
* A bounded asynchronous queue that fills items via a producer
6+
* and consumes them via a consumer.
7+
*/
48
export class BoundedQueue<ItemType> {
59
private queue: ItemType[] = [];
6-
private idlePromise?: Promise<boolean> | null = null;
7-
private resolveIdle: (value: boolean) => void = () => undefined;
8-
private endOfProduction = false;
10+
private productionEnded = false;
11+
// When the queue is full (for the producer) or empty (for the consumer),
12+
// the corresponding loop waits on one or more of these resolvers.
13+
private waiters: (() => void)[] = [];
914

1015
constructor(
1116
private maxQueueSize: number,
@@ -14,73 +19,98 @@ export class BoundedQueue<ItemType> {
1419
) {
1520
}
1621

17-
private async asyncFillQueue(): Promise<void> {
18-
do {
19-
while (this.queue.length < this.maxQueueSize) {
20-
const batch = await this.producer();
21-
if (batch === null) {
22-
this.endOfProduction = true;
23-
this.wakeUp();
24-
return;
25-
}
26-
this.queue.push(batch);
27-
this.wakeUp();
28-
}
29-
} while (await this.idleWait());
22+
/**
23+
* Notifies all waiting producers/consumers that the queue state has changed.
24+
*/
25+
private notifyAll(): void {
26+
for (const resolve of this.waiters) {
27+
resolve();
28+
}
29+
this.waiters = [];
3030
}
3131

32-
private async asyncEmptyQueue(): Promise<void> {
33-
do {
34-
while (this.queue.length > 0) {
35-
const batchItem = this.queue.shift() as ItemType;
36-
this.wakeUp();
37-
if (batchItem === null) {
38-
if (this.idlePromise) {
39-
this.idlePromise = null;
40-
this.resolveIdle(false);
41-
}
42-
return;
43-
}
44-
await this.consumer(batchItem);
45-
}
46-
} while (await this.idleWait());
32+
/**
33+
* Returns a promise that resolves when a notification is sent.
34+
*/
35+
private waitForNotification(): Promise<void> {
36+
return new Promise(resolve => {
37+
this.waiters.push(resolve);
38+
});
4739
}
4840

49-
private wakeUp(): void {
50-
if (this.idlePromise) {
51-
this.idlePromise = null;
52-
this.resolveIdle(!this.endOfProduction);
41+
/**
42+
* The producer loop: repeatedly ask for new items until the producer
43+
* returns `null`. If the queue is full, wait until consumers have removed items.
44+
*/
45+
private async produce(): Promise<void> {
46+
while (!this.productionEnded) {
47+
// Fill the queue until full.
48+
while (this.queue.length < this.maxQueueSize && !this.productionEnded) {
49+
const item = await this.producer();
50+
if (item === null) {
51+
this.productionEnded = true;
52+
// Wake up any waiting consumers.
53+
this.notifyAll();
54+
break;
55+
}
56+
this.queue.push(item);
57+
this.notifyAll();
58+
}
59+
// Wait until a consumer removes some items.
60+
if (!this.productionEnded) {
61+
await this.waitForNotification();
62+
}
5363
}
5464
}
5565

56-
private async idleWait(): Promise<boolean> {
57-
if (this.endOfProduction) {
58-
return false;
66+
/**
67+
* The consumer loop: repeatedly removes items from the queue and
68+
* processes them. It keeps running until production ends and the queue is empty.
69+
*/
70+
private async consume(): Promise<void> {
71+
while (!this.productionEnded || this.queue.length > 0) {
72+
while (this.queue.length > 0) {
73+
// Since the producer never enqueues null, we can safely assert the item exists.
74+
const item = this.queue.shift()!;
75+
await this.consumer(item);
76+
this.notifyAll();
77+
}
78+
// If production is complete and there are no items, exit.
79+
if (this.productionEnded && this.queue.length === 0) {
80+
break;
81+
}
82+
// Wait for new items to be enqueued.
83+
await this.waitForNotification();
5984
}
60-
this.idlePromise = new Promise(resolve => {
61-
this.resolveIdle = resolve;
62-
});
63-
return this.idlePromise;
6485
}
6586

6687
/**
67-
* Number of items queued
88+
* Returns the current number of items in the queue.
6889
*/
69-
public length(): number {
90+
public get length(): number {
7091
return this.queue.length;
7192
}
7293

94+
/**
95+
* Runs the producer and consumer loops concurrently until all work is done.
96+
*/
7397
public async run(): Promise<void> {
74-
await Promise.all([this.asyncFillQueue(), this.asyncEmptyQueue()]);
98+
await Promise.all([this.produce(), this.consume()]);
7599
}
76100
}
77101

78102
/**
79-
* @param maxQueueSize Maximum number of items that can be in the queue.
80-
* @param producer A function that produces items to be added to the queue.
81-
* @param consumer A function that consumes items from the queue.
82-
* @returns {Promise<void>}
103+
* Creates and runs a bounded queue that uses the given producer and consumer.
104+
*
105+
* @param maxQueueSize - Maximum number of items allowed in the queue.
106+
* @param producer - A function producing items (or `null` when done).
107+
* @param consumer - A function that consumes an item.
108+
* @returns A promise that resolves when all production and consumption is complete.
83109
*/
84-
export function queue<ItemType>(maxQueueSize: number, producer: Producer<ItemType>, consumer: Consumer<ItemType>): Promise<void> {
110+
export function queue<ItemType>(
111+
maxQueueSize: number,
112+
producer: Producer<ItemType>,
113+
consumer: Consumer<ItemType>
114+
): Promise<void> {
85115
return new BoundedQueue(maxQueueSize, producer, consumer).run();
86-
}
116+
}

0 commit comments

Comments
 (0)