Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@

import static dev.openfeature.contrib.providers.flagd.resolver.common.Convert.convertProtobufMapToStructure;

import com.google.protobuf.Struct;
import dev.openfeature.contrib.providers.flagd.resolver.process.model.FeatureFlag;
import dev.openfeature.contrib.providers.flagd.resolver.process.model.FlagParser;
import dev.openfeature.contrib.providers.flagd.resolver.process.model.ParsingResult;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayload;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueueSource;
import dev.openfeature.flagd.grpc.sync.Sync.GetMetadataResponse;
import dev.openfeature.sdk.ImmutableStructure;
import dev.openfeature.sdk.Structure;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
Expand Down Expand Up @@ -114,7 +114,7 @@ private void streamerListener(final QueueSource connector) throws InterruptedExc
Map<String, FeatureFlag> flagMap = parsingResult.getFlags();
Map<String, Object> flagSetMetadataMap = parsingResult.getFlagSetMetadata();

Structure metadata = parseSyncMetadata(payload.getMetadataResponse());
Structure metadata = parseSyncContextOrMetadata(payload.getSyncContext());
writeLock.lock();
try {
changedFlagsKeys = getChangedFlagsKeys(flagMap);
Expand Down Expand Up @@ -150,9 +150,9 @@ private void streamerListener(final QueueSource connector) throws InterruptedExc
log.info("Shutting down store stream listener");
}

private Structure parseSyncMetadata(GetMetadataResponse metadataResponse) {
private Structure parseSyncContextOrMetadata(Struct syncContext) {
try {
return convertProtobufMapToStructure(metadataResponse.getMetadata().getFieldsMap());
return convertProtobufMapToStructure(syncContext.getFieldsMap());
} catch (Exception exception) {
log.error("Failed to parse metadataResponse, provider metadata may not be up-to-date");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector;

import dev.openfeature.flagd.grpc.sync.Sync.GetMetadataResponse;
import com.google.protobuf.Struct;
import lombok.AllArgsConstructor;
import lombok.Getter;

Expand All @@ -10,9 +10,9 @@
public class QueuePayload {
private final QueuePayloadType type;
private final String flagData;
private final GetMetadataResponse metadataResponse;
private final Struct syncContext;

public QueuePayload(QueuePayloadType type, String flagData) {
this(type, flagData, GetMetadataResponse.getDefaultInstance());
this(type, flagData, null);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.sync;

import com.google.protobuf.Struct;
import dev.openfeature.contrib.providers.flagd.FlagdOptions;
import dev.openfeature.contrib.providers.flagd.resolver.common.ChannelConnector;
import dev.openfeature.contrib.providers.flagd.resolver.common.FlagdProviderEvent;
Expand Down Expand Up @@ -123,7 +124,7 @@ private void observeSyncStream() throws InterruptedException {

log.debug("Initializing sync stream request");
final GetMetadataRequest.Builder metadataRequest = GetMetadataRequest.newBuilder();
GetMetadataResponse metadataResponse = GetMetadataResponse.getDefaultInstance();
GetMetadataResponse metadataResponse = null;

// create a context which exists to track and cancel the stream
try (CancellableContext context = Context.current().withCancellation()) {
Expand Down Expand Up @@ -162,8 +163,7 @@ private void observeSyncStream() throws InterruptedException {
log.debug("Exception in stream RPC, streamException {}, will restart", streamException);
if (!outgoingQueue.offer(new QueuePayload(
QueuePayloadType.ERROR,
String.format("Error from stream: %s", streamException.getMessage()),
metadataResponse))) {
String.format("Error from stream: %s", streamException.getMessage())))) {
log.error("Failed to convey ERROR status, queue is full");
}
break;
Expand All @@ -173,7 +173,14 @@ private void observeSyncStream() throws InterruptedException {
final String data = flagsResponse.getFlagConfiguration();
log.debug("Got stream response: {}", data);

if (!outgoingQueue.offer(new QueuePayload(QueuePayloadType.DATA, data, metadataResponse))) {
Struct syncContext = null;
if (flagsResponse.hasSyncContext()) {
syncContext = flagsResponse.getSyncContext();
} else if (metadataResponse != null) {
syncContext = metadataResponse.getMetadata();
}

if (!outgoingQueue.offer(new QueuePayload(QueuePayloadType.DATA, data, syncContext))) {
log.error("Stream writing failed");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,11 @@ public void setupProvider(String providerType) throws InterruptedException {
state.builder.port(container.getPort(State.resolverType));
}
break;
default:
case "syncpayload":
flagdConfig = "sync-payload";
state.builder.port(container.getPort(State.resolverType));
break;
case "stable":
this.state.providerType = ProviderType.DEFAULT;
if (State.resolverType == Config.Resolver.FILE) {

Expand All @@ -134,6 +138,8 @@ public void setupProvider(String providerType) throws InterruptedException {
state.builder.port(container.getPort(State.resolverType));
}
break;
default:
throw new IllegalStateException();
}
when().post("http://" + container.getLaunchpadUrl() + "/start?config={config}", flagdConfig)
.then()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import dev.openfeature.contrib.providers.flagd.resolver.process.model.FlagParser;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayload;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayloadType;
import dev.openfeature.flagd.grpc.sync.Sync.GetMetadataResponse;
import java.time.Duration;
import java.util.HashSet;
import java.util.Map;
Expand All @@ -35,10 +34,7 @@ void connectorHandling() throws Exception {

// OK for simple flag
assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> {
payload.offer(new QueuePayload(
QueuePayloadType.DATA,
getFlagsFromResource(VALID_SIMPLE),
GetMetadataResponse.getDefaultInstance()));
payload.offer(new QueuePayload(QueuePayloadType.DATA, getFlagsFromResource(VALID_SIMPLE)));
});

assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> {
Expand All @@ -47,10 +43,7 @@ void connectorHandling() throws Exception {

// STALE for invalid flag
assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> {
payload.offer(new QueuePayload(
QueuePayloadType.DATA,
getFlagsFromResource(INVALID_FLAG),
GetMetadataResponse.getDefaultInstance()));
payload.offer(new QueuePayload(QueuePayloadType.DATA, getFlagsFromResource(INVALID_FLAG)));
});

assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> {
Expand All @@ -59,8 +52,7 @@ void connectorHandling() throws Exception {

// OK again for next payload
assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> {
payload.offer(new QueuePayload(
QueuePayloadType.DATA, getFlagsFromResource(VALID_LONG), GetMetadataResponse.getDefaultInstance()));
payload.offer(new QueuePayload(QueuePayloadType.DATA, getFlagsFromResource(VALID_LONG)));
});

assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> {
Expand All @@ -69,7 +61,7 @@ void connectorHandling() throws Exception {

// ERROR is propagated correctly
assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> {
payload.offer(new QueuePayload(QueuePayloadType.ERROR, null, GetMetadataResponse.getDefaultInstance()));
payload.offer(new QueuePayload(QueuePayloadType.ERROR, null));
});

assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> {
Expand All @@ -93,10 +85,7 @@ public void changedFlags() throws Exception {
final BlockingQueue<StorageStateChange> storageStateDTOS = store.getStateQueue();

assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> {
payload.offer(new QueuePayload(
QueuePayloadType.DATA,
getFlagsFromResource(VALID_SIMPLE),
GetMetadataResponse.getDefaultInstance()));
payload.offer(new QueuePayload(QueuePayloadType.DATA, getFlagsFromResource(VALID_SIMPLE)));
});
// flags changed for first time
assertEquals(
Expand All @@ -105,8 +94,7 @@ public void changedFlags() throws Exception {
storageStateDTOS.take().getChangedFlagsKeys());

assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> {
payload.offer(new QueuePayload(
QueuePayloadType.DATA, getFlagsFromResource(VALID_LONG), GetMetadataResponse.getDefaultInstance()));
payload.offer(new QueuePayload(QueuePayloadType.DATA, getFlagsFromResource(VALID_LONG)));
});
Map<String, FeatureFlag> expectedChangedFlags =
FlagParser.parseString(getFlagsFromResource(VALID_LONG), true).getFlags();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayload;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayloadType;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueueSource;
import dev.openfeature.flagd.grpc.sync.Sync.GetMetadataResponse;
import java.util.concurrent.BlockingQueue;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -26,8 +25,7 @@ public BlockingQueue<QueuePayload> getStreamQueue() {

public void shutdown() {
// Emit error mocking closed connection scenario
if (!mockQueue.offer(new QueuePayload(
QueuePayloadType.ERROR, "shutdown invoked", GetMetadataResponse.getDefaultInstance()))) {
if (!mockQueue.offer(new QueuePayload(QueuePayloadType.ERROR, "shutdown invoked"))) {
log.warn("Failed to offer shutdown status");
}
}
Expand Down
2 changes: 1 addition & 1 deletion providers/flagd/test-harness