Skip to content

Commit a75cd87

Browse files
authored
Merge pull request #497 from splitio/async-flagsets-push-splitworker
updated push splitworker
2 parents bf48da8 + 27f90c6 commit a75cd87

File tree

2 files changed

+113
-119
lines changed

2 files changed

+113
-119
lines changed

splitio/push/workers.py

Lines changed: 15 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
from splitio.models.telemetry import UpdateFromSSE
1313
from splitio.push.parser import UpdateType
1414
from splitio.optional.loaders import asyncio
15-
15+
from splitio.util.storage_helper import update_feature_flag_storage, update_feature_flag_storage_async
1616

1717
_LOGGER = logging.getLogger(__name__)
1818

@@ -219,17 +219,13 @@ def _run(self):
219219
try:
220220
if self._check_instant_ff_update(event):
221221
try:
222-
new_split = from_raw(json.loads(self._get_feature_flag_definition(event)))
223-
if new_split.status == Status.ACTIVE:
224-
self._feature_flag_storage.put(new_split)
225-
_LOGGER.debug('Feature flag %s is updated', new_split.name)
226-
for segment_name in new_split.get_segment_names():
227-
if self._segment_storage.get(segment_name) is None:
228-
_LOGGER.debug('Fetching new segment %s', segment_name)
229-
self._segment_handler(segment_name, event.change_number)
230-
else:
231-
self._feature_flag_storage.remove(new_split.name)
232-
self._feature_flag_storage.set_change_number(event.change_number)
222+
new_feature_flag = from_raw(json.loads(self._get_feature_flag_definition(event)))
223+
segment_list = update_feature_flag_storage(self._feature_flag_storage, [new_feature_flag], event.change_number)
224+
for segment_name in segment_list:
225+
if self._segment_storage.get(segment_name) is None:
226+
_LOGGER.debug('Fetching new segment %s', segment_name)
227+
self._segment_handler(segment_name, event.change_number)
228+
233229
self._telemetry_runtime_producer.record_update_from_sse(UpdateFromSSE.SPLIT_UPDATE)
234230
continue
235231
except Exception as e:
@@ -312,17 +308,13 @@ async def _run(self):
312308
try:
313309
if await self._check_instant_ff_update(event):
314310
try:
315-
new_split = from_raw(json.loads(self._get_feature_flag_definition(event)))
316-
if new_split.status == Status.ACTIVE:
317-
await self._feature_flag_storage.put(new_split)
318-
_LOGGER.debug('Feature flag %s is updated', new_split.name)
319-
for segment_name in new_split.get_segment_names():
320-
if await self._segment_storage.get(segment_name) is None:
321-
_LOGGER.debug('Fetching new segment %s', segment_name)
322-
await self._segment_handler(segment_name, event.change_number)
323-
else:
324-
await self._feature_flag_storage.remove(new_split.name)
325-
await self._feature_flag_storage.set_change_number(event.change_number)
311+
new_feature_flag = from_raw(json.loads(self._get_feature_flag_definition(event)))
312+
segment_list = await update_feature_flag_storage_async(self._feature_flag_storage, [new_feature_flag], event.change_number)
313+
for segment_name in segment_list:
314+
if await self._segment_storage.get(segment_name) is None:
315+
_LOGGER.debug('Fetching new segment %s', segment_name)
316+
await self._segment_handler(segment_name, event.change_number)
317+
326318
await self._telemetry_runtime_producer.record_update_from_sse(UpdateFromSSE.SPLIT_UPDATE)
327319
continue
328320
except Exception as e:

0 commit comments

Comments
 (0)