Skip to content

Commit ad74eee

Browse files
Perf-improvement avoiding extra-buffer copy for query and point operations (Azure#38072)
* Perf-improvement avoiding extra-buffer copy for query and point operations * Fixing JDK8 build errors * Update StoreResponseBuilder.java * Fixing test issues * Update BatchResponseParser.java * Update CosmosItemResponse.java * Update ResponseUtils.java * Fixing test issues * Test changes * Fixing test failures * Fixing encryption test failures * Update UtilsTest.java * Update ClientTelemetryTest.java * Fixing some encryption relates test failures * Update CosmosItemTest.java * Update CosmosItemResponse.java * Update CosmosItemResponse.java * Avoided double-deserialization in Gateway mode * Removing unnecessary imports * Update RxGatewayStoreModel.java * Addressing code review feedback
1 parent 40ab17a commit ad74eee

File tree

80 files changed

+663
-441
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

80 files changed

+663
-441
lines changed

sdk/cosmos/azure-cosmos-encryption/src/main/java/com/azure/cosmos/encryption/CosmosEncryptionAsyncContainer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.azure.cosmos.implementation.HttpConstants;
2020
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
2121
import com.azure.cosmos.implementation.ItemDeserializer;
22+
import com.azure.cosmos.implementation.apachecommons.lang.tuple.Pair;
2223
import com.azure.cosmos.implementation.batch.ItemBatchOperation;
2324
import com.azure.cosmos.implementation.batch.ItemBulkOperation;
2425
import com.azure.cosmos.implementation.guava25.base.Preconditions;
@@ -782,7 +783,7 @@ Mono<JsonNode> decryptResponseNode(
782783
}
783784

784785
private Mono<CosmosItemResponse<byte[]>> setByteArrayContent(CosmosItemResponse<byte[]> rsp,
785-
Mono<byte[]> bytesMono) {
786+
Mono<Pair<byte[], JsonNode>> bytesMono) {
786787
return bytesMono.flatMap(
787788
bytes -> {
788789
cosmosItemResponseBuilderAccessor.setByteArrayContent(rsp, bytes);

sdk/cosmos/azure-cosmos-encryption/src/main/java/com/azure/cosmos/encryption/implementation/CosmosResponseFactory.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,7 @@ public CosmosResponseFactory() {
1919
public <T> CosmosItemResponse<T> createItemResponse(CosmosItemResponse<byte[]> responseMessage,
2020
Class<T> classType) {
2121
return cosmosItemResponseBuilderAccessor.createCosmosItemResponse(
22-
cosmosItemResponseBuilderAccessor.getResourceResponse(responseMessage),
23-
cosmosItemResponseBuilderAccessor.getByteArrayContent(responseMessage),
22+
responseMessage,
2423
classType,
2524
new ItemDeserializer.JsonDeserializer());
2625
}

sdk/cosmos/azure-cosmos-encryption/src/main/java/com/azure/cosmos/encryption/implementation/EncryptionProcessor.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -497,7 +497,7 @@ public byte[] encryptAndSerializeValue(EncryptionSettings encryptionSettings, Ob
497497
return cipherTextWithTypeMarker;
498498
}
499499

500-
public Mono<byte[]> decrypt(byte[] input) {
500+
public Mono<Pair<byte[], JsonNode>> decrypt(byte[] input) {
501501
if (LOGGER.isDebugEnabled()) {
502502
LOGGER.debug("Encrypting byte[] of size [{}] on thread [{}]",
503503
input == null ? null : input.length,
@@ -512,8 +512,10 @@ public Mono<byte[]> decrypt(byte[] input) {
512512
return decrypt(itemJObj);
513513
}
514514

515-
public Mono<byte[]> decrypt(JsonNode itemJObj) {
516-
return decryptJsonNode(itemJObj).map(decryptedObjectNode -> EncryptionUtils.serializeJsonToByteArray(EncryptionUtils.getSimpleObjectMapper(), decryptedObjectNode));
515+
public Mono<Pair<byte[], JsonNode>> decrypt(JsonNode itemJObj) {
516+
return decryptJsonNode(itemJObj).map(decryptedObjectNode -> Pair.of(
517+
EncryptionUtils.serializeJsonToByteArray(EncryptionUtils.getSimpleObjectMapper(), decryptedObjectNode),
518+
decryptedObjectNode));
517519
}
518520

519521
public Mono<JsonNode> decryptJsonNode(JsonNode itemJObj) {

sdk/cosmos/azure-cosmos-encryption/src/test/java/com/azure/cosmos/encryption/implementation/EncryptionProcessorAndSettingsTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ public void encryptEmptyArray() throws Exception {
267267
encryptionProcessor.setClientEncryptionPolicy(new ClientEncryptionPolicy(paths));
268268

269269
JsonNode document = OBJECT_MAPPER.readValue(json, ObjectNode.class);
270-
String output = new String(encryptionProcessor.decrypt(encryptionProcessor.encryptObjectNode(document).block()).block());
270+
String output = new String(encryptionProcessor.decrypt(encryptionProcessor.encryptObjectNode(document).block()).block().getLeft());
271271
assertThat(output).isEqualTo(json);
272272
}
273273
}

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/ClientTelemetryTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ public void operationsList(CosmosClient cosmosClient) throws Exception {
175175

176176
// Verifying above query operation, we should have 4 operation (1 latency, 1 request charge -
177177
// for both query plan and the actual feed response)
178-
assertThat(clientTelemetry.getClientTelemetryInfo().getOperationInfoMap().size()).isEqualTo(4);
178+
assertThat(clientTelemetry.getClientTelemetryInfo().getOperationInfoMap().size()).isGreaterThanOrEqualTo(4);
179179
}
180180

181181
@Test(groups = {"emulator"}, timeOut = TIMEOUT)

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/ConflictTests.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,26 +5,28 @@
55

66
import com.azure.cosmos.implementation.Conflict;
77
import com.azure.cosmos.implementation.Document;
8+
import com.azure.cosmos.implementation.Utils;
89
import com.azure.cosmos.models.ConflictResolutionPolicy;
910
import com.fasterxml.jackson.databind.JsonNode;
1011
import com.fasterxml.jackson.databind.ObjectMapper;
1112
import org.apache.commons.io.IOUtils;
1213
import org.testng.annotations.BeforeClass;
1314
import org.testng.annotations.Test;
1415

16+
import java.nio.charset.StandardCharsets;
1517
import java.util.ArrayList;
1618
import java.util.List;
1719

1820
import static org.assertj.core.api.Assertions.assertThat;
1921
import static org.assertj.core.api.Assertions.fail;
2022

2123
public class ConflictTests {
22-
private List<String> confList = new ArrayList<String>();
24+
private final List<String> confList = new ArrayList<>();
2325

2426
@BeforeClass(groups = { "unit" })
2527
public void before_ConflictTests() throws Exception {
2628
String conflictAsString = IOUtils.toString(
27-
getClass().getClassLoader().getResourceAsStream("sampleConflict.json"), "UTF-8");
29+
getClass().getClassLoader().getResourceAsStream("sampleConflict.json"), StandardCharsets.UTF_8);
2830
ObjectMapper mapper = new ObjectMapper();
2931
JsonNode valuesNode = mapper.readTree(conflictAsString).get("conflictList");
3032
for (JsonNode node : valuesNode) {
@@ -34,35 +36,35 @@ public void before_ConflictTests() throws Exception {
3436

3537
@Test(groups = { "unit" })
3638
public void getSourceResourceId() {
37-
Conflict conf = new Conflict(confList.get(0));
39+
Conflict conf = new Conflict(Utils.parseJson(confList.get(0)));
3840
assertThat(conf.getSourceResourceId()).isEqualTo("k6d9ALgBmD+ChB4AAAAAAA==");
3941
}
4042

4143
@Test(groups = { "unit" })
4244
public void getOperationKind() {
43-
Conflict conf = new Conflict(confList.get(0));
45+
Conflict conf = new Conflict(Utils.parseJson(confList.get(0)));
4446
assertThat(conf.getOperationKind().toString()).isEqualTo("create");
45-
conf = new Conflict(confList.get(1));
47+
conf = new Conflict(Utils.parseJson(confList.get(1)));
4648
assertThat(conf.getOperationKind().toString()).isEqualTo("update");
47-
conf = new Conflict(confList.get(2));
49+
conf = new Conflict(Utils.parseJson(confList.get(2)));
4850
assertThat(conf.getOperationKind().toString()).isEqualTo("delete");
49-
conf = new Conflict(confList.get(3));
51+
conf = new Conflict(Utils.parseJson(confList.get(3)));
5052
assertThat(conf.getOperationKind().toString()).isEqualTo("replace");
51-
conf = new Conflict(confList.get(4));
53+
conf = new Conflict(Utils.parseJson(confList.get(4)));
5254
assertThat(conf.getOperationKind().toString()).isEqualTo("unknown");
5355
conf.getSourceResourceId();
5456
}
5557

5658
@Test(groups = { "unit" })
5759
public void getResourceType() {
58-
Conflict conf = new Conflict(confList.get(0));
60+
Conflict conf = new Conflict(Utils.parseJson(confList.get(0)));
5961
assertThat(conf.getResourceType()).isEqualTo("document");
6062
conf.getSourceResourceId();
6163
}
6264

6365
@Test(groups = { "unit" })
6466
public void getResource() {
65-
Conflict conf = new Conflict(confList.get(0));
67+
Conflict conf = new Conflict(Utils.parseJson(confList.get(0)));
6668
Document doc = conf.getResource(Document.class);
6769
assertThat(doc.getId()).isEqualTo("0007312a-a1c5-4b54-9e39-35de2367fa33");
6870
assertThat(doc.getInt("regionId")).isEqualTo(2);

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosItemTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -484,6 +484,7 @@ public void readManyOptimizationRequestChargeComparisonForSingleTupleWithSmallSi
484484

485485
@Test(groups = { "fast" }, timeOut = TIMEOUT)
486486
public void queryItemWithDuplicateJsonProperties() throws Exception {
487+
Utils.configureSimpleObjectMapper(true);
487488
String id = UUID.randomUUID().toString();
488489
String rawJson = String.format(
489490
"{ "
@@ -500,7 +501,6 @@ public void queryItemWithDuplicateJsonProperties() throws Exception {
500501
new PartitionKey(id),
501502
new CosmosItemRequestOptions());
502503

503-
Utils.configureSimpleObjectMapper(true);
504504
try {
505505
CosmosPagedIterable<ObjectNode> pagedIterable = container.queryItems (
506506
"SELECT * FROM c WHERE c.id = '" + id + "'",

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/CosmosItemWriteRetriesTest.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -722,9 +722,13 @@ private void executeAndValidate(
722722
}
723723
}
724724
if (contentResponseOnWriteEnabled) {
725-
assertThat(response.getItem().get("id").asText()).isEqualTo(expectedId);
726-
assertThat(response.getItem().get("mypk").asText()).isEqualTo(expectedId);
727-
assertThat(response.getItem()).isNotNull();
725+
if (response.getItem() == null) {
726+
assertThat(expectedId).isNull();
727+
} else {
728+
assertThat(response.getItem().get("id").asText()).isEqualTo(expectedId);
729+
assertThat(response.getItem().get("mypk").asText()).isEqualTo(expectedId);
730+
assertThat(response.getItem()).isNotNull();
731+
}
728732
} else {
729733
assertThat(response.getItem()).isNull();
730734
}

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/PermissionTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// Licensed under the MIT License.
33
package com.azure.cosmos;
44

5+
import com.azure.cosmos.implementation.Utils;
56
import com.azure.cosmos.models.CosmosPermissionProperties;
67
import com.azure.cosmos.models.ModelBridgeInternal;
78
import com.azure.cosmos.models.PartitionKey;
@@ -21,7 +22,7 @@ public void deserializeToPermission() {
2122
" 'resource': 'dbs/AQAAAA==/colls/AQAAAJ0fgTc='," +
2223
" 'resourcePartitionKey': ['/id']" +
2324
"}";
24-
Permission p = new Permission(json);
25+
Permission p = new Permission(Utils.parseJson(json));
2526
assertThat(p.getResourcePartitionKey()).isEqualToComparingFieldByField(new PartitionKey("/id"));
2627
assertThat(p.getPermissionMode()).isEqualTo(PermissionMode.READ);
2728
}

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/RetryContextOnDiagnosticTest.java

Lines changed: 12 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import com.azure.cosmos.implementation.ShouldRetryResult;
2626
import com.azure.cosmos.implementation.StoreResponseBuilder;
2727
import com.azure.cosmos.implementation.TestConfigurations;
28-
import com.azure.cosmos.implementation.Utils;
2928
import com.azure.cosmos.implementation.directconnectivity.AddressSelector;
3029
import com.azure.cosmos.implementation.directconnectivity.ConsistencyReader;
3130
import com.azure.cosmos.implementation.directconnectivity.ConsistencyWriter;
@@ -50,18 +49,17 @@
5049
import com.fasterxml.jackson.databind.ObjectMapper;
5150
import io.netty.buffer.ByteBuf;
5251
import io.netty.buffer.ByteBufAllocator;
52+
import io.netty.buffer.ByteBufInputStream;
5353
import io.netty.buffer.ByteBufUtil;
5454
import io.netty.handler.codec.http.HttpMethod;
5555
import io.reactivex.subscribers.TestSubscriber;
5656
import org.mockito.ArgumentMatchers;
5757
import org.mockito.Mockito;
5858
import org.testng.annotations.Test;
59-
import reactor.core.publisher.Flux;
6059
import reactor.core.publisher.Mono;
6160

6261
import java.lang.reflect.Field;
6362
import java.net.URISyntaxException;
64-
import java.nio.charset.Charset;
6563
import java.time.Duration;
6664
import java.util.HashMap;
6765
import java.util.Iterator;
@@ -96,12 +94,15 @@ public void backoffRetryUtilityExecuteRetry() throws Exception {
9694
retryPolicy = new TestRetryPolicy();
9795
addressSelector = Mockito.mock(AddressSelector.class);
9896
CosmosException exception = new CosmosException(410, exceptionText);
97+
String rawJson = "{\"id\":\"" + responseText + "\"}";
98+
ByteBuf buffer = getUTF8BytesOrNull(rawJson);
9999
Mockito.when(callbackMethod.call()).thenThrow(exception, exception, exception, exception, exception)
100-
.thenReturn(Mono.just(new StoreResponse(200, new HashMap<>(), getUTF8BytesOrNull(responseText))));
100+
101+
.thenReturn(Mono.just(new StoreResponse(200, new HashMap<>(), new ByteBufInputStream(buffer, true), buffer.readableBytes())));
101102
Mono<StoreResponse> monoResponse = BackoffRetryUtility.executeRetry(callbackMethod, retryPolicy);
102103
StoreResponse response = validateSuccess(monoResponse);
103104

104-
assertThat(response.getResponseBody()).isEqualTo(getUTF8BytesOrNull(responseText));
105+
assertThat(response.getResponseBodyAsJson().get("id").asText()).isEqualTo(responseText);
105106
assertThat(retryPolicy.getRetryContext().getRetryCount()).isEqualTo(5);
106107
assertThat(retryPolicy.getRetryContext().getStatusAndSubStatusCodes().size()).isEqualTo(retryPolicy.getRetryContext().getRetryCount());
107108
}
@@ -139,8 +140,10 @@ public void backoffRetryUtilityExecuteAsync() {
139140
addressSelector = Mockito.mock(AddressSelector.class);
140141
CosmosException exception = new CosmosException(410, exceptionText);
141142
Mono<StoreResponse> exceptionMono = Mono.error(exception);
143+
String rawJson = "{\"id\":\"" + responseText + "\"}";
144+
ByteBuf buffer = getUTF8BytesOrNull(rawJson);
142145
Mockito.when(parameterizedCallbackMethod.apply(ArgumentMatchers.any())).thenReturn(exceptionMono, exceptionMono, exceptionMono, exceptionMono, exceptionMono)
143-
.thenReturn(Mono.just(new StoreResponse(200, new HashMap<>(), getUTF8BytesOrNull(responseText))));
146+
.thenReturn(Mono.just(new StoreResponse(200, new HashMap<>(), new ByteBufInputStream(buffer, true), buffer.readableBytes())));
144147
Mono<StoreResponse> monoResponse = BackoffRetryUtility.executeAsync(
145148
parameterizedCallbackMethod,
146149
retryPolicy,
@@ -150,7 +153,7 @@ public void backoffRetryUtilityExecuteAsync() {
150153
addressSelector);
151154
StoreResponse response = validateSuccess(monoResponse);
152155

153-
assertThat(response.getResponseBody()).isEqualTo(getUTF8BytesOrNull(responseText));
156+
assertThat(response.getResponseBodyAsJson().get("id").asText()).isEqualTo(responseText);
154157
assertThat(retryPolicy.getRetryContext().getRetryCount()).isEqualTo(5);
155158
assertThat(retryPolicy.getRetryContext().getStatusAndSubStatusCodes().size()).isEqualTo(retryPolicy.getRetryContext().getRetryCount());
156159
}
@@ -938,19 +941,10 @@ public HttpHeaders headers() {
938941
}
939942

940943
@Override
941-
public Flux<ByteBuf> body() {
944+
public Mono<ByteBuf> body() {
942945
try {
943-
return Flux.just(ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT,
946+
return Mono.just(ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT,
944947
OBJECT_MAPPER.writeValueAsString(getTestPojoObject())));
945-
} catch (JsonProcessingException e) {
946-
return Flux.error(e);
947-
}
948-
}
949-
950-
@Override
951-
public Mono<byte[]> bodyAsByteArray() {
952-
try {
953-
return Mono.just(Utils.getUTF8Bytes(OBJECT_MAPPER.writeValueAsString(getTestPojoObject())));
954948
} catch (JsonProcessingException e) {
955949
return Mono.error(e);
956950
}
@@ -964,15 +958,6 @@ public Mono<String> bodyAsString() {
964958
return Mono.error(e);
965959
}
966960
}
967-
968-
@Override
969-
public Mono<String> bodyAsString(Charset charset) {
970-
try {
971-
return Mono.just(OBJECT_MAPPER.writeValueAsString(getTestPojoObject()));
972-
} catch (JsonProcessingException e) {
973-
return Mono.error(e);
974-
}
975-
}
976961
};
977962

978963
try {

0 commit comments

Comments
 (0)