Skip to content

Commit a63e68f

Browse files
CosmosPagedFlux & CosmosPagedIterable Handler (Azure#15742)
* Added functionality to provide handler to CosmosPagedFlux and CosmosPagedIterable * Updated tests with ObjectNode and other code review comments
1 parent b543bc2 commit a63e68f

File tree

6 files changed

+295
-18
lines changed

6 files changed

+295
-18
lines changed

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/util/CosmosPagedFlux.java

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,15 @@
1616
import reactor.core.publisher.Signal;
1717

1818
import java.util.concurrent.atomic.AtomicReference;
19+
import java.util.function.Consumer;
1920
import java.util.function.Function;
2021

2122
/**
2223
* Cosmos implementation of {@link ContinuablePagedFlux}.
2324
* <p>
24-
* This type is a Flux that provides the ability to operate on pages of type {@link FeedResponse}
25-
* and individual items in such pages. This type supports {@link String} type continuation tokens,
26-
* allowing for restarting from a previously-retrieved continuation token.
25+
* This type is a Flux that provides the ability to operate on pages of type {@link FeedResponse} and individual items
26+
* in such pages. This type supports {@link String} type continuation tokens, allowing for restarting from a
27+
* previously-retrieved continuation token.
2728
* <p>
2829
* For more information on the base type, refer {@link ContinuablePagedFlux}
2930
*
@@ -36,8 +37,28 @@ public final class CosmosPagedFlux<T> extends ContinuablePagedFlux<String, T, Fe
3637

3738
private final Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> optionsFluxFunction;
3839

40+
private final Consumer<FeedResponse<T>> feedResponseConsumer;
41+
3942
CosmosPagedFlux(Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> optionsFluxFunction) {
4043
this.optionsFluxFunction = optionsFluxFunction;
44+
this.feedResponseConsumer = null;
45+
}
46+
47+
CosmosPagedFlux(Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> optionsFluxFunction,
48+
Consumer<FeedResponse<T>> feedResponseConsumer) {
49+
this.optionsFluxFunction = optionsFluxFunction;
50+
this.feedResponseConsumer = feedResponseConsumer;
51+
}
52+
53+
/**
54+
* Handle for invoking "side-effects" on each FeedResponse returned by CosmosPagedFlux
55+
*
56+
* @param feedResponseConsumer handler
57+
* @return CosmosPagedFlux instance with attached handler
58+
*/
59+
@Beta(value = Beta.SinceVersion.V4_6_0)
60+
public CosmosPagedFlux<T> handle(Consumer<FeedResponse<T>> feedResponseConsumer) {
61+
return new CosmosPagedFlux<T>(this.optionsFluxFunction, feedResponseConsumer);
4162
}
4263

4364
@Override
@@ -69,9 +90,8 @@ public Flux<FeedResponse<T>> byPage(String continuationToken, int preferredPageS
6990
}
7091

7192
/**
72-
* Subscribe to consume all items of type {@code T} in the sequence respectively.
73-
* This is recommended for most common scenarios. This will seamlessly fetch next
74-
* page when required and provide with a {@link Flux} of items.
93+
* Subscribe to consume all items of type {@code T} in the sequence respectively. This is recommended for most
94+
* common scenarios. This will seamlessly fetch next page when required and provide with a {@link Flux} of items.
7595
*
7696
* @param coreSubscriber The subscriber for this {@link CosmosPagedFlux}
7797
*/
@@ -105,6 +125,11 @@ private Flux<FeedResponse<T>> byPage(CosmosPagedFluxOptions pagedFluxOptions, Co
105125
pagedFluxOptions.getTracerProvider().endSpan(parentContext.get(), Signal.error(throwable),
106126
TracerProvider.ERROR_CODE);
107127
}
128+
}).doOnNext(feedResponse -> {
129+
// If the user has passed feedResponseConsumer, then call it with each feedResponse
130+
if (feedResponseConsumer != null) {
131+
feedResponseConsumer.accept(feedResponse);
132+
}
108133
});
109134
}
110135
}

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/util/CosmosPagedIterable.java

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,16 @@
22
// Licensed under the MIT License.
33
package com.azure.cosmos.util;
44

5-
import com.azure.core.util.paging.ContinuablePagedFlux;
65
import com.azure.core.util.paging.ContinuablePagedIterable;
76
import com.azure.cosmos.models.FeedResponse;
87

8+
import java.util.function.Consumer;
9+
910
/**
1011
* Cosmos implementation of {@link com.azure.core.util.paging.ContinuablePagedIterable}.
1112
* <p>
1213
* This type is a {@link com.azure.core.util.IterableStream} that provides the ability to operate on pages of type
13-
* {@link FeedResponse}
14-
* and individual items in such pages. This type supports {@link String} type continuation tokens,
14+
* {@link FeedResponse} and individual items in such pages. This type supports {@link String} type continuation tokens,
1515
* allowing for restarting from a previously-retrieved continuation token.
1616
* <p>
1717
* For more information on the base type, refer {@link com.azure.core.util.paging.ContinuablePagedIterable}
@@ -22,12 +22,27 @@
2222
*/
2323
public final class CosmosPagedIterable<T> extends ContinuablePagedIterable<String, T, FeedResponse<T>> {
2424

25+
private final CosmosPagedFlux<T> cosmosPagedFlux;
26+
27+
/**
28+
* Creates instance given {@link CosmosPagedFlux}.
29+
*
30+
* @param cosmosPagedFlux the paged flux use as iterable
31+
*/
32+
CosmosPagedIterable(CosmosPagedFlux<T> cosmosPagedFlux) {
33+
super(cosmosPagedFlux);
34+
this.cosmosPagedFlux = cosmosPagedFlux;
35+
}
36+
2537
/**
26-
* Creates instance given {@link ContinuablePagedFlux}.
38+
* Handle for invoking "side-effects" on each FeedResponse returned by CosmosPagedIterable
2739
*
28-
* @param pagedFlux the paged flux use as iterable
40+
* @param feedResponseConsumer handler
41+
* @return CosmosPagedIterable instance with attached handler
2942
*/
30-
CosmosPagedIterable(ContinuablePagedFlux<String, T, FeedResponse<T>> pagedFlux) {
31-
super(pagedFlux);
43+
@Beta(value = Beta.SinceVersion.V4_6_0)
44+
public CosmosPagedIterable<T> handle(Consumer<FeedResponse<T>> feedResponseConsumer) {
45+
CosmosPagedFlux<T> cosmosPagedFlux = this.cosmosPagedFlux.handle(feedResponseConsumer);
46+
return new CosmosPagedIterable<>(cosmosPagedFlux);
3247
}
3348
}

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/util/UtilBridgeInternal.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33

44
package com.azure.cosmos.util;
55

6-
import com.azure.core.util.paging.ContinuablePagedFlux;
76
import com.azure.cosmos.implementation.CosmosPagedFluxOptions;
87
import com.azure.cosmos.implementation.Warning;
98
import com.azure.cosmos.models.FeedResponse;
@@ -29,7 +28,7 @@ public static <T> CosmosPagedFlux<T> createCosmosPagedFlux(Function<CosmosPagedF
2928
}
3029

3130
@Warning(value = INTERNAL_USE_ONLY_WARNING)
32-
public static <T> CosmosPagedIterable<T> createCosmosPagedIterable(ContinuablePagedFlux<String, T, FeedResponse<T>> pagedFlux) {
33-
return new CosmosPagedIterable<>(pagedFlux);
31+
public static <T> CosmosPagedIterable<T> createCosmosPagedIterable(CosmosPagedFlux<T> cosmosPagedFlux) {
32+
return new CosmosPagedIterable<>(cosmosPagedFlux);
3433
}
3534
}

sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosItemTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
package com.azure.cosmos;
88

9-
import com.azure.cosmos.implementation.Document;
9+
import com.azure.cosmos.implementation.HttpConstants;
1010
import com.azure.cosmos.implementation.InternalObjectNode;
1111
import com.azure.cosmos.models.CosmosItemRequestOptions;
1212
import com.azure.cosmos.models.CosmosItemResponse;
@@ -16,7 +16,6 @@
1616
import com.azure.cosmos.models.PartitionKey;
1717
import com.azure.cosmos.models.SqlQuerySpec;
1818
import com.azure.cosmos.rx.TestSuiteBase;
19-
import com.azure.cosmos.implementation.HttpConstants;
2019
import com.azure.cosmos.util.CosmosPagedIterable;
2120
import com.fasterxml.jackson.core.JsonProcessingException;
2221
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -166,6 +165,7 @@ public void queryItems() throws Exception{
166165

167166
CosmosPagedIterable<InternalObjectNode> feedResponseIterator1 =
168167
container.queryItems(query, cosmosQueryRequestOptions, InternalObjectNode.class);
168+
169169
// Very basic validation
170170
assertThat(feedResponseIterator1.iterator().hasNext()).isTrue();
171171

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
/*
2+
* Copyright (c) Microsoft Corporation. All rights reserved.
3+
* Licensed under the MIT License.
4+
*
5+
*/
6+
7+
package com.azure.cosmos;
8+
9+
import com.azure.cosmos.models.CosmosQueryRequestOptions;
10+
import com.azure.cosmos.rx.TestSuiteBase;
11+
import com.azure.cosmos.util.CosmosPagedFlux;
12+
import com.fasterxml.jackson.core.JsonProcessingException;
13+
import com.fasterxml.jackson.databind.ObjectMapper;
14+
import com.fasterxml.jackson.databind.node.ObjectNode;
15+
import org.testng.annotations.AfterClass;
16+
import org.testng.annotations.BeforeClass;
17+
import org.testng.annotations.Factory;
18+
import org.testng.annotations.Test;
19+
20+
import java.util.UUID;
21+
import java.util.concurrent.atomic.AtomicInteger;
22+
23+
import static org.assertj.core.api.Assertions.assertThat;
24+
25+
public class CosmosPagedFluxTest extends TestSuiteBase {
26+
27+
private static final int NUM_OF_ITEMS = 10;
28+
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
29+
30+
private CosmosAsyncClient cosmosAsyncClient;
31+
private CosmosAsyncContainer cosmosAsyncContainer;
32+
33+
@Factory(dataProvider = "clientBuildersWithDirectSession")
34+
public CosmosPagedFluxTest(CosmosClientBuilder clientBuilder) {
35+
super(clientBuilder);
36+
}
37+
38+
@BeforeClass(groups = { "simple" }, timeOut = SETUP_TIMEOUT)
39+
public void before_CosmosPagedFluxTest() throws JsonProcessingException {
40+
assertThat(this.cosmosAsyncClient).isNull();
41+
this.cosmosAsyncClient = getClientBuilder().buildAsyncClient();
42+
CosmosAsyncContainer asyncContainer = getSharedMultiPartitionCosmosContainer(this.cosmosAsyncClient);
43+
cosmosAsyncContainer =
44+
cosmosAsyncClient.getDatabase(asyncContainer.getDatabase().getId()).getContainer(asyncContainer.getId());
45+
createItems(NUM_OF_ITEMS);
46+
}
47+
48+
@AfterClass(groups = { "simple" }, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true)
49+
public void afterClass() {
50+
assertThat(this.cosmosAsyncClient).isNotNull();
51+
this.cosmosAsyncClient.close();
52+
}
53+
54+
@Test(groups = { "simple" }, timeOut = TIMEOUT)
55+
public void readAllItemsByPageWithCosmosPagedFluxHandler() throws Exception {
56+
CosmosQueryRequestOptions cosmosQueryRequestOptions = new CosmosQueryRequestOptions();
57+
58+
CosmosPagedFlux<ObjectNode> cosmosPagedFlux =
59+
cosmosAsyncContainer.readAllItems(cosmosQueryRequestOptions, ObjectNode.class);
60+
61+
AtomicInteger handleCount = new AtomicInteger();
62+
cosmosPagedFlux = cosmosPagedFlux.handle(feedResponse -> {
63+
CosmosDiagnostics cosmosDiagnostics = feedResponse.getCosmosDiagnostics();
64+
if (cosmosDiagnostics != null) {
65+
handleCount.incrementAndGet();
66+
}
67+
});
68+
69+
AtomicInteger feedResponseCount = new AtomicInteger();
70+
cosmosPagedFlux.byPage().toIterable().forEach(feedResponse -> {
71+
feedResponseCount.incrementAndGet();
72+
});
73+
74+
assertThat(handleCount.get() >= 1).isTrue();
75+
assertThat(handleCount.get()).isEqualTo(feedResponseCount.get());
76+
}
77+
78+
@Test(groups = { "simple" }, timeOut = TIMEOUT)
79+
public void readAllItemsBySubscribeWithCosmosPagedFluxHandler() throws Exception {
80+
81+
CosmosQueryRequestOptions cosmosQueryRequestOptions = new CosmosQueryRequestOptions();
82+
83+
CosmosPagedFlux<ObjectNode> cosmosPagedFlux =
84+
cosmosAsyncContainer.readAllItems(cosmosQueryRequestOptions, ObjectNode.class);
85+
86+
AtomicInteger handleCount = new AtomicInteger();
87+
cosmosPagedFlux = cosmosPagedFlux.handle(feedResponse -> {
88+
CosmosDiagnostics cosmosDiagnostics = feedResponse.getCosmosDiagnostics();
89+
if (cosmosDiagnostics != null) {
90+
handleCount.incrementAndGet();
91+
}
92+
});
93+
94+
// Drain the results of reading the items
95+
cosmosPagedFlux.toIterable().forEach(objectNode -> {});
96+
97+
assertThat(handleCount.get() >= 1).isTrue();
98+
}
99+
100+
private void createItems(int numOfItems) throws JsonProcessingException {
101+
for (int i = 0; i < numOfItems; i++) {
102+
ObjectNode properties = getDocumentDefinition(UUID.randomUUID().toString(), String.valueOf(i));
103+
cosmosAsyncContainer.createItem(properties).block();
104+
}
105+
}
106+
107+
private ObjectNode getDocumentDefinition(String documentId, String pkId) throws JsonProcessingException {
108+
109+
String json = String.format("{ "
110+
+ "\"id\": \"%s\", "
111+
+ "\"mypk\": \"%s\", "
112+
+ "\"sgmts\": [[6519456, 1471916863], [2498434, 1455671440]]"
113+
+ "}"
114+
, documentId, pkId);
115+
return
116+
OBJECT_MAPPER.readValue(json, ObjectNode.class);
117+
}
118+
119+
}

0 commit comments

Comments
 (0)