Skip to content

Commit 6bafa1d

Browse files
authored
Remove lock primitives from tryAddMessage() (Azure#27038)
* Change atomic to primitive types * Remove unused lock * Change LinkedList to ArrayList * Revert import changes * Update CHANGELOG * Update CHANGELOG
1 parent 2c40503 commit 6bafa1d

File tree

2 files changed

+14
-13
lines changed

2 files changed

+14
-13
lines changed

sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
### Breaking Changes
88

99
### Bugs Fixed
10+
- Removed the incorrect use of lock primitives from `ServiceBusMessageBatch.tryAddMessage()` implementation and documented that this API is not thread-safe. ([#25910](https://github.com/Azure/azure-sdk-for-java/issues/25910))
1011

1112
### Other Changes
1213

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusMessageBatch.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,13 @@
99
import com.azure.core.amqp.implementation.MessageSerializer;
1010
import com.azure.core.amqp.implementation.TracerProvider;
1111
import com.azure.core.util.logging.ClientLogger;
12+
import org.apache.qpid.proton.message.Message;
1213

1314
import java.nio.BufferOverflowException;
14-
import java.util.Collections;
15-
import java.util.LinkedList;
15+
import java.util.ArrayList;
1616
import java.util.List;
1717
import java.util.Locale;
1818
import java.util.Objects;
19-
import java.util.concurrent.atomic.AtomicInteger;
2019

2120
import static com.azure.messaging.servicebus.implementation.MessageUtils.traceMessageSpan;
2221

@@ -26,13 +25,12 @@
2625
*/
2726
public final class ServiceBusMessageBatch {
2827
private final ClientLogger logger = new ClientLogger(ServiceBusMessageBatch.class);
29-
private final Object lock = new Object();
3028
private final int maxMessageSize;
3129
private final ErrorContextProvider contextProvider;
3230
private final MessageSerializer serializer;
3331
private final List<ServiceBusMessage> serviceBusMessageList;
3432
private final byte[] eventBytes;
35-
private final AtomicInteger sizeInBytes;
33+
private int sizeInBytes;
3634
private final TracerProvider tracerProvider;
3735
private final String entityPath;
3836
private final String hostname;
@@ -42,8 +40,8 @@ public final class ServiceBusMessageBatch {
4240
this.maxMessageSize = maxMessageSize;
4341
this.contextProvider = contextProvider;
4442
this.serializer = serializer;
45-
this.serviceBusMessageList = Collections.synchronizedList(new LinkedList<>());
46-
this.sizeInBytes = new AtomicInteger((maxMessageSize / 65536) * 1024); // reserve 1KB for every 64KB
43+
this.serviceBusMessageList = new ArrayList<>();
44+
this.sizeInBytes = (maxMessageSize / 65536) * 1024; // reserve 1KB for every 64KB
4745
this.eventBytes = new byte[maxMessageSize];
4846
this.tracerProvider = tracerProvider;
4947
this.entityPath = entityPath;
@@ -74,12 +72,14 @@ public int getMaxSizeInBytes() {
7472
* @return The size of the {@link ServiceBusMessageBatch batch} in bytes.
7573
*/
7674
public int getSizeInBytes() {
77-
return this.sizeInBytes.get();
75+
return this.sizeInBytes;
7876
}
7977

8078
/**
8179
* Tries to add an {@link ServiceBusMessage message} to the batch.
8280
*
81+
* <p>This method is thread-unsafe.</p>
82+
*
8383
* @param serviceBusMessage The {@link ServiceBusMessage} to add to the batch.
8484
*
8585
* @return {@code true} if the message could be added to the batch; {@code false} if the event was too large to fit
@@ -99,9 +99,9 @@ public boolean tryAddMessage(final ServiceBusMessage serviceBusMessage) {
9999
tracerProvider)
100100
: serviceBusMessage;
101101

102-
final AtomicInteger size = new AtomicInteger();
102+
final int size;
103103
try {
104-
size.set(getSize(serviceBusMessageUpdated, serviceBusMessageList.isEmpty()));
104+
size = getSize(serviceBusMessageUpdated, serviceBusMessageList.isEmpty());
105105
} catch (BufferOverflowException exception) {
106106
final RuntimeException ex = new ServiceBusException(
107107
new AmqpException(false, AmqpErrorCondition.LINK_PAYLOAD_SIZE_EXCEEDED,
@@ -111,11 +111,11 @@ public boolean tryAddMessage(final ServiceBusMessage serviceBusMessage) {
111111
throw logger.logExceptionAsWarning(ex);
112112
}
113113

114-
if (this.sizeInBytes.addAndGet(size.get()) > this.maxMessageSize) {
115-
this.sizeInBytes.addAndGet(-1 * size.get());
114+
if (this.sizeInBytes + size > this.maxMessageSize) {
116115
return false;
117116
}
118117

118+
this.sizeInBytes += size;
119119
this.serviceBusMessageList.add(serviceBusMessageUpdated);
120120
return true;
121121
}
@@ -132,7 +132,7 @@ List<ServiceBusMessage> getMessages() {
132132
private int getSize(final ServiceBusMessage serviceBusMessage, final boolean isFirst) {
133133
Objects.requireNonNull(serviceBusMessage, "'serviceBusMessage' cannot be null.");
134134

135-
final org.apache.qpid.proton.message.Message amqpMessage = serializer.serialize(serviceBusMessage);
135+
final Message amqpMessage = serializer.serialize(serviceBusMessage);
136136
int eventSize = amqpMessage.encode(this.eventBytes, 0, maxMessageSize); // actual encoded bytes size
137137
eventSize += 16; // data section overhead
138138

0 commit comments

Comments
 (0)