Skip to content

Commit df8dcc9

Browse files
Merge pull request #226 from splitio/size-event-queue
Size event queue
2 parents 859a697 + 154a615 commit df8dcc9

File tree

3 files changed

+39
-19
lines changed

3 files changed

+39
-19
lines changed

client/src/main/java/io/split/client/EventClientImpl.java

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -54,23 +54,15 @@ public class EventClientImpl implements EventClient {
5454
private final TelemetryRuntimeProducer _telemetryRuntimeProducer;
5555

5656
ThreadFactory eventClientThreadFactory(final String name) {
57-
return new ThreadFactory() {
58-
@Override
59-
public Thread newThread(final Runnable r) {
60-
return new Thread(new Runnable() {
61-
@Override
62-
public void run() {
63-
Thread.currentThread().setPriority(MIN_PRIORITY);
64-
r.run();
65-
}
66-
}, name);
67-
}
68-
};
57+
return r -> new Thread(() -> {
58+
Thread.currentThread().setPriority(MIN_PRIORITY);
59+
r.run();
60+
}, name);
6961
}
7062

7163

7264
public static EventClientImpl create(CloseableHttpClient httpclient, URI eventsRootTarget, int maxQueueSize, long flushIntervalMillis, int waitBeforeShutdown, TelemetryRuntimeProducer telemetryRuntimeProducer) throws URISyntaxException {
73-
return new EventClientImpl(new LinkedBlockingQueue<WrappedEvent>(),
65+
return new EventClientImpl(new LinkedBlockingQueue<>(maxQueueSize),
7466
httpclient,
7567
Utils.appendPath(eventsRootTarget, "api/events/bulk"),
7668
maxQueueSize,
@@ -131,10 +123,14 @@ public boolean track(Event event, int eventSize) {
131123
if (event == null) {
132124
return false;
133125
}
134-
_eventQueue.put(new WrappedEvent(event, eventSize));
135-
_telemetryRuntimeProducer.recordEventStats(EventsDataRecordsEnum.EVENTS_QUEUED, 1);
126+
if(_eventQueue.offer(new WrappedEvent(event, eventSize))) {
127+
_telemetryRuntimeProducer.recordEventStats(EventsDataRecordsEnum.EVENTS_QUEUED, 1);
128+
}
129+
else {
130+
_telemetryRuntimeProducer.recordEventStats(EventsDataRecordsEnum.EVENTS_DROPPED, 1);
131+
}
136132

137-
} catch (ClassCastException | NullPointerException | InterruptedException e) {
133+
} catch (ClassCastException | NullPointerException | IllegalArgumentException e) {
138134
_telemetryRuntimeProducer.recordEventStats(EventsDataRecordsEnum.EVENTS_DROPPED, 1);
139135
_log.warn("Interruption when adding event withed while adding message %s.", event);
140136
return false;
@@ -164,7 +160,7 @@ public void run() {
164160
List<Event> events = new ArrayList<>();
165161
long accumulated = 0;
166162
try {
167-
while (true) {
163+
while (!Thread.currentThread().isInterrupted()) {
168164
WrappedEvent data = _eventQueue.take();
169165
Event event = data.event();
170166
Long size = data.size();

client/src/test/java/io/split/client/EventsClientImplTest.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
package io.split.client;
22

33
import io.split.client.dtos.Event;
4+
import io.split.telemetry.domain.enums.EventsDataRecordsEnum;
45
import io.split.telemetry.storage.InMemoryTelemetryStorage;
56
import io.split.telemetry.storage.TelemetryStorage;
67
import org.apache.hc.client5.http.classic.methods.HttpUriRequest;
78
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
89
import org.apache.hc.client5.http.impl.classic.HttpClients;
910
import org.hamcrest.Matchers;
1011
import org.junit.Assert;
12+
import org.junit.Before;
1113
import org.junit.Test;
1214
import org.mockito.Mockito;
1315

@@ -74,4 +76,25 @@ public void testEventsFlushedWhenSizeLimitReached() throws URISyntaxException, I
7476
Thread.sleep(2000);
7577
Mockito.verify(client, Mockito.times(1)).execute((HttpUriRequest) Mockito.any());
7678
}
79+
80+
@Test
81+
public void testEventDropped() throws URISyntaxException, NoSuchFieldException, IllegalAccessException, InterruptedException {
82+
TelemetryStorage telemetryStorage = Mockito.mock(InMemoryTelemetryStorage.class);
83+
CloseableHttpClient client = Mockito.mock(CloseableHttpClient.class);
84+
EventClientImpl eventClient = new EventClientImpl(new LinkedBlockingQueue<>(2),
85+
client,
86+
URI.create("https://kubernetesturl.com/split"),
87+
10000, // Long queue so it doesn't flush by # of events
88+
100000, // Long period so it doesn't flush by timeout expiration.
89+
0, telemetryStorage);
90+
eventClient.close();
91+
Thread.sleep(1000);
92+
for (int i = 0; i < 3; ++i) {
93+
Event event = new Event();
94+
eventClient.track(event, 1);
95+
}
96+
97+
Mockito.verify(telemetryStorage, Mockito.times(2)).recordEventStats(EventsDataRecordsEnum.EVENTS_QUEUED, 1);
98+
Mockito.verify(telemetryStorage, Mockito.times(1)).recordEventStats(EventsDataRecordsEnum.EVENTS_DROPPED, 1);
99+
}
77100
}

client/src/test/java/io/split/engine/segments/SegmentSynchronizationTaskImpTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,7 @@ public void testFetchAllAsynchronousAndGetFalse() throws NoSuchFieldException, I
106106
modifiersField.setAccessible(true);
107107
modifiersField.setInt(segmentFetchersForced, segmentFetchersForced.getModifiers() & ~Modifier.FINAL);
108108

109-
segmentFetchersForced.set(fetchers, _segmentFetchers); // 1ms
110-
fetcher1.set(segmentFetcher);
109+
segmentFetchersForced.set(fetchers, _segmentFetchers);
111110
boolean fetch = fetchers.fetchAllSynchronous();
112111
Assert.assertEquals(false, fetch);
113112
}
@@ -117,6 +116,7 @@ public void testFetchAllAsynchronousAndGetTrue() throws NoSuchFieldException, Il
117116
SDKReadinessGates gates = new SDKReadinessGates();
118117
SegmentCache segmentCache = Mockito.mock(SegmentCache.class);
119118

119+
ConcurrentMap<String, SegmentFetcher> _segmentFetchers = Maps.newConcurrentMap();
120120
SegmentChangeFetcher segmentChangeFetcher = Mockito.mock(SegmentChangeFetcher.class);
121121
SegmentFetcherImp segmentFetcher = Mockito.mock(SegmentFetcherImp.class);
122122
final SegmentSynchronizationTaskImp fetchers = new SegmentSynchronizationTaskImp(segmentChangeFetcher, 1L, 1, gates, segmentCache, TELEMETRY_STORAGE);
@@ -127,6 +127,7 @@ public void testFetchAllAsynchronousAndGetTrue() throws NoSuchFieldException, Il
127127
Field modifiersField = Field.class.getDeclaredField("modifiers");
128128
modifiersField.setAccessible(true);
129129
modifiersField.setInt(segmentFetchersForced, segmentFetchersForced.getModifiers() & ~Modifier.FINAL);
130+
segmentFetchersForced.set(fetchers, _segmentFetchers);
130131
Mockito.doNothing().when(segmentFetcher).callLoopRun(Mockito.anyBoolean(),Mockito.anyBoolean());
131132
Mockito.when(segmentFetcher.runWhitCacheHeader()).thenReturn(true);
132133
Mockito.when(segmentFetcher.fetchAndUpdate(Mockito.anyBoolean())).thenReturn(true);

0 commit comments

Comments
 (0)