Skip to content

Commit 74df9ee

Browse files
committed
refactor to fit new logic
1 parent 9c66c71 commit 74df9ee

18 files changed

+504
-338
lines changed

client/src/main/java/io/split/client/SplitClientImpl.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@
77
import io.split.client.dtos.Event;
88
import io.split.client.exceptions.ChangeNumberExceptionWrapper;
99
import io.split.client.impressions.Impression;
10-
import io.split.client.impressions.ImpressionListener;
10+
import io.split.client.impressions.ImpressionsManager;
11+
import io.split.client.impressions.ImpressionsManagerImpl;
1112
import io.split.engine.SDKReadinessGates;
1213
import io.split.engine.experiments.ParsedCondition;
1314
import io.split.engine.experiments.ParsedSplit;
@@ -50,7 +51,7 @@ public final class SplitClientImpl implements SplitClient {
5051

5152
private final SplitFactory _container;
5253
private final SplitFetcher _splitFetcher;
53-
private final ImpressionListener _impressionListener;
54+
private final ImpressionsManager _impressionManager;
5455
private final Metrics _metrics;
5556
private final SplitClientConfig _config;
5657
private final EventClient _eventClient;
@@ -59,22 +60,22 @@ public final class SplitClientImpl implements SplitClient {
5960

6061
public SplitClientImpl(SplitFactory container,
6162
SplitFetcher splitFetcher,
62-
ImpressionListener impressionListener,
63+
ImpressionsManager impressionManager,
6364
Metrics metrics,
6465
EventClient eventClient,
6566
SplitClientConfig config,
6667
SDKReadinessGates gates) {
6768
_container = container;
6869
_splitFetcher = splitFetcher;
69-
_impressionListener = impressionListener;
70+
_impressionManager = impressionManager;
7071
_metrics = metrics;
7172
_eventClient = eventClient;
7273
_config = config;
7374
_gates = gates;
7475

7576
checkNotNull(gates);
7677
checkNotNull(_splitFetcher);
77-
checkNotNull(_impressionListener);
78+
checkNotNull(_impressionManager);
7879
}
7980

8081
@Override
@@ -222,7 +223,7 @@ private SplitResult getTreatmentWithConfigInternal(String label, String matching
222223
private void recordStats(String matchingKey, String bucketingKey, String split, long start, String result,
223224
String operation, String label, Long changeNumber, Map<String, Object> attributes) {
224225
try {
225-
_impressionListener.log(new Impression(matchingKey, bucketingKey, split, result, System.currentTimeMillis(), label, changeNumber, attributes));
226+
_impressionManager.track(new Impression(matchingKey, bucketingKey, split, result, System.currentTimeMillis(), label, changeNumber, attributes));
226227
_metrics.time(operation, System.currentTimeMillis() - start);
227228
} catch (Throwable t) {
228229
_log.error("Exception", t);

client/src/main/java/io/split/client/SplitFactoryImpl.java

Lines changed: 11 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import com.google.common.collect.Multiset;
55
import io.split.client.impressions.AsynchronousImpressionListener;
66
import io.split.client.impressions.ImpressionListener;
7-
import io.split.client.impressions.ImpressionsManager;
7+
import io.split.client.impressions.ImpressionsManagerImpl;
88
import io.split.client.interceptors.AddSplitHeadersFilter;
99
import io.split.client.interceptors.GzipDecoderResponseInterceptor;
1010
import io.split.client.interceptors.GzipEncoderRequestInterceptor;
@@ -51,6 +51,7 @@
5151
import java.util.List;
5252
import java.util.Random;
5353
import java.util.concurrent.TimeUnit;
54+
import java.util.stream.Collectors;
5455

5556
public class SplitFactoryImpl implements SplitFactory {
5657
private static final Logger _log = LoggerFactory.getLogger(SplitFactory.class);
@@ -168,43 +169,21 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn
168169

169170
final RefreshableSplitFetcherProvider splitFetcherProvider = new RefreshableSplitFetcherProvider(splitChangeFetcher, splitParser, findPollingPeriod(RANDOM, config.featuresRefreshRate()), gates);
170171

171-
// Impressions
172-
final ImpressionsManager splitImpressionListener = ImpressionsManager.instance(httpclient, config);
173172

174173
List<ImpressionListener> impressionListeners = new ArrayList<>();
175-
impressionListeners.add(splitImpressionListener);
176-
177174
// Setup integrations
178175
if (config.integrationsConfig() != null) {
176+
config.integrationsConfig().getImpressionsListeners(IntegrationsConfig.Execution.ASYNC).stream()
177+
.map(l -> AsynchronousImpressionListener.build(l.listener(), l.queueSize()))
178+
.collect(Collectors.toCollection(() -> impressionListeners));
179179

180-
// asynchronous impressions listeners
181-
List<IntegrationsConfig.ImpressionListenerWithMeta> asyncListeners = config
182-
.integrationsConfig()
183-
.getImpressionsListeners(IntegrationsConfig.Execution.ASYNC);
184-
185-
for (IntegrationsConfig.ImpressionListenerWithMeta listener : asyncListeners) {
186-
AsynchronousImpressionListener wrapper = AsynchronousImpressionListener
187-
.build(listener.listener(), listener.queueSize());
188-
impressionListeners.add(wrapper);
189-
}
190-
191-
// synchronous impressions listeners
192-
List<IntegrationsConfig.ImpressionListenerWithMeta> syncListeners = config
193-
.integrationsConfig()
194-
.getImpressionsListeners(IntegrationsConfig.Execution.SYNC);
195-
for (IntegrationsConfig.ImpressionListenerWithMeta listener: syncListeners) {
196-
impressionListeners.add(listener.listener());
197-
198-
}
180+
config.integrationsConfig().getImpressionsListeners(IntegrationsConfig.Execution.SYNC).stream()
181+
.map(IntegrationsConfig.ImpressionListenerWithMeta::listener)
182+
.collect(Collectors.toCollection(() -> impressionListeners));
199183
}
200184

201-
final ImpressionListener impressionListener;
202-
if (impressionListeners.size() > 1) {
203-
// since there are more than just the default integration, let's federate and add them all.
204-
impressionListener = new ImpressionListener.FederatedImpressionListener(impressionListeners);
205-
} else {
206-
impressionListener = splitImpressionListener;
207-
}
185+
// Impressions
186+
final ImpressionsManagerImpl impressionsManager = ImpressionsManagerImpl.instance(httpclient, config, impressionListeners);
208187

209188
CachedMetrics cachedMetrics = new CachedMetrics(httpMetrics, TimeUnit.SECONDS.toMillis(config.metricsRefreshRate()));
210189
final FireAndForgetMetrics cachedFireAndForgetMetrics = FireAndForgetMetrics.instance(cachedMetrics, 2, 1000);
@@ -227,8 +206,6 @@ public void run() {
227206
_log.info("Successful shutdown of metrics 1");
228207
cachedFireAndForgetMetrics.close();
229208
_log.info("Successful shutdown of metrics 2");
230-
impressionListener.close();
231-
_log.info("Successful shutdown of ImpressionListener");
232209
httpclient.close();
233210
_log.info("Successful shutdown of httpclient");
234211
eventClient.close();
@@ -253,7 +230,7 @@ public void run() {
253230

254231
_client = new SplitClientImpl(this,
255232
splitFetcherProvider.getFetcher(),
256-
impressionListener,
233+
impressionsManager,
257234
cachedFireAndForgetMetrics,
258235
eventClient,
259236
config,

client/src/main/java/io/split/client/dtos/KeyImpression.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package io.split.client.dtos;
22

33

4+
import io.split.client.impressions.Impression;
5+
46
public class KeyImpression {
57
public String feature;
68
public String keyName;
@@ -39,4 +41,16 @@ public int hashCode() {
3941
result = 31 * result + (int) (time ^ (time >>> 32));
4042
return result;
4143
}
44+
45+
public static KeyImpression fromImpression(Impression i) {
46+
KeyImpression ki = new KeyImpression();
47+
ki.feature = i.split();
48+
ki.keyName = i.key();
49+
ki.bucketingKey = i.bucketingKey();
50+
ki.time = i.time();
51+
ki.changeNumber = i.changeNumber();
52+
ki.treatment = i.treatment();
53+
ki.label = i.appliedRule();
54+
return ki;
55+
}
4256
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,22 @@
11
package io.split.client.dtos;
22

33
import java.util.List;
4+
import java.util.stream.Collectors;
45

56
public class TestImpressions {
67
public String testName;
78
public List<KeyImpression> keyImpressions;
9+
10+
public TestImpressions(String testName_, List<KeyImpression> keyImpressions_) {
11+
testName = testName_;
12+
keyImpressions = keyImpressions_;
13+
}
14+
15+
public static List<TestImpressions> fromKeyImpressions(List<KeyImpression> impressions) {
16+
return impressions.stream()
17+
.collect(Collectors.groupingBy(ki -> ki.feature))
18+
.entrySet().stream()
19+
.map((e) -> new TestImpressions(e.getKey(), e.getValue()))
20+
.collect(Collectors.toList());
21+
}
822
}

client/src/main/java/io/split/client/impressions/Impression.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ public class Impression {
1414
private final long _time;
1515
private final String _appliedRule;
1616
private final Long _changeNumber;
17+
private Long _pt;
1718
private final Map<String, Object> _attributes;
1819

1920

@@ -59,4 +60,10 @@ public Long changeNumber() {
5960
public Map<String, Object> attributes() {
6061
return _attributes;
6162
}
63+
64+
public Long pt() {
65+
return _pt;
66+
}
67+
68+
public Impression withPreviousTime(Long pt) { _pt = pt; return this; }
6269
}

client/src/main/java/io/split/client/impressions/ImpressionHasher.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package io.split.client.impressions;
22

3-
import io.split.client.dtos.KeyImpression;
43
import io.split.client.utils.MurmurHash3;
54

65
public class ImpressionHasher {
@@ -16,15 +15,15 @@ private static Long zeroIfNull(Long l) {
1615
return (l == null) ? 0 : l;
1716
}
1817

19-
public static Long process(KeyImpression impression) {
18+
public static Long process(Impression impression) {
2019
if (null == impression) {
2120
return null;
2221
}
2322
return MurmurHash3.hash128x64(String.format(HASHABLE_FORMAT,
24-
unknownIfNull(impression.keyName),
25-
unknownIfNull(impression.feature),
26-
unknownIfNull(impression.treatment),
27-
unknownIfNull(impression.label),
28-
zeroIfNull(impression.changeNumber)).getBytes())[0];
23+
unknownIfNull(impression.key()),
24+
unknownIfNull(impression.split()),
25+
unknownIfNull(impression.treatment()),
26+
unknownIfNull(impression.appliedRule()),
27+
zeroIfNull(impression.changeNumber())).getBytes())[0];
2928
}
3029
}

client/src/main/java/io/split/client/impressions/ImpressionObserver.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import com.google.common.cache.Cache;
44
import com.google.common.cache.CacheBuilder;
5-
import io.split.client.dtos.KeyImpression;
65

76
import java.util.Objects;
87

@@ -25,14 +24,14 @@ public ImpressionObserver(long size) {
2524
.build();
2625
}
2726

28-
public Long testAndSet(KeyImpression impression) {
27+
public Long testAndSet(Impression impression) {
2928
if (null == impression) {
3029
return null;
3130
}
3231

3332
Long hash = ImpressionHasher.process(impression);
3433
Long previous = _cache.getIfPresent(hash);
35-
_cache.put(hash, impression.time);
36-
return (Objects.isNull(previous)) ? null : Math.min(previous, impression.time);
34+
_cache.put(hash, impression.time());
35+
return (Objects.isNull(previous)) ? null : Math.min(previous, impression.time());
3736
}
3837
}

0 commit comments

Comments
 (0)