1616using Microsoft . Azure . Amqp ;
1717using Microsoft . Azure . Amqp . Encoding ;
1818using Microsoft . Azure . Amqp . Framing ;
19- using Microsoft . Azure . Amqp . Transaction ;
2019
2120namespace Azure . Messaging . ServiceBus . Amqp
2221{
@@ -275,7 +274,6 @@ private async Task<IReadOnlyList<ServiceBusReceivedMessage>> ReceiveMessagesAsyn
275274 CancellationToken cancellationToken )
276275 {
277276 var link = default ( ReceivingAmqpLink ) ;
278- var amqpMessages = default ( IEnumerable < AmqpMessage > ) ;
279277 var receivedMessages = new List < ServiceBusReceivedMessage > ( ) ;
280278
281279 ThrowIfSessionLockLost ( ) ;
@@ -288,33 +286,41 @@ private async Task<IReadOnlyList<ServiceBusReceivedMessage>> ReceiveMessagesAsyn
288286 }
289287 cancellationToken . ThrowIfCancellationRequested < TaskCanceledException > ( ) ;
290288
291- var messagesReceived = await Task . Factory . FromAsync
289+ var messagesReceived = await Task . Factory
290+ . FromAsync < ( ReceivingAmqpLink , int , TimeSpan ? , TimeSpan ) , IEnumerable < AmqpMessage > >
292291 (
293- ( callback , state ) => link . BeginReceiveRemoteMessages (
294- maxMessages ,
295- TimeSpan . FromMilliseconds ( 20 ) ,
296- maxWaitTime ?? timeout ,
297- callback ,
298- state ) ,
299- ( asyncResult ) => link . EndReceiveMessages ( asyncResult , out amqpMessages ) ,
300- TaskCreationOptions . RunContinuationsAsynchronously
301- ) . ConfigureAwait ( false ) ;
292+ static ( arguments , callback , state ) =>
293+ {
294+ var ( link , maxMessages , maxWaitTime , timeout ) = arguments ;
295+ return link . BeginReceiveRemoteMessages (
296+ maxMessages ,
297+ TimeSpan . FromMilliseconds ( 20 ) ,
298+ maxWaitTime ?? timeout ,
299+ callback ,
300+ link ) ;
301+ } ,
302+ static asyncResult =>
303+ {
304+ var link = ( ReceivingAmqpLink ) asyncResult . AsyncState ;
305+ bool received = link . EndReceiveMessages ( asyncResult , out IEnumerable < AmqpMessage > amqpMessages ) ;
306+ return received ? amqpMessages : Enumerable . Empty < AmqpMessage > ( ) ;
307+ } ,
308+ ( link , maxMessages , maxWaitTime , timeout ) ,
309+ default
310+ ) . ConfigureAwait ( false ) ;
302311
303312 cancellationToken . ThrowIfCancellationRequested < TaskCanceledException > ( ) ;
304313 // If event messages were received, then package them for consumption and
305314 // return them.
306315
307- if ( ( messagesReceived ) & & ( amqpMessages != null ) )
316+ foreach ( AmqpMessage message in messagesReceived )
308317 {
309- foreach ( AmqpMessage message in amqpMessages )
318+ if ( _receiveMode == ServiceBusReceiveMode . ReceiveAndDelete )
310319 {
311- if ( _receiveMode == ServiceBusReceiveMode . ReceiveAndDelete )
312- {
313- link . DisposeDelivery ( message , true , AmqpConstants . AcceptedOutcome ) ;
314- }
315- receivedMessages . Add ( AmqpMessageConverter . AmqpMessageToSBMessage ( message ) ) ;
316- message . Dispose ( ) ;
320+ link . DisposeDelivery ( message , true , AmqpConstants . AcceptedOutcome ) ;
317321 }
322+ receivedMessages . Add ( AmqpMessageConverter . AmqpMessageToSBMessage ( message ) ) ;
323+ message . Dispose ( ) ;
318324 }
319325
320326 return receivedMessages ;
0 commit comments