Skip to content

Commit 319d6ce

Browse files
authored
Merge pull request #155 from splitio/release/4.1.0
Release/4.1.0
2 parents cc9fdec + f0c3122 commit 319d6ce

36 files changed

+1509
-550
lines changed

client/CHANGES.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
CHANGES
22

3+
4.1.0 (Sep 25, 2020)
4+
- Add local impressions deduping (enabled by default)
5+
36
4.0.1 (Sep 4, 2020)
47
- Remove jersey. Use custom SSE implementation
58
- Bumped guava version to 29

client/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
<parent>
66
<groupId>io.split.client</groupId>
77
<artifactId>java-client-parent</artifactId>
8-
<version>4.0.1</version>
8+
<version>4.1.0</version>
99
</parent>
1010
<artifactId>java-client</artifactId>
1111
<packaging>jar</packaging>

client/src/main/java/io/split/client/SplitClientConfig.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33

44
import io.split.client.impressions.ImpressionListener;
5+
import io.split.client.impressions.ImpressionsManager;
56
import io.split.integrations.IntegrationsConfig;
67
import org.apache.http.HttpHost;
78

@@ -24,6 +25,7 @@ public class SplitClientConfig {
2425
private final int _segmentsRefreshRate;
2526
private final int _impressionsRefreshRate;
2627
private final int _impressionsQueueSize;
28+
private final ImpressionsManager.Mode _impressionsMode;
2729
private final int _metricsRefreshRate;
2830
private final int _connectionTimeout;
2931
private final int _readTimeout;
@@ -64,6 +66,7 @@ private SplitClientConfig(String endpoint,
6466
int segmentsRefreshRate,
6567
int impressionsRefreshRate,
6668
int impressionsQueueSize,
69+
ImpressionsManager.Mode impressionsMode,
6770
int metricsRefreshRate,
6871
int connectionTimeout,
6972
int readTimeout,
@@ -93,6 +96,7 @@ private SplitClientConfig(String endpoint,
9396
_segmentsRefreshRate = segmentsRefreshRate;
9497
_impressionsRefreshRate = impressionsRefreshRate;
9598
_impressionsQueueSize = impressionsQueueSize;
99+
_impressionsMode = impressionsMode;
96100
_metricsRefreshRate = metricsRefreshRate;
97101
_connectionTimeout = connectionTimeout;
98102
_readTimeout = readTimeout;
@@ -158,6 +162,8 @@ public int impressionsQueueSize() {
158162
return _impressionsQueueSize;
159163
}
160164

165+
public ImpressionsManager.Mode impressionsMode() { return _impressionsMode; }
166+
161167
public int metricsRefreshRate() {
162168
return _metricsRefreshRate;
163169
}
@@ -250,8 +256,9 @@ public static final class Builder {
250256
private boolean _eventsEndpointSet = false;
251257
private int _featuresRefreshRate = 60;
252258
private int _segmentsRefreshRate = 60;
253-
private int _impressionsRefreshRate = 30;
259+
private int _impressionsRefreshRate = -1; // use -1 to identify lack of a user submitted value & handle in build()
254260
private int _impressionsQueueSize = 30000;
261+
private ImpressionsManager.Mode _impressionsMode = ImpressionsManager.Mode.OPTIMIZED;
255262
private int _connectionTimeout = 15000;
256263
private int _readTimeout = 15000;
257264
private int _numThreadsForSegmentFetch = 2;
@@ -380,6 +387,11 @@ public Builder impressionsRefreshRate(int seconds) {
380387
return this;
381388
}
382389

390+
public Builder impressionsMode(ImpressionsManager.Mode mode) {
391+
_impressionsMode = mode;
392+
return this;
393+
}
394+
383395
/**
384396
* The impression listener captures the which key saw what treatment ("on", "off", etc)
385397
* at what time. This log is periodically pushed back to split endpoint.
@@ -671,8 +683,13 @@ public SplitClientConfig build() {
671683
throw new IllegalArgumentException("segmentsRefreshRate must be >= 30: " + _segmentsRefreshRate);
672684
}
673685

674-
if (_impressionsRefreshRate <= 0) {
675-
throw new IllegalArgumentException("impressionsRefreshRate must be > 0: " + _impressionsRefreshRate);
686+
switch (_impressionsMode) {
687+
case OPTIMIZED:
688+
_impressionsRefreshRate = (_impressionsRefreshRate <= 0) ? 300 : Math.max(60, _impressionsRefreshRate);
689+
break;
690+
case DEBUG:
691+
_impressionsRefreshRate = (_impressionsRefreshRate <= 0) ? 60 : _impressionsRefreshRate;
692+
break;
676693
}
677694

678695
if (_eventFlushIntervalInMillis < 1000) {
@@ -734,6 +751,7 @@ public SplitClientConfig build() {
734751
_segmentsRefreshRate,
735752
_impressionsRefreshRate,
736753
_impressionsQueueSize,
754+
_impressionsMode,
737755
_metricsRefreshRate,
738756
_connectionTimeout,
739757
_readTimeout,

client/src/main/java/io/split/client/SplitClientImpl.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@
77
import io.split.client.dtos.Event;
88
import io.split.client.exceptions.ChangeNumberExceptionWrapper;
99
import io.split.client.impressions.Impression;
10-
import io.split.client.impressions.ImpressionListener;
10+
import io.split.client.impressions.ImpressionsManager;
11+
import io.split.client.impressions.ImpressionsManagerImpl;
1112
import io.split.engine.SDKReadinessGates;
1213
import io.split.engine.experiments.ParsedCondition;
1314
import io.split.engine.experiments.ParsedSplit;
@@ -50,7 +51,7 @@ public final class SplitClientImpl implements SplitClient {
5051

5152
private final SplitFactory _container;
5253
private final SplitFetcher _splitFetcher;
53-
private final ImpressionListener _impressionListener;
54+
private final ImpressionsManager _impressionManager;
5455
private final Metrics _metrics;
5556
private final SplitClientConfig _config;
5657
private final EventClient _eventClient;
@@ -59,22 +60,22 @@ public final class SplitClientImpl implements SplitClient {
5960

6061
public SplitClientImpl(SplitFactory container,
6162
SplitFetcher splitFetcher,
62-
ImpressionListener impressionListener,
63+
ImpressionsManager impressionManager,
6364
Metrics metrics,
6465
EventClient eventClient,
6566
SplitClientConfig config,
6667
SDKReadinessGates gates) {
6768
_container = container;
6869
_splitFetcher = splitFetcher;
69-
_impressionListener = impressionListener;
70+
_impressionManager = impressionManager;
7071
_metrics = metrics;
7172
_eventClient = eventClient;
7273
_config = config;
7374
_gates = gates;
7475

7576
checkNotNull(gates);
7677
checkNotNull(_splitFetcher);
77-
checkNotNull(_impressionListener);
78+
checkNotNull(_impressionManager);
7879
}
7980

8081
@Override
@@ -222,7 +223,7 @@ private SplitResult getTreatmentWithConfigInternal(String label, String matching
222223
private void recordStats(String matchingKey, String bucketingKey, String split, long start, String result,
223224
String operation, String label, Long changeNumber, Map<String, Object> attributes) {
224225
try {
225-
_impressionListener.log(new Impression(matchingKey, bucketingKey, split, result, System.currentTimeMillis(), label, changeNumber, attributes));
226+
_impressionManager.track(new Impression(matchingKey, bucketingKey, split, result, System.currentTimeMillis(), label, changeNumber, attributes));
226227
_metrics.time(operation, System.currentTimeMillis() - start);
227228
} catch (Throwable t) {
228229
_log.error("Exception", t);

client/src/main/java/io/split/client/SplitFactoryImpl.java

Lines changed: 13 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import com.google.common.collect.Multiset;
55
import io.split.client.impressions.AsynchronousImpressionListener;
66
import io.split.client.impressions.ImpressionListener;
7-
import io.split.client.impressions.ImpressionsManager;
7+
import io.split.client.impressions.ImpressionsManagerImpl;
88
import io.split.client.interceptors.AddSplitHeadersFilter;
99
import io.split.client.interceptors.GzipDecoderResponseInterceptor;
1010
import io.split.client.interceptors.GzipEncoderRequestInterceptor;
@@ -51,6 +51,7 @@
5151
import java.util.List;
5252
import java.util.Random;
5353
import java.util.concurrent.TimeUnit;
54+
import java.util.stream.Collectors;
5455

5556
public class SplitFactoryImpl implements SplitFactory {
5657
private static final Logger _log = LoggerFactory.getLogger(SplitFactory.class);
@@ -168,43 +169,21 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn
168169

169170
final RefreshableSplitFetcherProvider splitFetcherProvider = new RefreshableSplitFetcherProvider(splitChangeFetcher, splitParser, findPollingPeriod(RANDOM, config.featuresRefreshRate()), gates);
170171

171-
// Impressions
172-
final ImpressionsManager splitImpressionListener = ImpressionsManager.instance(httpclient, config);
173172

174173
List<ImpressionListener> impressionListeners = new ArrayList<>();
175-
impressionListeners.add(splitImpressionListener);
176-
177174
// Setup integrations
178175
if (config.integrationsConfig() != null) {
176+
config.integrationsConfig().getImpressionsListeners(IntegrationsConfig.Execution.ASYNC).stream()
177+
.map(l -> AsynchronousImpressionListener.build(l.listener(), l.queueSize()))
178+
.collect(Collectors.toCollection(() -> impressionListeners));
179179

180-
// asynchronous impressions listeners
181-
List<IntegrationsConfig.ImpressionListenerWithMeta> asyncListeners = config
182-
.integrationsConfig()
183-
.getImpressionsListeners(IntegrationsConfig.Execution.ASYNC);
184-
185-
for (IntegrationsConfig.ImpressionListenerWithMeta listener : asyncListeners) {
186-
AsynchronousImpressionListener wrapper = AsynchronousImpressionListener
187-
.build(listener.listener(), listener.queueSize());
188-
impressionListeners.add(wrapper);
189-
}
190-
191-
// synchronous impressions listeners
192-
List<IntegrationsConfig.ImpressionListenerWithMeta> syncListeners = config
193-
.integrationsConfig()
194-
.getImpressionsListeners(IntegrationsConfig.Execution.SYNC);
195-
for (IntegrationsConfig.ImpressionListenerWithMeta listener: syncListeners) {
196-
impressionListeners.add(listener.listener());
197-
198-
}
180+
config.integrationsConfig().getImpressionsListeners(IntegrationsConfig.Execution.SYNC).stream()
181+
.map(IntegrationsConfig.ImpressionListenerWithMeta::listener)
182+
.collect(Collectors.toCollection(() -> impressionListeners));
199183
}
200184

201-
final ImpressionListener impressionListener;
202-
if (impressionListeners.size() > 1) {
203-
// since there are more than just the default integration, let's federate and add them all.
204-
impressionListener = new ImpressionListener.FederatedImpressionListener(impressionListeners);
205-
} else {
206-
impressionListener = splitImpressionListener;
207-
}
185+
// Impressions
186+
final ImpressionsManagerImpl impressionsManager = ImpressionsManagerImpl.instance(httpclient, config, impressionListeners);
208187

209188
CachedMetrics cachedMetrics = new CachedMetrics(httpMetrics, TimeUnit.SECONDS.toMillis(config.metricsRefreshRate()));
210189
final FireAndForgetMetrics cachedFireAndForgetMetrics = FireAndForgetMetrics.instance(cachedMetrics, 2, 1000);
@@ -223,12 +202,12 @@ public void run() {
223202
_log.info("Successful shutdown of segment fetchers");
224203
splitFetcherProvider.close();
225204
_log.info("Successful shutdown of splits");
205+
impressionsManager.close();
206+
_log.info("Successful shutdown of impressions manager");
226207
uncachedFireAndForget.close();
227208
_log.info("Successful shutdown of metrics 1");
228209
cachedFireAndForgetMetrics.close();
229210
_log.info("Successful shutdown of metrics 2");
230-
impressionListener.close();
231-
_log.info("Successful shutdown of ImpressionListener");
232211
httpclient.close();
233212
_log.info("Successful shutdown of httpclient");
234213
eventClient.close();
@@ -253,7 +232,7 @@ public void run() {
253232

254233
_client = new SplitClientImpl(this,
255234
splitFetcherProvider.getFetcher(),
256-
impressionListener,
235+
impressionsManager,
257236
cachedFireAndForgetMetrics,
258237
eventClient,
259238
config,
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package io.split.client.dtos;
2+
3+
import com.google.gson.annotations.SerializedName;
4+
import io.split.client.impressions.ImpressionCounter;
5+
import java.util.List;
6+
import java.util.Map;
7+
import java.util.Objects;
8+
import java.util.stream.Collectors;
9+
10+
public class ImpressionCount {
11+
12+
private static final String FIELD_PER_FEATURE_COUNTS = "pf";
13+
14+
@SerializedName(FIELD_PER_FEATURE_COUNTS)
15+
public final List<CountPerFeature> perFeature;
16+
17+
public ImpressionCount(List<CountPerFeature> cs) {
18+
perFeature = cs;
19+
}
20+
21+
public static ImpressionCount fromImpressionCounterData(Map<ImpressionCounter.Key, Integer> raw) {
22+
return new ImpressionCount(raw.entrySet().stream()
23+
.map(e -> new CountPerFeature(e.getKey().featureName(), e.getKey().timeFrame(), e.getValue()))
24+
.collect(Collectors.toList()));
25+
}
26+
27+
@Override
28+
public int hashCode() {
29+
return Objects.hash(perFeature);
30+
}
31+
32+
@Override
33+
public boolean equals(Object o) {
34+
if (this == o) return true;
35+
if (o == null || getClass() != o.getClass()) return false;
36+
37+
ImpressionCount c = (ImpressionCount) o;
38+
return Objects.equals(perFeature, c.perFeature);
39+
}
40+
41+
public static class CountPerFeature {
42+
43+
private static final String FIELD_FEATURE = "f";
44+
private static final String FIELD_TIMEFRAME = "m";
45+
private static final String FIELD_COUNT = "rc";
46+
47+
@SerializedName(FIELD_FEATURE)
48+
public final String feature;
49+
50+
@SerializedName(FIELD_TIMEFRAME)
51+
public final long timeframe;
52+
53+
@SerializedName(FIELD_COUNT)
54+
public final int count;
55+
56+
public CountPerFeature(String f, long t, int c) {
57+
feature = f;
58+
timeframe = t;
59+
count = c;
60+
}
61+
62+
@Override
63+
public int hashCode() {
64+
return Objects.hash(feature, timeframe, count);
65+
}
66+
67+
@Override
68+
public boolean equals(Object o) {
69+
if (this == o) return true;
70+
if (o == null || getClass() != o.getClass()) return false;
71+
72+
CountPerFeature c = (CountPerFeature) o;
73+
return Objects.equals(feature, c.feature) && Objects.equals(timeframe, c.timeframe) &&
74+
Objects.equals(count, c.count);
75+
}
76+
}
77+
}

0 commit comments

Comments
 (0)