diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStore.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStore.java index 1f0b2290e..eaa3dfa5f 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStore.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStore.java @@ -2,12 +2,12 @@ import static dev.openfeature.contrib.providers.flagd.resolver.common.Convert.convertProtobufMapToStructure; +import com.google.protobuf.Struct; import dev.openfeature.contrib.providers.flagd.resolver.process.model.FeatureFlag; import dev.openfeature.contrib.providers.flagd.resolver.process.model.FlagParser; import dev.openfeature.contrib.providers.flagd.resolver.process.model.ParsingResult; import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayload; import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueueSource; -import dev.openfeature.flagd.grpc.sync.Sync.GetMetadataResponse; import dev.openfeature.sdk.ImmutableStructure; import dev.openfeature.sdk.Structure; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; @@ -114,7 +114,7 @@ private void streamerListener(final QueueSource connector) throws InterruptedExc Map flagMap = parsingResult.getFlags(); Map flagSetMetadataMap = parsingResult.getFlagSetMetadata(); - Structure metadata = parseSyncMetadata(payload.getMetadataResponse()); + Structure syncContext = parseSyncContext(payload.getSyncContext()); writeLock.lock(); try { changedFlagsKeys = getChangedFlagsKeys(flagMap); @@ -126,7 +126,7 @@ private void streamerListener(final QueueSource connector) throws InterruptedExc writeLock.unlock(); } if (!stateBlockingQueue.offer( - new StorageStateChange(StorageState.OK, changedFlagsKeys, metadata))) { + new StorageStateChange(StorageState.OK, changedFlagsKeys, syncContext))) { log.warn("Failed to convey OK status, queue is full"); } } catch (Throwable e) { @@ -150,11 +150,13 @@ private void streamerListener(final QueueSource connector) throws InterruptedExc log.info("Shutting down store stream listener"); } - private Structure parseSyncMetadata(GetMetadataResponse metadataResponse) { - try { - return convertProtobufMapToStructure(metadataResponse.getMetadata().getFieldsMap()); - } catch (Exception exception) { - log.error("Failed to parse metadataResponse, provider metadata may not be up-to-date"); + private Structure parseSyncContext(Struct syncContext) { + if (syncContext != null) { + try { + return convertProtobufMapToStructure(syncContext.getFieldsMap()); + } catch (Exception exception) { + log.error("Failed to parse metadataResponse, provider metadata may not be up-to-date"); + } } return new ImmutableStructure(); } diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/QueuePayload.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/QueuePayload.java index f18b8d42a..071e51085 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/QueuePayload.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/QueuePayload.java @@ -1,6 +1,6 @@ package dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector; -import dev.openfeature.flagd.grpc.sync.Sync.GetMetadataResponse; +import com.google.protobuf.Struct; import lombok.AllArgsConstructor; import lombok.Getter; @@ -10,9 +10,9 @@ public class QueuePayload { private final QueuePayloadType type; private final String flagData; - private final GetMetadataResponse metadataResponse; + private final Struct syncContext; public QueuePayload(QueuePayloadType type, String flagData) { - this(type, flagData, GetMetadataResponse.getDefaultInstance()); + this(type, flagData, null); } } diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java index 9e043c9e0..3b3494677 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java @@ -1,5 +1,6 @@ package dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.sync; +import com.google.protobuf.Struct; import dev.openfeature.contrib.providers.flagd.FlagdOptions; import dev.openfeature.contrib.providers.flagd.resolver.common.ChannelConnector; import dev.openfeature.contrib.providers.flagd.resolver.common.FlagdProviderEvent; @@ -123,7 +124,7 @@ private void observeSyncStream() throws InterruptedException { log.debug("Initializing sync stream request"); final GetMetadataRequest.Builder metadataRequest = GetMetadataRequest.newBuilder(); - GetMetadataResponse metadataResponse = GetMetadataResponse.getDefaultInstance(); + GetMetadataResponse metadataResponse = null; // create a context which exists to track and cancel the stream try (CancellableContext context = Context.current().withCancellation()) { @@ -162,8 +163,7 @@ private void observeSyncStream() throws InterruptedException { log.debug("Exception in stream RPC, streamException {}, will restart", streamException); if (!outgoingQueue.offer(new QueuePayload( QueuePayloadType.ERROR, - String.format("Error from stream: %s", streamException.getMessage()), - metadataResponse))) { + String.format("Error from stream: %s", streamException.getMessage())))) { log.error("Failed to convey ERROR status, queue is full"); } break; @@ -173,7 +173,14 @@ private void observeSyncStream() throws InterruptedException { final String data = flagsResponse.getFlagConfiguration(); log.debug("Got stream response: {}", data); - if (!outgoingQueue.offer(new QueuePayload(QueuePayloadType.DATA, data, metadataResponse))) { + Struct syncContext = null; + if (flagsResponse.hasSyncContext()) { + syncContext = flagsResponse.getSyncContext(); + } else if (metadataResponse != null) { + syncContext = metadataResponse.getMetadata(); + } + + if (!outgoingQueue.offer(new QueuePayload(QueuePayloadType.DATA, data, syncContext))) { log.error("Stream writing failed"); } } diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/ProviderSteps.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/ProviderSteps.java index ae16290d5..47da566ba 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/ProviderSteps.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/ProviderSteps.java @@ -120,7 +120,11 @@ public void setupProvider(String providerType) throws InterruptedException { state.builder.port(container.getPort(State.resolverType)); } break; - default: + case "syncpayload": + flagdConfig = "sync-payload"; + state.builder.port(container.getPort(State.resolverType)); + break; + case "stable": this.state.providerType = ProviderType.DEFAULT; if (State.resolverType == Config.Resolver.FILE) { @@ -134,6 +138,8 @@ public void setupProvider(String providerType) throws InterruptedException { state.builder.port(container.getPort(State.resolverType)); } break; + default: + throw new IllegalStateException(); } when().post("http://" + container.getLaunchpadUrl() + "/start?config={config}", flagdConfig) .then() diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStoreTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStoreTest.java index c9e4d1b54..7f3fa99da 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStoreTest.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStoreTest.java @@ -11,7 +11,6 @@ import dev.openfeature.contrib.providers.flagd.resolver.process.model.FlagParser; import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayload; import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayloadType; -import dev.openfeature.flagd.grpc.sync.Sync.GetMetadataResponse; import java.time.Duration; import java.util.HashSet; import java.util.Map; @@ -35,10 +34,7 @@ void connectorHandling() throws Exception { // OK for simple flag assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> { - payload.offer(new QueuePayload( - QueuePayloadType.DATA, - getFlagsFromResource(VALID_SIMPLE), - GetMetadataResponse.getDefaultInstance())); + payload.offer(new QueuePayload(QueuePayloadType.DATA, getFlagsFromResource(VALID_SIMPLE))); }); assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> { @@ -47,10 +43,7 @@ void connectorHandling() throws Exception { // STALE for invalid flag assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> { - payload.offer(new QueuePayload( - QueuePayloadType.DATA, - getFlagsFromResource(INVALID_FLAG), - GetMetadataResponse.getDefaultInstance())); + payload.offer(new QueuePayload(QueuePayloadType.DATA, getFlagsFromResource(INVALID_FLAG))); }); assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> { @@ -59,8 +52,7 @@ void connectorHandling() throws Exception { // OK again for next payload assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> { - payload.offer(new QueuePayload( - QueuePayloadType.DATA, getFlagsFromResource(VALID_LONG), GetMetadataResponse.getDefaultInstance())); + payload.offer(new QueuePayload(QueuePayloadType.DATA, getFlagsFromResource(VALID_LONG))); }); assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> { @@ -69,7 +61,7 @@ void connectorHandling() throws Exception { // ERROR is propagated correctly assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> { - payload.offer(new QueuePayload(QueuePayloadType.ERROR, null, GetMetadataResponse.getDefaultInstance())); + payload.offer(new QueuePayload(QueuePayloadType.ERROR, null)); }); assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> { @@ -93,10 +85,7 @@ public void changedFlags() throws Exception { final BlockingQueue storageStateDTOS = store.getStateQueue(); assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> { - payload.offer(new QueuePayload( - QueuePayloadType.DATA, - getFlagsFromResource(VALID_SIMPLE), - GetMetadataResponse.getDefaultInstance())); + payload.offer(new QueuePayload(QueuePayloadType.DATA, getFlagsFromResource(VALID_SIMPLE))); }); // flags changed for first time assertEquals( @@ -105,8 +94,7 @@ public void changedFlags() throws Exception { storageStateDTOS.take().getChangedFlagsKeys()); assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> { - payload.offer(new QueuePayload( - QueuePayloadType.DATA, getFlagsFromResource(VALID_LONG), GetMetadataResponse.getDefaultInstance())); + payload.offer(new QueuePayload(QueuePayloadType.DATA, getFlagsFromResource(VALID_LONG))); }); Map expectedChangedFlags = FlagParser.parseString(getFlagsFromResource(VALID_LONG), true).getFlags(); diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/MockConnector.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/MockConnector.java index aa314a0a5..e2835af4d 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/MockConnector.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/MockConnector.java @@ -3,7 +3,6 @@ import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayload; import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayloadType; import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueueSource; -import dev.openfeature.flagd.grpc.sync.Sync.GetMetadataResponse; import java.util.concurrent.BlockingQueue; import lombok.extern.slf4j.Slf4j; @@ -26,8 +25,7 @@ public BlockingQueue getStreamQueue() { public void shutdown() { // Emit error mocking closed connection scenario - if (!mockQueue.offer(new QueuePayload( - QueuePayloadType.ERROR, "shutdown invoked", GetMetadataResponse.getDefaultInstance()))) { + if (!mockQueue.offer(new QueuePayload(QueuePayloadType.ERROR, "shutdown invoked"))) { log.warn("Failed to offer shutdown status"); } } diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSourceTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSourceTest.java index 64ed0e069..274392d5b 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSourceTest.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSourceTest.java @@ -1,6 +1,7 @@ package dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.sync; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.ArgumentMatchers.any; @@ -12,6 +13,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.google.protobuf.Struct; import dev.openfeature.contrib.providers.flagd.FlagdOptions; import dev.openfeature.contrib.providers.flagd.resolver.common.ChannelConnector; import dev.openfeature.contrib.providers.flagd.resolver.common.QueueingStreamObserver; @@ -73,6 +75,7 @@ void onNextEnqueuesDataPayload() throws Exception { BlockingQueue streamQueue = connector.getStreamQueue(); QueuePayload payload = streamQueue.poll(1000, TimeUnit.MILLISECONDS); assertNotNull(payload); + assertNotNull(payload.getSyncContext()); assertEquals(QueuePayloadType.DATA, payload.getType()); // should NOT have restarted the stream (1 call) verify(stub, times(1)).syncFlags(any(), any()); @@ -94,6 +97,7 @@ void onNextEnqueuesDataPayloadMetadataDisabled() throws Exception { BlockingQueue streamQueue = connector.getStreamQueue(); QueuePayload payload = streamQueue.poll(1000, TimeUnit.MILLISECONDS); assertNotNull(payload); + assertNull(payload.getSyncContext()); assertEquals(QueuePayloadType.DATA, payload.getType()); // should NOT have restarted the stream (1 call) verify(stub, times(1)).syncFlags(any(), any()); @@ -101,6 +105,30 @@ void onNextEnqueuesDataPayloadMetadataDisabled() throws Exception { verify(blockingStub, times(0)).getMetadata(any()); } + @Test + void onNextEnqueuesDataPayloadWithSyncContext() throws Exception { + // disable GetMetadata call + SyncStreamQueueSource connector = + new SyncStreamQueueSource(FlagdOptions.builder().build(), mockConnector, stub, blockingStub); + latch = new CountDownLatch(1); + connector.init(); + latch.await(); + + // fire onNext (data) event + Struct syncContext = Struct.newBuilder().build(); + observer.onNext( + SyncFlagsResponse.newBuilder().setSyncContext(syncContext).build()); + + // should enqueue data payload + BlockingQueue streamQueue = connector.getStreamQueue(); + QueuePayload payload = streamQueue.poll(1000, TimeUnit.MILLISECONDS); + assertNotNull(payload); + assertEquals(syncContext, payload.getSyncContext()); + assertEquals(QueuePayloadType.DATA, payload.getType()); + // should NOT have restarted the stream (1 call) + verify(stub, times(1)).syncFlags(any(), any()); + } + @Test void onErrorEnqueuesDataPayload() throws Exception { SyncStreamQueueSource connector = diff --git a/providers/flagd/test-harness b/providers/flagd/test-harness index 59c3c3ccf..fe68e0310 160000 --- a/providers/flagd/test-harness +++ b/providers/flagd/test-harness @@ -1 +1 @@ -Subproject commit 59c3c3ccfb018db82281684d231067e332c8103d +Subproject commit fe68e0310fd817a8f9bc1e2559f2277fed3aed34