Skip to content

Commit dba7706

Browse files
authored
CosmosDB: ChangeFeedProcessor custom scheduler and other fixes (Azure#26750)
* Introduce beta API to set custom Reactor scheduler to be used by the ChangeFeedProcessor implementation. Fix an issue related to leases that were found expired. We should ensure that current CFP instance will pick at least one lease in such cases (exception is if maximum scale count was reached), even when it might break the equal partitioning. * Update logs (and add more) in order to get a better understanding of when things don't go as expected. Add a check in the acquiring of a lease to ensure that ownership did not change in between the updates. Remove "lock" that was not needed anymore; we rate limit the upstream instead.
1 parent e421a6a commit dba7706

File tree

12 files changed

+254
-64
lines changed

12 files changed

+254
-64
lines changed

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/ChangeFeedContextClient.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22
// Licensed under the MIT License.
33
package com.azure.cosmos.implementation.changefeed;
44

5+
import com.azure.cosmos.ChangeFeedProcessor;
56
import com.azure.cosmos.CosmosAsyncContainer;
7+
import com.azure.cosmos.models.ChangeFeedProcessorOptions;
68
import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
79
import com.azure.cosmos.models.CosmosContainerResponse;
810
import com.azure.cosmos.CosmosAsyncDatabase;
@@ -17,9 +19,11 @@
1719
import com.azure.cosmos.models.PartitionKey;
1820
import com.azure.cosmos.models.SqlQuerySpec;
1921
import com.azure.cosmos.implementation.PartitionKeyRange;
22+
import com.azure.cosmos.util.Beta;
2023
import com.fasterxml.jackson.databind.JsonNode;
2124
import reactor.core.publisher.Flux;
2225
import reactor.core.publisher.Mono;
26+
import reactor.core.scheduler.Scheduler;
2327

2428
import java.net.URI;
2529

@@ -146,4 +150,18 @@ <T> Mono<CosmosItemResponse<T>> readItem(String itemId, PartitionKey partitionKe
146150
* Closes the document client instance and cleans up the resources.
147151
*/
148152
void close();
153+
154+
/**
155+
* Gets the internal {@link Scheduler} that hosts a pool of ExecutorService-based workers for any change feed processor related tasks.
156+
*
157+
* @return a {@link Scheduler} that hosts a pool of ExecutorService-based workers..
158+
*/
159+
Scheduler getScheduler();
160+
161+
/**
162+
* Sets the internal {@link Scheduler} that hosts a pool of ExecutorService-based workers for any change feed processor related tasks.
163+
*
164+
* @param scheduler a {@link Scheduler} that hosts a pool of ExecutorService-based workers.
165+
*/
166+
void setScheduler(Scheduler scheduler);
149167
}

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/ChangeFeedContextClientImpl.java

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
public class ChangeFeedContextClientImpl implements ChangeFeedContextClient {
4141
private final AsyncDocumentClient documentClient;
4242
private final CosmosAsyncContainer cosmosContainer;
43-
private final Scheduler rxScheduler;
43+
private Scheduler scheduler;
4444

4545
/**
4646
* Initializes a new instance of the {@link ChangeFeedContextClient} interface.
@@ -53,29 +53,39 @@ public ChangeFeedContextClientImpl(CosmosAsyncContainer cosmosContainer) {
5353

5454
this.cosmosContainer = cosmosContainer;
5555
this.documentClient = getContextClient(cosmosContainer);
56-
this.rxScheduler = Schedulers.boundedElastic();
56+
this.scheduler = Schedulers.boundedElastic();
5757
}
5858

5959
/**
6060
* Initializes a new instance of the {@link ChangeFeedContextClient} interface.
6161
* @param cosmosContainer existing client.
62-
* @param rxScheduler the RX Java scheduler to observe on.
62+
* @param scheduler the RX Java scheduler to observe on.
6363
*/
64-
public ChangeFeedContextClientImpl(CosmosAsyncContainer cosmosContainer, Scheduler rxScheduler) {
64+
public ChangeFeedContextClientImpl(CosmosAsyncContainer cosmosContainer, Scheduler scheduler) {
6565
if (cosmosContainer == null) {
6666
throw new IllegalArgumentException("cosmosContainer");
6767
}
6868

6969
this.cosmosContainer = cosmosContainer;
7070
this.documentClient = getContextClient(cosmosContainer);
71-
this.rxScheduler = rxScheduler;
71+
this.scheduler = scheduler;
7272

7373
}
7474

75+
@Override
76+
public Scheduler getScheduler() {
77+
return this.scheduler;
78+
}
79+
80+
@Override
81+
public void setScheduler(Scheduler scheduler) {
82+
this.scheduler = scheduler;
83+
}
84+
7585
@Override
7686
public Flux<FeedResponse<PartitionKeyRange>> readPartitionKeyRangeFeed(String partitionKeyRangesOrCollectionLink, CosmosQueryRequestOptions cosmosQueryRequestOptions) {
7787
return this.documentClient.readPartitionKeyRanges(partitionKeyRangesOrCollectionLink, cosmosQueryRequestOptions)
78-
.publishOn(this.rxScheduler);
88+
.publishOn(this.scheduler);
7989
}
8090

8191
@Override
@@ -119,60 +129,60 @@ public Flux<FeedResponse<JsonNode>> createDocumentChangeFeedQuery(
119129
false);
120130
});
121131
});
122-
return feedResponseFlux.publishOn(this.rxScheduler);
132+
return feedResponseFlux.publishOn(this.scheduler);
123133
}
124134

125135
@Override
126136
public Mono<CosmosDatabaseResponse> readDatabase(CosmosAsyncDatabase database, CosmosDatabaseRequestOptions options) {
127137
return database.read()
128-
.publishOn(this.rxScheduler);
138+
.publishOn(this.scheduler);
129139
}
130140

131141
@Override
132142
public Mono<CosmosContainerResponse> readContainer(CosmosAsyncContainer containerLink, CosmosContainerRequestOptions options) {
133143
return containerLink.read(options)
134-
.publishOn(this.rxScheduler);
144+
.publishOn(this.scheduler);
135145
}
136146

137147
@Override
138148
public <T> Mono<CosmosItemResponse<T>> createItem(CosmosAsyncContainer containerLink, T document,
139149
CosmosItemRequestOptions options, boolean disableAutomaticIdGeneration) {
140150
if (options != null) {
141151
return containerLink.createItem(document, options)
142-
.publishOn(this.rxScheduler);
152+
.publishOn(this.scheduler);
143153
} else {
144154
return containerLink.createItem(document)
145-
.publishOn(this.rxScheduler);
155+
.publishOn(this.scheduler);
146156
}
147157
}
148158

149159
@Override
150160
public Mono<CosmosItemResponse<Object>> deleteItem(String itemId, PartitionKey partitionKey,
151161
CosmosItemRequestOptions options) {
152162
return cosmosContainer.deleteItem(itemId, partitionKey, options)
153-
.publishOn(this.rxScheduler);
163+
.publishOn(this.scheduler);
154164
}
155165

156166
@Override
157167
public <T> Mono<CosmosItemResponse<T>> replaceItem(String itemId, PartitionKey partitionKey, T document,
158168
CosmosItemRequestOptions options) {
159169
return cosmosContainer.replaceItem(document, itemId, partitionKey, options)
160-
.publishOn(this.rxScheduler);
170+
.publishOn(this.scheduler);
161171
}
162172

163173
@Override
164174
public <T> Mono<CosmosItemResponse<T>> readItem(String itemId, PartitionKey partitionKey,
165175
CosmosItemRequestOptions options, Class<T> itemType) {
166176
return cosmosContainer.readItem(itemId, partitionKey, options, itemType)
167-
.publishOn(this.rxScheduler);
177+
.publishOn(this.scheduler);
168178
}
169179

170180
@Override
171181
public <T> Flux<FeedResponse<T>> queryItems(CosmosAsyncContainer containerLink, SqlQuerySpec querySpec,
172182
CosmosQueryRequestOptions options, Class<T> klass) {
173183
return containerLink.queryItems(querySpec, options, klass)
174184
.byPage()
175-
.publishOn(this.rxScheduler);
185+
.publishOn(this.scheduler);
176186
}
177187

178188
@Override

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/ChangeFeedProcessorBuilderImpl.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -397,21 +397,21 @@ public ChangeFeedProcessor build() {
397397
logger.warn("Found lower than expected setting for leaseAcquireInterval");
398398
}
399399

400-
if (this.scheduler == null) {
401-
this.scheduler = Schedulers.boundedElastic();
400+
if (this.changeFeedProcessorOptions == null) {
401+
this.changeFeedProcessorOptions = new ChangeFeedProcessorOptions();
402402
}
403403

404+
this.scheduler = this.changeFeedProcessorOptions.getScheduler();
405+
this.feedContextClient.setScheduler(this.scheduler);
406+
this.leaseContextClient.setScheduler(this.scheduler);
407+
404408
return this;
405409
}
406410

407411
public ChangeFeedProcessorBuilderImpl() {
408412
}
409413

410414
private Mono<ChangeFeedProcessor> initializeCollectionPropertiesForBuild() {
411-
if (this.changeFeedProcessorOptions == null) {
412-
this.changeFeedProcessorOptions = new ChangeFeedProcessorOptions();
413-
}
414-
415415
return this.feedContextClient
416416
.readDatabase(this.feedContextClient.getDatabaseClient(), null)
417417
.map( databaseResourceResponse -> {

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/DocumentServiceLeaseUpdaterImpl.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,8 @@ public Mono<Lease> updateLease(final Lease cachedLease, String itemId, Partition
7272
if (throwable instanceof CosmosException) {
7373
CosmosException ex = (CosmosException) throwable;
7474
if (ex.getStatusCode() == HTTP_STATUS_CODE_NOT_FOUND) {
75-
// Partition lease no longer exists
75+
logger.info(
76+
"Partition {} could not be found.", cachedLease.getLeaseToken());
7677
throw new LeaseLostException(cachedLease);
7778
}
7879
}
@@ -88,8 +89,15 @@ public Mono<Lease> updateLease(final Lease cachedLease, String itemId, Partition
8889
cachedLease.getConcurrencyToken(),
8990
serverLease.getOwner(),
9091
serverLease.getConcurrencyToken());
92+
93+
// Check if we still have the expected ownership on the target lease.
94+
if (serverLease.getOwner() != null && !serverLease.getOwner().equalsIgnoreCase(cachedLease.getOwner())) {
95+
logger.info("Partition {} lease was acquired already by owner '{}'", serverLease.getLeaseToken(), serverLease.getOwner());
96+
throw new LeaseLostException(serverLease);
97+
}
98+
99+
cachedLease.setTimestamp(Instant.now());
91100
cachedLease.setConcurrencyToken(serverLease.getConcurrencyToken());
92-
cachedLease.setOwner(serverLease.getOwner());
93101

94102
throw new LeaseConflictException(cachedLease, "Partition update failed");
95103
});

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/EqualPartitionsBalancingStrategy.java

Lines changed: 25 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,12 @@
99

1010
import java.time.Duration;
1111
import java.time.Instant;
12-
import java.time.ZoneId;
1312
import java.util.ArrayList;
13+
import java.util.Collections;
1414
import java.util.HashMap;
1515
import java.util.List;
1616
import java.util.Map;
17+
import java.util.Random;
1718

1819
/**
1920
* Implementation for {@link PartitionLoadBalancingStrategy}.
@@ -24,7 +25,6 @@ class EqualPartitionsBalancingStrategy implements PartitionLoadBalancingStrategy
2425
private final int minPartitionCount;
2526
private final int maxPartitionCount;
2627
private final Duration leaseExpirationInterval;
27-
private volatile int countAssignedLeases;
2828

2929
public EqualPartitionsBalancingStrategy(String hostName, int minPartitionCount, int maxPartitionCount, Duration leaseExpirationInterval) {
3030
if (hostName == null) {
@@ -35,7 +35,6 @@ public EqualPartitionsBalancingStrategy(String hostName, int minPartitionCount,
3535
this.minPartitionCount = minPartitionCount;
3636
this.maxPartitionCount = maxPartitionCount;
3737
this.leaseExpirationInterval = leaseExpirationInterval;
38-
this.countAssignedLeases = 0;
3938
}
4039

4140
@Override
@@ -55,30 +54,36 @@ public List<Lease> selectLeasesToTake(List<Lease> allLeases) {
5554

5655
int target = this.calculateTargetPartitionCount(partitionCount, workerCount);
5756
int myCount = workerToPartitionCount.get(this.hostName);
58-
this.countAssignedLeases = myCount;
5957
int partitionsNeededForMe = target - myCount;
6058

61-
/*
62-
Logger.InfoFormat(
63-
"Host '{0}' {1} partitions, {2} hosts, {3} available leases, target = {4}, min = {5}, max = {6}, mine = {7}, will try to take {8} lease(s) for myself'.",
64-
this.hostName,
65-
partitionCount,
66-
workerCount,
67-
expiredLeases.Count,
68-
target,
69-
this.minScaleCount,
70-
this.maxScaleCount,
71-
myCount,
72-
Math.Max(partitionsNeededForMe, 0));
73-
*/
59+
if (expiredLeases.size() > 0) {
60+
// We should try to pick at least one expired lease even if already overbooked when maximum partition count is not set.
61+
// If other CFP instances are running, limit the number of expired leases to acquire to maximum 1 (non-greedy acquiring).
62+
if ((this.maxPartitionCount == 0 && partitionsNeededForMe <= 0) || (partitionsNeededForMe > 1 && workerToPartitionCount.size() > 1)) {
63+
partitionsNeededForMe = 1;
64+
}
7465

75-
if (partitionsNeededForMe <= 0)
76-
return new ArrayList<Lease>();
66+
if (partitionsNeededForMe == 1) {
67+
// Try to minimize potential collisions between different CFP instances trying to pick the same lease.
68+
Random random = new Random();
69+
Lease expiredLease = expiredLeases.get(random.nextInt(expiredLeases.size()));
70+
this.logger.info("Found unused or expired lease {} (owner was {}); previous lease count for instance owner {} is {}, count of leases to target is {} and maxScaleCount {} ",
71+
expiredLease.getLeaseToken(), expiredLease.getOwner(), this.hostName, myCount, partitionsNeededForMe, this.maxPartitionCount);
72+
73+
return Collections.singletonList(expiredLease);
74+
} else {
75+
for (Lease lease : expiredLeases) {
76+
this.logger.info("Found unused or expired lease {} (owner was {}); previous lease count for instance owner {} is {} and maxScaleCount {} ",
77+
lease.getLeaseToken(), lease.getOwner(), this.hostName, myCount, this.maxPartitionCount);
78+
}
79+
}
7780

78-
if (expiredLeases.size() > 0) {
7981
return expiredLeases.subList(0, Math.min(partitionsNeededForMe, expiredLeases.size()));
8082
}
8183

84+
if (partitionsNeededForMe <= 0)
85+
return new ArrayList<Lease>();
86+
8287
Lease stolenLease = getLeaseToSteal(workerToPartitionCount, target, partitionsNeededForMe, allPartitions);
8388
List<Lease> stolenLeases = new ArrayList<>();
8489

@@ -150,8 +155,6 @@ private void categorizeLeases(
150155
allPartitions.put(lease.getLeaseToken(), lease);
151156

152157
if (lease.getOwner() == null || lease.getOwner().isEmpty() || this.isExpired(lease)) {
153-
this.logger.info("Found unused or expired lease {}; current lease count for instance owner {} is {} and maxScaleCount {} ",
154-
lease.getLeaseToken(), this.hostName, this.countAssignedLeases, this.maxPartitionCount);
155158
expiredLeases.add(lease);
156159
} else {
157160
String assignedTo = lease.getOwner();

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/LeaseStoreManagerImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ public Mono<Lease> acquire(Lease lease) {
234234
this.requestOptionsFactory.createItemRequestOptions(lease),
235235
serverLease -> {
236236
if (serverLease.getOwner() != null && !serverLease.getOwner().equalsIgnoreCase(oldOwner)) {
237-
logger.info("Partition {} lease was taken over by owner '{}'", lease.getLeaseToken(), serverLease.getOwner());
237+
logger.info("Partition {} lease was acquired already by owner '{}'", lease.getLeaseToken(), serverLease.getOwner());
238238
throw new LeaseLostException(lease);
239239
}
240240
serverLease.setOwner(this.settings.getHostName());

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionControllerImpl.java

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
class PartitionControllerImpl implements PartitionController {
2929
private static final Logger logger = LoggerFactory.getLogger(PartitionControllerImpl.class);
3030
private final Map<String, WorkerTask> currentlyOwnedPartitions = new ConcurrentHashMap<>();
31-
private final Object lock;
3231

3332
private final LeaseContainer leaseContainer;
3433
private final LeaseManager leaseManager;
@@ -45,7 +44,6 @@ public PartitionControllerImpl(
4544
PartitionSynchronizer synchronizer,
4645
Scheduler scheduler) {
4746

48-
this.lock = new Object();
4947
this.leaseContainer = leaseContainer;
5048
this.leaseManager = leaseManager;
5149
this.partitionSupervisorFactory = partitionSupervisorFactory;
@@ -71,17 +69,14 @@ public synchronized Mono<Lease> addOrUpdateLease(final Lease lease) {
7169
}
7270

7371
return this.leaseManager.acquire(lease)
74-
.defaultIfEmpty(lease)
7572
.map(updatedLease -> {
76-
synchronized (lock) {
77-
WorkerTask checkTask = this.currentlyOwnedPartitions.get(lease.getLeaseToken());
78-
if (checkTask == null) {
79-
logger.info("Partition {}: acquired.", updatedLease.getLeaseToken());
80-
PartitionSupervisor supervisor = this.partitionSupervisorFactory.create(updatedLease);
81-
this.currentlyOwnedPartitions.put(updatedLease.getLeaseToken(), this.processPartition(supervisor, updatedLease));
82-
}
83-
return updatedLease;
73+
WorkerTask checkTask = this.currentlyOwnedPartitions.get(lease.getLeaseToken());
74+
if (checkTask == null) {
75+
logger.info("Partition {}: acquired.", updatedLease.getLeaseToken());
76+
PartitionSupervisor supervisor = this.partitionSupervisorFactory.create(updatedLease);
77+
this.currentlyOwnedPartitions.put(updatedLease.getLeaseToken(), this.processPartition(supervisor, updatedLease));
8478
}
79+
return updatedLease;
8580
})
8681
.onErrorResume(throwable -> {
8782
logger.warn("Partition {}: unexpected error; removing lease from current cache.", lease.getLeaseToken());

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionLoadBalancerImpl.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,10 +108,13 @@ private Mono<Void> run(CancellationToken cancellationToken) {
108108
.flatMap(allLeases -> {
109109
if (cancellationToken.isCancellationRequested()) return Mono.empty();
110110
List<Lease> leasesToTake = this.partitionLoadBalancingStrategy.selectLeasesToTake(allLeases);
111-
this.logger.debug("Found {} leases, taking {}", allLeases.size(), leasesToTake.size());
111+
if (leasesToTake.size() > 0) {
112+
this.logger.info("Found {} total leases, taking ownership of {}", allLeases.size(), leasesToTake.size());
113+
}
112114

113115
if (cancellationToken.isCancellationRequested()) return Mono.empty();
114116
return Flux.fromIterable(leasesToTake)
117+
.limitRate(1)
115118
.flatMap(lease -> {
116119
if (cancellationToken.isCancellationRequested()) return Mono.empty();
117120
return this.partitionController.addOrUpdateLease(lease);

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionSupervisorImpl.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,6 @@ public PartitionSupervisorImpl(Lease lease, ChangeFeedObserver observer, Partiti
4242
this.processor = processor;
4343
this.renewer = renewer;
4444
this.scheduler = scheduler;
45-
46-
if (scheduler == null) {
47-
this.scheduler = Schedulers.boundedElastic();
48-
}
49-
5045
this.childShutdownCts = new CancellationTokenSource();
5146
}
5247

0 commit comments

Comments
 (0)