Skip to content

Commit b1ba01b

Browse files
committed
feat(flagd): Adjust to disable-sync-metadata toggle
Signed-off-by: Konvalinka <lea.konvalinka@dynatrace.com>
1 parent 229ddcb commit b1ba01b

File tree

7 files changed

+33
-34
lines changed

7 files changed

+33
-34
lines changed

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStore.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,12 @@
22

33
import static dev.openfeature.contrib.providers.flagd.resolver.common.Convert.convertProtobufMapToStructure;
44

5+
import com.google.protobuf.Struct;
56
import dev.openfeature.contrib.providers.flagd.resolver.process.model.FeatureFlag;
67
import dev.openfeature.contrib.providers.flagd.resolver.process.model.FlagParser;
78
import dev.openfeature.contrib.providers.flagd.resolver.process.model.ParsingResult;
89
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayload;
910
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueueSource;
10-
import dev.openfeature.flagd.grpc.sync.Sync.GetMetadataResponse;
1111
import dev.openfeature.sdk.ImmutableStructure;
1212
import dev.openfeature.sdk.Structure;
1313
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
@@ -114,7 +114,7 @@ private void streamerListener(final QueueSource connector) throws InterruptedExc
114114
Map<String, FeatureFlag> flagMap = parsingResult.getFlags();
115115
Map<String, Object> flagSetMetadataMap = parsingResult.getFlagSetMetadata();
116116

117-
Structure metadata = parseSyncMetadata(payload.getMetadataResponse());
117+
Structure metadata = parseSyncContextOrMetadata(payload.getSyncContext());
118118
writeLock.lock();
119119
try {
120120
changedFlagsKeys = getChangedFlagsKeys(flagMap);
@@ -150,9 +150,9 @@ private void streamerListener(final QueueSource connector) throws InterruptedExc
150150
log.info("Shutting down store stream listener");
151151
}
152152

153-
private Structure parseSyncMetadata(GetMetadataResponse metadataResponse) {
153+
private Structure parseSyncContextOrMetadata(Struct syncContext) {
154154
try {
155-
return convertProtobufMapToStructure(metadataResponse.getMetadata().getFieldsMap());
155+
return convertProtobufMapToStructure(syncContext.getFieldsMap());
156156
} catch (Exception exception) {
157157
log.error("Failed to parse metadataResponse, provider metadata may not be up-to-date");
158158
}
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector;
22

3-
import dev.openfeature.flagd.grpc.sync.Sync.GetMetadataResponse;
3+
import com.google.protobuf.Struct;
44
import lombok.AllArgsConstructor;
55
import lombok.Getter;
66

@@ -10,9 +10,9 @@
1010
public class QueuePayload {
1111
private final QueuePayloadType type;
1212
private final String flagData;
13-
private final GetMetadataResponse metadataResponse;
13+
private final Struct syncContext;
1414

1515
public QueuePayload(QueuePayloadType type, String flagData) {
16-
this(type, flagData, GetMetadataResponse.getDefaultInstance());
16+
this(type, flagData, null);
1717
}
1818
}

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.sync;
22

3+
import com.google.protobuf.Struct;
34
import dev.openfeature.contrib.providers.flagd.FlagdOptions;
45
import dev.openfeature.contrib.providers.flagd.resolver.common.ChannelConnector;
56
import dev.openfeature.contrib.providers.flagd.resolver.common.FlagdProviderEvent;
@@ -123,7 +124,7 @@ private void observeSyncStream() throws InterruptedException {
123124

124125
log.debug("Initializing sync stream request");
125126
final GetMetadataRequest.Builder metadataRequest = GetMetadataRequest.newBuilder();
126-
GetMetadataResponse metadataResponse = GetMetadataResponse.getDefaultInstance();
127+
GetMetadataResponse metadataResponse = null;
127128

128129
// create a context which exists to track and cancel the stream
129130
try (CancellableContext context = Context.current().withCancellation()) {
@@ -162,8 +163,7 @@ private void observeSyncStream() throws InterruptedException {
162163
log.debug("Exception in stream RPC, streamException {}, will restart", streamException);
163164
if (!outgoingQueue.offer(new QueuePayload(
164165
QueuePayloadType.ERROR,
165-
String.format("Error from stream: %s", streamException.getMessage()),
166-
metadataResponse))) {
166+
String.format("Error from stream: %s", streamException.getMessage())))) {
167167
log.error("Failed to convey ERROR status, queue is full");
168168
}
169169
break;
@@ -173,7 +173,14 @@ private void observeSyncStream() throws InterruptedException {
173173
final String data = flagsResponse.getFlagConfiguration();
174174
log.debug("Got stream response: {}", data);
175175

176-
if (!outgoingQueue.offer(new QueuePayload(QueuePayloadType.DATA, data, metadataResponse))) {
176+
Struct syncContext;
177+
if (flagsResponse.hasSyncContext()) {
178+
syncContext = flagsResponse.getSyncContext();
179+
} else {
180+
syncContext = metadataResponse.getMetadata();
181+
}
182+
183+
if (!outgoingQueue.offer(new QueuePayload(QueuePayloadType.DATA, data, syncContext))) {
177184
log.error("Stream writing failed");
178185
}
179186
}

providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/ProviderSteps.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,11 @@ public void setupProvider(String providerType) throws InterruptedException {
120120
state.builder.port(container.getPort(State.resolverType));
121121
}
122122
break;
123-
default:
123+
case "syncpayload":
124+
flagdConfig = "sync-payload";
125+
state.builder.port(container.getPort(State.resolverType));
126+
break;
127+
case "stable":
124128
this.state.providerType = ProviderType.DEFAULT;
125129
if (State.resolverType == Config.Resolver.FILE) {
126130

@@ -134,6 +138,8 @@ public void setupProvider(String providerType) throws InterruptedException {
134138
state.builder.port(container.getPort(State.resolverType));
135139
}
136140
break;
141+
default:
142+
throw new IllegalStateException();
137143
}
138144
when().post("http://" + container.getLaunchpadUrl() + "/start?config={config}", flagdConfig)
139145
.then()

providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStoreTest.java

Lines changed: 6 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import dev.openfeature.contrib.providers.flagd.resolver.process.model.FlagParser;
1212
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayload;
1313
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayloadType;
14-
import dev.openfeature.flagd.grpc.sync.Sync.GetMetadataResponse;
1514
import java.time.Duration;
1615
import java.util.HashSet;
1716
import java.util.Map;
@@ -35,10 +34,7 @@ void connectorHandling() throws Exception {
3534

3635
// OK for simple flag
3736
assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> {
38-
payload.offer(new QueuePayload(
39-
QueuePayloadType.DATA,
40-
getFlagsFromResource(VALID_SIMPLE),
41-
GetMetadataResponse.getDefaultInstance()));
37+
payload.offer(new QueuePayload(QueuePayloadType.DATA, getFlagsFromResource(VALID_SIMPLE)));
4238
});
4339

4440
assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> {
@@ -47,10 +43,7 @@ void connectorHandling() throws Exception {
4743

4844
// STALE for invalid flag
4945
assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> {
50-
payload.offer(new QueuePayload(
51-
QueuePayloadType.DATA,
52-
getFlagsFromResource(INVALID_FLAG),
53-
GetMetadataResponse.getDefaultInstance()));
46+
payload.offer(new QueuePayload(QueuePayloadType.DATA, getFlagsFromResource(INVALID_FLAG)));
5447
});
5548

5649
assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> {
@@ -59,8 +52,7 @@ void connectorHandling() throws Exception {
5952

6053
// OK again for next payload
6154
assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> {
62-
payload.offer(new QueuePayload(
63-
QueuePayloadType.DATA, getFlagsFromResource(VALID_LONG), GetMetadataResponse.getDefaultInstance()));
55+
payload.offer(new QueuePayload(QueuePayloadType.DATA, getFlagsFromResource(VALID_LONG)));
6456
});
6557

6658
assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> {
@@ -69,7 +61,7 @@ void connectorHandling() throws Exception {
6961

7062
// ERROR is propagated correctly
7163
assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> {
72-
payload.offer(new QueuePayload(QueuePayloadType.ERROR, null, GetMetadataResponse.getDefaultInstance()));
64+
payload.offer(new QueuePayload(QueuePayloadType.ERROR, null));
7365
});
7466

7567
assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> {
@@ -93,10 +85,7 @@ public void changedFlags() throws Exception {
9385
final BlockingQueue<StorageStateChange> storageStateDTOS = store.getStateQueue();
9486

9587
assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> {
96-
payload.offer(new QueuePayload(
97-
QueuePayloadType.DATA,
98-
getFlagsFromResource(VALID_SIMPLE),
99-
GetMetadataResponse.getDefaultInstance()));
88+
payload.offer(new QueuePayload(QueuePayloadType.DATA, getFlagsFromResource(VALID_SIMPLE)));
10089
});
10190
// flags changed for first time
10291
assertEquals(
@@ -105,8 +94,7 @@ public void changedFlags() throws Exception {
10594
storageStateDTOS.take().getChangedFlagsKeys());
10695

10796
assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> {
108-
payload.offer(new QueuePayload(
109-
QueuePayloadType.DATA, getFlagsFromResource(VALID_LONG), GetMetadataResponse.getDefaultInstance()));
97+
payload.offer(new QueuePayload(QueuePayloadType.DATA, getFlagsFromResource(VALID_LONG)));
11098
});
11199
Map<String, FeatureFlag> expectedChangedFlags =
112100
FlagParser.parseString(getFlagsFromResource(VALID_LONG), true).getFlags();

providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/MockConnector.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayload;
44
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayloadType;
55
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueueSource;
6-
import dev.openfeature.flagd.grpc.sync.Sync.GetMetadataResponse;
76
import java.util.concurrent.BlockingQueue;
87
import lombok.extern.slf4j.Slf4j;
98

@@ -26,8 +25,7 @@ public BlockingQueue<QueuePayload> getStreamQueue() {
2625

2726
public void shutdown() {
2827
// Emit error mocking closed connection scenario
29-
if (!mockQueue.offer(new QueuePayload(
30-
QueuePayloadType.ERROR, "shutdown invoked", GetMetadataResponse.getDefaultInstance()))) {
28+
if (!mockQueue.offer(new QueuePayload(QueuePayloadType.ERROR, "shutdown invoked"))) {
3129
log.warn("Failed to offer shutdown status");
3230
}
3331
}

providers/flagd/test-harness

0 commit comments

Comments
 (0)