File tree Expand file tree Collapse file tree 2 files changed +19
-53
lines changed
Expand file tree Collapse file tree 2 files changed +19
-53
lines changed Original file line number Diff line number Diff line change @@ -50,42 +50,35 @@ export class Queue<T> {
5050 private async * subscribeImpl < U > (
5151 mapFn : ( generator : Generator < T , void , void > ) => U | undefined ,
5252 ) : AsyncGenerator < U > {
53- if ( this . _stopped ) {
54- return ;
55- }
56-
57- while ( this . _items . length > 0 ) {
58- const maybe = mapFn ( this . batch ( ) ) ;
59- if ( maybe === undefined ) {
60- continue ;
61- }
62- yield maybe ;
53+ while ( true ) {
6354 if ( this . _stopped ) {
6455 return ;
6556 }
66- }
6757
68- let batch : Generator < T > | undefined ;
69- // eslint-disable-next-line no-await-in-loop
70- while ( ( batch = await this . _nextBatch ( ) ) !== undefined ) {
71- let maybe = mapFn ( batch ) ;
72- if ( maybe === undefined ) {
73- continue ;
74- }
75- yield maybe ;
76- if ( this . _stopped ) {
77- return ;
78- }
58+ // drain any items pushed prior to or between .next() calls
7959 while ( this . _items . length > 0 ) {
80- maybe = mapFn ( this . batch ( ) ) ;
81- if ( maybe === undefined ) {
82- continue ;
60+ const mapped = mapFn ( this . batch ( ) ) ;
61+ if ( mapped === undefined ) {
62+ break ;
8363 }
84- yield maybe ;
64+ yield mapped ;
8565 if ( this . _stopped ) {
8666 return ;
8767 }
8868 }
69+
70+ // wait for a yield-able batch
71+ let nextMapped ;
72+ do {
73+ // eslint-disable-next-line no-await-in-loop
74+ const nextBatch = await this . _nextBatch ( ) ;
75+ if ( nextBatch === undefined || this . _stopped ) {
76+ return ;
77+ }
78+ nextMapped = mapFn ( nextBatch ) ;
79+ } while ( nextMapped === undefined ) ;
80+
81+ yield nextMapped ;
8982 }
9083 }
9184
Original file line number Diff line number Diff line change @@ -135,33 +135,6 @@ describe('Queue', () => {
135135 expect ( await sub . next ( ) ) . to . deep . equal ( { done : false , value : [ 6 ] } ) ;
136136 } ) ;
137137
138- it ( 'should skip payloads when mapped to undefined, skipping second async payload' , async ( ) => {
139- const queue = new Queue < number > ( async ( push ) => {
140- await resolveOnNextTick ( ) ;
141- push ( 0 ) ;
142- await resolveOnNextTick ( ) ;
143- push ( 1 ) ;
144- await resolveOnNextTick ( ) ;
145- push ( 2 ) ;
146- await resolveOnNextTick ( ) ;
147- push ( 3 ) ;
148- await resolveOnNextTick ( ) ;
149- push ( 4 ) ;
150- } ) ;
151-
152- const sub = queue . subscribe ( ( batch ) => {
153- const arr = Array . from ( batch ) ;
154- if ( arr [ 0 ] % 2 === 0 ) {
155- return arr ;
156- }
157- } ) ;
158- expect ( await sub . next ( ) ) . to . deep . equal ( { done : false , value : [ 0 ] } ) ;
159- // [1, 2, 3] are batched as we await 0
160- // - one tick for the [AsyncGeneratorResumeNext] job
161- // - one tick for the await within the withCleanUp next()
162- expect ( await sub . next ( ) ) . to . deep . equal ( { done : false , value : [ 4 ] } ) ;
163- } ) ;
164-
165138 it ( 'should condense pushes during map into the same batch' , async ( ) => {
166139 let push ! : ( item : number ) => void ;
167140 const queue = new Queue < number > ( ( _push ) => {
You can’t perform that action at this time.
0 commit comments