
Commit e83e8c8

CosmosDB: ChangeFeedProcessor fix for ownership race condition (Azure#25376)
* CosmosDB: ChangeFeedProcessor fix for ownership race condition. The issue occurs when the current worker is too slow at processing or renewing its current lease, which gives another CFP instance an opportunity to take over. If that second worker encounters any errors while processing the feed, the ownership will be set to 'null'. When the first worker finally resumes, it will continue to checkpoint and renew the lease, because the current check that establishes ownership misses the 'null' owner case.
1 parent d06173c commit e83e8c8
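For illustration, the ownership check that this commit introduces in renew() and checkpoint() can be read as the following standalone sketch. The validateOwnership helper is a hypothetical refactoring for readability, not the SDK's actual code; the three owner cases and the log messages mirror the diff below.

    // Hypothetical helper (sketch only) showing the fixed ownership check.
    private void validateOwnership(Lease serverLease, Lease lease) {
        if (serverLease.getOwner() == null) {
            // The lease was taken over by another worker and then released
            // (e.g. after a processing error), so this worker no longer owns it.
            logger.info("Partition {} lease was taken over and released by a different owner", lease.getLeaseToken());
            throw new LeaseLostException(lease);
        } else if (!serverLease.getOwner().equalsIgnoreCase(lease.getOwner())) {
            // A different worker currently owns the lease.
            logger.info("Partition {} lease was taken over by owner '{}'", lease.getLeaseToken(), serverLease.getOwner());
            throw new LeaseLostException(lease);
        }
        // Owners match: this worker still holds the lease and may renew or checkpoint it.
    }

Before the fix, the single condition serverLease.getOwner() != null && !serverLease.getOwner().equalsIgnoreCase(lease.getOwner()) treated a null server-side owner as "still mine", which is exactly the window the stale worker exploited to keep renewing and checkpointing.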

File tree

2 files changed: +171 -2 lines changed
sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/LeaseStoreManagerImpl.java

Lines changed: 10 additions & 2 deletions
@@ -314,7 +314,11 @@ public Mono<Lease> renew(Lease lease) {
             this.requestOptionsFactory.createItemRequestOptions(lease),
             serverLease ->
             {
-                if (serverLease.getOwner() != null && !serverLease.getOwner().equalsIgnoreCase(lease.getOwner())) {
+                if (serverLease.getOwner() == null) {
+                    logger.info("Partition {} lease was taken over and released by a different owner", lease.getLeaseToken());
+                    throw new LeaseLostException(lease);
+                }
+                else if (!serverLease.getOwner().equalsIgnoreCase(lease.getOwner())) {
                     logger.info("Partition {} lease was taken over by owner '{}'", lease.getLeaseToken(), serverLease.getOwner());
                     throw new LeaseLostException(lease);
                 }
@@ -370,7 +374,11 @@ public Mono<Lease> checkpoint(Lease lease, String continuationToken) {
             lease.getId(), new PartitionKey(lease.getId()),
             this.requestOptionsFactory.createItemRequestOptions(lease),
             serverLease -> {
-                if (serverLease.getOwner() != null && !serverLease.getOwner().equalsIgnoreCase(lease.getOwner())) {
+                if (serverLease.getOwner() == null) {
+                    logger.info("Partition {} lease was taken over and released by a different owner", lease.getLeaseToken());
+                    throw new LeaseLostException(lease);
+                }
+                else if (!serverLease.getOwner().equalsIgnoreCase(lease.getOwner())) {
                     logger.info("Partition {} lease was taken over by owner '{}'", lease.getLeaseToken(), serverLease.getOwner());
                     throw new LeaseLostException(lease);
                 }
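For context, the effect of the change on a caller of renew() can be sketched as below. The renewLoop method and its wiring are illustrative assumptions rather than the SDK's actual renewer code; only the renew(Lease) method of LeaseStoreManagerImpl and LeaseLostException come from the diff above, and the LeaseStoreManager parameter type is assumed to be its interface.

    // Sketch only (assumed wiring): once renew() throws LeaseLostException for a
    // null or foreign owner, the stale worker stops renewing instead of silently
    // re-claiming the lease.
    private Mono<Void> renewLoop(LeaseStoreManager leaseStoreManager, Lease lease, Duration renewInterval) {
        return Flux.interval(renewInterval)
            .flatMap(tick -> leaseStoreManager.renew(lease))
            .onErrorResume(LeaseLostException.class, e -> {
                logger.warn("Lost lease for partition {}; stopping renewal", lease.getLeaseToken());
                return Mono.empty(); // give up; the load balancer can reassign the partition
            })
            .then();
    }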

sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/ChangeFeedProcessorTest.java

Lines changed: 161 additions & 0 deletions
@@ -44,6 +44,7 @@
 import java.time.ZoneOffset;
 import java.time.ZonedDateTime;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -507,6 +508,166 @@ public void staledLeaseAcquiring() throws InterruptedException {
         }
     }
 
+    @Test(groups = { "emulator" }, timeOut = 50 * CHANGE_FEED_PROCESSOR_TIMEOUT)
+    public void ownerNullAcquiring() throws InterruptedException {
+        final String ownerFirst = "Owner_First";
+        final String ownerSecond = "Owner_Second";
+        final String leasePrefix = "TEST";
+        CosmosAsyncContainer createdFeedCollection = createFeedCollection(FEED_COLLECTION_THROUGHPUT);
+        CosmosAsyncContainer createdLeaseCollection = createLeaseCollection(LEASE_COLLECTION_THROUGHPUT);
+
+        try {
+            List<InternalObjectNode> createdDocuments = new ArrayList<>();
+            Map<String, JsonNode> receivedDocuments = new ConcurrentHashMap<>();
+            setupReadFeedDocuments(createdDocuments, receivedDocuments, createdFeedCollection, FEED_COUNT);
+
+            ChangeFeedProcessor changeFeedProcessorFirst = new ChangeFeedProcessorBuilder()
+                .hostName(ownerFirst)
+                .handleChanges(docs -> {
+                    ChangeFeedProcessorTest.log.info("START processing from thread {} using host {}", Thread.currentThread().getId(), ownerFirst);
+                    try {
+                        Thread.sleep(2 * CHANGE_FEED_PROCESSOR_TIMEOUT);
+                    } catch (InterruptedException e) {
+                        throw new RuntimeException("Interrupted exception", e);
+                    }
+                    ChangeFeedProcessorTest.log.info("END processing from thread {} using host {}", Thread.currentThread().getId(), ownerFirst);
+                })
+                .feedContainer(createdFeedCollection)
+                .leaseContainer(createdLeaseCollection)
+                .options(new ChangeFeedProcessorOptions()
+                    .setLeasePrefix(leasePrefix)
+                    .setLeaseRenewInterval(Duration.ofSeconds(1))
+                    .setLeaseAcquireInterval(Duration.ofSeconds(2))
+                    .setLeaseExpirationInterval(Duration.ofSeconds(20))
+                    .setFeedPollDelay(Duration.ofSeconds(1))
+                )
+                .buildChangeFeedProcessor();
+
+            ChangeFeedProcessor changeFeedProcessorSecond = new ChangeFeedProcessorBuilder()
+                .hostName(ownerSecond)
+                .handleChanges((List<JsonNode> docs) -> {
+                    ChangeFeedProcessorTest.log.info("START processing from thread {} using host {}", Thread.currentThread().getId(), ownerSecond);
+                    for (JsonNode item : docs) {
+                        processItem(item, receivedDocuments);
+                    }
+                    ChangeFeedProcessorTest.log.info("END processing from thread {} using host {}", Thread.currentThread().getId(), ownerSecond);
+                })
+                .feedContainer(createdFeedCollection)
+                .leaseContainer(createdLeaseCollection)
+                .options(new ChangeFeedProcessorOptions()
+                    .setLeaseRenewInterval(Duration.ofSeconds(10))
+                    .setLeaseAcquireInterval(Duration.ofSeconds(5))
+                    .setLeaseExpirationInterval(Duration.ofSeconds(20))
+                    .setFeedPollDelay(Duration.ofSeconds(2))
+                    .setLeasePrefix(leasePrefix)
+                    .setMaxItemCount(10)
+                    .setStartFromBeginning(true)
+                    .setMaxScaleCount(0) // unlimited
+                )
+                .buildChangeFeedProcessor();
+
+            try {
+                ChangeFeedProcessorTest.log.info("Start creating documents");
+                List<InternalObjectNode> docDefList = new ArrayList<>();
+
+                for (int i = 0; i < FEED_COUNT; i++) {
+                    docDefList.add(getDocumentDefinition());
+                }
+
+                bulkInsert(createdFeedCollection, docDefList, FEED_COUNT)
+                    .last()
+                    .flatMap(cosmosItemResponse -> {
+                        ChangeFeedProcessorTest.log.info("Start first Change feed processor");
+                        return changeFeedProcessorFirst.start().subscribeOn(Schedulers.elastic())
+                            .timeout(Duration.ofMillis(2 * CHANGE_FEED_PROCESSOR_TIMEOUT));
+                    })
+                    .then(
+                        Mono.just(changeFeedProcessorFirst)
+                            .flatMap( value -> {
+                                try {
+                                    Thread.sleep(2 * CHANGE_FEED_PROCESSOR_TIMEOUT);
+                                } catch (InterruptedException e) {
+                                    throw new RuntimeException("Interrupted exception", e);
+                                }
+                                ChangeFeedProcessorTest.log.info("Update leases for Change feed processor in thread {} using host {}", Thread.currentThread().getId(), "Owner_first");
+
+                                SqlParameter param1 = new SqlParameter();
+                                param1.setName("@PartitionLeasePrefix");
+                                param1.setValue(leasePrefix);
+                                SqlParameter param2 = new SqlParameter();
+                                param2.setName("@Owner");
+                                param2.setValue(ownerFirst);
+
+                                SqlQuerySpec querySpec = new SqlQuerySpec(
+                                    "SELECT * FROM c WHERE STARTSWITH(c.id, @PartitionLeasePrefix) AND c.Owner=@Owner", Arrays.asList(param1, param2));
+
+                                CosmosQueryRequestOptions cosmosQueryRequestOptions = new CosmosQueryRequestOptions();
+
+                                return createdLeaseCollection.queryItems(querySpec, cosmosQueryRequestOptions, InternalObjectNode.class).byPage()
+                                    .flatMap(documentFeedResponse -> reactor.core.publisher.Flux.fromIterable(documentFeedResponse.getResults()))
+                                    .flatMap(doc -> {
+                                        ServiceItemLease leaseDocument = ServiceItemLease.fromDocument(doc);
+                                        leaseDocument.setOwner(null);
+                                        CosmosItemRequestOptions options = new CosmosItemRequestOptions();
+                                        return createdLeaseCollection.replaceItem(doc, doc.getId(), new PartitionKey(doc.getId()), options)
+                                            .map(itemResponse -> BridgeInternal.getProperties(itemResponse));
+                                    })
+                                    .map(ServiceItemLease::fromDocument)
+                                    .map(leaseDocument -> {
+                                        ChangeFeedProcessorTest.log.info("QueryItems after Change feed processor processing; setting host to '{}'", leaseDocument.getOwner());
+                                        return leaseDocument;
+                                    })
+                                    .last()
+                                    .flatMap(leaseDocument -> {
+                                        ChangeFeedProcessorTest.log.info("Start creating documents");
+                                        List<InternalObjectNode> docDefList1 = new ArrayList<>();
+
+                                        for (int i = 0; i < FEED_COUNT; i++) {
+                                            docDefList1.add(getDocumentDefinition());
+                                        }
+
+                                        return bulkInsert(createdFeedCollection, docDefList1, FEED_COUNT)
+                                            .last()
+                                            .delayElement(Duration.ofMillis(1000))
+                                            .flatMap(cosmosItemResponse -> {
+                                                ChangeFeedProcessorTest.log.info("Start second Change feed processor");
+                                                return changeFeedProcessorSecond.start().subscribeOn(Schedulers.elastic())
+                                                    .timeout(Duration.ofMillis(2 * CHANGE_FEED_PROCESSOR_TIMEOUT));
+                                            });
+                                    });
+                            }))
+                    .subscribe();
+            } catch (Exception ex) {
+                log.error("First change feed processor did not start in the expected time", ex);
+                throw ex;
+            }
+
+            long remainingWork = 20 * CHANGE_FEED_PROCESSOR_TIMEOUT;
+            while (remainingWork > 0 && changeFeedProcessorFirst.isStarted() && changeFeedProcessorSecond.isStarted()) {
+                remainingWork -= 100;
+                Thread.sleep(100);
+            }
+
+            // Wait for the feed processor to receive and process the documents.
+            waitToReceiveDocuments(receivedDocuments, 10 * CHANGE_FEED_PROCESSOR_TIMEOUT, 2 * FEED_COUNT);
+
+            assertThat(changeFeedProcessorSecond.isStarted()).as("Change Feed Processor instance is running").isTrue();
+
+            changeFeedProcessorSecond.stop().subscribeOn(Schedulers.elastic()).timeout(Duration.ofMillis(2 * CHANGE_FEED_PROCESSOR_TIMEOUT)).subscribe();
+            changeFeedProcessorFirst.stop().subscribeOn(Schedulers.elastic()).timeout(Duration.ofMillis(2 * CHANGE_FEED_PROCESSOR_TIMEOUT)).subscribe();
+
+            // Wait for the feed processor to shutdown.
+            Thread.sleep(2 * CHANGE_FEED_PROCESSOR_TIMEOUT);
+
+        } finally {
+            safeDeleteCollection(createdFeedCollection);
+            safeDeleteCollection(createdLeaseCollection);
+
+            // Allow some time for the collections to be deleted before exiting.
+            Thread.sleep(500);
+        }
+    }
+
     @Test(groups = { "simple" }, timeOut = 160 * CHANGE_FEED_PROCESSOR_TIMEOUT)
     public void readFeedDocumentsAfterSplit() throws InterruptedException {
         CosmosAsyncContainer createdFeedCollectionForSplit = createFeedCollection(FEED_COLLECTION_THROUGHPUT_FOR_SPLIT);
