@@ -251,6 +251,14 @@ func messageFromReceivedMessage(t *testing.T, receivedMessage *ReceivedMessage)
251251}
252252
253253func Test_Sender_ScheduleAMQPMessages (t * testing.T ) {
254+ testScheduleMessages (t , true )
255+ }
256+
257+ func Test_Sender_ScheduleMessages (t * testing.T ) {
258+ testScheduleMessages (t , false )
259+ }
260+
261+ func testScheduleMessages (t * testing.T , rawAMQP bool ) {
254262 ctx := context .Background ()
255263
256264 client , cleanup , queueName := setupLiveTest (t , nil )
@@ -266,118 +274,85 @@ func Test_Sender_ScheduleAMQPMessages(t *testing.T) {
266274 defer sender .Close (context .Background ())
267275
268276 now := time .Now ()
269- nearFuture := now .Add (20 * time .Second )
270-
271- // there are two ways to schedule a message - you can use the
272- // `ScheduleMessages` API (in which case you get a sequence number that
273- // you can use with CancelScheduledMessage(s)) or you can set the
274- // `Scheduled`
275- sequenceNumbers , err := sender .ScheduleAMQPAnnotatedMessages (ctx ,
276- []* AMQPAnnotatedMessage {
277- {Body : AMQPAnnotatedMessageBody {Data : [][]byte {[]byte ("To the future (that will be cancelled!)" )}}},
278- {Body : AMQPAnnotatedMessageBody {Data : [][]byte {[]byte ("To the future (not cancelled)" )}}},
279- },
280- nearFuture , nil )
281-
282- require .NoError (t , err )
283- require .EqualValues (t , 2 , len (sequenceNumbers ))
284-
285- peekedMsg := peekSingleMessageForTest (t , receiver )
286- require .EqualValues (t , MessageStateScheduled , peekedMsg .State )
287-
288- // cancel one of the ones scheduled using `ScheduleMessages`
289- err = sender .CancelScheduledMessages (ctx , []int64 {sequenceNumbers [0 ]}, nil )
290- require .NoError (t , err )
291-
292- // this isn't a typical way of doing this, but it's possible to set the field directly
293- // rather than using the simpler ScheduleAMQPMessages
294- err = sender .SendAMQPAnnotatedMessage (ctx ,
295- & AMQPAnnotatedMessage {
296- Body : AMQPAnnotatedMessageBody {
297- Data : [][]byte {[]byte ("To the future (scheduled using the field)" )},
277+ farFuture := now .Add (time .Hour )
278+
279+ var sequenceNumbers []int64
280+
281+ if rawAMQP {
282+ // there are two ways to schedule a message - you can use the
283+ // `ScheduleMessages` API (in which case you get a sequence number that
284+ // you can use with CancelScheduledMessage(s)) or you can set the
285+ // `ScheduledEnqueueTime` field.
286+ tmp , err := sender .ScheduleAMQPAnnotatedMessages (ctx ,
287+ []* AMQPAnnotatedMessage {
288+ {Body : AMQPAnnotatedMessageBody {Data : [][]byte {[]byte ("To the future (that will be cancelled!)" )}}},
289+ {Body : AMQPAnnotatedMessageBody {Data : [][]byte {[]byte ("To the future (not cancelled)" )}}},
298290 },
299- MessageAnnotations : map [any ]any {
300- "x-opt-scheduled-enqueue-time" : & nearFuture ,
291+ farFuture , nil )
292+ require .NoError (t , err )
293+ sequenceNumbers = tmp
294+ } else {
295+ // there are two ways to schedule a message - you can use the
296+ // `ScheduleMessages` API (in which case you get a sequence number that
297+ // you can use with CancelScheduledMessage(s)) or you can set the
298+ // `ScheduledEnqueueTime` field.
299+ tmp , err := sender .ScheduleMessages (ctx ,
300+ []* Message {
301+ {Body : []byte ("To the future (that will be cancelled!)" )},
302+ {Body : []byte ("To the future (not cancelled)" )},
301303 },
302- }, nil )
303-
304- require .NoError (t , err )
305-
306- messages , err := receiver .ReceiveMessages (ctx , 2 , nil )
307- require .NoError (t , err )
308-
309- // we cancelled one of the messages so it won't get enqueued (this is the one that survived)
310- require .EqualValues (t , []string {"To the future (not cancelled)" , "To the future (scheduled using the field)" }, getSortedBodies (messages ))
311-
312- for _ , m := range messages {
313- // and the scheduled enqueue time should match what we set pretty closely.
314- diff := m .ScheduledEnqueueTime .Sub (nearFuture .UTC ())
315-
316- // add a little wiggle room, but the scheduled time and the time we set when we scheduled it.
317- require .LessOrEqual (t , diff , time .Second , "The requested scheduled time and the actual scheduled time should be close [%s]" , m .ScheduledEnqueueTime )
304+ farFuture , nil )
305+ require .NoError (t , err )
306+ sequenceNumbers = tmp
318307 }
319- }
320-
321- func Test_Sender_ScheduleMessages (t * testing.T ) {
322- ctx := context .Background ()
323308
324- client , cleanup , queueName := setupLiveTest (t , nil )
325- defer cleanup ()
309+ require .EqualValues (t , 2 , len (sequenceNumbers ))
326310
327- receiver , err := client .NewReceiverForQueue (
328- queueName , & ReceiverOptions {ReceiveMode : ReceiveModeReceiveAndDelete })
329- require .NoError (t , err )
330- defer receiver .Close (context .Background ())
311+ if rawAMQP {
312+ err := sender .SendAMQPAnnotatedMessage (ctx ,
313+ & AMQPAnnotatedMessage {
314+ Body : AMQPAnnotatedMessageBody {
315+ Data : [][]byte {[]byte ("To the future (scheduled using the field)" )},
316+ },
317+ MessageAnnotations : map [any ]any {
318+ "x-opt-scheduled-enqueue-time" : & farFuture ,
319+ },
320+ }, nil )
321+ require .NoError (t , err )
322+ } else {
323+ err := sender .SendAMQPAnnotatedMessage (ctx ,
324+ & AMQPAnnotatedMessage {
325+ Body : AMQPAnnotatedMessageBody {
326+ Data : [][]byte {[]byte ("To the future (scheduled using the field)" )},
327+ },
328+ MessageAnnotations : map [any ]any {
329+ "x-opt-scheduled-enqueue-time" : & farFuture ,
330+ },
331+ }, nil )
332+ require .NoError (t , err )
333+ }
331334
332- sender , err := client .NewSender (queueName , nil )
333- require .NoError (t , err )
334- defer sender .Close (context .Background ())
335+ ctx , cancel := context .WithTimeout (context .Background (), time .Minute )
336+ defer cancel ()
335337
336- now := time .Now ()
337- nearFuture := now .Add (20 * time .Second )
338-
339- // there are two ways to schedule a message - you can use the
340- // `ScheduleMessages` API (in which case you get a sequence number that
341- // you can use with CancelScheduledMessage(s)) or you can set the
342- // `Scheduled`
343- sequenceNumbers , err := sender .ScheduleMessages (ctx ,
344- []* Message {
345- {Body : []byte ("To the future (that will be cancelled!)" )},
346- {Body : []byte ("To the future (not cancelled)" )},
347- },
348- nearFuture , nil )
338+ var peekedMsgs []* ReceivedMessage
349339
350- require .NoError (t , err )
351- require .EqualValues (t , 2 , len (sequenceNumbers ))
340+ for len (peekedMsgs ) < 3 {
341+ msgs , err := receiver .PeekMessages (ctx , 1 , nil )
342+ require .NoError (t , err )
352343
353- peekedMsg := peekSingleMessageForTest (t , receiver )
354- require .EqualValues (t , MessageStateScheduled , peekedMsg .State )
344+ if len (msgs ) > 0 {
345+ require .Equal (t , MessageStateScheduled , msgs [0 ].State )
346+ peekedMsgs = append (peekedMsgs , msgs ... )
347+ }
348+ }
355349
356- // cancel one of the ones scheduled using `ScheduleMessages`
357350 err = sender .CancelScheduledMessages (ctx , []int64 {sequenceNumbers [0 ]}, nil )
358351 require .NoError (t , err )
359352
360- err = sender .SendMessage (ctx ,
361- & Message {
362- Body : []byte ("To the future (scheduled using the field)" ),
363- ScheduledEnqueueTime : & nearFuture ,
364- }, nil )
365-
366- require .NoError (t , err )
367-
368- messages , err := receiver .ReceiveMessages (ctx , 2 , nil )
369- require .NoError (t , err )
370-
371- // we cancelled one of the messages so it won't get enqueued (this is the one that survived)
372- require .EqualValues (t , []string {"To the future (not cancelled)" , "To the future (scheduled using the field)" }, getSortedBodies (messages ))
373-
374- for _ , m := range messages {
375- // and the scheduled enqueue time should match what we set pretty closely.
376- diff := m .ScheduledEnqueueTime .Sub (nearFuture .UTC ())
377-
378- // add a little wiggle room, but the scheduled time and the time we set when we scheduled it.
379- require .LessOrEqual (t , diff , time .Second , "The requested scheduled time and the actual scheduled time should be close [%s]" , m .ScheduledEnqueueTime )
380- }
353+ ctx , cancel = context .WithTimeout (context .Background (), time .Minute )
354+ defer cancel ()
355+ requireScheduledMessageDisappears (ctx , t , receiver , sequenceNumbers [0 ])
381356}
382357
383358func TestSender_SendMessagesDetach (t * testing.T ) {
0 commit comments