Skip to content

Commit cc2262b

Browse files
Authorization refresh fix (Azure#17609)
* save * Authorization refresh fix * Undo inadvertent changes
1 parent de89ade commit cc2262b

File tree

9 files changed

+193
-84
lines changed

9 files changed

+193
-84
lines changed

sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpConnectionScope.cs

Lines changed: 111 additions & 77 deletions
Large diffs are not rendered by default.

sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,8 @@ public AmqpReceiver(
142142
timeout: timeout,
143143
prefetchCount: prefetchCount,
144144
receiveMode: receiveMode,
145-
isSessionReceiver: isSessionReceiver),
145+
isSessionReceiver: isSessionReceiver,
146+
identifier: identifier),
146147
link => CloseLink(link));
147148

148149
_managementLink = new FaultTolerantAmqpObject<RequestResponseAmqpLink>(
@@ -166,13 +167,15 @@ private async Task<ReceivingAmqpLink> OpenReceiverLinkAsync(
166167
TimeSpan timeout,
167168
uint prefetchCount,
168169
ServiceBusReceiveMode receiveMode,
169-
bool isSessionReceiver)
170+
bool isSessionReceiver,
171+
string identifier)
170172
{
171173
ServiceBusEventSource.Log.CreateReceiveLinkStart(_identifier);
172174

173175
try
174176
{
175177
ReceivingAmqpLink link = await _connectionScope.OpenReceiverLinkAsync(
178+
identifier: identifier,
176179
entityPath: _entityPath,
177180
timeout: timeout,
178181
prefetchCount: prefetchCount,

sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpSender.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -583,9 +583,10 @@ protected virtual async Task<SendingAmqpLink> CreateLinkAndEnsureSenderStateAsyn
583583
try
584584
{
585585
SendingAmqpLink link = await _connectionScope.OpenSenderLinkAsync(
586-
_entityPath,
587-
timeout,
588-
cancellationToken).ConfigureAwait(false);
586+
entityPath: _entityPath,
587+
identifier: _identifier,
588+
timeout: timeout,
589+
cancellationToken: cancellationToken).ConfigureAwait(false);
589590

590591
if (!MaxMessageSize.HasValue)
591592
{

sdk/servicebus/Azure.Messaging.ServiceBus/src/Diagnostics/ServiceBusEventSource.cs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,10 @@ internal ServiceBusEventSource() { }
186186
internal const int ProcessorMessageHandlerCompleteEvent = 103;
187187
internal const int ProcessorMessageHandlerExceptionEvent = 104;
188188

189+
internal const int RequestAuthorizationStartEvent = 105;
190+
internal const int RequestAuthorizationCompleteEvent = 106;
191+
internal const int RequestAuthorizationExceptionEvent = 107;
192+
189193
#endregion
190194
// add new event numbers here incrementing from previous
191195

@@ -1096,6 +1100,33 @@ public virtual void ManagementLinkClosedCore(
10961100
WriteEvent(ManagementLinkClosedEvent, identifier, linkException);
10971101
}
10981102
}
1103+
1104+
[Event(RequestAuthorizationStartEvent, Level = EventLevel.Verbose, Message = "{0}: Requesting authorization to {1}")]
1105+
public virtual void RequestAuthorizationStart(string identifier, string endpoint)
1106+
{
1107+
if (IsEnabled())
1108+
{
1109+
WriteEvent(RequestAuthorizationStartEvent, identifier, endpoint);
1110+
}
1111+
}
1112+
1113+
[Event(RequestAuthorizationCompleteEvent, Level = EventLevel.Verbose, Message = "{0}: Authorization to {1} complete. Expiration time: {2}")]
1114+
public virtual void RequestAuthorizationComplete(string identifier, string endpoint, string expiration)
1115+
{
1116+
if (IsEnabled())
1117+
{
1118+
WriteEvent(RequestAuthorizationCompleteEvent, identifier, endpoint, expiration);
1119+
}
1120+
}
1121+
1122+
[Event(RequestAuthorizationExceptionEvent, Level = EventLevel.Verbose, Message = "{0}: An exception occured while requesting authorization to {1}. Exception: {2}.")]
1123+
public virtual void RequestAuthorizationException(string identifier, string endpoint, string exception)
1124+
{
1125+
if (IsEnabled())
1126+
{
1127+
WriteEvent(RequestAuthorizationExceptionEvent, identifier, endpoint, exception);
1128+
}
1129+
}
10991130
#endregion
11001131

11011132
#region Retries

sdk/servicebus/Azure.Messaging.ServiceBus/tests/Amqp/AmqpReceiverTests.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@ public void ReceiveAsyncAppliesTheRetryPolicy(ServiceBusRetryOptions retryOption
156156

157157
mockScope
158158
.Setup(scope => scope.OpenReceiverLinkAsync(
159+
It.IsAny<string>(),
159160
It.IsAny<string>(),
160161
It.IsAny<TimeSpan>(),
161162
It.IsAny<uint>(),
@@ -173,6 +174,7 @@ public void ReceiveAsyncAppliesTheRetryPolicy(ServiceBusRetryOptions retryOption
173174

174175
mockScope
175176
.Verify(scope => scope.OpenReceiverLinkAsync(
177+
It.IsAny<string>(),
176178
It.IsAny<string>(),
177179
It.IsAny<TimeSpan>(),
178180
It.IsAny<uint>(),
@@ -210,6 +212,7 @@ public void ReceiveAsyncConsidersOperationCanceledExceptionAsRetriable(ServiceBu
210212

211213
mockScope
212214
.Setup(scope => scope.OpenReceiverLinkAsync(
215+
It.IsAny<string>(),
213216
It.IsAny<string>(),
214217
It.IsAny<TimeSpan>(),
215218
It.IsAny<uint>(),
@@ -227,6 +230,7 @@ public void ReceiveAsyncConsidersOperationCanceledExceptionAsRetriable(ServiceBu
227230

228231
mockScope
229232
.Verify(scope => scope.OpenReceiverLinkAsync(
233+
It.IsAny<string>(),
230234
It.IsAny<string>(),
231235
It.IsAny<TimeSpan>(),
232236
It.IsAny<uint>(),
@@ -272,6 +276,7 @@ public void ReceiveAsyncAppliesTheRetryPolicyForAmqpErrors(ServiceBusRetryOption
272276

273277
mockScope
274278
.Setup(scope => scope.OpenReceiverLinkAsync(
279+
It.IsAny<string>(),
275280
It.IsAny<string>(),
276281
It.IsAny<TimeSpan>(),
277282
It.IsAny<uint>(),
@@ -288,6 +293,7 @@ public void ReceiveAsyncAppliesTheRetryPolicyForAmqpErrors(ServiceBusRetryOption
288293
cancellationSource.Token), Throws.InstanceOf<ServiceBusException>().And.Property(nameof(ServiceBusException.Reason)).EqualTo(ServiceBusFailureReason.ServiceBusy));
289294
mockScope
290295
.Verify(scope => scope.OpenReceiverLinkAsync(
296+
It.IsAny<string>(),
291297
It.IsAny<string>(),
292298
It.IsAny<TimeSpan>(),
293299
It.IsAny<uint>(),
@@ -329,6 +335,7 @@ public void ReceiveAsyncDetectsAnEmbeddedErrorForOperationCanceled()
329335

330336
mockScope
331337
.Setup(scope => scope.OpenReceiverLinkAsync(
338+
It.IsAny<string>(),
332339
It.IsAny<string>(),
333340
It.IsAny<TimeSpan>(),
334341
It.IsAny<uint>(),
@@ -346,6 +353,7 @@ public void ReceiveAsyncDetectsAnEmbeddedErrorForOperationCanceled()
346353

347354
mockScope
348355
.Verify(scope => scope.OpenReceiverLinkAsync(
356+
It.IsAny<string>(),
349357
It.IsAny<string>(),
350358
It.IsAny<TimeSpan>(),
351359
It.IsAny<uint>(),
@@ -382,6 +390,7 @@ public void ReceiveAsyncDetectsAnEmbeddedAmqpErrorForOperationCanceled()
382390

383391
mockScope
384392
.Setup(scope => scope.OpenReceiverLinkAsync(
393+
It.IsAny<string>(),
385394
It.IsAny<string>(),
386395
It.IsAny<TimeSpan>(),
387396
It.IsAny<uint>(),
@@ -399,6 +408,7 @@ public void ReceiveAsyncDetectsAnEmbeddedAmqpErrorForOperationCanceled()
399408

400409
mockScope
401410
.Verify(scope => scope.OpenReceiverLinkAsync(
411+
It.IsAny<string>(),
402412
It.IsAny<string>(),
403413
It.IsAny<TimeSpan>(),
404414
It.IsAny<uint>(),
@@ -435,6 +445,7 @@ public void ReceiveAsyncDoesntRetryOnTaskCanceled()
435445

436446
mockScope
437447
.Setup(scope => scope.OpenReceiverLinkAsync(
448+
It.IsAny<string>(),
438449
It.IsAny<string>(),
439450
It.IsAny<TimeSpan>(),
440451
It.IsAny<uint>(),
@@ -452,6 +463,7 @@ public void ReceiveAsyncDoesntRetryOnTaskCanceled()
452463

453464
mockScope
454465
.Verify(scope => scope.OpenReceiverLinkAsync(
466+
It.IsAny<string>(),
455467
It.IsAny<string>(),
456468
It.IsAny<TimeSpan>(),
457469
It.IsAny<uint>(),

sdk/servicebus/Azure.Messaging.ServiceBus/tests/Diagnostics/EventSourceLiveTests.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ public async Task LogsEvents()
5151

5252
await sender.SendMessagesAsync(batch);
5353
_listener.SingleEventById(ServiceBusEventSource.CreateSendLinkStartEvent, e => e.Payload.Contains(sender.Identifier));
54+
_listener.SingleEventById(ServiceBusEventSource.RequestAuthorizationStartEvent, e => e.Payload.Contains(sender.Identifier));
55+
_listener.SingleEventById(ServiceBusEventSource.RequestAuthorizationCompleteEvent, e => e.Payload.Contains(sender.Identifier));
5456
_listener.SingleEventById(ServiceBusEventSource.CreateSendLinkCompleteEvent, e => e.Payload.Contains(sender.Identifier));
5557
_listener.SingleEventById(ServiceBusEventSource.SendMessageStartEvent, e => e.Payload.Contains(sender.Identifier));
5658
_listener.SingleEventById(ServiceBusEventSource.SendMessageCompleteEvent, e => e.Payload.Contains(sender.Identifier));
@@ -77,6 +79,8 @@ public async Task LogsEvents()
7779
}
7880
}
7981
_listener.SingleEventById(ServiceBusEventSource.CreateReceiveLinkStartEvent, e => e.Payload.Contains(receiver.Identifier));
82+
_listener.SingleEventById(ServiceBusEventSource.RequestAuthorizationStartEvent, e => e.Payload.Contains(receiver.Identifier));
83+
_listener.SingleEventById(ServiceBusEventSource.RequestAuthorizationCompleteEvent, e => e.Payload.Contains(receiver.Identifier));
8084
_listener.SingleEventById(ServiceBusEventSource.CreateReceiveLinkCompleteEvent, e => e.Payload.Contains(receiver.Identifier));
8185
Assert.IsTrue(_listener.EventsById(ServiceBusEventSource.ReceiveMessageStartEvent).Any());
8286
Assert.IsTrue(_listener.EventsById(ServiceBusEventSource.ReceiveMessageCompleteEvent).Any());
@@ -144,6 +148,8 @@ public async Task LogsSessionEvents()
144148

145149
await sender.SendMessagesAsync(batch);
146150
_listener.SingleEventById(ServiceBusEventSource.CreateSendLinkStartEvent, e => e.Payload.Contains(sender.Identifier));
151+
_listener.SingleEventById(ServiceBusEventSource.RequestAuthorizationStartEvent, e => e.Payload.Contains(sender.Identifier));
152+
_listener.SingleEventById(ServiceBusEventSource.RequestAuthorizationCompleteEvent, e => e.Payload.Contains(sender.Identifier));
147153
_listener.SingleEventById(ServiceBusEventSource.CreateSendLinkCompleteEvent, e => e.Payload.Contains(sender.Identifier));
148154
_listener.SingleEventById(ServiceBusEventSource.SendMessageStartEvent, e => e.Payload.Contains(sender.Identifier));
149155
_listener.SingleEventById(ServiceBusEventSource.SendMessageCompleteEvent, e => e.Payload.Contains(sender.Identifier));

sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorLiveTests.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -296,7 +296,10 @@ async Task ProcessMessage(ProcessMessageEventArgs args)
296296
Throws.InstanceOf<ServiceBusException>().And.Property(nameof(ServiceBusException.Reason)).EqualTo(ServiceBusFailureReason.MessageLockLost));
297297
Interlocked.Increment(ref messageCt);
298298
var setIndex = Interlocked.Increment(ref completionSourceIndex);
299-
completionSources[setIndex].SetResult(true);
299+
if (setIndex < numThreads)
300+
{
301+
completionSources[setIndex].SetResult(true);
302+
}
300303
}
301304
}
302305
await Task.WhenAll(completionSources.Select(source => source.Task));

sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/SessionProcessorLiveTests.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -748,7 +748,10 @@ async Task ProcessMessage(ProcessSessionMessageEventArgs args)
748748
}
749749
Interlocked.Increment(ref messageCt);
750750
var setIndex = Interlocked.Increment(ref completionSourceIndex);
751-
completionSources[setIndex].SetResult(true);
751+
if (setIndex < numThreads)
752+
{
753+
completionSources[setIndex].SetResult(true);
754+
}
752755
}
753756
await Task.WhenAll(completionSources.Select(source => source.Task));
754757
await processor.StopProcessingAsync();
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# Copyright (c) Microsoft Corporation. All rights reserved.
2+
# Licensed under the MIT License.
3+
4+
# The purpose of this script is to add a small delay between the creation of the live test resources
5+
# and the execution of the live tests. This allows RBAC to replicate and avoids flakiness in the first set
6+
# of live tests that might otherwise start running before RBAC has replicated.
7+
8+
param (
9+
[hashtable] $DeploymentOutputs,
10+
[string] $TenantId,
11+
[string] $TestApplicationId,
12+
[string] $TestApplicationSecret
13+
)
14+
15+
Write-Verbose "Sleeping for 60 seconds to let RBAC replicate"
16+
Start-Sleep -s 60

0 commit comments

Comments
 (0)