Skip to content

Commit 7bf220a

Browse files
xinlian12annie-mac
andauthored
SwitchOffIOThread (Azure#23822)
* Swith off IO thread for response processing Co-authored-by: annie-mac <annie-mac@DESKTOP-62QAK4R.redmond.corp.microsoft.com>
1 parent d362d2f commit 7bf220a

File tree

3 files changed

+41
-4
lines changed

3 files changed

+41
-4
lines changed

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,10 @@ public class Configs {
9191
"COSMOS.DEFAULT_SESSION_TOKEN_MISMATCH_MAXIMUM_BACKOFF_TIME_IN_MILLISECONDS";
9292
private static final int DEFAULT_SESSION_TOKEN_MISMATCH_MAXIMUM_BACKOFF_TIME_IN_MILLISECONDS = 50;
9393

94+
// Whether to process the response on a different thread
95+
private static final String DEFAULT_SWITCH_OFF_IO_THREAD_FOR_RESPONSE_NAME = "COSMOS.SWITCH_OFF_IO_THREAD_FOR_RESPONSE";
96+
private static final boolean DEFAULT_SWITCH_OFF_IO_THREAD_FOR_RESPONSE = false;
97+
9498
public Configs() {
9599
this.sslContext = sslContextInit();
96100
}
@@ -252,6 +256,12 @@ public static int getSessionTokenMismatchMaximumBackoffTimeInMs() {
252256
DEFAULT_SESSION_TOKEN_MISMATCH_MAXIMUM_BACKOFF_TIME_IN_MILLISECONDS);
253257
}
254258

259+
public static boolean shouldSwitchOffIOThreadForResponse() {
260+
return getJVMConfigAsBoolean(
261+
DEFAULT_SWITCH_OFF_IO_THREAD_FOR_RESPONSE_NAME,
262+
DEFAULT_SWITCH_OFF_IO_THREAD_FOR_RESPONSE);
263+
}
264+
255265
private static int getJVMConfigAsInt(String propName, int defaultValue) {
256266
String propValue = System.getProperty(propName);
257267
return getIntValue(propValue, defaultValue);

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosSchedulers.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88

99
public class CosmosSchedulers {
1010
private final static String COSMOS_PARALLEL_THREAD_NAME = "cosmos-parallel";
11+
private final static String TRANSPORT_RESPONSE_BOUNDED_ELASTIC_THREAD_NAME = "transport-response-bounded-elastic";
12+
private final static int TTL_FOR_SCHEDULER_WORKER_IN_SECONDS = 60; // same as BoundedElasticScheduler.DEFAULT_TTL_SECONDS
1113

1214
// Using a custom parallel scheduler to be able to schedule retries etc.
1315
// without being vulnerable to scenarios where applications abuse the
@@ -16,4 +18,13 @@ public class CosmosSchedulers {
1618
COSMOS_PARALLEL_THREAD_NAME,
1719
Schedulers.DEFAULT_POOL_SIZE,
1820
true);
21+
22+
// Custom bounded elastic scheduler to switch off IO thread to process response.
23+
public final static Scheduler TRANSPORT_RESPONSE_BOUNDED_ELASTIC = Schedulers.newBoundedElastic(
24+
Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE,
25+
Schedulers.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
26+
TRANSPORT_RESPONSE_BOUNDED_ELASTIC_THREAD_NAME,
27+
TTL_FOR_SCHEDULER_WORKER_IN_SECONDS,
28+
true
29+
);
1930
}

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/TransportClient.java

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33

44
package com.azure.cosmos.implementation.directconnectivity;
55

6+
import com.azure.cosmos.implementation.Configs;
7+
import com.azure.cosmos.implementation.CosmosSchedulers;
68
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
79
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
810
import com.azure.cosmos.implementation.throughputControl.ThroughputControlStore;
@@ -15,6 +17,7 @@
1517
// signature for backwards compatibility purposes.
1618
@SuppressWarnings("try")
1719
public abstract class TransportClient implements AutoCloseable {
20+
private final boolean switchOffIOThreadForResponse = Configs.shouldSwitchOffIOThreadForResponse();
1821
private ThroughputControlStore throughputControlStore;
1922

2023
public void enableThroughputControl(ThroughputControlStore throughputControlStore) {
@@ -26,16 +29,29 @@ public Mono<StoreResponse> invokeResourceOperationAsync(Uri physicalAddress, RxD
2629
if (StringUtils.isEmpty(request.requestContext.resourcePhysicalAddress)) {
2730
request.requestContext.resourcePhysicalAddress = physicalAddress.toString();
2831
}
32+
2933
if (this.throughputControlStore != null) {
30-
return this.throughputControlStore.processRequest(
31-
request,
32-
Mono.defer(() -> this.invokeStoreAsync(physicalAddress, request)));
34+
return this.invokeStoreWithThroughputControlAsync(physicalAddress, request);
3335
}
3436

35-
return this.invokeStoreAsync(physicalAddress, request);
37+
return this.invokeStoreInternalAsync(physicalAddress, request);
3638
}
3739

3840
protected abstract Mono<StoreResponse> invokeStoreAsync(
3941
Uri physicalAddress,
4042
RxDocumentServiceRequest request);
43+
44+
private Mono<StoreResponse> invokeStoreWithThroughputControlAsync(Uri physicalAddress, RxDocumentServiceRequest request) {
45+
return this.throughputControlStore.processRequest(
46+
request,
47+
Mono.defer(() -> this.invokeStoreInternalAsync(physicalAddress, request)));
48+
}
49+
50+
private Mono<StoreResponse> invokeStoreInternalAsync(Uri physicalAddress, RxDocumentServiceRequest request) {
51+
if (switchOffIOThreadForResponse) {
52+
return this.invokeStoreAsync(physicalAddress, request).publishOn(CosmosSchedulers.TRANSPORT_RESPONSE_BOUNDED_ELASTIC);
53+
}
54+
55+
return this.invokeStoreAsync(physicalAddress, request);
56+
}
4157
}

0 commit comments

Comments
 (0)