Skip to content

Commit d3c29f0

Browse files
Adding TelemtrySyncTask
1 parent 4b67733 commit d3c29f0

File tree

5 files changed

+60
-19
lines changed

5 files changed

+60
-19
lines changed

client/src/main/java/io/split/telemetry/synchronizer/HttpTelemetryMemorySender.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,11 @@
11
package io.split.telemetry.synchronizer;
22

33
import com.google.common.annotations.VisibleForTesting;
4-
import io.split.client.impressions.HttpImpressionsSender;
54
import io.split.client.utils.Utils;
5+
import io.split.service.HttpPostImp;
66
import io.split.telemetry.domain.Config;
77
import io.split.telemetry.domain.Stats;
8-
import io.split.service.HttpPostImp;
98
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
10-
import org.slf4j.Logger;
11-
import org.slf4j.LoggerFactory;
129

1310
import java.net.URI;
1411
import java.net.URISyntaxException;
@@ -20,8 +17,6 @@ public class HttpTelemetryMemorySender extends HttpPostImp {
2017
private static final String CONFIG_METRICS = "Config metrics ";
2118
private static final String STATS_METRICS = "Stats metrics ";
2219

23-
private static final Logger _logger = LoggerFactory.getLogger(HttpImpressionsSender.class);
24-
2520
private final URI _impressionConfigTarget;
2621
private final URI _impressionStatsTarget;
2722

client/src/main/java/io/split/telemetry/synchronizer/TelemetrySynchronizerImp.java renamed to client/src/main/java/io/split/telemetry/synchronizer/SynchronizerMemory.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,10 @@
1919
import java.net.URI;
2020
import java.net.URISyntaxException;
2121
import java.util.ArrayList;
22-
import java.util.Arrays;
2322
import java.util.List;
2423
import java.util.Map;
25-
import java.util.stream.Collectors;
2624

27-
public class TelemetrySynchronizerImp implements TelemetrySynchronizer{
25+
public class SynchronizerMemory implements TelemetrySynchronizer{
2826

2927
private static final int OPERATION_MODE = 0;
3028
private static final String STORAGE = "memory";
@@ -34,8 +32,8 @@ public class TelemetrySynchronizerImp implements TelemetrySynchronizer{
3432
private SplitCache _splitCache;
3533
private SegmentCache _segmentCache;
3634

37-
public TelemetrySynchronizerImp(CloseableHttpClient client, URI telemetryRootEndpoint, TelemetryStorageConsumer telemetryStorageConsumer, SplitCache splitCache,
38-
SegmentCache segmentCache) throws URISyntaxException {
35+
public SynchronizerMemory(CloseableHttpClient client, URI telemetryRootEndpoint, TelemetryStorageConsumer telemetryStorageConsumer, SplitCache splitCache,
36+
SegmentCache segmentCache) throws URISyntaxException {
3937
_httpHttpTelemetryMemorySender = HttpTelemetryMemorySender.create(client, telemetryRootEndpoint);
4038
_teleTelemetryStorageConsumer = telemetryStorageConsumer;
4139
_splitCache = splitCache;
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package io.split.telemetry.synchronizer;
2+
3+
import com.google.common.annotations.VisibleForTesting;
4+
import com.google.common.util.concurrent.ThreadFactoryBuilder;
5+
6+
import java.util.concurrent.Executors;
7+
import java.util.concurrent.ScheduledExecutorService;
8+
import java.util.concurrent.ThreadFactory;
9+
import java.util.concurrent.TimeUnit;
10+
11+
public class TelemetrySyncTask {
12+
13+
private final ScheduledExecutorService _telemetrySyncScheduledExecutorService;
14+
private final TelemetrySynchronizer _telemetrySynchronizer;
15+
private final int _telemetryRefreshRate;
16+
17+
public TelemetrySyncTask(int telemetryRefreshRate, TelemetrySynchronizer telemetrySynchronizer) {
18+
ThreadFactory telemetrySyncThreadFactory = new ThreadFactoryBuilder()
19+
.setDaemon(true)
20+
.setNameFormat("Telemetry-synk-%d")
21+
.build();
22+
_telemetrySynchronizer = telemetrySynchronizer; //TODO
23+
_telemetryRefreshRate = telemetryRefreshRate;
24+
_telemetrySyncScheduledExecutorService = Executors.newSingleThreadScheduledExecutor(telemetrySyncThreadFactory);
25+
}
26+
27+
@VisibleForTesting
28+
protected void startScheduledTask() throws Exception {
29+
_telemetrySyncScheduledExecutorService.scheduleWithFixedDelay(() -> {
30+
try {
31+
_telemetrySynchronizer.synchronizeStats();
32+
} catch (Exception e) {
33+
e.printStackTrace();
34+
}
35+
},0l, _telemetryRefreshRate, TimeUnit.SECONDS);
36+
}
37+
}

client/src/test/java/io/split/telemetry/synchronizer/TelemetrySynchronizerImpTest.java renamed to client/src/test/java/io/split/telemetry/synchronizer/SynchronizerMemoryTest.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,15 @@
11
package io.split.telemetry.synchronizer;
22

33
import io.split.TestHelper;
4-
import io.split.cache.InMemoryCacheImp;
54
import io.split.cache.SegmentCache;
65
import io.split.cache.SegmentCacheInMemoryImpl;
76
import io.split.cache.SplitCache;
87
import io.split.client.SplitClientConfig;
9-
import io.split.service.HttpPostImp;
108
import io.split.telemetry.storage.InMemoryTelemetryStorage;
119
import io.split.telemetry.storage.TelemetryStorageConsumer;
12-
import org.apache.hc.client5.http.classic.methods.HttpOptions;
1310
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
14-
import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
15-
import org.apache.hc.core5.http.ClassicHttpRequest;
1611
import org.apache.hc.core5.http.HttpStatus;
1712
import org.junit.Test;
18-
import org.mockito.Mock;
1913
import org.mockito.Mockito;
2014

2115
import java.io.IOException;
@@ -25,7 +19,7 @@
2519
import java.util.ArrayList;
2620
import java.util.HashMap;
2721

28-
public class TelemetrySynchronizerImpTest{
22+
public class SynchronizerMemoryTest {
2923

3024
public static final String TELEMETRY_ENDPOINT = "https://telemetry.split.io/api/v1";
3125

@@ -53,7 +47,7 @@ private TelemetrySynchronizer getTelemetrySynchronizer(CloseableHttpClient httpC
5347
TelemetryStorageConsumer consumer = Mockito.mock(InMemoryTelemetryStorage.class);
5448
SplitCache splitCache = Mockito.mock(SplitCache.class);
5549
SegmentCache segmentCache = Mockito.mock(SegmentCacheInMemoryImpl.class);
56-
TelemetrySynchronizer telemetrySynchronizer = new TelemetrySynchronizerImp(httpClient, URI.create(TELEMETRY_ENDPOINT), consumer, splitCache, segmentCache);
50+
TelemetrySynchronizer telemetrySynchronizer = new SynchronizerMemory(httpClient, URI.create(TELEMETRY_ENDPOINT), consumer, splitCache, segmentCache);
5751
return telemetrySynchronizer;
5852
}
5953

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package io.split.telemetry.synchronizer;
2+
3+
import org.junit.Test;
4+
import org.mockito.Mockito;
5+
6+
public class TelemetrySyncTaskTest {
7+
8+
@Test
9+
public void testSynchronizationTask() throws Exception {
10+
TelemetrySynchronizer telemetrySynchronizer = Mockito.mock(SynchronizerMemory.class);
11+
TelemetrySyncTask telemetrySyncTask = new TelemetrySyncTask(1, telemetrySynchronizer);
12+
telemetrySyncTask.startScheduledTask();
13+
Thread.sleep(3000);
14+
Mockito.verify(telemetrySynchronizer, Mockito.times(3)).synchronizeStats();
15+
}
16+
17+
}

0 commit comments

Comments
 (0)