From e5e72a5b496bbc85e579635da6e2bdb593df2b6a Mon Sep 17 00:00:00 2001 From: mgiannakopoulos Date: Wed, 26 Nov 2025 12:29:42 -0500 Subject: [PATCH 1/4] fix: Make auth tokens resolved dynamically per request. --- singlestoredb/ai/embeddings.py | 80 +++++++++++++++++++++++++++++----- 1 file changed, 69 insertions(+), 11 deletions(-) diff --git a/singlestoredb/ai/embeddings.py b/singlestoredb/ai/embeddings.py index 2449d94f..5976410b 100644 --- a/singlestoredb/ai/embeddings.py +++ b/singlestoredb/ai/embeddings.py @@ -1,4 +1,5 @@ import os +from collections.abc import Generator from typing import Any from typing import Callable from typing import Optional @@ -7,6 +8,7 @@ import httpx from singlestoredb import manage_workspaces +from singlestoredb.management.inference_api import InferenceAPIInfo try: from langchain_openai import OpenAIEmbeddings @@ -31,19 +33,54 @@ def SingleStoreEmbeddingsFactory( model_name: str, - api_key: Optional[str] = None, + api_key: Optional[Union[Optional[str], Callable[[], Optional[str]]]] = None, http_client: Optional[httpx.Client] = None, - obo_token_getter: Optional[Callable[[], Optional[str]]] = None, + obo_token: Optional[Union[Optional[str], Callable[[], Optional[str]]]] = None, + base_url: Optional[str] = None, + hosting_platform: Optional[str] = None, **kwargs: Any, ) -> Union[OpenAIEmbeddings, BedrockEmbeddings]: """Return an embeddings model instance (OpenAIEmbeddings or BedrockEmbeddings). """ - inference_api_manager = ( - manage_workspaces().organizations.current.inference_apis - ) - info = inference_api_manager.get(model_name=model_name) - token_env = os.environ.get('SINGLESTOREDB_USER_TOKEN') - token = api_key if api_key is not None else token_env + # Handle api_key and obo_token as callable functions + if callable(api_key): + api_key_getter = api_key + else: + def api_key_getter() -> Optional[str]: + if api_key is None: + return os.environ.get('SINGLESTOREDB_USER_TOKEN') + return api_key + + if callable(obo_token): + obo_token_getter = obo_token + else: + def obo_token_getter() -> Optional[str]: + return obo_token + + # handle model info + if base_url is None: + base_url = os.environ.get('SINGLESTOREDB_INFERENCE_API_BASE_URL') + if hosting_platform is None: + hosting_platform = os.environ.get('SINGLESTOREDB_INFERENCE_API_HOSTING_PLATFORM') + + if base_url is None or hosting_platform is None: + inference_api_manager = ( + manage_workspaces().organizations.current.inference_apis + ) + info = inference_api_manager.get(model_name=model_name) + else: + info = InferenceAPIInfo( + service_id='', + model_name=model_name, + name='', + connection_url=base_url, + project_id='', + hosting_platform=hosting_platform, + ) + if base_url is not None: + info.connection_url = base_url + if hosting_platform is not None: + info.hosting_platform = hosting_platform if info.hosting_platform == 'Amazon': # Instantiate Bedrock client @@ -85,12 +122,14 @@ def SingleStoreEmbeddingsFactory( def _inject_headers(request: Any, **_ignored: Any) -> None: """Inject dynamic auth/OBO headers prior to Bedrock sending.""" + if api_key_getter is not None: + token_val = api_key_getter() + if token_val: + request.headers['Authorization'] = f'Bearer {token_val}' if obo_token_getter is not None: obo_val = obo_token_getter() if obo_val: request.headers['X-S2-OBO'] = obo_val - if token: - request.headers['Authorization'] = f'Bearer {token}' request.headers.pop('X-Amz-Date', None) request.headers.pop('X-Amz-Security-Token', None) @@ -114,10 +153,29 @@ def _inject_headers(request: Any, **_ignored: Any) -> None: **kwargs, ) + class OpenAIAuth(httpx.Auth): + def auth_flow( + self, request: httpx.Request, + ) -> Generator[httpx.Request, None, None]: + if api_key_getter is not None: + token_val = api_key_getter() + if token_val: + request.headers['Authorization'] = f'Bearer {token_val}' + if obo_token_getter is not None: + obo_val = obo_token_getter() + if obo_val: + request.headers['X-S2-OBO'] = obo_val + yield request + + http_client = httpx.Client( + timeout=30, + auth=OpenAIAuth(), + ) + # OpenAI / Azure OpenAI path openai_kwargs = dict( base_url=info.connection_url, - api_key=token, + api_key='placeholder', model=model_name, ) if http_client is not None: From 0f8370515380bcd063e6491b5888c29aaf049d31 Mon Sep 17 00:00:00 2001 From: mgiannakopoulos Date: Wed, 26 Nov 2025 17:14:17 -0500 Subject: [PATCH 2/4] Apply same changes to ChatFactory as well. --- singlestoredb/ai/chat.py | 79 +++++++++++++++++++++++++++++----- singlestoredb/ai/embeddings.py | 1 - 2 files changed, 68 insertions(+), 12 deletions(-) diff --git a/singlestoredb/ai/chat.py b/singlestoredb/ai/chat.py index f3419c28..41b393b8 100644 --- a/singlestoredb/ai/chat.py +++ b/singlestoredb/ai/chat.py @@ -1,4 +1,5 @@ import os +from collections.abc import Generator from typing import Any from typing import Callable from typing import Optional @@ -7,6 +8,7 @@ import httpx from singlestoredb import manage_workspaces +from singlestoredb.management.inference_api import InferenceAPIInfo try: from langchain_openai import ChatOpenAI @@ -31,20 +33,54 @@ def SingleStoreChatFactory( model_name: str, - api_key: Optional[str] = None, + api_key: Optional[Union[Optional[str], Callable[[], Optional[str]]]] = None, streaming: bool = True, http_client: Optional[httpx.Client] = None, - obo_token_getter: Optional[Callable[[], Optional[str]]] = None, + obo_token: Optional[Union[Optional[str], Callable[[], Optional[str]]]] = None, + base_url: Optional[str] = None, + hosting_platform: Optional[str] = None, **kwargs: Any, ) -> Union[ChatOpenAI, ChatBedrockConverse]: """Return a chat model instance (ChatOpenAI or ChatBedrockConverse). """ - inference_api_manager = ( - manage_workspaces().organizations.current.inference_apis - ) - info = inference_api_manager.get(model_name=model_name) - token_env = os.environ.get('SINGLESTOREDB_USER_TOKEN') - token = api_key if api_key is not None else token_env + # Handle api_key and obo_token as callable functions + if callable(api_key): + api_key_getter = api_key + else: + def api_key_getter() -> Optional[str]: + if api_key is None: + return os.environ.get('SINGLESTOREDB_USER_TOKEN') + return api_key + + if callable(obo_token): + obo_token_getter = obo_token + else: + def obo_token_getter() -> Optional[str]: + return obo_token + + # handle model info + if base_url is None: + base_url = os.environ.get('SINGLESTOREDB_INFERENCE_API_BASE_URL') + if hosting_platform is None: + hosting_platform = os.environ.get('SINGLESTOREDB_INFERENCE_API_HOSTING_PLATFORM') + if base_url is None or hosting_platform is None: + inference_api_manager = ( + manage_workspaces().organizations.current.inference_apis + ) + info = inference_api_manager.get(model_name=model_name) + else: + info = InferenceAPIInfo( + service_id='', + model_name=model_name, + name='', + connection_url=base_url, + project_id='', + hosting_platform=hosting_platform, + ) + if base_url is not None: + info.connection_url = base_url + if hosting_platform is not None: + info.hosting_platform = hosting_platform if info.hosting_platform == 'Amazon': # Instantiate Bedrock client @@ -86,12 +122,14 @@ def SingleStoreChatFactory( def _inject_headers(request: Any, **_ignored: Any) -> None: """Inject dynamic auth/OBO headers prior to Bedrock sending.""" + if api_key_getter is not None: + token_val = api_key_getter() + if token_val: + request.headers['Authorization'] = f'Bearer {token_val}' if obo_token_getter is not None: obo_val = obo_token_getter() if obo_val: request.headers['X-S2-OBO'] = obo_val - if token: - request.headers['Authorization'] = f'Bearer {token}' request.headers.pop('X-Amz-Date', None) request.headers.pop('X-Amz-Security-Token', None) @@ -124,10 +162,29 @@ def _inject_headers(request: Any, **_ignored: Any) -> None: **kwargs, ) + class OpenAIAuth(httpx.Auth): + def auth_flow( + self, request: httpx.Request, + ) -> Generator[httpx.Request, None, None]: + if api_key_getter is not None: + token_val = api_key_getter() + if token_val: + request.headers['Authorization'] = f'Bearer {token_val}' + if obo_token_getter is not None: + obo_val = obo_token_getter() + if obo_val: + request.headers['X-S2-OBO'] = obo_val + yield request + + http_client = httpx.Client( + timeout=30, + auth=OpenAIAuth(), + ) + # OpenAI / Azure OpenAI path openai_kwargs = dict( base_url=info.connection_url, - api_key=token, + api_key='placeholder', model=model_name, streaming=streaming, ) diff --git a/singlestoredb/ai/embeddings.py b/singlestoredb/ai/embeddings.py index 5976410b..d1bf58b6 100644 --- a/singlestoredb/ai/embeddings.py +++ b/singlestoredb/ai/embeddings.py @@ -62,7 +62,6 @@ def obo_token_getter() -> Optional[str]: base_url = os.environ.get('SINGLESTOREDB_INFERENCE_API_BASE_URL') if hosting_platform is None: hosting_platform = os.environ.get('SINGLESTOREDB_INFERENCE_API_HOSTING_PLATFORM') - if base_url is None or hosting_platform is None: inference_api_manager = ( manage_workspaces().organizations.current.inference_apis From 8492a9d8f4d2167a9c556d041207da3ed9e7abbb Mon Sep 17 00:00:00 2001 From: mgiannakopoulos Date: Wed, 26 Nov 2025 18:22:08 -0500 Subject: [PATCH 3/4] Handle timeout properly without causing regressions until we remove accepting http_client parameter. --- singlestoredb/ai/chat.py | 50 ++++++++++++++++++++-------------- singlestoredb/ai/embeddings.py | 50 ++++++++++++++++++++-------------- 2 files changed, 60 insertions(+), 40 deletions(-) diff --git a/singlestoredb/ai/chat.py b/singlestoredb/ai/chat.py index 41b393b8..a504c750 100644 --- a/singlestoredb/ai/chat.py +++ b/singlestoredb/ai/chat.py @@ -82,29 +82,30 @@ def obo_token_getter() -> Optional[str]: if hosting_platform is not None: info.hosting_platform = hosting_platform + # Extract timeouts from http_client if provided + t = http_client.timeout if http_client is not None else None + connect_timeout = None + read_timeout = None + if t is not None: + if isinstance(t, httpx.Timeout): + if t.connect is not None: + connect_timeout = float(t.connect) + if t.read is not None: + read_timeout = float(t.read) + if connect_timeout is None and read_timeout is not None: + connect_timeout = read_timeout + if read_timeout is None and connect_timeout is not None: + read_timeout = connect_timeout + elif isinstance(t, (int, float)): + connect_timeout = float(t) + read_timeout = float(t) + if info.hosting_platform == 'Amazon': # Instantiate Bedrock client cfg_kwargs = { 'signature_version': UNSIGNED, 'retries': {'max_attempts': 1, 'mode': 'standard'}, } - # Extract timeouts from http_client if provided - t = http_client.timeout if http_client is not None else None - connect_timeout = None - read_timeout = None - if t is not None: - if isinstance(t, httpx.Timeout): - if t.connect is not None: - connect_timeout = float(t.connect) - if t.read is not None: - read_timeout = float(t.read) - if connect_timeout is None and read_timeout is not None: - connect_timeout = read_timeout - if read_timeout is None and connect_timeout is not None: - read_timeout = connect_timeout - elif isinstance(t, (int, float)): - connect_timeout = float(t) - read_timeout = float(t) if read_timeout is not None: cfg_kwargs['read_timeout'] = read_timeout if connect_timeout is not None: @@ -176,8 +177,18 @@ def auth_flow( request.headers['X-S2-OBO'] = obo_val yield request + # Build timeout configuration + if connect_timeout is not None and read_timeout is not None: + t = httpx.Timeout(connect=connect_timeout, read=read_timeout) + elif connect_timeout is not None: + t = httpx.Timeout(connect=connect_timeout) + elif read_timeout is not None: + t = httpx.Timeout(read=read_timeout) + else: + t = 60.0 # default OpenAI client timeout + http_client = httpx.Client( - timeout=30, + timeout=t, auth=OpenAIAuth(), ) @@ -188,8 +199,7 @@ def auth_flow( model=model_name, streaming=streaming, ) - if http_client is not None: - openai_kwargs['http_client'] = http_client + openai_kwargs['http_client'] = http_client return ChatOpenAI( **openai_kwargs, **kwargs, diff --git a/singlestoredb/ai/embeddings.py b/singlestoredb/ai/embeddings.py index d1bf58b6..5d5798b6 100644 --- a/singlestoredb/ai/embeddings.py +++ b/singlestoredb/ai/embeddings.py @@ -81,29 +81,30 @@ def obo_token_getter() -> Optional[str]: if hosting_platform is not None: info.hosting_platform = hosting_platform + # Extract timeouts from http_client if provided + t = http_client.timeout if http_client is not None else None + connect_timeout = None + read_timeout = None + if t is not None: + if isinstance(t, httpx.Timeout): + if t.connect is not None: + connect_timeout = float(t.connect) + if t.read is not None: + read_timeout = float(t.read) + if connect_timeout is None and read_timeout is not None: + connect_timeout = read_timeout + if read_timeout is None and connect_timeout is not None: + read_timeout = connect_timeout + elif isinstance(t, (int, float)): + connect_timeout = float(t) + read_timeout = float(t) + if info.hosting_platform == 'Amazon': # Instantiate Bedrock client cfg_kwargs = { 'signature_version': UNSIGNED, 'retries': {'max_attempts': 1, 'mode': 'standard'}, } - # Extract timeouts from http_client if provided - t = http_client.timeout if http_client is not None else None - connect_timeout = None - read_timeout = None - if t is not None: - if isinstance(t, httpx.Timeout): - if t.connect is not None: - connect_timeout = float(t.connect) - if t.read is not None: - read_timeout = float(t.read) - if connect_timeout is None and read_timeout is not None: - connect_timeout = read_timeout - if read_timeout is None and connect_timeout is not None: - read_timeout = connect_timeout - elif isinstance(t, (int, float)): - connect_timeout = float(t) - read_timeout = float(t) if read_timeout is not None: cfg_kwargs['read_timeout'] = read_timeout if connect_timeout is not None: @@ -166,8 +167,18 @@ def auth_flow( request.headers['X-S2-OBO'] = obo_val yield request + # Build timeout configuration + if connect_timeout is not None and read_timeout is not None: + t = httpx.Timeout(connect=connect_timeout, read=read_timeout) + elif connect_timeout is not None: + t = httpx.Timeout(connect=connect_timeout) + elif read_timeout is not None: + t = httpx.Timeout(read=read_timeout) + else: + t = 60.0 # default OpenAI client timeout + http_client = httpx.Client( - timeout=30, + timeout=t, auth=OpenAIAuth(), ) @@ -177,8 +188,7 @@ def auth_flow( api_key='placeholder', model=model_name, ) - if http_client is not None: - openai_kwargs['http_client'] = http_client + openai_kwargs['http_client'] = http_client return OpenAIEmbeddings( **openai_kwargs, **kwargs, From 03736bd810a49a055852906d4c914d74d74c973d Mon Sep 17 00:00:00 2001 From: mgiannakopoulos Date: Thu, 27 Nov 2025 00:04:41 -0500 Subject: [PATCH 4/4] Fixes; prepare for deprecation of http_client and obo_token_getter params. --- singlestoredb/ai/chat.py | 59 ++++++++++++++++++---------------- singlestoredb/ai/embeddings.py | 59 ++++++++++++++++++---------------- 2 files changed, 64 insertions(+), 54 deletions(-) diff --git a/singlestoredb/ai/chat.py b/singlestoredb/ai/chat.py index a504c750..cc94b699 100644 --- a/singlestoredb/ai/chat.py +++ b/singlestoredb/ai/chat.py @@ -37,26 +37,31 @@ def SingleStoreChatFactory( streaming: bool = True, http_client: Optional[httpx.Client] = None, obo_token: Optional[Union[Optional[str], Callable[[], Optional[str]]]] = None, + obo_token_getter: Optional[Callable[[], Optional[str]]] = None, base_url: Optional[str] = None, hosting_platform: Optional[str] = None, + timeout: Optional[float] = None, **kwargs: Any, ) -> Union[ChatOpenAI, ChatBedrockConverse]: """Return a chat model instance (ChatOpenAI or ChatBedrockConverse). """ # Handle api_key and obo_token as callable functions if callable(api_key): - api_key_getter = api_key + api_key_getter_fn = api_key else: - def api_key_getter() -> Optional[str]: + def api_key_getter_fn() -> Optional[str]: if api_key is None: return os.environ.get('SINGLESTOREDB_USER_TOKEN') return api_key - if callable(obo_token): - obo_token_getter = obo_token + if obo_token_getter is not None: + obo_token_getter_fn = obo_token_getter else: - def obo_token_getter() -> Optional[str]: - return obo_token + if callable(obo_token): + obo_token_getter_fn = obo_token + else: + def obo_token_getter_fn() -> Optional[str]: + return obo_token # handle model info if base_url is None: @@ -99,6 +104,10 @@ def obo_token_getter() -> Optional[str]: elif isinstance(t, (int, float)): connect_timeout = float(t) read_timeout = float(t) + if timeout is not None: + connect_timeout = timeout + read_timeout = timeout + t = httpx.Timeout(timeout) if info.hosting_platform == 'Amazon': # Instantiate Bedrock client @@ -123,12 +132,12 @@ def obo_token_getter() -> Optional[str]: def _inject_headers(request: Any, **_ignored: Any) -> None: """Inject dynamic auth/OBO headers prior to Bedrock sending.""" - if api_key_getter is not None: - token_val = api_key_getter() + if api_key_getter_fn is not None: + token_val = api_key_getter_fn() if token_val: request.headers['Authorization'] = f'Bearer {token_val}' - if obo_token_getter is not None: - obo_val = obo_token_getter() + if obo_token_getter_fn is not None: + obo_val = obo_token_getter_fn() if obo_val: request.headers['X-S2-OBO'] = obo_val request.headers.pop('X-Amz-Date', None) @@ -167,30 +176,26 @@ class OpenAIAuth(httpx.Auth): def auth_flow( self, request: httpx.Request, ) -> Generator[httpx.Request, None, None]: - if api_key_getter is not None: - token_val = api_key_getter() + if api_key_getter_fn is not None: + token_val = api_key_getter_fn() if token_val: request.headers['Authorization'] = f'Bearer {token_val}' - if obo_token_getter is not None: - obo_val = obo_token_getter() + if obo_token_getter_fn is not None: + obo_val = obo_token_getter_fn() if obo_val: request.headers['X-S2-OBO'] = obo_val yield request - # Build timeout configuration - if connect_timeout is not None and read_timeout is not None: - t = httpx.Timeout(connect=connect_timeout, read=read_timeout) - elif connect_timeout is not None: - t = httpx.Timeout(connect=connect_timeout) - elif read_timeout is not None: - t = httpx.Timeout(read=read_timeout) + if t is not None: + http_client = httpx.Client( + timeout=t, + auth=OpenAIAuth(), + ) else: - t = 60.0 # default OpenAI client timeout - - http_client = httpx.Client( - timeout=t, - auth=OpenAIAuth(), - ) + http_client = httpx.Client( + timeout=httpx.Timeout(timeout=600, connect=5.0), # default OpenAI timeout + auth=OpenAIAuth(), + ) # OpenAI / Azure OpenAI path openai_kwargs = dict( diff --git a/singlestoredb/ai/embeddings.py b/singlestoredb/ai/embeddings.py index 5d5798b6..e85f26a7 100644 --- a/singlestoredb/ai/embeddings.py +++ b/singlestoredb/ai/embeddings.py @@ -36,26 +36,31 @@ def SingleStoreEmbeddingsFactory( api_key: Optional[Union[Optional[str], Callable[[], Optional[str]]]] = None, http_client: Optional[httpx.Client] = None, obo_token: Optional[Union[Optional[str], Callable[[], Optional[str]]]] = None, + obo_token_getter: Optional[Callable[[], Optional[str]]] = None, base_url: Optional[str] = None, hosting_platform: Optional[str] = None, + timeout: Optional[float] = None, **kwargs: Any, ) -> Union[OpenAIEmbeddings, BedrockEmbeddings]: """Return an embeddings model instance (OpenAIEmbeddings or BedrockEmbeddings). """ # Handle api_key and obo_token as callable functions if callable(api_key): - api_key_getter = api_key + api_key_getter_fn = api_key else: - def api_key_getter() -> Optional[str]: + def api_key_getter_fn() -> Optional[str]: if api_key is None: return os.environ.get('SINGLESTOREDB_USER_TOKEN') return api_key - if callable(obo_token): - obo_token_getter = obo_token + if obo_token_getter is not None: + obo_token_getter_fn = obo_token_getter else: - def obo_token_getter() -> Optional[str]: - return obo_token + if callable(obo_token): + obo_token_getter_fn = obo_token + else: + def obo_token_getter_fn() -> Optional[str]: + return obo_token # handle model info if base_url is None: @@ -98,6 +103,10 @@ def obo_token_getter() -> Optional[str]: elif isinstance(t, (int, float)): connect_timeout = float(t) read_timeout = float(t) + if timeout is not None: + connect_timeout = timeout + read_timeout = timeout + t = httpx.Timeout(timeout) if info.hosting_platform == 'Amazon': # Instantiate Bedrock client @@ -122,12 +131,12 @@ def obo_token_getter() -> Optional[str]: def _inject_headers(request: Any, **_ignored: Any) -> None: """Inject dynamic auth/OBO headers prior to Bedrock sending.""" - if api_key_getter is not None: - token_val = api_key_getter() + if api_key_getter_fn is not None: + token_val = api_key_getter_fn() if token_val: request.headers['Authorization'] = f'Bearer {token_val}' - if obo_token_getter is not None: - obo_val = obo_token_getter() + if obo_token_getter_fn is not None: + obo_val = obo_token_getter_fn() if obo_val: request.headers['X-S2-OBO'] = obo_val request.headers.pop('X-Amz-Date', None) @@ -157,30 +166,26 @@ class OpenAIAuth(httpx.Auth): def auth_flow( self, request: httpx.Request, ) -> Generator[httpx.Request, None, None]: - if api_key_getter is not None: - token_val = api_key_getter() + if api_key_getter_fn is not None: + token_val = api_key_getter_fn() if token_val: request.headers['Authorization'] = f'Bearer {token_val}' - if obo_token_getter is not None: - obo_val = obo_token_getter() + if obo_token_getter_fn is not None: + obo_val = obo_token_getter_fn() if obo_val: request.headers['X-S2-OBO'] = obo_val yield request - # Build timeout configuration - if connect_timeout is not None and read_timeout is not None: - t = httpx.Timeout(connect=connect_timeout, read=read_timeout) - elif connect_timeout is not None: - t = httpx.Timeout(connect=connect_timeout) - elif read_timeout is not None: - t = httpx.Timeout(read=read_timeout) + if t is not None: + http_client = httpx.Client( + timeout=t, + auth=OpenAIAuth(), + ) else: - t = 60.0 # default OpenAI client timeout - - http_client = httpx.Client( - timeout=t, - auth=OpenAIAuth(), - ) + http_client = httpx.Client( + timeout=httpx.Timeout(timeout=600, connect=5.0), # default OpenAI timeout + auth=OpenAIAuth(), + ) # OpenAI / Azure OpenAI path openai_kwargs = dict(