@@ -14,7 +14,7 @@ const {
1414} = process . env ;
1515
1616describe ( 'e2e Streams' , function ( ) {
17- this . timeout ( 60_000 ) ;
17+ this . timeout ( 120_000 ) ;
1818 let shell : TestShell ;
1919
2020 before ( function ( ) {
@@ -96,26 +96,30 @@ describe('e2e Streams', function () {
9696
9797 const aggPipeline = [ sourceStage , mergeStage ] ;
9898
99- const createResult = await shell . executeLine (
100- `sp.createStreamProcessor("${ processorName } ", ${ JSON . stringify (
101- aggPipeline
102- ) } )`,
103- { timeout : 45_000 }
104- ) ;
105- expect ( createResult ) . to . include (
106- `Atlas Stream Processor: ${ processorName } `
107- ) ;
99+ await eventually ( async ( ) => {
100+ const createResult = await shell . executeLine (
101+ `sp.createStreamProcessor("${ processorName } ", ${ JSON . stringify (
102+ aggPipeline
103+ ) } )`,
104+ { timeout : 45_000 }
105+ ) ;
106+ expect ( createResult ) . to . include (
107+ `Atlas Stream Processor: ${ processorName } `
108+ ) ;
109+ } ) ;
108110 } ) ;
109111
110112 afterEach ( async function ( ) {
111113 try {
112114 await db . dropDatabase ( ) ;
113115 await client . close ( ) ;
114116
115- const result = await shell . executeLine ( `sp.${ processorName } .drop()` , {
116- timeout : 45_000 ,
117+ await eventually ( async ( ) => {
118+ const result = await shell . executeLine ( `sp.${ processorName } .drop()` , {
119+ timeout : 45_000 ,
120+ } ) ;
121+ expect ( result ) . to . include ( `{ ok: 1 }` ) ;
117122 } ) ;
118- expect ( result ) . to . include ( `{ ok: 1 }` ) ;
119123 } catch ( err : any ) {
120124 console . error (
121125 `Could not clean up stream processor ${ processorName } :` ,
@@ -137,48 +141,64 @@ describe('e2e Streams', function () {
137141 const initialDocsCount = await collection . countDocuments ( ) ;
138142 expect ( initialDocsCount ) . to . eq ( 0 ) ;
139143
140- const startResult = await shell . executeLine (
141- `sp.${ processorName } .start()` ,
142- { timeout : 45_000 }
143- ) ;
144- expect ( startResult ) . to . include ( '{ ok: 1 }' ) ;
144+ await eventually ( async ( ) => {
145+ const startResult = await shell . executeLine (
146+ `sp.${ processorName } .start()` ,
147+ { timeout : 45_000 }
148+ ) ;
149+ expect ( startResult ) . to . include ( '{ ok: 1 }' ) ;
150+ } ) ;
145151
146152 let updatedDocCount = 0 ;
147153 await eventually ( async ( ) => {
148154 updatedDocCount = await collection . countDocuments ( ) ;
149155 expect ( updatedDocCount ) . to . be . greaterThan ( 0 ) ;
150156 } ) ;
151157
152- const stopResult = await shell . executeLine ( `sp.${ processorName } .stop()` , {
153- timeout : 45_000 ,
158+ await eventually ( async ( ) => {
159+ const stopResult = await shell . executeLine (
160+ `sp.${ processorName } .stop()` ,
161+ {
162+ timeout : 45_000 ,
163+ }
164+ ) ;
165+ expect ( stopResult ) . to . include ( '{ ok: 1 }' ) ;
154166 } ) ;
155- expect ( stopResult ) . to . include ( '{ ok: 1 }' ) ;
156167
157- const statsResult = await shell . executeLine (
158- `sp.${ processorName } .stats()` ,
159- { timeout : 45_000 }
160- ) ;
161- expect ( statsResult ) . to . include ( `state: 'STOPPED'` ) ;
168+ await eventually ( async ( ) => {
169+ const statsResult = await shell . executeLine (
170+ `sp.${ processorName } .stats()` ,
171+ { timeout : 45_000 }
172+ ) ;
173+ expect ( statsResult ) . to . include ( `state: 'STOPPED'` ) ;
174+ } ) ;
162175 } ) ;
163176
164177 it ( `can modify an existing stream processor's pipeline` , async function ( ) {
165178 // this field is not present on any docs emit by the stream processor
166179 // created in the beforeEach
167180 const newField = 'newField' ;
168181
169- const startResult = await shell . executeLine (
170- `sp.${ processorName } .start()` ,
171- { timeout : 45_000 }
172- ) ;
173- expect ( startResult ) . to . include ( '{ ok: 1 }' ) ;
182+ await eventually ( async ( ) => {
183+ const startResult = await shell . executeLine (
184+ `sp.${ processorName } .start()` ,
185+ { timeout : 45_000 }
186+ ) ;
187+ expect ( startResult ) . to . include ( '{ ok: 1 }' ) ;
188+ } ) ;
174189
175190 // sleep for a bit to let the processor do stuff
176191 await sleep ( 500 ) ;
177192
178- const stopResult = await shell . executeLine ( `sp.${ processorName } .stop()` , {
179- timeout : 45_000 ,
193+ await eventually ( async ( ) => {
194+ const stopResult = await shell . executeLine (
195+ `sp.${ processorName } .stop()` ,
196+ {
197+ timeout : 45_000 ,
198+ }
199+ ) ;
200+ expect ( stopResult ) . to . include ( '{ ok: 1 }' ) ;
180201 } ) ;
181- expect ( stopResult ) . to . include ( '{ ok: 1 }' ) ;
182202
183203 const initialDocsWithNewField = await collection . countDocuments ( {
184204 [ newField ] : { $exists : true } ,
@@ -211,17 +231,21 @@ describe('e2e Streams', function () {
211231
212232 const updatedAggPipeline = [ sourceStage , addFieldStage , mergeStage ] ;
213233
214- const modifyResult = await shell . executeLine (
215- `sp.${ processorName } .modify(${ JSON . stringify ( updatedAggPipeline ) } )` ,
216- { timeout : 45_000 }
217- ) ;
218- expect ( modifyResult ) . to . include ( '{ ok: 1 }' ) ;
234+ await eventually ( async ( ) => {
235+ const modifyResult = await shell . executeLine (
236+ `sp.${ processorName } .modify(${ JSON . stringify ( updatedAggPipeline ) } )` ,
237+ { timeout : 45_000 }
238+ ) ;
239+ expect ( modifyResult ) . to . include ( '{ ok: 1 }' ) ;
240+ } ) ;
219241
220- const secondStartResult = await shell . executeLine (
221- `sp.${ processorName } .start()` ,
222- { timeout : 45_000 }
223- ) ;
224- expect ( secondStartResult ) . to . include ( '{ ok: 1 }' ) ;
242+ await eventually ( async ( ) => {
243+ const secondStartResult = await shell . executeLine (
244+ `sp.${ processorName } .start()` ,
245+ { timeout : 45_000 }
246+ ) ;
247+ expect ( secondStartResult ) . to . include ( '{ ok: 1 }' ) ;
248+ } ) ;
225249
226250 await eventually ( async ( ) => {
227251 const updatedDocsWithNewField = await collection . countDocuments ( {
@@ -232,17 +256,19 @@ describe('e2e Streams', function () {
232256 } ) ;
233257
234258 it ( 'can view stats for a stream processor' , async function ( ) {
235- const statsResult = await shell . executeLine (
236- `sp.${ processorName } .stats()` ,
237- { timeout : 45_000 }
238- ) ;
239- expect ( statsResult ) . to . include ( `name: '${ processorName } '` ) ;
240- expect ( statsResult ) . to . include ( `state: 'CREATED'` ) ;
241- expect ( statsResult ) . to . include ( 'stats: {' ) ;
242- expect ( statsResult ) . to . include ( `pipeline: [` ) ;
243- expect ( statsResult ) . to . include (
244- `{ '$source': { connectionName: 'sample_stream_solar' } },`
245- ) ;
259+ await eventually ( async ( ) => {
260+ const statsResult = await shell . executeLine (
261+ `sp.${ processorName } .stats()` ,
262+ { timeout : 45_000 }
263+ ) ;
264+ expect ( statsResult ) . to . include ( `name: '${ processorName } '` ) ;
265+ expect ( statsResult ) . to . include ( `state: 'CREATED'` ) ;
266+ expect ( statsResult ) . to . include ( 'stats: {' ) ;
267+ expect ( statsResult ) . to . include ( `pipeline: [` ) ;
268+ expect ( statsResult ) . to . include (
269+ `{ '$source': { connectionName: 'sample_stream_solar' } },`
270+ ) ;
271+ } ) ;
246272 } ) ;
247273 } ) ;
248274
0 commit comments