1616import io .split .engine .SDKReadinessGates ;
1717import io .split .engine .common .SyncManager ;
1818import io .split .engine .common .SyncManagerImp ;
19- import io .split .engine .experiments .SplitFetcherImp ;
20- import io .split .engine .experiments .SplitSynchronizationTask ;
2119import io .split .engine .experiments .SplitChangeFetcher ;
20+ import io .split .engine .experiments .SplitFetcher ;
21+ import io .split .engine .experiments .SplitFetcherImp ;
2222import io .split .engine .experiments .SplitParser ;
23+ import io .split .engine .experiments .SplitSynchronizationTask ;
2324import io .split .engine .segments .SegmentChangeFetcher ;
2425import io .split .cache .SegmentCache ;
2526import io .split .cache .SegmentCacheInMemoryImpl ;
@@ -62,10 +63,30 @@ public class SplitFactoryImpl implements SplitFactory {
6263
6364 private static Random RANDOM = new Random ();
6465
66+ private final URI _rootTarget ;
67+ private final URI _eventsRootTarget ;
68+ private final CloseableHttpClient _httpclient ;
69+ private final SDKReadinessGates _gates ;
70+ private final HttpMetrics _httpMetrics ;
71+ private final FireAndForgetMetrics _unCachedFireAndForget ;
72+ private final SegmentSynchronizationTaskImp _segmentSynchronizationTaskImp ;
73+ private final SplitFetcher _splitFetcher ;
74+ private final SplitSynchronizationTask _splitSynchronizationTask ;
75+ private final ImpressionsManagerImpl _impressionsManager ;
76+ private final FireAndForgetMetrics _cachedFireAndForgetMetrics ;
77+ private final EventClient _eventClient ;
78+ private final SyncManager _syncManager ;
79+ private final Evaluator _evaluator ;
80+ private final String _apiToken ;
81+
82+ // Caches
83+ private final SegmentCache _segmentCache ;
84+ private final SplitCache _splitCache ;
85+
86+ // Client and Manager
6587 private final SplitClient _client ;
6688 private final SplitManager _manager ;
67- private final Runnable destroyer ;
68- private final String _apiToken ;
89+
6990 private boolean isTerminated = false ;
7091 private final ApiKeyCounter _apiKeyCounter ;
7192
@@ -81,108 +102,64 @@ public SplitFactoryImpl(String apiToken, SplitClientConfig config) throws URISyn
81102
82103 }
83104
84- final CloseableHttpClient httpclient = buildHttpClient (apiToken , config );
105+ // SDKReadinessGates
106+ _gates = new SDKReadinessGates ();
85107
86- URI rootTarget = URI .create (config .endpoint ());
87- URI eventsRootTarget = URI .create (config .eventsEndpoint ());
88-
89- // Metrics
90- HttpMetrics httpMetrics = HttpMetrics .create (httpclient , eventsRootTarget );
91- final FireAndForgetMetrics uncachedFireAndForget = FireAndForgetMetrics .instance (httpMetrics , 2 , 1000 );
108+ // HttpClient
109+ _httpclient = buildHttpClient (apiToken , config );
92110
93- SDKReadinessGates gates = new SDKReadinessGates ();
111+ // Roots
112+ _rootTarget = URI .create (config .endpoint ());
113+ _eventsRootTarget = URI .create (config .eventsEndpoint ());
94114
95- // Segments
96- SegmentChangeFetcher segmentChangeFetcher = HttpSegmentChangeFetcher .create (httpclient , rootTarget , uncachedFireAndForget );
97- //This segmentCache is for inMemory Storage (the only one supported by java-client for the moment
98- SegmentCache segmentCache = new SegmentCacheInMemoryImpl ();
99- final SegmentSynchronizationTaskImp segmentSynchronizationTaskImp = new SegmentSynchronizationTaskImp (segmentChangeFetcher ,
100- findPollingPeriod (RANDOM , config .segmentsRefreshRate ()),
101- config .numThreadsForSegmentFetch (),
102- gates ,
103- segmentCache );
115+ // HttpMetrics
116+ _httpMetrics = HttpMetrics .create (_httpclient , _eventsRootTarget );
104117
105- SplitParser splitParser = new SplitParser (segmentSynchronizationTaskImp , segmentCache );
118+ // Cache Initialisations
119+ _segmentCache = new SegmentCacheInMemoryImpl ();
120+ _splitCache = new InMemoryCacheImp ();
106121
107- // Feature Changes
108- SplitChangeFetcher splitChangeFetcher = HttpSplitChangeFetcher . create ( httpclient , rootTarget , uncachedFireAndForget );
122+ // Metrics
123+ _unCachedFireAndForget = FireAndForgetMetrics . instance ( _httpMetrics , 2 , 1000 );
109124
110- final SplitCache splitCache = new InMemoryCacheImp ();
111- final SplitFetcherImp splitFetcher = new SplitFetcherImp (splitChangeFetcher , splitParser , gates , splitCache );
112- final SplitSynchronizationTask splitSynchronizationTask = new SplitSynchronizationTask (splitFetcher , splitCache , findPollingPeriod (RANDOM , config .featuresRefreshRate ()));
125+ // Segments
126+ _segmentSynchronizationTaskImp = buildSegments (config );
113127
114- List <ImpressionListener > impressionListeners = new ArrayList <>();
115- // Setup integrations
116- if (config .integrationsConfig () != null ) {
117- config .integrationsConfig ().getImpressionsListeners (IntegrationsConfig .Execution .ASYNC ).stream ()
118- .map (l -> AsynchronousImpressionListener .build (l .listener (), l .queueSize ()))
119- .collect (Collectors .toCollection (() -> impressionListeners ));
128+ // SplitFetcher
129+ _splitFetcher = buildSplitFetcher ();
120130
121- config .integrationsConfig ().getImpressionsListeners (IntegrationsConfig .Execution .SYNC ).stream ()
122- .map (IntegrationsConfig .ImpressionListenerWithMeta ::listener )
123- .collect (Collectors .toCollection (() -> impressionListeners ));
124- }
131+ // SplitSynchronizationTask
132+ _splitSynchronizationTask = new SplitSynchronizationTask (_splitFetcher , _splitCache , findPollingPeriod (RANDOM , config .featuresRefreshRate ()));
125133
126134 // Impressions
127- final ImpressionsManagerImpl impressionsManager = ImpressionsManagerImpl . instance ( httpclient , config , impressionListeners );
135+ _impressionsManager = buildImpressionsManager ( config );
128136
129- CachedMetrics cachedMetrics = new CachedMetrics ( httpMetrics , TimeUnit . SECONDS . toMillis ( config . metricsRefreshRate ()));
130- final FireAndForgetMetrics cachedFireAndForgetMetrics = FireAndForgetMetrics . instance ( cachedMetrics , 2 , 1000 );
137+ // CachedFireAndForgetMetrics
138+ _cachedFireAndForgetMetrics = buildCachedFireAndForgetMetrics ( config );
131139
132- final EventClient eventClient = EventClientImpl .create (httpclient , eventsRootTarget , config .eventsQueueSize (), config .eventFlushIntervalInMillis (), config .waitBeforeShutdown ());
140+ // EventClient
141+ _eventClient = EventClientImpl .create (_httpclient , _eventsRootTarget , config .eventsQueueSize (), config .eventFlushIntervalInMillis (), config .waitBeforeShutdown ());
133142
134143 // SyncManager
135- final SyncManager syncManager = SyncManagerImp .build (config .streamingEnabled (), splitSynchronizationTask , splitFetcher , segmentSynchronizationTaskImp , splitCache , config .authServiceURL (), httpclient , config .streamingServiceURL (), config .authRetryBackoffBase (), buildSSEdHttpClient (config ), segmentCache );
136- syncManager .start ();
144+ _syncManager = SyncManagerImp .build (config .streamingEnabled (), _splitSynchronizationTask , _splitFetcher , _segmentSynchronizationTaskImp , _splitCache , config .authServiceURL (), _httpclient , config .streamingServiceURL (), config .authRetryBackoffBase (), buildSSEdHttpClient (config ), _segmentCache );
145+ _syncManager .start ();
137146
138147 // Evaluator
139- final Evaluator evaluator = new EvaluatorImp (splitCache );
140-
141- destroyer = new Runnable () {
142- public void run () {
143- _log .info ("Shutdown called for split" );
144- try {
145- segmentSynchronizationTaskImp .close ();
146- _log .info ("Successful shutdown of segment fetchers" );
147- splitSynchronizationTask .close ();
148- _log .info ("Successful shutdown of splits" );
149- impressionsManager .close ();
150- _log .info ("Successful shutdown of impressions manager" );
151- uncachedFireAndForget .close ();
152- _log .info ("Successful shutdown of metrics 1" );
153- cachedFireAndForgetMetrics .close ();
154- _log .info ("Successful shutdown of metrics 2" );
155- httpclient .close ();
156- _log .info ("Successful shutdown of httpclient" );
157- eventClient .close ();
158- _log .info ("Successful shutdown of eventClient" );
159- new Thread (syncManager ::shutdown ).start ();
160- _log .info ("Successful shutdown of syncManager" );
161- } catch (IOException e ) {
162- _log .error ("We could not shutdown split" , e );
163- }
164- }
165- };
148+ _evaluator = new EvaluatorImp (_splitCache );
149+
150+ // SplitClient
151+ _client = new SplitClientImpl (this , _splitCache , _impressionsManager , _cachedFireAndForgetMetrics , _eventClient , config , _gates , _evaluator );
166152
153+ // SplitManager
154+ _manager = new SplitManagerImpl (_splitCache , config , _gates );
155+
156+ // DestroyOnShutDown
167157 if (config .destroyOnShutDown ()) {
168- Runtime .getRuntime ().addShutdownHook (new Thread () {
169- @ Override
170- public void run () {
171- // Using the full path to avoid conflicting with Thread.destroy()
172- SplitFactoryImpl .this .destroy ();
173- }
174- });
158+ Runtime .getRuntime ().addShutdownHook (new Thread (() -> {
159+ // Using the full path to avoid conflicting with Thread.destroy()
160+ SplitFactoryImpl .this .destroy ();
161+ }));
175162 }
176-
177- _client = new SplitClientImpl (this ,
178- splitCache ,
179- impressionsManager ,
180- cachedFireAndForgetMetrics ,
181- eventClient ,
182- config ,
183- gates ,
184- evaluator );
185- _manager = new SplitManagerImpl (splitCache , config , gates );
186163 }
187164
188165 @ Override
@@ -196,13 +173,31 @@ public SplitManager manager() {
196173 }
197174
198175 @ Override
199- public void destroy () {
200- synchronized (SplitFactoryImpl .class ) {
201- if (!isTerminated ) {
202- destroyer .run ();
203- _apiKeyCounter .remove (_apiToken );
204- isTerminated = true ;
176+ public synchronized void destroy () {
177+ if (!isTerminated ) {
178+ _log .info ("Shutdown called for split" );
179+ try {
180+ _segmentSynchronizationTaskImp .close ();
181+ _log .info ("Successful shutdown of segment fetchers" );
182+ _splitSynchronizationTask .close ();
183+ _log .info ("Successful shutdown of splits" );
184+ _impressionsManager .close ();
185+ _log .info ("Successful shutdown of impressions manager" );
186+ _unCachedFireAndForget .close ();
187+ _log .info ("Successful shutdown of metrics 1" );
188+ _cachedFireAndForgetMetrics .close ();
189+ _log .info ("Successful shutdown of metrics 2" );
190+ _httpclient .close ();
191+ _log .info ("Successful shutdown of httpclient" );
192+ _eventClient .close ();
193+ _log .info ("Successful shutdown of eventClient" );
194+ _syncManager .shutdown ();
195+ _log .info ("Successful shutdown of syncManager" );
196+ } catch (IOException e ) {
197+ _log .error ("We could not shutdown split" , e );
205198 }
199+ _apiKeyCounter .remove (_apiToken );
200+ isTerminated = true ;
206201 }
207202 }
208203
@@ -299,4 +294,42 @@ private static int findPollingPeriod(Random rand, int max) {
299294 int min = max / 2 ;
300295 return rand .nextInt ((max - min ) + 1 ) + min ;
301296 }
297+
298+ private SegmentSynchronizationTaskImp buildSegments (SplitClientConfig config ) throws URISyntaxException {
299+ SegmentChangeFetcher segmentChangeFetcher = HttpSegmentChangeFetcher .create (_httpclient , _rootTarget , _unCachedFireAndForget );
300+
301+ return new SegmentSynchronizationTaskImp (segmentChangeFetcher ,
302+ findPollingPeriod (RANDOM , config .segmentsRefreshRate ()),
303+ config .numThreadsForSegmentFetch (),
304+ _gates ,
305+ _segmentCache );
306+ }
307+
308+ private SplitFetcher buildSplitFetcher () throws URISyntaxException {
309+ SplitChangeFetcher splitChangeFetcher = HttpSplitChangeFetcher .create (_httpclient , _rootTarget , _unCachedFireAndForget );
310+ SplitParser splitParser = new SplitParser (_segmentSynchronizationTaskImp , _segmentCache );
311+
312+ return new SplitFetcherImp (splitChangeFetcher , splitParser , _gates , _splitCache );
313+ }
314+
315+ private ImpressionsManagerImpl buildImpressionsManager (SplitClientConfig config ) throws URISyntaxException {
316+ List <ImpressionListener > impressionListeners = new ArrayList <>();
317+ if (config .integrationsConfig () != null ) {
318+ config .integrationsConfig ().getImpressionsListeners (IntegrationsConfig .Execution .ASYNC ).stream ()
319+ .map (l -> AsynchronousImpressionListener .build (l .listener (), l .queueSize ()))
320+ .collect (Collectors .toCollection (() -> impressionListeners ));
321+
322+ config .integrationsConfig ().getImpressionsListeners (IntegrationsConfig .Execution .SYNC ).stream ()
323+ .map (IntegrationsConfig .ImpressionListenerWithMeta ::listener )
324+ .collect (Collectors .toCollection (() -> impressionListeners ));
325+ }
326+
327+ return ImpressionsManagerImpl .instance (_httpclient , config , impressionListeners );
328+ }
329+
330+ private FireAndForgetMetrics buildCachedFireAndForgetMetrics (SplitClientConfig config ) {
331+ CachedMetrics cachedMetrics = new CachedMetrics (_httpMetrics , TimeUnit .SECONDS .toMillis (config .metricsRefreshRate ()));
332+
333+ return FireAndForgetMetrics .instance (cachedMetrics , 2 , 1000 );
334+ }
302335}
0 commit comments