Skip to content

Commit 7a54f54

Browse files
Added overloads to saveAll and deleteAll APIs with both Publisher and Iterable (Azure#37532)
* Updated params of reactive bulk API to Iterable: * Added additional APIs to match saveAll and deleteAll APIs from spring framework * Removed unused imports
1 parent 4d0ecad commit 7a54f54

File tree

4 files changed

+84
-26
lines changed

4 files changed

+84
-26
lines changed

eng/code-quality-reports/src/main/resources/revapi/revapi.json

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,16 @@
109109
"new": "method <S extends T, T> reactor.core.publisher.Flux<S> com.azure.spring.data.cosmos.core.ReactiveCosmosOperations::insertAll(com.azure.spring.data.cosmos.repository.support.CosmosEntityInformation<T, ?>, reactor.core.publisher.Flux<S>)",
110110
"justification": "Spring interfaces are allowed to add methods."
111111
},
112+
{
113+
"code": "java.method.addedToInterface",
114+
"new": "method <S extends T, T> reactor.core.publisher.Mono<java.lang.Void> com.azure.spring.data.cosmos.core.ReactiveCosmosOperations::deleteEntities(com.azure.spring.data.cosmos.repository.support.CosmosEntityInformation<T, ?>, java.lang.Iterable<S>)",
115+
"justification": "Spring interfaces are allowed to add methods."
116+
},
117+
{
118+
"code": "java.method.addedToInterface",
119+
"new": "method <S extends T, T> reactor.core.publisher.Flux<S> com.azure.spring.data.cosmos.core.ReactiveCosmosOperations::insertAll(com.azure.spring.data.cosmos.repository.support.CosmosEntityInformation<T, ?>, java.lang.Iterable<S>)",
120+
"justification": "Spring interfaces are allowed to add methods."
121+
},
112122
{
113123
"regex": true,
114124
"code": "java\\.annotation\\.(added|attributeValueChanged)",

sdk/spring/azure-spring-data-cosmos/src/main/java/com/azure/spring/data/cosmos/core/ReactiveCosmosOperations.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,17 @@ Mono<CosmosContainerProperties> replaceContainerProperties(String containerName,
147147
*/
148148
<T> Mono<T> insert(String containerName, T objectToSave);
149149

150+
/**
151+
* Insert all items with bulk.
152+
*
153+
* @param entityInformation must not be {@literal null}
154+
* @param entities must not be {@literal null}
155+
* @param <T> type class of domain type
156+
* @param <S> type class of domain type
157+
* @return Flux of result
158+
*/
159+
<S extends T, T> Flux<S> insertAll(CosmosEntityInformation<T, ?> entityInformation, Iterable<S> entities);
160+
150161
/**
151162
* Insert all items with bulk.
152163
*
@@ -220,6 +231,17 @@ Mono<CosmosContainerProperties> replaceContainerProperties(String containerName,
220231
*/
221232
<T> Mono<Void> deleteEntity(String containerName, T entity);
222233

234+
/**
235+
* Delete all items with bulk.
236+
*
237+
* @param entityInformation must not be {@literal null}
238+
* @param entities must not be {@literal null}
239+
* @param <T> type class of domain type
240+
* @param <S> type class of domain type
241+
* @return void Mono
242+
*/
243+
<S extends T, T> Mono<Void> deleteEntities(CosmosEntityInformation<T, ?> entityInformation, Iterable<S> entities);
244+
223245
/**
224246
* Delete all items with bulk.
225247
*

sdk/spring/azure-spring-data-cosmos/src/main/java/com/azure/spring/data/cosmos/core/ReactiveCosmosTemplate.java

Lines changed: 47 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -473,7 +473,20 @@ public <T> Mono<T> insert(String containerName, T objectToSave) {
473473
* Insert all items with bulk.
474474
*
475475
* @param entityInformation the CosmosEntityInformation
476-
* @param entities the Iterable entities to be deleted
476+
* @param entities the Iterable entities to be inserted
477+
* @param <T> type class of domain type
478+
* @param <S> type class of domain type
479+
* @return Flux of result
480+
*/
481+
public <S extends T, T> Flux<S> insertAll(CosmosEntityInformation<T, ?> entityInformation, Iterable<S> entities) {
482+
return insertAll(entityInformation, Flux.fromIterable(entities));
483+
}
484+
485+
/**
486+
* Insert all items with bulk.
487+
*
488+
* @param entityInformation the CosmosEntityInformation
489+
* @param entities the Flux of entities to be inserted
477490
* @param <T> type class of domain type
478491
* @param <S> type class of domain type
479492
* @return Flux of result
@@ -498,19 +511,19 @@ public <S extends T, T> Flux<S> insertAll(CosmosEntityInformation<T, ?> entityIn
498511
cosmosBulkExecutionOptions.setInitialMicroBatchSize(1);
499512

500513
return (Flux<S>) this.getCosmosAsyncClient()
501-
.getDatabase(this.getDatabaseName())
502-
.getContainer(containerName)
503-
.executeBulkOperations(cosmosItemOperationsFlux, cosmosBulkExecutionOptions)
504-
.publishOn(Schedulers.parallel())
505-
.onErrorResume(throwable ->
506-
CosmosExceptionUtils.exceptionHandler("Failed to insert item(s)", throwable,
507-
this.responseDiagnosticsProcessor))
508-
.flatMap(r -> {
509-
CosmosUtils.fillAndProcessResponseDiagnostics(this.responseDiagnosticsProcessor,
510-
r.getResponse().getCosmosDiagnostics(), null);
511-
JsonNode responseItem = r.getResponse().getItem(JsonNode.class);
512-
return responseItem != null ? Flux.just(toDomainObject(domainType, responseItem)) : Flux.empty();
513-
});
514+
.getDatabase(this.getDatabaseName())
515+
.getContainer(containerName)
516+
.executeBulkOperations(cosmosItemOperationsFlux, cosmosBulkExecutionOptions)
517+
.publishOn(Schedulers.parallel())
518+
.onErrorResume(throwable ->
519+
CosmosExceptionUtils.exceptionHandler("Failed to insert item(s)", throwable,
520+
this.responseDiagnosticsProcessor))
521+
.flatMap(r -> {
522+
CosmosUtils.fillAndProcessResponseDiagnostics(this.responseDiagnosticsProcessor,
523+
r.getResponse().getCosmosDiagnostics(), null);
524+
JsonNode responseItem = r.getResponse().getItem(JsonNode.class);
525+
return responseItem != null ? Flux.just(toDomainObject(domainType, responseItem)) : Flux.empty();
526+
});
514527
}
515528

516529
/**
@@ -674,6 +687,19 @@ public <T> Mono<Void> deleteEntity(String containerName, T entity) {
674687
return deleteItem(originalItem, containerName, domainType).then();
675688
}
676689

690+
/**
691+
* Delete all items with bulk.
692+
*
693+
* @param entityInformation the CosmosEntityInformation
694+
* @param entities the Iterable entities to be deleted
695+
* @param <T> type class of domain type
696+
* @param <S> type class of domain type
697+
* @return void Mono
698+
*/
699+
public <S extends T, T> Mono<Void> deleteEntities(CosmosEntityInformation<T, ?> entityInformation, Iterable<S> entities) {
700+
return deleteEntities(entityInformation, Flux.fromIterable(entities));
701+
}
702+
677703
/**
678704
* Delete all items with bulk.
679705
*
@@ -704,13 +730,13 @@ public <S extends T, T> Mono<Void> deleteEntities(CosmosEntityInformation<T, ?>
704730
cosmosBulkExecutionOptions.setInitialMicroBatchSize(1);
705731

706732
return this.getCosmosAsyncClient()
707-
.getDatabase(this.getDatabaseName())
708-
.getContainer(containerName)
709-
.executeBulkOperations(cosmosItemOperationFlux, cosmosBulkExecutionOptions)
710-
.publishOn(Schedulers.parallel())
711-
.onErrorResume(throwable ->
712-
CosmosExceptionUtils.exceptionHandler("Failed to delete item(s)", throwable,
713-
this.responseDiagnosticsProcessor)).then();
733+
.getDatabase(this.getDatabaseName())
734+
.getContainer(containerName)
735+
.executeBulkOperations(cosmosItemOperationFlux, cosmosBulkExecutionOptions)
736+
.publishOn(Schedulers.parallel())
737+
.onErrorResume(throwable ->
738+
CosmosExceptionUtils.exceptionHandler("Failed to delete item(s)", throwable,
739+
this.responseDiagnosticsProcessor)).then();
714740
}
715741

716742
/**

sdk/spring/azure-spring-data-cosmos/src/main/java/com/azure/spring/data/cosmos/repository/support/SimpleReactiveCosmosRepository.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ public <S extends T> Flux<S> saveAll(Iterable<S> entities) {
130130
Assert.notNull(entities, "The given Iterable of entities must not be null!");
131131

132132
if (entityInformation.getPartitionKeyFieldName() != null) {
133-
return cosmosOperations.insertAll(this.entityInformation, Flux.fromIterable(entities));
133+
return cosmosOperations.insertAll(this.entityInformation, entities);
134134
} else {
135135
return Flux.fromIterable(entities).flatMap(this::save);
136136
}
@@ -253,7 +253,7 @@ public Mono<Void> deleteAll(Iterable<? extends T> entities) {
253253
Assert.notNull(entities, "The given Iterable of entities must not be null!");
254254

255255
if (entityInformation.getPartitionKeyFieldName() != null) {
256-
return cosmosOperations.deleteEntities(this.entityInformation, Flux.fromIterable(entities));
256+
return cosmosOperations.deleteEntities(this.entityInformation, entities);
257257
} else {
258258
return Flux.fromIterable(entities).flatMap(this::delete).then();
259259
}
@@ -267,9 +267,9 @@ public Mono<Void> deleteAll(Publisher<? extends T> entityStream) {
267267
if (entityInformation.getPartitionKeyFieldName() != null) {
268268
return cosmosOperations.deleteEntities(this.entityInformation, Flux.from(entityStream));
269269
} else {
270-
return Flux.from(entityStream)//
271-
.map(entityInformation::getRequiredId)//
272-
.flatMap(this::deleteById)//
270+
return Flux.from(entityStream)
271+
.map(entityInformation::getRequiredId)
272+
.flatMap(this::deleteById)
273273
.then();
274274
}
275275
}

0 commit comments

Comments
 (0)