Skip to content

Commit e421a6a

Browse files
aayush3011Aayush Kataria
andauthored
Changes to add bulk api to public surface area (Azure#26933)
* Changes to add bulk api to public surface area * Changes to add bulk api to public surface area * Changes to add bulk api to public surface area * Changes to add bulk api to public surface area * Changes to add bulk api to public surface area Co-authored-by: Aayush Kataria <aayushkataria@CSI8I0248DVT005.redmond.corp.microsoft.com>
1 parent 7644d9e commit e421a6a

File tree

3 files changed

+384
-21
lines changed

3 files changed

+384
-21
lines changed

sdk/cosmos/azure-cosmos-encryption/src/main/java/com/azure/cosmos/encryption/CosmosEncryptionContainer.java

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,19 @@
1818
import com.azure.cosmos.models.FeedResponse;
1919
import com.azure.cosmos.models.PartitionKey;
2020
import com.azure.cosmos.models.SqlQuerySpec;
21+
import com.azure.cosmos.models.CosmosBulkOperationResponse;
22+
import com.azure.cosmos.models.CosmosItemOperation;
23+
import com.azure.cosmos.models.CosmosBulkExecutionOptions;
24+
import com.azure.cosmos.models.CosmosPatchOperations;
25+
import com.azure.cosmos.models.CosmosPatchItemRequestOptions;
2126
import com.azure.cosmos.util.CosmosPagedFlux;
2227
import com.azure.cosmos.util.CosmosPagedIterable;
2328
import reactor.core.Exceptions;
29+
import reactor.core.publisher.Flux;
2430
import reactor.core.publisher.Mono;
2531

32+
import java.util.List;
33+
2634
import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;
2735

2836
/**
@@ -359,6 +367,82 @@ public CosmosBatchResponse executeCosmosBatch(
359367
return this.blockBatchResponse(this.cosmosEncryptionAsyncContainer.executeCosmosBatch(cosmosBatch, requestOptions));
360368
}
361369

370+
/**
371+
* Run patch operations on an Item.
372+
*
373+
* @param <T> the type parameter.
374+
* @param itemId the item id.
375+
* @param partitionKey the partition key.
376+
* @param cosmosPatchOperations Represents a container having list of operations to be sequentially applied to the referred Cosmos item.
377+
* @param options the request options.
378+
* @param itemType the item type.
379+
*
380+
* @return the Cosmos item resource response with the patched item or an exception.
381+
*/
382+
public <T> CosmosItemResponse<T> patchItem(
383+
String itemId,
384+
PartitionKey partitionKey,
385+
CosmosPatchOperations cosmosPatchOperations,
386+
CosmosPatchItemRequestOptions options,
387+
Class<T> itemType) {
388+
389+
return this.blockItemResponse(this.cosmosEncryptionAsyncContainer.patchItem(itemId, partitionKey, cosmosPatchOperations, options, itemType));
390+
}
391+
392+
/**
393+
* Executes list of operations in Bulk.
394+
*
395+
* @param <TContext> The context for the bulk processing.
396+
* @param operations list of operation which will be executed by this container.
397+
*
398+
* @return An Iterable of {@link CosmosBulkOperationResponse} which contains operation and it's response or exception.
399+
* <p>
400+
* To create a operation which can be executed here, use {@link com.azure.cosmos.models.CosmosBulkOperations}. For eg.
401+
* for a upsert operation use {@link com.azure.cosmos.models.CosmosBulkOperations#getUpsertItemOperation(Object, PartitionKey)}
402+
* </p>
403+
* <p>
404+
* We can get the corresponding operation using {@link CosmosBulkOperationResponse#getOperation()} and
405+
* it's response using {@link CosmosBulkOperationResponse#getResponse()}. If the operation was executed
406+
* successfully, the value returned by {@link com.azure.cosmos.models.CosmosBulkItemResponse#isSuccessStatusCode()} will be true. To get
407+
* actual status use {@link com.azure.cosmos.models.CosmosBulkItemResponse#getStatusCode()}.
408+
* </p>
409+
* To check if the operation had any exception, use {@link CosmosBulkOperationResponse#getException()} to
410+
* get the exception.
411+
*/
412+
public <TContext> Iterable<CosmosBulkOperationResponse<TContext>> executeBulkOperations(
413+
Iterable<CosmosItemOperation> operations) {
414+
return this.blockBulkResponse(this.cosmosEncryptionAsyncContainer.executeBulkOperations(Flux.fromIterable(operations)));
415+
}
416+
417+
/**
418+
* Executes list of operations in Bulk.
419+
*
420+
* @param <TContext> The context for the bulk processing.
421+
*
422+
* @param operations list of operation which will be executed by this container.
423+
* @param bulkOptions Options that apply for this Bulk request which specifies options regarding execution like
424+
* concurrency, batching size, interval and context.
425+
*
426+
* @return An Iterable of {@link CosmosBulkOperationResponse} which contains operation and it's response or exception.
427+
* <p>
428+
* To create a operation which can be executed here, use {@link com.azure.cosmos.models.CosmosBulkOperations}. For eg.
429+
* for a upsert operation use {@link com.azure.cosmos.models.CosmosBulkOperations#getUpsertItemOperation(Object, PartitionKey)}
430+
* </p>
431+
* <p>
432+
* We can get the corresponding operation using {@link CosmosBulkOperationResponse#getOperation()} and
433+
* it's response using {@link CosmosBulkOperationResponse#getResponse()}. If the operation was executed
434+
* successfully, the value returned by {@link com.azure.cosmos.models.CosmosBulkItemResponse#isSuccessStatusCode()} will be true. To get
435+
* actual status use {@link com.azure.cosmos.models.CosmosBulkItemResponse#getStatusCode()}.
436+
* </p>
437+
* To check if the operation had any exception, use {@link CosmosBulkOperationResponse#getException()} to
438+
* get the exception.
439+
*/
440+
public <TContext> Iterable<CosmosBulkOperationResponse<TContext>> executeBulkOperations(
441+
Iterable<CosmosItemOperation> operations,
442+
CosmosBulkExecutionOptions bulkOptions) {
443+
return this.blockBulkResponse(this.cosmosEncryptionAsyncContainer.executeBulkOperations(Flux.fromIterable(operations), bulkOptions));
444+
}
445+
362446
/**
363447
* Gets the CosmosContainer
364448
*
@@ -408,4 +492,19 @@ private CosmosBatchResponse blockBatchResponse(Mono<CosmosBatchResponse> batchRe
408492
}
409493
}
410494
}
495+
496+
private <TContext> List<CosmosBulkOperationResponse<TContext>> blockBulkResponse(
497+
Flux<CosmosBulkOperationResponse<TContext>> bulkResponse) {
498+
499+
try {
500+
return bulkResponse.collectList().block();
501+
} catch (Exception ex) {
502+
final Throwable throwable = Exceptions.unwrap(ex);
503+
if (throwable instanceof CosmosException) {
504+
throw (CosmosException) throwable;
505+
} else {
506+
throw ex;
507+
}
508+
}
509+
}
411510
}

sdk/cosmos/azure-cosmos-encryption/src/test/java/com/azure/cosmos/encryption/EncryptionAsyncApiCrudTest.java

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import com.azure.cosmos.models.CosmosItemOperation;
2323
import com.azure.cosmos.models.CosmosBulkItemResponse;
2424
import com.azure.cosmos.models.CosmosBulkOperations;
25-
import com.azure.cosmos.models.CosmosBulkExecutionOptions;
2625
import com.azure.cosmos.models.CosmosBulkOperationResponse;
2726
import com.azure.cosmos.models.FeedResponse;
2827
import com.azure.cosmos.models.PartitionKey;
@@ -692,9 +691,8 @@ public void bulkExecution_createItem() {
692691
return CosmosBulkOperations.getCreateItemOperation(createPojo, new PartitionKey(createPojo.getMypk()));
693692
});
694693

695-
CosmosBulkExecutionOptions cosmosBulkExecutionOptions = new CosmosBulkExecutionOptions();
696694
Flux<CosmosBulkOperationResponse<EncryptionAsyncApiCrudTest>> responseFlux = this.cosmosEncryptionAsyncContainer.
697-
executeBulkOperations(cosmosItemOperationsFlux, cosmosBulkExecutionOptions);
695+
executeBulkOperations(cosmosItemOperationsFlux);
698696

699697
AtomicInteger processedDoc = new AtomicInteger(0);
700698
responseFlux
@@ -737,10 +735,9 @@ public void bulkExecution_upsertItem() {
737735
return CosmosBulkOperations.getUpsertItemOperation(createPojo, new PartitionKey(createPojo.getMypk()));
738736
});
739737

740-
CosmosBulkExecutionOptions cosmosBulkExecutionOptions = new CosmosBulkExecutionOptions();
741738

742739
Flux<CosmosBulkOperationResponse<Object>> responseFlux = this.cosmosEncryptionAsyncContainer
743-
.executeBulkOperations(cosmosItemOperationsFlux, cosmosBulkExecutionOptions);
740+
.executeBulkOperations(cosmosItemOperationsFlux);
744741

745742
AtomicInteger processedDoc = new AtomicInteger(0);
746743
responseFlux
@@ -789,10 +786,9 @@ public void bulkExecution_deleteItem() {
789786
return CosmosBulkOperations.getDeleteItemOperation(encryptionPojo.getId(), cosmosItemOperation.getPartitionKeyValue());
790787
});
791788

792-
CosmosBulkExecutionOptions cosmosBulkExecutionOptions = new CosmosBulkExecutionOptions();
793789

794790
Flux<CosmosBulkOperationResponse<Object>> responseFlux = this.cosmosEncryptionAsyncContainer
795-
.executeBulkOperations(deleteCosmosItemOperationsFlux, cosmosBulkExecutionOptions);
791+
.executeBulkOperations(deleteCosmosItemOperationsFlux);
796792

797793
AtomicInteger processedDoc = new AtomicInteger(0);
798794
responseFlux
@@ -841,10 +837,9 @@ public void bulkExecution_readItem() {
841837
return CosmosBulkOperations.getReadItemOperation(encryptionPojo.getId(), cosmosItemOperation.getPartitionKeyValue());
842838
});
843839

844-
CosmosBulkExecutionOptions cosmosBulkExecutionOptions = new CosmosBulkExecutionOptions();
845840

846841
Flux<CosmosBulkOperationResponse<Object>> responseFlux = this.cosmosEncryptionAsyncContainer
847-
.executeBulkOperations(readCosmosItemOperationsFlux, cosmosBulkExecutionOptions);
842+
.executeBulkOperations(readCosmosItemOperationsFlux);
848843

849844
AtomicInteger processedDoc = new AtomicInteger(0);
850845
responseFlux
@@ -875,10 +870,9 @@ public void bulkExecution_readItem() {
875870
}
876871

877872
private void createItemsAndVerify(List<CosmosItemOperation> cosmosItemOperations) {
878-
CosmosBulkExecutionOptions cosmosBulkExecutionOptions = new CosmosBulkExecutionOptions();
879873

880874
Flux<CosmosBulkOperationResponse<Object>> createResponseFlux = this.cosmosEncryptionAsyncContainer.
881-
executeBulkOperations(Flux.fromIterable(cosmosItemOperations), cosmosBulkExecutionOptions);
875+
executeBulkOperations(Flux.fromIterable(cosmosItemOperations));
882876

883877
HashSet<String> distinctIndex = new HashSet<>();
884878
AtomicInteger processedDoc = new AtomicInteger(0);

0 commit comments

Comments
 (0)