Skip to content

Commit b52c519

Browse files
Adding BucketCalculator, tests and several minor changes
1 parent 31a8208 commit b52c519

File tree

9 files changed

+288
-39
lines changed

9 files changed

+288
-39
lines changed
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package io.split.telemetry.domain;
2+
3+
import java.util.ArrayList;
4+
import java.util.Arrays;
5+
import java.util.List;
6+
import java.util.concurrent.atomic.AtomicLong;
7+
import java.util.stream.IntStream;
8+
9+
public class AtomicLongArray {
10+
private AtomicLong[] array;
11+
12+
public AtomicLongArray(int size) throws Exception {
13+
if(size == 0) {
14+
throw new Exception("Invalid array size");
15+
}
16+
array = new AtomicLong[size];
17+
IntStream.range(0, array.length).forEach(x -> array[x] = new AtomicLong());
18+
}
19+
20+
public void increment(int index) {
21+
if (index < 0 || index >= array.length) {
22+
throw new ArrayIndexOutOfBoundsException();
23+
}
24+
array[index].getAndIncrement();
25+
}
26+
27+
public List<Long> fetchAndClearAll() {
28+
List<Long> listValues = new ArrayList<>();
29+
for (AtomicLong a: array) {
30+
listValues.add(a.longValue());
31+
}
32+
33+
IntStream.range(0, array.length).forEach(x -> array[x] = new AtomicLong());
34+
35+
return listValues;
36+
}
37+
}

client/src/main/java/io/split/telemetry/storage/InMemoryTelemetryStorage.java

Lines changed: 20 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import java.util.List;
1010
import java.util.concurrent.ConcurrentMap;
1111
import java.util.concurrent.atomic.AtomicLong;
12+
import java.util.stream.Collectors;
1213

1314
public class InMemoryTelemetryStorage implements TelemetryStorage{
1415
public static final int MAX_LATENCY_BUCKET_COUNT = 23;
@@ -54,14 +55,12 @@ public InMemoryTelemetryStorage() throws Exception {
5455

5556
@Override
5657
public long getBURTimeouts() {
57-
long burTimeouts = _factoryCounters.get(FactoryCountersEnum.BUR_TIMEOUTS).get();
58-
return burTimeouts;
58+
return _factoryCounters.get(FactoryCountersEnum.BUR_TIMEOUTS).get();
5959
}
6060

6161
@Override
6262
public long getNonReadyUsages() {
63-
long nonReadyUsages = _factoryCounters.get(FactoryCountersEnum.NON_READY_USAGES).get();
64-
return nonReadyUsages;
63+
return _factoryCounters.get(FactoryCountersEnum.NON_READY_USAGES).get();
6564
}
6665

6766
@Override
@@ -74,13 +73,13 @@ public MethodExceptions popExceptions() throws Exception {
7473
exceptions.set_track(_exceptionsCounters.get(MethodEnum.TRACK).get());
7574

7675
_exceptionsCounters.clear();
77-
initMethodLatencies();
76+
initMethodCounters();
7877

7978
return exceptions;
8079
}
8180

8281
@Override
83-
public MethodLatencies popLatencies() {
82+
public MethodLatencies popLatencies() throws Exception {
8483
MethodLatencies latencies = new MethodLatencies();
8584
latencies.set_treatment(_methodLatencies.get(MethodEnum.TREATMENT).fetchAndClearAll());
8685
latencies.set_treatments(_methodLatencies.get(MethodEnum.TREATMENTS).fetchAndClearAll());
@@ -89,15 +88,14 @@ public MethodLatencies popLatencies() {
8988
latencies.set_track(_methodLatencies.get(MethodEnum.TRACK).fetchAndClearAll());
9089

9190
_methodLatencies.clear();
92-
initMethodCounters();
91+
initMethodLatencies();
9392

9493
return latencies;
9594
}
9695

9796
@Override
9897
public void recordNonReadyUsage() {
9998
_factoryCounters.get(FactoryCountersEnum.NON_READY_USAGES).incrementAndGet();
100-
10199
}
102100

103101
@Override
@@ -106,8 +104,8 @@ public void recordBURTimeout() {
106104
}
107105

108106
@Override
109-
public void recordLatency(String method, int latency) {
110-
int bucket = BucketCalculator.getBucketForLatencyMillis(latency);
107+
public void recordLatency(MethodEnum method, long latency) {
108+
int bucket = BucketCalculator.getBucketForLatency(latency);
111109
_methodLatencies.get(method).increment(bucket);
112110
}
113111

@@ -117,13 +115,13 @@ public void recordException(MethodEnum method) {
117115
}
118116

119117
@Override
120-
public long getImpressionsStats(ImpressionsDataTypeEnum data) {
121-
return _impressionsDataRecords.get(data).get();
118+
public long getImpressionsStats(ImpressionsDataTypeEnum dataType) {
119+
return _impressionsDataRecords.get(dataType).get();
122120
}
123121

124122
@Override
125-
public long getEventStats(EventsDataRecordsEnum type) {
126-
return _eventsDataRecords.get(type).get();
123+
public long getEventStats(EventsDataRecordsEnum dataType) {
124+
return _eventsDataRecords.get(dataType).get();
127125
}
128126

129127
@Override
@@ -195,7 +193,7 @@ public long popTokenRefreshes() {
195193
@Override
196194
public List<StreamingEvent> popStreamingEvents() {
197195
synchronized (_streamingEventsLock) {
198-
List<StreamingEvent> streamingEvents = _streamingEvents;
196+
List<StreamingEvent> streamingEvents = _streamingEvents.stream().collect(Collectors.toList());
199197

200198
_streamingEvents.clear();
201199

@@ -206,7 +204,7 @@ public List<StreamingEvent> popStreamingEvents() {
206204
@Override
207205
public List<String> popTags() {
208206
synchronized (_tagsLock) {
209-
List<String> tags = _tags;
207+
List<String> tags = _tags.stream().collect(Collectors.toList());
210208

211209
_tags.clear();
212210

@@ -228,44 +226,41 @@ public void addTag(String tag) {
228226

229227
@Override
230228
public void recordImpressionStats(ImpressionsDataTypeEnum dataType, long count) {
231-
_impressionsDataRecords.get(dataType).incrementAndGet();
229+
_impressionsDataRecords.get(dataType).addAndGet(count);
232230
}
233231

234232
@Override
235233
public void recordEventStats(EventsDataRecordsEnum dataType, long count) {
236-
_eventsDataRecords.get(dataType).incrementAndGet();
234+
_eventsDataRecords.get(dataType).addAndGet(count);
237235
}
238236

239237
@Override
240238
public void recordSuccessfulSync(LastSynchronizationRecordsEnum resource, long time) {
241239
_lastSynchronizationRecords.replace(resource, new AtomicLong(time));
242-
243240
}
244241

245242
@Override
246243
public void recordSyncError(ResourceEnum resource, int status) {
247244
ConcurrentMap<Long, Long> errors = _httpErrors.get(resource);
248245
errors.putIfAbsent(Long.valueOf(status), 0l);
249-
errors.replace(Long.valueOf(status), errors.get(status) + 1);
246+
errors.replace(Long.valueOf(status), errors.get(Long.valueOf(status)) + 1);
250247
}
251248

252249
@Override
253-
public void recordSyncLatency(String resource, long latency) {
254-
int bucket = BucketCalculator.getBucketForLatencyMillis(latency);
250+
public void recordSyncLatency(HTTPLatenciesEnum resource, long latency) {
251+
int bucket = BucketCalculator.getBucketForLatency(latency);
255252
_httpLatencies.get(resource).increment(bucket);
256253

257254
}
258255

259256
@Override
260257
public void recordAuthRejections() {
261258
_pushCounters.get(PushCountersEnum.AUTH_REJECTIONS).incrementAndGet();
262-
263259
}
264260

265261
@Override
266262
public void recordTokenRefreshes() {
267263
_pushCounters.get(PushCountersEnum.TOKEN_REFRESHES).incrementAndGet();
268-
269264
}
270265

271266
@Override
@@ -308,8 +303,6 @@ private void initHttpErrors() {
308303
_httpErrors.put(ResourceEnum.TOKEN_SYNC, Maps.newConcurrentMap());
309304
}
310305

311-
312-
313306
private void initMethodCounters() {
314307
_exceptionsCounters.put(MethodEnum.TREATMENT, new AtomicLong());
315308
_exceptionsCounters.put(MethodEnum.TREATMENTS, new AtomicLong());
@@ -345,6 +338,7 @@ private void initLastSynchronizationRecords() {
345338
_lastSynchronizationRecords.put(LastSynchronizationRecordsEnum.IMPRESSIONS, new AtomicLong());
346339
_lastSynchronizationRecords.put(LastSynchronizationRecordsEnum.IMPRESSIONS_COUNT, new AtomicLong());
347340
_lastSynchronizationRecords.put(LastSynchronizationRecordsEnum.TOKEN, new AtomicLong());
341+
_lastSynchronizationRecords.put(LastSynchronizationRecordsEnum.TELEMETRY, new AtomicLong());
348342
}
349343

350344
private void initEventDataRecords() {

client/src/main/java/io/split/telemetry/storage/TelemetryEvaluationConsumer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,5 @@
55

66
public interface TelemetryEvaluationConsumer {
77
MethodExceptions popExceptions() throws Exception;
8-
MethodLatencies popLatencies();
8+
MethodLatencies popLatencies() throws Exception;
99
}

client/src/main/java/io/split/telemetry/storage/TelemetryEvaluationProducer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,6 @@
33
import io.split.telemetry.domain.enums.MethodEnum;
44

55
public interface TelemetryEvaluationProducer {
6-
void recordLatency(String method, int latency);
6+
void recordLatency(MethodEnum method, long latency);
77
void recordException(MethodEnum method);
88
}

client/src/main/java/io/split/telemetry/storage/TelemetryRuntimeConsumer.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,5 +20,4 @@ public interface TelemetryRuntimeConsumer {
2020
List<StreamingEvent> popStreamingEvents();
2121
List<String> popTags();
2222
long getSessionLength();
23-
2423
}

client/src/main/java/io/split/telemetry/storage/TelemetryRuntimeProducer.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,15 @@
11
package io.split.telemetry.storage;
22

33
import io.split.telemetry.domain.StreamingEvent;
4-
import io.split.telemetry.domain.enums.EventsDataRecordsEnum;
5-
import io.split.telemetry.domain.enums.ImpressionsDataTypeEnum;
6-
import io.split.telemetry.domain.enums.LastSynchronizationRecordsEnum;
7-
import io.split.telemetry.domain.enums.ResourceEnum;
4+
import io.split.telemetry.domain.enums.*;
85

96
public interface TelemetryRuntimeProducer {
107
void addTag(String tag);
118
void recordImpressionStats(ImpressionsDataTypeEnum dataType, long count);
129
void recordEventStats(EventsDataRecordsEnum dataType, long count);
1310
void recordSuccessfulSync(LastSynchronizationRecordsEnum resource, long time);
1411
void recordSyncError(ResourceEnum resource, int status);
15-
void recordSyncLatency(String resource, long latency);
12+
void recordSyncLatency(HTTPLatenciesEnum resource, long latency);
1613
void recordAuthRejections();
1714
void recordTokenRefreshes();
1815
void recordStreamingEvents(StreamingEvent streamingEvent);

client/src/main/java/io/split/telemetry/utils/BucketCalculator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@ public class BucketCalculator {
4242

4343
static final long MAX_LATENCY = 7481828;
4444

45-
public static int getBucketForLatencyMillis(long latency) {
46-
long micros = latency * 1000;
45+
public static int getBucketForLatency(long latency) {
46+
long micros = latency / 1000; //Convert to milliseconds
4747
if (micros > MAX_LATENCY) {
4848
return BUCKETS.length - 1;
4949
}

0 commit comments

Comments
 (0)