Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@

import static dev.openfeature.contrib.providers.flagd.resolver.common.Convert.convertProtobufMapToStructure;

import com.google.protobuf.Struct;
import dev.openfeature.contrib.providers.flagd.resolver.process.model.FeatureFlag;
import dev.openfeature.contrib.providers.flagd.resolver.process.model.FlagParser;
import dev.openfeature.contrib.providers.flagd.resolver.process.model.ParsingResult;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayload;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueueSource;
import dev.openfeature.flagd.grpc.sync.Sync.GetMetadataResponse;
import dev.openfeature.sdk.ImmutableStructure;
import dev.openfeature.sdk.Structure;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
Expand Down Expand Up @@ -114,7 +114,7 @@ private void streamerListener(final QueueSource connector) throws InterruptedExc
Map<String, FeatureFlag> flagMap = parsingResult.getFlags();
Map<String, Object> flagSetMetadataMap = parsingResult.getFlagSetMetadata();

Structure metadata = parseSyncMetadata(payload.getMetadataResponse());
Structure syncContext = parseSyncContext(payload.getSyncContext());
writeLock.lock();
try {
changedFlagsKeys = getChangedFlagsKeys(flagMap);
Expand All @@ -126,7 +126,7 @@ private void streamerListener(final QueueSource connector) throws InterruptedExc
writeLock.unlock();
}
if (!stateBlockingQueue.offer(
new StorageStateChange(StorageState.OK, changedFlagsKeys, metadata))) {
new StorageStateChange(StorageState.OK, changedFlagsKeys, syncContext))) {
log.warn("Failed to convey OK status, queue is full");
}
} catch (Throwable e) {
Expand All @@ -150,11 +150,13 @@ private void streamerListener(final QueueSource connector) throws InterruptedExc
log.info("Shutting down store stream listener");
}

private Structure parseSyncMetadata(GetMetadataResponse metadataResponse) {
try {
return convertProtobufMapToStructure(metadataResponse.getMetadata().getFieldsMap());
} catch (Exception exception) {
log.error("Failed to parse metadataResponse, provider metadata may not be up-to-date");
private Structure parseSyncContext(Struct syncContext) {
if (syncContext != null) {
try {
return convertProtobufMapToStructure(syncContext.getFieldsMap());
} catch (Exception exception) {
log.error("Failed to parse metadataResponse, provider metadata may not be up-to-date");
}
}
return new ImmutableStructure();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector;

import dev.openfeature.flagd.grpc.sync.Sync.GetMetadataResponse;
import com.google.protobuf.Struct;
import lombok.AllArgsConstructor;
import lombok.Getter;

Expand All @@ -10,9 +10,9 @@
public class QueuePayload {
private final QueuePayloadType type;
private final String flagData;
private final GetMetadataResponse metadataResponse;
private final Struct syncContext;

public QueuePayload(QueuePayloadType type, String flagData) {
this(type, flagData, GetMetadataResponse.getDefaultInstance());
this(type, flagData, null);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.sync;

import com.google.protobuf.Struct;
import dev.openfeature.contrib.providers.flagd.FlagdOptions;
import dev.openfeature.contrib.providers.flagd.resolver.common.ChannelConnector;
import dev.openfeature.contrib.providers.flagd.resolver.common.FlagdProviderEvent;
Expand Down Expand Up @@ -123,7 +124,7 @@ private void observeSyncStream() throws InterruptedException {

log.debug("Initializing sync stream request");
final GetMetadataRequest.Builder metadataRequest = GetMetadataRequest.newBuilder();
GetMetadataResponse metadataResponse = GetMetadataResponse.getDefaultInstance();
GetMetadataResponse metadataResponse = null;

// create a context which exists to track and cancel the stream
try (CancellableContext context = Context.current().withCancellation()) {
Expand Down Expand Up @@ -162,8 +163,7 @@ private void observeSyncStream() throws InterruptedException {
log.debug("Exception in stream RPC, streamException {}, will restart", streamException);
if (!outgoingQueue.offer(new QueuePayload(
QueuePayloadType.ERROR,
String.format("Error from stream: %s", streamException.getMessage()),
metadataResponse))) {
String.format("Error from stream: %s", streamException.getMessage())))) {
log.error("Failed to convey ERROR status, queue is full");
}
break;
Expand All @@ -173,7 +173,14 @@ private void observeSyncStream() throws InterruptedException {
final String data = flagsResponse.getFlagConfiguration();
log.debug("Got stream response: {}", data);

if (!outgoingQueue.offer(new QueuePayload(QueuePayloadType.DATA, data, metadataResponse))) {
Struct syncContext = null;
if (flagsResponse.hasSyncContext()) {
syncContext = flagsResponse.getSyncContext();
} else if (metadataResponse != null) {
syncContext = metadataResponse.getMetadata();
}

if (!outgoingQueue.offer(new QueuePayload(QueuePayloadType.DATA, data, syncContext))) {
log.error("Stream writing failed");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,11 @@ public void setupProvider(String providerType) throws InterruptedException {
state.builder.port(container.getPort(State.resolverType));
}
break;
default:
case "syncpayload":
flagdConfig = "sync-payload";
state.builder.port(container.getPort(State.resolverType));
break;
case "stable":
this.state.providerType = ProviderType.DEFAULT;
if (State.resolverType == Config.Resolver.FILE) {

Expand All @@ -134,6 +138,8 @@ public void setupProvider(String providerType) throws InterruptedException {
state.builder.port(container.getPort(State.resolverType));
}
break;
default:
throw new IllegalStateException();
}
when().post("http://" + container.getLaunchpadUrl() + "/start?config={config}", flagdConfig)
.then()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import dev.openfeature.contrib.providers.flagd.resolver.process.model.FlagParser;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayload;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayloadType;
import dev.openfeature.flagd.grpc.sync.Sync.GetMetadataResponse;
import java.time.Duration;
import java.util.HashSet;
import java.util.Map;
Expand All @@ -35,10 +34,7 @@ void connectorHandling() throws Exception {

// OK for simple flag
assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> {
payload.offer(new QueuePayload(
QueuePayloadType.DATA,
getFlagsFromResource(VALID_SIMPLE),
GetMetadataResponse.getDefaultInstance()));
payload.offer(new QueuePayload(QueuePayloadType.DATA, getFlagsFromResource(VALID_SIMPLE)));
});

assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> {
Expand All @@ -47,10 +43,7 @@ void connectorHandling() throws Exception {

// STALE for invalid flag
assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> {
payload.offer(new QueuePayload(
QueuePayloadType.DATA,
getFlagsFromResource(INVALID_FLAG),
GetMetadataResponse.getDefaultInstance()));
payload.offer(new QueuePayload(QueuePayloadType.DATA, getFlagsFromResource(INVALID_FLAG)));
});

assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> {
Expand All @@ -59,8 +52,7 @@ void connectorHandling() throws Exception {

// OK again for next payload
assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> {
payload.offer(new QueuePayload(
QueuePayloadType.DATA, getFlagsFromResource(VALID_LONG), GetMetadataResponse.getDefaultInstance()));
payload.offer(new QueuePayload(QueuePayloadType.DATA, getFlagsFromResource(VALID_LONG)));
});

assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> {
Expand All @@ -69,7 +61,7 @@ void connectorHandling() throws Exception {

// ERROR is propagated correctly
assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> {
payload.offer(new QueuePayload(QueuePayloadType.ERROR, null, GetMetadataResponse.getDefaultInstance()));
payload.offer(new QueuePayload(QueuePayloadType.ERROR, null));
});

assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> {
Expand All @@ -93,10 +85,7 @@ public void changedFlags() throws Exception {
final BlockingQueue<StorageStateChange> storageStateDTOS = store.getStateQueue();

assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> {
payload.offer(new QueuePayload(
QueuePayloadType.DATA,
getFlagsFromResource(VALID_SIMPLE),
GetMetadataResponse.getDefaultInstance()));
payload.offer(new QueuePayload(QueuePayloadType.DATA, getFlagsFromResource(VALID_SIMPLE)));
});
// flags changed for first time
assertEquals(
Expand All @@ -105,8 +94,7 @@ public void changedFlags() throws Exception {
storageStateDTOS.take().getChangedFlagsKeys());

assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> {
payload.offer(new QueuePayload(
QueuePayloadType.DATA, getFlagsFromResource(VALID_LONG), GetMetadataResponse.getDefaultInstance()));
payload.offer(new QueuePayload(QueuePayloadType.DATA, getFlagsFromResource(VALID_LONG)));
});
Map<String, FeatureFlag> expectedChangedFlags =
FlagParser.parseString(getFlagsFromResource(VALID_LONG), true).getFlags();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayload;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayloadType;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueueSource;
import dev.openfeature.flagd.grpc.sync.Sync.GetMetadataResponse;
import java.util.concurrent.BlockingQueue;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -26,8 +25,7 @@ public BlockingQueue<QueuePayload> getStreamQueue() {

public void shutdown() {
// Emit error mocking closed connection scenario
if (!mockQueue.offer(new QueuePayload(
QueuePayloadType.ERROR, "shutdown invoked", GetMetadataResponse.getDefaultInstance()))) {
if (!mockQueue.offer(new QueuePayload(QueuePayloadType.ERROR, "shutdown invoked"))) {
log.warn("Failed to offer shutdown status");
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.sync;

import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
Expand All @@ -12,6 +13,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.protobuf.Struct;
import dev.openfeature.contrib.providers.flagd.FlagdOptions;
import dev.openfeature.contrib.providers.flagd.resolver.common.ChannelConnector;
import dev.openfeature.contrib.providers.flagd.resolver.common.QueueingStreamObserver;
Expand Down Expand Up @@ -73,6 +75,7 @@ void onNextEnqueuesDataPayload() throws Exception {
BlockingQueue<QueuePayload> streamQueue = connector.getStreamQueue();
QueuePayload payload = streamQueue.poll(1000, TimeUnit.MILLISECONDS);
assertNotNull(payload);
assertNotNull(payload.getSyncContext());
assertEquals(QueuePayloadType.DATA, payload.getType());
// should NOT have restarted the stream (1 call)
verify(stub, times(1)).syncFlags(any(), any());
Expand All @@ -94,13 +97,38 @@ void onNextEnqueuesDataPayloadMetadataDisabled() throws Exception {
BlockingQueue<QueuePayload> streamQueue = connector.getStreamQueue();
QueuePayload payload = streamQueue.poll(1000, TimeUnit.MILLISECONDS);
assertNotNull(payload);
assertNull(payload.getSyncContext());
assertEquals(QueuePayloadType.DATA, payload.getType());
// should NOT have restarted the stream (1 call)
verify(stub, times(1)).syncFlags(any(), any());
// should NOT have called getMetadata
verify(blockingStub, times(0)).getMetadata(any());
}

@Test
void onNextEnqueuesDataPayloadWithSyncContext() throws Exception {
// disable GetMetadata call
SyncStreamQueueSource connector =
new SyncStreamQueueSource(FlagdOptions.builder().build(), mockConnector, stub, blockingStub);
latch = new CountDownLatch(1);
connector.init();
latch.await();

// fire onNext (data) event
Struct syncContext = Struct.newBuilder().build();
observer.onNext(
SyncFlagsResponse.newBuilder().setSyncContext(syncContext).build());

// should enqueue data payload
BlockingQueue<QueuePayload> streamQueue = connector.getStreamQueue();
QueuePayload payload = streamQueue.poll(1000, TimeUnit.MILLISECONDS);
assertNotNull(payload);
assertEquals(syncContext, payload.getSyncContext());
assertEquals(QueuePayloadType.DATA, payload.getType());
// should NOT have restarted the stream (1 call)
verify(stub, times(1)).syncFlags(any(), any());
}

@Test
void onErrorEnqueuesDataPayload() throws Exception {
SyncStreamQueueSource connector =
Expand Down
2 changes: 1 addition & 1 deletion providers/flagd/test-harness