Skip to content

Commit df200a0

Browse files
xinlian12annie-mac
andauthored
addSupportForReadFeedInFaultInjection (Azure#37108)
* add support for readFeed * fix tests --------- Co-authored-by: annie-mac <xinlian@microsoft.com>
1 parent 079c54b commit df200a0

File tree

4 files changed

+76
-65
lines changed

4 files changed

+76
-65
lines changed

sdk/cosmos/azure-cosmos-test/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
### 1.0.0-beta.6 (Unreleased)
44

55
#### Features Added
6+
* Added support for `ReadFeed` operation type - See [PR 37108](https://github.com/Azure/azure-sdk-for-java/pull/37108)
67

78
#### Breaking Changes
89

sdk/cosmos/azure-cosmos-test/src/main/java/com/azure/cosmos/test/faultinjection/FaultInjectionOperationType.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,5 +58,9 @@ public enum FaultInjectionOperationType {
5858
/**
5959
* Address refresh request.
6060
*/
61-
METADATA_REQUEST_ADDRESS_REFRESH;
61+
METADATA_REQUEST_ADDRESS_REFRESH,
62+
/**
63+
* Read change feed items
64+
*/
65+
READ_FEED_ITEM
6266
}

sdk/cosmos/azure-cosmos-test/src/main/java/com/azure/cosmos/test/implementation/faultinjection/FaultInjectionRuleProcessor.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,8 @@ private OperationType getEffectiveOperationType(FaultInjectionOperationType faul
329329
return OperationType.Create;
330330
case QUERY_ITEM:
331331
return OperationType.Query;
332+
case READ_FEED_ITEM:
333+
return OperationType.ReadFeed;
332334
case UPSERT_ITEM:
333335
return OperationType.Upsert;
334336
case REPLACE_ITEM:
@@ -357,6 +359,7 @@ private ResourceType getEffectiveResourceType(FaultInjectionOperationType faultI
357359
case READ_ITEM:
358360
case CREATE_ITEM:
359361
case QUERY_ITEM:
362+
case READ_FEED_ITEM:
360363
case UPSERT_ITEM:
361364
case REPLACE_ITEM:
362365
case DELETE_ITEM:

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/FaultInjectionServerErrorRuleOnDirectTests.java

Lines changed: 67 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.azure.cosmos.implementation.TestConfigurations;
2222
import com.azure.cosmos.implementation.Utils;
2323
import com.azure.cosmos.implementation.throughputControl.TestItem;
24+
import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
2425
import com.azure.cosmos.models.CosmosItemResponse;
2526
import com.azure.cosmos.models.CosmosPatchOperations;
2627
import com.azure.cosmos.models.CosmosQueryRequestOptions;
@@ -57,7 +58,6 @@
5758
import java.util.stream.Collectors;
5859

5960
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
60-
import static org.testng.AssertJUnit.fail;
6161

6262
public class FaultInjectionServerErrorRuleOnDirectTests extends TestSuiteBase {
6363
private static final int TIMEOUT = 60000;
@@ -66,7 +66,7 @@ public class FaultInjectionServerErrorRuleOnDirectTests extends TestSuiteBase {
6666
private static final String FAULT_INJECTION_RULE_NON_APPLICABLE_REGION_ENDPOINT = "RegionEndpoint mismatch";
6767
private static final String FAULT_INJECTION_RULE_NON_APPLICABLE_HIT_LIMIT = "Hit Limit reached";
6868

69-
private CosmosAsyncClient client;
69+
private CosmosAsyncClient clientWithoutPreferredRegions;
7070
private CosmosAsyncContainer cosmosAsyncContainer;
7171
private DatabaseAccount databaseAccount;
7272

@@ -81,13 +81,13 @@ public FaultInjectionServerErrorRuleOnDirectTests(CosmosClientBuilder clientBuil
8181

8282
@BeforeClass(groups = {"multi-region", "long"}, timeOut = TIMEOUT)
8383
public void beforeClass() {
84-
client = getClientBuilder().buildAsyncClient();
85-
AsyncDocumentClient asyncDocumentClient = BridgeInternal.getContextClient(client);
84+
clientWithoutPreferredRegions = getClientBuilder().buildAsyncClient();
85+
AsyncDocumentClient asyncDocumentClient = BridgeInternal.getContextClient(clientWithoutPreferredRegions);
8686
GlobalEndpointManager globalEndpointManager = asyncDocumentClient.getGlobalEndpointManager();
8787

8888
DatabaseAccount databaseAccount = globalEndpointManager.getLatestDatabaseAccount();
8989
this.databaseAccount = databaseAccount;
90-
this.cosmosAsyncContainer = getSharedMultiPartitionCosmosContainerWithIdAsPartitionKey(client);
90+
this.cosmosAsyncContainer = getSharedMultiPartitionCosmosContainerWithIdAsPartitionKey(clientWithoutPreferredRegions);
9191
this.readRegionMap = this.getRegionMap(databaseAccount, false);
9292
this.writeRegionMap = this.getRegionMap(databaseAccount, true);
9393
}
@@ -111,7 +111,7 @@ public static Object[][] faultInjectionOperationTypeProvider() {
111111
{ FaultInjectionOperationType.READ_ITEM, false },
112112
{ FaultInjectionOperationType.REPLACE_ITEM, true },
113113
{ FaultInjectionOperationType.CREATE_ITEM, true },
114-
{ FaultInjectionOperationType.DELETE_ITEM, true},
114+
{ FaultInjectionOperationType.DELETE_ITEM, true },
115115
{ FaultInjectionOperationType.QUERY_ITEM, false },
116116
{ FaultInjectionOperationType.PATCH_ITEM, true }
117117
};
@@ -120,16 +120,33 @@ public static Object[][] faultInjectionOperationTypeProvider() {
120120
@DataProvider(name = "faultInjectionServerErrorResponseProvider")
121121
public static Object[][] faultInjectionServerErrorResponseProvider() {
122122
return new Object[][]{
123-
// faultInjectionServerError, will SDK retry, errorStatusCode, errorSubStatusCode
124-
{ FaultInjectionServerErrorType.GONE, true, 410, HttpConstants.SubStatusCodes.SERVER_GENERATED_410 },
125-
{ FaultInjectionServerErrorType.INTERNAL_SERVER_ERROR, false, 500, 0 },
126-
{ FaultInjectionServerErrorType.RETRY_WITH, true, 449, 0 },
127-
{ FaultInjectionServerErrorType.TOO_MANY_REQUEST, true, 429, 0 },
128-
{ FaultInjectionServerErrorType.READ_SESSION_NOT_AVAILABLE, true, 404, 1002 },
129-
{ FaultInjectionServerErrorType.TIMEOUT, true, 410, HttpConstants.SubStatusCodes.SERVER_GENERATED_408 }, // for server return 408, SDK will wrap into 410/21010
130-
{ FaultInjectionServerErrorType.PARTITION_IS_MIGRATING, true, 410, 1008 },
131-
{ FaultInjectionServerErrorType.PARTITION_IS_SPLITTING, true, 410, 1007 },
132-
{ FaultInjectionServerErrorType.SERVICE_UNAVAILABLE, false, 503, 21008 }
123+
// operationType, faultInjectionOperationType, faultInjectionServerError, will SDK retry within local region, errorStatusCode, errorSubStatusCode
124+
{ OperationType.Read, FaultInjectionOperationType.READ_ITEM, FaultInjectionServerErrorType.GONE, true, 410, HttpConstants.SubStatusCodes.SERVER_GENERATED_410 },
125+
{ OperationType.Read, FaultInjectionOperationType.READ_ITEM, FaultInjectionServerErrorType.INTERNAL_SERVER_ERROR, false, 500, 0 },
126+
{ OperationType.Read, FaultInjectionOperationType.READ_ITEM, FaultInjectionServerErrorType.RETRY_WITH, true, 449, 0 },
127+
{ OperationType.Read, FaultInjectionOperationType.READ_ITEM, FaultInjectionServerErrorType.TOO_MANY_REQUEST, true, 429, 0 },
128+
{ OperationType.Read, FaultInjectionOperationType.READ_ITEM, FaultInjectionServerErrorType.READ_SESSION_NOT_AVAILABLE, true, 404, 1002 },
129+
{ OperationType.Read, FaultInjectionOperationType.READ_ITEM, FaultInjectionServerErrorType.TIMEOUT, true, 410, HttpConstants.SubStatusCodes.SERVER_GENERATED_408 }, // for server return 408, SDK will wrap into 410/21010
130+
{ OperationType.Read, FaultInjectionOperationType.READ_ITEM, FaultInjectionServerErrorType.PARTITION_IS_MIGRATING, true, 410, 1008 },
131+
{ OperationType.Read, FaultInjectionOperationType.READ_ITEM, FaultInjectionServerErrorType.PARTITION_IS_SPLITTING, true, 410, 1007 },
132+
{ OperationType.Read, FaultInjectionOperationType.READ_ITEM, FaultInjectionServerErrorType.SERVICE_UNAVAILABLE, false, 503, 21008 },
133+
{ OperationType.ReadFeed, FaultInjectionOperationType.READ_FEED_ITEM, FaultInjectionServerErrorType.GONE, true, 410, HttpConstants.SubStatusCodes.SERVER_GENERATED_410 },
134+
{ OperationType.ReadFeed, FaultInjectionOperationType.READ_FEED_ITEM, FaultInjectionServerErrorType.INTERNAL_SERVER_ERROR, false, 500, 0 },
135+
{ OperationType.ReadFeed, FaultInjectionOperationType.READ_FEED_ITEM, FaultInjectionServerErrorType.RETRY_WITH, true, 449, 0 },
136+
{ OperationType.ReadFeed, FaultInjectionOperationType.READ_FEED_ITEM, FaultInjectionServerErrorType.TOO_MANY_REQUEST, true, 429, 0 },
137+
{ OperationType.ReadFeed, FaultInjectionOperationType.READ_FEED_ITEM, FaultInjectionServerErrorType.READ_SESSION_NOT_AVAILABLE, true, 404, 1002 },
138+
{ OperationType.ReadFeed, FaultInjectionOperationType.READ_FEED_ITEM, FaultInjectionServerErrorType.TIMEOUT, true, 410, HttpConstants.SubStatusCodes.SERVER_GENERATED_408 }, // for server return 408, SDK will wrap into 410/21010
139+
{ OperationType.ReadFeed, FaultInjectionOperationType.READ_FEED_ITEM, FaultInjectionServerErrorType.PARTITION_IS_MIGRATING, true, 410, 1008 },
140+
{ OperationType.ReadFeed, FaultInjectionOperationType.READ_FEED_ITEM, FaultInjectionServerErrorType.PARTITION_IS_SPLITTING, true, 410, 1007 },
141+
{ OperationType.ReadFeed, FaultInjectionOperationType.READ_FEED_ITEM, FaultInjectionServerErrorType.SERVICE_UNAVAILABLE, false, 503, 21008 },
142+
{ OperationType.Create, FaultInjectionOperationType.CREATE_ITEM, FaultInjectionServerErrorType.GONE, true, 410, HttpConstants.SubStatusCodes.SERVER_GENERATED_410 },
143+
{ OperationType.Create, FaultInjectionOperationType.CREATE_ITEM, FaultInjectionServerErrorType.INTERNAL_SERVER_ERROR, false, 500, 0 },
144+
{ OperationType.Create, FaultInjectionOperationType.CREATE_ITEM, FaultInjectionServerErrorType.RETRY_WITH, true, 449, 0 },
145+
{ OperationType.Create, FaultInjectionOperationType.CREATE_ITEM, FaultInjectionServerErrorType.TOO_MANY_REQUEST, true, 429, 0 },
146+
{ OperationType.Create, FaultInjectionOperationType.CREATE_ITEM, FaultInjectionServerErrorType.TIMEOUT, false, 410, HttpConstants.SubStatusCodes.SERVER_GENERATED_408 }, // for server return 408, SDK will wrap into 410/21010
147+
{ OperationType.Create, FaultInjectionOperationType.CREATE_ITEM, FaultInjectionServerErrorType.PARTITION_IS_MIGRATING, true, 410, 1008 },
148+
{ OperationType.Create, FaultInjectionOperationType.CREATE_ITEM, FaultInjectionServerErrorType.PARTITION_IS_SPLITTING, true, 410, 1007 },
149+
{ OperationType.Create, FaultInjectionOperationType.CREATE_ITEM, FaultInjectionServerErrorType.SERVICE_UNAVAILABLE, false, 503, 21008 },
133150
};
134151
}
135152

@@ -368,7 +385,7 @@ public void faultInjectionServerErrorRuleTests_Region() throws JsonProcessingExc
368385
.endpoint(TestConfigurations.HOST)
369386
.key(TestConfigurations.MASTER_KEY)
370387
.contentResponseOnWriteEnabled(true)
371-
.consistencyLevel(BridgeInternal.getContextClient(this.client).getConsistencyLevel())
388+
.consistencyLevel(BridgeInternal.getContextClient(this.clientWithoutPreferredRegions).getConsistencyLevel())
372389
.preferredRegions(preferredLocations)
373390
.directMode()
374391
.buildAsyncClient();
@@ -523,7 +540,7 @@ public void faultInjectionServerErrorRuleTests_ServerResponseDelay() throws Json
523540
.endpoint(TestConfigurations.HOST)
524541
.key(TestConfigurations.MASTER_KEY)
525542
.contentResponseOnWriteEnabled(true)
526-
.consistencyLevel(BridgeInternal.getContextClient(this.client).getConsistencyLevel())
543+
.consistencyLevel(BridgeInternal.getContextClient(this.clientWithoutPreferredRegions).getConsistencyLevel())
527544
.directMode(directConnectionConfig)
528545
.buildAsyncClient();
529546

@@ -588,7 +605,7 @@ public void faultInjectionServerErrorRuleTests_ServerConnectionTimeout() throws
588605
.endpoint(TestConfigurations.HOST)
589606
.key(TestConfigurations.MASTER_KEY)
590607
.contentResponseOnWriteEnabled(true)
591-
.consistencyLevel(BridgeInternal.getContextClient(this.client).getConsistencyLevel())
608+
.consistencyLevel(BridgeInternal.getContextClient(this.clientWithoutPreferredRegions).getConsistencyLevel())
592609
.directMode(directConnectionConfig)
593610
.buildAsyncClient();
594611

@@ -644,7 +661,7 @@ public void faultInjectionServerErrorRuleTests_ServerConnectionDelay() throws Js
644661
.endpoint(TestConfigurations.HOST)
645662
.key(TestConfigurations.MASTER_KEY)
646663
.contentResponseOnWriteEnabled(true)
647-
.consistencyLevel(BridgeInternal.getContextClient(this.client).getConsistencyLevel())
664+
.consistencyLevel(BridgeInternal.getContextClient(this.clientWithoutPreferredRegions).getConsistencyLevel())
648665
.buildAsyncClient();
649666

650667
CosmosAsyncContainer container =
@@ -704,7 +721,7 @@ public void faultInjectionServerErrorRuleTests_ServerConnectionDelay_warmup(
704721
.endpoint(TestConfigurations.HOST)
705722
.key(TestConfigurations.MASTER_KEY)
706723
.contentResponseOnWriteEnabled(true)
707-
.consistencyLevel(BridgeInternal.getContextClient(this.client).getConsistencyLevel())
724+
.consistencyLevel(BridgeInternal.getContextClient(this.clientWithoutPreferredRegions).getConsistencyLevel())
708725
.directMode(directConnectionConfig)
709726
.buildAsyncClient();
710727

@@ -788,18 +805,23 @@ public void faultInjectionServerErrorRuleTests_ServerConnectionDelay_warmup(
788805

789806
@Test(groups = {"multi-region", "long"}, dataProvider = "faultInjectionServerErrorResponseProvider", timeOut = TIMEOUT)
790807
public void faultInjectionServerErrorRuleTests_ServerErrorResponse(
808+
OperationType operationType,
809+
FaultInjectionOperationType faultInjectionOperationType,
791810
FaultInjectionServerErrorType serverErrorType,
792811
boolean canRetry,
793812
int errorStatusCode,
794813
int errorSubStatusCode) throws JsonProcessingException {
795814

796-
// simulate high channel acquisition/connectionTimeout
815+
// simulate high channel acquisition/connectionTimeout for read/query
816+
TestItem createdItem = TestItem.createNewItem();
817+
cosmosAsyncContainer.createItem(createdItem).block();
818+
797819
String ruleId = "serverErrorRule-" + serverErrorType + "-" + UUID.randomUUID();
798820
FaultInjectionRule serverErrorRule =
799821
new FaultInjectionRuleBuilder(ruleId)
800822
.condition(
801823
new FaultInjectionConditionBuilder()
802-
.operationType(FaultInjectionOperationType.READ_ITEM)
824+
.operationType(faultInjectionOperationType)
803825
.build()
804826
)
805827
.result(
@@ -812,40 +834,13 @@ public void faultInjectionServerErrorRuleTests_ServerErrorResponse(
812834
.build();
813835

814836
try {
815-
TestItem createdItem = TestItem.createNewItem();
816-
cosmosAsyncContainer.createItem(createdItem).block();
817-
818837
CosmosFaultInjectionHelper.configureFaultInjectionRules(cosmosAsyncContainer, Arrays.asList(serverErrorRule)).block();
819838

820-
CosmosDiagnostics cosmosDiagnostics = null;
821-
if (canRetry) {
822-
try {
823-
cosmosDiagnostics =
824-
cosmosAsyncContainer
825-
.readItem(createdItem.getId(), new PartitionKey(createdItem.getId()), TestItem.class)
826-
.block()
827-
.getDiagnostics();
828-
} catch (Exception exception) {
829-
fail("Request should succeeded, but failed with " + exception);
830-
}
831-
} else {
832-
try {
833-
cosmosDiagnostics =
834-
cosmosAsyncContainer
835-
.readItem(createdItem.getId(), new PartitionKey(createdItem.getId()), TestItem.class)
836-
.block()
837-
.getDiagnostics();
838-
fail("Request should fail, but succeeded");
839-
840-
} catch (Exception e) {
841-
cosmosDiagnostics = ((CosmosException)e).getDiagnostics();
842-
}
843-
}
844-
845-
this.validateHitCount(serverErrorRule, 1, OperationType.Read, ResourceType.Document);
839+
CosmosDiagnostics cosmosDiagnostics = performDocumentOperation(cosmosAsyncContainer, operationType, createdItem);;
840+
this.validateHitCount(serverErrorRule, 1, operationType, ResourceType.Document);
846841
this.validateFaultInjectionRuleApplied(
847842
cosmosDiagnostics,
848-
OperationType.Read,
843+
operationType,
849844
errorStatusCode,
850845
errorSubStatusCode,
851846
ruleId,
@@ -855,6 +850,7 @@ public void faultInjectionServerErrorRuleTests_ServerErrorResponse(
855850
} finally {
856851
serverErrorRule.disable();
857852
}
853+
858854
}
859855

860856
@Test(groups = {"multi-region", "long"}, timeOut = TIMEOUT)
@@ -913,7 +909,7 @@ public void faultInjectionServerErrorRuleTests_HitLimit() throws JsonProcessingE
913909

914910
@AfterClass(groups = {"multi-region", "long"}, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true)
915911
public void afterClass() {
916-
safeClose(client);
912+
safeClose(clientWithoutPreferredRegions);
917913
}
918914

919915
private CosmosDiagnostics performDocumentOperation(
@@ -974,6 +970,18 @@ private CosmosDiagnostics performDocumentOperation(
974970
}
975971
}
976972

973+
if (operationType == OperationType.ReadFeed) {
974+
List<FeedRange> feedRanges = cosmosAsyncContainer.getFeedRanges().block();
975+
CosmosChangeFeedRequestOptions changeFeedRequestOptions =
976+
CosmosChangeFeedRequestOptions.createForProcessingFromBeginning(feedRanges.get(0));
977+
978+
FeedResponse<TestItem> firstPage = cosmosAsyncContainer
979+
.queryChangeFeed(changeFeedRequestOptions, TestItem.class)
980+
.byPage()
981+
.blockFirst();
982+
return firstPage.getCosmosDiagnostics();
983+
}
984+
977985
throw new IllegalArgumentException("The operation type is not supported");
978986
} catch (CosmosException cosmosException) {
979987
return cosmosException.getDiagnostics();
@@ -983,7 +991,7 @@ private CosmosDiagnostics performDocumentOperation(
983991
@Test(groups = {"long"}, timeOut = TIMEOUT)
984992
public void faultInjectionServerErrorRuleTests_includePrimary() throws JsonProcessingException {
985993
TestItem createdItem = TestItem.createNewItem();
986-
CosmosAsyncContainer singlePartitionContainer = getSharedSinglePartitionCosmosContainer(client);
994+
CosmosAsyncContainer singlePartitionContainer = getSharedSinglePartitionCosmosContainer(clientWithoutPreferredRegions);
987995
List<FeedRange> feedRanges = singlePartitionContainer.getFeedRanges().block();
988996

989997
// Test if includePrimary=true, then primary replica address will always be returned
@@ -1085,11 +1093,9 @@ private void validateFaultInjectionRuleApplied(
10851093
boolean canRetryOnFaultInjectedError) throws JsonProcessingException {
10861094

10871095
List<ObjectNode> diagnosticsNode = new ArrayList<>();
1088-
if (operationType == OperationType.Query) {
1089-
int clientSideDiagnosticsIndex = cosmosDiagnostics.toString().indexOf("[{\"userAgent\"");
1090-
ArrayNode arrayNode =
1091-
(ArrayNode) Utils.getSimpleObjectMapper().readTree(cosmosDiagnostics.toString().substring(clientSideDiagnosticsIndex));
1092-
for (JsonNode node : arrayNode) {
1096+
if (operationType == OperationType.Query || (operationType == OperationType.ReadFeed && canRetryOnFaultInjectedError)) {
1097+
ObjectNode cosmosDiagnosticsNode = (ObjectNode) Utils.getSimpleObjectMapper().readTree(cosmosDiagnostics.toString());
1098+
for (JsonNode node : cosmosDiagnosticsNode.get("clientSideRequestStatistics")) {
10931099
diagnosticsNode.add((ObjectNode) node);
10941100
}
10951101
} else {
@@ -1101,10 +1107,7 @@ private void validateFaultInjectionRuleApplied(
11011107
assertThat(responseStatisticsList.isArray()).isTrue();
11021108

11031109
if (canRetryOnFaultInjectedError) {
1104-
if (responseStatisticsList.size() != 2) {
1105-
System.out.println("FaultInjectionResponseStatisticsList is wrong " + cosmosDiagnostics.toString());
1106-
}
1107-
assertThat(responseStatisticsList.size()).isEqualTo(2);
1110+
assertThat(responseStatisticsList.size()).isGreaterThanOrEqualTo(2);
11081111
} else {
11091112
assertThat(responseStatisticsList.size()).isOne();
11101113
}

0 commit comments

Comments
 (0)