diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java index a3b01f913..a8ed3ae86 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java @@ -21,7 +21,10 @@ import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; @@ -49,6 +52,7 @@ public class SyncStreamQueueSource implements QueueSource { private final BlockingQueue outgoingQueue = new LinkedBlockingQueue<>(QUEUE_SIZE); private final FlagSyncServiceStub flagSyncStub; private final FlagSyncServiceBlockingStub metadataStub; + private final ScheduledExecutorService scheduler; /** * Creates a new SyncStreamQueueSource responsible for observing the event stream. @@ -65,6 +69,7 @@ public SyncStreamQueueSource(final FlagdOptions options, Consumer { + Thread t = new Thread(r, "flagd-sync-retry-scheduler"); + t.setDaemon(true); + return t; + }); } /** Initialize sync stream connector. */ @@ -109,6 +123,13 @@ public void shutdown() throws InterruptedException { log.debug("Shutdown already in progress or completed"); return; } + this.scheduler.shutdownNow(); + try { + this.scheduler.awaitTermination(deadline, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + log.debug("Scheduler termination was interrupted", e); + Thread.currentThread().interrupt(); + } this.channelConnector.shutdown(); } @@ -120,45 +141,54 @@ private void observeSyncStream() { // "waitForReady" on the channel, plus our retry policy slow this loop down in // error conditions while (!shutdown.get()) { + if (shouldThrottle.getAndSet(false)) { + log.debug("Previous stream ended with error, waiting {} ms before retry", this.maxBackoffMs); + scheduleRetry(); + return; + } + + log.debug("Initializing sync stream request"); + SyncStreamObserver observer = new SyncStreamObserver(outgoingQueue, shouldThrottle); + try { + observer.metadata = getMetadata(); + } catch (Exception metaEx) { + // retry if getMetadata fails + String message = metaEx.getMessage(); + log.debug("Metadata request error: {}, will restart", message, metaEx); + enqueueError(String.format("Error in getMetadata request: %s", message)); + shouldThrottle.set(true); + continue; + } + try { - if (shouldThrottle.getAndSet(false)) { - log.debug("Previous stream ended with error, waiting {} ms before retry", this.maxBackoffMs); - Thread.sleep(this.maxBackoffMs); - - // Check shutdown again after sleep to avoid unnecessary work - if (shutdown.get()) { - break; - } - } - - log.debug("Initializing sync stream request"); - SyncStreamObserver observer = new SyncStreamObserver(outgoingQueue, shouldThrottle); - try { - observer.metadata = getMetadata(); - } catch (Exception metaEx) { - // retry if getMetadata fails - String message = metaEx.getMessage(); - log.debug("Metadata request error: {}, will restart", message, metaEx); - enqueueError(String.format("Error in getMetadata request: %s", message)); - shouldThrottle.set(true); - continue; - } - - try { - syncFlags(observer); - } catch (Exception ex) { - log.error("Unexpected sync stream exception, will restart.", ex); - enqueueError(String.format("Error in syncStream: %s", ex.getMessage())); - shouldThrottle.set(true); - } - } catch (InterruptedException ie) { - log.debug("Stream loop interrupted, most likely shutdown was invoked", ie); + syncFlags(observer); + } catch (Exception ex) { + log.error("Unexpected sync stream exception, will restart.", ex); + enqueueError(String.format("Error in syncStream: %s", ex.getMessage())); + shouldThrottle.set(true); } } log.info("Shutdown invoked, exiting event stream listener"); } + /** + * Schedules a retry of the sync stream after the backoff period. + * Uses a non-blocking approach instead of Thread.sleep(). + */ + private void scheduleRetry() { + if (shutdown.get()) { + log.debug("Shutdown in progress, not scheduling retry."); + return; + } + try { + scheduler.schedule(this::observeSyncStream, this.maxBackoffMs, TimeUnit.MILLISECONDS); + } catch (RejectedExecutionException e) { + // Scheduler was shut down after the shutdown check, which is fine + log.debug("Retry scheduling rejected, scheduler is shut down", e); + } + } + // TODO: remove the metadata call entirely after https://github.com/open-feature/flagd/issues/1584 private Struct getMetadata() { if (syncMetadataDisabled) {