Skip to content

Commit d0f4b0f

Browse files
authored
Defer SwitchIfEmpty Return Values (Azure#24621)
Defer SwitchIfEmpty Return Values
1 parent d9bfe54 commit d0f4b0f

File tree

6 files changed

+12
-11
lines changed

6 files changed

+12
-11
lines changed

sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ClaimsBasedSecurityChannel.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,9 +77,9 @@ public Mono<OffsetDateTime> authorize(String tokenAudience, String scopes) {
7777
sink.error(error);
7878
}
7979
})
80-
.switchIfEmpty(Mono.error(new AmqpException(true, String.format(
80+
.switchIfEmpty(Mono.defer(() -> Mono.error(new AmqpException(true, String.format(
8181
"No response received from CBS node. tokenAudience: '%s'. scopes: '%s'",
82-
tokenAudience, scopes), channel.getErrorContext())));
82+
tokenAudience, scopes), channel.getErrorContext()))));
8383
}));
8484
}
8585

sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ManagementChannel.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,9 @@ public Mono<AmqpAnnotatedMessage> send(AmqpAnnotatedMessage message) {
4747
return channel.sendWithAck(protonJMessage)
4848
.handle((Message responseMessage, SynchronousSink<AmqpAnnotatedMessage> sink) ->
4949
handleResponse(responseMessage, sink, channel.getErrorContext()))
50-
.switchIfEmpty(Mono.error(new AmqpException(true, String.format(
50+
.switchIfEmpty(Mono.defer(() -> Mono.error(new AmqpException(true, String.format(
5151
"entityPath[%s] No response received from management channel.", entityPath),
52-
channel.getErrorContext())));
52+
channel.getErrorContext()))));
5353
}));
5454
}
5555

@@ -62,9 +62,9 @@ public Mono<AmqpAnnotatedMessage> send(AmqpAnnotatedMessage message, DeliveryOut
6262
return channel.sendWithAck(protonJMessage, protonJDeliveryState)
6363
.handle((Message responseMessage, SynchronousSink<AmqpAnnotatedMessage> sink) ->
6464
handleResponse(responseMessage, sink, channel.getErrorContext()))
65-
.switchIfEmpty(Mono.error(new AmqpException(true, String.format(
65+
.switchIfEmpty(Mono.defer(() -> Mono.error(new AmqpException(true, String.format(
6666
"entityPath[%s] outcome[%s] No response received from management channel.", entityPath,
67-
deliveryOutcome.getDeliveryState()), channel.getErrorContext())));
67+
deliveryOutcome.getDeliveryState()), channel.getErrorContext()))));
6868
}));
6969
}
7070

@@ -114,7 +114,8 @@ private void handleResponse(Message response, SynchronousSink<AmqpAnnotatedMessa
114114
private Mono<Void> isAuthorized() {
115115
return tokenManager.getAuthorizationResults()
116116
.next()
117-
.switchIfEmpty(Mono.error(new AmqpException(false, "Did not get response from tokenManager: " + entityPath, getErrorContext())))
117+
.switchIfEmpty(Mono.defer(() -> Mono.error(
118+
new AmqpException(false, "Did not get response from tokenManager: " + entityPath, getErrorContext()))))
118119
.handle((response, sink) -> {
119120
if (RequestResponseUtils.isSuccessful(response)) {
120121
sink.complete();

sdk/core/azure-core-experimental/src/main/java/com/azure/core/experimental/implementation/AccessTokenCacheImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ private Function<Signal<AccessToken>, Mono<? extends AccessToken>> processTokenR
143143
} else if (signal.isOnError() && error != null) { // ERROR
144144
logger.error(refreshLog(cache, now, "Failed to acquire a new access token"));
145145
nextTokenRefresh = OffsetDateTime.now().plus(REFRESH_DELAY);
146-
return fallback.switchIfEmpty(Mono.error(error));
146+
return fallback.switchIfEmpty(Mono.defer(() -> Mono.error(error)));
147147
} else { // NO REFRESH
148148
sinksOne.tryEmitEmpty();
149149
return fallback;

sdk/core/azure-core-http-okhttp/src/main/java/com/azure/core/http/okhttp/OkHttpAsyncHttpClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ private static Mono<ByteString> toByteString(Flux<ByteBuffer> bbFlux) {
145145
throw Exceptions.propagate(ioe);
146146
}
147147
}).map(b -> ByteString.of(b.readByteArray())), okio.Buffer::clear)
148-
.switchIfEmpty(EMPTY_BYTE_STRING_MONO);
148+
.switchIfEmpty(Mono.defer(() -> EMPTY_BYTE_STRING_MONO));
149149
}
150150

151151
private static class OkHttpCallback implements okhttp3.Callback {

sdk/core/azure-core/src/main/java/com/azure/core/credential/SimpleTokenCache.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ public Mono<AccessToken> getToken() {
9494
} else if (signal.isOnError() && error != null) { // ERROR
9595
logger.error(refreshLog(cache, now, "Failed to acquire a new access token"));
9696
nextTokenRefresh = OffsetDateTime.now().plus(REFRESH_DELAY);
97-
return fallback.switchIfEmpty(Mono.error(error));
97+
return fallback.switchIfEmpty(Mono.defer(() -> Mono.error(error)));
9898
} else { // NO REFRESH
9999
sinksOne.tryEmitEmpty();
100100
return fallback;

sdk/core/azure-core/src/main/java/com/azure/core/implementation/AccessTokenCache.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ private Function<Signal<AccessToken>, Mono<? extends AccessToken>> processTokenR
167167
} else if (signal.isOnError() && error != null) { // ERROR
168168
logger.error(refreshLog(cache, now, "Failed to acquire a new access token"));
169169
nextTokenRefresh = OffsetDateTime.now().plus(REFRESH_DELAY);
170-
return fallback.switchIfEmpty(Mono.error(error));
170+
return fallback.switchIfEmpty(Mono.defer(() -> Mono.error(error)));
171171
} else { // NO REFRESH
172172
sinksOne.tryEmitEmpty();
173173
return fallback;

0 commit comments

Comments
 (0)