Skip to content

Commit 4b67733

Browse files
Merge pull request #211 from splitio/telemetry-synchronizer
Telemetry synchronizer
2 parents d5ef9ef + 86be0c0 commit 4b67733

30 files changed

+1125
-60
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ Split has built and maintains SDKs for:
6464
* Java [Github](https://github.com/splitio/java-client) [Docs](https://help.split.io/hc/en-us/articles/360020405151-Java-SDK)
6565
* Javascript [Github](https://github.com/splitio/javascript-client) [Docs](https://help.split.io/hc/en-us/articles/360020448791-JavaScript-SDK)
6666
* Node [Github](https://github.com/splitio/javascript-client) [Docs](https://help.split.io/hc/en-us/articles/360020564931-Node-js-SDK)
67-
* .NET [Github](https://github.com/splitio/.net-core-client) [Docs](https://help.split.io/hc/en-us/articles/360020240172--NET-SDK)
67+
* .NET [Github](https://github.com/splitio/dotnet-client) [Docs](https://help.split.io/hc/en-us/articles/360020240172--NET-SDK)
6868
* Ruby [Github](https://github.com/splitio/ruby-client) [Docs](https://help.split.io/hc/en-us/articles/360020673251-Ruby-SDK)
6969
* PHP [Github](https://github.com/splitio/php-client) [Docs](https://help.split.io/hc/en-us/articles/360020350372-PHP-SDK)
7070
* Python [Github](https://github.com/splitio/python-client) [Docs](https://help.split.io/hc/en-us/articles/360020359652-Python-SDK)

client/CHANGES.txt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
CHANGES
22

3+
4.1.6 (Apr 15, 2021)
4+
-Updated log level and message in some messages.
5+
6+
4.1.5 (Apr 6, 2021)
7+
-Updated: Streaming retry fix.
8+
39
4.1.4 (Mar 19, 2021)
410
- Updated: Internal cache structure refactor.
511
- Updated: Streaming revamp with several bugfixes and improved log messages.

client/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
<parent>
66
<groupId>io.split.client</groupId>
77
<artifactId>java-client-parent</artifactId>
8-
<version>4.1.4</version>
8+
<version>4.2.0-rc1</version>
99
</parent>
1010
<artifactId>java-client</artifactId>
1111
<packaging>jar</packaging>

client/src/main/java/io/split/cache/SegmentCache.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package io.split.cache;
22

3+
import io.split.engine.segments.SegmentImp;
4+
35
import java.util.List;
6+
import java.util.Set;
47

58
/**
69
* Memory for segments
@@ -42,4 +45,16 @@ public interface SegmentCache {
4245
* clear all segments
4346
*/
4447
void clear();
48+
49+
/**
50+
* return every segment
51+
* @return
52+
*/
53+
List<SegmentImp> getAll();
54+
55+
/**
56+
* return every key
57+
* @return
58+
*/
59+
Set<String> getAllKeys();
4560
}

client/src/main/java/io/split/cache/SegmentCacheInMemoryImpl.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@
66
import org.slf4j.LoggerFactory;
77

88
import java.util.List;
9+
import java.util.Set;
910
import java.util.concurrent.ConcurrentMap;
11+
import java.util.stream.Collectors;
1012

1113
/**
1214
* InMemoryCache Implementation
@@ -59,4 +61,14 @@ public long getChangeNumber(String segmentName) {
5961
public void clear() {
6062
_segments.clear();
6163
}
64+
65+
@Override
66+
public List<SegmentImp> getAll() {
67+
return _segments.values().stream().collect(Collectors.toList());
68+
}
69+
70+
@Override
71+
public Set<String> getAllKeys() {
72+
return _segments.values().stream().flatMap(si -> si.getKeys().stream()).collect(Collectors.toSet());
73+
}
6274
}

client/src/main/java/io/split/client/SplitClientConfig.java

Lines changed: 95 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,11 @@
1717
public class SplitClientConfig {
1818

1919
public static final String LOCALHOST_DEFAULT_FILE = "split.yaml";
20+
public static final String SDK_ENDPOINT = "https://sdk.split.io";
21+
public static final String EVENTS_ENDPOINT = "https://events.split.io";
22+
public static final String AUTH_ENDPOINT = "https://auth.split.io/api/auth";
23+
public static final String STREAMING_ENDPOINT = "https://streaming.split.io/sse";
24+
public static final String TELEMETRY_ENDPOINT = "https://telemetry.split.io/api/v1";
2025

2126
private final String _endpoint;
2227
private final String _eventsEndpoint;
@@ -46,6 +51,9 @@ public class SplitClientConfig {
4651
private final int _streamingReconnectBackoffBase;
4752
private final String _authServiceURL;
4853
private final String _streamingServiceURL;
54+
private final String _telemetryURL;
55+
private final int _telemetryRefreshRate;
56+
private final int _onDemandFetchRetryDelayMs;
4957

5058
// Proxy configs
5159
private final HttpHost _proxy;
@@ -89,7 +97,10 @@ private SplitClientConfig(String endpoint,
8997
int authRetryBackoffBase,
9098
int streamingReconnectBackoffBase,
9199
String authServiceURL,
92-
String streamingServiceURL) {
100+
String streamingServiceURL,
101+
String telemetryURL,
102+
int telemetryRefreshRate,
103+
int onDemandFetchRetryDelayMs) {
93104
_endpoint = endpoint;
94105
_eventsEndpoint = eventsEndpoint;
95106
_featuresRefreshRate = pollForFeatureChangesEveryNSeconds;
@@ -120,6 +131,9 @@ private SplitClientConfig(String endpoint,
120131
_streamingReconnectBackoffBase = streamingReconnectBackoffBase;
121132
_authServiceURL = authServiceURL;
122133
_streamingServiceURL = streamingServiceURL;
134+
_telemetryURL = telemetryURL;
135+
_telemetryRefreshRate = telemetryRefreshRate;
136+
_onDemandFetchRetryDelayMs = onDemandFetchRetryDelayMs;
123137

124138
Properties props = new Properties();
125139
try {
@@ -248,11 +262,20 @@ public String streamingServiceURL() {
248262
return _streamingServiceURL;
249263
}
250264

265+
public String get_telemetryURL() {
266+
return _telemetryURL;
267+
}
268+
269+
public int get_telemetryRefreshRate() {
270+
return _telemetryRefreshRate;
271+
}
272+
public int streamingRetryDelay() {return _onDemandFetchRetryDelayMs;}
273+
251274
public static final class Builder {
252275

253-
private String _endpoint = "https://sdk.split.io";
276+
private String _endpoint = SDK_ENDPOINT;
254277
private boolean _endpointSet = false;
255-
private String _eventsEndpoint = "https://events.split.io";
278+
private String _eventsEndpoint = EVENTS_ENDPOINT;
256279
private boolean _eventsEndpointSet = false;
257280
private int _featuresRefreshRate = 60;
258281
private int _segmentsRefreshRate = 60;
@@ -281,8 +304,11 @@ public static final class Builder {
281304
private boolean _streamingEnabled = true;
282305
private int _authRetryBackoffBase = 1;
283306
private int _streamingReconnectBackoffBase = 1;
284-
private String _authServiceURL = "https://auth.split.io/api/auth";
285-
private String _streamingServiceURL = "https://streaming.split.io/sse";
307+
private String _authServiceURL = AUTH_ENDPOINT;
308+
private String _streamingServiceURL = STREAMING_ENDPOINT;
309+
private String _telemetryURl = TELEMETRY_ENDPOINT;
310+
private int _telemetryRefreshRate = 60;
311+
private int _onDemandFetchRetryDelayMs = 50;
286312

287313
public Builder() {
288314
}
@@ -674,6 +700,27 @@ public Builder streamingServiceURL(String streamingServiceURL) {
674700
return this;
675701
}
676702

703+
/**
704+
* Set telemetry service URL.
705+
* @param telemetryURL
706+
* @return
707+
*/
708+
public Builder telemetryURL(String telemetryURL) {
709+
_telemetryURl = telemetryURL;
710+
return this;
711+
}
712+
713+
/**
714+
* How often send telemetry data
715+
*
716+
* @param telemetryRefreshRate
717+
* @return this builder
718+
*/
719+
public Builder telemetryRefreshRate(int telemetryRefreshRate) {
720+
_telemetryRefreshRate = telemetryRefreshRate;
721+
return this;
722+
}
723+
677724
public SplitClientConfig build() {
678725
if (_featuresRefreshRate < 5 ) {
679726
throw new IllegalArgumentException("featuresRefreshRate must be >= 5: " + _featuresRefreshRate);
@@ -744,37 +791,48 @@ public SplitClientConfig build() {
744791
throw new IllegalArgumentException("streamingServiceURL must not be null");
745792
}
746793

747-
return new SplitClientConfig(
748-
_endpoint,
749-
_eventsEndpoint,
750-
_featuresRefreshRate,
751-
_segmentsRefreshRate,
752-
_impressionsRefreshRate,
753-
_impressionsQueueSize,
754-
_impressionsMode,
755-
_metricsRefreshRate,
756-
_connectionTimeout,
757-
_readTimeout,
758-
_numThreadsForSegmentFetch,
759-
_ready,
760-
_debugEnabled,
761-
_labelsEnabled,
762-
_ipAddressEnabled,
763-
_waitBeforeShutdown,
764-
proxy(),
765-
_proxyUsername,
766-
_proxyPassword,
767-
_eventsQueueSize,
768-
_eventFlushIntervalInMillis,
769-
_maxStringLength,
770-
_destroyOnShutDown,
771-
_splitFile,
772-
_integrationsConfig,
773-
_streamingEnabled,
774-
_authRetryBackoffBase,
775-
_streamingReconnectBackoffBase,
776-
_authServiceURL,
777-
_streamingServiceURL);
778-
}
794+
if (_telemetryURl == null) {
795+
throw new IllegalArgumentException("telemetryURl must not be null");
796+
}
797+
798+
if (_onDemandFetchRetryDelayMs <= 0) {
799+
throw new IllegalStateException("streamingRetryDelay must be > 0");
800+
}
801+
802+
return new SplitClientConfig(
803+
_endpoint,
804+
_eventsEndpoint,
805+
_featuresRefreshRate,
806+
_segmentsRefreshRate,
807+
_impressionsRefreshRate,
808+
_impressionsQueueSize,
809+
_impressionsMode,
810+
_metricsRefreshRate,
811+
_connectionTimeout,
812+
_readTimeout,
813+
_numThreadsForSegmentFetch,
814+
_ready,
815+
_debugEnabled,
816+
_labelsEnabled,
817+
_ipAddressEnabled,
818+
_waitBeforeShutdown,
819+
proxy(),
820+
_proxyUsername,
821+
_proxyPassword,
822+
_eventsQueueSize,
823+
_eventFlushIntervalInMillis,
824+
_maxStringLength,
825+
_destroyOnShutDown,
826+
_splitFile,
827+
_integrationsConfig,
828+
_streamingEnabled,
829+
_authRetryBackoffBase,
830+
_streamingReconnectBackoffBase,
831+
_authServiceURL,
832+
_streamingServiceURL,
833+
_telemetryURl,
834+
_telemetryRefreshRate,
835+
_onDemandFetchRetryDelayMs);
836+
}
779837
}
780838
}

client/src/main/java/io/split/client/SplitFactoryImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn
132132
_eventClient = EventClientImpl.create(_httpclient, _eventsRootTarget, config.eventsQueueSize(), config.eventFlushIntervalInMillis(), config.waitBeforeShutdown());
133133

134134
// SyncManager
135-
_syncManager = SyncManagerImp.build(config.streamingEnabled(), _splitSynchronizationTask, _splitFetcher, _segmentSynchronizationTaskImp, _splitCache, config.authServiceURL(), _httpclient, config.streamingServiceURL(), config.authRetryBackoffBase(), buildSSEdHttpClient(config), _segmentCache);
135+
_syncManager = SyncManagerImp.build(config.streamingEnabled(), _splitSynchronizationTask, _splitFetcher, _segmentSynchronizationTaskImp, _splitCache, config.authServiceURL(), _httpclient, config.streamingServiceURL(), config.authRetryBackoffBase(), buildSSEdHttpClient(config), _segmentCache, config.streamingRetryDelay());
136136
_syncManager.start();
137137

138138
// Evaluator

client/src/main/java/io/split/client/metrics/HttpMetrics.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public void time(Latency dto) {
5050
try {
5151
post(_timeTarget, dto);
5252
} catch (Throwable t) {
53-
_log.warn("Exception when posting metric" + dto, t);
53+
_log.warn("Exception when posting metric " + dto, t);
5454
}
5555
;
5656

@@ -61,7 +61,7 @@ public void count(Counter dto) {
6161
try {
6262
post(_counterTarget, dto);
6363
} catch (Throwable t) {
64-
_log.warn("Exception when posting metric" + dto, t);
64+
_log.warn("Exception when posting metric " + dto, t);
6565
}
6666

6767
}
@@ -85,9 +85,9 @@ private void post(URI uri, Object dto) {
8585
}
8686

8787
} catch (Throwable t) {
88-
_log.warn("Exception when posting metrics:" + t.getMessage());
88+
_log.warn("Exception when posting metrics: " + t.getMessage());
8989
if (_log.isDebugEnabled()) {
90-
_log.debug("Reason:", t);
90+
_log.debug("Reason: ", t);
9191
}
9292
} finally {
9393
Utils.forceClose(response);

client/src/main/java/io/split/engine/common/SyncManagerImp.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,10 @@ public static SyncManagerImp build(boolean streamingEnabledConfig,
5959
String streamingServiceUrl,
6060
int authRetryBackOffBase,
6161
CloseableHttpClient sseHttpClient,
62-
SegmentCache segmentCache) {
62+
SegmentCache segmentCache,
63+
int streamingRetryDelay) {
6364
LinkedBlockingQueue<PushManager.Status> pushMessages = new LinkedBlockingQueue<>();
64-
Synchronizer synchronizer = new SynchronizerImp(splitSynchronizationTask, splitFetcher, segmentSynchronizationTaskImp, splitCache, segmentCache);
65+
Synchronizer synchronizer = new SynchronizerImp(splitSynchronizationTask, splitFetcher, segmentSynchronizationTaskImp, splitCache, segmentCache, streamingRetryDelay);
6566
PushManager pushManager = PushManagerImp.build(synchronizer, streamingServiceUrl, authUrl, httpClient, pushMessages, sseHttpClient);
6667
return new SyncManagerImp(streamingEnabledConfig, synchronizer, pushManager, pushMessages, authRetryBackOffBase);
6768
}

client/src/main/java/io/split/engine/common/SynchronizerImp.java

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,24 +19,28 @@
1919

2020
public class SynchronizerImp implements Synchronizer {
2121
private static final Logger _log = LoggerFactory.getLogger(Synchronizer.class);
22+
private static final int RETRIES_NUMBER = 10;
2223

2324
private final SplitSynchronizationTask _splitSynchronizationTask;
2425
private final SplitFetcher _splitFetcher;
2526
private final SegmentSynchronizationTask _segmentSynchronizationTaskImp;
2627
private final ScheduledExecutorService _syncAllScheduledExecutorService;
2728
private final SplitCache _splitCache;
2829
private final SegmentCache _segmentCache;
30+
private final int _onDemandFetchRetryDelayMs;
2931

3032
public SynchronizerImp(SplitSynchronizationTask splitSynchronizationTask,
3133
SplitFetcher splitFetcher,
3234
SegmentSynchronizationTask segmentSynchronizationTaskImp,
3335
SplitCache splitCache,
34-
SegmentCache segmentCache) {
36+
SegmentCache segmentCache,
37+
int onDemandFetchRetryDelayMs) {
3538
_splitSynchronizationTask = checkNotNull(splitSynchronizationTask);
3639
_splitFetcher = checkNotNull(splitFetcher);
3740
_segmentSynchronizationTaskImp = checkNotNull(segmentSynchronizationTaskImp);
3841
_splitCache = checkNotNull(splitCache);
3942
_segmentCache = checkNotNull(segmentCache);
43+
_onDemandFetchRetryDelayMs = checkNotNull(onDemandFetchRetryDelayMs);
4044

4145
ThreadFactory splitsThreadFactory = new ThreadFactoryBuilder()
4246
.setDaemon(true)
@@ -69,8 +73,23 @@ public void stopPeriodicFetching() {
6973

7074
@Override
7175
public void refreshSplits(long targetChangeNumber) {
72-
if (targetChangeNumber > _splitCache.getChangeNumber()) {
76+
int retries = RETRIES_NUMBER;
77+
while(targetChangeNumber > _splitCache.getChangeNumber()) {
78+
retries--;
7379
_splitFetcher.forceRefresh(true);
80+
if (targetChangeNumber <= _splitCache.getChangeNumber()) {
81+
_log.debug("Refresh completed in %s attempts.", RETRIES_NUMBER - retries);
82+
return;
83+
} else if (retries <= 0) {
84+
_log.warn("No changes fetched after %s attempts.", RETRIES_NUMBER);
85+
return;
86+
}
87+
try {
88+
Thread.sleep(_onDemandFetchRetryDelayMs);
89+
} catch (InterruptedException e) {
90+
Thread.currentThread().interrupt();
91+
_log.debug("Error trying to sleep current Thread.");
92+
}
7493
}
7594
}
7695

@@ -84,7 +103,8 @@ public void localKillSplit(String splitName, String defaultTreatment, long newCh
84103

85104
@Override
86105
public void refreshSegment(String segmentName, long changeNumber) {
87-
if (changeNumber > _segmentCache.getChangeNumber(segmentName)) {
106+
int retries = 1;
107+
while(changeNumber > _segmentCache.getChangeNumber(segmentName) && retries <= RETRIES_NUMBER) {
88108
SegmentFetcher fetcher = _segmentSynchronizationTaskImp.getFetcher(segmentName);
89109
try{
90110
fetcher.fetch(true);
@@ -93,6 +113,7 @@ public void refreshSegment(String segmentName, long changeNumber) {
93113
catch (NullPointerException np){
94114
throw new NullPointerException();
95115
}
116+
retries++;
96117
}
97118
}
98119
}

0 commit comments

Comments
 (0)