Skip to content

Commit d575244

Browse files
Setting up size to events Queue
1 parent a0c4fd0 commit d575244

File tree

3 files changed

+58
-25
lines changed

3 files changed

+58
-25
lines changed

client/src/main/java/io/split/client/EventClientImpl.java

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -54,23 +54,15 @@ public class EventClientImpl implements EventClient {
5454
private final TelemetryRuntimeProducer _telemetryRuntimeProducer;
5555

5656
ThreadFactory eventClientThreadFactory(final String name) {
57-
return new ThreadFactory() {
58-
@Override
59-
public Thread newThread(final Runnable r) {
60-
return new Thread(new Runnable() {
61-
@Override
62-
public void run() {
63-
Thread.currentThread().setPriority(MIN_PRIORITY);
64-
r.run();
65-
}
66-
}, name);
67-
}
68-
};
57+
return r -> new Thread(() -> {
58+
Thread.currentThread().setPriority(MIN_PRIORITY);
59+
r.run();
60+
}, name);
6961
}
7062

7163

7264
public static EventClientImpl create(CloseableHttpClient httpclient, URI eventsRootTarget, int maxQueueSize, long flushIntervalMillis, int waitBeforeShutdown, TelemetryRuntimeProducer telemetryRuntimeProducer) throws URISyntaxException {
73-
return new EventClientImpl(new LinkedBlockingQueue<WrappedEvent>(),
65+
return new EventClientImpl(new LinkedBlockingQueue<>(maxQueueSize),
7466
httpclient,
7567
Utils.appendPath(eventsRootTarget, "api/events/bulk"),
7668
maxQueueSize,
@@ -131,10 +123,14 @@ public boolean track(Event event, int eventSize) {
131123
if (event == null) {
132124
return false;
133125
}
134-
_eventQueue.put(new WrappedEvent(event, eventSize));
135-
_telemetryRuntimeProducer.recordEventStats(EventsDataRecordsEnum.EVENTS_QUEUED, 1);
126+
if(_eventQueue.offer(new WrappedEvent(event, eventSize))) {
127+
_telemetryRuntimeProducer.recordEventStats(EventsDataRecordsEnum.EVENTS_QUEUED, 1);
128+
}
129+
else {
130+
_telemetryRuntimeProducer.recordEventStats(EventsDataRecordsEnum.EVENTS_DROPPED, 1);
131+
}
136132

137-
} catch (ClassCastException | NullPointerException | InterruptedException e) {
133+
} catch (ClassCastException | NullPointerException | IllegalArgumentException e) {
138134
_telemetryRuntimeProducer.recordEventStats(EventsDataRecordsEnum.EVENTS_DROPPED, 1);
139135
_log.warn("Interruption when adding event withed while adding message %s.", event);
140136
return false;
@@ -164,7 +160,7 @@ public void run() {
164160
List<Event> events = new ArrayList<>();
165161
long accumulated = 0;
166162
try {
167-
while (true) {
163+
while (!Thread.currentThread().isInterrupted()) {
168164
WrappedEvent data = _eventQueue.take();
169165
Event event = data.event();
170166
Long size = data.size();

client/src/test/java/io/split/client/EventsClientImplTest.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package io.split.client;
22

33
import io.split.client.dtos.Event;
4+
import io.split.engine.segments.SegmentSynchronizationTaskImp;
5+
import io.split.telemetry.domain.enums.EventsDataRecordsEnum;
46
import io.split.telemetry.storage.InMemoryTelemetryStorage;
57
import io.split.telemetry.storage.TelemetryStorage;
68
import org.apache.hc.client5.http.classic.methods.HttpUriRequest;
@@ -12,8 +14,12 @@
1214
import org.mockito.Mockito;
1315

1416
import java.io.IOException;
17+
import java.lang.reflect.Field;
18+
import java.lang.reflect.Modifier;
1519
import java.net.URI;
1620
import java.net.URISyntaxException;
21+
import java.util.concurrent.ExecutorService;
22+
import java.util.concurrent.Executors;
1723
import java.util.concurrent.LinkedBlockingQueue;
1824

1925
public class EventsClientImplTest {
@@ -74,4 +80,34 @@ public void testEventsFlushedWhenSizeLimitReached() throws URISyntaxException, I
7480
Thread.sleep(2000);
7581
Mockito.verify(client, Mockito.times(1)).execute((HttpUriRequest) Mockito.any());
7682
}
83+
84+
@Test
85+
public void testEventDropped() throws URISyntaxException, NoSuchFieldException, IllegalAccessException {
86+
CloseableHttpClient client = Mockito.mock(CloseableHttpClient.class);
87+
Field eventDroppedConsumer = EventClientImpl.class.getDeclaredField("_consumerExecutor");
88+
ExecutorService executorService = Executors.newFixedThreadPool(2);
89+
eventDroppedConsumer.setAccessible(true);
90+
91+
Field modifiersField = Field.class.getDeclaredField("modifiers");
92+
modifiersField.setAccessible(true);
93+
modifiersField.setInt(eventDroppedConsumer, eventDroppedConsumer.getModifiers() & ~Modifier.FINAL);
94+
executorService.execute(() -> {
95+
96+
});
97+
EventClientImpl eventClient = new EventClientImpl(new LinkedBlockingQueue<>(2),
98+
client,
99+
URI.create("https://kubernetesturl.com/split"),
100+
10000, // Long queue so it doesn't flush by # of events
101+
100000, // Long period so it doesn't flush by timeout expiration.
102+
0, TELEMETRY_STORAGE);
103+
eventClient.close();
104+
eventDroppedConsumer.set(eventClient, executorService); // 1ms
105+
for (int i = 0; i < 3; ++i) {
106+
Event event = new Event();
107+
eventClient.track(event, 1);
108+
}
109+
110+
Mockito.verify(TELEMETRY_STORAGE, Mockito.times(2)).recordEventStats(EventsDataRecordsEnum.EVENTS_QUEUED, 1);
111+
Mockito.verify(TELEMETRY_STORAGE, Mockito.times(1)).recordEventStats(EventsDataRecordsEnum.EVENTS_DROPPED, 1);
112+
}
77113
}

client/src/test/java/io/split/engine/segments/SegmentSynchronizationTaskImpTest.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -100,14 +100,13 @@ public void testFetchAllAsynchronousAndGetFalse() throws NoSuchFieldException, I
100100
Mockito.doNothing().when(segmentFetcher).callLoopRun(Mockito.anyBoolean(),Mockito.anyBoolean());
101101

102102
// Before executing, we'll update the map of segmentFecthers via reflection.
103-
Field backoffBase = SegmentSynchronizationTaskImp.class.getDeclaredField("_segmentFetchers");
104-
backoffBase.setAccessible(true);
103+
Field segmentFetchersForced = SegmentSynchronizationTaskImp.class.getDeclaredField("_segmentFetchers");
104+
segmentFetchersForced.setAccessible(true);
105105
Field modifiersField = Field.class.getDeclaredField("modifiers");
106106
modifiersField.setAccessible(true);
107-
modifiersField.setInt(backoffBase, backoffBase.getModifiers() & ~Modifier.FINAL);
107+
modifiersField.setInt(segmentFetchersForced, segmentFetchersForced.getModifiers() & ~Modifier.FINAL);
108108

109-
backoffBase.set(fetchers, _segmentFetchers); // 1ms
110-
fetcher1.set(segmentFetcher);
109+
segmentFetchersForced.set(fetchers, _segmentFetchers); // 1ms
111110
boolean fetch = fetchers.fetchAllSynchronous();
112111
Assert.assertEquals(false, fetch);
113112
}
@@ -117,16 +116,18 @@ public void testFetchAllAsynchronousAndGetTrue() throws NoSuchFieldException, Il
117116
SDKReadinessGates gates = new SDKReadinessGates();
118117
SegmentCache segmentCache = Mockito.mock(SegmentCache.class);
119118

119+
ConcurrentMap<String, SegmentFetcher> _segmentFetchers = Maps.newConcurrentMap();
120120
SegmentChangeFetcher segmentChangeFetcher = Mockito.mock(SegmentChangeFetcher.class);
121121
SegmentFetcherImp segmentFetcher = Mockito.mock(SegmentFetcherImp.class);
122122
final SegmentSynchronizationTaskImp fetchers = new SegmentSynchronizationTaskImp(segmentChangeFetcher, 1L, 1, gates, segmentCache, TELEMETRY_STORAGE);
123123

124124
// Before executing, we'll update the map of segmentFecthers via reflection.
125-
Field backoffBase = SegmentSynchronizationTaskImp.class.getDeclaredField("_segmentFetchers");
126-
backoffBase.setAccessible(true);
125+
Field segmentFetchersForced = SegmentSynchronizationTaskImp.class.getDeclaredField("_segmentFetchers");
126+
segmentFetchersForced.setAccessible(true);
127127
Field modifiersField = Field.class.getDeclaredField("modifiers");
128128
modifiersField.setAccessible(true);
129-
modifiersField.setInt(backoffBase, backoffBase.getModifiers() & ~Modifier.FINAL);
129+
modifiersField.setInt(segmentFetchersForced, segmentFetchersForced.getModifiers() & ~Modifier.FINAL);
130+
segmentFetchersForced.set(fetchers, _segmentFetchers); // 1ms
130131
Mockito.doNothing().when(segmentFetcher).callLoopRun(Mockito.anyBoolean(),Mockito.anyBoolean());
131132
Mockito.when(segmentFetcher.runWhitCacheHeader()).thenReturn(true);
132133
Mockito.when(segmentFetcher.fetchAndUpdate(Mockito.anyBoolean())).thenReturn(true);

0 commit comments

Comments
 (0)