Skip to content

Commit 84fd4ca

Browse files
Attached every service
1 parent d3a3c7e commit 84fd4ca

36 files changed

+497
-209
lines changed

client/src/main/java/io/split/client/EventClientImpl.java

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
import io.split.client.dtos.Event;
55
import io.split.client.utils.GenericClientUtil;
66
import io.split.client.utils.Utils;
7+
import io.split.telemetry.domain.enums.EventsDataRecordsEnum;
8+
import io.split.telemetry.domain.enums.HTTPLatenciesEnum;
79
import io.split.telemetry.domain.enums.LastSynchronizationRecordsEnum;
810
import io.split.telemetry.storage.TelemetryEvaluationProducer;
911
import io.split.telemetry.storage.TelemetryRuntimeProducer;
@@ -49,7 +51,7 @@ public class EventClientImpl implements EventClient {
4951
private final CloseableHttpClient _httpclient;
5052
private final URI _target;
5153
private final int _waitBeforeShutdown;
52-
private final TelemetryRuntimeProducer telemetryRuntimeProducer;
54+
private final TelemetryRuntimeProducer _telemetryRuntimeProducer;
5355

5456
ThreadFactory eventClientThreadFactory(final String name) {
5557
return new ThreadFactory() {
@@ -67,18 +69,18 @@ public void run() {
6769
}
6870

6971

70-
public static EventClientImpl create(CloseableHttpClient httpclient, URI eventsRootTarget, int maxQueueSize, long flushIntervalMillis, int waitBeforeShutdown, TelemetryEvaluationProducer telemetryEvaluationProducer) throws URISyntaxException {
72+
public static EventClientImpl create(CloseableHttpClient httpclient, URI eventsRootTarget, int maxQueueSize, long flushIntervalMillis, int waitBeforeShutdown, TelemetryRuntimeProducer telemetryRuntimeProducer) throws URISyntaxException {
7173
return new EventClientImpl(new LinkedBlockingQueue<WrappedEvent>(),
7274
httpclient,
7375
Utils.appendPath(eventsRootTarget, "api/events/bulk"),
7476
maxQueueSize,
7577
flushIntervalMillis,
7678
waitBeforeShutdown,
77-
telemetryEvaluationProducer);
79+
telemetryRuntimeProducer);
7880
}
7981

8082
EventClientImpl(BlockingQueue<WrappedEvent> eventQueue, CloseableHttpClient httpclient, URI target, int maxQueueSize,
81-
long flushIntervalMillis, int waitBeforeShutdown, TelemetryEvaluationProducer telemetryEvaluationProducer) throws URISyntaxException {
83+
long flushIntervalMillis, int waitBeforeShutdown, TelemetryRuntimeProducer telemetryRuntimeProducer) throws URISyntaxException {
8284

8385
_httpclient = httpclient;
8486

@@ -87,9 +89,9 @@ public static EventClientImpl create(CloseableHttpClient httpclient, URI eventsR
8789
_eventQueue = eventQueue;
8890
_waitBeforeShutdown = waitBeforeShutdown;
8991

90-
_maxQueueSize = maxQueueSize;
92+
_maxQueueSize = 1;
9193
_flushIntervalMillis = flushIntervalMillis;
92-
telemetryRuntimeProducer = checkNotNull(telemetryEvaluationProducer);
94+
_telemetryRuntimeProducer = checkNotNull(telemetryRuntimeProducer);
9395

9496
_senderExecutor = new ThreadPoolExecutor(
9597
1,
@@ -129,9 +131,12 @@ public boolean track(Event event, int eventSize) {
129131
if (event == null) {
130132
return false;
131133
}
132-
_eventQueue.put(new WrappedEvent(event, eventSize));
134+
WrappedEvent we = new WrappedEvent(event, eventSize);
135+
_eventQueue.put(we);
136+
_telemetryRuntimeProducer.recordEventStats(EventsDataRecordsEnum.EVENTS_QUEUED, 1);
133137

134-
} catch (InterruptedException e) {
138+
} catch (ClassCastException | NullPointerException | InterruptedException e) {
139+
_telemetryRuntimeProducer.recordEventStats(EventsDataRecordsEnum.EVENTS_DROPPED, 1);
135140
_log.warn("Interruption when adding event withed while adding message %s.", event);
136141
return false;
137142
}
@@ -191,7 +196,8 @@ public void run() {
191196
events = new ArrayList<>();
192197
accumulated = 0;
193198
long endTime = System.currentTimeMillis();
194-
telemetryRuntimeProducer.recordSuccessfulSync(LastSynchronizationRecordsEnum.EVENTS, endTime-initTime);
199+
_telemetryRuntimeProducer.recordSyncLatency(HTTPLatenciesEnum.EVENTS, endTime-initTime);
200+
_telemetryRuntimeProducer.recordSuccessfulSync(LastSynchronizationRecordsEnum.EVENTS, endTime);
195201
}
196202
}
197203
} catch (InterruptedException e) {

client/src/main/java/io/split/client/HttpSegmentChangeFetcher.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,10 @@
66
import io.split.client.utils.Utils;
77
import io.split.engine.metrics.Metrics;
88
import io.split.engine.segments.SegmentChangeFetcher;
9+
import io.split.telemetry.domain.enums.HTTPLatenciesEnum;
10+
import io.split.telemetry.domain.enums.LastSynchronizationRecordsEnum;
11+
import io.split.telemetry.domain.enums.ResourceEnum;
12+
import io.split.telemetry.storage.TelemetryRuntimeProducer;
913
import org.apache.hc.client5.http.classic.methods.HttpGet;
1014
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
1115
import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
@@ -33,21 +37,17 @@ public final class HttpSegmentChangeFetcher implements SegmentChangeFetcher {
3337

3438
private final CloseableHttpClient _client;
3539
private final URI _target;
36-
private final Metrics _metrics;
40+
private final TelemetryRuntimeProducer _telemetryRuntimeProducer;
3741

38-
public static HttpSegmentChangeFetcher create(CloseableHttpClient client, URI root) throws URISyntaxException {
39-
return create(client, root, new Metrics.NoopMetrics());
42+
public static HttpSegmentChangeFetcher create(CloseableHttpClient client, URI root, TelemetryRuntimeProducer telemetryRuntimeProducer) throws URISyntaxException {
43+
return new HttpSegmentChangeFetcher(client, Utils.appendPath(root, "api/segmentChanges"), telemetryRuntimeProducer);
4044
}
4145

42-
public static HttpSegmentChangeFetcher create(CloseableHttpClient client, URI root, Metrics metrics) throws URISyntaxException {
43-
return new HttpSegmentChangeFetcher(client, Utils.appendPath(root, "api/segmentChanges"), metrics);
44-
}
45-
46-
private HttpSegmentChangeFetcher(CloseableHttpClient client, URI uri, Metrics metrics) {
46+
private HttpSegmentChangeFetcher(CloseableHttpClient client, URI uri, TelemetryRuntimeProducer telemetryRuntimeProducer) {
4747
_client = client;
4848
_target = uri;
49-
_metrics = metrics;
5049
checkNotNull(_target);
50+
_telemetryRuntimeProducer = checkNotNull(telemetryRuntimeProducer);
5151
}
5252

5353
@Override
@@ -68,16 +68,18 @@ public SegmentChange fetch(String segmentName, long since, boolean addCacheHeade
6868
int statusCode = response.getCode();
6969

7070
if (statusCode < 200 || statusCode >= 300) {
71+
_telemetryRuntimeProducer.recordSyncError(ResourceEnum.SEGMENT_SYNC, statusCode);
7172
_log.error("Response status was: " + statusCode);
7273
if (statusCode == 403) {
7374
_log.error("factory instantiation: you passed a browser type api_key, " +
7475
"please grab an api key from the Split console that is of type sdk");
7576
}
76-
_metrics.count(PREFIX + ".status." + statusCode, 1);
7777
throw new IllegalStateException("Could not retrieve segment changes for " + segmentName + "; http return code " + statusCode);
7878
}
7979

80-
80+
long endTime = System.currentTimeMillis();
81+
_telemetryRuntimeProducer.recordSyncLatency(HTTPLatenciesEnum.SEGMENTS, endTime-start);
82+
_telemetryRuntimeProducer.recordSuccessfulSync(LastSynchronizationRecordsEnum.SEGMENTS, endTime);
8183

8284
String json = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
8385
if (_log.isDebugEnabled()) {
@@ -86,11 +88,9 @@ public SegmentChange fetch(String segmentName, long since, boolean addCacheHeade
8688

8789
return Json.fromJson(json, SegmentChange.class);
8890
} catch (Throwable t) {
89-
_metrics.count(PREFIX + ".exception", 1);
9091
throw new IllegalStateException("Problem fetching segmentChanges: " + t.getMessage(), t);
9192
} finally {
9293
Utils.forceClose(response);
93-
_metrics.time(PREFIX + ".time", System.currentTimeMillis() - start);
9494
}
9595

9696

client/src/main/java/io/split/client/HttpSplitChangeFetcher.java

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,10 @@
66
import io.split.client.utils.Utils;
77
import io.split.engine.experiments.SplitChangeFetcher;
88
import io.split.engine.metrics.Metrics;
9+
import io.split.telemetry.domain.enums.HTTPLatenciesEnum;
10+
import io.split.telemetry.domain.enums.LastSynchronizationRecordsEnum;
11+
import io.split.telemetry.domain.enums.ResourceEnum;
12+
import io.split.telemetry.storage.TelemetryRuntimeProducer;
913
import org.apache.hc.client5.http.classic.methods.HttpGet;
1014
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
1115
import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
@@ -33,26 +37,21 @@ public final class HttpSplitChangeFetcher implements SplitChangeFetcher {
3337

3438
private final CloseableHttpClient _client;
3539
private final URI _target;
36-
private final Metrics _metrics;
40+
private final TelemetryRuntimeProducer _telemetryRuntimeProducer;
3741

38-
public static HttpSplitChangeFetcher create(CloseableHttpClient client, URI root) throws URISyntaxException {
39-
return create(client, root, new Metrics.NoopMetrics());
42+
public static HttpSplitChangeFetcher create(CloseableHttpClient client, URI root, TelemetryRuntimeProducer telemetryRuntimeProducer) throws URISyntaxException {
43+
return new HttpSplitChangeFetcher(client, Utils.appendPath(root, "api/splitChanges"), telemetryRuntimeProducer);
4044
}
4145

42-
public static HttpSplitChangeFetcher create(CloseableHttpClient client, URI root, Metrics metrics) throws URISyntaxException {
43-
return new HttpSplitChangeFetcher(client, Utils.appendPath(root, "api/splitChanges"), metrics);
44-
}
45-
46-
private HttpSplitChangeFetcher(CloseableHttpClient client, URI uri, Metrics metrics) {
46+
private HttpSplitChangeFetcher(CloseableHttpClient client, URI uri, TelemetryRuntimeProducer telemetryRuntimeProducer) {
4747
_client = client;
4848
_target = uri;
49-
_metrics = metrics;
5049
checkNotNull(_target);
50+
_telemetryRuntimeProducer = checkNotNull(telemetryRuntimeProducer);
5151
}
5252

5353
@Override
5454
public SplitChange fetch(long since, boolean addCacheHeader) {
55-
5655
long start = System.currentTimeMillis();
5756

5857
CloseableHttpResponse response = null;
@@ -69,9 +68,11 @@ public SplitChange fetch(long since, boolean addCacheHeader) {
6968
int statusCode = response.getCode();
7069

7170
if (statusCode < 200 || statusCode >= 300) {
72-
_metrics.count(PREFIX + ".status." + statusCode, 1);
71+
_telemetryRuntimeProducer.recordSyncError(ResourceEnum.SPLIT_SYNC, statusCode);
7372
throw new IllegalStateException("Could not retrieve splitChanges; http return code " + statusCode);
7473
}
74+
long endtime = System.currentTimeMillis();
75+
_telemetryRuntimeProducer.recordSyncLatency(HTTPLatenciesEnum.SPLITS, endtime-start);
7576

7677

7778
String json = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
@@ -81,11 +82,9 @@ public SplitChange fetch(long since, boolean addCacheHeader) {
8182

8283
return Json.fromJson(json, SplitChange.class);
8384
} catch (Throwable t) {
84-
_metrics.count(PREFIX + ".exception", 1);
8585
throw new IllegalStateException("Problem fetching splitChanges: " + t.getMessage(), t);
8686
} finally {
8787
Utils.forceClose(response);
88-
_metrics.time(PREFIX + ".time", System.currentTimeMillis() - start);
8988
}
9089
}
9190

client/src/main/java/io/split/client/LocalhostSplitFactory.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
import io.split.engine.SDKReadinessGates;
77
import io.split.engine.evaluator.EvaluatorImp;
88
import io.split.engine.metrics.Metrics;
9+
import io.split.telemetry.storage.InMemoryTelemetryStorage;
10+
import io.split.telemetry.storage.NoopTelemetryStorage;
911
import org.slf4j.Logger;
1012
import org.slf4j.LoggerFactory;
1113

@@ -59,7 +61,7 @@ public LocalhostSplitFactory(String directory, String file) throws IOException {
5961
_cacheUpdaterService.updateCache(splitAndKeyToTreatment);
6062
_client = new SplitClientImpl(this, splitCache,
6163
new ImpressionsManager.NoOpImpressionsManager(), new NoopEventClient(),
62-
SplitClientConfig.builder().setBlockUntilReadyTimeout(1).build(), sdkReadinessGates, new EvaluatorImp(splitCache));
64+
SplitClientConfig.builder().setBlockUntilReadyTimeout(1).build(), sdkReadinessGates, new EvaluatorImp(splitCache), new NoopTelemetryStorage(), new NoopTelemetryStorage());
6365
_manager = LocalhostSplitManager.of(splitAndKeyToTreatment);
6466

6567
_splitFile.registerWatcher();

client/src/main/java/io/split/client/SplitClientConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ public class SplitClientConfig {
2121
public static final String EVENTS_ENDPOINT = "https://events.split.io";
2222
public static final String AUTH_ENDPOINT = "https://auth.split.io/api/auth";
2323
public static final String STREAMING_ENDPOINT = "https://streaming.split.io/sse";
24-
public static final String TELEMETRY_ENDPOINT = "https://telemetry.split.io/api/v1";
24+
public static final String TELEMETRY_ENDPOINT = "https://telemetry.split-stage.io/api/v1/";
2525

2626
private final String _endpoint;
2727
private final String _eventsEndpoint;

client/src/main/java/io/split/client/SplitClientImpl.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@
1515
import io.split.inputValidation.KeyValidator;
1616
import io.split.inputValidation.SplitNameValidator;
1717
import io.split.inputValidation.TrafficTypeValidator;
18+
import io.split.telemetry.domain.enums.LastSynchronizationRecordsEnum;
1819
import io.split.telemetry.domain.enums.MethodEnum;
20+
import io.split.telemetry.storage.TelemetryConfigProducer;
1921
import io.split.telemetry.storage.TelemetryEvaluationProducer;
2022
import org.slf4j.Logger;
2123
import org.slf4j.LoggerFactory;
@@ -49,6 +51,7 @@ public final class SplitClientImpl implements SplitClient {
4951
private final SDKReadinessGates _gates;
5052
private final Evaluator _evaluator;
5153
private final TelemetryEvaluationProducer _telemetryEvaluationProducer;
54+
private final TelemetryConfigProducer _telemetryConfigProducer;
5255

5356
public SplitClientImpl(SplitFactory container,
5457
SplitCache splitCache,
@@ -57,7 +60,8 @@ public SplitClientImpl(SplitFactory container,
5760
SplitClientConfig config,
5861
SDKReadinessGates gates,
5962
Evaluator evaluator,
60-
TelemetryEvaluationProducer telemetryEvaluationProducer) {
63+
TelemetryEvaluationProducer telemetryEvaluationProducer,
64+
TelemetryConfigProducer telemetryConfigProducer) {
6165
_container = container;
6266
_splitCache = checkNotNull(splitCache);
6367
_impressionManager = checkNotNull(impressionManager);
@@ -66,6 +70,7 @@ public SplitClientImpl(SplitFactory container,
6670
_gates = checkNotNull(gates);
6771
_evaluator = checkNotNull(evaluator);
6872
_telemetryEvaluationProducer = checkNotNull(telemetryEvaluationProducer);
73+
_telemetryConfigProducer = checkNotNull(telemetryConfigProducer);
6974
}
7075

7176
@Override
@@ -145,6 +150,7 @@ public void destroy() {
145150
}
146151

147152
private boolean track(Event event) {
153+
long initTime = System.currentTimeMillis();
148154
if (_container.isDestroyed()) {
149155
_log.error("Client has already been destroyed - no calls possible");
150156
return false;
@@ -174,15 +180,19 @@ private boolean track(Event event) {
174180
}
175181

176182
event.properties = propertiesResult.getValue();
183+
_telemetryEvaluationProducer.recordLatency(MethodEnum.TRACK, System.currentTimeMillis() - initTime);
177184

178185
return _eventClient.track(event, propertiesResult.getEventSize());
179186
}
180187

181188
private SplitResult getTreatmentWithConfigInternal(String method, String matchingKey, String bucketingKey, String split, Map<String, Object> attributes, MethodEnum methodEnum) {
182189
long initTime = System.currentTimeMillis();
183190
try {
191+
if(!_gates.isSDKReadyNow()){
192+
_log.warn(method + ": the SDK is not ready, results may be incorrect. Make sure to wait for SDK readiness before using this method");
193+
_telemetryConfigProducer.recordNonReadyUsage();
194+
}
184195
if (_container.isDestroyed()) {
185-
_telemetryEvaluationProducer.recordException(methodEnum);
186196
_log.error("Client has already been destroyed - no calls possible");
187197
return SPLIT_RESULT_CONTROL;
188198
}

0 commit comments

Comments
 (0)