Skip to content

Commit 37137eb

Browse files
[Service Bus] Client validation improvements for cross-language consistency (Azure#17454)
Adding in client validation for what we envision are common failures in user-scenarios: * Passing empty arrays in to methods like receiveDeferredMessages and cancelScheduledMessages() * Attempting to complete a message that's been received through peekMessage* * Making message ID, session ID or partitionKey too long, causing somewhat cryptic backend failure * Using non-matching partitionKey and sessionIds (again, cryptic backend failure) Fixes Azure#17364
1 parent d38576e commit 37137eb

File tree

7 files changed

+226
-28
lines changed

7 files changed

+226
-28
lines changed

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusMessage.java

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,10 @@
5252
* @see BinaryData
5353
*/
5454
public class ServiceBusMessage {
55+
private static final int MAX_MESSAGE_ID_LENGTH = 128;
56+
private static final int MAX_PARTITION_KEY_LENGTH = 128;
57+
private static final int MAX_SESSION_ID_LENGTH = 128;
58+
5559
private final AmqpAnnotatedMessage amqpAnnotatedMessage;
5660
private final ClientLogger logger = new ClientLogger(ServiceBusMessage.class);
5761

@@ -265,8 +269,10 @@ public String getMessageId() {
265269
* @param messageId to be set.
266270
*
267271
* @return The updated {@link ServiceBusMessage}.
272+
* @throws IllegalArgumentException if {@code messageId} is too long.
268273
*/
269274
public ServiceBusMessage setMessageId(String messageId) {
275+
checkIdLength("messageId", messageId, MAX_MESSAGE_ID_LENGTH);
270276
amqpAnnotatedMessage.getProperties().setMessageId(messageId);
271277
return this;
272278
}
@@ -295,8 +301,13 @@ public String getPartitionKey() {
295301
*
296302
* @return The updated {@link ServiceBusMessage}.
297303
* @see #getPartitionKey()
304+
* @throws IllegalArgumentException if {@code partitionKey} is too long or if the {@code partitionKey}
305+
* does not match the {@code sessionId}.
298306
*/
299307
public ServiceBusMessage setPartitionKey(String partitionKey) {
308+
checkIdLength("partitionKey", partitionKey, MAX_PARTITION_KEY_LENGTH);
309+
checkPartitionKey(partitionKey);
310+
300311
amqpAnnotatedMessage.getMessageAnnotations().put(PARTITION_KEY_ANNOTATION_NAME.getValue(), partitionKey);
301312
return this;
302313
}
@@ -491,8 +502,13 @@ public String getSessionId() {
491502
* @param sessionId to be set.
492503
*
493504
* @return The updated {@link ServiceBusMessage}.
505+
* @throws IllegalArgumentException if {@code sessionId} is too long or if the {@code sessionId}
506+
* does not match the {@code partitionKey}.
494507
*/
495508
public ServiceBusMessage setSessionId(String sessionId) {
509+
checkIdLength("sessionId", sessionId, MAX_SESSION_ID_LENGTH);
510+
checkSessionId(sessionId);
511+
496512
amqpAnnotatedMessage.getProperties().setGroupId(sessionId);
497513
return this;
498514
}
@@ -532,4 +548,54 @@ private void removeValues(Map<String, Object> dataMap, AmqpMessageConstant... ke
532548
dataMap.remove(key.getValue());
533549
}
534550
}
551+
552+
/**
553+
* Checks the length of ID fields.
554+
*
555+
* Some fields within the message will cause a failure in the service without enough context information.
556+
*/
557+
private void checkIdLength(String fieldName, String value, int maxLength) {
558+
if (value != null && value.length() > maxLength) {
559+
final String message = String.format("%s cannot be longer than %d characters.", fieldName, maxLength);
560+
throw logger.logExceptionAsError(new IllegalArgumentException(message));
561+
}
562+
}
563+
564+
/**
565+
* Validates that the user can't set the partitionKey to a different value than the session ID.
566+
* (this will eventually migrate to a service-side check)
567+
*/
568+
private void checkSessionId(String proposedSessionId) {
569+
if (proposedSessionId == null) {
570+
return;
571+
}
572+
573+
if (this.getPartitionKey() != null && this.getPartitionKey().compareTo(proposedSessionId) != 0) {
574+
final String message = String.format(
575+
"sessionId:%s cannot be set to a different value than partitionKey:%s.",
576+
proposedSessionId,
577+
this.getPartitionKey());
578+
throw logger.logExceptionAsError(new IllegalArgumentException(message));
579+
}
580+
}
581+
582+
/**
583+
* Validates that the user can't set the partitionKey to a different value than the session ID.
584+
* (this will eventually migrate to a service-side check)
585+
*/
586+
private void checkPartitionKey(String proposedPartitionKey) {
587+
if (proposedPartitionKey == null) {
588+
return;
589+
}
590+
591+
if (this.getSessionId() != null && this.getSessionId().compareTo(proposedPartitionKey) != 0) {
592+
final String message = String.format(
593+
"partitionKey:%s cannot be set to a different value than sessionId:%s.",
594+
proposedPartitionKey,
595+
this.getSessionId());
596+
597+
throw logger.logExceptionAsError(new IllegalArgumentException(message));
598+
}
599+
}
600+
535601
}

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ public String getEntityPath() {
194194
* @return A {@link Mono} that completes when the Service Bus abandon operation completes.
195195
* @throws NullPointerException if {@code message} is null.
196196
* @throws UnsupportedOperationException if the receiver was opened in {@link ReceiveMode#RECEIVE_AND_DELETE}
197-
* mode.
197+
* mode or if the message was received from peekMessage.
198198
*/
199199
public Mono<Void> abandon(ServiceBusReceivedMessage message) {
200200
return updateDisposition(message, DispositionStatus.ABANDONED, null, null,
@@ -218,7 +218,7 @@ public Mono<Void> abandon(ServiceBusReceivedMessage message) {
218218
* @throws NullPointerException if {@code message} or {@code options} is null. Also if
219219
* {@code transactionContext.transactionId} is null when {@code options.transactionContext} is specified.
220220
* @throws UnsupportedOperationException if the receiver was opened in {@link ReceiveMode#RECEIVE_AND_DELETE}
221-
* mode.
221+
* mode or if the message was received from peekMessage.
222222
*/
223223
public Mono<Void> abandon(ServiceBusReceivedMessage message, AbandonOptions options) {
224224
if (Objects.isNull(options)) {
@@ -241,7 +241,7 @@ public Mono<Void> abandon(ServiceBusReceivedMessage message, AbandonOptions opti
241241
* @return A {@link Mono} that finishes when the message is completed on Service Bus.
242242
* @throws NullPointerException if {@code message} is null.
243243
* @throws UnsupportedOperationException if the receiver was opened in {@link ReceiveMode#RECEIVE_AND_DELETE}
244-
* mode.
244+
* mode or if the message was received from peekMessage.
245245
*/
246246
public Mono<Void> complete(ServiceBusReceivedMessage message) {
247247
return updateDisposition(message, DispositionStatus.COMPLETED, null, null,
@@ -262,7 +262,7 @@ public Mono<Void> complete(ServiceBusReceivedMessage message) {
262262
* @throws NullPointerException if {@code message} or {@code options} is null. Also if
263263
* {@code transactionContext.transactionId} is null when {@code options.transactionContext} is specified.
264264
* @throws UnsupportedOperationException if the receiver was opened in {@link ReceiveMode#RECEIVE_AND_DELETE}
265-
* mode.
265+
* mode or if the message was received from peekMessage.
266266
*/
267267
public Mono<Void> complete(ServiceBusReceivedMessage message, CompleteOptions options) {
268268
if (Objects.isNull(options)) {
@@ -284,7 +284,8 @@ public Mono<Void> complete(ServiceBusReceivedMessage message, CompleteOptions op
284284
*
285285
* @return A {@link Mono} that completes when the Service Bus defer operation finishes.
286286
* @throws NullPointerException if {@code message} is null.
287-
* @throws UnsupportedOperationException if the receiver was opened in {@link ReceiveMode#RECEIVE_AND_DELETE} mode.
287+
* @throws UnsupportedOperationException if the receiver was opened in {@link ReceiveMode#RECEIVE_AND_DELETE} mode
288+
* or if the message was received from peekMessage.
288289
* @see <a href="https://docs.microsoft.com/azure/service-bus-messaging/message-deferral">Message deferral</a>
289290
*/
290291
public Mono<Void> defer(ServiceBusReceivedMessage message) {
@@ -307,7 +308,7 @@ public Mono<Void> defer(ServiceBusReceivedMessage message) {
307308
* @throws NullPointerException if {@code message} or {@code options} is null. Also if
308309
* {@code transactionContext.transactionId} is null when {@code options.transactionContext} is specified.
309310
* @throws UnsupportedOperationException if the receiver was opened in {@link ReceiveMode#RECEIVE_AND_DELETE}
310-
* mode.
311+
* mode or if the message was received from peekMessage.
311312
* @see <a href="https://docs.microsoft.com/azure/service-bus-messaging/message-deferral">Message deferral</a>
312313
*/
313314
public Mono<Void> defer(ServiceBusReceivedMessage message, DeferOptions options) {
@@ -331,7 +332,7 @@ public Mono<Void> defer(ServiceBusReceivedMessage message, DeferOptions options)
331332
* @return A {@link Mono} that completes when the dead letter operation finishes.
332333
* @throws NullPointerException if {@code message} is null.
333334
* @throws UnsupportedOperationException if the receiver was opened in {@link ReceiveMode#RECEIVE_AND_DELETE}
334-
* mode.
335+
* mode or if the message was received from peekMessage.
335336
* @see <a href="https://docs.microsoft.com/azure/service-bus-messaging/service-bus-dead-letter-queues">Dead letter
336337
* queues</a>
337338
*/
@@ -353,7 +354,7 @@ public Mono<Void> deadLetter(ServiceBusReceivedMessage message) {
353354
* @throws NullPointerException if {@code message} or {@code options} is null. Also if
354355
* {@code transactionContext.transactionId} is null when {@code options.transactionContext} is specified.
355356
* @throws UnsupportedOperationException if the receiver was opened in {@link ReceiveMode#RECEIVE_AND_DELETE}
356-
* mode.
357+
* mode or if the message was received from peekMessage.
357358
* @see <a href="https://docs.microsoft.com/azure/service-bus-messaging/service-bus-dead-letter-queues">Dead letter
358359
* queues</a>
359360
*/
@@ -720,7 +721,7 @@ sessionId, getLinkName(sessionId), sequenceNumbers))
720721
* @return The new expiration time for the message.
721722
* @throws NullPointerException if {@code message} or {@code message.getLockToken()} is null.
722723
* @throws UnsupportedOperationException if the receiver was opened in {@link ReceiveMode#RECEIVE_AND_DELETE}
723-
* mode.
724+
* mode or if the message was received from peekMessage.
724725
* @throws IllegalStateException if the receiver is a session receiver.
725726
* @throws IllegalArgumentException if {@code message.getLockToken()} is an empty value.
726727
*/
@@ -735,8 +736,8 @@ public Mono<OffsetDateTime> renewMessageLock(ServiceBusReceivedMessage message)
735736
} else if (message.getLockToken().isEmpty()) {
736737
return monoError(logger, new IllegalArgumentException("'message.getLockToken()' cannot be empty."));
737738
} else if (receiverOptions.isSessionReceiver()) {
738-
return monoError(logger, new IllegalStateException(
739-
String.format("Cannot renew message lock [%s] for a session receiver.", message.getLockToken())));
739+
final String errorMessage = "Renewing message lock is an invalid operation when working with sessions.";
740+
return monoError(logger, new IllegalStateException(errorMessage));
740741
}
741742

742743
return renewMessageLock(message.getLockToken())
@@ -987,6 +988,14 @@ private Mono<Void> updateDisposition(ServiceBusReceivedMessage message, Disposit
987988
} else if (message.isSettled()) {
988989
return Mono.error(logger.logExceptionAsError(
989990
new IllegalArgumentException("The message has either been deleted or already settled.")));
991+
} else if (message.getLockToken() == null) {
992+
// message must be a peeked message (or somehow they created a message w/o a lock token)
993+
final String errorMessage = "This operation is not supported for peeked messages. "
994+
+ "Only messages received using receiveMessages() or receiveMessagesWithContext() "
995+
+ "in PEEK_LOCK mode can be settled.";
996+
return Mono.error(
997+
logger.logExceptionAsError(new UnsupportedOperationException(errorMessage))
998+
);
990999
}
9911000

9921001
final String sessionIdToUse;

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public String getEntityPath() {
8080
*
8181
* @throws NullPointerException if {@code message} is null.
8282
* @throws UnsupportedOperationException if the receiver was opened in {@link ReceiveMode#RECEIVE_AND_DELETE}
83-
* mode.
83+
* mode or if the message was received from peekMessage.
8484
*/
8585
public void abandon(ServiceBusReceivedMessage message) {
8686
asyncClient.abandon(message).block(operationTimeout);
@@ -101,7 +101,7 @@ public void abandon(ServiceBusReceivedMessage message) {
101101
* @throws NullPointerException if {@code message} or {@code options} is null. Also if
102102
* {@code transactionContext.transactionId} is null when {@code options.transactionContext} is specified.
103103
* @throws UnsupportedOperationException if the receiver was opened in {@link ReceiveMode#RECEIVE_AND_DELETE}
104-
* mode.
104+
* mode or if the message was received from peekMessage.
105105
*/
106106
public void abandon(ServiceBusReceivedMessage message, AbandonOptions options) {
107107
asyncClient.abandon(message, options).block(operationTimeout);
@@ -114,7 +114,7 @@ public void abandon(ServiceBusReceivedMessage message, AbandonOptions options) {
114114
*
115115
* @throws NullPointerException if {@code message} is null.
116116
* @throws UnsupportedOperationException if the receiver was opened in {@link ReceiveMode#RECEIVE_AND_DELETE}
117-
* mode.
117+
* mode or if the message was received from peekMessage.
118118
*/
119119
public void complete(ServiceBusReceivedMessage message) {
120120
asyncClient.complete(message).block(operationTimeout);
@@ -132,7 +132,7 @@ public void complete(ServiceBusReceivedMessage message) {
132132
* @throws NullPointerException if {@code message} or {@code options} is null. Also if
133133
* {@code transactionContext.transactionId} is null when {@code options.transactionContext} is specified.
134134
* @throws UnsupportedOperationException if the receiver was opened in {@link ReceiveMode#RECEIVE_AND_DELETE}
135-
* mode.
135+
* mode or if the message was received from peekMessage.
136136
*/
137137
public void complete(ServiceBusReceivedMessage message, CompleteOptions options) {
138138
asyncClient.complete(message, options).block(operationTimeout);
@@ -145,7 +145,7 @@ public void complete(ServiceBusReceivedMessage message, CompleteOptions options)
145145
*
146146
* @throws NullPointerException if {@code message} is null.
147147
* @throws UnsupportedOperationException if the receiver was opened in {@link ReceiveMode#RECEIVE_AND_DELETE}
148-
* mode.
148+
* mode or if the message was received from peekMessage.
149149
* @see <a href="https://docs.microsoft.com/azure/service-bus-messaging/message-deferral">Message deferral</a>
150150
*/
151151
public void defer(ServiceBusReceivedMessage message) {
@@ -166,7 +166,7 @@ public void defer(ServiceBusReceivedMessage message) {
166166
* @throws NullPointerException if {@code message} or {@code options} is null. Also if
167167
* {@code transactionContext.transactionId} is null when {@code options.transactionContext} is specified.
168168
* @throws UnsupportedOperationException if the receiver was opened in {@link ReceiveMode#RECEIVE_AND_DELETE}
169-
* mode.
169+
* mode or if the message was received from peekMessage.
170170
* @see <a href="https://docs.microsoft.com/azure/service-bus-messaging/message-deferral">Message deferral</a>
171171
*/
172172
public void defer(ServiceBusReceivedMessage message, DeferOptions options) {
@@ -180,7 +180,7 @@ public void defer(ServiceBusReceivedMessage message, DeferOptions options) {
180180
*
181181
* @throws NullPointerException if {@code message} is null.
182182
* @throws UnsupportedOperationException if the receiver was opened in {@link ReceiveMode#RECEIVE_AND_DELETE}
183-
* mode.
183+
* mode or if the message was received from peekMessage.
184184
* @see <a href="https://docs.microsoft.com/azure/service-bus-messaging/service-bus-dead-letter-queues">Dead letter
185185
* queues</a>
186186
*/
@@ -203,7 +203,7 @@ public void deadLetter(ServiceBusReceivedMessage message) {
203203
* @throws NullPointerException if {@code message} or {@code options} is null. Also if
204204
* {@code transactionContext.transactionId} is null when {@code options.transactionContext} is specified.
205205
* @throws UnsupportedOperationException if the receiver was opened in {@link ReceiveMode#RECEIVE_AND_DELETE}
206-
* mode.
206+
* mode or if the message was received from peekMessage.
207207
*/
208208
public void deadLetter(ServiceBusReceivedMessage message, DeadLetterOptions options) {
209209
asyncClient.deadLetter(message, options).block(operationTimeout);

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ManagementChannel.java

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -89,13 +89,18 @@ public class ManagementChannel implements ServiceBusManagementNode {
8989
*/
9090
@Override
9191
public Mono<Void> cancelScheduledMessages(Iterable<Long> sequenceNumbers, String associatedLinkName) {
92+
final List<Long> numbers = new ArrayList<>();
93+
sequenceNumbers.forEach(s -> numbers.add(s));
94+
95+
if (numbers.isEmpty()) {
96+
return Mono.empty();
97+
}
98+
9299
return isAuthorized(ManagementConstants.OPERATION_CANCEL_SCHEDULED_MESSAGE)
93100
.then(createChannel.flatMap(channel -> {
94101
final Message requestMessage = createManagementMessage(
95102
ManagementConstants.OPERATION_CANCEL_SCHEDULED_MESSAGE, associatedLinkName);
96103

97-
final List<Long> numbers = new ArrayList<>();
98-
sequenceNumbers.forEach(s -> numbers.add(s));
99104
final Long[] longs = numbers.toArray(new Long[0]);
100105
requestMessage.setBody(new AmqpValue(Collections.singletonMap(ManagementConstants.SEQUENCE_NUMBERS,
101106
longs)));
@@ -190,6 +195,16 @@ public Flux<ServiceBusReceivedMessage> peek(long fromSequenceNumber, String sess
190195
@Override
191196
public Flux<ServiceBusReceivedMessage> receiveDeferredMessages(ReceiveMode receiveMode, String sessionId,
192197
String associatedLinkName, Iterable<Long> sequenceNumbers) {
198+
if (sequenceNumbers == null) {
199+
return fluxError(logger, new NullPointerException("'sequenceNumbers' cannot be null"));
200+
}
201+
202+
final List<Long> numbers = new ArrayList<>();
203+
sequenceNumbers.forEach(s -> numbers.add(s));
204+
205+
if (numbers.isEmpty()) {
206+
return Flux.empty();
207+
}
193208

194209
return isAuthorized(ManagementConstants.OPERATION_RECEIVE_BY_SEQUENCE_NUMBER)
195210
.thenMany(createChannel.flatMap(channel -> {
@@ -199,10 +214,7 @@ public Flux<ServiceBusReceivedMessage> receiveDeferredMessages(ReceiveMode recei
199214
// set mandatory properties on AMQP message body
200215
final Map<String, Object> requestBodyMap = new HashMap<>();
201216

202-
final List<Long> numbers = new ArrayList<>();
203-
sequenceNumbers.forEach(s -> numbers.add(s));
204-
Long[] longs = numbers.toArray(new Long[0]);
205-
requestBodyMap.put(ManagementConstants.SEQUENCE_NUMBERS, longs);
217+
requestBodyMap.put(ManagementConstants.SEQUENCE_NUMBERS, numbers.toArray(new Long[0]));
206218

207219
requestBodyMap.put(ManagementConstants.RECEIVER_SETTLE_MODE,
208220
UnsignedInteger.valueOf(receiveMode == ReceiveMode.RECEIVE_AND_DELETE ? 0 : 1));
@@ -509,7 +521,6 @@ private Mono<Void> isAuthorized(String operation) {
509521
*
510522
* @param operation Management operation to perform (ie. peek, update-disposition, etc.)
511523
* @param associatedLinkName Name of the open receive link that first received the message.
512-
*
513524
* @return An AMQP message with the required headers.
514525
*/
515526
private Message createManagementMessage(String operation, String associatedLinkName) {

0 commit comments

Comments
 (0)