@@ -59,8 +59,7 @@ internal abstract class ChannelBase : IChannel, IRecoverable
5959 private readonly RpcContinuationQueue _continuationQueue = new RpcContinuationQueue ( ) ;
6060 private readonly ManualResetEventSlim _flowControlBlock = new ManualResetEventSlim ( true ) ;
6161
62- // TODO replace with SemaphoreSlim
63- private object _confirmLock ;
62+ private SemaphoreSlim _confirmSemaphore ;
6463 private readonly LinkedList < ulong > _pendingDeliveryTags = new LinkedList < ulong > ( ) ;
6564
6665 private bool _onlyAcksReceived = true ;
@@ -420,7 +419,7 @@ internal void FinishClose()
420419 m_connectionStartCell ? . TrySetResult ( null ) ;
421420 }
422421
423- private bool ConfirmsAreEnabled => _confirmLock != null ;
422+ private bool ConfirmsAreEnabled => _confirmSemaphore != null ;
424423
425424 private async Task HandleCommandAsync ( IncomingCommand cmd , CancellationToken cancellationToken )
426425 {
@@ -484,7 +483,8 @@ private void OnChannelShutdown(ShutdownEventArgs reason)
484483
485484 if ( ConfirmsAreEnabled )
486485 {
487- lock ( _confirmLock )
486+ _confirmSemaphore . Wait ( ) ;
487+ try
488488 {
489489 if ( _confirmsTaskCompletionSources ? . Count > 0 )
490490 {
@@ -497,6 +497,10 @@ private void OnChannelShutdown(ShutdownEventArgs reason)
497497 _confirmsTaskCompletionSources . Clear ( ) ;
498498 }
499499 }
500+ finally
501+ {
502+ _confirmSemaphore . Release ( ) ;
503+ }
500504 }
501505
502506 _flowControlBlock . Set ( ) ;
@@ -542,6 +546,7 @@ protected virtual void Dispose(bool disposing)
542546
543547 ConsumerDispatcher . Dispose ( ) ;
544548 _rpcSemaphore . Dispose ( ) ;
549+ _confirmSemaphore ? . Dispose ( ) ;
545550 }
546551 }
547552
@@ -596,7 +601,8 @@ protected void HandleAckNack(ulong deliveryTag, bool multiple, bool isNack)
596601 if ( ConfirmsAreEnabled )
597602 {
598603 // let's take a lock so we can assume that deliveryTags are unique, never duplicated and always sorted
599- lock ( _confirmLock )
604+ _confirmSemaphore . Wait ( ) ;
605+ try
600606 {
601607 // No need to do anything if there are no delivery tags in the list
602608 if ( _pendingDeliveryTags . Count > 0 )
@@ -633,6 +639,10 @@ protected void HandleAckNack(ulong deliveryTag, bool multiple, bool isNack)
633639 _onlyAcksReceived = true ;
634640 }
635641 }
642+ finally
643+ {
644+ _confirmSemaphore . Release ( ) ;
645+ }
636646 }
637647 }
638648
@@ -1054,10 +1064,16 @@ public async ValueTask BasicPublishAsync<TProperties>(string exchange, string ro
10541064 {
10551065 if ( ConfirmsAreEnabled )
10561066 {
1057- lock ( _confirmLock )
1067+ await _confirmSemaphore . WaitAsync ( cancellationToken )
1068+ . ConfigureAwait ( false ) ;
1069+ try
10581070 {
10591071 _pendingDeliveryTags . AddLast ( NextPublishSeqNo ++ ) ;
10601072 }
1073+ finally
1074+ {
1075+ _confirmSemaphore . Release ( ) ;
1076+ }
10611077 }
10621078
10631079 try
@@ -1084,11 +1100,17 @@ await ModelSendAsync(in cmd, in basicProperties, body, cancellationToken)
10841100 {
10851101 if ( ConfirmsAreEnabled )
10861102 {
1087- lock ( _confirmLock )
1103+ await _confirmSemaphore . WaitAsync ( cancellationToken )
1104+ . ConfigureAwait ( false ) ;
1105+ try
10881106 {
10891107 NextPublishSeqNo -- ;
10901108 _pendingDeliveryTags . RemoveLast ( ) ;
10911109 }
1110+ finally
1111+ {
1112+ _confirmSemaphore . Release ( ) ;
1113+ }
10921114 }
10931115
10941116 throw ;
@@ -1102,10 +1124,16 @@ public async ValueTask BasicPublishAsync<TProperties>(CachedString exchange, Cac
11021124 {
11031125 if ( ConfirmsAreEnabled )
11041126 {
1105- lock ( _confirmLock )
1127+ await _confirmSemaphore . WaitAsync ( cancellationToken )
1128+ . ConfigureAwait ( false ) ;
1129+ try
11061130 {
11071131 _pendingDeliveryTags . AddLast ( NextPublishSeqNo ++ ) ;
11081132 }
1133+ finally
1134+ {
1135+ _confirmSemaphore . Release ( ) ;
1136+ }
11091137 }
11101138
11111139 try
@@ -1133,11 +1161,17 @@ await ModelSendAsync(in cmd, in basicProperties, body, cancellationToken)
11331161 {
11341162 if ( ConfirmsAreEnabled )
11351163 {
1136- lock ( _confirmLock )
1164+ await _confirmSemaphore . WaitAsync ( cancellationToken )
1165+ . ConfigureAwait ( false ) ;
1166+ try
11371167 {
11381168 NextPublishSeqNo -- ;
11391169 _pendingDeliveryTags . RemoveLast ( ) ;
11401170 }
1171+ finally
1172+ {
1173+ _confirmSemaphore . Release ( ) ;
1174+ }
11411175 }
11421176
11431177 throw ;
@@ -1242,7 +1276,7 @@ await ModelSendAsync(method, k.CancellationToken)
12421276
12431277 // Note:
12441278 // Non-null means confirms are enabled
1245- _confirmLock = new object ( ) ;
1279+ _confirmSemaphore = new SemaphoreSlim ( 1 , 1 ) ;
12461280
12471281 return ;
12481282 }
@@ -1742,63 +1776,50 @@ await ModelSendAsync(method, k.CancellationToken)
17421776
17431777 private List < TaskCompletionSource < bool > > _confirmsTaskCompletionSources ;
17441778
1745- public Task < bool > WaitForConfirmsAsync ( CancellationToken token = default )
1779+ public async Task < bool > WaitForConfirmsAsync ( CancellationToken cancellationToken = default )
17461780 {
17471781 if ( false == ConfirmsAreEnabled )
17481782 {
17491783 throw new InvalidOperationException ( "Confirms not selected" ) ;
17501784 }
17511785
17521786 TaskCompletionSource < bool > tcs ;
1753- lock ( _confirmLock )
1787+ await _confirmSemaphore . WaitAsync ( cancellationToken )
1788+ . ConfigureAwait ( false ) ;
1789+ try
17541790 {
17551791 if ( _pendingDeliveryTags . Count == 0 )
17561792 {
17571793 if ( _onlyAcksReceived == false )
17581794 {
17591795 _onlyAcksReceived = true ;
1760- return Task . FromResult ( false ) ;
1796+ return false ;
17611797 }
17621798
1763- return Task . FromResult ( true ) ;
1799+ return true ;
17641800 }
17651801
17661802 tcs = new TaskCompletionSource < bool > ( TaskCreationOptions . RunContinuationsAsynchronously ) ;
17671803 _confirmsTaskCompletionSources . Add ( tcs ) ;
17681804 }
1769-
1770- if ( ! token . CanBeCanceled )
1805+ finally
17711806 {
1772- return tcs . Task ;
1807+ _confirmSemaphore . Release ( ) ;
17731808 }
17741809
1775- return WaitForConfirmsWithTokenAsync ( tcs , token ) ;
1776- }
1810+ bool rv ;
17771811
1778- private async Task < bool > WaitForConfirmsWithTokenAsync ( TaskCompletionSource < bool > tcs , CancellationToken token )
1779- {
1780- CancellationTokenRegistration tokenRegistration =
1781- #if NET6_0_OR_GREATER
1782- token . UnsafeRegister (
1783- state => ( ( TaskCompletionSource < bool > ) state ) . TrySetCanceled ( ) , tcs ) ;
1784- #else
1785- token . Register (
1786- state => ( ( TaskCompletionSource < bool > ) state ) . TrySetCanceled ( ) ,
1787- state : tcs , useSynchronizationContext : false ) ;
1788- #endif
1789- try
1812+ if ( false == cancellationToken . CanBeCanceled )
17901813 {
1791- return await tcs . Task . ConfigureAwait ( false ) ;
1814+ rv = await tcs . Task . ConfigureAwait ( false ) ;
17921815 }
1793- finally
1816+ else
17941817 {
1795- #if NET6_0_OR_GREATER
1796- await tokenRegistration . DisposeAsync ( )
1818+ rv = await WaitForConfirmsWithTokenAsync ( tcs , cancellationToken )
17971819 . ConfigureAwait ( false ) ;
1798- #else
1799- tokenRegistration . Dispose ( ) ;
1800- #endif
18011820 }
1821+
1822+ return rv ;
18021823 }
18031824
18041825 public async Task WaitForConfirmsOrDieAsync ( CancellationToken token = default )
@@ -1830,6 +1851,33 @@ await CloseAsync(ea, false, token)
18301851 }
18311852 }
18321853
1854+ private async Task < bool > WaitForConfirmsWithTokenAsync ( TaskCompletionSource < bool > tcs ,
1855+ CancellationToken cancellationToken )
1856+ {
1857+ CancellationTokenRegistration tokenRegistration =
1858+ #if NET6_0_OR_GREATER
1859+ cancellationToken . UnsafeRegister (
1860+ state => ( ( TaskCompletionSource < bool > ) state ) . TrySetCanceled ( ) , tcs ) ;
1861+ #else
1862+ cancellationToken . Register (
1863+ state => ( ( TaskCompletionSource < bool > ) state ) . TrySetCanceled ( ) ,
1864+ state : tcs , useSynchronizationContext : false ) ;
1865+ #endif
1866+ try
1867+ {
1868+ return await tcs . Task . ConfigureAwait ( false ) ;
1869+ }
1870+ finally
1871+ {
1872+ #if NET6_0_OR_GREATER
1873+ await tokenRegistration . DisposeAsync ( )
1874+ . ConfigureAwait ( false ) ;
1875+ #else
1876+ tokenRegistration . Dispose ( ) ;
1877+ #endif
1878+ }
1879+ }
1880+
18331881 private static BasicProperties PopulateActivityAndPropagateTraceId < TProperties > ( TProperties basicProperties ,
18341882 Activity sendActivity ) where TProperties : IReadOnlyBasicProperties , IAmqpHeader
18351883 {
0 commit comments