1- export type Producer < ItemType > = ( ) => Promise < ItemType | null > ;
1+ export type Producer < ItemType > = ( ) => Promise < ItemType | null > ;
22export type Consumer < ItemType > = ( item : ItemType ) => Promise < void > ;
33
4+ /**
5+ * A bounded asynchronous queue that fills items via a producer
6+ * and consumes them via a consumer.
7+ */
48export class BoundedQueue < ItemType > {
59 private queue : ItemType [ ] = [ ] ;
6- private idlePromise ?: Promise < boolean > | null = null ;
7- private resolveIdle : ( value : boolean ) => void = ( ) => undefined ;
8- private endOfProduction = false ;
10+ private productionEnded = false ;
11+ // When the queue is full (for the producer) or empty (for the consumer),
12+ // the corresponding loop waits on one or more of these resolvers.
13+ private waiters : ( ( ) => void ) [ ] = [ ] ;
914
1015 constructor (
1116 private maxQueueSize : number ,
@@ -14,73 +19,98 @@ export class BoundedQueue<ItemType> {
1419 ) {
1520 }
1621
17- private async asyncFillQueue ( ) : Promise < void > {
18- do {
19- while ( this . queue . length < this . maxQueueSize ) {
20- const batch = await this . producer ( ) ;
21- if ( batch === null ) {
22- this . endOfProduction = true ;
23- this . wakeUp ( ) ;
24- return ;
25- }
26- this . queue . push ( batch ) ;
27- this . wakeUp ( ) ;
28- }
29- } while ( await this . idleWait ( ) ) ;
22+ /**
23+ * Notifies all waiting producers/consumers that the queue state has changed.
24+ */
25+ private notifyAll ( ) : void {
26+ for ( const resolve of this . waiters ) {
27+ resolve ( ) ;
28+ }
29+ this . waiters = [ ] ;
3030 }
3131
32- private async asyncEmptyQueue ( ) : Promise < void > {
33- do {
34- while ( this . queue . length > 0 ) {
35- const batchItem = this . queue . shift ( ) as ItemType ;
36- this . wakeUp ( ) ;
37- if ( batchItem === null ) {
38- if ( this . idlePromise ) {
39- this . idlePromise = null ;
40- this . resolveIdle ( false ) ;
41- }
42- return ;
43- }
44- await this . consumer ( batchItem ) ;
45- }
46- } while ( await this . idleWait ( ) ) ;
32+ /**
33+ * Returns a promise that resolves when a notification is sent.
34+ */
35+ private waitForNotification ( ) : Promise < void > {
36+ return new Promise ( resolve => {
37+ this . waiters . push ( resolve ) ;
38+ } ) ;
4739 }
4840
49- private wakeUp ( ) : void {
50- if ( this . idlePromise ) {
51- this . idlePromise = null ;
52- this . resolveIdle ( ! this . endOfProduction ) ;
41+ /**
42+ * The producer loop: repeatedly ask for new items until the producer
43+ * returns `null`. If the queue is full, wait until consumers have removed items.
44+ */
45+ private async produce ( ) : Promise < void > {
46+ while ( ! this . productionEnded ) {
47+ // Fill the queue until full.
48+ while ( this . queue . length < this . maxQueueSize && ! this . productionEnded ) {
49+ const item = await this . producer ( ) ;
50+ if ( item === null ) {
51+ this . productionEnded = true ;
52+ // Wake up any waiting consumers.
53+ this . notifyAll ( ) ;
54+ break ;
55+ }
56+ this . queue . push ( item ) ;
57+ this . notifyAll ( ) ;
58+ }
59+ // Wait until a consumer removes some items.
60+ if ( ! this . productionEnded ) {
61+ await this . waitForNotification ( ) ;
62+ }
5363 }
5464 }
5565
56- private async idleWait ( ) : Promise < boolean > {
57- if ( this . endOfProduction ) {
58- return false ;
66+ /**
67+ * The consumer loop: repeatedly removes items from the queue and
68+ * processes them. It keeps running until production ends and the queue is empty.
69+ */
70+ private async consume ( ) : Promise < void > {
71+ while ( ! this . productionEnded || this . queue . length > 0 ) {
72+ while ( this . queue . length > 0 ) {
73+ // Since the producer never enqueues null, we can safely assert the item exists.
74+ const item = this . queue . shift ( ) ! ;
75+ await this . consumer ( item ) ;
76+ this . notifyAll ( ) ;
77+ }
78+ // If production is complete and there are no items, exit.
79+ if ( this . productionEnded && this . queue . length === 0 ) {
80+ break ;
81+ }
82+ // Wait for new items to be enqueued.
83+ await this . waitForNotification ( ) ;
5984 }
60- this . idlePromise = new Promise ( resolve => {
61- this . resolveIdle = resolve ;
62- } ) ;
63- return this . idlePromise ;
6485 }
6586
6687 /**
67- * Number of items queued
88+ * Returns the current number of items in the queue.
6889 */
69- public length ( ) : number {
90+ public get length ( ) : number {
7091 return this . queue . length ;
7192 }
7293
94+ /**
95+ * Runs the producer and consumer loops concurrently until all work is done.
96+ */
7397 public async run ( ) : Promise < void > {
74- await Promise . all ( [ this . asyncFillQueue ( ) , this . asyncEmptyQueue ( ) ] ) ;
98+ await Promise . all ( [ this . produce ( ) , this . consume ( ) ] ) ;
7599 }
76100}
77101
78102/**
79- * @param maxQueueSize Maximum number of items that can be in the queue.
80- * @param producer A function that produces items to be added to the queue.
81- * @param consumer A function that consumes items from the queue.
82- * @returns {Promise<void> }
103+ * Creates and runs a bounded queue that uses the given producer and consumer.
104+ *
105+ * @param maxQueueSize - Maximum number of items allowed in the queue.
106+ * @param producer - A function producing items (or `null` when done).
107+ * @param consumer - A function that consumes an item.
108+ * @returns A promise that resolves when all production and consumption is complete.
83109 */
84- export function queue < ItemType > ( maxQueueSize : number , producer : Producer < ItemType > , consumer : Consumer < ItemType > ) : Promise < void > {
110+ export function queue < ItemType > (
111+ maxQueueSize : number ,
112+ producer : Producer < ItemType > ,
113+ consumer : Consumer < ItemType >
114+ ) : Promise < void > {
85115 return new BoundedQueue ( maxQueueSize , producer , consumer ) . run ( ) ;
86- }
116+ }
0 commit comments