Skip to content

Commit 2421d37

Browse files
xinlian12 and annie-mac
authored
[ChangeFeedProcessor]suppressE2ETimeoutConfigOnChangeFeedProcessor (Azure#36775)
* suppress e2e timeout policy on changeFeedProcessor --------- Co-authored-by: annie-mac <xinlian@microsoft.com>
1 parent b21e8a8 commit 2421d37

File tree

11 files changed

+285
-27
lines changed

11 files changed

+285
-27
lines changed

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/FullFidelityChangeFeedProcessorTest.java

Lines changed: 74 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import com.azure.cosmos.CosmosAsyncContainer;
99
import com.azure.cosmos.CosmosAsyncDatabase;
1010
import com.azure.cosmos.CosmosClientBuilder;
11+
import com.azure.cosmos.CosmosEndToEndOperationLatencyPolicyConfigBuilder;
1112
import com.azure.cosmos.implementation.InternalObjectNode;
1213
import com.azure.cosmos.implementation.Utils;
1314
import com.azure.cosmos.implementation.changefeed.epkversion.ServiceItemLeaseV1;
@@ -879,6 +880,78 @@ public void inactiveOwnersRecovery() throws InterruptedException {
879880
}
880881
}
881882

883+
@Test(groups = { "emulator" }, timeOut = 50 * CHANGE_FEED_PROCESSOR_TIMEOUT)
884+
public void endToEndTimeoutConfigShouldBeSuppressed() throws InterruptedException {
885+
CosmosAsyncClient clientWithE2ETimeoutConfig = null;
886+
CosmosAsyncContainer createdFeedCollection = createFeedCollection(FEED_COLLECTION_THROUGHPUT);
887+
CosmosAsyncContainer createdLeaseCollection = createLeaseCollection(LEASE_COLLECTION_THROUGHPUT);
888+
889+
try {
890+
clientWithE2ETimeoutConfig = this.getClientBuilder()
891+
.endToEndOperationLatencyPolicyConfig(new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofMillis(1)).build())
892+
.contentResponseOnWriteEnabled(true)
893+
.buildAsyncClient();
894+
895+
CosmosAsyncDatabase testDatabase = clientWithE2ETimeoutConfig.getDatabase(this.createdDatabase.getId());
896+
CosmosAsyncContainer createdFeedCollectionDuplicate = testDatabase.getContainer(createdFeedCollection.getId());
897+
CosmosAsyncContainer createdLeaseCollectionDuplicate = testDatabase.getContainer(createdLeaseCollection.getId());
898+
899+
List<InternalObjectNode> createdDocuments = new ArrayList<>();
900+
Map<String, ChangeFeedProcessorItem> receivedDocuments = new ConcurrentHashMap<>();
901+
ChangeFeedProcessorOptions changeFeedProcessorOptions = new ChangeFeedProcessorOptions();
902+
ChangeFeedProcessor changeFeedProcessor = new ChangeFeedProcessorBuilder()
903+
.options(changeFeedProcessorOptions)
904+
.hostName(hostName)
905+
.handleAllVersionsAndDeletesChanges((List<ChangeFeedProcessorItem> docs) -> {
906+
log.info("START processing from thread {}", Thread.currentThread().getId());
907+
for (ChangeFeedProcessorItem item : docs) {
908+
processItem(item, receivedDocuments);
909+
}
910+
log.info("END processing from thread {}", Thread.currentThread().getId());
911+
})
912+
.feedContainer(createdFeedCollectionDuplicate)
913+
.leaseContainer(createdLeaseCollectionDuplicate)
914+
.buildChangeFeedProcessor();
915+
916+
try {
917+
changeFeedProcessor.start().subscribeOn(Schedulers.boundedElastic())
918+
.timeout(Duration.ofMillis(2 * CHANGE_FEED_PROCESSOR_TIMEOUT))
919+
.subscribe();
920+
logger.info("Starting ChangeFeed processor");
921+
922+
// Wait for the feed processor to receive and process the documents.
923+
Thread.sleep(2 * CHANGE_FEED_PROCESSOR_TIMEOUT);
924+
925+
logger.info("Finished starting ChangeFeed processor");
926+
927+
setupReadFeedDocuments(createdDocuments, receivedDocuments, createdFeedCollection, FEED_COUNT);
928+
logger.info("Set up read feed documents");
929+
930+
// Wait for the feed processor to receive and process the documents.
931+
Thread.sleep(2 * CHANGE_FEED_PROCESSOR_TIMEOUT);
932+
logger.info("Validating changes now");
933+
934+
validateChangeFeedProcessing(changeFeedProcessor, createdDocuments, receivedDocuments, 10 * CHANGE_FEED_PROCESSOR_TIMEOUT);
935+
936+
changeFeedProcessor.stop().subscribeOn(Schedulers.boundedElastic()).timeout(Duration.ofMillis(CHANGE_FEED_PROCESSOR_TIMEOUT)).subscribe();
937+
938+
// Wait for the feed processor to shut down.
939+
Thread.sleep(2 * CHANGE_FEED_PROCESSOR_TIMEOUT);
940+
941+
} catch (Exception ex) {
942+
log.error("Change feed processor did not start and stopped in the expected time", ex);
943+
throw ex;
944+
}
945+
946+
} finally {
947+
safeDeleteCollection(createdFeedCollection);
948+
safeDeleteCollection(createdLeaseCollection);
949+
safeClose(clientWithE2ETimeoutConfig);
950+
// Allow some time for the collections to be deleted before exiting.
951+
Thread.sleep(500);
952+
}
953+
}
954+
882955
void validateChangeFeedProcessing(ChangeFeedProcessor changeFeedProcessor, List<InternalObjectNode> createdDocuments, Map<String, ChangeFeedProcessorItem> receivedDocuments, int sleepTime) throws InterruptedException {
883956
assertThat(changeFeedProcessor.isStarted()).as("Change Feed Processor instance is running").isTrue();
884957

@@ -991,4 +1064,4 @@ private static synchronized void processItem(ChangeFeedProcessorItem item, Map<S
9911064
log.info("RECEIVED {}", item);
9921065
receivedDocuments.put(item.getCurrent().get("id").asText(), item);
9931066
}
994-
}
1067+
}

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/IncrementalChangeFeedProcessorTest.java

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import com.azure.cosmos.CosmosAsyncContainer;
1010
import com.azure.cosmos.CosmosAsyncDatabase;
1111
import com.azure.cosmos.CosmosClientBuilder;
12+
import com.azure.cosmos.CosmosEndToEndOperationLatencyPolicyConfigBuilder;
1213
import com.azure.cosmos.implementation.AsyncDocumentClient;
1314
import com.azure.cosmos.implementation.DatabaseAccount;
1415
import com.azure.cosmos.implementation.DatabaseAccountLocation;
@@ -1540,6 +1541,75 @@ public void readFeedDocuments_pollDelay() throws InterruptedException {
15401541
}
15411542
}
15421543

1544+
@Test(groups = {"query" }, timeOut = 2 * TIMEOUT)
1545+
public void endToEndTimeoutConfigShouldBeSuppressed() throws InterruptedException {
1546+
CosmosAsyncClient clientWithE2ETimeoutConfig = null;
1547+
CosmosAsyncContainer createdFeedCollection = createFeedCollection(FEED_COLLECTION_THROUGHPUT);
1548+
CosmosAsyncContainer createdLeaseCollection = createLeaseCollection(LEASE_COLLECTION_THROUGHPUT);
1549+
1550+
try {
1551+
clientWithE2ETimeoutConfig = this.getClientBuilder()
1552+
.endToEndOperationLatencyPolicyConfig(new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofMillis(1)).build())
1553+
.contentResponseOnWriteEnabled(true)
1554+
.buildAsyncClient();
1555+
1556+
CosmosAsyncDatabase testDatabase = clientWithE2ETimeoutConfig.getDatabase(this.createdDatabase.getId());
1557+
CosmosAsyncContainer createdFeedCollectionDuplicate = testDatabase.getContainer(createdFeedCollection.getId());
1558+
CosmosAsyncContainer createdLeaseCollectionDuplicate = testDatabase.getContainer(createdLeaseCollection.getId());
1559+
1560+
List<InternalObjectNode> createdDocuments = new ArrayList<>();
1561+
Map<String, JsonNode> receivedDocuments = new ConcurrentHashMap<>();
1562+
setupReadFeedDocuments(createdDocuments, receivedDocuments, createdFeedCollection, FEED_COUNT);
1563+
1564+
changeFeedProcessor = new ChangeFeedProcessorBuilder()
1565+
.hostName(hostName)
1566+
.handleLatestVersionChanges(changeFeedProcessorHandler(receivedDocuments))
1567+
.feedContainer(createdFeedCollectionDuplicate)
1568+
.leaseContainer(createdLeaseCollectionDuplicate)
1569+
.options(new ChangeFeedProcessorOptions()
1570+
.setLeaseRenewInterval(Duration.ofSeconds(20))
1571+
.setLeaseAcquireInterval(Duration.ofSeconds(10))
1572+
.setLeaseExpirationInterval(Duration.ofSeconds(30))
1573+
.setFeedPollDelay(Duration.ofSeconds(2))
1574+
.setLeasePrefix("TEST")
1575+
.setMaxItemCount(10)
1576+
.setStartFromBeginning(true)
1577+
.setMaxScaleCount(0) // unlimited
1578+
)
1579+
.buildChangeFeedProcessor();
1580+
1581+
try {
1582+
changeFeedProcessor.start().subscribeOn(Schedulers.boundedElastic())
1583+
.timeout(Duration.ofMillis(2 * CHANGE_FEED_PROCESSOR_TIMEOUT))
1584+
.subscribe();
1585+
} catch (Exception ex) {
1586+
logger.error("Change feed processor did not start in the expected time", ex);
1587+
throw ex;
1588+
}
1589+
1590+
// Wait for the feed processor to receive and process the documents.
1591+
Thread.sleep(2 * CHANGE_FEED_PROCESSOR_TIMEOUT);
1592+
1593+
assertThat(changeFeedProcessor.isStarted()).as("Change Feed Processor instance is running").isTrue();
1594+
1595+
changeFeedProcessor.stop().subscribeOn(Schedulers.boundedElastic()).timeout(Duration.ofMillis(CHANGE_FEED_PROCESSOR_TIMEOUT)).subscribe();
1596+
1597+
for (InternalObjectNode item : createdDocuments) {
1598+
assertThat(receivedDocuments.containsKey(item.getId())).as("Document with getId: " + item.getId()).isTrue();
1599+
}
1600+
1601+
// Wait for the feed processor to shutdown.
1602+
Thread.sleep(CHANGE_FEED_PROCESSOR_TIMEOUT);
1603+
} finally {
1604+
safeDeleteCollection(createdFeedCollection);
1605+
safeDeleteCollection(createdLeaseCollection);
1606+
safeClose(clientWithE2ETimeoutConfig);
1607+
1608+
// Allow some time for the collections to be deleted before exiting.
1609+
Thread.sleep(500);
1610+
}
1611+
}
1612+
15431613
void validateChangeFeedProcessing(ChangeFeedProcessor changeFeedProcessor, List<InternalObjectNode> createdDocuments, Map<String, JsonNode> receivedDocuments, int sleepTime) throws InterruptedException {
15441614
try {
15451615
changeFeedProcessor.start().subscribeOn(Schedulers.boundedElastic())

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/pkversion/IncrementalChangeFeedProcessorTest.java

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import com.azure.cosmos.CosmosAsyncContainer;
99
import com.azure.cosmos.CosmosAsyncDatabase;
1010
import com.azure.cosmos.CosmosClientBuilder;
11+
import com.azure.cosmos.CosmosEndToEndOperationLatencyPolicyConfigBuilder;
1112
import com.azure.cosmos.implementation.AsyncDocumentClient;
1213
import com.azure.cosmos.implementation.DatabaseAccount;
1314
import com.azure.cosmos.implementation.DatabaseAccountLocation;
@@ -1696,6 +1697,75 @@ public void readFeedDocuments_pollDelay() throws InterruptedException {
16961697
}
16971698
}
16981699

1700+
@Test(groups = { "emulator" }, timeOut = 2 * TIMEOUT)
1701+
public void endToEndTimeoutConfigShouldBeSuppressed() throws InterruptedException {
1702+
CosmosAsyncClient clientWithE2ETimeoutConfig = null;
1703+
CosmosAsyncContainer createdFeedCollection = createFeedCollection(FEED_COLLECTION_THROUGHPUT);
1704+
CosmosAsyncContainer createdLeaseCollection = createLeaseCollection(LEASE_COLLECTION_THROUGHPUT);
1705+
1706+
try {
1707+
clientWithE2ETimeoutConfig = this.getClientBuilder()
1708+
.endToEndOperationLatencyPolicyConfig(new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofMillis(1)).build())
1709+
.contentResponseOnWriteEnabled(true)
1710+
.buildAsyncClient();
1711+
1712+
CosmosAsyncDatabase testDatabase = clientWithE2ETimeoutConfig.getDatabase(this.createdDatabase.getId());
1713+
CosmosAsyncContainer createdFeedCollectionDuplicate = testDatabase.getContainer(createdFeedCollection.getId());
1714+
CosmosAsyncContainer createdLeaseCollectionDuplicate = testDatabase.getContainer(createdLeaseCollection.getId());
1715+
1716+
List<InternalObjectNode> createdDocuments = new ArrayList<>();
1717+
Map<String, JsonNode> receivedDocuments = new ConcurrentHashMap<>();
1718+
setupReadFeedDocuments(createdDocuments, receivedDocuments, createdFeedCollection, FEED_COUNT);
1719+
1720+
changeFeedProcessor = new ChangeFeedProcessorBuilder()
1721+
.hostName(hostName)
1722+
.handleChanges(changeFeedProcessorHandler(receivedDocuments))
1723+
.feedContainer(createdFeedCollectionDuplicate)
1724+
.leaseContainer(createdLeaseCollectionDuplicate)
1725+
.options(new ChangeFeedProcessorOptions()
1726+
.setLeaseRenewInterval(Duration.ofSeconds(20))
1727+
.setLeaseAcquireInterval(Duration.ofSeconds(10))
1728+
.setLeaseExpirationInterval(Duration.ofSeconds(30))
1729+
.setFeedPollDelay(Duration.ofSeconds(2))
1730+
.setLeasePrefix("TEST")
1731+
.setMaxItemCount(10)
1732+
.setStartFromBeginning(true)
1733+
.setMaxScaleCount(0) // unlimited
1734+
)
1735+
.buildChangeFeedProcessor();
1736+
1737+
try {
1738+
changeFeedProcessor.start().subscribeOn(Schedulers.boundedElastic())
1739+
.timeout(Duration.ofMillis(2 * CHANGE_FEED_PROCESSOR_TIMEOUT))
1740+
.subscribe();
1741+
} catch (Exception ex) {
1742+
log.error("Change feed processor did not start in the expected time", ex);
1743+
throw ex;
1744+
}
1745+
1746+
// Wait for the feed processor to receive and process the documents.
1747+
Thread.sleep(2 * CHANGE_FEED_PROCESSOR_TIMEOUT);
1748+
1749+
assertThat(changeFeedProcessor.isStarted()).as("Change Feed Processor instance is running").isTrue();
1750+
1751+
changeFeedProcessor.stop().subscribeOn(Schedulers.boundedElastic()).timeout(Duration.ofMillis(CHANGE_FEED_PROCESSOR_TIMEOUT)).subscribe();
1752+
1753+
for (InternalObjectNode item : createdDocuments) {
1754+
assertThat(receivedDocuments.containsKey(item.getId())).as("Document with getId: " + item.getId()).isTrue();
1755+
}
1756+
1757+
// Wait for the feed processor to shutdown.
1758+
Thread.sleep(CHANGE_FEED_PROCESSOR_TIMEOUT);
1759+
} finally {
1760+
safeDeleteCollection(createdFeedCollection);
1761+
safeDeleteCollection(createdLeaseCollection);
1762+
safeClose(clientWithE2ETimeoutConfig);
1763+
1764+
// Allow some time for the collections to be deleted before exiting.
1765+
Thread.sleep(500);
1766+
}
1767+
}
1768+
16991769
void validateChangeFeedProcessing(ChangeFeedProcessor changeFeedProcessor, List<InternalObjectNode> createdDocuments, Map<String, JsonNode> receivedDocuments, int sleepTime) throws InterruptedException {
17001770
try {
17011771
changeFeedProcessor.start().subscribeOn(Schedulers.boundedElastic())

sdk/cosmos/azure-cosmos/CHANGELOG.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,9 @@
99
#### Breaking Changes
1010

1111
#### Bugs Fixed
12+
* Disabled `CosmosEndToEndOperationLatencyPolicyConfig` feature in `ChangeFeedProcessor`. Setting `CosmosEndToEndOperationLatencyPolicyConfig` at `CosmosClient` level will not affect `ChangeFeedProcessor` requests in any way. See [PR 36775](https://github.com/Azure/azure-sdk-for-java/pull/36775)
1213
* Fixed staleness issue of `COSMOS.MIN_CONNECTION_POOL_SIZE_PER_ENDPOINT` system property - See [PR 36599](https://github.com/Azure/azure-sdk-for-java/pull/36599).
13-
* Fixed an issue where `pageSize` from `byPage` is not always being honored. This only happens when the same `CosmosQueryRequestOptions` being used through different
14-
requests, and different pageSize being used. See [PR 36847](https://github.com/Azure/azure-sdk-for-java/pull/36847)
14+
* Fixed an issue where `pageSize` from `byPage` is not always being honored. This only happens when the same `CosmosQueryRequestOptions` is being used across different requests with different page sizes. See [PR 36847](https://github.com/Azure/azure-sdk-for-java/pull/36847)
1515

1616
#### Other Changes
1717
* Handling negative end-to-end timeouts provided more gracefully by throwing a `CosmosException` (`OperationCancelledException`) instead of `IllegalArgumentException`. - See [PR 36507](https://github.com/Azure/azure-sdk-for-java/pull/36507)

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/PartitionedByIdCollectionRequestOptionsFactory.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,22 +2,33 @@
22
// Licensed under the MIT License.
33
package com.azure.cosmos.implementation.changefeed.common;
44

5+
import com.azure.cosmos.CosmosEndToEndOperationLatencyPolicyConfig;
6+
import com.azure.cosmos.CosmosEndToEndOperationLatencyPolicyConfigBuilder;
57
import com.azure.cosmos.models.CosmosItemRequestOptions;
68
import com.azure.cosmos.models.CosmosQueryRequestOptions;
79
import com.azure.cosmos.implementation.changefeed.Lease;
810
import com.azure.cosmos.implementation.changefeed.RequestOptionsFactory;
911

12+
import java.time.Duration;
13+
1014
/**
1115
* Used to create request options for partitioned lease collections, when the partition key is defined as /id.
1216
*/
1317
public class PartitionedByIdCollectionRequestOptionsFactory implements RequestOptionsFactory {
18+
private static CosmosEndToEndOperationLatencyPolicyConfig DISABLED_E2E_TIMEOUT_CONFIG =
19+
new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofSeconds(5))
20+
.enable(false)
21+
.build();
22+
1423
@Override
1524
public CosmosItemRequestOptions createItemRequestOptions(Lease lease) {
16-
return new CosmosItemRequestOptions();
25+
// Disable e2e timeout config within changeFeedProcessor
26+
return new CosmosItemRequestOptions().setCosmosEndToEndOperationLatencyPolicyConfig(DISABLED_E2E_TIMEOUT_CONFIG);
1727
}
1828

1929
@Override
2030
public CosmosQueryRequestOptions createQueryRequestOptions() {
21-
return new CosmosQueryRequestOptions();
31+
// Disable e2e timeout config within changeFeedProcessor
32+
return new CosmosQueryRequestOptions().setCosmosEndToEndOperationLatencyPolicyConfig(DISABLED_E2E_TIMEOUT_CONFIG);
2233
}
2334
}

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/DocumentServiceLeaseUpdaterImpl.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public Mono<Lease> updateLease(
6060

6161
return
6262
Mono.just(this)
63-
.flatMap( value -> this.tryReplaceLease(cachedLease, itemId, partitionKey))
63+
.flatMap( value -> this.tryReplaceLease(cachedLease, itemId, partitionKey, requestOptions))
6464
.map(leaseDocument -> {
6565
cachedLease.setServiceItemLease(ServiceItemLeaseV1.fromDocument(leaseDocument));
6666
return cachedLease;
@@ -137,8 +137,13 @@ public Mono<Lease> updateLease(
137137
private Mono<InternalObjectNode> tryReplaceLease(
138138
Lease lease,
139139
String itemId,
140-
PartitionKey partitionKey) throws LeaseLostException {
141-
return this.client.replaceItem(itemId, partitionKey, lease, this.getCreateIfMatchOptions(lease))
140+
PartitionKey partitionKey,
141+
CosmosItemRequestOptions cosmosItemRequestOptions) throws LeaseLostException {
142+
return this.client.replaceItem(
143+
itemId,
144+
partitionKey,
145+
lease,
146+
this.getCreateIfMatchOptions(cosmosItemRequestOptions, lease))
142147
.map(cosmosItemResponse -> BridgeInternal.getProperties(cosmosItemResponse))
143148
.onErrorResume(re -> {
144149
if (re instanceof CosmosException) {
@@ -162,8 +167,7 @@ private Mono<InternalObjectNode> tryReplaceLease(
162167
});
163168
}
164169

165-
private CosmosItemRequestOptions getCreateIfMatchOptions(Lease lease) {
166-
CosmosItemRequestOptions createIfMatchOptions = new CosmosItemRequestOptions();
170+
private CosmosItemRequestOptions getCreateIfMatchOptions(CosmosItemRequestOptions createIfMatchOptions, Lease lease) {
167171
createIfMatchOptions.setIfMatchETag(lease.getConcurrencyToken());
168172

169173
return createIfMatchOptions;

0 commit comments

Comments
 (0)