Skip to content

Commit ed8deaf

Browse files
authored
[Service Bus] Refactor internal API that uses Instant to use OffDateTime (Azure#15804)
* Refactor internal API that uses Instant to use OffDateTime * Fix test code
1 parent a63e68f commit ed8deaf

18 files changed

+85
-100
lines changed

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusMessageSerializer.java

Lines changed: 2 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141

4242
import java.lang.reflect.Array;
4343
import java.time.Duration;
44-
import java.time.Instant;
4544
import java.time.OffsetDateTime;
4645
import java.time.ZoneOffset;
4746
import java.util.ArrayList;
@@ -239,8 +238,8 @@ public <T> List<T> deserializeList(Message message, Class<T> clazz) {
239238
return (List<T>) deserializeListOfMessages(message);
240239
} else if (clazz == OffsetDateTime.class) {
241240
return (List<T>) deserializeListOfOffsetDateTime(message);
242-
} else if (clazz == Instant.class) {
243-
return (List<T>) deserializeListOfInstant(message);
241+
} else if (clazz == OffsetDateTime.class) {
242+
return (List<T>) deserializeListOfOffsetDateTime(message);
244243
} else if (clazz == Long.class) {
245244
return (List<T>) deserializeListOfLong(message);
246245
} else {
@@ -285,24 +284,6 @@ private List<OffsetDateTime> deserializeListOfOffsetDateTime(Message amqpMessage
285284
return Collections.emptyList();
286285
}
287286

288-
private List<Instant> deserializeListOfInstant(Message amqpMessage) {
289-
if (amqpMessage.getBody() instanceof AmqpValue) {
290-
AmqpValue amqpValue = ((AmqpValue) amqpMessage.getBody());
291-
if (amqpValue.getValue() instanceof Map) {
292-
@SuppressWarnings("unchecked")
293-
Map<String, Object> responseBody = (Map<String, Object>) amqpValue.getValue();
294-
Object expirationListObj = responseBody.get(ManagementConstants.EXPIRATIONS);
295-
296-
if (expirationListObj instanceof Date[]) {
297-
return Arrays.stream((Date[]) expirationListObj)
298-
.map(Date::toInstant)
299-
.collect(Collectors.toList());
300-
}
301-
}
302-
}
303-
return Collections.emptyList();
304-
}
305-
306287
@SuppressWarnings("rawtypes")
307288
private List<ServiceBusReceivedMessage> deserializeListOfMessages(Message amqpMessage) {
308289
final List<ServiceBusReceivedMessage> messageList = new ArrayList<>();

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,7 @@
2828
import reactor.core.publisher.Mono;
2929

3030
import java.time.Duration;
31-
import java.time.Instant;
3231
import java.time.OffsetDateTime;
33-
import java.time.ZoneOffset;
3432
import java.util.Collections;
3533
import java.util.Map;
3634
import java.util.Objects;
@@ -693,8 +691,8 @@ sessionId, getLinkName(sessionId), Collections.singleton(sequenceNumber)).last()
693691
}
694692
if (receiverOptions.getReceiveMode() == ReceiveMode.PEEK_LOCK) {
695693
receivedMessage.setLockedUntil(managementNodeLocks.addOrUpdate(receivedMessage.getLockToken(),
696-
receivedMessage.getLockedUntil().toInstant(),
697-
receivedMessage.getLockedUntil()).atOffset(ZoneOffset.UTC));
694+
receivedMessage.getLockedUntil(),
695+
receivedMessage.getLockedUntil()));
698696
}
699697

700698
return receivedMessage;
@@ -739,8 +737,8 @@ sessionId, getLinkName(sessionId), sequenceNumbers))
739737
}
740738
if (receiverOptions.getReceiveMode() == ReceiveMode.PEEK_LOCK) {
741739
receivedMessage.setLockedUntil(managementNodeLocks.addOrUpdate(receivedMessage.getLockToken(),
742-
receivedMessage.getLockedUntil().toInstant(),
743-
receivedMessage.getLockedUntil()).atOffset(ZoneOffset.UTC));
740+
receivedMessage.getLockedUntil(),
741+
receivedMessage.getLockedUntil()));
744742
}
745743

746744
return receivedMessage;
@@ -782,8 +780,8 @@ public Mono<OffsetDateTime> renewMessageLock(ServiceBusReceivedMessage message)
782780
.flatMap(connection -> connection.getManagementNode(entityPath, entityType))
783781
.flatMap(serviceBusManagementNode ->
784782
serviceBusManagementNode.renewMessageLock(message.getLockToken(), getLinkName(null)))
785-
.map(instant -> managementNodeLocks.addOrUpdate(message.getLockToken(), instant,
786-
instant.atOffset(ZoneOffset.UTC)).atOffset(ZoneOffset.UTC));
783+
.map(offsetDateTime -> managementNodeLocks.addOrUpdate(message.getLockToken(), offsetDateTime,
784+
offsetDateTime));
787785
}
788786

789787
/**
@@ -819,7 +817,8 @@ public Mono<Void> renewMessageLock(ServiceBusReceivedMessage message, Duration m
819817

820818
final LockRenewalOperation operation = new LockRenewalOperation(message.getLockToken(),
821819
maxLockRenewalDuration, false, ignored -> renewMessageLock(message));
822-
renewalContainer.addOrUpdate(message.getLockToken(), Instant.now().plus(maxLockRenewalDuration), operation);
820+
renewalContainer.addOrUpdate(message.getLockToken(), OffsetDateTime.now().plus(maxLockRenewalDuration),
821+
operation);
823822

824823
return operation.getCompletionOperation();
825824
}
@@ -846,8 +845,7 @@ public Mono<OffsetDateTime> renewSessionLock(String sessionId) {
846845

847846
return connectionProcessor
848847
.flatMap(connection -> connection.getManagementNode(entityPath, entityType))
849-
.flatMap(channel -> channel.renewSessionLock(sessionId, linkName)
850-
.map(instant -> instant.atOffset(ZoneOffset.UTC)));
848+
.flatMap(channel -> channel.renewSessionLock(sessionId, linkName));
851849
}
852850

853851
/**
@@ -882,7 +880,7 @@ public Mono<Void> renewSessionLock(String sessionId, Duration maxLockRenewalDura
882880
final LockRenewalOperation operation = new LockRenewalOperation(sessionId, maxLockRenewalDuration, true,
883881
this::renewSessionLock);
884882

885-
renewalContainer.addOrUpdate(sessionId, Instant.now().plus(maxLockRenewalDuration), operation);
883+
renewalContainer.addOrUpdate(sessionId, OffsetDateTime.now().plus(maxLockRenewalDuration), operation);
886884
return operation.getCompletionOperation();
887885
}
888886

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/UnnamedSessionManager.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828

2929
import java.time.Duration;
3030
import java.time.OffsetDateTime;
31-
import java.time.ZoneOffset;
3231
import java.util.Collections;
3332
import java.util.Deque;
3433
import java.util.List;
@@ -167,12 +166,12 @@ Mono<OffsetDateTime> renewSessionLock(String sessionId) {
167166
final UnnamedSessionReceiver receiver = sessionReceivers.get(sessionId);
168167
final String associatedLinkName = receiver != null ? receiver.getLinkName() : null;
169168

170-
return channel.renewSessionLock(sessionId, associatedLinkName).handle((instant, sink) -> {
169+
return channel.renewSessionLock(sessionId, associatedLinkName).handle((offsetDateTime, sink) -> {
171170
if (receiver != null) {
172-
receiver.setSessionLockedUntil(instant.atOffset(ZoneOffset.UTC));
171+
receiver.setSessionLockedUntil(offsetDateTime);
173172
}
174173

175-
sink.next(instant.atOffset(ZoneOffset.UTC));
174+
sink.next(offsetDateTime);
176175
});
177176
}));
178177
}

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/UnnamedSessionReceiver.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121

2222
import java.time.Duration;
2323
import java.time.OffsetDateTime;
24-
import java.time.ZoneOffset;
2524
import java.util.concurrent.atomic.AtomicBoolean;
2625
import java.util.concurrent.atomic.AtomicReference;
2726
import java.util.function.Function;
@@ -78,7 +77,7 @@ class UnnamedSessionReceiver implements AutoCloseable {
7877

7978
//TODO (conniey): For session receivers, do they have a message lock token?
8079
if (!CoreUtils.isNullOrEmpty(deserialized.getLockToken()) && deserialized.getLockedUntil() != null) {
81-
lockContainer.addOrUpdate(deserialized.getLockToken(), deserialized.getLockedUntil().toInstant(),
80+
lockContainer.addOrUpdate(deserialized.getLockToken(), deserialized.getLockedUntil(),
8281
deserialized.getLockedUntil());
8382
} else {
8483
logger.info("sessionId[{}] message[{}]. There is no lock token.",
@@ -126,13 +125,13 @@ class UnnamedSessionReceiver implements AutoCloseable {
126125
}
127126
}));
128127
this.subscriptions.add(receiveLink.getSessionLockedUntil().subscribe(lockedUntil -> {
129-
if (!sessionLockedUntil.compareAndSet(null, lockedUntil.atOffset(ZoneOffset.UTC))) {
128+
if (!sessionLockedUntil.compareAndSet(null, lockedUntil)) {
130129
logger.info("SessionLockedUntil was already set: {}", sessionLockedUntil);
131130
return;
132131
}
133132

134133
this.renewalOperation.compareAndSet(null, new LockRenewalOperation(sessionId.get(),
135-
Duration.ZERO, true, renewSessionLock, lockedUntil.atOffset(ZoneOffset.UTC)));
134+
Duration.ZERO, true, renewSessionLock, lockedUntil));
136135
}));
137136
}
138137

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/LockContainer.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import reactor.core.publisher.Flux;
99

1010
import java.time.Duration;
11-
import java.time.Instant;
11+
import java.time.OffsetDateTime;
1212
import java.util.List;
1313
import java.util.Map;
1414
import java.util.Objects;
@@ -22,7 +22,7 @@
2222
*/
2323
public class LockContainer<T> implements AutoCloseable {
2424
private final ClientLogger logger = new ClientLogger(LockContainer.class);
25-
private final ConcurrentHashMap<String, Instant> lockTokenExpirationMap = new ConcurrentHashMap<>();
25+
private final ConcurrentHashMap<String, OffsetDateTime> lockTokenExpirationMap = new ConcurrentHashMap<>();
2626
private final ConcurrentHashMap<String, T> lockTokenItemMap = new ConcurrentHashMap<>();
2727
private final AtomicBoolean isDisposed = new AtomicBoolean();
2828
private final Disposable cleanupOperation;
@@ -42,7 +42,7 @@ public LockContainer(Duration cleanupInterval, Consumer<T> onExpired) {
4242
return;
4343
}
4444

45-
final Instant now = Instant.now();
45+
final OffsetDateTime now = OffsetDateTime.now();
4646
final List<String> expired = lockTokenExpirationMap.entrySet().stream()
4747
.filter(entry -> entry.getValue() != null && entry.getValue().isBefore(now))
4848
.map(Map.Entry::getKey)
@@ -63,7 +63,7 @@ public LockContainer(Duration cleanupInterval, Consumer<T> onExpired) {
6363
* @return The updated value in the container. If the expiration time in the container is larger than {@code
6464
* lockTokenExpiration}, then the current container value is used.
6565
*/
66-
public Instant addOrUpdate(String lockToken, Instant lockTokenExpiration, T item) {
66+
public OffsetDateTime addOrUpdate(String lockToken, OffsetDateTime lockTokenExpiration, T item) {
6767
if (isDisposed.get()) {
6868
throw logger.logExceptionAsError(new IllegalStateException("Cannot perform operations on a disposed set."));
6969
}
@@ -73,7 +73,7 @@ public Instant addOrUpdate(String lockToken, Instant lockTokenExpiration, T item
7373
Objects.requireNonNull(lockTokenExpiration, "'lockTokenExpiration' cannot be null.");
7474

7575

76-
final Instant computed = lockTokenExpirationMap.compute(lockToken, (key, existing) -> {
76+
final OffsetDateTime computed = lockTokenExpirationMap.compute(lockToken, (key, existing) -> {
7777
if (existing == null) {
7878
return lockTokenExpiration;
7979
} else {
@@ -100,8 +100,8 @@ public boolean containsUnexpired(String lockToken) {
100100
throw logger.logExceptionAsError(new IllegalStateException("Cannot perform operations on a disposed set."));
101101
}
102102

103-
final Instant value = lockTokenExpirationMap.getOrDefault(lockToken, Instant.MIN);
104-
return value.isAfter(Instant.now());
103+
final OffsetDateTime value = lockTokenExpirationMap.getOrDefault(lockToken, OffsetDateTime.MIN);
104+
return value.isAfter(OffsetDateTime.now());
105105
}
106106

107107
/**

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ManagementChannel.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@
3434

3535
import java.nio.BufferOverflowException;
3636
import java.time.Duration;
37-
import java.time.Instant;
3837
import java.time.OffsetDateTime;
38+
import java.time.ZoneOffset;
3939
import java.util.ArrayList;
4040
import java.util.Arrays;
4141
import java.util.Collection;
@@ -222,7 +222,7 @@ public Flux<ServiceBusReceivedMessage> receiveDeferredMessages(ReceiveMode recei
222222
* {@inheritDoc}
223223
*/
224224
@Override
225-
public Mono<Instant> renewMessageLock(String lockToken, String associatedLinkName) {
225+
public Mono<OffsetDateTime> renewMessageLock(String lockToken, String associatedLinkName) {
226226
return isAuthorized(OPERATION_PEEK).then(createChannel.flatMap(channel -> {
227227
final Message requestMessage = createManagementMessage(ManagementConstants.OPERATION_RENEW_LOCK,
228228
associatedLinkName);
@@ -232,7 +232,8 @@ public Mono<Instant> renewMessageLock(String lockToken, String associatedLinkNam
232232

233233
return sendWithVerify(channel, requestMessage, null);
234234
}).map(responseMessage -> {
235-
final List<Instant> renewTimeList = messageSerializer.deserializeList(responseMessage, Instant.class);
235+
final List<OffsetDateTime> renewTimeList = messageSerializer.deserializeList(responseMessage,
236+
OffsetDateTime.class);
236237
if (CoreUtils.isNullOrEmpty(renewTimeList)) {
237238
throw logger.logExceptionAsError(Exceptions.propagate(new AmqpException(false, String.format(
238239
"Service bus response empty. Could not renew message with lock token: '%s'.", lockToken),
@@ -244,7 +245,7 @@ public Mono<Instant> renewMessageLock(String lockToken, String associatedLinkNam
244245
}
245246

246247
@Override
247-
public Mono<Instant> renewSessionLock(String sessionId, String associatedLinkName) {
248+
public Mono<OffsetDateTime> renewSessionLock(String sessionId, String associatedLinkName) {
248249
if (sessionId == null) {
249250
return monoError(logger, new NullPointerException("'sessionId' cannot be null."));
250251
} else if (sessionId.isEmpty()) {
@@ -277,8 +278,7 @@ public Mono<Instant> renewSessionLock(String sessionId, String associatedLinkNam
277278
"Expiration is not of type Date when renewing session. Id: %s. Value: %s", sessionId,
278279
expirationValue), getErrorContext())));
279280
}
280-
281-
return ((Date) expirationValue).toInstant();
281+
return ((Date) expirationValue).toInstant().atOffset(ZoneOffset.UTC);
282282
});
283283
}
284284

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/MessageManagementOperations.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import org.apache.qpid.proton.amqp.transport.DeliveryState;
88
import reactor.core.publisher.Mono;
99

10-
import java.time.Instant;
10+
import java.time.OffsetDateTime;
1111
import java.util.UUID;
1212

1313
/**
@@ -29,7 +29,7 @@ public interface MessageManagementOperations {
2929
* the lock needs to be renewed. For each renewal, the lock is reset to the entity's LockDuration value.
3030
*
3131
* @param lockToken The {@link UUID} of the message {@link ServiceBusReceivedMessage} to be renewed.
32-
* @return {@link Instant} representing the pending renew.
32+
* @return {@link OffsetDateTime} representing the pending renew.
3333
*/
34-
Mono<Instant> renewMessageLock(String lockToken, String associatedLinkName);
34+
Mono<OffsetDateTime> renewMessageLock(String lockToken, String associatedLinkName);
3535
}

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/MessageUtils.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import java.nio.ByteBuffer;
2020
import java.time.Duration;
2121
import java.time.Instant;
22+
import java.time.OffsetDateTime;
23+
import java.time.ZoneOffset;
2224
import java.util.HashMap;
2325
import java.util.Map;
2426
import java.util.UUID;
@@ -87,19 +89,19 @@ static byte[] convertUUIDToDotNetBytes(UUID uuid) {
8789
}
8890

8991
/**
90-
* Gets the {@link Instant} representation of .NET epoch ticks. .NET ticks are measured from 0001/01/01. Java {@link
91-
* Instant} is measured from 1970/01/01.
92+
* Gets the {@link OffsetDateTime} representation of .NET epoch ticks. .NET ticks are measured from 0001/01/01.
93+
* Java {@link OffsetDateTime} is measured from 1970/01/01.
9294
*
9395
* @param dotNetTicks long measured from 01/01/0001
9496
*
9597
* @return The instant represented by the ticks.
9698
*/
97-
static Instant convertDotNetTicksToInstant(long dotNetTicks) {
99+
static OffsetDateTime convertDotNetTicksToOffsetDateTime(long dotNetTicks) {
98100
long ticksFromEpoch = dotNetTicks - EPOCH_IN_DOT_NET_TICKS;
99101
long millisecondsFromEpoch = Double.valueOf(ticksFromEpoch * 0.0001).longValue();
100102
long fractionTicks = ticksFromEpoch % 10000;
101103

102-
return Instant.ofEpochMilli(millisecondsFromEpoch).plusNanos(fractionTicks * 100);
104+
return Instant.ofEpochMilli(millisecondsFromEpoch).plusNanos(fractionTicks * 100).atOffset(ZoneOffset.UTC);
103105
}
104106

105107
/**

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusManagementNode.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
import reactor.core.publisher.Flux;
1111
import reactor.core.publisher.Mono;
1212

13-
import java.time.Instant;
1413
import java.time.OffsetDateTime;
1514
import java.util.Map;
1615

@@ -74,17 +73,17 @@ Flux<ServiceBusReceivedMessage> receiveDeferredMessages(ReceiveMode receiveMode,
7473
* the lock needs to be renewed. For each renewal, the lock is reset to the entity's LockDuration value.
7574
*
7675
* @param messageLock The lock token of the message {@link ServiceBusReceivedMessage} to be renewed.
77-
* @return {@link Instant} representing the pending renew.
76+
* @return {@link OffsetDateTime} representing the pending renew.
7877
*/
79-
Mono<Instant> renewMessageLock(String messageLock, String associatedLinkName);
78+
Mono<OffsetDateTime> renewMessageLock(String messageLock, String associatedLinkName);
8079

8180
/**
8281
* Renews the lock on the session.
8382
*
8483
* @param sessionId Identifier for the session.
8584
* @return The next expiration time for the session.
8685
*/
87-
Mono<Instant> renewSessionLock(String sessionId, String associatedLinkName);
86+
Mono<OffsetDateTime> renewSessionLock(String sessionId, String associatedLinkName);
8887

8988
/**
9089
* Sends a scheduled message to the Azure Service Bus entity this sender is connected to. A scheduled message is

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiver.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737
import java.io.IOException;
3838
import java.time.Duration;
3939
import java.time.Instant;
40+
import java.time.OffsetDateTime;
41+
import java.time.ZoneOffset;
4042
import java.util.ArrayList;
4143
import java.util.List;
4244
import java.util.Map;
@@ -75,7 +77,7 @@ public class ServiceBusReactorReceiver extends ReactorReceiver implements Servic
7577
private final ReceiveLinkHandler handler;
7678
private final ReactorProvider provider;
7779
private final Mono<String> sessionIdMono;
78-
private final Mono<Instant> sessionLockedUntil;
80+
private final Mono<OffsetDateTime> sessionLockedUntil;
7981

8082
public ServiceBusReactorReceiver(String entityPath, Receiver receiver, ReceiveLinkHandler handler,
8183
TokenManager tokenManager, ReactorProvider provider, Duration timeout, AmqpRetryPolicy retryPolicy) {
@@ -109,11 +111,11 @@ public ServiceBusReactorReceiver(String entityPath, Receiver receiver, ReceiveLi
109111
if (receiver.getRemoteProperties() != null
110112
&& receiver.getRemoteProperties().containsKey(LOCKED_UNTIL_UTC)) {
111113
final long ticks = (long) receiver.getRemoteProperties().get(LOCKED_UNTIL_UTC);
112-
return MessageUtils.convertDotNetTicksToInstant(ticks);
114+
return MessageUtils.convertDotNetTicksToOffsetDateTime(ticks);
113115
} else {
114116
logger.info("entityPath[{}], linkName[{}]. Locked until not set.", entityPath, getLinkName());
115117

116-
return Instant.EPOCH;
118+
return Instant.EPOCH.atOffset(ZoneOffset.UTC);
117119
}
118120
})
119121
.cache(value -> Duration.ofMillis(Long.MAX_VALUE), error -> Duration.ZERO, () -> Duration.ZERO);
@@ -139,7 +141,7 @@ public Mono<String> getSessionId() {
139141
}
140142

141143
@Override
142-
public Mono<Instant> getSessionLockedUntil() {
144+
public Mono<OffsetDateTime> getSessionLockedUntil() {
143145
return sessionLockedUntil;
144146
}
145147

0 commit comments

Comments
 (0)