Skip to content

Commit a260dc8

Browse files
aayush3011Aayush Kataria
andauthored
PartitionKeyDelete: Adds API to delete all items in a partition key (Azure#23819)
* PK Delete changes * PK Delete changes * PK Delete changes * PK Delete changes * PK Delete changes * PK Delete changes * PK Delete changes * PK Delete changes Co-authored-by: Aayush Kataria <aayushkataria@MININT-S65P4F2.fareast.corp.microsoft.com>
1 parent fcb93d4 commit a260dc8

File tree

10 files changed

+197
-8
lines changed

10 files changed

+197
-8
lines changed

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncContainer.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ public class CosmosAsyncContainer {
9393
private final String readItemSpanName;
9494
private final String upsertItemSpanName;
9595
private final String deleteItemSpanName;
96+
private final String deleteAllItemsByPartitionKeySpanName;
9697
private final String replaceItemSpanName;
9798
private final String patchItemSpanName;
9899
private final String createItemSpanName;
@@ -117,6 +118,7 @@ public class CosmosAsyncContainer {
117118
this.readItemSpanName = "readItem." + this.id;
118119
this.upsertItemSpanName = "upsertItem." + this.id;
119120
this.deleteItemSpanName = "deleteItem." + this.id;
121+
this.deleteAllItemsByPartitionKeySpanName = "deleteAllItemsByPartitionKey." + this.id;
120122
this.replaceItemSpanName = "replaceItem." + this.id;
121123
this.patchItemSpanName = "patchItem." + this.id;
122124
this.createItemSpanName = "createItem." + this.id;
@@ -1473,6 +1475,28 @@ public Mono<CosmosItemResponse<Object>> deleteItem(
14731475
return withContext(context -> deleteItemInternal(itemId, null, requestOptions, context));
14741476
}
14751477

1478+
/**
1479+
* Deletes all items in the Container with the specified partitionKey value.
1480+
* Starts an asynchronous Cosmos DB background operation which deletes all items in the Container with the specified value.
1481+
* The asynchronous Cosmos DB background operation runs using a percentage of user RUs.
1482+
*
1483+
* After subscription the operation will be performed.
1484+
* The {@link Mono} upon successful completion will contain a single Cosmos item response for all the deleted items.
1485+
*
1486+
* @param partitionKey partitionKey of the item.
1487+
* @param options the request options.
1488+
* @return an {@link Mono} containing the Cosmos item resource response.
1489+
*/
1490+
@Beta(value = Beta.SinceVersion.V4_19_0, warningText = Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
1491+
public Mono<CosmosItemResponse<Object>> deleteAllItemsByPartitionKey(PartitionKey partitionKey, CosmosItemRequestOptions options) {
1492+
if (options == null) {
1493+
options = new CosmosItemRequestOptions();
1494+
}
1495+
ModelBridgeInternal.setPartitionKey(options, partitionKey);
1496+
RequestOptions requestOptions = ModelBridgeInternal.toRequestOptions(options);
1497+
return withContext(context -> deleteAllItemsByPartitionKeyInternal(partitionKey, requestOptions, context));
1498+
}
1499+
14761500
/**
14771501
* Deletes the item.
14781502
* <p>
@@ -1654,6 +1678,31 @@ private Mono<CosmosItemResponse<Object>> deleteItemInternal(
16541678
requestOptions.getThresholdForDiagnosticsOnTracer());
16551679
}
16561680

1681+
private Mono<CosmosItemResponse<Object>> deleteAllItemsByPartitionKeyInternal(
1682+
PartitionKey partitionKey,
1683+
RequestOptions requestOptions,
1684+
Context context) {
1685+
Mono<CosmosItemResponse<Object>> responseMono = this.getDatabase()
1686+
.getDocClientWrapper()
1687+
.deleteAllDocumentsByPartitionKey(getLink(), partitionKey, requestOptions)
1688+
.map(response -> ModelBridgeInternal.createCosmosAsyncItemResponseWithObjectType(response))
1689+
.single();
1690+
return database
1691+
.getClient()
1692+
.getTracerProvider()
1693+
.traceEnabledCosmosItemResponsePublisher(
1694+
responseMono,
1695+
context,
1696+
this.deleteAllItemsByPartitionKeySpanName,
1697+
this.getId(),
1698+
database.getId(),
1699+
database.getClient(),
1700+
requestOptions.getConsistencyLevel(),
1701+
OperationType.Delete,
1702+
ResourceType.PartitionKey,
1703+
requestOptions.getThresholdForDiagnosticsOnTracer());
1704+
}
1705+
16571706
private <T> Mono<CosmosItemResponse<T>> replaceItemInternal(
16581707
Class<T> itemType,
16591708
String itemId,

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosContainer.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -632,6 +632,20 @@ public CosmosItemResponse<Object> deleteItem(String itemId, PartitionKey partiti
632632
return this.blockDeleteItemResponse(asyncContainer.deleteItem(itemId, partitionKey, options));
633633
}
634634

635+
/**
636+
* Deletes all items in the Container with the specified partitionKey value.
637+
* Starts an asynchronous Cosmos DB background operation which deletes all items in the Container with the specified value.
638+
* The asynchronous Cosmos DB background operation runs using a percentage of user RUs.
639+
*
640+
* @param partitionKey the partition key.
641+
* @param options the options.
642+
* @return the Cosmos item response
643+
*/
644+
@Beta(value = Beta.SinceVersion.V4_19_0, warningText = Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
645+
public CosmosItemResponse<Object> deleteAllItemsByPartitionKey(PartitionKey partitionKey, CosmosItemRequestOptions options) {
646+
return this.blockDeleteItemResponse(asyncContainer.deleteAllItemsByPartitionKey(partitionKey, options));
647+
}
648+
635649
/**
636650
* Deletes an item in the current container.
637651
*
@@ -646,12 +660,12 @@ public <T> CosmosItemResponse<Object> deleteItem(T item, CosmosItemRequestOption
646660

647661
/**
648662
* Executes the transactional batch.
649-
*
663+
*
650664
* @deprecated forRemoval = true, since = "4.19"
651665
* This overload will be removed. Please use one of the following APIs instead
652666
* - {@link CosmosContainer#executeCosmosBatch(CosmosBatch)}
653667
* - {@link CosmosContainer#executeCosmosBatch(CosmosBatch, CosmosBatchRequestOptions)}
654-
*
668+
*
655669
* @param transactionalBatch Batch having list of operation and partition key which will be executed by this container.
656670
*
657671
* @return A TransactionalBatchResponse which contains details of execution of the transactional batch.
@@ -724,7 +738,7 @@ public CosmosBatchResponse executeCosmosBatch(CosmosBatch cosmosBatch) {
724738
* @deprecated forRemoval = true, since = "4.19"
725739
* This overload will be removed. Please use one of the following APIs instead
726740
* - {@link CosmosContainer#executeCosmosBatch(CosmosBatch)}
727-
* - {@link CosmosContainer#executeCosmosBatch(CosmosBatch, CosmosBatchRequestOptions)}
741+
* - {@link CosmosContainer#executeCosmosBatch(CosmosBatch, CosmosBatchRequestOptions)}
728742
*
729743
* @param transactionalBatch Batch having list of operation and partition key which will be executed by this container.
730744
* @param requestOptions Options that apply specifically to batch request.

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/AsyncDocumentClient.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -627,6 +627,7 @@ Mono<ResourceResponse<Document>> upsertDocument(String collectionLink, Object do
627627
*/
628628
Mono<ResourceResponse<Document>> deleteDocument(String documentLink, InternalObjectNode internalObjectNode, RequestOptions options);
629629

630+
Mono<ResourceResponse<Document>> deleteAllDocumentsByPartitionKey(String collectionLink, PartitionKey partitionKey, RequestOptions options);
630631
/**
631632
* Reads a document
632633
* <p>

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/BaseAuthorizationTokenProvider.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ private static String getResourceSegment(ResourceType resourceType) {
4949
case Document:
5050
return Paths.DOCUMENTS_PATH_SEGMENT;
5151
case DocumentCollection:
52+
case PartitionKey:
5253
return Paths.COLLECTIONS_PATH_SEGMENT;
5354
case Offer:
5455
return Paths.OFFERS_PATH_SEGMENT;

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Paths.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ public class Paths {
1111
static final char ROOT_CHAR = '/';
1212
static final char ESCAPE_CHAR = '\\';
1313

14+
public static final String OPERATIONS_PATH_SEGMENT = "operations";
1415
public static final String DATABASES_PATH_SEGMENT = "dbs";
1516
public static final String DATABASES_ROOT = ROOT + DATABASES_PATH_SEGMENT;
1617

@@ -37,6 +38,7 @@ public class Paths {
3738
public static final String MEDIA_ROOT = ROOT + MEDIA_PATH_SEGMENT;
3839
public static final String SCHEMAS_PATH_SEGMENT = "schemas";
3940
public static final String PARTITION_KEY_RANGES_PATH_SEGMENT = "pkranges";
41+
public static final String PARTITION_KEY_DELETE_PATH_SEGMENT = "partitionkeydelete";
4042

4143
public static final String USER_DEFINED_TYPES_PATH_SEGMENT = "udts";
4244

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/PathsHelper.java

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@ public class PathsHelper {
2121

2222
public static String generatePath(ResourceType resourceType, RxDocumentServiceRequest request, boolean isFeed) {
2323
if (request.getIsNameBased()) {
24-
return generatePathForNameBased(resourceType, request.getResourceAddress(), isFeed);
24+
return generatePathForNameBased(resourceType, request.getResourceAddress(), isFeed, request.getOperationType());
2525
} else {
26-
return generatePath(resourceType, request.getResourceId(), isFeed);
26+
return generatePath(resourceType, request.getResourceId(), isFeed, request.getOperationType());
2727
}
2828
}
2929

@@ -63,14 +63,16 @@ public static String generatePathForNameBased(Resource resourceType, String reso
6363
throw new IllegalArgumentException(errorMessage);
6464
}
6565

66-
private static String generatePathForNameBased(ResourceType resourceType, String resourceFullName, boolean isFeed) {
66+
private static String generatePathForNameBased(ResourceType resourceType, String resourceFullName, boolean isFeed, OperationType operationType) {
6767
if (isFeed && Strings.isNullOrEmpty(resourceFullName) && resourceType != ResourceType.Database) {
6868
String errorMessage = String.format(RMResources.UnexpectedResourceType, resourceType);
6969
throw new IllegalArgumentException(errorMessage);
7070
}
7171

7272
String resourcePath = null;
73-
if (!isFeed) {
73+
if (resourceType == ResourceType.PartitionKey && operationType == OperationType.Delete) {
74+
resourcePath = resourceFullName + "/" + Paths.OPERATIONS_PATH_SEGMENT + "/" + Paths.PARTITION_KEY_DELETE_PATH_SEGMENT;
75+
} else if (!isFeed) {
7476
resourcePath = resourceFullName;
7577
} else if (resourceType == ResourceType.Database) {
7678
return Paths.DATABASES_PATH_SEGMENT;
@@ -110,6 +112,14 @@ private static String generatePathForNameBased(ResourceType resourceType, String
110112
}
111113

112114
public static String generatePath(ResourceType resourceType, String ownerOrResourceId, boolean isFeed) {
115+
if (resourceType == ResourceType.PartitionKey) {
116+
return generatePath(resourceType, ownerOrResourceId, isFeed, OperationType.Delete);
117+
} else {
118+
return generatePath(resourceType, ownerOrResourceId, isFeed, null);
119+
}
120+
}
121+
122+
private static String generatePath(ResourceType resourceType, String ownerOrResourceId, boolean isFeed, OperationType operationType) {
113123
if (isFeed && (ownerOrResourceId == null || ownerOrResourceId.isEmpty()) &&
114124
resourceType != ResourceType.Database &&
115125
resourceType != ResourceType.Offer &&
@@ -277,6 +287,12 @@ public static String generatePath(ResourceType resourceType, String ownerOrResou
277287
ResourceId clientEncryptionKeyId = ResourceId.parse(ownerOrResourceId);
278288
return Paths.DATABASES_PATH_SEGMENT + "/" + clientEncryptionKeyId.getDatabaseId().toString() + "/" +
279289
Paths.CLIENT_ENCRYPTION_KEY_PATH_SEGMENT + "/" + clientEncryptionKeyId.getClientEncryptionKeyId().toString();
290+
} else if (resourceType == ResourceType.PartitionKey && operationType == OperationType.Delete) {
291+
ResourceId documentCollectionId = ResourceId.parse(ownerOrResourceId);
292+
293+
return Paths.DATABASES_PATH_SEGMENT + "/" + documentCollectionId.getDatabaseId().toString() + "/" +
294+
Paths.COLLECTIONS_PATH_SEGMENT + "/" + documentCollectionId.getDocumentCollectionId().toString() + "/" +
295+
Paths.OPERATIONS_PATH_SEGMENT + "/" + Paths.PARTITION_KEY_DELETE_PATH_SEGMENT;
280296
}
281297

282298
String errorMessage = "invalid resource type";
@@ -783,6 +799,9 @@ private static String[] getResourcePathArray(ResourceType resourceType) {
783799
}
784800
} else if(resourceType == ResourceType.PartitionKeyRange) {
785801
segments.add(Paths.PARTITION_KEY_RANGES_PATH_SEGMENT);
802+
} else if (resourceType == ResourceType.PartitionKey) {
803+
segments.add(Paths.COLLECTIONS_PATH_SEGMENT);
804+
segments.add(Paths.OPERATIONS_PATH_SEGMENT);
786805
}
787806
} else if (resourceType != ResourceType.Database) {
788807
return null;

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ResourceType.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ public enum ResourceType {
3131
ModuleCommand("ModuleCommand", 103),
3232
Offer("Offer", 113),
3333
PartitionKeyRange("PartitionKeyRange", 125),
34+
PartitionKey("PartitionKey", 136),
3435
PartitionSetInformation("PartitionSetInformation", 114),
3536
Permission("Permission", 5),
3637
PreviousImage("PreviousImage", 128),

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -990,6 +990,18 @@ private Mono<RxDocumentServiceResponse> delete(RxDocumentServiceRequest request,
990990
});
991991
}
992992

993+
private Mono<RxDocumentServiceResponse> deleteAllItemsByPartitionKey(RxDocumentServiceRequest request, DocumentClientRetryPolicy documentClientRetryPolicy, OperationContextAndListenerTuple operationContextAndListenerTuple) {
994+
return populateHeaders(request, RequestVerb.POST)
995+
.flatMap(requestPopulated -> {
996+
RxStoreModel storeProxy = this.getStoreProxy(requestPopulated);
997+
if (documentClientRetryPolicy.getRetryContext() != null && documentClientRetryPolicy.getRetryContext().getRetryCount() > 0) {
998+
documentClientRetryPolicy.getRetryContext().updateEndTime();
999+
}
1000+
1001+
return storeProxy.processMessage(requestPopulated, operationContextAndListenerTuple);
1002+
});
1003+
}
1004+
9931005
private Mono<RxDocumentServiceResponse> read(RxDocumentServiceRequest request, DocumentClientRetryPolicy documentClientRetryPolicy) {
9941006
return populateHeaders(request, RequestVerb.GET)
9951007
.flatMap(requestPopulated -> {
@@ -1941,6 +1953,42 @@ private Mono<ResourceResponse<Document>> deleteDocumentInternal(String documentL
19411953
}
19421954
}
19431955

1956+
@Override
1957+
public Mono<ResourceResponse<Document>> deleteAllDocumentsByPartitionKey(String collectionLink, PartitionKey partitionKey, RequestOptions options) {
1958+
DocumentClientRetryPolicy requestRetryPolicy = this.resetSessionTokenRetryPolicy.getRequestPolicy();
1959+
return ObservableHelper.inlineIfPossibleAsObs(() -> deleteAllDocumentsByPartitionKeyInternal(collectionLink, options, requestRetryPolicy),
1960+
requestRetryPolicy);
1961+
}
1962+
1963+
private Mono<ResourceResponse<Document>> deleteAllDocumentsByPartitionKeyInternal(String collectionLink, RequestOptions options,
1964+
DocumentClientRetryPolicy retryPolicyInstance) {
1965+
try {
1966+
if (StringUtils.isEmpty(collectionLink)) {
1967+
throw new IllegalArgumentException("collectionLink");
1968+
}
1969+
1970+
logger.debug("Deleting all items by Partition Key. collectionLink: [{}]", collectionLink);
1971+
String path = Utils.joinPath(collectionLink, null);
1972+
Map<String, String> requestHeaders = this.getRequestHeaders(options, ResourceType.PartitionKey, OperationType.Delete);
1973+
RxDocumentServiceRequest request = RxDocumentServiceRequest.create(this,
1974+
OperationType.Delete, ResourceType.PartitionKey, path, requestHeaders, options);
1975+
if (retryPolicyInstance != null) {
1976+
retryPolicyInstance.onBeforeSendRequest(request);
1977+
}
1978+
1979+
Mono<Utils.ValueHolder<DocumentCollection>> collectionObs = collectionCache.resolveCollectionAsync(BridgeInternal.getMetaDataDiagnosticContext(request.requestContext.cosmosDiagnostics), request);
1980+
1981+
Mono<RxDocumentServiceRequest> requestObs = addPartitionKeyInformation(request, null, null, options, collectionObs);
1982+
1983+
return requestObs.flatMap(req -> this
1984+
.deleteAllItemsByPartitionKey(req, retryPolicyInstance, getOperationContextAndListenerTuple(options))
1985+
.map(serviceResponse -> toResourceResponse(serviceResponse, Document.class)));
1986+
} catch (Exception e) {
1987+
logger.debug("Failure in deleting documents due to [{}]", e.getMessage());
1988+
return Mono.error(e);
1989+
}
1990+
}
1991+
19441992
@Override
19451993
public Mono<ResourceResponse<Document>> readDocument(String documentLink, RequestOptions options) {
19461994
DocumentClientRetryPolicy retryPolicyInstance = this.resetSessionTokenRetryPolicy.getRequestPolicy();
@@ -3919,7 +3967,8 @@ private RxStoreModel getStoreProxy(RxDocumentServiceRequest request) {
39193967
if (resourceType == ResourceType.Offer ||
39203968
resourceType == ResourceType.ClientEncryptionKey ||
39213969
resourceType.isScript() && operationType != OperationType.ExecuteJavaScript ||
3922-
resourceType == ResourceType.PartitionKeyRange) {
3970+
resourceType == ResourceType.PartitionKeyRange ||
3971+
resourceType == ResourceType.PartitionKey && operationType == OperationType.Delete) {
39233972
return this.gatewayProxy;
39243973
}
39253974

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxGatewayStoreModel.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,10 @@ private Mono<RxDocumentServiceResponse> delete(RxDocumentServiceRequest request)
109109
return this.performRequest(request, HttpMethod.DELETE);
110110
}
111111

112+
private Mono<RxDocumentServiceResponse> deleteByPartitionKey(RxDocumentServiceRequest request) {
113+
return this.performRequest(request, HttpMethod.POST);
114+
}
115+
112116
private Mono<RxDocumentServiceResponse> execute(RxDocumentServiceRequest request) {
113117
return this.performRequest(request, HttpMethod.POST);
114118
}
@@ -390,6 +394,9 @@ private Mono<RxDocumentServiceResponse> invokeAsyncInternal(RxDocumentServiceReq
390394
case Upsert:
391395
return this.upsert(request);
392396
case Delete:
397+
if (request.getResourceType() == ResourceType.PartitionKey) {
398+
return this.deleteByPartitionKey(request);
399+
}
393400
return this.delete(request);
394401
case ExecuteJavaScript:
395402
return this.execute(request);

sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosItemTest.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,52 @@ public void deleteItem() throws Exception {
155155
assertThat(deleteResponse.getStatusCode()).isEqualTo(204);
156156
}
157157

158+
@Test(groups = {"simple"}, timeOut = TIMEOUT)
159+
public void deleteAllItemsByPartitionKey() throws Exception {
160+
String pkValue1 = UUID.randomUUID().toString();
161+
String pkValue2 = UUID.randomUUID().toString();
162+
163+
// item 1
164+
ObjectNode properties1 = getDocumentDefinition(UUID.randomUUID().toString(), pkValue1);
165+
CosmosItemResponse<ObjectNode> itemResponse1 = container.createItem(properties1);
166+
167+
// item 2
168+
ObjectNode properties2 = getDocumentDefinition(UUID.randomUUID().toString(), pkValue1);
169+
CosmosItemResponse<ObjectNode> itemResponse2 = container.createItem(properties2);
170+
171+
172+
// item 3
173+
ObjectNode properties3 = getDocumentDefinition(UUID.randomUUID().toString(), pkValue2);
174+
CosmosItemResponse<ObjectNode> itemResponse3 = container.createItem(properties3);
175+
176+
177+
// delete the items with partition key pk1
178+
CosmosItemResponse<?> deleteResponse = container.deleteAllItemsByPartitionKey(
179+
new PartitionKey(pkValue1), new CosmosItemRequestOptions());
180+
181+
assertThat(deleteResponse.getStatusCode()).isEqualTo(200);
182+
CosmosItemRequestOptions cosmosItemRequestOptions = new CosmosItemRequestOptions();
183+
184+
// verify that the items with partition key pkValue1 are deleted
185+
CosmosPagedIterable<ObjectNode> feedResponseIterator1 =
186+
container.readAllItems(
187+
new PartitionKey(pkValue1),
188+
new CosmosQueryRequestOptions(),
189+
ObjectNode.class);
190+
// Very basic validation
191+
assertThat(feedResponseIterator1.iterator().hasNext()).isFalse();
192+
193+
//verify that the item with the other partition Key pkValue2 is not deleted
194+
CosmosPagedIterable<ObjectNode> feedResponseIterator2 =
195+
container.readAllItems(
196+
new PartitionKey(pkValue2),
197+
new CosmosQueryRequestOptions(),
198+
ObjectNode.class);
199+
// Very basic validation
200+
assertThat(feedResponseIterator2.iterator().hasNext()).isTrue();
201+
202+
}
203+
158204
@Test(groups = { "simple" }, timeOut = TIMEOUT)
159205
public void deleteItemUsingEntity() throws Exception {
160206
InternalObjectNode properties = getDocumentDefinition(UUID.randomUUID().toString());

0 commit comments

Comments
 (0)