1- """
2- Default implementation of the streaming component.
3- """
4- # currently excluded from documentation - see docs/README.md
5-
61from collections import namedtuple
7-
82import json
93from threading import Thread
10-
11- import logging
124import time
135
146from ldclient .impl .http import HTTPFactory , _http_factory
15- from ldclient .impl .retry_delay import RetryDelayStrategy , DefaultBackoffStrategy , DefaultJitterStrategy
16- from ldclient .impl .sse import SSEClient
17- from ldclient .impl .util import log , UnsuccessfulResponseException , http_error_message , is_http_error_recoverable
7+ from ldclient .impl .util import http_error_message , is_http_error_recoverable , log
188from ldclient .interfaces import UpdateProcessor
199from ldclient .versioned_data_kind import FEATURES , SEGMENTS
2010
11+ from ld_eventsource import SSEClient
12+ from ld_eventsource .actions import Event , Fault
13+ from ld_eventsource .config import ConnectStrategy , ErrorStrategy , RetryDelayStrategy
14+ from ld_eventsource .errors import HTTPStatusError
15+
2116# allows for up to 5 minutes to elapse without any data sent across the stream. The heartbeats sent as comments on the
2217# stream will keep this from triggering
2318stream_read_timeout = 5 * 60
@@ -41,79 +36,59 @@ def __init__(self, config, store, ready, diagnostic_accumulator):
4136 self ._running = False
4237 self ._ready = ready
4338 self ._diagnostic_accumulator = diagnostic_accumulator
44- self ._es_started = None
45- self ._retry_delay = RetryDelayStrategy (
46- config .initial_reconnect_delay ,
47- BACKOFF_RESET_INTERVAL ,
48- DefaultBackoffStrategy (MAX_RETRY_DELAY ),
49- DefaultJitterStrategy (JITTER_RATIO ))
50-
51- # We need to suppress the default logging behavior of the backoff package, because
52- # it logs messages at ERROR level with variable content (the delay time) which will
53- # prevent monitors from coalescing multiple messages. The backoff package attempts
54- # to suppress its own output by default by giving the logger a NullHandler, but it
55- # will still propagate up to the root logger unless we do this:
56- logging .getLogger ('backoff' ).propagate = False
57-
58- # Retry/backoff logic:
59- # Upon any error establishing the stream connection we retry with backoff + jitter.
60- # Upon any error processing the results of the stream we reconnect after one second.
39+ self ._connection_attempt_start_time = None
40+
6141 def run (self ):
6242 log .info ("Starting StreamingUpdateProcessor connecting to uri: " + self ._uri )
6343 self ._running = True
64- attempts = 0
65- while self ._running :
66- if attempts > 0 :
67- delay = self ._retry_delay .next_retry_delay (time .time ())
68- log .info ("Will reconnect after delay of %fs" % delay )
69- time .sleep (delay )
70- attempts += 1
71- try :
72- self ._es_started = int (time .time () * 1000 )
73- messages = self ._connect ()
74- for msg in messages :
75- if not self ._running :
76- break
77- self ._retry_delay .set_good_since (time .time ())
78- message_ok = self .process_message (self ._store , msg )
79- if message_ok :
80- self ._record_stream_init (False )
81- self ._es_started = None
82- if message_ok is True and self ._ready .is_set () is False :
44+ self ._sse = self ._create_sse_client ()
45+ self ._connection_attempt_start_time = time .time ()
46+ for action in self ._sse .all :
47+ if isinstance (action , Event ):
48+ try :
49+ message_ok = self ._process_message (action )
50+ except Exception as e :
51+ log .info ("Error while handling stream event; will restart stream: %s" % e )
52+ self ._sse .interrupt ()
53+ if message_ok :
54+ self ._record_stream_init (False )
55+ self ._connection_attempt_start_time = None
56+ if not self ._ready .is_set ():
8357 log .info ("StreamingUpdateProcessor initialized ok." )
8458 self ._ready .set ()
85- except UnsuccessfulResponseException as e :
86- self ._record_stream_init (True )
87- self ._es_started = None
88-
89- http_error_message_result = http_error_message (e .status , "stream connection" )
90- if is_http_error_recoverable (e .status ):
91- log .warning (http_error_message_result )
92- else :
93- log .error (http_error_message_result )
94- self ._ready .set () # if client is initializing, make it stop waiting; has no effect if already inited
95- self .stop ()
59+ elif isinstance (action , Fault ):
60+ if not self ._handle_error (action .error ):
9661 break
97- except Exception as e :
98- log .warning ("Unexpected error on stream connection: %s, will retry" % e )
99- self ._record_stream_init (True )
100- self ._es_started = None
101- # no stacktrace here because, for a typical connection error, it'll just be a lengthy tour of urllib3 internals
102-
103- def _record_stream_init (self , failed ):
104- if self ._diagnostic_accumulator and self ._es_started :
62+ self ._sse .close ()
63+
64+ def _record_stream_init (self , failed : bool ):
65+ if self ._diagnostic_accumulator and self ._connection_attempt_start_time :
10566 current_time = int (time .time () * 1000 )
106- self ._diagnostic_accumulator .record_stream_init (current_time , current_time - self ._es_started , failed )
67+ elapsed = current_time - int (self ._connection_attempt_start_time * 1000 )
68+ self ._diagnostic_accumulator .record_stream_init (current_time , elapsed if elapsed >= 0 else 0 , failed )
10769
108- def _connect (self ):
70+ def _create_sse_client (self ) -> SSEClient :
10971 # We don't want the stream to use the same read timeout as the rest of the SDK.
11072 http_factory = _http_factory (self ._config )
111- stream_http_factory = HTTPFactory (http_factory .base_headers , http_factory .http_config , override_read_timeout = stream_read_timeout )
112- client = SSEClient (
113- self ._uri ,
114- http_factory = stream_http_factory
73+ stream_http_factory = HTTPFactory (http_factory .base_headers , http_factory .http_config ,
74+ override_read_timeout = stream_read_timeout )
75+ return SSEClient (
76+ connect = ConnectStrategy .http (
77+ url = self ._uri ,
78+ headers = http_factory .base_headers ,
79+ pool = stream_http_factory .create_pool_manager (1 , self ._uri ),
80+ urllib3_request_options = {"timeout" : stream_http_factory .timeout }
81+ ),
82+ error_strategy = ErrorStrategy .always_continue (), # we'll make error-handling decisions when we see a Fault
83+ initial_retry_delay = self ._config .initial_reconnect_delay ,
84+ retry_delay_strategy = RetryDelayStrategy .default (
85+ max_delay = MAX_RETRY_DELAY ,
86+ backoff_multiplier = 2 ,
87+ jitter_multiplier = JITTER_RATIO
88+ ),
89+ retry_delay_reset_threshold = BACKOFF_RESET_INTERVAL ,
90+ logger = log
11591 )
116- return client .events
11792
11893 def stop (self ):
11994 log .info ("Stopping StreamingUpdateProcessor" )
@@ -123,8 +98,7 @@ def initialized(self):
12398 return self ._running and self ._ready .is_set () is True and self ._store .initialized is True
12499
125100 # Returns True if we initialized the feature store
126- @staticmethod
127- def process_message (store , msg ):
101+ def _process_message (self , msg : Event ) -> bool :
128102 if msg .event == 'put' :
129103 all_data = json .loads (msg .data )
130104 init_data = {
@@ -133,7 +107,7 @@ def process_message(store, msg):
133107 }
134108 log .debug ("Received put event with %d flags and %d segments" ,
135109 len (init_data [FEATURES ]), len (init_data [SEGMENTS ]))
136- store .init (init_data )
110+ self . _store .init (init_data )
137111 return True
138112 elif msg .event == 'patch' :
139113 payload = json .loads (msg .data )
@@ -142,7 +116,7 @@ def process_message(store, msg):
142116 log .debug ("Received patch event for %s, New version: [%d]" , path , obj .get ("version" ))
143117 target = StreamingUpdateProcessor ._parse_path (path )
144118 if target is not None :
145- store .upsert (target .kind , obj )
119+ self . _store .upsert (target .kind , obj )
146120 else :
147121 log .warning ("Patch for unknown path: %s" , path )
148122 elif msg .event == 'delete' :
@@ -153,15 +127,39 @@ def process_message(store, msg):
153127 log .debug ("Received delete event for %s, New version: [%d]" , path , version )
154128 target = StreamingUpdateProcessor ._parse_path (path )
155129 if target is not None :
156- store .delete (target .kind , target .key , version )
130+ self . _store .delete (target .kind , target .key , version )
157131 else :
158132 log .warning ("Delete for unknown path: %s" , path )
159133 else :
160134 log .warning ('Unhandled event in stream processor: ' + msg .event )
161135 return False
162136
137+ # Returns true to continue, false to stop
138+ def _handle_error (self , error : Exception ) -> bool :
139+ if not self ._running :
140+ return False # don't retry if we've been deliberately stopped
141+ if isinstance (error , HTTPStatusError ):
142+ self ._record_stream_init (True )
143+ self ._connection_attempt_start_time = None
144+
145+ http_error_message_result = http_error_message (error .status , "stream connection" )
146+ if not is_http_error_recoverable (error .status ):
147+ log .error (http_error_message_result )
148+ self ._ready .set () # if client is initializing, make it stop waiting; has no effect if already inited
149+ self .stop ()
150+ return False
151+ else :
152+ log .warning (http_error_message_result )
153+ else :
154+ log .warning ("Unexpected error on stream connection: %s, will retry" % error )
155+ self ._record_stream_init (True )
156+ self ._connection_attempt_start_time = None
157+ # no stacktrace here because, for a typical connection error, it'll just be a lengthy tour of urllib3 internals
158+ self ._connection_attempt_start_time = time .time () + self ._sse .next_retry_delay
159+ return True
160+
163161 @staticmethod
164- def _parse_path (path ):
162+ def _parse_path (path : str ):
165163 for kind in [FEATURES , SEGMENTS ]:
166164 if path .startswith (kind .stream_api_path ):
167165 return ParsedPath (kind = kind , key = path [len (kind .stream_api_path ):])
@@ -170,6 +168,6 @@ def _parse_path(path):
    # magic methods for "with" statement (used in testing)
    def __enter__(self):
        """Enter the runtime context; returns the processor itself."""
        return self
173-
171+
    def __exit__(self, type, value, traceback):
        """Exit the runtime context by stopping the processor (exceptions are not suppressed)."""
        self.stop()
0 commit comments