Skip to content

Commit 40c124d

Browse files
committed
moved SSE closedbleHttpClient build
1 parent f4eaa1b commit 40c124d

File tree

7 files changed

+112
-55
lines changed

7 files changed

+112
-55
lines changed

client/src/main/java/io/split/client/SplitFactoryImpl.java

Lines changed: 52 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import io.split.integrations.IntegrationsConfig;
2323
import org.apache.hc.client5.http.auth.AuthScope;
2424
import org.apache.hc.client5.http.auth.Credentials;
25-
import org.apache.hc.client5.http.auth.CredentialsProvider;
2625
import org.apache.hc.client5.http.auth.UsernamePasswordCredentials;
2726
import org.apache.hc.client5.http.config.RequestConfig;
2827
import org.apache.hc.client5.http.cookie.StandardCookieSpec;
@@ -33,25 +32,18 @@
3332
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
3433
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder;
3534
import org.apache.hc.client5.http.impl.routing.DefaultProxyRoutePlanner;
36-
import org.apache.hc.client5.http.socket.ConnectionSocketFactory;
37-
import org.apache.hc.client5.http.socket.PlainConnectionSocketFactory;
3835
import org.apache.hc.client5.http.ssl.SSLConnectionSocketFactory;
3936
import org.apache.hc.client5.http.ssl.SSLConnectionSocketFactoryBuilder;
40-
import org.apache.hc.core5.http.config.Registry;
41-
import org.apache.hc.core5.http.config.RegistryBuilder;
4237
import org.apache.hc.core5.http.io.SocketConfig;
4338
import org.apache.hc.core5.http.ssl.TLS;
4439
import org.apache.hc.core5.ssl.SSLContexts;
4540
import org.apache.hc.core5.util.Timeout;
4641
import org.slf4j.Logger;
4742
import org.slf4j.LoggerFactory;
4843

49-
import javax.net.ssl.SSLContext;
5044
import java.io.IOException;
5145
import java.net.URI;
5246
import java.net.URISyntaxException;
53-
import java.security.KeyManagementException;
54-
import java.security.NoSuchAlgorithmException;
5547
import java.util.ArrayList;
5648
import java.util.List;
5749
import java.util.Random;
@@ -60,6 +52,8 @@
6052

6153
public class SplitFactoryImpl implements SplitFactory {
6254
private static final Logger _log = LoggerFactory.getLogger(SplitFactory.class);
55+
private final static long SSE_CONNECT_TIMEOUT = 30000;
56+
private final static long SSE_SOCKET_TIMEOUT = 70000;
6357

6458
private static final Multiset<String> USED_API_TOKENS = ConcurrentHashMultiset.create();
6559
private static Random RANDOM = new Random();
@@ -100,23 +94,60 @@ private static CloseableHttpClient buildHttpClient(String apiToken, SplitClientC
10094

10195
// Set up proxy is it exists
10296
if (config.proxy() != null) {
103-
_log.info("Initializing Split SDK with proxy settings");
104-
DefaultProxyRoutePlanner routePlanner = new DefaultProxyRoutePlanner(config.proxy());
105-
httpClientbuilder.setRoutePlanner(routePlanner);
106-
107-
if (config.proxyUsername() != null && config.proxyPassword() != null) {
108-
_log.debug("Proxy setup using credentials");
109-
BasicCredentialsProvider credsProvider = new BasicCredentialsProvider();
110-
AuthScope siteScope = new AuthScope(config.proxy().getHostName(), config.proxy().getPort());
111-
Credentials siteCreds = new UsernamePasswordCredentials(config.proxyUsername(), config.proxyPassword().toCharArray());
112-
credsProvider.setCredentials(siteScope, siteCreds);
113-
httpClientbuilder.setDefaultCredentialsProvider(credsProvider);
114-
}
97+
httpClientbuilder = setupProxy(httpClientbuilder, config);
11598
}
11699

117100
return httpClientbuilder.build();
118101
}
119102

103+
private static CloseableHttpClient buildSSEdHttpClient(SplitClientConfig config) {
104+
RequestConfig requestConfig = RequestConfig.custom()
105+
.setConnectTimeout(Timeout.of(SSE_CONNECT_TIMEOUT, TimeUnit.MILLISECONDS))
106+
.build();
107+
108+
SSLConnectionSocketFactory sslSocketFactory = SSLConnectionSocketFactoryBuilder.create()
109+
.setSslContext(SSLContexts.createSystemDefault())
110+
.setTlsVersions(TLS.V_1_1, TLS.V_1_2)
111+
.build();
112+
113+
PoolingHttpClientConnectionManager cm = PoolingHttpClientConnectionManagerBuilder.create()
114+
.setSSLSocketFactory(sslSocketFactory)
115+
.setDefaultSocketConfig(SocketConfig.custom()
116+
.setSoTimeout(Timeout.ofMilliseconds(SSE_SOCKET_TIMEOUT))
117+
.build())
118+
.build();
119+
cm.setMaxTotal(1);
120+
cm.setDefaultMaxPerRoute(1);
121+
122+
HttpClientBuilder httpClientbuilder = HttpClients.custom()
123+
.setConnectionManager(cm)
124+
.setDefaultRequestConfig(requestConfig);
125+
126+
// Set up proxy is it exists
127+
if (config.proxy() != null) {
128+
httpClientbuilder = setupProxy(httpClientbuilder, config);
129+
}
130+
131+
return httpClientbuilder.build();
132+
}
133+
134+
private static HttpClientBuilder setupProxy(HttpClientBuilder httpClientbuilder, SplitClientConfig config) {
135+
_log.info("Initializing Split SDK with proxy settings");
136+
DefaultProxyRoutePlanner routePlanner = new DefaultProxyRoutePlanner(config.proxy());
137+
httpClientbuilder.setRoutePlanner(routePlanner);
138+
139+
if (config.proxyUsername() != null && config.proxyPassword() != null) {
140+
_log.debug("Proxy setup using credentials");
141+
BasicCredentialsProvider credsProvider = new BasicCredentialsProvider();
142+
AuthScope siteScope = new AuthScope(config.proxy().getHostName(), config.proxy().getPort());
143+
Credentials siteCreds = new UsernamePasswordCredentials(config.proxyUsername(), config.proxyPassword().toCharArray());
144+
credsProvider.setCredentials(siteScope, siteCreds);
145+
httpClientbuilder.setDefaultCredentialsProvider(credsProvider);
146+
}
147+
148+
return httpClientbuilder;
149+
}
150+
120151
public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyntaxException {
121152
_apiToken = apiToken;
122153

@@ -190,7 +221,7 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn
190221
final EventClient eventClient = EventClientImpl.create(httpclient, eventsRootTarget, config.eventsQueueSize(), config.eventFlushIntervalInMillis(), config.waitBeforeShutdown());
191222

192223
// SyncManager
193-
final SyncManager syncManager = SyncManagerImp.build(config.streamingEnabled(), splitFetcherProvider, segmentFetcher, config.authServiceURL(), httpclient, config.streamingServiceURL(), config.authRetryBackoffBase());
224+
final SyncManager syncManager = SyncManagerImp.build(config.streamingEnabled(), splitFetcherProvider, segmentFetcher, config.authServiceURL(), httpclient, config.streamingServiceURL(), config.authRetryBackoffBase(), buildSSEdHttpClient(config));
194225
syncManager.start();
195226

196227
destroyer = new Runnable() {

client/src/main/java/io/split/engine/common/PushManagerImp.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,13 +65,13 @@ public static PushManagerImp build(Synchronizer synchronizer,
6565
String authUrl,
6666
CloseableHttpClient httpClient,
6767
int authRetryBackOffBase,
68-
LinkedBlockingQueue<PushManager.Status> statusMessages) {
69-
68+
LinkedBlockingQueue<PushManager.Status> statusMessages,
69+
CloseableHttpClient sseHttpClient) {
7070
SplitsWorker splitsWorker = new SplitsWorkerImp(synchronizer);
7171
Worker<SegmentQueueDto> segmentWorker = new SegmentsWorkerImp(synchronizer);
7272
PushStatusTracker pushStatusTracker = new PushStatusTrackerImp(statusMessages);
7373
return new PushManagerImp(new AuthApiClientImp(authUrl, httpClient),
74-
EventSourceClientImp.build(streamingUrl, splitsWorker, segmentWorker, pushStatusTracker),
74+
EventSourceClientImp.build(streamingUrl, splitsWorker, segmentWorker, pushStatusTracker, sseHttpClient),
7575
splitsWorker,
7676
segmentWorker,
7777
new Backoff(authRetryBackOffBase),

client/src/main/java/io/split/engine/common/SyncManagerImp.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,11 @@ public static SyncManagerImp build(boolean streamingEnabledConfig,
4949
String authUrl,
5050
CloseableHttpClient httpClient,
5151
String streamingServiceUrl,
52-
int authRetryBackOffBase) {
53-
52+
int authRetryBackOffBase,
53+
CloseableHttpClient sseHttpClient) {
5454
LinkedBlockingQueue<PushManager.Status> pushMessages = new LinkedBlockingQueue<>();
5555
Synchronizer synchronizer = new SynchronizerImp(refreshableSplitFetcherProvider, segmentFetcher);
56-
PushManager pushManager = PushManagerImp.build(synchronizer, streamingServiceUrl, authUrl, httpClient, authRetryBackOffBase, pushMessages);
56+
PushManager pushManager = PushManagerImp.build(synchronizer, streamingServiceUrl, authUrl, httpClient, authRetryBackOffBase, pushMessages, sseHttpClient);
5757
return new SyncManagerImp(streamingEnabledConfig, synchronizer, pushManager, pushMessages);
5858
}
5959

client/src/main/java/io/split/engine/sse/EventSourceClientImp.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import io.split.engine.sse.exceptions.EventParsingException;
88
import io.split.engine.sse.workers.SplitsWorker;
99
import io.split.engine.sse.workers.Worker;
10+
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
1011
import org.apache.hc.core5.net.URIBuilder;
1112
import org.slf4j.Logger;
1213
import org.slf4j.LoggerFactory;
@@ -29,27 +30,30 @@ public class EventSourceClientImp implements EventSourceClient {
2930
/* package private */ EventSourceClientImp(String baseStreamingUrl,
3031
NotificationParser notificationParser,
3132
NotificationProcessor notificationProcessor,
32-
PushStatusTracker pushStatusTracker) {
33+
PushStatusTracker pushStatusTracker,
34+
CloseableHttpClient sseHttpClient) {
3335
_baseStreamingUrl = checkNotNull(baseStreamingUrl);
3436
_notificationParser = checkNotNull(notificationParser);
3537
_notificationProcessor = checkNotNull(notificationProcessor);
3638
_pushStatusTracker = pushStatusTracker;
3739

3840
_sseClient = new SSEClient(
3941
inboundEvent -> { onMessage(inboundEvent); return null; },
40-
status -> { _pushStatusTracker.handleSseStatus(status); return null; });
42+
status -> { _pushStatusTracker.handleSseStatus(status); return null; },
43+
sseHttpClient);
4144

4245
}
4346

4447
public static EventSourceClientImp build(String baseStreamingUrl,
4548
SplitsWorker splitsWorker,
4649
Worker<SegmentQueueDto> segmentWorker,
47-
PushStatusTracker pushStatusTracker) {
48-
50+
PushStatusTracker pushStatusTracker,
51+
CloseableHttpClient sseHttpClient) {
4952
return new EventSourceClientImp(baseStreamingUrl,
5053
new NotificationParserImp(),
5154
NotificationProcessorImp.build(splitsWorker, segmentWorker, pushStatusTracker),
52-
pushStatusTracker);
55+
pushStatusTracker,
56+
sseHttpClient);
5357
}
5458

5559
@Override

client/src/main/java/io/split/engine/sse/client/SSEClient.java

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -52,10 +52,12 @@ private enum ConnectionState {
5252
private final AtomicReference<ConnectionState> _state = new AtomicReference<>(ConnectionState.CLOSED);
5353
private final AtomicReference<CloseableHttpResponse> _ongoingResponse = new AtomicReference<>();
5454

55-
public SSEClient(Function<RawEvent, Void> eventCallback, Function<StatusMessage, Void> statusCallback) {
56-
_client = buildHttpClient();
55+
public SSEClient(Function<RawEvent, Void> eventCallback,
56+
Function<StatusMessage, Void> statusCallback,
57+
CloseableHttpClient client) {
5758
_eventCallback = eventCallback;
5859
_statusCallback = statusCallback;
60+
_client = client;
5961
}
6062

6163
public synchronized boolean open(URI uri) {
@@ -176,22 +178,6 @@ static private String readMessageAsString(BufferedReader reader) throws IOExcept
176178
}
177179
}
178180

179-
private static CloseableHttpClient buildHttpClient() {
180-
RequestConfig requestConfig = RequestConfig.custom()
181-
.setConnectTimeout(Timeout.of(CONNECT_TIMEOUT, TimeUnit.MILLISECONDS))
182-
//.setSocketTimeout(SOCKET_TIMEOUT) TODO: Use a PoolingHttpClientConnectionManager to set socket timeout
183-
.build();
184-
185-
PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
186-
cm.setMaxTotal(1);
187-
cm.setDefaultMaxPerRoute(1);
188-
189-
return HttpClients.custom()
190-
.setConnectionManager(cm)
191-
.setDefaultRequestConfig(requestConfig)
192-
.build();
193-
}
194-
195181
private void handleMessage(String message) {
196182
if (Strings.isNullOrEmpty(message) || KEEP_ALIVE_PAYLOAD.equals(message)) {
197183
_log.debug("Keep Alive event");

client/src/test/java/io/split/engine/sse/EventSourceClientTest.java

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,11 @@
44
import io.split.engine.sse.client.SSEClient;
55
import io.split.engine.sse.dtos.ErrorNotification;
66
import io.split.engine.sse.dtos.SplitChangeNotification;
7+
import org.apache.hc.client5.http.config.RequestConfig;
8+
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
9+
import org.apache.hc.client5.http.impl.classic.HttpClients;
10+
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
11+
import org.apache.hc.core5.util.Timeout;
712
import org.awaitility.Awaitility;
813
import org.glassfish.grizzly.utils.Pair;
914
import org.glassfish.jersey.media.sse.OutboundEvent;
@@ -33,7 +38,8 @@ public void startShouldConnect() throws IOException {
3338
SSEMockServer.SseEventQueue eventQueue = new SSEMockServer.SseEventQueue();
3439
SSEMockServer sseServer = buildSSEMockServer(eventQueue);
3540
sseServer.start();
36-
EventSourceClient eventSourceClient = new EventSourceClientImp("http://localhost:" + sseServer.getPort(), _notificationParser, _notificationProcessor, _pushStatusTracker);
41+
42+
EventSourceClient eventSourceClient = new EventSourceClientImp("http://localhost:" + sseServer.getPort(), _notificationParser, _notificationProcessor, _pushStatusTracker, buildHttpClient());
3743

3844
boolean result = eventSourceClient.start("channel-test","token-test");
3945

@@ -47,7 +53,7 @@ public void startShouldNotConnect() throws IOException {
4753
SSEMockServer.SseEventQueue eventQueue = new SSEMockServer.SseEventQueue();
4854
SSEMockServer sseServer = buildSSEMockServer(eventQueue);
4955
sseServer.start();
50-
EventSourceClient eventSourceClient = new EventSourceClientImp("http://fake:" + sseServer.getPort(), _notificationParser, _notificationProcessor, _pushStatusTracker);
56+
EventSourceClient eventSourceClient = new EventSourceClientImp("http://fake:" + sseServer.getPort(), _notificationParser, _notificationProcessor, _pushStatusTracker, buildHttpClient());
5157

5258
boolean result = eventSourceClient.start("channel-test","token-test");
5359

@@ -63,7 +69,7 @@ public void startAndReceiveNotification() throws IOException {
6369
SSEMockServer.SseEventQueue eventQueue = new SSEMockServer.SseEventQueue();
6470
SSEMockServer sseServer = buildSSEMockServer(eventQueue);
6571
sseServer.start();
66-
EventSourceClient eventSourceClient = new EventSourceClientImp("http://localhost:" + sseServer.getPort(), _notificationParser, _notificationProcessor, _pushStatusTracker);
72+
EventSourceClient eventSourceClient = new EventSourceClientImp("http://localhost:" + sseServer.getPort(), _notificationParser, _notificationProcessor, _pushStatusTracker, buildHttpClient());
6773

6874
boolean result = eventSourceClient.start("channel-test","token-test");
6975

@@ -110,4 +116,19 @@ private SSEMockServer buildSSEMockServer(SSEMockServer.SseEventQueue eventQueue)
110116
return new Pair<>(null, true);
111117
});
112118
}
119+
120+
private static CloseableHttpClient buildHttpClient() {
121+
RequestConfig requestConfig = RequestConfig.custom()
122+
.setConnectTimeout(Timeout.of(70000, TimeUnit.MILLISECONDS))
123+
.build();
124+
125+
PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
126+
cm.setMaxTotal(1);
127+
cm.setDefaultMaxPerRoute(1);
128+
129+
return HttpClients.custom()
130+
.setConnectionManager(cm)
131+
.setDefaultRequestConfig(requestConfig)
132+
.build();
133+
}
113134
}

client/src/test/java/io/split/engine/sse/SSEClientTest.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,18 @@
1-
package io.split.engine.sse;
1+
/*package io.split.engine.sse;
22
33
import io.split.engine.sse.client.SSEClient;
4+
import org.apache.hc.client5.http.config.RequestConfig;
5+
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
6+
import org.apache.hc.client5.http.impl.classic.HttpClientBuilder;
7+
import org.apache.hc.client5.http.impl.classic.HttpClients;
48
import org.apache.hc.core5.net.URIBuilder;
9+
import org.apache.hc.core5.util.Timeout;
510
import org.junit.Ignore;
611
import org.junit.Test;
712
813
import java.net.URI;
914
import java.net.URISyntaxException;
15+
import java.util.concurrent.TimeUnit;
1016
1117
public class SSEClientTest {
1218
@@ -19,12 +25,21 @@ public void basicUsageTest() throws URISyntaxException, InterruptedException {
1925
.addParameter("channels", "[?occupancy=metrics.publishers]control_pri")
2026
.build();
2127
28+
RequestConfig requestConfig = RequestConfig.custom()
29+
.setConnectTimeout(Timeout.of(70000, TimeUnit.MILLISECONDS))
30+
.build();
31+
32+
HttpClientBuilder httpClientbuilder = HttpClients.custom()
33+
.setDefaultRequestConfig(requestConfig);
34+
35+
CloseableHttpClient httpClient = httpClientbuilder.build();
2236
2337
SSEClient sse = new SSEClient(e -> { System.out.println(e); return null; },
24-
s -> { System.out.println(s); return null; });
38+
s -> { System.out.println(s); return null; }, httpClient);
2539
sse.open(uri);
2640
Thread.sleep(5000);
2741
sse.close();
2842
Thread.sleep(100000);
2943
}
3044
}
45+
*/

0 commit comments

Comments
 (0)