33import re
44import itertools
55import yaml
6+ import time
67
7- from splitio .api import APIException
8+ from splitio .api import APIException , FetchOptions
89from splitio .models import splits
10+ from splitio .util .backoff import Backoff
911
1012
1113_LEGACY_COMMENT_LINE_RE = re .compile (r'^#.*$' )
1517_LOGGER = logging .getLogger (__name__ )
1618
1719
20+ _ON_DEMAND_FETCH_BACKOFF_BASE = 10 # backoff base starting at 10 seconds
21+ _ON_DEMAND_FETCH_BACKOFF_MAX_WAIT = 60 # don't sleep for more than 1 minute
22+ _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES = 10
23+
24+
1825class SplitSynchronizer (object ):
1926 """Split changes synchronizer."""
2027
@@ -30,39 +37,83 @@ def __init__(self, split_api, split_storage):
3037 """
3138 self ._api = split_api
3239 self ._split_storage = split_storage
40+ self ._backoff = Backoff (
41+ _ON_DEMAND_FETCH_BACKOFF_BASE ,
42+ _ON_DEMAND_FETCH_BACKOFF_MAX_WAIT )
3343
34- def synchronize_splits (self , till = None ):
44+ def attempt_split_sync (self , fetch_options , till = None ):
3545 """
3646 Hit endpoint, update storage and return True if sync is complete.
3747
48+ :param fetch_options Fetch options for getting split definitions.
49+ :type fetch_options splitio.api.FetchOptions
50+
3851 :param till: Passed till from Streaming.
3952 :type till: int
53+
54+ :return: Flags to check if it should perform bypass or operation ended
55+ :rtype: bool, int, int
4056 """
57+ self ._backoff .reset ()
58+ remaining_attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES
4159 while True :
42- change_number = self ._split_storage .get_change_number ()
43- if change_number is None :
44- change_number = - 1
45- if till is not None and till < change_number :
46- # the passed till is less than change_number, no need to perform updates
47- return
48-
49- try :
50- split_changes = self ._api .fetch_splits (change_number )
51- except APIException as exc :
52- _LOGGER .error ('Exception raised while fetching splits' )
53- _LOGGER .debug ('Exception information: ' , exc_info = True )
54- raise exc
55-
56- for split in split_changes .get ('splits' , []):
57- if split ['status' ] == splits .Status .ACTIVE .value :
58- self ._split_storage .put (splits .from_raw (split ))
59- else :
60- self ._split_storage .remove (split ['name' ])
61-
62- self ._split_storage .set_change_number (split_changes ['till' ])
63- if split_changes ['till' ] == split_changes ['since' ] \
64- and (till is None or split_changes ['till' ] >= till ):
65- return
60+ remaining_attempts -= 1
61+ while True : # Fetch until since==till
62+ change_number = self ._split_storage .get_change_number ()
63+ if change_number is None :
64+ change_number = - 1
65+ if till is not None and till < change_number :
66+ # the passed till is less than change_number, no need to perform updates
67+ break
68+
69+ try :
70+ split_changes = self ._api .fetch_splits (change_number , fetch_options )
71+ except APIException as exc :
72+ _LOGGER .error ('Exception raised while fetching splits' )
73+ _LOGGER .debug ('Exception information: ' , exc_info = True )
74+ raise exc
75+
76+ for split in split_changes .get ('splits' , []):
77+ if split ['status' ] == splits .Status .ACTIVE .value :
78+ self ._split_storage .put (splits .from_raw (split ))
79+ else :
80+ self ._split_storage .remove (split ['name' ])
81+
82+ self ._split_storage .set_change_number (split_changes ['till' ])
83+ if split_changes ['till' ] == split_changes ['since' ]:
84+ break
85+
86+ if till is None or till <= change_number :
87+ return True , remaining_attempts , change_number
88+ elif remaining_attempts <= 0 :
89+ return False , remaining_attempts , change_number
90+ how_long = self ._backoff .get ()
91+ time .sleep (how_long )
92+
93+ def synchronize_splits (self , till = None ):
94+ """
95+ Hit endpoint, update storage and return True if sync is complete.
96+
97+ :param till: Passed till from Streaming.
98+ :type till: int
99+ """
100+ fetch_options = FetchOptions (True ) # Set Cache-Control to no-cache
101+ successful_sync , remaining_attempts , change_number = self .attempt_split_sync (fetch_options ,
102+ till )
103+ attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts
104+ if successful_sync : # succedeed sync
105+ _LOGGER .debug ('Refresh completed in %s attempts.' , attempts )
106+ return
107+ with_cdn_bypass = FetchOptions (True , change_number ) # Set flag for bypassing CDN
108+ without_cdn_successful_sync , remaining_attempts , change_number = self .attempt_split_sync (with_cdn_bypass , till )
109+ without_cdn_attempts = _ON_DEMAND_FETCH_BACKOFF_MAX_RETRIES - remaining_attempts
110+ if without_cdn_successful_sync :
111+ _LOGGER .debug ('Refresh completed bypassing the CDN in %s attempts.' ,
112+ without_cdn_attempts )
113+ return
114+ else :
115+ _LOGGER .debug ('No changes fetched after %s attempts with CDN bypassed.' ,
116+ without_cdn_attempts )
66117
67118 def kill_split (self , split_name , default_treatment , change_number ):
68119 """
0 commit comments