Skip to content

Commit d49be0b

Browse files
xinlian12annie-mac
andauthored
integrateCFPWithThroughputControl (Azure#38052)
* integrate throughput control with CFP --------- Co-authored-by: annie-mac <xinlian@microsoft.com>
1 parent bed106c commit d49be0b

22 files changed

+963
-113
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.cosmos.implementation.changefeed.epkversion;
5+
6+
import com.azure.cosmos.CosmosAsyncContainer;
7+
import com.azure.cosmos.ThroughputControlGroupConfig;
8+
import com.azure.cosmos.ThroughputControlGroupConfigBuilder;
9+
import com.azure.cosmos.implementation.PartitionKeyRange;
10+
import com.azure.cosmos.implementation.changefeed.ChangeFeedContextClient;
11+
import com.azure.cosmos.implementation.changefeed.Lease;
12+
import com.azure.cosmos.implementation.feedranges.FeedRangeEpkImpl;
13+
import com.azure.cosmos.implementation.routing.PartitionKeyInternalHelper;
14+
import com.azure.cosmos.implementation.routing.Range;
15+
import com.azure.cosmos.models.PriorityLevel;
16+
import org.mockito.Mockito;
17+
import org.testng.annotations.Test;
18+
import reactor.core.publisher.Mono;
19+
20+
import java.util.Arrays;
21+
import java.util.List;
22+
import java.util.UUID;
23+
24+
import static org.assertj.core.api.Assertions.assertThat;
25+
26+
public class FeedRangeThroughputControlConfigManagerTests {
27+
28+
@Test(groups = "unit")
29+
public void getThroughputControlConfigForFeedRange() {
30+
31+
ThroughputControlGroupConfig throughputControlGroupConfig =
32+
new ThroughputControlGroupConfigBuilder()
33+
.groupName("test-" + UUID.randomUUID())
34+
.targetThroughput(100)
35+
.targetThroughputThreshold(0.1)
36+
.priorityLevel(PriorityLevel.LOW)
37+
.build();
38+
39+
ChangeFeedContextClient documentClientMock = Mockito.mock(ChangeFeedContextClient.class);
40+
CosmosAsyncContainer containerMock = Mockito.mock(CosmosAsyncContainer.class);
41+
Mockito.doReturn(containerMock).when(documentClientMock).getContainerClient();
42+
Mockito.doNothing().when(containerMock).enableLocalThroughputControlGroup(Mockito.any());
43+
List<PartitionKeyRange> pkRanges = Arrays.asList(
44+
new PartitionKeyRange("1", "AA", "DD"));
45+
Mockito.doReturn(Mono.just(pkRanges)).when(documentClientMock).getOverlappingRanges(PartitionKeyInternalHelper.FullRange, false);
46+
47+
FeedRangeThroughputControlConfigManager throughputControlConfigManager =
48+
new FeedRangeThroughputControlConfigManager(throughputControlGroupConfig,documentClientMock);
49+
50+
// refresh the throughputControlConfigManager
51+
List<Lease> allLeases = Arrays.asList(
52+
new ServiceItemLeaseV1().withFeedRange(new FeedRangeEpkImpl(new Range<>("AA", "CC", true, false))),
53+
new ServiceItemLeaseV1().withFeedRange(new FeedRangeEpkImpl(new Range<>("CC", "DD", true, false))));
54+
throughputControlConfigManager.refresh(allLeases);
55+
56+
FeedRangeEpkImpl feedRangeEpk = new FeedRangeEpkImpl(new Range<>("AA", "CC", true, false));
57+
Mockito.doReturn(Mono.just(pkRanges)).when(documentClientMock).getOverlappingRanges(feedRangeEpk.getRange(), false);
58+
59+
ThroughputControlGroupConfig pkRangeThroughputControlConfig =
60+
throughputControlConfigManager.getOrCreateThroughputControlConfigForFeedRange(feedRangeEpk).block();
61+
62+
assertThat(pkRangeThroughputControlConfig).isNotNull();
63+
String expectedGroupName = throughputControlGroupConfig.getGroupName() + "-" + feedRangeEpk;
64+
assertThat(pkRangeThroughputControlConfig.getGroupName()).isEqualTo(expectedGroupName);
65+
assertThat(pkRangeThroughputControlConfig.getTargetThroughput()).isEqualTo(throughputControlGroupConfig.getTargetThroughput()/allLeases.size());
66+
assertThat(pkRangeThroughputControlConfig.getTargetThroughputThreshold()).isEqualTo(throughputControlGroupConfig.getTargetThroughputThreshold()/allLeases.size());
67+
assertThat(pkRangeThroughputControlConfig.getPriorityLevel()).isEqualTo(throughputControlGroupConfig.getPriorityLevel());
68+
}
69+
}

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionLoadBalancerImplTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,8 @@ public void run(boolean loadBalancingSucceeded) throws InterruptedException {
6262
leaseContainerMock,
6363
partitionLoadBalancingStrategyMock,
6464
Duration.ofSeconds(2),
65-
Schedulers.boundedElastic()
65+
Schedulers.boundedElastic(),
66+
null
6667
);
6768

6869
partitionLoadBalancerImpl

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionProcessorImplTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,8 @@ public void partitionSplitHappenOnFirstRequest() {
6060
partitionCheckpointer,
6161
leaseMock,
6262
ChangeFeedProcessorItem.class,
63-
ChangeFeedMode.INCREMENTAL
63+
ChangeFeedMode.INCREMENTAL,
64+
null
6465
);
6566

6667
StepVerifier.create(partitionProcessor.run(new CancellationTokenSource().getToken()))

sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/changefeed/epkversion/PartitionSynchronizerImplTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public void createAllLeases() {
6969
.withFeedRange(new FeedRangeEpkImpl(new Range<>("BB", "CC", true, false)));
7070
childLease1.setId("TestLease-" + UUID.randomUUID());
7171

72-
when(feedContextClientMock.getOverlappingRanges(PartitionKeyInternalHelper.FullRange))
72+
when(feedContextClientMock.getOverlappingRanges(PartitionKeyInternalHelper.FullRange, true))
7373
.thenReturn(Mono.just(overlappingRanges));
7474
when(leaseContainerMock.getAllLeases()).thenReturn(Flux.empty());
7575
when(leaseManagerMock.createLeaseIfNotExist((FeedRangeEpkImpl) any(), any()))
@@ -128,7 +128,7 @@ public void createMissingLeases() {
128128
.withFeedRange(new FeedRangeEpkImpl(new Range<>("DD", "EE", true, false)));
129129
childLease1.setId("TestLease-" + UUID.randomUUID());
130130

131-
when(feedContextClientMock.getOverlappingRanges(PartitionKeyInternalHelper.FullRange))
131+
when(feedContextClientMock.getOverlappingRanges(PartitionKeyInternalHelper.FullRange, true))
132132
.thenReturn(Mono.just(overlappingRanges));
133133
when(leaseContainerMock.getAllLeases()).thenReturn(Flux.fromIterable(Arrays.asList(childLease1, childLease2)));
134134

@@ -181,7 +181,7 @@ public void createMissingLeasesFromPkRangeIdVersionLeases() {
181181
pkRangeIdVersionLeases.add(pkRangeIdVersionLease);
182182
}
183183

184-
when(feedContextClientMock.getOverlappingRanges(PartitionKeyInternalHelper.FullRange))
184+
when(feedContextClientMock.getOverlappingRanges(PartitionKeyInternalHelper.FullRange, true))
185185
.thenReturn(Mono.just(overlappingRanges));
186186

187187
when(leaseContainerMock.getAllLeases()).thenReturn(Flux.empty());
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.cosmos.implementation.changefeed.pkversion;
5+
6+
import com.azure.cosmos.CosmosAsyncContainer;
7+
import com.azure.cosmos.ThroughputControlGroupConfig;
8+
import com.azure.cosmos.ThroughputControlGroupConfigBuilder;
9+
import com.azure.cosmos.implementation.changefeed.ChangeFeedContextClient;
10+
import com.azure.cosmos.implementation.feedranges.FeedRangePartitionKeyRangeImpl;
11+
import com.azure.cosmos.models.FeedRange;
12+
import com.azure.cosmos.models.PriorityLevel;
13+
import org.mockito.Mockito;
14+
import org.testng.annotations.Test;
15+
16+
import java.util.UUID;
17+
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
20+
public class FeedRangeThroughputControlConfigManagerTests {
21+
22+
@Test(groups = "unit")
23+
public void getThroughputControlConfigForFeedRange() {
24+
25+
ThroughputControlGroupConfig throughputControlGroupConfig =
26+
new ThroughputControlGroupConfigBuilder()
27+
.groupName("test-" + UUID.randomUUID())
28+
.targetThroughput(100)
29+
.targetThroughputThreshold(0.1)
30+
.priorityLevel(PriorityLevel.LOW)
31+
.build();
32+
33+
ChangeFeedContextClient documentClientMock = Mockito.mock(ChangeFeedContextClient.class);
34+
CosmosAsyncContainer containerMock = Mockito.mock(CosmosAsyncContainer.class);
35+
Mockito.doReturn(containerMock).when(documentClientMock).getContainerClient();
36+
Mockito.doNothing().when(containerMock).enableLocalThroughputControlGroup(Mockito.any());
37+
38+
FeedRangeThroughputControlConfigManager throughputControlConfigManager =
39+
new FeedRangeThroughputControlConfigManager(throughputControlGroupConfig,documentClientMock);
40+
41+
FeedRange feedRange = new FeedRangePartitionKeyRangeImpl("1");
42+
ThroughputControlGroupConfig pkRangeThroughputControlConfig =
43+
throughputControlConfigManager.getThroughputControlConfigForFeedRange(feedRange);
44+
45+
assertThat(pkRangeThroughputControlConfig).isNotNull();
46+
assertThat(pkRangeThroughputControlConfig.getGroupName()).isEqualTo(throughputControlGroupConfig.getGroupName());
47+
assertThat(pkRangeThroughputControlConfig.getTargetThroughput()).isEqualTo(throughputControlGroupConfig.getTargetThroughput());
48+
assertThat(pkRangeThroughputControlConfig.getTargetThroughputThreshold()).isEqualTo(throughputControlGroupConfig.getTargetThroughputThreshold());
49+
assertThat(pkRangeThroughputControlConfig.getPriorityLevel()).isEqualTo(throughputControlGroupConfig.getPriorityLevel());
50+
}
51+
}

0 commit comments

Comments
 (0)