Skip to content

Commit 59f37b6

Browse files
authored
Handle ChangeFeedProcessorContext for full-fidelity changefeed. (Azure#36715)
* Adding preliminary changes to handle ChangeFeedProcessorContext. * Adding preliminary changes to handle ChangeFeedProcessorContext. * Fixing CI errors. * Fixing CI errors. * Adding handleChanges which accepts ChangeFeedProcessorContext. * Removed code comments. * Made handleChanges with context as package-private. * Refactorings. * Refactorings. * Adding tests. * Adding tests. * Adding tests. * Adding tests. * Adding tests. * Adding tests. * Adding tests. * Adding validations. * Adding javadoc. * Removed type parameter from ChangeFeedProcessorContext. * Updated javadoc. * Fixing compilation errors. * Fixing compilation errors. * Fixing javadoc. * Fixing javadoc. * Reverting changes. * Adding tests. * Refactorings. * Adding @beta tags. * Updated CHANGELOG.md. * Updated CHANGELOG.md. * Updated javadoc. * Updated javadoc. * Reacting to review comments. * Attempt at fixing CI error. * Attempt at fixing CI pipeline. * Attempt at fixing CI pipeline. * Updating javadoc. * Addressing review comments. * Addressing review comments. * Updated CHANGELOG.md.
1 parent a562e65 commit 59f37b6

File tree

11 files changed

+666
-32
lines changed

11 files changed

+666
-32
lines changed

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/changefeed/epkversion/FullFidelityChangeFeedProcessorTest.java

Lines changed: 413 additions & 25 deletions
Large diffs are not rendered by default.

sdk/cosmos/azure-cosmos/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
### 4.51.0-beta.1 (Unreleased)
44

55
#### Features Added
6+
* Added a preview API to `ChangeFeedProcessorBuilder` to process an additional `ChangeFeedProcessorContext` for handling all versions and deletes changes. - See [PR 36715](https://github.com/Azure/azure-sdk-for-java/pull/36715)
67

78
#### Breaking Changes
89

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/ChangeFeedProcessorBuilder.java

Lines changed: 67 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import com.fasterxml.jackson.databind.JsonNode;
1414

1515
import java.util.List;
16+
import java.util.function.BiConsumer;
1617
import java.util.function.Consumer;
1718

1819
import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkArgument;
@@ -54,6 +55,24 @@
5455
* .buildChangeFeedProcessor();
5556
* </pre>
5657
* <!-- end com.azure.cosmos.allVersionsAndDeletesChangeFeedProcessor.builder -->
58+
*
59+
* Below is an example of building ChangeFeedProcessor for AllVersionsAndDeletes mode when also wishing to process a {@link ChangeFeedProcessorContext}.
60+
* <!-- src_embed com.azure.cosmos.allVersionsAndDeletesChangeFeedProcessorWithContext.builder -->
61+
* <pre>
62+
* ChangeFeedProcessor changeFeedProcessor = new ChangeFeedProcessorBuilder&#40;&#41;
63+
* .hostName&#40;hostName&#41;
64+
* .feedContainer&#40;feedContainer&#41;
65+
* .leaseContainer&#40;leaseContainer&#41;
66+
* .handleAllVersionsAndDeletesChanges&#40;&#40;docs, context&#41; -&gt; &#123;
67+
* for &#40;ChangeFeedProcessorItem item : docs&#41; &#123;
68+
* &#47;&#47; Implementation for handling and processing of each ChangeFeedProcessorItem item goes here
69+
* &#125;
70+
* String leaseToken = context.getLeaseToken&#40;&#41;;
71+
* &#47;&#47; Handling of the lease token corresponding to a batch of change feed processor item goes here
72+
* &#125;&#41;
73+
* .buildChangeFeedProcessor&#40;&#41;;
74+
* </pre>
75+
* <!-- end com.azure.cosmos.allVersionsAndDeletesChangeFeedProcessorWithContext.builder -->
5776
*/
5877
public class ChangeFeedProcessorBuilder {
5978
private String hostName;
@@ -63,6 +82,7 @@ public class ChangeFeedProcessorBuilder {
6382
private Consumer<List<JsonNode>> incrementalModeLeaseConsumerPkRangeIdVersion;
6483
private Consumer<List<ChangeFeedProcessorItem>> incrementalModeLeaseConsumerEpkVersion;
6584
private Consumer<List<ChangeFeedProcessorItem>> fullFidelityModeLeaseConsumer;
85+
private BiConsumer<List<ChangeFeedProcessorItem>, ChangeFeedProcessorContext> fullFidelityModeLeaseWithContextConsumer;
6686
private ChangeFeedMode changeFeedMode = ChangeFeedMode.INCREMENTAL;
6787
private LeaseVersion leaseVersion = LeaseVersion.PARTITION_KEY_BASED_LEASE;
6888

@@ -183,12 +203,48 @@ public ChangeFeedProcessorBuilder handleLatestVersionChanges(Consumer<List<Chang
183203
*/
184204
@Beta(value = Beta.SinceVersion.V4_37_0, warningText = Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
185205
public ChangeFeedProcessorBuilder handleAllVersionsAndDeletesChanges(Consumer<List<ChangeFeedProcessorItem>> consumer) {
206+
checkNotNull(consumer, "consumer cannot be null");
207+
checkArgument(this.fullFidelityModeLeaseWithContextConsumer == null,
208+
"handleAllVersionsAndDeletesChanges biConsumer has already been defined.");
209+
186210
this.fullFidelityModeLeaseConsumer = consumer;
187211
this.changeFeedMode = ChangeFeedMode.FULL_FIDELITY;
188212
this.leaseVersion = LeaseVersion.EPK_RANGE_BASED_LEASE;
189213
return this;
190214
}
191215

216+
/**
217+
* Sets a {@link BiConsumer} function which will be called to process changes for AllVersionsAndDeletes change feed mode.
218+
*
219+
* <!-- src_embed com.azure.cosmos.allVersionsAndDeletesChangeFeedProcessorWithContext.handleChanges -->
220+
* <pre>
221+
* .handleAllVersionsAndDeletesChanges&#40;&#40;docs, context&#41; -&gt; &#123;
222+
* for &#40;ChangeFeedProcessorItem item : docs&#41; &#123;
223+
* &#47;&#47; Implementation for handling and processing of each ChangeFeedProcessorItem item goes here
224+
* &#125;
225+
* String leaseToken = context.getLeaseToken&#40;&#41;;
226+
* &#47;&#47; Handling of the lease token corresponding to a batch of change feed processor item goes here
227+
* &#125;&#41;
228+
* </pre>
229+
* <!-- end com.azure.cosmos.allVersionsAndDeletesChangeFeedProcessorWithContext.handleChanges -->
230+
*
231+
* @param biConsumer the {@link BiConsumer} to call for handling the feeds and the {@link ChangeFeedProcessorContext} instance.
232+
* @return current Builder.
233+
*/
234+
@Beta(value = Beta.SinceVersion.V4_51_0, warningText = Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
235+
public ChangeFeedProcessorBuilder handleAllVersionsAndDeletesChanges(
236+
BiConsumer<List<ChangeFeedProcessorItem>, ChangeFeedProcessorContext> biConsumer) {
237+
238+
checkNotNull(biConsumer, "biConsumer cannot be null");
239+
checkArgument(this.fullFidelityModeLeaseConsumer == null,
240+
"handleAllVersionsAndDeletesChanges consumer has already been defined.");
241+
242+
this.fullFidelityModeLeaseWithContextConsumer = biConsumer;
243+
this.changeFeedMode = ChangeFeedMode.FULL_FIDELITY;
244+
this.leaseVersion = LeaseVersion.EPK_RANGE_BASED_LEASE;
245+
return this;
246+
}
247+
192248
/**
193249
* Sets the {@link ChangeFeedProcessorOptions} to be used.
194250
* Unless specifically set the default values that will be used are:
@@ -225,12 +281,21 @@ public ChangeFeedProcessor buildChangeFeedProcessor() {
225281
if (this.leaseVersion == LeaseVersion.EPK_RANGE_BASED_LEASE) {
226282
switch (this.changeFeedMode) {
227283
case FULL_FIDELITY:
228-
changeFeedProcessor = new FullFidelityChangeFeedProcessorImpl(
284+
if (this.fullFidelityModeLeaseConsumer != null) {
285+
changeFeedProcessor = new FullFidelityChangeFeedProcessorImpl(
229286
this.hostName,
230287
this.feedContainer,
231288
this.leaseContainer,
232289
this.fullFidelityModeLeaseConsumer,
233290
this.changeFeedProcessorOptions);
291+
} else if (this.fullFidelityModeLeaseWithContextConsumer != null) {
292+
changeFeedProcessor = new FullFidelityChangeFeedProcessorImpl(
293+
this.hostName,
294+
this.feedContainer,
295+
this.leaseContainer,
296+
this.fullFidelityModeLeaseWithContextConsumer,
297+
this.changeFeedProcessorOptions);
298+
}
234299
break;
235300
case INCREMENTAL:
236301
changeFeedProcessor = new com.azure.cosmos.implementation.changefeed.epkversion.IncrementalChangeFeedProcessorImpl(
@@ -268,7 +333,7 @@ private void validateChangeFeedProcessorBuilder() {
268333
}
269334

270335
private boolean isFullFidelityConsumerDefined() {
271-
return this.fullFidelityModeLeaseConsumer != null;
336+
return this.fullFidelityModeLeaseConsumer != null || this.fullFidelityModeLeaseWithContextConsumer != null;
272337
}
273338

274339
private boolean isIncrementalConsumerDefined() {
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.cosmos;
5+
6+
import com.azure.cosmos.util.Beta;
7+
8+
import java.util.function.BiConsumer;
9+
10+
/**
11+
* Encapsulates properties which are mapped to a batch of change feed documents
12+
* processed when {@link ChangeFeedProcessorBuilder#handleAllVersionsAndDeletesChanges(BiConsumer)}
13+
* lambda is invoked.
14+
* <br>
15+
* <br>
16+
* NOTE: This interface is not designed to be implemented by end users.
17+
* */
18+
@Beta(value = Beta.SinceVersion.V4_51_0, warningText = Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
19+
public interface ChangeFeedProcessorContext {
20+
/**
21+
* Gets the lease token corresponding to the source of
22+
* a batch of change feed documents.
23+
*
24+
* @return the lease token
25+
* */
26+
@Beta(value = Beta.SinceVersion.V4_51_0, warningText = Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
27+
String getLeaseToken();
28+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.cosmos.implementation.changefeed.common;
5+
6+
import com.azure.cosmos.ChangeFeedProcessorContext;
7+
import com.azure.cosmos.implementation.changefeed.ChangeFeedObserverContext;
8+
9+
public final class ChangeFeedProcessorContextImpl<T> implements ChangeFeedProcessorContext {
10+
11+
private final ChangeFeedObserverContext<T> changeFeedObserverContext;
12+
13+
public ChangeFeedProcessorContextImpl(ChangeFeedObserverContext<T> changeFeedObserverContext) {
14+
this.changeFeedObserverContext = changeFeedObserverContext;
15+
}
16+
17+
@Override
18+
public String getLeaseToken() {
19+
20+
if (changeFeedObserverContext == null) {
21+
throw new IllegalStateException("changeFeedObserverContext cannot be null!");
22+
}
23+
24+
return changeFeedObserverContext.getLeaseToken();
25+
}
26+
}

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/DefaultObserver.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,19 +5,28 @@
55
import com.azure.cosmos.implementation.changefeed.ChangeFeedObserver;
66
import com.azure.cosmos.implementation.changefeed.ChangeFeedObserverCloseReason;
77
import com.azure.cosmos.implementation.changefeed.ChangeFeedObserverContext;
8+
import com.azure.cosmos.ChangeFeedProcessorContext;
89
import org.slf4j.Logger;
910
import org.slf4j.LoggerFactory;
1011
import reactor.core.publisher.Mono;
1112

1213
import java.util.List;
14+
import java.util.function.BiConsumer;
1315
import java.util.function.Consumer;
1416

1517
public class DefaultObserver<T> implements ChangeFeedObserver<T> {
1618
private static final Logger log = LoggerFactory.getLogger(DefaultObserver.class);
1719
private final Consumer<List<T>> consumer;
20+
private final BiConsumer<List<T>, ChangeFeedProcessorContext> biConsumer;
1821

1922
public DefaultObserver(Consumer<List<T>> consumer) {
2023
this.consumer = consumer;
24+
this.biConsumer = null;
25+
}
26+
27+
public DefaultObserver(BiConsumer<List<T>, ChangeFeedProcessorContext> biConsumer) {
28+
this.biConsumer = biConsumer;
29+
this.consumer = null;
2130
}
2231

2332
@Override
@@ -34,7 +43,13 @@ public void close(ChangeFeedObserverContext<T> context, ChangeFeedObserverCloseR
3443
public Mono<Void> processChanges(ChangeFeedObserverContext<T> context, List<T> docs) {
3544
log.info("Start processing from thread {}", Thread.currentThread().getId());
3645
try {
37-
consumer.accept(docs);
46+
47+
if (consumer != null) {
48+
consumer.accept(docs);
49+
} else if (biConsumer != null) {
50+
biConsumer.accept(docs, new ChangeFeedProcessorContextImpl<>(context));
51+
}
52+
3853
log.info("Done processing from thread {}", Thread.currentThread().getId());
3954
} catch (Exception ex) {
4055
log.warn("Unexpected exception thrown from thread {}", Thread.currentThread().getId(), ex);

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/DefaultObserverFactory.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,23 +4,38 @@
44

55
import com.azure.cosmos.implementation.changefeed.ChangeFeedObserver;
66
import com.azure.cosmos.implementation.changefeed.ChangeFeedObserverFactory;
7+
import com.azure.cosmos.ChangeFeedProcessorContext;
78
import org.slf4j.Logger;
89
import org.slf4j.LoggerFactory;
910

1011
import java.util.List;
12+
import java.util.function.BiConsumer;
1113
import java.util.function.Consumer;
1214

1315
public class DefaultObserverFactory<T> implements ChangeFeedObserverFactory<T> {
1416
private final Logger log = LoggerFactory.getLogger(DefaultObserverFactory.class);
15-
1617
private final Consumer<List<T>> consumer;
18+
private final BiConsumer<List<T>, ChangeFeedProcessorContext> biConsumer;
1719

1820
public DefaultObserverFactory(Consumer<List<T>> consumer) {
1921
this.consumer = consumer;
22+
this.biConsumer = null;
23+
}
24+
25+
public DefaultObserverFactory(BiConsumer<List<T>, ChangeFeedProcessorContext> biConsumer) {
26+
this.biConsumer = biConsumer;
27+
this.consumer = null;
2028
}
2129

2230
@Override
2331
public ChangeFeedObserver<T> createObserver() {
24-
return new DefaultObserver<>(consumer);
32+
33+
if (consumer != null) {
34+
return new DefaultObserver<>(consumer);
35+
} else if (biConsumer != null) {
36+
return new DefaultObserver<>(biConsumer);
37+
}
38+
39+
throw new IllegalStateException("Both consumer and biConsumer cannot be null");
2540
}
2641
}

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/ChangeFeedProcessorImplBase.java

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import com.azure.cosmos.implementation.changefeed.common.EqualPartitionsBalancingStrategy;
2929
import com.azure.cosmos.implementation.changefeed.common.PartitionedByIdCollectionRequestOptionsFactory;
3030
import com.azure.cosmos.implementation.changefeed.common.TraceHealthMonitor;
31+
import com.azure.cosmos.ChangeFeedProcessorContext;
3132
import com.azure.cosmos.models.ChangeFeedProcessorItem;
3233
import com.azure.cosmos.models.ChangeFeedProcessorOptions;
3334
import com.azure.cosmos.models.ChangeFeedProcessorState;
@@ -46,6 +47,7 @@
4647
import java.util.Collections;
4748
import java.util.List;
4849
import java.util.Map;
50+
import java.util.function.BiConsumer;
4951
import java.util.function.Consumer;
5052

5153
import static com.azure.cosmos.CosmosBridgeInternal.getContextClient;
@@ -75,7 +77,6 @@ public abstract class ChangeFeedProcessorImplBase<T> implements ChangeFeedProces
7577
private HealthMonitor healthMonitor;
7678
private volatile PartitionManager partitionManager;
7779

78-
7980
public ChangeFeedProcessorImplBase(
8081
String hostName,
8182
CosmosAsyncContainer feedContainer,
@@ -105,6 +106,33 @@ public ChangeFeedProcessorImplBase(
105106
this.observerFactory = new DefaultObserverFactory<>(consumer);
106107
}
107108

109+
public ChangeFeedProcessorImplBase(String hostName,
110+
CosmosAsyncContainer feedContainer,
111+
CosmosAsyncContainer leaseContainer,
112+
ChangeFeedProcessorOptions changeFeedProcessorOptions,
113+
BiConsumer<List<T>, ChangeFeedProcessorContext> biConsumer,
114+
ChangeFeedMode changeFeedMode) {
115+
checkNotNull(hostName, "Argument 'hostName' can not be null");
116+
checkNotNull(feedContainer, "Argument 'feedContainer' can not be null");
117+
checkNotNull(biConsumer, "Argument 'biConsumer' can not be null");
118+
119+
if (changeFeedProcessorOptions == null) {
120+
changeFeedProcessorOptions = new ChangeFeedProcessorOptions();
121+
}
122+
this.validateChangeFeedProcessorOptions(changeFeedProcessorOptions);
123+
this.validateLeaseContainer(leaseContainer);
124+
125+
this.hostName = hostName;
126+
this.changeFeedProcessorOptions = changeFeedProcessorOptions;
127+
this.feedContextClient = new ChangeFeedContextClientImpl(feedContainer);
128+
this.leaseContextClient = new ChangeFeedContextClientImpl(leaseContainer);
129+
this.scheduler = this.changeFeedProcessorOptions.getScheduler();
130+
this.feedContextClient.setScheduler(this.scheduler);
131+
this.leaseContextClient.setScheduler(this.scheduler);
132+
this.changeFeedMode = changeFeedMode;
133+
this.observerFactory = new DefaultObserverFactory<>(biConsumer);
134+
}
135+
108136
abstract CosmosChangeFeedRequestOptions createRequestOptionsForProcessingFromNow(FeedRange feedRange);
109137

110138
private void validateChangeFeedProcessorOptions(ChangeFeedProcessorOptions changeFeedProcessorOptions) {

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/epkversion/FullFidelityChangeFeedProcessorImpl.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,14 @@
55

66
import com.azure.cosmos.CosmosAsyncContainer;
77
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedMode;
8+
import com.azure.cosmos.ChangeFeedProcessorContext;
89
import com.azure.cosmos.models.ChangeFeedProcessorItem;
910
import com.azure.cosmos.models.ChangeFeedProcessorOptions;
1011
import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
1112
import com.azure.cosmos.models.FeedRange;
1213

1314
import java.util.List;
15+
import java.util.function.BiConsumer;
1416
import java.util.function.Consumer;
1517

1618
public class FullFidelityChangeFeedProcessorImpl extends ChangeFeedProcessorImplBase<ChangeFeedProcessorItem> {
@@ -25,6 +27,16 @@ public FullFidelityChangeFeedProcessorImpl(
2527
super(hostName, feedContainer, leaseContainer, changeFeedProcessorOptions, consumer, ChangeFeedMode.FULL_FIDELITY);
2628
}
2729

30+
public FullFidelityChangeFeedProcessorImpl(
31+
String hostName,
32+
CosmosAsyncContainer feedContainer,
33+
CosmosAsyncContainer leaseContainer,
34+
BiConsumer<List<ChangeFeedProcessorItem>, ChangeFeedProcessorContext> biConsumer,
35+
ChangeFeedProcessorOptions changeFeedProcessorOptions) {
36+
37+
super(hostName, feedContainer, leaseContainer, changeFeedProcessorOptions, biConsumer, ChangeFeedMode.FULL_FIDELITY);
38+
}
39+
2840
@Override
2941
CosmosChangeFeedRequestOptions createRequestOptionsForProcessingFromNow(FeedRange feedRange) {
3042
return CosmosChangeFeedRequestOptions

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/util/Beta.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,8 @@ public enum SinceVersion {
9595
/** v4.35.0 */
9696
V4_35_0,
9797
/** v4.37.0 */
98-
V4_37_0
98+
V4_37_0,
99+
/** v4.51.0 */
100+
V4_51_0
99101
}
100102
}

0 commit comments

Comments
 (0)