Skip to content

Commit c1736f9

Browse files
authored
Support for additional property types. (Azure#32559)
* Add error message for missing Encoder. * Remove unused LOCK. * Use underlying proton-j encoders to determine size. * Adding test for different types. * Update CHANGELOG * Add suppressions for rawtypes and unchecked in tests. * Add explanations for checkpoint store.
1 parent 499b156 commit c1736f9

File tree

7 files changed

+276
-77
lines changed

7 files changed

+276
-77
lines changed

sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
### Features Added
66

7+
- Added support for setting different value types in `EventData.getProperties()`. [32518](https://github.com/Azure/azure-sdk-for-java/issues/32518)
8+
79
### Breaking Changes
810

911
### Bugs Fixed

sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubMessageSerializer.java

Lines changed: 18 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,11 @@
1818
import org.apache.qpid.proton.amqp.messaging.Data;
1919
import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
2020
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
21+
import org.apache.qpid.proton.codec.AMQPType;
22+
import org.apache.qpid.proton.codec.DecoderImpl;
23+
import org.apache.qpid.proton.codec.Encoder;
24+
import org.apache.qpid.proton.codec.EncoderImpl;
25+
import org.apache.qpid.proton.codec.TypeEncoding;
2126
import org.apache.qpid.proton.message.Message;
2227

2328
import java.time.Instant;
@@ -41,6 +46,7 @@
4146
* Utility class for converting {@link EventData} to {@link Message}.
4247
*/
4348
class EventHubMessageSerializer implements MessageSerializer {
49+
private static final Encoder ENCODER = new EncoderImpl(new DecoderImpl());
4450
private static final ClientLogger LOGGER = new ClientLogger(EventHubMessageSerializer.class);
4551
private static final Symbol LAST_ENQUEUED_SEQUENCE_NUMBER =
4652
Symbol.getSymbol(MANAGEMENT_RESULT_LAST_ENQUEUED_SEQUENCE_NUMBER);
@@ -348,44 +354,24 @@ private static int getPayloadSize(Message msg) {
348354
return 0;
349355
}
350356

357+
@SuppressWarnings({"unchecked", "rawtypes"})
351358
private static int sizeof(Object obj) {
352-
if (obj instanceof String) {
353-
return obj.toString().length() << 1;
354-
}
355-
356-
if (obj instanceof Symbol) {
357-
return ((Symbol) obj).length() << 1;
358-
}
359-
360-
if (obj instanceof Integer) {
361-
return Integer.BYTES;
362-
}
363-
364-
if (obj instanceof Long) {
365-
return Long.BYTES;
366-
}
367-
368-
if (obj instanceof Short) {
369-
return Short.BYTES;
370-
}
371-
372-
if (obj instanceof Character) {
373-
return Character.BYTES;
374-
}
375-
376-
if (obj instanceof Float) {
377-
return Float.BYTES;
359+
if (obj == null) {
360+
return 0;
378361
}
379362

380-
if (obj instanceof Double) {
381-
return Double.BYTES;
363+
final AMQPType amqpType = ENCODER.getType(obj);
364+
if (amqpType == null) {
365+
throw new IllegalArgumentException(String.format(Messages.ENCODING_TYPE_NOT_SUPPORTED,
366+
obj.getClass()));
382367
}
383368

384-
if (obj instanceof Date) {
385-
return 32;
369+
final TypeEncoding encoding = amqpType.getEncoding(obj);
370+
if (encoding == null) {
371+
throw new IllegalArgumentException(String.format(
372+
Messages.ENCODING_TYPE_NOT_SUPPORTED_ENCODER, obj.getClass()));
386373
}
387374

388-
throw new IllegalArgumentException(String.format(Messages.ENCODING_TYPE_NOT_SUPPORTED,
389-
obj.getClass()));
375+
return encoding.getValueSize(obj);
390376
}
391377
}

sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/Messages.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package com.azure.messaging.eventhubs;
55

66
import com.azure.core.util.CoreUtils;
7+
78
import java.util.Map;
89

910
/**
@@ -24,6 +25,11 @@ public enum Messages {
2425
*/
2526
public static final String ENCODING_TYPE_NOT_SUPPORTED = getMessage("ENCODING_TYPE_NOT_SUPPORTED");
2627

28+
/**
29+
* Encoder is not supported message.
30+
*/
31+
public static final String ENCODING_TYPE_NOT_SUPPORTED_ENCODER = getMessage("ENCODING_TYPE_NOT_SUPPORTED_ENCODER");
32+
2733
/**
2834
* Process span scope type error message.
2935
*/

sdk/eventhubs/azure-messaging-eventhubs/src/main/resources/eventhubs-messages.properties

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,4 @@ ERROR_OCCURRED_IN_SUBSCRIBER_ERROR=Error occurred in subscriber.
1212
EXCEPTION_OCCURRED_WHILE_EMITTING=Exception occurred while emitting next received event.
1313
CLASS_NOT_A_SUPPORTED_TYPE=Class '%s' is not a supported deserializable type.
1414
ENCODING_TYPE_NOT_SUPPORTED=Encoding Type: %s is not supported.
15+
ENCODING_TYPE_NOT_SUPPORTED_ENCODER=Encoding Type: %s does not have an encoder.

sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/SampleCheckpointStore.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,11 @@
77
import com.azure.core.util.logging.ClientLogger;
88
import com.azure.messaging.eventhubs.models.Checkpoint;
99
import com.azure.messaging.eventhubs.models.PartitionOwnership;
10-
import java.util.List;
11-
import java.util.Locale;
1210
import reactor.core.publisher.Flux;
1311
import reactor.core.publisher.Mono;
1412

13+
import java.util.List;
14+
import java.util.Locale;
1515
import java.util.Map;
1616
import java.util.UUID;
1717
import java.util.concurrent.ConcurrentHashMap;
@@ -80,10 +80,18 @@ public Flux<PartitionOwnership> claimOwnership(List<PartitionOwnership> requeste
8080
firstEntry.getConsumerGroup(), OWNERSHIP);
8181

8282
return Flux.fromIterable(requestedPartitionOwnerships)
83-
.filter(partitionOwnership -> {
84-
return !partitionOwnershipMap.containsKey(partitionOwnership.getPartitionId())
85-
|| partitionOwnershipMap.get(partitionOwnership.getPartitionId()).getETag()
86-
.equals(partitionOwnership.getETag());
83+
.filter(ownershipRequest -> {
84+
final PartitionOwnership existing = partitionOwnershipMap.get(ownershipRequest.getPartitionId());
85+
86+
// There are no existing ownership records. Safe to claim.
87+
if (existing == null) {
88+
return true;
89+
}
90+
91+
// The eTag for the ownership request matches the one we have in our store. If they did not match,
92+
// it means that between the time the ownership request was being calculated and now, another thread,
93+
// process, etc. updated the blob. Consequently, we will deny this ownership request.
94+
return existing.getETag().equals(ownershipRequest.getETag());
8795
})
8896
.doOnNext(partitionOwnership ->
8997
LOGGER.atInfo()

sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/IntegrationTestBase.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,6 @@ public abstract class IntegrationTestBase extends TestBase {
7373

7474
private static final String AZURE_EVENTHUBS_FULLY_QUALIFIED_DOMAIN_NAME = "AZURE_EVENTHUBS_FULLY_QUALIFIED_DOMAIN_NAME";
7575
private static final String AZURE_EVENTHUBS_EVENT_HUB_NAME = "AZURE_EVENTHUBS_EVENT_HUB_NAME";
76-
private static final Object LOCK = new Object();
7776
private static final Configuration GLOBAL_CONFIGURATION = Configuration.getGlobalConfiguration();
7877

7978
private static Scheduler scheduler;
@@ -341,6 +340,6 @@ protected void dispose(Closeable... closeables) {
341340
}
342341

343342
private void skipIfNotRecordMode() {
344-
Assumptions.assumeTrue(getTestMode() != TestMode.PLAYBACK);
343+
Assumptions.assumeTrue(getTestMode() != TestMode.PLAYBACK, "Is not in RECORD/LIVE mode.");
345344
}
346345
}

0 commit comments

Comments
 (0)