1616import io .split .engine .SDKReadinessGates ;
1717import io .split .engine .common .SyncManager ;
1818import io .split .engine .common .SyncManagerImp ;
19- import io .split .engine .experiments .SplitFetcherImp ;
20- import io .split .engine .experiments .SplitSynchronizationTask ;
21- import io .split .engine .experiments .SplitChangeFetcher ;
22- import io .split .engine .experiments .SplitParser ;
19+ import io .split .engine .experiments .*;
2320import io .split .engine .segments .SegmentChangeFetcher ;
2421import io .split .cache .SegmentCache ;
2522import io .split .cache .SegmentCacheInMemoryImpl ;
@@ -62,10 +59,31 @@ public class SplitFactoryImpl implements SplitFactory {
6259
6360 private static Random RANDOM = new Random ();
6461
62+ private final URI _rootTarget ;
63+ private final URI _eventsRootTarget ;
64+ private final CloseableHttpClient _httpclient ;
65+ private final SDKReadinessGates _gates ;
66+ private final HttpMetrics _httpMetrics ;
67+ private final FireAndForgetMetrics _unCachedFireAndForget ;
68+ private final SegmentSynchronizationTaskImp _segmentSynchronizationTaskImp ;
69+ private final SplitFetcher _splitFetcher ;
70+ private final SplitSynchronizationTask _splitSynchronizationTask ;
71+ private final ImpressionsManagerImpl _impressionsManager ;
72+ private final FireAndForgetMetrics _cachedFireAndForgetMetrics ;
73+ private final EventClient _eventClient ;
74+ private final SyncManager _syncManager ;
75+ private final Evaluator _evaluator ;
76+ private final Runnable _destroyer ;
77+ private final String _apiToken ;
78+
79+ // Caches
80+ private final SegmentCache _segmentCache ;
81+ private final SplitCache _splitCache ;
82+
83+ // Client and Manager
6584 private final SplitClient _client ;
6685 private final SplitManager _manager ;
67- private final Runnable destroyer ;
68- private final String _apiToken ;
86+
6987 private boolean isTerminated = false ;
7088 private final ApiKeyCounter _apiKeyCounter ;
7189
@@ -81,108 +99,59 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn
8199
82100 }
83101
84- final CloseableHttpClient httpclient = buildHttpClient (apiToken , config );
102+ // SDKReadinessGates
103+ _gates = new SDKReadinessGates ();
85104
86- URI rootTarget = URI . create ( config . endpoint ());
87- URI eventsRootTarget = URI . create ( config . eventsEndpoint () );
105+ // HttpClient
106+ _httpclient = buildHttpClient ( apiToken , config );
88107
89- // Metrics
90- HttpMetrics httpMetrics = HttpMetrics .create (httpclient , eventsRootTarget );
91- final FireAndForgetMetrics uncachedFireAndForget = FireAndForgetMetrics . instance ( httpMetrics , 2 , 1000 );
108+ // Roots
109+ _rootTarget = URI .create (config . endpoint () );
110+ _eventsRootTarget = URI . create ( config . eventsEndpoint () );
92111
93- SDKReadinessGates gates = new SDKReadinessGates ();
112+ // HttpMetrics
113+ _httpMetrics = HttpMetrics .create (_httpclient , _eventsRootTarget );
94114
95- // Segments
96- SegmentChangeFetcher segmentChangeFetcher = HttpSegmentChangeFetcher .create (httpclient , rootTarget , uncachedFireAndForget );
97- //This segmentCache is for inMemory Storage (the only one supported by java-client for the moment
98- SegmentCache segmentCache = new SegmentCacheInMemoryImpl ();
99- final SegmentSynchronizationTaskImp segmentSynchronizationTaskImp = new SegmentSynchronizationTaskImp (segmentChangeFetcher ,
100- findPollingPeriod (RANDOM , config .segmentsRefreshRate ()),
101- config .numThreadsForSegmentFetch (),
102- gates ,
103- segmentCache );
115+ // Cache Initialisations
116+ _segmentCache = new SegmentCacheInMemoryImpl ();
117+ _splitCache = new InMemoryCacheImp ();
104118
105- SplitParser splitParser = new SplitParser (segmentSynchronizationTaskImp , segmentCache );
119+ // Metrics
120+ _unCachedFireAndForget = FireAndForgetMetrics .instance (_httpMetrics , 2 , 1000 );
106121
107- // Feature Changes
108- SplitChangeFetcher splitChangeFetcher = HttpSplitChangeFetcher . create ( httpclient , rootTarget , uncachedFireAndForget );
122+ // Segments
123+ _segmentSynchronizationTaskImp = buildSegments ( config );
109124
110- final SplitCache splitCache = new InMemoryCacheImp ();
111- final SplitFetcherImp splitFetcher = new SplitFetcherImp (splitChangeFetcher , splitParser , gates , splitCache );
112- final SplitSynchronizationTask splitSynchronizationTask = new SplitSynchronizationTask (splitFetcher , splitCache , findPollingPeriod (RANDOM , config .featuresRefreshRate ()));
125+ // SplitFetcher
126+ _splitFetcher = buildSplitFetcher ();
113127
114- List <ImpressionListener > impressionListeners = new ArrayList <>();
115- // Setup integrations
116- if (config .integrationsConfig () != null ) {
117- config .integrationsConfig ().getImpressionsListeners (IntegrationsConfig .Execution .ASYNC ).stream ()
118- .map (l -> AsynchronousImpressionListener .build (l .listener (), l .queueSize ()))
119- .collect (Collectors .toCollection (() -> impressionListeners ));
120-
121- config .integrationsConfig ().getImpressionsListeners (IntegrationsConfig .Execution .SYNC ).stream ()
122- .map (IntegrationsConfig .ImpressionListenerWithMeta ::listener )
123- .collect (Collectors .toCollection (() -> impressionListeners ));
124- }
128+ // SplitSynchronizationTask
129+ _splitSynchronizationTask = new SplitSynchronizationTask (_splitFetcher , _splitCache , findPollingPeriod (RANDOM , config .featuresRefreshRate ()));
125130
126131 // Impressions
127- final ImpressionsManagerImpl impressionsManager = ImpressionsManagerImpl . instance ( httpclient , config , impressionListeners );
132+ _impressionsManager = buildImpressionsManager ( config );
128133
129- CachedMetrics cachedMetrics = new CachedMetrics ( httpMetrics , TimeUnit . SECONDS . toMillis ( config . metricsRefreshRate ()));
130- final FireAndForgetMetrics cachedFireAndForgetMetrics = FireAndForgetMetrics . instance ( cachedMetrics , 2 , 1000 );
134+ // CachedFireAndForgetMetrics
135+ _cachedFireAndForgetMetrics = buildCachedFireAndForgetMetrics ( config );
131136
132- final EventClient eventClient = EventClientImpl .create (httpclient , eventsRootTarget , config .eventsQueueSize (), config .eventFlushIntervalInMillis (), config .waitBeforeShutdown ());
137+ // EventClient
138+ _eventClient = EventClientImpl .create (_httpclient , _eventsRootTarget , config .eventsQueueSize (), config .eventFlushIntervalInMillis (), config .waitBeforeShutdown ());
133139
134140 // SyncManager
135- final SyncManager syncManager = SyncManagerImp .build (config .streamingEnabled (), splitSynchronizationTask , splitFetcher , segmentSynchronizationTaskImp , splitCache , config .authServiceURL (), httpclient , config .streamingServiceURL (), config .authRetryBackoffBase (), buildSSEdHttpClient (config ), segmentCache );
136- syncManager .start ();
141+ _syncManager = SyncManagerImp .build (config .streamingEnabled (), _splitSynchronizationTask , _splitFetcher , _segmentSynchronizationTaskImp , _splitCache , config .authServiceURL (), _httpclient , config .streamingServiceURL (), config .authRetryBackoffBase (), buildSSEdHttpClient (config ), _segmentCache );
142+ _syncManager .start ();
143+
144+ // Destroyer
145+ _destroyer = buildDestroyer (config );
137146
138147 // Evaluator
139- final Evaluator evaluator = new EvaluatorImp (splitCache );
140-
141- destroyer = new Runnable () {
142- public void run () {
143- _log .info ("Shutdown called for split" );
144- try {
145- segmentSynchronizationTaskImp .close ();
146- _log .info ("Successful shutdown of segment fetchers" );
147- splitSynchronizationTask .close ();
148- _log .info ("Successful shutdown of splits" );
149- impressionsManager .close ();
150- _log .info ("Successful shutdown of impressions manager" );
151- uncachedFireAndForget .close ();
152- _log .info ("Successful shutdown of metrics 1" );
153- cachedFireAndForgetMetrics .close ();
154- _log .info ("Successful shutdown of metrics 2" );
155- httpclient .close ();
156- _log .info ("Successful shutdown of httpclient" );
157- eventClient .close ();
158- _log .info ("Successful shutdown of eventClient" );
159- new Thread (syncManager ::shutdown ).start ();
160- _log .info ("Successful shutdown of syncManager" );
161- } catch (IOException e ) {
162- _log .error ("We could not shutdown split" , e );
163- }
164- }
165- };
148+ _evaluator = new EvaluatorImp (_splitCache );
166149
167- if (config .destroyOnShutDown ()) {
168- Runtime .getRuntime ().addShutdownHook (new Thread () {
169- @ Override
170- public void run () {
171- // Using the full path to avoid conflicting with Thread.destroy()
172- SplitFactoryImpl .this .destroy ();
173- }
174- });
175- }
150+ // SplitClient
151+ _client = new SplitClientImpl (this , _splitCache , _impressionsManager , _cachedFireAndForgetMetrics , _eventClient , config , _gates , _evaluator );
176152
177- _client = new SplitClientImpl (this ,
178- splitCache ,
179- impressionsManager ,
180- cachedFireAndForgetMetrics ,
181- eventClient ,
182- config ,
183- gates ,
184- evaluator );
185- _manager = new SplitManagerImpl (splitCache , config , gates );
153+ // SplitManager
154+ _manager = new SplitManagerImpl (_splitCache , config , _gates );
186155 }
187156
188157 @ Override
@@ -199,7 +168,7 @@ public SplitManager manager() {
199168 public void destroy () {
200169 synchronized (SplitFactoryImpl .class ) {
201170 if (!isTerminated ) {
202- destroyer .run ();
171+ _destroyer .run ();
203172 _apiKeyCounter .remove (_apiToken );
204173 isTerminated = true ;
205174 }
@@ -299,4 +268,75 @@ private static int findPollingPeriod(Random rand, int max) {
299268 int min = max / 2 ;
300269 return rand .nextInt ((max - min ) + 1 ) + min ;
301270 }
271+
272+ private SegmentSynchronizationTaskImp buildSegments (SplitClientConfig config ) throws URISyntaxException {
273+ SegmentChangeFetcher segmentChangeFetcher = HttpSegmentChangeFetcher .create (_httpclient , _rootTarget , _unCachedFireAndForget );
274+
275+ return new SegmentSynchronizationTaskImp (segmentChangeFetcher ,
276+ findPollingPeriod (RANDOM , config .segmentsRefreshRate ()),
277+ config .numThreadsForSegmentFetch (),
278+ _gates ,
279+ _segmentCache );
280+ }
281+
282+ private SplitFetcher buildSplitFetcher () throws URISyntaxException {
283+ SplitChangeFetcher splitChangeFetcher = HttpSplitChangeFetcher .create (_httpclient , _rootTarget , _unCachedFireAndForget );
284+ SplitParser splitParser = new SplitParser (_segmentSynchronizationTaskImp , _segmentCache );
285+
286+ return new SplitFetcherImp (splitChangeFetcher , splitParser , _gates , _splitCache );
287+ }
288+
289+ private ImpressionsManagerImpl buildImpressionsManager (SplitClientConfig config ) throws URISyntaxException {
290+ List <ImpressionListener > impressionListeners = new ArrayList <>();
291+ if (config .integrationsConfig () != null ) {
292+ config .integrationsConfig ().getImpressionsListeners (IntegrationsConfig .Execution .ASYNC ).stream ()
293+ .map (l -> AsynchronousImpressionListener .build (l .listener (), l .queueSize ()))
294+ .collect (Collectors .toCollection (() -> impressionListeners ));
295+
296+ config .integrationsConfig ().getImpressionsListeners (IntegrationsConfig .Execution .SYNC ).stream ()
297+ .map (IntegrationsConfig .ImpressionListenerWithMeta ::listener )
298+ .collect (Collectors .toCollection (() -> impressionListeners ));
299+ }
300+
301+ return ImpressionsManagerImpl .instance (_httpclient , config , impressionListeners );
302+ }
303+
304+ private FireAndForgetMetrics buildCachedFireAndForgetMetrics (SplitClientConfig config ) {
305+ CachedMetrics cachedMetrics = new CachedMetrics (_httpMetrics , TimeUnit .SECONDS .toMillis (config .metricsRefreshRate ()));
306+
307+ return FireAndForgetMetrics .instance (cachedMetrics , 2 , 1000 );
308+ }
309+
310+ private Runnable buildDestroyer (SplitClientConfig config ) {
311+ if (config .destroyOnShutDown ()) {
312+ Runtime .getRuntime ().addShutdownHook (new Thread (() -> {
313+ // Using the full path to avoid conflicting with Thread.destroy()
314+ SplitFactoryImpl .this .destroy ();
315+ }));
316+ }
317+
318+ return () -> {
319+ _log .info ("Shutdown called for split" );
320+ try {
321+ _segmentSynchronizationTaskImp .close ();
322+ _log .info ("Successful shutdown of segment fetchers" );
323+ _splitSynchronizationTask .close ();
324+ _log .info ("Successful shutdown of splits" );
325+ _impressionsManager .close ();
326+ _log .info ("Successful shutdown of impressions manager" );
327+ _unCachedFireAndForget .close ();
328+ _log .info ("Successful shutdown of metrics 1" );
329+ _cachedFireAndForgetMetrics .close ();
330+ _log .info ("Successful shutdown of metrics 2" );
331+ _httpclient .close ();
332+ _log .info ("Successful shutdown of httpclient" );
333+ _eventClient .close ();
334+ _log .info ("Successful shutdown of eventClient" );
335+ _syncManager .shutdown ();
336+ _log .info ("Successful shutdown of syncManager" );
337+ } catch (IOException e ) {
338+ _log .error ("We could not shutdown split" , e );
339+ }
340+ };
341+ }
302342}
0 commit comments