3838)
3939from ldclient .impl .http import HTTPFactory , _http_factory
4040from ldclient .impl .util import (
41+ _LD_ENVID_HEADER ,
42+ _LD_FD_FALLBACK_HEADER ,
4143 http_error_message ,
4244 is_http_error_recoverable ,
4345 log
5860
5961STREAMING_ENDPOINT = "/sdk/stream"
6062
61-
6263SseClientBuilder = Callable [[Config , SelectorStore ], SSEClient ]
6364
6465
@@ -146,6 +147,7 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
146147 self ._running = True
147148 self ._connection_attempt_start_time = time ()
148149
150+ envid = None
149151 for action in self ._sse .all :
150152 if isinstance (action , Fault ):
151153 # If the SSE client detects the stream has closed, then it will
@@ -154,7 +156,10 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
154156 if action .error is None :
155157 continue
156158
157- (update , should_continue ) = self ._handle_error (action .error )
159+ if action .headers is not None :
160+ envid = action .headers .get (_LD_ENVID_HEADER , envid )
161+
162+ (update , should_continue ) = self ._handle_error (action .error , envid )
158163 if update is not None :
159164 yield update
160165
@@ -163,20 +168,23 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
163168 continue
164169
165170 if isinstance (action , Start ) and action .headers is not None :
166- fallback = action .headers .get ('X-LD-FD-Fallback' ) == 'true'
171+ fallback = action .headers .get (_LD_FD_FALLBACK_HEADER ) == 'true'
172+ envid = action .headers .get (_LD_ENVID_HEADER , envid )
173+
167174 if fallback :
168175 self ._record_stream_init (True )
169176 yield Update (
170177 state = DataSourceState .OFF ,
171- revert_to_fdv1 = True
178+ revert_to_fdv1 = True ,
179+ environment_id = envid ,
172180 )
173181 break
174182
175183 if not isinstance (action , Event ):
176184 continue
177185
178186 try :
179- update = self ._process_message (action , change_set_builder )
187+ update = self ._process_message (action , change_set_builder , envid )
180188 if update is not None :
181189 self ._record_stream_init (False )
182190 self ._connection_attempt_start_time = None
@@ -187,7 +195,7 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
187195 )
188196 self ._sse .interrupt ()
189197
190- (update , should_continue ) = self ._handle_error (e )
198+ (update , should_continue ) = self ._handle_error (e , envid )
191199 if update is not None :
192200 yield update
193201 if not should_continue :
@@ -204,7 +212,7 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
204212 DataSourceErrorKind .UNKNOWN , 0 , time (), str (e )
205213 ),
206214 revert_to_fdv1 = False ,
207- environment_id = None , # TODO(sdk-1410)
215+ environment_id = envid ,
208216 )
209217
210218 self ._sse .close ()
@@ -226,7 +234,7 @@ def _record_stream_init(self, failed: bool):
226234
227235 # pylint: disable=too-many-return-statements
228236 def _process_message (
229- self , msg : Event , change_set_builder : ChangeSetBuilder
237+ self , msg : Event , change_set_builder : ChangeSetBuilder , envid : Optional [ str ]
230238 ) -> Optional [Update ]:
231239 """
232240 Processes a single message from the SSE stream and returns an Update
@@ -247,7 +255,7 @@ def _process_message(
247255 change_set_builder .expect_changes ()
248256 return Update (
249257 state = DataSourceState .VALID ,
250- environment_id = None , # TODO(sdk-1410)
258+ environment_id = envid ,
251259 )
252260 return None
253261
@@ -293,13 +301,13 @@ def _process_message(
293301 return Update (
294302 state = DataSourceState .VALID ,
295303 change_set = change_set ,
296- environment_id = None , # TODO(sdk-1410)
304+ environment_id = envid ,
297305 )
298306
299307 log .info ("Unexpected event found in stream: %s" , msg .event )
300308 return None
301309
302- def _handle_error (self , error : Exception ) -> Tuple [Optional [Update ], bool ]:
310+ def _handle_error (self , error : Exception , envid : Optional [ str ] ) -> Tuple [Optional [Update ], bool ]:
303311 """
304312 This method handles errors that occur during the streaming process.
305313
@@ -328,7 +336,7 @@ def _handle_error(self, error: Exception) -> Tuple[Optional[Update], bool]:
328336 DataSourceErrorKind .INVALID_DATA , 0 , time (), str (error )
329337 ),
330338 revert_to_fdv1 = False ,
331- environment_id = None , # TODO(sdk-1410)
339+ environment_id = envid ,
332340 )
333341 return (update , True )
334342
@@ -344,11 +352,15 @@ def _handle_error(self, error: Exception) -> Tuple[Optional[Update], bool]:
344352 str (error ),
345353 )
346354
347- if error .headers is not None and error .headers .get ("X-LD-FD-Fallback" ) == 'true' :
355+ if envid is None and error .headers is not None :
356+ envid = error .headers .get (_LD_ENVID_HEADER )
357+
358+ if error .headers is not None and error .headers .get (_LD_FD_FALLBACK_HEADER ) == 'true' :
348359 update = Update (
349360 state = DataSourceState .OFF ,
350361 error = error_info ,
351- revert_to_fdv1 = True
362+ revert_to_fdv1 = True ,
363+ environment_id = envid ,
352364 )
353365 return (update , False )
354366
@@ -364,7 +376,7 @@ def _handle_error(self, error: Exception) -> Tuple[Optional[Update], bool]:
364376 ),
365377 error = error_info ,
366378 revert_to_fdv1 = False ,
367- environment_id = None , # TODO(sdk-1410)
379+ environment_id = envid ,
368380 )
369381
370382 if not is_recoverable :
@@ -386,7 +398,7 @@ def _handle_error(self, error: Exception) -> Tuple[Optional[Update], bool]:
386398 DataSourceErrorKind .UNKNOWN , 0 , time (), str (error )
387399 ),
388400 revert_to_fdv1 = False ,
389- environment_id = None , # TODO(sdk-1410)
401+ environment_id = envid ,
390402 )
391403 # no stacktrace here because, for a typical connection error, it'll
392404 # just be a lengthy tour of urllib3 internals
@@ -411,5 +423,4 @@ def __init__(self, config: Config):
411423
412424 def build (self ) -> StreamingDataSource :
413425 """Builds a StreamingDataSource instance with the configured parameters."""
414- # TODO(fdv2): Add in the other controls here.
415426 return StreamingDataSource (self ._config )
0 commit comments