@@ -119,6 +119,23 @@ def _get_headers(self, extra_headers, sdk_key):
119119 headers .update (extra_headers )
120120 return headers
121121
122+ def _record_telemetry (self , status_code , elapsed ):
123+ """
124+ Record Telemetry info
125+
126+ :param status_code: http request status code
127+ :type status_code: int
128+
129+ :param elapsed: response time elapsed.
130+ :type status_code: int
131+ """
132+ self ._telemetry_runtime_producer .record_sync_latency (self ._metric_name , elapsed )
133+ if 200 <= status_code < 300 :
134+ self ._telemetry_runtime_producer .record_successful_sync (self ._metric_name , get_current_epoch_time_ms ())
135+ return
136+
137+ self ._telemetry_runtime_producer .record_sync_error (self ._metric_name , status_code )
138+
122139class HttpClient (HttpClientBase ):
123140 """HttpClient wrapper."""
124141
@@ -140,7 +157,6 @@ def __init__(self, timeout=None, sdk_url=None, events_url=None, auth_url=None, t
140157 _LOGGER .debug ("Initializing httpclient" )
141158 self ._timeout = timeout / 1000 if timeout else None # Convert ms to seconds.
142159 self ._urls = _construct_urls (sdk_url , events_url , auth_url , telemetry_url )
143- self ._lock = threading .RLock ()
144160
145161 def get (self , server , path , sdk_key , query = None , extra_headers = None ): # pylint: disable=too-many-arguments
146162 """
@@ -208,23 +224,6 @@ def post(self, server, path, sdk_key, body, query=None, extra_headers=None): #
208224 except Exception as exc : # pylint: disable=broad-except
209225 raise HttpClientException (_EXC_MSG .format (source = 'request' )) from exc
210226
211- def _record_telemetry (self , status_code , elapsed ):
212- """
213- Record Telemetry info
214-
215- :param status_code: http request status code
216- :type status_code: int
217-
218- :param elapsed: response time elapsed.
219- :type status_code: int
220- """
221- self ._telemetry_runtime_producer .record_sync_latency (self ._metric_name , elapsed )
222- if 200 <= status_code < 300 :
223- self ._telemetry_runtime_producer .record_successful_sync (self ._metric_name , get_current_epoch_time_ms ())
224- return
225-
226- self ._telemetry_runtime_producer .record_sync_error (self ._metric_name , status_code )
227-
228227class HttpClientAsync (HttpClientBase ):
229228 """HttpClientAsync wrapper."""
230229
@@ -350,7 +349,7 @@ async def close_session(self):
350349 if not self ._session .closed :
351350 await self ._session .close ()
352351
353- class HttpClientKerberos (HttpClient ):
352+ class HttpClientKerberos (HttpClientBase ):
354353 """HttpClient wrapper."""
355354
356355 def __init__ (self , timeout = None , sdk_url = None , events_url = None , auth_url = None , telemetry_url = None , authentication_scheme = None , authentication_params = None ):
@@ -367,11 +366,22 @@ def __init__(self, timeout=None, sdk_url=None, events_url=None, auth_url=None, t
367366 :type auth_url: str
368367 :param telemetry_url: Optional alternative telemetry URL.
369368 :type telemetry_url: str
369+ :param authentication_scheme: Optional authentication scheme to use.
370+ :type authentication_scheme: splitio.client.config.AuthenticateScheme
371+ :param authentication_params: Optional authentication username and password to use.
372+ :type authentication_params: [str, str]
370373 """
371374 _LOGGER .debug ("Initializing httpclient for Kerberos auth" )
372- HttpClient .__init__ (self , timeout = timeout , sdk_url = sdk_url , events_url = events_url , auth_url = auth_url , telemetry_url = telemetry_url )
375+ self ._timeout = timeout / 1000 if timeout else None # Convert ms to seconds.
376+ self ._urls = _construct_urls (sdk_url , events_url , auth_url , telemetry_url )
373377 self ._authentication_scheme = authentication_scheme
374378 self ._authentication_params = authentication_params
379+ self ._lock = threading .RLock ()
380+ self ._sessions = {'sdk' : requests .Session (),
381+ 'events' : requests .Session (),
382+ 'auth' : requests .Session (),
383+ 'telemetry' : requests .Session ()}
384+ self ._set_authentication ()
375385
376386 def get (self , server , path , sdk_key , query = None , extra_headers = None ): # pylint: disable=too-many-arguments
377387 """
@@ -392,21 +402,49 @@ def get(self, server, path, sdk_key, query=None, extra_headers=None): # pylint:
392402 """
393403 with self ._lock :
394404 start = get_current_epoch_time_ms ()
395- with requests .Session () as session :
396- self ._set_authentication (session )
405+ try :
406+ return self ._do_get (server , path , sdk_key , query , extra_headers , start )
407+
408+ except requests .exceptions .ProxyError as exc :
409+ _LOGGER .debug ("Proxy Exception caught, resetting the http session" )
410+ self ._sessions [server ].close ()
411+ self ._sessions [server ] = requests .Session ()
412+ self ._set_authentication (server_name = server )
397413 try :
398- response = session .get (
399- _build_url (server , path , self ._urls ),
400- headers = self ._get_headers (extra_headers , sdk_key ),
401- params = query ,
402- timeout = self ._timeout
403- )
404- self ._record_telemetry (response .status_code , get_current_epoch_time_ms () - start )
405- return HttpResponse (response .status_code , response .text , response .headers )
406-
407- except Exception as exc : # pylint: disable=broad-except
414+ return self ._do_get (server , path , sdk_key , query , extra_headers , start )
415+
416+ except Exception as exc :
408417 raise HttpClientException (_EXC_MSG .format (source = 'request' )) from exc
409418
419+ except Exception as exc : # pylint: disable=broad-except
420+ raise HttpClientException (_EXC_MSG .format (source = 'request' )) from exc
421+
422+ def _do_get (self , server , path , sdk_key , query , extra_headers , start ):
423+ """
424+ Issue a get request.
425+ :param server: Whether the request is for SDK server, Events server or Auth server.
426+ :typee server: str
427+ :param path: path to append to the host url.
428+ :type path: str
429+ :param sdk_key: sdk key.
430+ :type sdk_key: str
431+ :param query: Query string passed as dictionary.
432+ :type query: dict
433+ :param extra_headers: key/value pairs of possible extra headers.
434+ :type extra_headers: dict
435+
436+ :return: Tuple of status_code & response text
437+ :rtype: HttpResponse
438+ """
439+ with self ._sessions [server ].get (
440+ _build_url (server , path , self ._urls ),
441+ headers = self ._get_headers (extra_headers , sdk_key ),
442+ params = query ,
443+ timeout = self ._timeout
444+ ) as response :
445+ self ._record_telemetry (response .status_code , get_current_epoch_time_ms () - start )
446+ return HttpResponse (response .status_code , response .text , response .headers )
447+
410448 def post (self , server , path , sdk_key , body , query = None , extra_headers = None ): # pylint: disable=too-many-arguments
411449 """
412450 Issue a POST request.
@@ -429,31 +467,72 @@ def post(self, server, path, sdk_key, body, query=None, extra_headers=None): #
429467 """
430468 with self ._lock :
431469 start = get_current_epoch_time_ms ()
432- with requests .Session () as session :
433- self ._set_authentication (session )
470+ try :
471+ return self ._do_post (server , path , sdk_key , query , extra_headers , body , start )
472+
473+ except requests .exceptions .ProxyError as exc :
474+ _LOGGER .debug ("Proxy Exception caught, resetting the http session" )
475+ self ._sessions [server ].close ()
476+ self ._sessions [server ] = requests .Session ()
477+ self ._set_authentication (server_name = server )
434478 try :
435- response = session .post (
436- _build_url (server , path , self ._urls ),
437- params = query ,
438- headers = self ._get_headers (extra_headers , sdk_key ),
439- json = body ,
440- timeout = self ._timeout ,
441- )
442- self ._record_telemetry (response .status_code , get_current_epoch_time_ms () - start )
443- return HttpResponse (response .status_code , response .text , response .headers )
444- except Exception as exc : # pylint: disable=broad-except
479+ return self ._do_post (server , path , sdk_key , query , extra_headers , body , start )
480+
481+ except Exception as exc :
445482 raise HttpClientException (_EXC_MSG .format (source = 'request' )) from exc
446483
447- def _set_authentication (self , session ):
448- if self ._authentication_scheme == AuthenticateScheme .KERBEROS_SPNEGO :
449- _LOGGER .debug ("Using Kerberos Spnego Authentication" )
450- if self ._authentication_params != [None , None ]:
451- session .auth = HTTPKerberosAuth (principal = self ._authentication_params [0 ], password = self ._authentication_params [1 ], mutual_authentication = OPTIONAL )
452- else :
453- session .auth = HTTPKerberosAuth (mutual_authentication = OPTIONAL )
454- elif self ._authentication_scheme == AuthenticateScheme .KERBEROS_PROXY :
455- _LOGGER .debug ("Using Kerberos Proxy Authentication" )
456- if self ._authentication_params != [None , None ]:
457- session .mount ('https://' , HTTPAdapterWithProxyKerberosAuth (principal = self ._authentication_params [0 ], password = self ._authentication_params [1 ]))
458- else :
459- session .mount ('https://' , HTTPAdapterWithProxyKerberosAuth ())
484+ except Exception as exc : # pylint: disable=broad-except
485+ raise HttpClientException (_EXC_MSG .format (source = 'request' )) from exc
486+
487+ def _do_post (self , server , path , sdk_key , query , extra_headers , body , start ):
488+ """
489+ Issue a POST request.
490+
491+ :param server: Whether the request is for SDK server or Events server.
492+ :typee server: str
493+ :param path: path to append to the host url.
494+ :type path: str
495+ :param sdk_key: sdk key.
496+ :type sdk_key: str
497+ :param body: body sent in the request.
498+ :type body: str
499+ :param query: Query string passed as dictionary.
500+ :type query: dict
501+ :param extra_headers: key/value pairs of possible extra headers.
502+ :type extra_headers: dict
503+
504+ :return: Tuple of status_code & response text
505+ :rtype: HttpResponse
506+ """
507+ with self ._sessions [server ].post (
508+ _build_url (server , path , self ._urls ),
509+ params = query ,
510+ headers = self ._get_headers (extra_headers , sdk_key ),
511+ json = body ,
512+ timeout = self ._timeout ,
513+ ) as response :
514+ self ._record_telemetry (response .status_code , get_current_epoch_time_ms () - start )
515+ return HttpResponse (response .status_code , response .text , response .headers )
516+
517+ def _set_authentication (self , server_name = None ):
518+ """
519+ Set the authentication for all self._sessions variables based on authentication scheme.
520+
521+ :param server: If set, will only add the auth for its session variable, otherwise will set all sessions.
522+ :typee server: str
523+ """
524+ for server in ['sdk' , 'events' , 'auth' , 'telemetry' ]:
525+ if server_name is not None and server_name != server :
526+ continue
527+ if self ._authentication_scheme == AuthenticateScheme .KERBEROS_SPNEGO :
528+ _LOGGER .debug ("Using Kerberos Spnego Authentication" )
529+ if self ._authentication_params != [None , None ]:
530+ self ._sessions [server ].auth = HTTPKerberosAuth (principal = self ._authentication_params [0 ], password = self ._authentication_params [1 ], mutual_authentication = OPTIONAL )
531+ else :
532+ self ._sessions [server ].auth = HTTPKerberosAuth (mutual_authentication = OPTIONAL )
533+ elif self ._authentication_scheme == AuthenticateScheme .KERBEROS_PROXY :
534+ _LOGGER .debug ("Using Kerberos Proxy Authentication" )
535+ if self ._authentication_params != [None , None ]:
536+ self ._sessions [server ].mount ('https://' , HTTPAdapterWithProxyKerberosAuth (principal = self ._authentication_params [0 ], password = self ._authentication_params [1 ]))
537+ else :
538+ self ._sessions [server ].mount ('https://' , HTTPAdapterWithProxyKerberosAuth ())
0 commit comments