Skip to content

Commit cc9fdec

Browse files
authored
Merge pull request #151 from splitio/imp/removeJersey
remove jersey from actual SDK. only keep as test dependency
2 parents e7bf779 + 739ef88 commit cc9fdec

19 files changed

+439
-248
lines changed

client/CHANGES.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
CHANGES
22

3+
4.0.1 (Sep 4, 2020)
4+
- Remove jersey. Use custom SSE implementation
5+
- Bumped guava version to 29
6+
37
4.0.0 (Aug 19, 2020)
48
- Deprecated Java 7 support. Java 8 is the minimum supported version for this and future releases.
59
- Added support for the new Split streaming architecture. When enabled (default), the SDK will not poll for updates but instead receive notifications every time there's a change in your environments, allowing to process those much quicker. If disabled or in the event of an issue, the SDK will fallback to the known polling mechanism to provide a seamless experience.

client/pom.xml

Lines changed: 15 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
<parent>
66
<groupId>io.split.client</groupId>
77
<artifactId>java-client-parent</artifactId>
8-
<version>4.0.0</version>
8+
<version>4.0.1</version>
99
</parent>
1010
<artifactId>java-client</artifactId>
1111
<packaging>jar</packaging>
@@ -47,12 +47,6 @@
4747
<include>org.apache.httpcomponents:httpcore</include>
4848
<include>com.google.code.gson:gson</include>
4949
<include>com.google.guava:guava</include>
50-
<include>org.yaml.snakeyaml.*</include>
51-
<include>org.glassfish.*</include>
52-
<include>jakarta.*</include>
53-
<include>org.javassist:javassist</include>
54-
<include>com.sun.activation:jakarta.activation</include>
55-
<include>org.jvnet.*</include>
5650
</includes>
5751
</artifactSet>
5852
<transformers>
@@ -68,26 +62,6 @@
6862
<pattern>com.google</pattern>
6963
<shadedPattern>split.com.google</shadedPattern>
7064
</relocation>
71-
<relocation>
72-
<pattern>org.glassfish</pattern>
73-
<shadedPattern>split.org.glassfish</shadedPattern>
74-
</relocation>
75-
<relocation>
76-
<pattern>jakarta</pattern>
77-
<shadedPattern>split.jakarta</shadedPattern>
78-
</relocation>
79-
<relocation>
80-
<pattern>org.javassist</pattern>
81-
<shadedPattern>split.org.javassist</shadedPattern>
82-
</relocation>
83-
<relocation>
84-
<pattern>com.sun.activation</pattern>
85-
<shadedPattern>split.com.sun.activation</shadedPattern>
86-
</relocation>
87-
<relocation>
88-
<pattern>org.jvnet</pattern>
89-
<shadedPattern>split.org.jvnet</shadedPattern>
90-
</relocation>
9165
</relocations>
9266
<filters>
9367
<filter>
@@ -96,9 +70,7 @@
9670
<exclude>META-INF/license/**</exclude>
9771
<exclude>META-INF/*</exclude>
9872
<exclude>META-INF/maven/**</exclude>
99-
<!-- Disabled to allow Jersey implementation mappings/injector to work
100-
<exclude>META-INF/services/**</exclude>
101-
-->
73+
<exclude>META-INF/services/**</exclude>
10274
<exclude>LICENSE</exclude>
10375
<exclude>NOTICE</exclude>
10476
<exclude>/*.txt</exclude>
@@ -149,7 +121,7 @@
149121
<dependency>
150122
<groupId>com.google.guava</groupId>
151123
<artifactId>guava</artifactId>
152-
<version>18.0</version>
124+
<version>29.0-jre</version>
153125
</dependency>
154126
<dependency>
155127
<groupId>org.slf4j</groupId>
@@ -170,16 +142,6 @@
170142
<artifactId>snakeyaml</artifactId>
171143
<version>1.21</version>
172144
</dependency>
173-
<dependency>
174-
<groupId>org.glassfish.jersey.media</groupId>
175-
<artifactId>jersey-media-sse</artifactId>
176-
<version>2.31</version>
177-
</dependency>
178-
<dependency>
179-
<groupId>org.glassfish.jersey.inject</groupId>
180-
<artifactId>jersey-hk2</artifactId>
181-
<version>2.31</version>
182-
</dependency>
183145

184146
<!-- Test deps -->
185147
<dependency>
@@ -211,6 +173,18 @@
211173
<version>1.7.21</version>
212174
<scope>test</scope>
213175
</dependency>
176+
<dependency>
177+
<groupId>org.glassfish.jersey.media</groupId>
178+
<artifactId>jersey-media-sse</artifactId>
179+
<version>2.31</version>
180+
<scope>test</scope>
181+
</dependency>
182+
<dependency>
183+
<groupId>org.glassfish.jersey.inject</groupId>
184+
<artifactId>jersey-hk2</artifactId>
185+
<version>2.31</version>
186+
<scope>test</scope>
187+
</dependency>
214188
<dependency>
215189
<groupId>org.glassfish.jersey.containers</groupId>
216190
<artifactId>jersey-container-grizzly2-http</artifactId>

client/src/main/java/io/split/client/SplitClientConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,7 @@ public static final class Builder {
275275
private int _authRetryBackoffBase = 1;
276276
private int _streamingReconnectBackoffBase = 1;
277277
private String _authServiceURL = "https://auth.split.io/api/auth";
278-
private String _streamingServiceURL = "https://streaming.split.io/event-stream";
278+
private String _streamingServiceURL = "https://streaming.split.io/sse";
279279

280280
public Builder() {
281281
}

client/src/main/java/io/split/engine/sse/AuthApiClientImp.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,6 @@ private AuthenticationResponse getSuccessResponse(String jsonContent) {
7272
expiration = response.getExpiration();
7373
}
7474

75-
return new AuthenticationResponse(response.isPushEnabled(), response.getToken(), channels, 3000/*expiration*/, false);
75+
return new AuthenticationResponse(response.isPushEnabled(), response.getToken(), channels, expiration, false);
7676
}
7777
}
Lines changed: 20 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package io.split.engine.sse;
22

33
import com.google.common.annotations.VisibleForTesting;
4+
import io.split.engine.sse.client.RawEvent;
5+
import io.split.engine.sse.client.SSEClient;
46
import io.split.engine.sse.dtos.SegmentQueueDto;
57
import io.split.engine.sse.exceptions.EventParsingException;
68
import io.split.engine.sse.workers.SplitsWorker;
@@ -9,39 +11,34 @@
911
import org.slf4j.Logger;
1012
import org.slf4j.LoggerFactory;
1113

12-
import javax.ws.rs.client.Client;
13-
import javax.ws.rs.client.ClientBuilder;
14-
import javax.ws.rs.client.WebTarget;
15-
import javax.ws.rs.sse.InboundSseEvent;
14+
import java.net.URI;
1615
import java.net.URISyntaxException;
17-
import java.util.concurrent.TimeUnit;
1816

1917
import static com.google.common.base.Preconditions.checkNotNull;
2018

2119
public class EventSourceClientImp implements EventSourceClient {
2220
private static final Logger _log = LoggerFactory.getLogger(EventSourceClient.class);
2321

2422
private final String _baseStreamingUrl;
25-
private final Client _client;
2623
private final NotificationParser _notificationParser;
2724
private final NotificationProcessor _notificationProcessor;
28-
private final SplitSseEventSource _splitSseEventSource;
25+
private final SSEClient _sseClient;
2926
private final PushStatusTracker _pushStatusTracker;
3027

3128
@VisibleForTesting
3229
/* package private */ EventSourceClientImp(String baseStreamingUrl,
3330
NotificationParser notificationParser,
3431
NotificationProcessor notificationProcessor,
35-
Client client,
3632
PushStatusTracker pushStatusTracker) {
3733
_baseStreamingUrl = checkNotNull(baseStreamingUrl);
3834
_notificationParser = checkNotNull(notificationParser);
3935
_notificationProcessor = checkNotNull(notificationProcessor);
40-
_client = checkNotNull(client);
41-
_splitSseEventSource = new SplitSseEventSource(
42-
inboundEvent -> { onMessage(inboundEvent); return null; },
43-
status -> { onSSeStatusChange(status); return null; });
4436
_pushStatusTracker = pushStatusTracker;
37+
38+
_sseClient = new SSEClient(
39+
inboundEvent -> { onMessage(inboundEvent); return null; },
40+
status -> { _pushStatusTracker.handleSseStatus(status); return null; });
41+
4542
}
4643

4744
public static EventSourceClientImp build(String baseStreamingUrl,
@@ -52,18 +49,17 @@ public static EventSourceClientImp build(String baseStreamingUrl,
5249
return new EventSourceClientImp(baseStreamingUrl,
5350
new NotificationParserImp(),
5451
NotificationProcessorImp.build(splitsWorker, segmentWorker, pushStatusTracker),
55-
ClientBuilder.newBuilder().readTimeout(70, TimeUnit.SECONDS).build(),
5652
pushStatusTracker);
5753
}
5854

5955
@Override
6056
public boolean start(String channelList, String token) {
61-
if (_splitSseEventSource.isOpen()) {
62-
_splitSseEventSource.close();
57+
if (_sseClient.isOpen()) {
58+
_sseClient.close();
6359
}
6460

6561
try {
66-
return _splitSseEventSource.open(buildTarget(channelList, token));
62+
return _sseClient.open(buildUri(channelList, token));
6763
} catch (URISyntaxException e) {
6864
_log.error("Error building Streaming URI: " + e.getMessage());
6965
return false;
@@ -72,25 +68,25 @@ public boolean start(String channelList, String token) {
7268

7369
@Override
7470
public void stop() {
75-
if (!_splitSseEventSource.isOpen()) {
71+
if (!_sseClient.isOpen()) {
7672
_log.warn("Event Source Client is closed.");
7773
return;
7874
}
79-
_splitSseEventSource.close();
75+
_sseClient.close();
8076
}
8177

82-
private WebTarget buildTarget(String channelList, String token) throws URISyntaxException {
83-
return _client.target(new URIBuilder(_baseStreamingUrl)
78+
private URI buildUri(String channelList, String token) throws URISyntaxException {
79+
return new URIBuilder(_baseStreamingUrl)
8480
.addParameter("channels", channelList)
8581
.addParameter("v", "1.1")
8682
.addParameter("accessToken", token)
87-
.build());
83+
.build();
8884
}
8985

90-
private void onMessage(InboundSseEvent event) {
86+
private void onMessage(RawEvent event) {
9187
try {
92-
String type = event.getName();
93-
String payload = event.readData();
88+
String type = event.event();
89+
String payload = event.data();
9490
if (payload.length() > 0) {
9591
_log.debug(String.format("Payload received: %s", payload));
9692
switch (type) {
@@ -110,8 +106,4 @@ private void onMessage(InboundSseEvent event) {
110106
_log.warn(String.format("Error onMessage: %s", e.getMessage()));
111107
}
112108
}
113-
114-
private void onSSeStatusChange(SseStatus status) {
115-
_pushStatusTracker.handleSseStatus(status);
116-
}
117109
}
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.split.engine.sse;
22

3+
import io.split.engine.sse.client.SSEClient;
34
import io.split.engine.sse.dtos.ControlNotification;
45
import io.split.engine.sse.dtos.ErrorNotification;
56
import io.split.engine.sse.dtos.OccupancyNotification;
@@ -8,6 +9,6 @@ public interface PushStatusTracker {
89
void handleIncomingControlEvent(ControlNotification controlNotification);
910
void handleIncomingOccupancyEvent(OccupancyNotification occupancyNotification);
1011
void handleIncomingAblyError(ErrorNotification notification);
11-
void handleSseStatus(SseStatus newStatus);
12+
void handleSseStatus(SSEClient.StatusMessage newStatus);
1213
void forcePushDisable();
1314
}

client/src/main/java/io/split/engine/sse/PushStatusTrackerImp.java

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.split.engine.sse;
22

33
import io.split.engine.common.PushManager;
4+
import io.split.engine.sse.client.SSEClient;
45
import io.split.engine.sse.dtos.ControlNotification;
56
import io.split.engine.sse.dtos.ControlType;
67
import io.split.engine.sse.dtos.ErrorNotification;
@@ -16,7 +17,7 @@ public class PushStatusTrackerImp implements PushStatusTracker {
1617
private static final Logger _log = LoggerFactory.getLogger(PushStatusTracker.class);
1718

1819
private final AtomicBoolean _publishersOnline = new AtomicBoolean(true);
19-
private final AtomicReference<SseStatus> _sseStatus = new AtomicReference<>(SseStatus.DISCONNECTED);
20+
private final AtomicReference<SSEClient.StatusMessage> _sseStatus = new AtomicReference<>(SSEClient.StatusMessage.INITIALIZATION_IN_PROGRESS);
2021
private final AtomicReference<ControlType> _backendStatus = new AtomicReference<>(ControlType.STREAMING_RESUMED);
2122
private final LinkedBlockingQueue<PushManager.Status> _statusMessages;
2223

@@ -26,33 +27,40 @@ public PushStatusTrackerImp(LinkedBlockingQueue<PushManager.Status> statusMessag
2627

2728
public synchronized void reset() {
2829
_publishersOnline.set(true);
29-
_sseStatus.set(SseStatus.DISCONNECTED);
30+
_sseStatus.set(SSEClient.StatusMessage.INITIALIZATION_IN_PROGRESS);
3031
_backendStatus.set(ControlType.STREAMING_RESUMED);
3132
}
3233

3334
@Override
34-
public void handleSseStatus(SseStatus newStatus) {
35-
_log.debug(String.format("handleSseStatus new status: %s", newStatus.toString()));
36-
_log.debug(String.format("handleSseStatus current status: %s", _sseStatus.get().toString()));
35+
public void handleSseStatus(SSEClient.StatusMessage newStatus) {
36+
_log.debug(String.format("Current status: %s. New status: %s", _sseStatus.get().toString(), newStatus.toString()));
37+
3738
switch(newStatus) {
3839
case CONNECTED:
39-
if (_sseStatus.compareAndSet(SseStatus.DISCONNECTED, SseStatus.CONNECTED)
40-
|| _sseStatus.compareAndSet(SseStatus.RETRYABLE_ERROR, SseStatus.CONNECTED)) {
40+
if (_sseStatus.compareAndSet(SSEClient.StatusMessage.INITIALIZATION_IN_PROGRESS, SSEClient.StatusMessage.CONNECTED)
41+
|| _sseStatus.compareAndSet(SSEClient.StatusMessage.RETRYABLE_ERROR, SSEClient.StatusMessage.CONNECTED)) {
4142
_statusMessages.offer(PushManager.Status.STREAMING_READY);
4243
}
4344
break;
4445
case RETRYABLE_ERROR:
45-
if (_sseStatus.compareAndSet(SseStatus.CONNECTED, SseStatus.RETRYABLE_ERROR)) {
46+
if (_sseStatus.compareAndSet(SSEClient.StatusMessage.CONNECTED, SSEClient.StatusMessage.RETRYABLE_ERROR)) {
4647
_statusMessages.offer(PushManager.Status.STREAMING_BACKOFF);
4748
}
4849
break;
4950
case NONRETRYABLE_ERROR:
50-
if (_sseStatus.compareAndSet(SseStatus.CONNECTED, SseStatus.NONRETRYABLE_ERROR)
51-
|| _sseStatus.compareAndSet(SseStatus.RETRYABLE_ERROR, SseStatus.NONRETRYABLE_ERROR)) {
51+
if (_sseStatus.compareAndSet(SSEClient.StatusMessage.CONNECTED, SSEClient.StatusMessage.NONRETRYABLE_ERROR)
52+
|| _sseStatus.compareAndSet(SSEClient.StatusMessage.RETRYABLE_ERROR, SSEClient.StatusMessage.NONRETRYABLE_ERROR)) {
5253
_statusMessages.offer(PushManager.Status.STREAMING_OFF);
5354
}
5455
break;
55-
case DISCONNECTED: // Restore initial status
56+
case FORCED_STOP:
57+
if (_sseStatus.compareAndSet(SSEClient.StatusMessage.CONNECTED, SSEClient.StatusMessage.FORCED_STOP)
58+
|| _sseStatus.compareAndSet(SSEClient.StatusMessage.RETRYABLE_ERROR, SSEClient.StatusMessage.FORCED_STOP)
59+
|| _sseStatus.compareAndSet(SSEClient.StatusMessage.INITIALIZATION_IN_PROGRESS, SSEClient.StatusMessage.FORCED_STOP)) {
60+
_statusMessages.offer(PushManager.Status.STREAMING_DOWN);
61+
}
62+
break;
63+
case INITIALIZATION_IN_PROGRESS: // Restore initial status
5664
reset();
5765
break;
5866
}
@@ -61,6 +69,7 @@ public void handleSseStatus(SseStatus newStatus) {
6169
@Override
6270
public void handleIncomingControlEvent(ControlNotification controlNotification) {
6371
_log.debug(String.format("handleIncomingOccupancyEvent: %s", controlNotification.getControlType()));
72+
6473
if (_backendStatus.get().equals(ControlType.STREAMING_DISABLED)) {
6574
return;
6675
}
@@ -87,6 +96,7 @@ public void handleIncomingControlEvent(ControlNotification controlNotification)
8796
@Override
8897
public void handleIncomingOccupancyEvent(OccupancyNotification occupancyNotification) {
8998
_log.debug(String.format("handleIncomingOccupancyEvent: publishers=%d", occupancyNotification.getMetrics().getPublishers()));
99+
90100
int publishers = occupancyNotification.getMetrics().getPublishers();
91101
if (publishers <= 0 && _publishersOnline.compareAndSet(true, false) && _backendStatus.get().equals(ControlType.STREAMING_RESUMED)) {
92102
_statusMessages.offer(PushManager.Status.STREAMING_DOWN);
@@ -98,6 +108,7 @@ public void handleIncomingOccupancyEvent(OccupancyNotification occupancyNotifica
98108
@Override
99109
public void handleIncomingAblyError(ErrorNotification notification) {
100110
_log.debug(String.format("handleIncomingAblyError: %s", notification.getMessage()));
111+
101112
if (_backendStatus.get().equals(ControlType.STREAMING_DISABLED)) {
102113
return; // Ignore
103114
}
@@ -112,8 +123,9 @@ public void handleIncomingAblyError(ErrorNotification notification) {
112123
@Override
113124
public synchronized void forcePushDisable() {
114125
_log.debug("forcePushDisable");
126+
115127
_publishersOnline.set(false);
116-
_sseStatus.set(SseStatus.DISCONNECTED);
128+
_sseStatus.set(SSEClient.StatusMessage.INITIALIZATION_IN_PROGRESS);
117129
_backendStatus.set(ControlType.STREAMING_DISABLED);
118130
_statusMessages.offer(PushManager.Status.STREAMING_OFF);
119131
}

0 commit comments

Comments
 (0)