Skip to content

Commit 4161d4c

Browse files
Added response timeout API in reactor netty Http Client (Azure#15762)
* Added response timeout API for Query Plan, Address Refresh and in general * Retry the queryplan and address refresh calls on gateway ReadTimeOut exception Unit tests for above retries IllegalRefCount fix by retain() * Adding log on retry * Remvoving retain() for CTL run. * Implemented code review comments, updated responseTimeout API, removed Flux Byte Buf to Flux of byte array * Adding timeout and retry for address refresh Adding unit test for above * Fixed address refresh retry, updated default timeout for query plan and address refresh to 5 seconds * Updated spring-boot-starter-parent replace version to 2.2.10.RELEASE * Renames and code review comments * Checking against general exception e * Added new substatus code for Gateway ReadTimeoutException Co-authored-by: Bhaskar Mallapragada <bhaskar.mallapragada@microsoft.com>
1 parent 80a0e6e commit 4161d4c

21 files changed

+335
-74
lines changed

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientRetryPolicy.java

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import com.azure.cosmos.CosmosException;
99
import com.azure.cosmos.CosmosDiagnostics;
1010
import com.azure.cosmos.ThrottlingRetryOptions;
11+
import io.netty.handler.timeout.ReadTimeoutException;
1112
import org.slf4j.Logger;
1213
import org.slf4j.LoggerFactory;
1314
import reactor.core.publisher.Mono;
@@ -29,6 +30,8 @@ public class ClientRetryPolicy extends DocumentClientRetryPolicy {
2930
final static int RetryIntervalInMS = 1000; //Once we detect failover wait for 1 second before retrying request.
3031
final static int MaxRetryCount = 120;
3132
private final static int MaxServiceUnavailableRetryCount = 1;
33+
// Query Plan and Address Refresh will be re-tried 3 times, please check the if condition carefully :)
34+
private final static int MAX_QUERY_PLAN_AND_ADDRESS_RETRY_COUNT = 2;
3235

3336
private final DocumentClientRetryPolicy throttlingRetry;
3437
private final GlobalEndpointManager globalEndpointManager;
@@ -43,6 +46,8 @@ public class ClientRetryPolicy extends DocumentClientRetryPolicy {
4346
private CosmosDiagnostics cosmosDiagnostics;
4447
private AtomicInteger cnt = new AtomicInteger(0);
4548
private int serviceUnavailableRetryCount;
49+
private int queryPlanAddressRefreshCount;
50+
private RxDocumentServiceRequest request;
4651

4752
public ClientRetryPolicy(GlobalEndpointManager globalEndpointManager,
4853
boolean enableEndpointDiscovery,
@@ -106,9 +111,17 @@ public Mono<ShouldRetryResult> shouldRetry(Exception e) {
106111
} else {
107112
return this.shouldNotRetryOnEndpointFailureAsync(this.isReadRequest, false);
108113
}
114+
} else if (clientException != null &&
115+
WebExceptionUtility.isReadTimeoutException(clientException) &&
116+
Exceptions.isSubStatusCode(clientException, HttpConstants.SubStatusCodes.GATEWAY_ENDPOINT_READ_TIMEOUT)) {
117+
// if operationType is QueryPlan / AddressRefresh then just retry
118+
if (this.request.getOperationType() == OperationType.QueryPlan || this.request.isAddressRefresh()) {
119+
return shouldRetryQueryPlanAndAddress();
120+
}
109121
} else {
110122
logger.warn("Backend endpoint not reachable. ", e);
111-
return this.shouldRetryOnBackendServiceUnavailableAsync(this.isReadRequest, WebExceptionUtility.isWebExceptionRetriable(e));
123+
return this.shouldRetryOnBackendServiceUnavailableAsync(this.isReadRequest, WebExceptionUtility
124+
.isWebExceptionRetriable(e));
112125
}
113126
}
114127

@@ -121,6 +134,26 @@ public Mono<ShouldRetryResult> shouldRetry(Exception e) {
121134
return this.throttlingRetry.shouldRetry(e);
122135
}
123136

137+
private Mono<ShouldRetryResult> shouldRetryQueryPlanAndAddress() {
138+
139+
if (this.queryPlanAddressRefreshCount++ > MAX_QUERY_PLAN_AND_ADDRESS_RETRY_COUNT) {
140+
logger
141+
.warn(
142+
"shouldRetryQueryPlanAndAddress() No more retrying on endpoint {}, operationType = {}, count = {}, " +
143+
"isAddressRefresh = {}",
144+
this.locationEndpoint, this.request.getOperationType(), this.queryPlanAddressRefreshCount, this.request.isAddressRefresh());
145+
return Mono.just(ShouldRetryResult.noRetry());
146+
}
147+
148+
logger
149+
.warn("shouldRetryQueryPlanAndAddress() Retrying on endpoint {}, operationType = {}, count = {}, " +
150+
"isAddressRefresh = {}",
151+
this.locationEndpoint, this.request.getOperationType(), this.queryPlanAddressRefreshCount, this.request.isAddressRefresh());
152+
153+
Duration retryDelay = Duration.ZERO;
154+
return Mono.just(ShouldRetryResult.retryAfter(retryDelay));
155+
}
156+
124157
private ShouldRetryResult shouldRetryOnSessionNotAvailable() {
125158
this.sessionTokenRetryCount++;
126159

@@ -234,6 +267,7 @@ private Mono<ShouldRetryResult> shouldRetryOnBackendServiceUnavailableAsync(bool
234267

235268
@Override
236269
public void onBeforeSendRequest(RxDocumentServiceRequest request) {
270+
this.request = request;
237271
this.isReadRequest = request.isReadOnlyRequest();
238272
this.canUseMultipleWriteLocations = this.globalEndpointManager.canUseMultipleWriteLocations(request);
239273
if (request.requestContext != null) {

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ public class Configs {
3838
private static final String MAX_HTTP_CHUNK_SIZE_IN_BYTES = "COSMOS.MAX_HTTP_CHUNK_SIZE_IN_BYTES";
3939
private static final String MAX_HTTP_HEADER_SIZE_IN_BYTES = "COSMOS.MAX_HTTP_HEADER_SIZE_IN_BYTES";
4040
private static final String MAX_DIRECT_HTTPS_POOL_SIZE = "COSMOS.MAX_DIRECT_HTTP_CONNECTION_LIMIT";
41+
private static final String HTTP_RESPONSE_TIMEOUT_IN_SECONDS = "COSMOS.HTTP_RESPONSE_TIMEOUT_IN_SECONDS";
42+
private static final String QUERY_PLAN_RESPONSE_TIMEOUT_IN_SECONDS = "COSMOS.QUERY_PLAN_RESPONSE_TIMEOUT_IN_SECONDS";
43+
private static final String ADDRESS_REFRESH_RESPONSE_TIMEOUT_IN_SECONDS = "COSMOS.ADDRESS_REFRESH_RESPONSE_TIMEOUT_IN_SECONDS";
4144

4245
private static final int DEFAULT_UNAVAILABLE_LOCATIONS_EXPIRATION_TIME_IN_SECONDS = 5 * 60;
4346

@@ -65,6 +68,9 @@ public class Configs {
6568
private static final Duration CONNECTION_ACQUIRE_TIMEOUT = Duration.ofSeconds(45);
6669
private static final int REACTOR_NETTY_MAX_CONNECTION_POOL_SIZE = 1000;
6770
private static final String REACTOR_NETTY_CONNECTION_POOL_NAME = "reactor-netty-connection-pool";
71+
private static final int DEFAULT_HTTP_RESPONSE_TIMEOUT_IN_SECONDS = 60;
72+
private static final int DEFAULT_QUERY_PLAN_RESPONSE_TIMEOUT_IN_SECONDS = 5;
73+
private static final int DEFAULT_ADDRESS_REFRESH_RESPONSE_TIMEOUT_IN_SECONDS = 5;
6874

6975
public Configs() {
7076
this.sslContext = sslContextInit();
@@ -176,6 +182,18 @@ public int getReactorNettyMaxConnectionPoolSize() {
176182
return REACTOR_NETTY_MAX_CONNECTION_POOL_SIZE;
177183
}
178184

185+
public static int getHttpResponseTimeoutInSeconds() {
186+
return getJVMConfigAsInt(HTTP_RESPONSE_TIMEOUT_IN_SECONDS, DEFAULT_HTTP_RESPONSE_TIMEOUT_IN_SECONDS);
187+
}
188+
189+
public static int getQueryPlanResponseTimeoutInSeconds() {
190+
return getJVMConfigAsInt(QUERY_PLAN_RESPONSE_TIMEOUT_IN_SECONDS, DEFAULT_QUERY_PLAN_RESPONSE_TIMEOUT_IN_SECONDS);
191+
}
192+
193+
public static int getAddressRefreshResponseTimeoutInSeconds() {
194+
return getJVMConfigAsInt(ADDRESS_REFRESH_RESPONSE_TIMEOUT_IN_SECONDS, DEFAULT_ADDRESS_REFRESH_RESPONSE_TIMEOUT_IN_SECONDS);
195+
}
196+
179197
private static int getJVMConfigAsInt(String propName, int defaultValue) {
180198
String propValue = System.getProperty(propName);
181199
return getIntValue(propValue, defaultValue);

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,9 @@ public static class SubStatusCodes {
310310

311311
// Client generated gateway network error substatus
312312
public static final int GATEWAY_ENDPOINT_UNAVAILABLE = 10001;
313+
314+
// Client generated gateway network error on ReadTimeoutException
315+
public static final int GATEWAY_ENDPOINT_READ_TIMEOUT = 10002;
313316
}
314317

315318
public static class HeaderValues {

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RequestTimeline.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package com.azure.cosmos.implementation;
55

66
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdObjectMapper;
7+
import com.azure.cosmos.implementation.http.HttpRequest;
78
import com.fasterxml.jackson.annotation.JsonIgnore;
89
import com.fasterxml.jackson.annotation.JsonProperty;
910
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
@@ -24,7 +25,7 @@
2425
* duration properties. Hence, one might use this class to represent any timeline. Today we use it to represent
2526
* request timelines for:
2627
* <p><ul>
27-
* <li>{@link com.azure.cosmos.implementation.http.HttpClient#send},
28+
* <li>{@link com.azure.cosmos.implementation.http.HttpClient#send(HttpRequest, Duration)},
2829
* <li>{@link com.azure.cosmos.implementation.directconnectivity.HttpTransportClient#invokeStoreAsync}, and
2930
* <li>{@link com.azure.cosmos.implementation.directconnectivity.RntbdTransportClient#invokeStoreAsync}.
3031
* </ul></p>

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ public class RxDocumentServiceRequest implements Cloneable {
4545
private volatile PartitionKeyRangeIdentity partitionKeyRangeIdentity;
4646
private volatile Integer defaultReplicaIndex;
4747

48+
private boolean isAddressRefresh;
49+
4850
public DocumentServiceRequestContext requestContext;
4951

5052
// has the non serialized value of the partition-key
@@ -956,6 +958,14 @@ public synchronized Flux<ByteBuf> getContentAsByteBufFlux() {
956958
return Flux.just(Unpooled.wrappedBuffer(contentAsByteArray));
957959
}
958960

961+
public synchronized Flux<byte[]> getContentAsByteArrayFlux() {
962+
if (contentAsByteArray == null) {
963+
return Flux.empty();
964+
}
965+
966+
return Flux.just(contentAsByteArray);
967+
}
968+
959969
public int getContentLength() {
960970
return contentAsByteArray != null ? contentAsByteArray.length : 0;
961971
}
@@ -1023,4 +1033,22 @@ public static byte[] toByteArray(ByteBuffer byteBuffer) {
10231033
private static ByteBuffer wrapByteBuffer(byte[] bytes) {
10241034
return bytes != null ? ByteBuffer.wrap(bytes) : null;
10251035
}
1036+
1037+
/**
1038+
* Getter for property 'addressRefresh'.
1039+
*
1040+
* @return Value for property 'addressRefresh'.
1041+
*/
1042+
public boolean isAddressRefresh() {
1043+
return isAddressRefresh;
1044+
}
1045+
1046+
/**
1047+
* Setter for property 'addressRefresh'.
1048+
*
1049+
* @param addressRefresh Value to set for property 'addressRefresh'.
1050+
*/
1051+
public void setAddressRefresh(final boolean addressRefresh) {
1052+
isAddressRefresh = addressRefresh;
1053+
}
10261054
}

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
import com.azure.cosmos.implementation.http.HttpRequest;
1616
import com.azure.cosmos.implementation.http.HttpResponse;
1717
import com.azure.cosmos.implementation.http.ReactorNettyRequestRecord;
18-
import io.netty.buffer.ByteBuf;
1918
import io.netty.handler.codec.http.HttpMethod;
2019
import io.netty.handler.codec.http.HttpResponseStatus;
2120
import org.slf4j.Logger;
@@ -25,6 +24,7 @@
2524

2625
import java.net.URI;
2726
import java.net.URISyntaxException;
27+
import java.time.Duration;
2828
import java.time.Instant;
2929
import java.util.HashMap;
3030
import java.util.Map;
@@ -147,17 +147,22 @@ public Mono<RxDocumentServiceResponse> performRequest(RxDocumentServiceRequest r
147147

148148
HttpHeaders httpHeaders = this.getHttpRequestHeaders(request.getHeaders());
149149

150-
// The RxDocumentServiceRequest::getContentAsByteBufFlux guaranteed to return
151-
// a valid flux (including Flux.empty) hence null check is not required here.
152-
Flux<ByteBuf> byteBufObservable = request.getContentAsByteBufFlux();
150+
Flux<byte[]> contentAsByteArray = request.getContentAsByteArrayFlux();
153151

154152
HttpRequest httpRequest = new HttpRequest(method,
155153
uri,
156154
uri.getPort(),
157155
httpHeaders,
158-
byteBufObservable);
156+
contentAsByteArray);
159157

160-
Mono<HttpResponse> httpResponseMono = this.httpClient.send(httpRequest);
158+
Duration responseTimeout = Duration.ofSeconds(Configs.getHttpResponseTimeoutInSeconds());
159+
if (OperationType.QueryPlan.equals(request.getOperationType())) {
160+
responseTimeout = Duration.ofSeconds(Configs.getQueryPlanResponseTimeoutInSeconds());
161+
} else if (request.isAddressRefresh()) {
162+
responseTimeout = Duration.ofSeconds(Configs.getAddressRefreshResponseTimeoutInSeconds());
163+
}
164+
165+
Mono<HttpResponse> httpResponseMono = this.httpClient.send(httpRequest, responseTimeout);
161166

162167
return toDocumentServiceResponse(httpResponseMono, request);
163168

@@ -254,7 +259,7 @@ private Mono<RxDocumentServiceResponse> toDocumentServiceResponse(Mono<HttpRespo
254259
return contentObservable
255260
.map(content -> {
256261
//Adding transport client request timeline to diagnostics
257-
ReactorNettyRequestRecord reactorNettyRequestRecord = httpResponse.request().getReactorNettyRequestRecord();
262+
ReactorNettyRequestRecord reactorNettyRequestRecord = httpResponse.request().reactorNettyRequestRecord();
258263
if (reactorNettyRequestRecord != null) {
259264
reactorNettyRequestRecord.setTimeCompleted(Instant.now());
260265
BridgeInternal.setTransportClientRequestTimelineOnDiagnostics(request.requestContext.cosmosDiagnostics,
@@ -299,7 +304,11 @@ private Mono<RxDocumentServiceResponse> toDocumentServiceResponse(Mono<HttpRespo
299304
}
300305

301306
if (WebExceptionUtility.isNetworkFailure(dce)) {
302-
BridgeInternal.setSubStatusCode(dce, HttpConstants.SubStatusCodes.GATEWAY_ENDPOINT_UNAVAILABLE);
307+
if (WebExceptionUtility.isReadTimeoutException(dce)) {
308+
BridgeInternal.setSubStatusCode(dce, HttpConstants.SubStatusCodes.GATEWAY_ENDPOINT_READ_TIMEOUT);
309+
} else {
310+
BridgeInternal.setSubStatusCode(dce, HttpConstants.SubStatusCodes.GATEWAY_ENDPOINT_UNAVAILABLE);
311+
}
303312
}
304313

305314
if (request.requestContext.cosmosDiagnostics != null) {

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GatewayAddressCache.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import com.azure.cosmos.BridgeInternal;
77
import com.azure.cosmos.CosmosException;
88
import com.azure.cosmos.implementation.AuthorizationTokenType;
9+
import com.azure.cosmos.implementation.Configs;
910
import com.azure.cosmos.implementation.Constants;
1011
import com.azure.cosmos.implementation.DocumentCollection;
1112
import com.azure.cosmos.implementation.Exceptions;
@@ -36,6 +37,7 @@
3637
import com.azure.cosmos.implementation.http.HttpResponse;
3738
import com.azure.cosmos.implementation.routing.PartitionKeyRangeIdentity;
3839
import io.netty.handler.codec.http.HttpMethod;
40+
import io.netty.handler.timeout.ReadTimeoutException;
3941
import org.slf4j.Logger;
4042
import org.slf4j.LoggerFactory;
4143
import reactor.core.publisher.Flux;
@@ -248,6 +250,7 @@ public Mono<List<Address>> getServerAddressesViaGatewayAsync(
248250
logger.debug("getServerAddressesViaGatewayAsync collectionRid {}, partitionKeyRangeIds {}", collectionRid,
249251
JavaStreamUtils.toString(partitionKeyRangeIds, ","));
250252
}
253+
request.setAddressRefresh(true);
251254
String entryUrl = PathsHelper.generatePath(ResourceType.Document, collectionRid, true);
252255
HashMap<String, String> addressQuery = new HashMap<>();
253256

@@ -301,7 +304,8 @@ public Mono<List<Address>> getServerAddressesViaGatewayAsync(
301304
Instant addressCallStartTime = Instant.now();
302305
HttpRequest httpRequest = new HttpRequest(HttpMethod.GET, targetEndpoint, targetEndpoint.getPort(), httpHeaders);
303306

304-
Mono<HttpResponse> httpResponseMono = this.httpClient.send(httpRequest);
307+
Mono<HttpResponse> httpResponseMono = this.httpClient.send(httpRequest,
308+
Duration.ofSeconds(Configs.getAddressRefreshResponseTimeoutInSeconds()));
305309

306310
Mono<RxDocumentServiceResponse> dsrObs = HttpClientUtils.parseResponseAsync(httpResponseMono, httpRequest);
307311
return dsrObs.map(
@@ -342,7 +346,11 @@ public Mono<List<Address>> getServerAddressesViaGatewayAsync(
342346
}
343347

344348
if (WebExceptionUtility.isNetworkFailure(dce)) {
345-
BridgeInternal.setSubStatusCode(dce, HttpConstants.SubStatusCodes.GATEWAY_ENDPOINT_UNAVAILABLE);
349+
if (WebExceptionUtility.isReadTimeoutException(dce)) {
350+
BridgeInternal.setSubStatusCode(dce, HttpConstants.SubStatusCodes.GATEWAY_ENDPOINT_READ_TIMEOUT);
351+
} else {
352+
BridgeInternal.setSubStatusCode(dce, HttpConstants.SubStatusCodes.GATEWAY_ENDPOINT_UNAVAILABLE);
353+
}
346354
}
347355

348356
if (request.requestContext.cosmosDiagnostics != null) {
@@ -483,6 +491,7 @@ public Mono<List<Address>> getMasterAddressesViaGatewayAsync(
483491
forceRefresh,
484492
useMasterCollectionResolver
485493
);
494+
request.setAddressRefresh(true);
486495
HashMap<String, String> queryParameters = new HashMap<>();
487496
queryParameters.put(HttpConstants.QueryStrings.URL, HttpUtils.urlEncode(entryUrl));
488497
HashMap<String, String> headers = new HashMap<>(defaultRequestHeaders);
@@ -521,7 +530,8 @@ public Mono<List<Address>> getMasterAddressesViaGatewayAsync(
521530
HttpRequest httpRequest;
522531
httpRequest = new HttpRequest(HttpMethod.GET, targetEndpoint, targetEndpoint.getPort(), defaultHttpHeaders);
523532
Instant addressCallStartTime = Instant.now();
524-
Mono<HttpResponse> httpResponseMono = this.httpClient.send(httpRequest);
533+
Mono<HttpResponse> httpResponseMono = this.httpClient.send(httpRequest,
534+
Duration.ofSeconds(Configs.getAddressRefreshResponseTimeoutInSeconds()));
525535
Mono<RxDocumentServiceResponse> dsrObs = HttpClientUtils.parseResponseAsync(httpResponseMono, httpRequest);
526536

527537
return dsrObs.map(
@@ -559,7 +569,11 @@ public Mono<List<Address>> getMasterAddressesViaGatewayAsync(
559569
}
560570

561571
if (WebExceptionUtility.isNetworkFailure(dce)) {
562-
BridgeInternal.setSubStatusCode(dce, HttpConstants.SubStatusCodes.GATEWAY_ENDPOINT_UNAVAILABLE);
572+
if (WebExceptionUtility.isReadTimeoutException(dce)) {
573+
BridgeInternal.setSubStatusCode(dce, HttpConstants.SubStatusCodes.GATEWAY_ENDPOINT_READ_TIMEOUT);
574+
} else {
575+
BridgeInternal.setSubStatusCode(dce, HttpConstants.SubStatusCodes.GATEWAY_ENDPOINT_UNAVAILABLE);
576+
}
563577
}
564578

565579
if (request.requestContext.cosmosDiagnostics != null) {

0 commit comments

Comments
 (0)