|
12 | 12 | from splitio.models.telemetry import UpdateFromSSE |
13 | 13 | from splitio.push.parser import UpdateType |
14 | 14 | from splitio.optional.loaders import asyncio |
15 | | - |
| 15 | +from splitio.util.storage_helper import update_feature_flag_storage, update_feature_flag_storage_async |
16 | 16 |
|
17 | 17 | _LOGGER = logging.getLogger(__name__) |
18 | 18 |
|
@@ -218,17 +218,13 @@ def _run(self): |
218 | 218 | try: |
219 | 219 | if self._check_instant_ff_update(event): |
220 | 220 | try: |
221 | | - new_split = from_raw(json.loads(self._get_feature_flag_definition(event))) |
222 | | - if new_split.status == Status.ACTIVE: |
223 | | - self._feature_flag_storage.put(new_split) |
224 | | - _LOGGER.debug('Feature flag %s is updated', new_split.name) |
225 | | - for segment_name in new_split.get_segment_names(): |
226 | | - if self._segment_storage.get(segment_name) is None: |
227 | | - _LOGGER.debug('Fetching new segment %s', segment_name) |
228 | | - self._segment_handler(segment_name, event.change_number) |
229 | | - else: |
230 | | - self._feature_flag_storage.remove(new_split.name) |
231 | | - self._feature_flag_storage.set_change_number(event.change_number) |
| 221 | + new_feature_flag = from_raw(json.loads(self._get_feature_flag_definition(event))) |
| 222 | + segment_list = update_feature_flag_storage(self._feature_flag_storage, [new_feature_flag], event.change_number) |
| 223 | + for segment_name in segment_list: |
| 224 | + if self._segment_storage.get(segment_name) is None: |
| 225 | + _LOGGER.debug('Fetching new segment %s', segment_name) |
| 226 | + self._segment_handler(segment_name, event.change_number) |
| 227 | + |
232 | 228 | self._telemetry_runtime_producer.record_update_from_sse(UpdateFromSSE.SPLIT_UPDATE) |
233 | 229 | continue |
234 | 230 | except Exception as e: |
@@ -318,17 +314,13 @@ async def _run(self): |
318 | 314 | try: |
319 | 315 | if await self._check_instant_ff_update(event): |
320 | 316 | try: |
321 | | - new_split = from_raw(json.loads(self._get_feature_flag_definition(event))) |
322 | | - if new_split.status == Status.ACTIVE: |
323 | | - await self._feature_flag_storage.put(new_split) |
324 | | - _LOGGER.debug('Feature flag %s is updated', new_split.name) |
325 | | - for segment_name in new_split.get_segment_names(): |
326 | | - if await self._segment_storage.get(segment_name) is None: |
327 | | - _LOGGER.debug('Fetching new segment %s', segment_name) |
328 | | - await self._segment_handler(segment_name, event.change_number) |
329 | | - else: |
330 | | - await self._feature_flag_storage.remove(new_split.name) |
331 | | - await self._feature_flag_storage.set_change_number(event.change_number) |
| 317 | + new_feature_flag = from_raw(json.loads(self._get_feature_flag_definition(event))) |
| 318 | + segment_list = await update_feature_flag_storage_async(self._feature_flag_storage, [new_feature_flag], event.change_number) |
| 319 | + for segment_name in segment_list: |
| 320 | + if await self._segment_storage.get(segment_name) is None: |
| 321 | + _LOGGER.debug('Fetching new segment %s', segment_name) |
| 322 | + await self._segment_handler(segment_name, event.change_number) |
| 323 | + |
332 | 324 | await self._telemetry_runtime_producer.record_update_from_sse(UpdateFromSSE.SPLIT_UPDATE) |
333 | 325 | continue |
334 | 326 | except Exception as e: |
|
0 commit comments