@@ -103,10 +103,10 @@ private static class SyncResult {
103103 private final int _remainingAttempts ;
104104 }
105105
106- private SyncResult attemptSync (long targetChangeNumber ,
107- FetchOptions opts ,
108- Function <Void , Long > nextWaitMs ,
109- int maxRetries ) {
106+ private SyncResult attemptSplitsSync (long targetChangeNumber ,
107+ FetchOptions opts ,
108+ Function <Void , Long > nextWaitMs ,
109+ int maxRetries ) {
110110 int remainingAttempts = maxRetries ;
111111 while (true ) {
112112 remainingAttempts --;
@@ -126,9 +126,9 @@ private SyncResult attemptSync(long targetChangeNumber,
126126 }
127127 }
128128
129- private void logCdnHeaders (int maxRetries , int remainingAttempts , List <Map <String , String >> headers ) {
129+ private void logCdnHeaders (String prefix , int maxRetries , int remainingAttempts , List <Map <String , String >> headers ) {
130130 if (maxRetries - remainingAttempts > _failedAttemptsBeforeLogging ) {
131- _log .info (String .format ("CDN Debug headers: %s" , gson .toJson (headers )));
131+ _log .info (String .format ("%s: CDN Debug headers: %s" , prefix , gson .toJson (headers )));
132132 }
133133 }
134134
@@ -146,22 +146,22 @@ public void refreshSplits(long targetChangeNumber) {
146146 .responseHeadersCallback (_cdnResponseHeadersLogging ? captor ::handle : null )
147147 .build ();
148148
149- SyncResult regularResult = attemptSync (targetChangeNumber , opts ,
149+ SyncResult regularResult = attemptSplitsSync (targetChangeNumber , opts ,
150150 (discard ) -> (long ) _onDemandFetchRetryDelayMs , _onDemandFetchMaxRetries );
151151
152152 int attempts = _onDemandFetchMaxRetries - regularResult .remainingAttempts ();
153153 if (regularResult .success ()) {
154154 _log .debug (String .format ("Refresh completed in %s attempts." , attempts ));
155155 if (_cdnResponseHeadersLogging ) {
156- logCdnHeaders (_onDemandFetchMaxRetries , regularResult .remainingAttempts (), captor .get ());
156+ logCdnHeaders ("[splits]" , _onDemandFetchMaxRetries , regularResult .remainingAttempts (), captor .get ());
157157 }
158158 return ;
159159 }
160160
161161 _log .info (String .format ("No changes fetched after %s attempts. Will retry bypassing CDN." , attempts ));
162162 FetchOptions withCdnBypass = new FetchOptions .Builder (opts ).targetChangeNumber (targetChangeNumber ).build ();
163163 Backoff backoff = new Backoff (ON_DEMAND_FETCH_BACKOFF_BASE_MS , ON_DEMAND_FETCH_BACKOFF_MAX_WAIT_MS );
164- SyncResult withCDNBypassed = attemptSync (targetChangeNumber , withCdnBypass ,
164+ SyncResult withCDNBypassed = attemptSplitsSync (targetChangeNumber , withCdnBypass ,
165165 (discard ) -> backoff .interval (), ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES );
166166
167167 int withoutCDNAttempts = ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - withCDNBypassed ._remainingAttempts ;
@@ -172,7 +172,7 @@ public void refreshSplits(long targetChangeNumber) {
172172 }
173173
174174 if (_cdnResponseHeadersLogging ) {
175- logCdnHeaders (_onDemandFetchMaxRetries + ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES ,
175+ logCdnHeaders ("[splits]" , _onDemandFetchMaxRetries + ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES ,
176176 withCDNBypassed .remainingAttempts (), captor .get ());
177177 }
178178 }
@@ -185,19 +185,76 @@ public void localKillSplit(String splitName, String defaultTreatment, long newCh
185185 }
186186 }
187187
188- @ Override
189- public void refreshSegment (String segmentName , long changeNumber ) {
190- int retries = 1 ;
191- while (changeNumber > _segmentCache .getChangeNumber (segmentName ) && retries <= _onDemandFetchMaxRetries ) {
192- SegmentFetcher fetcher = _segmentSynchronizationTaskImp .getFetcher (segmentName );
193- try {
194- fetcher .fetch (true );
188+ public SyncResult attemptSegmentSync (String segmentName ,
189+ long targetChangeNumber ,
190+ FetchOptions opts ,
191+ Function <Void , Long > nextWaitMs ,
192+ int maxRetries ) {
193+
194+ int remainingAttempts = maxRetries ;
195+ SegmentFetcher fetcher = _segmentSynchronizationTaskImp .getFetcher (segmentName );
196+ checkNotNull (fetcher );
197+
198+ while (true ) {
199+ remainingAttempts --;
200+ fetcher .fetch (opts );
201+ if (targetChangeNumber <= _segmentCache .getChangeNumber (segmentName )) {
202+ return new SyncResult (true , remainingAttempts );
203+ } else if (remainingAttempts <= 0 ) {
204+ return new SyncResult (false , remainingAttempts );
195205 }
196- //We are sure this will never happen because getFetcher firts initiate the segment. This try/catch is for safe only.
197- catch (NullPointerException np ){
198- throw new NullPointerException ();
206+ try {
207+ long howLong = nextWaitMs .apply (null );
208+ Thread .sleep (howLong );
209+ } catch (InterruptedException e ) {
210+ Thread .currentThread ().interrupt ();
211+ _log .debug ("Error trying to sleep current Thread." );
212+ }
213+ }
214+ }
215+
216+ @ Override
217+ public void refreshSegment (String segmentName , long targetChangeNumber ) {
218+
219+ if (targetChangeNumber <= _segmentCache .getChangeNumber (segmentName )) {
220+ return ;
221+ }
222+
223+ FastlyHeadersCaptor captor = new FastlyHeadersCaptor ();
224+ FetchOptions opts = new FetchOptions .Builder ()
225+ .cacheControlHeaders (true )
226+ .fastlyDebugHeader (_cdnResponseHeadersLogging )
227+ .responseHeadersCallback (_cdnResponseHeadersLogging ? captor ::handle : null )
228+ .build ();
229+
230+ SyncResult regularResult = attemptSegmentSync (segmentName , targetChangeNumber , opts ,
231+ (discard ) -> (long ) _onDemandFetchRetryDelayMs , _onDemandFetchMaxRetries );
232+
233+ int attempts = _onDemandFetchMaxRetries - regularResult .remainingAttempts ();
234+ if (regularResult .success ()) {
235+ _log .debug (String .format ("Segment %s refresh completed in %s attempts." , segmentName , attempts ));
236+ if (_cdnResponseHeadersLogging ) {
237+ logCdnHeaders (String .format ("[segment/%s]" , segmentName ), _onDemandFetchMaxRetries , regularResult .remainingAttempts (), captor .get ());
199238 }
200- retries ++;
239+ return ;
240+ }
241+
242+ _log .info (String .format ("No changes fetched for segment %s after %s attempts. Will retry bypassing CDN." , segmentName , attempts ));
243+ FetchOptions withCdnBypass = new FetchOptions .Builder (opts ).targetChangeNumber (targetChangeNumber ).build ();
244+ Backoff backoff = new Backoff (ON_DEMAND_FETCH_BACKOFF_BASE_MS , ON_DEMAND_FETCH_BACKOFF_MAX_WAIT_MS );
245+ SyncResult withCDNBypassed = attemptSegmentSync (segmentName , targetChangeNumber , withCdnBypass ,
246+ (discard ) -> backoff .interval (), ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES );
247+
248+ int withoutCDNAttempts = ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - withCDNBypassed ._remainingAttempts ;
249+ if (withCDNBypassed .success ()) {
250+ _log .debug (String .format ("Segment %s refresh completed bypassing the CDN in %s attempts." , segmentName , withoutCDNAttempts ));
251+ } else {
252+ _log .debug (String .format ("No changes fetched for segment %s after %s attempts with CDN bypassed." , segmentName , withoutCDNAttempts ));
253+ }
254+
255+ if (_cdnResponseHeadersLogging ) {
256+ logCdnHeaders (String .format ("[segment/%s]" , segmentName ), _onDemandFetchMaxRetries + ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES ,
257+ withCDNBypassed .remainingAttempts (), captor .get ());
201258 }
202259 }
203260}
0 commit comments