Skip to content

Commit 878e6ee

Browse files
authored
message receiver - fix null pointer error and ensure that receive link is recreated upon a failure (#439)
* message receiver/sender - fix null pointer error and ensure that receive/send link is recreated on a failure.
1 parent 09f6565 commit 878e6ee

File tree

2 files changed

+30
-22
lines changed

2 files changed

+30
-22
lines changed

azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessageReceiver.java

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ public void run() {
118118
"clientId[%s], path[%s], linkName[%s] - Reschedule operation timer, current: [%s], remaining: [%s] secs",
119119
getClientId(),
120120
receivePath,
121-
receiveLink.getName(),
121+
getReceiveLinkName(),
122122
Instant.now(),
123123
timeoutTracker.remaining().getSeconds()));
124124
}
@@ -154,7 +154,7 @@ public void onComplete(Void result) {
154154
TRACE_LOGGER.debug(
155155
String.format(Locale.US,
156156
"clientId[%s], path[%s], linkName[%s] - token renewed",
157-
getClientId(), receivePath, receiveLink.getName()));
157+
getClientId(), receivePath, getReceiveLinkName()));
158158
}
159159
}
160160

@@ -164,7 +164,7 @@ public void onError(Exception error) {
164164
TRACE_LOGGER.info(
165165
String.format(Locale.US,
166166
"clientId[%s], path[%s], linkName[%s], tokenRenewalFailure[%s]",
167-
getClientId(), receivePath, receiveLink.getName(), error.getMessage()));
167+
getClientId(), receivePath, getReceiveLinkName(), error.getMessage()));
168168
}
169169
}
170170
});
@@ -173,7 +173,7 @@ public void onError(Exception error) {
173173
TRACE_LOGGER.info(
174174
String.format(Locale.US,
175175
"clientId[%s], path[%s], linkName[%s], tokenRenewalScheduleFailure[%s]",
176-
getClientId(), receivePath, receiveLink.getName(), exception.getMessage()));
176+
getClientId(), receivePath, getReceiveLinkName(), exception.getMessage()));
177177
}
178178
}
179179
}
@@ -236,6 +236,10 @@ private List<Message> receiveCore(final int messageCount) {
236236
return returnMessages;
237237
}
238238

239+
private String getReceiveLinkName() {
240+
return this.receiveLink == null ? "null" : this.receiveLink.getName();
241+
}
242+
239243
public Duration getReceiveTimeout() {
240244
return this.receiveTimeout;
241245
}
@@ -263,7 +267,7 @@ public CompletableFuture<Collection<Message>> receive(final int maxMessageCount)
263267
"clientId[%s], path[%s], linkName[%s] - schedule operation timer, current: [%s], remaining: [%s] secs",
264268
this.getClientId(),
265269
this.receivePath,
266-
this.receiveLink.getName(),
270+
this.getReceiveLinkName(),
267271
Instant.now(),
268272
this.receiveTimeout.getSeconds()));
269273
}
@@ -308,7 +312,7 @@ public void onOpenComplete(Exception exception) {
308312

309313
if (TRACE_LOGGER.isInfoEnabled()) {
310314
TRACE_LOGGER.info(String.format("onOpenComplete - clientId[%s], receiverPath[%s], linkName[%s], updated-link-credit[%s], sentCredits[%s]",
311-
this.getClientId(), this.receivePath, this.receiveLink.getName(), this.receiveLink.getCredit(), this.prefetchCount));
315+
this.getClientId(), this.receivePath, this.getReceiveLinkName(), this.receiveLink.getCredit(), this.prefetchCount));
312316
}
313317
} else {
314318
synchronized (this.errorConditionLock) {
@@ -407,7 +411,7 @@ public void onError(final Exception exception) {
407411
String.format(Locale.US, "clientId[%s], receiverPath[%s], linkName[%s], onError: %s",
408412
this.getClientId(),
409413
this.receivePath,
410-
this.receiveLink.getName(),
414+
this.getReceiveLinkName(),
411415
completionException));
412416
}
413417

@@ -425,7 +429,7 @@ public void onError(final Exception exception) {
425429
@Override
426430
public void onEvent() {
427431
if (!MessageReceiver.this.getIsClosingOrClosed()
428-
&& (receiveLink.getLocalState() == EndpointState.CLOSED || receiveLink.getRemoteState() == EndpointState.CLOSED)) {
432+
&& (receiveLink == null || receiveLink.getLocalState() == EndpointState.CLOSED || receiveLink.getRemoteState() == EndpointState.CLOSED)) {
429433
createReceiveLink();
430434
underlyingFactory.getRetryPolicy().incrementRetryCount(getClientId());
431435
}
@@ -438,7 +442,7 @@ public void onEvent() {
438442
String.format(Locale.US, "clientId[%s], receiverPath[%s], linkName[%s], scheduling createLink encountered error: %s",
439443
this.getClientId(),
440444
this.receivePath,
441-
this.receiveLink.getName(), ignore.getLocalizedMessage()));
445+
this.getReceiveLinkName(), ignore.getLocalizedMessage()));
442446
}
443447
}
444448
}
@@ -620,7 +624,7 @@ private void sendFlow(final int credits) {
620624

621625
if (TRACE_LOGGER.isDebugEnabled()) {
622626
TRACE_LOGGER.debug(String.format("clientId[%s], receiverPath[%s], linkName[%s], updated-link-credit[%s], sentCredits[%s], ThreadId[%s]",
623-
this.getClientId(), this.receivePath, this.receiveLink.getName(), this.receiveLink.getCredit(), tempFlow, Thread.currentThread().getId()));
627+
this.getClientId(), this.receivePath, this.getReceiveLinkName(), this.receiveLink.getCredit(), tempFlow, Thread.currentThread().getId()));
624628
}
625629
}
626630
}
@@ -825,7 +829,7 @@ public void onEvent() {
825829
receiveWork.onEvent();
826830

827831
if (!MessageReceiver.this.getIsClosingOrClosed()
828-
&& (receiveLink.getLocalState() == EndpointState.CLOSED || receiveLink.getRemoteState() == EndpointState.CLOSED)) {
832+
&& (receiveLink == null || receiveLink.getLocalState() == EndpointState.CLOSED || receiveLink.getRemoteState() == EndpointState.CLOSED)) {
829833
createReceiveLink();
830834
}
831835
}

azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessageSender.java

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ public void onComplete(Void result) {
107107
if (TRACE_LOGGER.isDebugEnabled()) {
108108
TRACE_LOGGER.debug(String.format(Locale.US,
109109
"clientId[%s], path[%s], linkName[%s] - token renewed",
110-
getClientId(), sendPath, sendLink.getName()));
110+
getClientId(), sendPath, getSendLinkName()));
111111
}
112112
}
113113

@@ -116,15 +116,15 @@ public void onError(Exception error) {
116116
if (TRACE_LOGGER.isInfoEnabled()) {
117117
TRACE_LOGGER.info(String.format(Locale.US,
118118
"clientId[%s], path[%s], linkName[%s] - tokenRenewalFailure[%s]",
119-
getClientId(), sendPath, sendLink.getName(), error.getMessage()));
119+
getClientId(), sendPath, getSendLinkName(), error.getMessage()));
120120
}
121121
}
122122
});
123123
} catch (IOException | NoSuchAlgorithmException | InvalidKeyException | RuntimeException exception) {
124124
if (TRACE_LOGGER.isWarnEnabled()) {
125125
TRACE_LOGGER.warn(String.format(Locale.US,
126126
"clientId[%s], path[%s], linkName[%s] - tokenRenewalScheduleFailure[%s]",
127-
getClientId(), sendPath, sendLink.getName(), exception.getMessage()));
127+
getClientId(), sendPath, getSendLinkName(), exception.getMessage()));
128128
}
129129
}
130130
}
@@ -241,6 +241,10 @@ private CompletableFuture<Void> send(
241241
return this.sendCore(bytes, arrayOffset, messageFormat, onSend, tracker, null, null);
242242
}
243243

244+
private String getSendLinkName() {
245+
return this.sendLink == null ? "null" : this.sendLink.getName();
246+
}
247+
244248
public CompletableFuture<Void> send(final Iterable<Message> messages) {
245249
if (messages == null || IteratorUtil.sizeEquals(messages, 0)) {
246250
throw new IllegalArgumentException(String.format(Locale.US,
@@ -335,7 +339,7 @@ public void onOpenComplete(Exception completionException) {
335339

336340
if (TRACE_LOGGER.isInfoEnabled()) {
337341
TRACE_LOGGER.info(String.format("onOpenComplete - clientId[%s], sendPath[%s], linkName[%s]",
338-
this.getClientId(), this.sendPath, this.sendLink.getName()));
342+
this.getClientId(), this.sendPath, this.getSendLinkName()));
339343
}
340344

341345
if (!this.linkFirstOpen.isDone()) {
@@ -471,7 +475,7 @@ public void onError(final Exception completionException) {
471475
@Override
472476
public void onEvent() {
473477
if (!MessageSender.this.getIsClosingOrClosed()
474-
&& (sendLink.getLocalState() == EndpointState.CLOSED || sendLink.getRemoteState() == EndpointState.CLOSED)) {
478+
&& (sendLink == null || sendLink.getLocalState() == EndpointState.CLOSED || sendLink.getRemoteState() == EndpointState.CLOSED)) {
475479
recreateSendLink();
476480
}
477481
}
@@ -506,7 +510,7 @@ public void onSendComplete(final Delivery delivery) {
506510
String.format(
507511
Locale.US,
508512
"clientId[%s], path[%s], linkName[%s], deliveryTag[%s]",
509-
this.getClientId(), this.sendPath, this.sendLink.getName(), deliveryTag));
513+
this.getClientId(), this.sendPath, this.getSendLinkName(), deliveryTag));
510514

511515
final ReplayableWorkItem<Void> pendingSendWorkItem = this.pendingSendsData.remove(deliveryTag);
512516

@@ -574,7 +578,7 @@ public void onEvent() {
574578
if (TRACE_LOGGER.isDebugEnabled())
575579
TRACE_LOGGER.debug(
576580
String.format(Locale.US, "clientId[%s]. path[%s], linkName[%s], delivery[%s] - mismatch (or send timed out)",
577-
this.getClientId(), this.sendPath, this.sendLink.getName(), deliveryTag));
581+
this.getClientId(), this.sendPath, this.getSendLinkName(), deliveryTag));
578582
}
579583
}
580584

@@ -773,7 +777,7 @@ public void onFlow(final int creditIssued) {
773777
int numberOfSendsWaitingforCredit = this.pendingSends.size();
774778
TRACE_LOGGER.debug(String.format(Locale.US,
775779
"clientId[%s], path[%s], linkName[%s], remoteLinkCredit[%s], pendingSendsWaitingForCredit[%s], pendingSendsWaitingDelivery[%s]",
776-
this.getClientId(), this.sendPath, this.sendLink.getName(), creditIssued, numberOfSendsWaitingforCredit, this.pendingSendsData.size() - numberOfSendsWaitingforCredit));
780+
this.getClientId(), this.sendPath, this.getSendLinkName(), creditIssued, numberOfSendsWaitingforCredit, this.pendingSendsData.size() - numberOfSendsWaitingforCredit));
777781
}
778782

779783
this.sendWork.onEvent();
@@ -786,7 +790,7 @@ private void recreateSendLink() {
786790

787791
// actual send on the SenderLink should happen only in this method & should run on Reactor Thread
788792
private void processSendWork() {
789-
if (this.sendLink.getLocalState() == EndpointState.CLOSED || this.sendLink.getRemoteState() == EndpointState.CLOSED) {
793+
if (this.sendLink == null || this.sendLink.getLocalState() == EndpointState.CLOSED || this.sendLink.getRemoteState() == EndpointState.CLOSED) {
790794
if (!this.getIsClosingOrClosed())
791795
this.recreateSendLink();
792796

@@ -840,7 +844,7 @@ private void processSendWork() {
840844
if (TRACE_LOGGER.isDebugEnabled()) {
841845
TRACE_LOGGER.debug(
842846
String.format(Locale.US, "clientId[%s], path[%s], linkName[%s], deliveryTag[%s], sentMessageSize[%s], payloadActualSize[%s] - sendlink advance failed",
843-
this.getClientId(), this.sendPath, this.sendLink.getName(), deliveryTag, sentMsgSize, sendData.getEncodedMessageSize()));
847+
this.getClientId(), this.sendPath, this.getSendLinkName(), deliveryTag, sentMsgSize, sendData.getEncodedMessageSize()));
844848
}
845849

846850
if (delivery != null) {
@@ -858,7 +862,7 @@ private void processSendWork() {
858862
if (TRACE_LOGGER.isDebugEnabled()) {
859863
TRACE_LOGGER.debug(
860864
String.format(Locale.US, "clientId[%s], path[%s], linkName[%s], deliveryTag[%s] - sendData not found for this delivery.",
861-
this.getClientId(), this.sendPath, this.sendLink.getName(), deliveryTag));
865+
this.getClientId(), this.sendPath, this.getSendLinkName(), deliveryTag));
862866
}
863867
}
864868

0 commit comments

Comments
 (0)