Skip to content

Commit 147f465

Browse files
authored
Merge pull request #152 from splitio/feature/impressionsDedupeV2_observer_counter
update observer and add counter
2 parents e7bf779 + 09522cb commit 147f465

File tree

4 files changed

+212
-7
lines changed

4 files changed

+212
-7
lines changed
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package io.split.client.impressions;
2+
3+
import java.util.HashMap;
4+
import java.util.Objects;
5+
import java.util.concurrent.ConcurrentHashMap;
6+
import java.util.concurrent.atomic.AtomicInteger;
7+
8+
public class ImpressionCounter {
9+
10+
private static final long TIME_INTERVAL_MS = 3600L * 1000L;
11+
12+
private final ConcurrentHashMap<String, AtomicInteger> _counts;
13+
14+
public ImpressionCounter() {
15+
_counts = new ConcurrentHashMap<>();
16+
}
17+
18+
public void inc(String featureName, long timeFrame, int amount) {
19+
String key = makeKey(featureName, timeFrame);
20+
AtomicInteger count = _counts.get(key);
21+
if (Objects.isNull(count)) {
22+
count = new AtomicInteger();
23+
AtomicInteger old = _counts.putIfAbsent(key, count);
24+
if (!Objects.isNull(old)) { // Some other thread won the race, use that AtomicInteger instead
25+
count = old;
26+
}
27+
}
28+
count.addAndGet(amount);
29+
}
30+
31+
public HashMap<String, Integer> popAll() {
32+
HashMap<String, Integer> toReturn = new HashMap<>();
33+
for (String key : _counts.keySet()) {
34+
AtomicInteger curr = _counts.remove(key);
35+
toReturn.put(key ,curr.get());
36+
}
37+
return toReturn;
38+
}
39+
40+
static String makeKey(String featureName, long timeFrame) {
41+
return String.join("::", featureName, String.valueOf(truncateTimeframe(timeFrame)));
42+
}
43+
44+
static long truncateTimeframe(long timestampInMs) {
45+
return timestampInMs - (timestampInMs % TIME_INTERVAL_MS);
46+
}
47+
}

client/src/main/java/io/split/client/impressions/ImpressionObserver.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@
33
import com.google.common.cache.Cache;
44
import com.google.common.cache.CacheBuilder;
55
import io.split.client.dtos.KeyImpression;
6-
import org.apache.http.annotation.NotThreadSafe;
6+
7+
import java.util.Objects;
78

89
/*
910
According to guava's docs (https://guava.dev/releases/18.0/api/docs/com/google/common/annotations/Beta.html),
@@ -13,7 +14,6 @@
1314
*/
1415

1516
@SuppressWarnings("UnstableApiUsage")
16-
@NotThreadSafe
1717
public class ImpressionObserver {
1818

1919
private final Cache<Long, Long> _cache;
@@ -33,6 +33,6 @@ public Long testAndSet(KeyImpression impression) {
3333
Long hash = ImpressionHasher.process(impression);
3434
Long previous = _cache.getIfPresent(hash);
3535
_cache.put(hash, impression.time);
36-
return previous;
36+
return (Objects.isNull(previous)) ? null : Math.min(previous, impression.time);
3737
}
3838
}
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
package io.split.client.impressions;
2+
3+
import org.junit.Test;
4+
5+
import java.time.ZoneId;
6+
import java.time.ZonedDateTime;
7+
import java.util.HashMap;
8+
import java.util.Map;
9+
10+
import static org.hamcrest.CoreMatchers.is;
11+
import static org.hamcrest.MatcherAssert.assertThat;
12+
import static org.hamcrest.core.IsEqual.equalTo;
13+
14+
public class ImpressionCounterTest {
15+
16+
private long makeTimestamp(int year, int month, int day, int hour, int minute, int second) {
17+
return ZonedDateTime.of(year, month, day, hour, minute, second, 0, ZoneId.of("UTC")).toInstant().toEpochMilli();
18+
}
19+
20+
@Test
21+
public void testTruncateTimeFrame() {
22+
assertThat(ImpressionCounter.truncateTimeframe(makeTimestamp(2020, 9, 2, 10, 53, 12)),
23+
is(equalTo(makeTimestamp(2020, 9, 2, 10, 0, 0))));
24+
assertThat(ImpressionCounter.truncateTimeframe(makeTimestamp(2020, 9, 2, 10, 0, 0)),
25+
is(equalTo(makeTimestamp(2020, 9, 2, 10, 0, 0))));
26+
assertThat(ImpressionCounter.truncateTimeframe(makeTimestamp(2020, 9, 2, 10, 53, 0 )),
27+
is(equalTo(makeTimestamp(2020, 9, 2, 10, 0, 0))));
28+
assertThat(ImpressionCounter.truncateTimeframe(makeTimestamp(2020, 9, 2, 10, 0, 12)),
29+
is(equalTo(makeTimestamp(2020, 9, 2, 10, 0, 0))));
30+
assertThat(ImpressionCounter.truncateTimeframe(makeTimestamp(1970, 1, 1, 0, 0, 0)),
31+
is(equalTo(makeTimestamp(1970, 1, 1, 0, 0, 0))));
32+
}
33+
34+
@Test
35+
public void testMakeKey() {
36+
long targetTZ = makeTimestamp(2020, 9, 2, 10, 0, 0);
37+
assertThat(ImpressionCounter.makeKey("someFeature", makeTimestamp(2020, 9, 2, 10, 5, 23)),
38+
is(equalTo("someFeature::" + targetTZ)));
39+
assertThat(ImpressionCounter.makeKey("", makeTimestamp(2020, 9, 2, 10, 5, 23)),
40+
is(equalTo("::" + targetTZ)));
41+
assertThat(ImpressionCounter.makeKey(null, makeTimestamp(2020, 9, 2, 10, 5, 23)),
42+
is(equalTo("null::" + targetTZ)));
43+
assertThat(ImpressionCounter.makeKey(null, 0L), is(equalTo("null::0")));
44+
}
45+
46+
@Test
47+
public void testBasicUsage() {
48+
final ImpressionCounter counter = new ImpressionCounter();
49+
final long timestamp = makeTimestamp(2020, 9, 2, 10, 10, 12);
50+
counter.inc("feature1", timestamp, 1);
51+
counter.inc("feature1", timestamp + 1, 1);
52+
counter.inc("feature1", timestamp + 2, 1);
53+
counter.inc("feature2", timestamp + 3, 2);
54+
counter.inc("feature2", timestamp + 4, 2);
55+
Map<String, Integer> counted = counter.popAll();
56+
assertThat(counted.size(), is(equalTo(2)));
57+
assertThat(counted.get(ImpressionCounter.makeKey("feature1", timestamp)), is(equalTo(3)));
58+
assertThat(counted.get(ImpressionCounter.makeKey("feature2", timestamp)), is(equalTo(4)));
59+
assertThat(counter.popAll().size(), is(equalTo(0)));
60+
61+
final long nextHourTimestamp = makeTimestamp(2020, 9, 2, 11, 10, 12);
62+
counter.inc("feature1", timestamp, 1);
63+
counter.inc("feature1", timestamp + 1, 1);
64+
counter.inc("feature1", timestamp + 2, 1);
65+
counter.inc("feature2", timestamp + 3, 2);
66+
counter.inc("feature2", timestamp + 4, 2);
67+
counter.inc("feature1", nextHourTimestamp, 1);
68+
counter.inc("feature1", nextHourTimestamp + 1, 1);
69+
counter.inc("feature1", nextHourTimestamp + 2, 1);
70+
counter.inc("feature2", nextHourTimestamp + 3, 2);
71+
counter.inc("feature2", nextHourTimestamp + 4, 2);
72+
counted = counter.popAll();
73+
assertThat(counted.size(), is(equalTo(4)));
74+
assertThat(counted.get(ImpressionCounter.makeKey("feature1", timestamp)), is(equalTo(3)));
75+
assertThat(counted.get(ImpressionCounter.makeKey("feature2", timestamp)), is(equalTo(4)));
76+
assertThat(counted.get(ImpressionCounter.makeKey("feature1", nextHourTimestamp)), is(equalTo(3)));
77+
assertThat(counted.get(ImpressionCounter.makeKey("feature2", nextHourTimestamp)), is(equalTo(4)));
78+
assertThat(counter.popAll().size(), is(equalTo(0)));
79+
}
80+
81+
@Test
82+
public void manyConcurrentCalls() throws InterruptedException {
83+
final int iterations = 10000000;
84+
final long timestamp = makeTimestamp(2020, 9, 2, 10, 10, 12);
85+
final long nextHourTimestamp = makeTimestamp(2020, 9, 2, 11, 10, 12);
86+
ImpressionCounter counter = new ImpressionCounter();
87+
Thread t1 = new Thread(() -> {
88+
int times = iterations;
89+
while (times-- > 0) {
90+
counter.inc("feature1", timestamp, 1);
91+
counter.inc("feature2", timestamp, 1);
92+
counter.inc("feature1", nextHourTimestamp, 2);
93+
counter.inc("feature2", nextHourTimestamp, 2);
94+
}
95+
});
96+
Thread t2 = new Thread(() -> {
97+
int times = iterations;
98+
while (times-- > 0) {
99+
counter.inc("feature1", timestamp, 2);
100+
counter.inc("feature2", timestamp, 2);
101+
counter.inc("feature1", nextHourTimestamp, 1);
102+
counter.inc("feature2", nextHourTimestamp, 1);
103+
}
104+
});
105+
106+
t1.setDaemon(true); t2.setDaemon(true);
107+
t1.start(); t2.start();
108+
t1.join(); t2.join();
109+
110+
HashMap<String, Integer> counted = counter.popAll();
111+
assertThat(counted.size(), is(equalTo(4)));
112+
assertThat(counted.get(ImpressionCounter.makeKey("feature1", timestamp)), is(equalTo(iterations * 3)));
113+
assertThat(counted.get(ImpressionCounter.makeKey("feature2", timestamp)), is(equalTo(iterations * 3)));
114+
assertThat(counted.get(ImpressionCounter.makeKey("feature1", nextHourTimestamp)), is(equalTo(iterations * 3)));
115+
assertThat(counted.get(ImpressionCounter.makeKey("feature2", nextHourTimestamp)), is(equalTo(iterations * 3)));
116+
}
117+
}

client/src/test/java/io/split/client/impressions/ImpressionObserverTest.java

Lines changed: 45 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,22 @@
22

33
import com.google.common.base.Strings;
44
import io.split.client.dtos.KeyImpression;
5-
// import jdk.nashorn.internal.ir.debug.ObjectSizeCalculator;
65
import org.junit.Test;
76
import org.slf4j.Logger;
87
import org.slf4j.LoggerFactory;
98

10-
import java.lang.reflect.InvocationTargetException;
119
import java.lang.reflect.Method;
1210
import java.util.ArrayList;
1311
import java.util.List;
12+
import java.util.Random;
13+
import java.util.concurrent.ConcurrentLinkedQueue;
1414

15+
import static org.hamcrest.CoreMatchers.nullValue;
1516
import static org.hamcrest.Matchers.lessThan;
16-
import static org.hamcrest.Matchers.is;
17+
import static org.hamcrest.Matchers.lessThanOrEqualTo;
18+
import static org.hamcrest.core.AnyOf.anyOf;
19+
import static org.hamcrest.core.Is.is;
20+
import static org.hamcrest.core.IsEqual.equalTo;
1721
import static org.junit.Assert.assertNull;
1822
import static org.junit.Assert.assertThat;
1923

@@ -24,6 +28,7 @@ public class ImpressionObserverTest {
2428
// We allow the cache implementation to have a 0.01% drift in size when elements change, given that it's internal
2529
// structure/references might vary, and the ObjectSizeCalculator is not 100% accurate
2630
private static final double SIZE_DELTA = 0.01;
31+
private final Random _rand = new Random();
2732

2833
private List<KeyImpression> generateKeyImpressions(long count) {
2934
ArrayList<KeyImpression> imps = new ArrayList<>();
@@ -91,7 +96,43 @@ public void testMemoryUsageStopsWhenCacheIsFull() throws Exception {
9196

9297
long sizeAfterSecondPopulation = (long) getObjectSize.invoke(null, observer);
9398

94-
assertThat((double) (sizeAfterSecondPopulation - sizeAfterInitialPopulation), lessThan (SIZE_DELTA * sizeAfterInitialPopulation));
99+
assertThat((double) (sizeAfterSecondPopulation - sizeAfterInitialPopulation), lessThan(SIZE_DELTA * sizeAfterInitialPopulation));
100+
}
101+
102+
103+
private void caller(ImpressionObserver o, int count, ConcurrentLinkedQueue<KeyImpression> imps) {
95104

105+
while (count-- > 0) {
106+
KeyImpression k = new KeyImpression();
107+
k.keyName = "key_" + _rand.nextInt(100);
108+
k.feature = "feature_" + _rand.nextInt(10);
109+
k.label = "label" + _rand.nextInt(5);
110+
k.treatment = _rand.nextBoolean() ? "on" : "off";
111+
k.changeNumber = 1234567L;
112+
k.time = System.currentTimeMillis();
113+
k.pt = o.testAndSet(k);
114+
imps.offer(k);
115+
}
116+
}
117+
118+
@Test
119+
public void testConcurrencyVsAccuracy() throws InterruptedException {
120+
ImpressionObserver observer = new ImpressionObserver(500000);
121+
ConcurrentLinkedQueue<KeyImpression> imps = new ConcurrentLinkedQueue<>();
122+
Thread t1 = new Thread(() -> caller(observer, 1000000, imps));
123+
Thread t2 = new Thread(() -> caller(observer, 1000000, imps));
124+
Thread t3 = new Thread(() -> caller(observer, 1000000, imps));
125+
Thread t4 = new Thread(() -> caller(observer, 1000000, imps));
126+
Thread t5 = new Thread(() -> caller(observer, 1000000, imps));
127+
128+
// start the 5 threads an wait for them to finish.
129+
t1.setDaemon(true); t2.setDaemon(true); t3.setDaemon(true); t4.setDaemon(true); t5.setDaemon(true);
130+
t1.start(); t2.start(); t3.start(); t4.start(); t5.start();
131+
t1.join(); t2.join(); t3.join(); t4.join(); t5.join();
132+
133+
assertThat(imps.size(), is(equalTo(5000000)));
134+
for (KeyImpression i : imps) {
135+
assertThat(i.pt, is(anyOf(nullValue(), lessThanOrEqualTo(i.time))));
136+
}
96137
}
97138
}

0 commit comments

Comments
 (0)