Skip to content

Commit 4dcbff8

Browse files
Merge branch 'development' into telemetry
2 parents efaac85 + 99cf461 commit 4dcbff8

28 files changed

+1149
-149
lines changed

client/src/main/java/io/split/client/HttpSegmentChangeFetcher.java

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import io.split.client.dtos.SegmentChange;
55
import io.split.client.utils.Json;
66
import io.split.client.utils.Utils;
7+
import io.split.engine.common.FetchOptions;
78
import io.split.engine.metrics.Metrics;
89
import io.split.engine.segments.SegmentChangeFetcher;
910
import io.split.telemetry.domain.enums.HTTPLatenciesEnum;
@@ -14,6 +15,7 @@
1415
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
1516
import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
1617
import org.apache.hc.core5.http.HttpStatus;
18+
import org.apache.hc.core5.http.Header;
1719
import org.apache.hc.core5.http.io.entity.EntityUtils;
1820
import org.apache.hc.core5.net.URIBuilder;
1921
import org.slf4j.Logger;
@@ -22,6 +24,8 @@
2224
import java.net.URI;
2325
import java.net.URISyntaxException;
2426
import java.nio.charset.StandardCharsets;
27+
import java.util.Arrays;
28+
import java.util.stream.Collectors;
2529

2630
import static com.google.common.base.Preconditions.checkNotNull;
2731

@@ -32,9 +36,13 @@ public final class HttpSegmentChangeFetcher implements SegmentChangeFetcher {
3236
private static final Logger _log = LoggerFactory.getLogger(HttpSegmentChangeFetcher.class);
3337

3438
private static final String SINCE = "since";
39+
private static final String TILL = "till";
3540
private static final String PREFIX = "segmentChangeFetcher";
36-
private static final String NAME_CACHE = "Cache-Control";
37-
private static final String VALUE_CACHE = "no-cache";
41+
private static final String CACHE_CONTROL_HEADER_NAME = "Cache-Control";
42+
private static final String CACHE_CONTROL_HEADER_VALUE = "no-cache";
43+
44+
private static final String HEADER_FASTLY_DEBUG_NAME = "Fastly-Debug";
45+
private static final String HEADER_FASTLY_DEBUG_VALUE = "1";
3846

3947
private final CloseableHttpClient _client;
4048
private final URI _target;
@@ -52,19 +60,34 @@ private HttpSegmentChangeFetcher(CloseableHttpClient client, URI uri, TelemetryR
5260
}
5361

5462
@Override
55-
public SegmentChange fetch(String segmentName, long since, boolean addCacheHeader) {
63+
public SegmentChange fetch(String segmentName, long since, FetchOptions options) {
5664
long start = System.currentTimeMillis();
5765

5866
CloseableHttpResponse response = null;
5967

6068
try {
6169
String path = _target.getPath() + "/" + segmentName;
62-
URI uri = new URIBuilder(_target).setPath(path).addParameter(SINCE, "" + since).build();
70+
URIBuilder uriBuilder = new URIBuilder(_target)
71+
.setPath(path)
72+
.addParameter(SINCE, "" + since);
73+
if (options.hasCustomCN()) {
74+
uriBuilder.addParameter(TILL, "" + options.targetCN());
75+
}
76+
77+
URI uri = uriBuilder.build();
6378
HttpGet request = new HttpGet(uri);
64-
if(addCacheHeader) {
65-
request.setHeader(NAME_CACHE, VALUE_CACHE);
79+
80+
if(options.cacheControlHeadersEnabled()) {
81+
request.setHeader(CACHE_CONTROL_HEADER_NAME, CACHE_CONTROL_HEADER_VALUE);
6682
}
83+
84+
if (options.fastlyDebugHeaderEnabled()) {
85+
request.addHeader(HEADER_FASTLY_DEBUG_NAME, HEADER_FASTLY_DEBUG_VALUE);
86+
}
87+
6788
response = _client.execute(request);
89+
options.handleResponseHeaders(Arrays.stream(response.getHeaders())
90+
.collect(Collectors.toMap(Header::getName, Header::getValue)));
6891

6992
int statusCode = response.getCode();
7093

client/src/main/java/io/split/client/HttpSplitChangeFetcher.java

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import io.split.client.dtos.SplitChange;
55
import io.split.client.utils.Json;
66
import io.split.client.utils.Utils;
7+
import io.split.engine.common.FetchOptions;
78
import io.split.engine.experiments.SplitChangeFetcher;
89
import io.split.engine.metrics.Metrics;
910
import io.split.telemetry.domain.enums.HTTPLatenciesEnum;
@@ -14,6 +15,7 @@
1415
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
1516
import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
1617
import org.apache.hc.core5.http.HttpStatus;
18+
import org.apache.hc.core5.http.Header;
1719
import org.apache.hc.core5.http.io.entity.EntityUtils;
1820
import org.apache.hc.core5.net.URIBuilder;
1921
import org.slf4j.Logger;
@@ -22,6 +24,8 @@
2224
import java.net.URI;
2325
import java.net.URISyntaxException;
2426
import java.nio.charset.StandardCharsets;
27+
import java.util.Arrays;
28+
import java.util.stream.Collectors;
2529

2630
import static com.google.common.base.Preconditions.checkNotNull;
2731

@@ -32,9 +36,14 @@ public final class HttpSplitChangeFetcher implements SplitChangeFetcher {
3236
private static final Logger _log = LoggerFactory.getLogger(HttpSplitChangeFetcher.class);
3337

3438
private static final String SINCE = "since";
39+
private static final String TILL = "till";
3540
private static final String PREFIX = "splitChangeFetcher";
36-
private static final String NAME_CACHE = "Cache-Control";
37-
private static final String VALUE_CACHE = "no-cache";
41+
42+
private static final String HEADER_CACHE_CONTROL_NAME = "Cache-Control";
43+
private static final String HEADER_CACHE_CONTROL_VALUE = "no-cache";
44+
45+
private static final String HEADER_FASTLY_DEBUG_NAME = "Fastly-Debug";
46+
private static final String HEADER_FASTLY_DEBUG_VALUE = "1";
3847

3948
private final CloseableHttpClient _client;
4049
private final URI _target;
@@ -51,20 +60,37 @@ private HttpSplitChangeFetcher(CloseableHttpClient client, URI uri, TelemetryRun
5160
_telemetryRuntimeProducer = checkNotNull(telemetryRuntimeProducer);
5261
}
5362

63+
long makeRandomTill() {
64+
65+
return (-1)*(long)Math.floor(Math.random()*(Math.pow(2, 63)));
66+
}
67+
5468
@Override
55-
public SplitChange fetch(long since, boolean addCacheHeader) {
69+
public SplitChange fetch(long since, FetchOptions options) {
70+
5671
long start = System.currentTimeMillis();
5772

5873
CloseableHttpResponse response = null;
5974

6075
try {
61-
URI uri = new URIBuilder(_target).addParameter(SINCE, "" + since).build();
76+
URIBuilder uriBuilder = new URIBuilder(_target).addParameter(SINCE, "" + since);
77+
if (options.hasCustomCN()) {
78+
uriBuilder.addParameter(TILL, "" + options.targetCN());
79+
}
80+
URI uri = uriBuilder.build();
6281

6382
HttpGet request = new HttpGet(uri);
64-
if(addCacheHeader) {
65-
request.setHeader(NAME_CACHE, VALUE_CACHE);
83+
if(options.cacheControlHeadersEnabled()) {
84+
request.setHeader(HEADER_CACHE_CONTROL_NAME, HEADER_CACHE_CONTROL_VALUE);
6685
}
86+
87+
if (options.fastlyDebugHeaderEnabled()) {
88+
request.addHeader(HEADER_FASTLY_DEBUG_NAME, HEADER_FASTLY_DEBUG_VALUE);
89+
}
90+
6791
response = _client.execute(request);
92+
options.handleResponseHeaders(Arrays.stream(response.getHeaders())
93+
.collect(Collectors.toMap(Header::getName, Header::getValue)));
6894

6995
int statusCode = response.getCode();
7096

client/src/main/java/io/split/client/SplitClientConfig.java

Lines changed: 61 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,9 @@ public class SplitClientConfig {
5454
private final String _telemetryURL;
5555
private final int _telemetryRefreshRate;
5656
private final int _onDemandFetchRetryDelayMs;
57+
private final int _onDemandFetchMaxRetries;
58+
private final int _failedAttemptsBeforeLogging;
59+
private final boolean _cdnDebugLogging;
5760

5861
// Proxy configs
5962
private final HttpHost _proxy;
@@ -100,7 +103,10 @@ private SplitClientConfig(String endpoint,
100103
String streamingServiceURL,
101104
String telemetryURL,
102105
int telemetryRefreshRate,
103-
int onDemandFetchRetryDelayMs) {
106+
int onDemandFetchRetryDelayMs,
107+
int onDemandFetchMaxRetries,
108+
int failedAttemptsBeforeLogging,
109+
boolean cdnDebugLogging) {
104110
_endpoint = endpoint;
105111
_eventsEndpoint = eventsEndpoint;
106112
_featuresRefreshRate = pollForFeatureChangesEveryNSeconds;
@@ -134,6 +140,9 @@ private SplitClientConfig(String endpoint,
134140
_telemetryURL = telemetryURL;
135141
_telemetryRefreshRate = telemetryRefreshRate;
136142
_onDemandFetchRetryDelayMs = onDemandFetchRetryDelayMs;
143+
_onDemandFetchMaxRetries = onDemandFetchMaxRetries;
144+
_failedAttemptsBeforeLogging = failedAttemptsBeforeLogging;
145+
_cdnDebugLogging = cdnDebugLogging;
137146

138147
Properties props = new Properties();
139148
try {
@@ -271,6 +280,13 @@ public int get_telemetryRefreshRate() {
271280
}
272281
public int streamingRetryDelay() {return _onDemandFetchRetryDelayMs;}
273282

283+
public int streamingFetchMaxRetries() {return _onDemandFetchMaxRetries;}
284+
285+
public int failedAttemptsBeforeLogging() {return _failedAttemptsBeforeLogging;}
286+
287+
public boolean cdnDebugLogging() { return _cdnDebugLogging; }
288+
289+
274290
public static final class Builder {
275291

276292
private String _endpoint = SDK_ENDPOINT;
@@ -309,6 +325,9 @@ public static final class Builder {
309325
private String _telemetryURl = TELEMETRY_ENDPOINT;
310326
private int _telemetryRefreshRate = 60;
311327
private int _onDemandFetchRetryDelayMs = 50;
328+
private final int _onDemandFetchMaxRetries = 10;
329+
private final int _failedAttemptsBeforeLogging = 10;
330+
private final boolean _cdnDebugLogging = true;
312331

313332
public Builder() {
314333
}
@@ -798,41 +817,47 @@ public SplitClientConfig build() {
798817
if (_onDemandFetchRetryDelayMs <= 0) {
799818
throw new IllegalStateException("streamingRetryDelay must be > 0");
800819
}
801-
802-
return new SplitClientConfig(
803-
_endpoint,
804-
_eventsEndpoint,
805-
_featuresRefreshRate,
806-
_segmentsRefreshRate,
807-
_impressionsRefreshRate,
808-
_impressionsQueueSize,
809-
_impressionsMode,
810-
_metricsRefreshRate,
811-
_connectionTimeout,
812-
_readTimeout,
813-
_numThreadsForSegmentFetch,
814-
_ready,
815-
_debugEnabled,
816-
_labelsEnabled,
817-
_ipAddressEnabled,
818-
_waitBeforeShutdown,
819-
proxy(),
820-
_proxyUsername,
821-
_proxyPassword,
822-
_eventsQueueSize,
823-
_eventFlushIntervalInMillis,
824-
_maxStringLength,
825-
_destroyOnShutDown,
826-
_splitFile,
827-
_integrationsConfig,
828-
_streamingEnabled,
829-
_authRetryBackoffBase,
830-
_streamingReconnectBackoffBase,
831-
_authServiceURL,
832-
_streamingServiceURL,
833-
_telemetryURl,
834-
_telemetryRefreshRate,
835-
_onDemandFetchRetryDelayMs);
820+
if(_onDemandFetchMaxRetries <= 0) {
821+
throw new IllegalStateException("_onDemandFetchMaxRetries must be > 0");
836822
}
823+
824+
return new SplitClientConfig(
825+
_endpoint,
826+
_eventsEndpoint,
827+
_featuresRefreshRate,
828+
_segmentsRefreshRate,
829+
_impressionsRefreshRate,
830+
_impressionsQueueSize,
831+
_impressionsMode,
832+
_metricsRefreshRate,
833+
_connectionTimeout,
834+
_readTimeout,
835+
_numThreadsForSegmentFetch,
836+
_ready,
837+
_debugEnabled,
838+
_labelsEnabled,
839+
_ipAddressEnabled,
840+
_waitBeforeShutdown,
841+
proxy(),
842+
_proxyUsername,
843+
_proxyPassword,
844+
_eventsQueueSize,
845+
_eventFlushIntervalInMillis,
846+
_maxStringLength,
847+
_destroyOnShutDown,
848+
_splitFile,
849+
_integrationsConfig,
850+
_streamingEnabled,
851+
_authRetryBackoffBase,
852+
_streamingReconnectBackoffBase,
853+
_authServiceURL,
854+
_streamingServiceURL,
855+
_onDemandFetchRetryDelayMs,
856+
_onDemandFetchMaxRetries,
857+
_failedAttemptsBeforeLogging,
858+
_cdnDebugLogging,
859+
_telemetryURl,
860+
_telemetryRefreshRate);
861+
}
837862
}
838863
}

client/src/main/java/io/split/client/SplitFactoryImpl.java

Lines changed: 40 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -133,21 +133,54 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn
133133
_splitFetcher = buildSplitFetcher();
134134

135135
// SplitSynchronizationTask
136-
_splitSynchronizationTask = new SplitSynchronizationTask(_splitFetcher, _splitCache, findPollingPeriod(RANDOM, config.featuresRefreshRate()));
136+
_splitSynchronizationTask = new SplitSynchronizationTask(_splitFetcher,
137+
_splitCache,
138+
findPollingPeriod(RANDOM, config.featuresRefreshRate()));
137139

138140
// Impressions
139141
_impressionsManager = buildImpressionsManager(config);
140142

141143
// EventClient
142-
_eventClient = EventClientImpl.create(_httpclient, _eventsRootTarget, config.eventsQueueSize(), config.eventFlushIntervalInMillis(), config.waitBeforeShutdown(), _telemetryStorage);
144+
_eventClient = EventClientImpl.create(_httpclient,
145+
_eventsRootTarget,
146+
config.eventsQueueSize(),
147+
config.eventFlushIntervalInMillis(),
148+
config.waitBeforeShutdown());
143149

144150
_telemetrySyncTask = new TelemetrySyncTask(config.get_telemetryRefreshRate(), _telemetrySynchronizer);
145151

152+
// SyncManager
153+
_syncManager = SyncManagerImp.build(config.streamingEnabled(),
154+
_splitSynchronizationTask,
155+
_splitFetcher,
156+
_segmentSynchronizationTaskImp,
157+
_splitCache,
158+
config.authServiceURL(),
159+
_httpclient,
160+
config.streamingServiceURL(),
161+
config.authRetryBackoffBase(),
162+
buildSSEdHttpClient(config),
163+
_segmentCache,
164+
config.streamingRetryDelay(),
165+
config.streamingFetchMaxRetries(),
166+
config.failedAttemptsBeforeLogging(),
167+
config.cdnDebugLogging());
168+
_syncManager.start();
169+
146170
// Evaluator
147171
_evaluator = new EvaluatorImp(_splitCache);
148172

149173
// SplitClient
150-
_client = new SplitClientImpl(this, _splitCache, _impressionsManager, _eventClient, config, _gates, _evaluator, _telemetryStorage, _telemetryStorage);
174+
_client = new SplitClientImpl(this,
175+
_splitCache,
176+
_impressionsManager,
177+
_cachedFireAndForgetMetrics,
178+
_eventClient,
179+
config,
180+
_gates,
181+
_evaluator,
182+
_telemetryStorage,
183+
_telemetryStorage);
151184

152185
// SplitManager
153186
_manager = new SplitManagerImpl(_splitCache, config, _gates, _telemetryStorage);
@@ -160,10 +193,12 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn
160193

161194
// DestroyOnShutDown
162195
if (config.destroyOnShutDown()) {
163-
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
196+
Thread shutdown = new Thread(() -> {
164197
// Using the full path to avoid conflicting with Thread.destroy()
165198
SplitFactoryImpl.this.destroy();
166-
}));
199+
});
200+
shutdown.setName("split-destroy-worker");
201+
Runtime.getRuntime().addShutdownHook(shutdown);
167202
}
168203
}
169204

@@ -214,7 +249,6 @@ public boolean isDestroyed() {
214249
}
215250

216251
private static CloseableHttpClient buildHttpClient(String apiToken, SplitClientConfig config) {
217-
218252
SSLConnectionSocketFactory sslSocketFactory = SSLConnectionSocketFactoryBuilder.create()
219253
.setSslContext(SSLContexts.createSystemDefault())
220254
.setTlsVersions(TLS.V_1_1, TLS.V_1_2)

0 commit comments

Comments
 (0)