Skip to content

Commit 09f6565

Browse files
Fix lease scanner issues when Storage unreachable (#434)
This fix is for issue #432. There are two parts: AzureStorageCheckpointLeaseManager performs certain Storage actions within a forEach. If those actions fail, the StorageException gets wrapped in a NoSuchElementException. Catch those and strip off the NoSuchElementException, then handle the StorageException in the existing way. The unexpected NoSuchElementExceptions were not being caught anywhere and the scanner thread was dying without rescheduling itself. Added code in PartitionMananger.scan to catch any exceptions that leak out of PartitionScanner and reschedule the scanner unless the host instance is shutting down.
1 parent a2013b8 commit 09f6565

File tree

2 files changed

+46
-28
lines changed

2 files changed

+46
-28
lines changed

azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/AzureStorageCheckpointLeaseManager.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.Hashtable;
2323
import java.util.Iterator;
2424
import java.util.List;
25+
import java.util.NoSuchElementException;
2526
import java.util.concurrent.*;
2627
import java.util.regex.Matcher;
2728
import java.util.regex.Pattern;
@@ -317,10 +318,16 @@ public CompletableFuture<List<BaseLease>> getAllLeases() {
317318
(bp.getLeaseState() == LeaseState.LEASED)));
318319
});
319320
future = CompletableFuture.completedFuture(infos);
320-
} catch (URISyntaxException | StorageException e) {
321-
TRACE_LOGGER.warn(this.hostContext.withHost("Failure while getting lease state details"), e);
321+
} catch (Exception e) {
322+
Throwable effective = e;
323+
if (e instanceof NoSuchElementException) {
324+
// If there is a StorageException in the forEach, it arrives wrapped in a NoSuchElementException.
325+
// Strip the misleading NoSuchElementException to provide a meaningful error for the user.
326+
effective = e.getCause();
327+
}
328+
TRACE_LOGGER.warn(this.hostContext.withHost("Failure while getting lease state details"), effective);
322329
future = new CompletableFuture<List<BaseLease>>();
323-
future.completeExceptionally(LoggingUtils.wrapException(e, EventProcessorHostActionStrings.GETTING_LEASE));
330+
future.completeExceptionally(LoggingUtils.wrapException(effective, EventProcessorHostActionStrings.GETTING_LEASE));
324331
}
325332

326333
return future;

azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionManager.java

Lines changed: 36 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -290,31 +290,42 @@ private Void scan(boolean isFirst) {
290290
TRACE_LOGGER.debug(this.hostContext.withHost("Starting lease scan"));
291291
long start = System.currentTimeMillis();
292292

293-
(new PartitionScanner(this.hostContext, (lease) -> this.pumpManager.addPump(lease), this)).scan(isFirst)
294-
.whenCompleteAsync((didSteal, e) ->
295-
{
296-
TRACE_LOGGER.debug(this.hostContext.withHost("Scanning took " + (System.currentTimeMillis() - start)));
297-
if ((e != null) && !(e instanceof ClosingException)) {
298-
TRACE_LOGGER.warn(this.hostContext.withHost("Lease scanner got exception"), e);
299-
}
300-
301-
onPartitionCheckCompleteTestHook();
302-
303-
// Schedule the next scan unless we are shutting down.
304-
if (!this.getIsClosingOrClosed()) {
305-
int seconds = didSteal ? this.hostContext.getPartitionManagerOptions().getFastScanIntervalInSeconds() :
306-
this.hostContext.getPartitionManagerOptions().getSlowScanIntervalInSeconds();
307-
if (isFirst) {
308-
seconds = this.hostContext.getPartitionManagerOptions().getStartupScanDelayInSeconds();
309-
}
310-
synchronized (this.scanFutureSynchronizer) {
311-
this.scanFuture = this.hostContext.getExecutor().schedule(() -> scan(false), seconds, TimeUnit.SECONDS);
312-
}
313-
TRACE_LOGGER.debug(this.hostContext.withHost("Scheduling lease scanner in " + seconds));
314-
} else {
315-
TRACE_LOGGER.warn(this.hostContext.withHost("Not scheduling lease scanner due to shutdown"));
316-
}
317-
}, this.hostContext.getExecutor());
293+
try {
294+
(new PartitionScanner(this.hostContext, (lease) -> this.pumpManager.addPump(lease), this)).scan(isFirst)
295+
.whenCompleteAsync((didSteal, e) ->
296+
{
297+
TRACE_LOGGER.debug(this.hostContext.withHost("Scanning took " + (System.currentTimeMillis() - start)));
298+
if ((e != null) && !(e instanceof ClosingException)) {
299+
TRACE_LOGGER.warn(this.hostContext.withHost("Lease scanner got exception"), e);
300+
}
301+
302+
onPartitionCheckCompleteTestHook();
303+
304+
// Schedule the next scan unless we are shutting down.
305+
if (!this.getIsClosingOrClosed()) {
306+
int seconds = didSteal ? this.hostContext.getPartitionManagerOptions().getFastScanIntervalInSeconds() :
307+
this.hostContext.getPartitionManagerOptions().getSlowScanIntervalInSeconds();
308+
if (isFirst) {
309+
seconds = this.hostContext.getPartitionManagerOptions().getStartupScanDelayInSeconds();
310+
}
311+
synchronized (this.scanFutureSynchronizer) {
312+
this.scanFuture = this.hostContext.getExecutor().schedule(() -> scan(false), seconds, TimeUnit.SECONDS);
313+
}
314+
TRACE_LOGGER.debug(this.hostContext.withHost("Scheduling lease scanner in " + seconds));
315+
} else {
316+
TRACE_LOGGER.warn(this.hostContext.withHost("Not scheduling lease scanner due to shutdown"));
317+
}
318+
}, this.hostContext.getExecutor());
319+
} catch (Exception e) {
320+
TRACE_LOGGER.error(this.hostContext.withHost("Lease scanner threw directly"), e);
321+
if (!this.getIsClosingOrClosed()) {
322+
int seconds = this.hostContext.getPartitionManagerOptions().getSlowScanIntervalInSeconds();
323+
synchronized (this.scanFutureSynchronizer) {
324+
this.scanFuture = this.hostContext.getExecutor().schedule(() -> scan(false), seconds, TimeUnit.SECONDS);
325+
}
326+
TRACE_LOGGER.debug(this.hostContext.withHost("Forced schedule of lease scanner in " + seconds));
327+
}
328+
}
318329

319330
return null;
320331
}

0 commit comments

Comments
 (0)