Skip to content

Commit d006f2c

Browse files
committed
update observer and add counter
1 parent e7bf779 commit d006f2c

File tree

4 files changed

+203
-5
lines changed

4 files changed

+203
-5
lines changed
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package io.split.client.impressions;
2+
3+
import java.util.HashMap;
4+
import java.util.Map;
5+
import java.util.Objects;
6+
import java.util.concurrent.ConcurrentHashMap;
7+
import java.util.concurrent.atomic.AtomicInteger;
8+
9+
public class ImpressionCounter {
10+
11+
private static final long TIME_INTERVAL_MS = 3600 * 1000;
12+
13+
private final ConcurrentHashMap<String, AtomicInteger> _counts;
14+
15+
public ImpressionCounter() {
16+
_counts = new ConcurrentHashMap<>();
17+
}
18+
19+
public void inc(String featureName, long timeFrame, int amount) {
20+
String key = makeKey(featureName, timeFrame);
21+
AtomicInteger count = _counts.get(key);
22+
if (Objects.isNull(count)) {
23+
count = new AtomicInteger();
24+
AtomicInteger old = _counts.putIfAbsent(key, count);
25+
if (!Objects.isNull(old)) { // Some other thread won the race, use that AtomicInteger instead
26+
count = old;
27+
}
28+
}
29+
count.addAndGet(amount);
30+
}
31+
32+
public HashMap<String, Integer> popAll() {
33+
HashMap<String, Integer> toReturn = new HashMap<>();
34+
for (String key : _counts.keySet()) {
35+
AtomicInteger curr = _counts.remove(key);
36+
toReturn.put(key ,curr.get());
37+
}
38+
return toReturn;
39+
}
40+
41+
static String makeKey(String featureName, long timeFrame) {
42+
return String.join("::", featureName, String.valueOf(truncateTimeframe(timeFrame)));
43+
}
44+
45+
static long truncateTimeframe(long timestampInMs) {
46+
return timestampInMs - (timestampInMs % TIME_INTERVAL_MS);
47+
}
48+
}

client/src/main/java/io/split/client/impressions/ImpressionObserver.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@
33
import com.google.common.cache.Cache;
44
import com.google.common.cache.CacheBuilder;
55
import io.split.client.dtos.KeyImpression;
6-
import org.apache.http.annotation.NotThreadSafe;
6+
7+
import java.util.Objects;
78

89
/*
910
According to guava's docs (https://guava.dev/releases/18.0/api/docs/com/google/common/annotations/Beta.html),
@@ -13,7 +14,6 @@
1314
*/
1415

1516
@SuppressWarnings("UnstableApiUsage")
16-
@NotThreadSafe
1717
public class ImpressionObserver {
1818

1919
private final Cache<Long, Long> _cache;
@@ -33,6 +33,6 @@ public Long testAndSet(KeyImpression impression) {
3333
Long hash = ImpressionHasher.process(impression);
3434
Long previous = _cache.getIfPresent(hash);
3535
_cache.put(hash, impression.time);
36-
return previous;
36+
return (Objects.isNull(previous)) ? null : Math.min(previous, impression.time);
3737
}
3838
}
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
package io.split.client.impressions;
2+
3+
import org.junit.Test;
4+
5+
import java.util.Calendar;
6+
import java.util.Date;
7+
import java.util.HashMap;
8+
import java.util.Map;
9+
10+
import static org.hamcrest.CoreMatchers.is;
11+
import static org.hamcrest.MatcherAssert.assertThat;
12+
import static org.hamcrest.core.IsEqual.equalTo;
13+
14+
public class ImpressionCounterTest {
15+
16+
@Test
17+
public void testTruncateTimeFrame() {
18+
assertThat(ImpressionCounter.truncateTimeframe(new Date(2020, Calendar.SEPTEMBER, 2, 10, 53, 12).getTime()),
19+
is(equalTo(new Date(2020, Calendar.SEPTEMBER, 2, 10, 0, 0).getTime())));
20+
assertThat(ImpressionCounter.truncateTimeframe(new Date(2020, Calendar.SEPTEMBER, 2, 10, 00, 00).getTime()),
21+
is(equalTo(new Date(2020, Calendar.SEPTEMBER, 2, 10, 0, 0).getTime())));
22+
assertThat(ImpressionCounter.truncateTimeframe(new Date(2020, Calendar.SEPTEMBER, 2, 10, 53, 00).getTime()),
23+
is(equalTo(new Date(2020, Calendar.SEPTEMBER, 2, 10, 0, 0).getTime())));
24+
assertThat(ImpressionCounter.truncateTimeframe(new Date(2020, Calendar.SEPTEMBER, 2, 10, 00, 12).getTime()),
25+
is(equalTo(new Date(2020, Calendar.SEPTEMBER, 2, 10, 0, 0).getTime())));
26+
assertThat(ImpressionCounter.truncateTimeframe(new Date(1970, Calendar.JANUARY, 0, 0, 0, 0).getTime()),
27+
is(equalTo(new Date(1970, Calendar.JANUARY, 0, 0, 0, 0).getTime())));
28+
}
29+
30+
@Test
31+
public void testMakeKey() {
32+
assertThat(ImpressionCounter.makeKey("someFeature", new Date(2020, Calendar.SEPTEMBER, 2, 10, 5, 23).getTime()),
33+
is(equalTo("someFeature::61557195600000")));
34+
assertThat(ImpressionCounter.makeKey("", new Date(2020, Calendar.SEPTEMBER, 2, 10, 5, 23).getTime()),
35+
is(equalTo("::61557195600000")));
36+
assertThat(ImpressionCounter.makeKey(null, new Date(2020, Calendar.SEPTEMBER, 2, 10, 5, 23).getTime()),
37+
is(equalTo("null::61557195600000")));
38+
assertThat(ImpressionCounter.makeKey(null, 0L), is(equalTo("null::0")));
39+
}
40+
41+
@Test
42+
public void testBasicUsage() {
43+
final ImpressionCounter counter = new ImpressionCounter();
44+
final long timestamp = new Date(2020, Calendar.SEPTEMBER, 2, 10, 10, 12).getTime();
45+
counter.inc("feature1", timestamp, 1);
46+
counter.inc("feature1", timestamp + 1, 1);
47+
counter.inc("feature1", timestamp + 2, 1);
48+
counter.inc("feature2", timestamp + 3, 2);
49+
counter.inc("feature2", timestamp + 4, 2);
50+
Map<String, Integer> counted = counter.popAll();
51+
assertThat(counted.size(), is(equalTo(2)));
52+
assertThat(counted.get(ImpressionCounter.makeKey("feature1", timestamp)), is(equalTo(3)));
53+
assertThat(counted.get(ImpressionCounter.makeKey("feature2", timestamp)), is(equalTo(4)));
54+
assertThat(counter.popAll().size(), is(equalTo(0)));
55+
56+
final long nextHourTimestamp = new Date(2020, Calendar.SEPTEMBER, 2, 11, 10, 12).getTime();
57+
counter.inc("feature1", timestamp, 1);
58+
counter.inc("feature1", timestamp + 1, 1);
59+
counter.inc("feature1", timestamp + 2, 1);
60+
counter.inc("feature2", timestamp + 3, 2);
61+
counter.inc("feature2", timestamp + 4, 2);
62+
counter.inc("feature1", nextHourTimestamp, 1);
63+
counter.inc("feature1", nextHourTimestamp + 1, 1);
64+
counter.inc("feature1", nextHourTimestamp + 2, 1);
65+
counter.inc("feature2", nextHourTimestamp + 3, 2);
66+
counter.inc("feature2", nextHourTimestamp + 4, 2);
67+
counted = counter.popAll();
68+
assertThat(counted.size(), is(equalTo(4)));
69+
assertThat(counted.get(ImpressionCounter.makeKey("feature1", timestamp)), is(equalTo(3)));
70+
assertThat(counted.get(ImpressionCounter.makeKey("feature2", timestamp)), is(equalTo(4)));
71+
assertThat(counted.get(ImpressionCounter.makeKey("feature1", nextHourTimestamp)), is(equalTo(3)));
72+
assertThat(counted.get(ImpressionCounter.makeKey("feature2", nextHourTimestamp)), is(equalTo(4)));
73+
assertThat(counter.popAll().size(), is(equalTo(0)));
74+
}
75+
76+
@Test
77+
public void manyConcurrentCalls() throws InterruptedException {
78+
final int iterations = 10000000;
79+
final long timestamp = new Date(2020, Calendar.SEPTEMBER, 2, 10, 10, 12).getTime();
80+
final long nextHourTimestamp = new Date(2020, Calendar.SEPTEMBER, 2, 11, 10, 12).getTime();
81+
ImpressionCounter counter = new ImpressionCounter();
82+
Thread t1 = new Thread(() -> {
83+
int times = iterations;
84+
while (times-- > 0) {
85+
counter.inc("feature1", timestamp, 1);
86+
counter.inc("feature2", timestamp, 1);
87+
counter.inc("feature1", nextHourTimestamp, 2);
88+
counter.inc("feature2", nextHourTimestamp, 2);
89+
}
90+
});
91+
Thread t2 = new Thread(() -> {
92+
int times = iterations;
93+
while (times-- > 0) {
94+
counter.inc("feature1", timestamp, 2);
95+
counter.inc("feature2", timestamp, 2);
96+
counter.inc("feature1", nextHourTimestamp, 1);
97+
counter.inc("feature2", nextHourTimestamp, 1);
98+
}
99+
});
100+
101+
t1.setDaemon(true); t2.setDaemon(true);
102+
t1.start(); t2.start();
103+
t1.join(); t2.join();
104+
105+
HashMap<String, Integer> counted = counter.popAll();
106+
assertThat(counted.size(), is(equalTo(4)));
107+
assertThat(counted.get(ImpressionCounter.makeKey("feature1", timestamp)), is(equalTo(iterations * 3)));
108+
assertThat(counted.get(ImpressionCounter.makeKey("feature2", timestamp)), is(equalTo(iterations * 3)));
109+
assertThat(counted.get(ImpressionCounter.makeKey("feature1", nextHourTimestamp)), is(equalTo(iterations * 3)));
110+
assertThat(counted.get(ImpressionCounter.makeKey("feature2", nextHourTimestamp)), is(equalTo(iterations * 3)));
111+
}
112+
}

client/src/test/java/io/split/client/impressions/ImpressionObserverTest.java

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,13 @@
1111
import java.lang.reflect.Method;
1212
import java.util.ArrayList;
1313
import java.util.List;
14+
import java.util.Random;
15+
import java.util.concurrent.ConcurrentLinkedQueue;
1416

15-
import static org.hamcrest.Matchers.lessThan;
16-
import static org.hamcrest.Matchers.is;
17+
import static org.hamcrest.Matchers.*;
1718
import static org.junit.Assert.assertNull;
1819
import static org.junit.Assert.assertThat;
20+
import static org.mockito.AdditionalMatchers.or;
1921

2022
public class ImpressionObserverTest {
2123

@@ -92,6 +94,42 @@ public void testMemoryUsageStopsWhenCacheIsFull() throws Exception {
9294
long sizeAfterSecondPopulation = (long) getObjectSize.invoke(null, observer);
9395

9496
assertThat((double) (sizeAfterSecondPopulation - sizeAfterInitialPopulation), lessThan (SIZE_DELTA * sizeAfterInitialPopulation));
97+
}
98+
99+
100+
private void caller(ImpressionObserver o, int count, ConcurrentLinkedQueue<KeyImpression> imps) {
101+
Random rand = new Random();
102+
while (count-- > 0) {
103+
KeyImpression k = new KeyImpression();
104+
k.keyName = "key_" + rand.nextInt(100);
105+
k.feature = "feature_" + rand.nextInt(10);
106+
k.label = "label" + rand.nextInt(5);
107+
k.treatment = rand.nextBoolean() ? "on" : "off";
108+
k.changeNumber = 1234567L;
109+
k.time = System.currentTimeMillis();
110+
k.pt = o.testAndSet(k);
111+
imps.offer(k);
112+
}
113+
}
95114

115+
@Test
116+
public void testConcurrencyVsAccuracy() throws InterruptedException {
117+
ImpressionObserver observer = new ImpressionObserver(500000);
118+
ConcurrentLinkedQueue<KeyImpression> imps = new ConcurrentLinkedQueue<>();
119+
Thread t1 = new Thread(() -> caller(observer, 1000000, imps));
120+
Thread t2 = new Thread(() -> caller(observer, 1000000, imps));
121+
Thread t3 = new Thread(() -> caller(observer, 1000000, imps));
122+
Thread t4 = new Thread(() -> caller(observer, 1000000, imps));
123+
Thread t5 = new Thread(() -> caller(observer, 1000000, imps));
124+
125+
// start the 5 threads an wait for them to finish.
126+
t1.setDaemon(true); t2.setDaemon(true); t3.setDaemon(true); t4.setDaemon(true); t5.setDaemon(true);
127+
t1.start(); t2.start(); t3.start(); t4.start(); t5.start();
128+
t1.join(); t2.join(); t3.join(); t4.join(); t5.join();
129+
130+
assertThat(imps.size(), is(equalTo(5000000)));
131+
for (KeyImpression i : imps) {
132+
assertThat(i.pt, is(anyOf(nullValue(), lessThanOrEqualTo(i.time))));
133+
}
96134
}
97135
}

0 commit comments

Comments
 (0)