File tree Expand file tree Collapse file tree 2 files changed +20
-55
lines changed
Expand file tree Collapse file tree 2 files changed +20
-55
lines changed Original file line number Diff line number Diff line change @@ -50,42 +50,34 @@ export class Queue<T> {
5050 private async * subscribeImpl < U > (
5151 mapFn : ( generator : Generator < T , void , void > ) => U | undefined ,
5252 ) : AsyncGenerator < U > {
53- if ( this . _stopped ) {
54- return ;
55- }
56-
57- while ( this . _items . length > 0 ) {
58- const maybe = mapFn ( this . batch ( ) ) ;
59- if ( maybe === undefined ) {
60- continue ;
61- }
62- yield maybe ;
53+ while ( true ) {
6354 if ( this . _stopped ) {
6455 return ;
6556 }
66- }
6757
68- let batch : Generator < T > | undefined ;
69- // eslint-disable-next-line no-await-in-loop
70- while ( ( batch = await this . _nextBatch ( ) ) !== undefined ) {
71- let maybe = mapFn ( batch ) ;
72- if ( maybe === undefined ) {
73- continue ;
74- }
75- yield maybe ;
76- if ( this . _stopped ) {
77- return ;
78- }
79- while ( this . _items . length > 0 ) {
80- maybe = mapFn ( this . batch ( ) ) ;
81- if ( maybe === undefined ) {
82- continue ;
83- }
84- yield maybe ;
58+ let mapped ;
59+ // drain any items pushed prior to or between .next() calls
60+ while (
61+ this . _items . length > 0 &&
62+ ( mapped = mapFn ( this . batch ( ) ) ) !== undefined
63+ ) {
64+ yield mapped ;
8565 if ( this . _stopped ) {
8666 return ;
8767 }
8868 }
69+
70+ // wait for a yield-able batch
71+ do {
72+ // eslint-disable-next-line no-await-in-loop
73+ const nextBatch = await this . _nextBatch ( ) ;
74+ if ( nextBatch === undefined || this . _stopped ) {
75+ return ;
76+ }
77+ mapped = mapFn ( nextBatch ) ;
78+ } while ( mapped === undefined ) ;
79+
80+ yield mapped ;
8981 }
9082 }
9183
Original file line number Diff line number Diff line change @@ -135,33 +135,6 @@ describe('Queue', () => {
135135 expect ( await sub . next ( ) ) . to . deep . equal ( { done : false , value : [ 6 ] } ) ;
136136 } ) ;
137137
138- it ( 'should skip payloads when mapped to undefined, skipping second async payload' , async ( ) => {
139- const queue = new Queue < number > ( async ( push ) => {
140- await resolveOnNextTick ( ) ;
141- push ( 0 ) ;
142- await resolveOnNextTick ( ) ;
143- push ( 1 ) ;
144- await resolveOnNextTick ( ) ;
145- push ( 2 ) ;
146- await resolveOnNextTick ( ) ;
147- push ( 3 ) ;
148- await resolveOnNextTick ( ) ;
149- push ( 4 ) ;
150- } ) ;
151-
152- const sub = queue . subscribe ( ( batch ) => {
153- const arr = Array . from ( batch ) ;
154- if ( arr [ 0 ] % 2 === 0 ) {
155- return arr ;
156- }
157- } ) ;
158- expect ( await sub . next ( ) ) . to . deep . equal ( { done : false , value : [ 0 ] } ) ;
159- // [1, 2, 3] are batched as we await 0
160- // - one tick for the [AsyncGeneratorResumeNext] job
161- // - one tick for the await within the withCleanUp next()
162- expect ( await sub . next ( ) ) . to . deep . equal ( { done : false , value : [ 4 ] } ) ;
163- } ) ;
164-
165138 it ( 'should condense pushes during map into the same batch' , async ( ) => {
166139 let push ! : ( item : number ) => void ;
167140 const queue = new Queue < number > ( ( _push ) => {
You can’t perform that action at this time.
0 commit comments