From b55178660265bff776583a6f1ede9f67c6a733cd Mon Sep 17 00:00:00 2001 From: mgiannakopoulos Date: Tue, 2 Dec 2025 18:23:27 -0500 Subject: [PATCH 01/16] fix: Headers not propagated correctly for OpenAI requests. --- singlestoredb/ai/chat.py | 191 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 182 insertions(+), 9 deletions(-) diff --git a/singlestoredb/ai/chat.py b/singlestoredb/ai/chat.py index cc94b699..428d121a 100644 --- a/singlestoredb/ai/chat.py +++ b/singlestoredb/ai/chat.py @@ -1,5 +1,4 @@ import os -from collections.abc import Generator from typing import Any from typing import Callable from typing import Optional @@ -172,10 +171,143 @@ def _inject_headers(request: Any, **_ignored: Any) -> None: **kwargs, ) - class OpenAIAuth(httpx.Auth): - def auth_flow( - self, request: httpx.Request, - ) -> Generator[httpx.Request, None, None]: + def inject_auth_headers(request: httpx.Request) -> None: + """Inject dynamic auth/OBO headers before request is sent.""" + if api_key_getter_fn is not None: + token_val = api_key_getter_fn() + if token_val: + request.headers['Authorization'] = f'Bearer {token_val}' + if obo_token_getter_fn is not None: + obo_val = obo_token_getter_fn() + if obo_val: + request.headers['X-S2-OBO'] = obo_val + + if t is not None: + http_client = httpx.Client( + timeout=t, + event_hooks={'request': [inject_auth_headers]}, + ) + else: + http_client = httpx.Client( + timeout=httpx.Timeout(timeout=600, connect=5.0), # default OpenAI timeout + event_hooks={'request': [inject_auth_headers]}, + ) + + # OpenAI / Azure OpenAI path + openai_kwargs = dict( + base_url=info.connection_url, + api_key='placeholder', + model=model_name, + streaming=streaming, + http_client=http_client, + ) + return ChatOpenAI( + **openai_kwargs, + **kwargs, + ) + + +def SingleStoreChatFactoryDebug( + model_name: str, + api_key: Optional[Union[Optional[str], Callable[[], Optional[str]]]] = None, + streaming: bool = True, + http_client: Optional[httpx.Client] = None, + obo_token: Optional[Union[Optional[str], Callable[[], Optional[str]]]] = None, + obo_token_getter: Optional[Callable[[], Optional[str]]] = None, + base_url: Optional[str] = None, + hosting_platform: Optional[str] = None, + timeout: Optional[float] = None, + **kwargs: Any, +) -> Union[ChatOpenAI, ChatBedrockConverse]: + """Return a chat model instance (ChatOpenAI or ChatBedrockConverse). + """ + # Handle api_key and obo_token as callable functions + if callable(api_key): + api_key_getter_fn = api_key + else: + def api_key_getter_fn() -> Optional[str]: + if api_key is None: + return os.environ.get('SINGLESTOREDB_USER_TOKEN') + return api_key + + if obo_token_getter is not None: + obo_token_getter_fn = obo_token_getter + else: + if callable(obo_token): + obo_token_getter_fn = obo_token + else: + def obo_token_getter_fn() -> Optional[str]: + return obo_token + + # handle model info + if base_url is None: + base_url = os.environ.get('SINGLESTOREDB_INFERENCE_API_BASE_URL') + if hosting_platform is None: + hosting_platform = os.environ.get('SINGLESTOREDB_INFERENCE_API_HOSTING_PLATFORM') + if base_url is None or hosting_platform is None: + inference_api_manager = ( + manage_workspaces().organizations.current.inference_apis + ) + info = inference_api_manager.get(model_name=model_name) + else: + info = InferenceAPIInfo( + service_id='', + model_name=model_name, + name='', + connection_url=base_url, + project_id='', + hosting_platform=hosting_platform, + ) + if base_url is not None: + info.connection_url = base_url + if hosting_platform is not None: + info.hosting_platform = hosting_platform + + # Extract timeouts from http_client if provided + t = http_client.timeout if http_client is not None else None + connect_timeout = None + read_timeout = None + if t is not None: + if isinstance(t, httpx.Timeout): + if t.connect is not None: + connect_timeout = float(t.connect) + if t.read is not None: + read_timeout = float(t.read) + if connect_timeout is None and read_timeout is not None: + connect_timeout = read_timeout + if read_timeout is None and connect_timeout is not None: + read_timeout = connect_timeout + elif isinstance(t, (int, float)): + connect_timeout = float(t) + read_timeout = float(t) + if timeout is not None: + connect_timeout = timeout + read_timeout = timeout + t = httpx.Timeout(timeout) + + if info.hosting_platform == 'Amazon': + # Instantiate Bedrock client + cfg_kwargs = { + 'signature_version': UNSIGNED, + 'retries': {'max_attempts': 1, 'mode': 'standard'}, + } + if read_timeout is not None: + cfg_kwargs['read_timeout'] = read_timeout + if connect_timeout is not None: + cfg_kwargs['connect_timeout'] = connect_timeout + + cfg = Config(**cfg_kwargs) + client = boto3.client( + 'bedrock-runtime', + endpoint_url=info.connection_url, + region_name='us-east-1', + aws_access_key_id='placeholder', + aws_secret_access_key='placeholder', + config=cfg, + ) + + def _inject_headers(request: Any, **_ignored: Any) -> None: + """Inject dynamic auth/OBO headers prior to Bedrock sending.""" if api_key_getter_fn is not None: token_val = api_key_getter_fn() if token_val: @@ -184,17 +316,58 @@ def auth_flow( obo_val = obo_token_getter_fn() if obo_val: request.headers['X-S2-OBO'] = obo_val - yield request + request.headers.pop('X-Amz-Date', None) + request.headers.pop('X-Amz-Security-Token', None) + + emitter = client._endpoint._event_emitter + emitter.register_first( + 'before-send.bedrock-runtime.Converse', + _inject_headers, + ) + emitter.register_first( + 'before-send.bedrock-runtime.ConverseStream', + _inject_headers, + ) + emitter.register_first( + 'before-send.bedrock-runtime.InvokeModel', + _inject_headers, + ) + emitter.register_first( + 'before-send.bedrock-runtime.InvokeModelWithResponseStream', + _inject_headers, + ) + + return ChatBedrockConverse( + model_id=model_name, + endpoint_url=info.connection_url, + region_name='us-east-1', + aws_access_key_id='placeholder', + aws_secret_access_key='placeholder', + disable_streaming=not streaming, + client=client, + **kwargs, + ) + + def inject_auth_headers(request: httpx.Request) -> None: + """Inject dynamic auth/OBO headers before request is sent.""" + if api_key_getter_fn is not None: + token_val = api_key_getter_fn() + if token_val: + request.headers['Authorization'] = f'Bearer {token_val}' + if obo_token_getter_fn is not None: + obo_val = obo_token_getter_fn() + if obo_val: + request.headers['X-S2-OBO'] = obo_val if t is not None: http_client = httpx.Client( timeout=t, - auth=OpenAIAuth(), + event_hooks={'request': [inject_auth_headers]}, ) else: http_client = httpx.Client( timeout=httpx.Timeout(timeout=600, connect=5.0), # default OpenAI timeout - auth=OpenAIAuth(), + event_hooks={'request': [inject_auth_headers]}, ) # OpenAI / Azure OpenAI path @@ -203,8 +376,8 @@ def auth_flow( api_key='placeholder', model=model_name, streaming=streaming, + http_client=http_client, ) - openai_kwargs['http_client'] = http_client return ChatOpenAI( **openai_kwargs, **kwargs, From 6bc10df58c373561eb33e2c3839df5caa6af6c8b Mon Sep 17 00:00:00 2001 From: mgiannakopoulos Date: Tue, 2 Dec 2025 18:38:28 -0500 Subject: [PATCH 02/16] Expose the experimental SingleStoreChatFactoryDebug factory. --- singlestoredb/ai/__init__.py | 1 + 1 file changed, 1 insertion(+) diff --git a/singlestoredb/ai/__init__.py b/singlestoredb/ai/__init__.py index 3cc526d6..3e6b1a66 100644 --- a/singlestoredb/ai/__init__.py +++ b/singlestoredb/ai/__init__.py @@ -1,2 +1,3 @@ from .chat import SingleStoreChatFactory # noqa: F401 +from .chat import SingleStoreChatFactoryDebug # noqa: F401 from .embeddings import SingleStoreEmbeddingsFactory # noqa: F401 From ad2ea697968b687cfcf3d052fbd2d6dd355283db Mon Sep 17 00:00:00 2001 From: mgiannakopoulos Date: Tue, 2 Dec 2025 19:29:04 -0500 Subject: [PATCH 03/16] Change package name. --- singlestoredb/ai/__init__.py | 2 +- singlestoredb/ai/chat.py | 177 ------------------------------ singlestoredb/ai/debug.py | 207 +++++++++++++++++++++++++++++++++++ 3 files changed, 208 insertions(+), 178 deletions(-) create mode 100644 singlestoredb/ai/debug.py diff --git a/singlestoredb/ai/__init__.py b/singlestoredb/ai/__init__.py index 3e6b1a66..773941fe 100644 --- a/singlestoredb/ai/__init__.py +++ b/singlestoredb/ai/__init__.py @@ -1,3 +1,3 @@ from .chat import SingleStoreChatFactory # noqa: F401 -from .chat import SingleStoreChatFactoryDebug # noqa: F401 +from .debug import SingleStoreChatFactoryDebug # noqa: F401 from .embeddings import SingleStoreEmbeddingsFactory # noqa: F401 diff --git a/singlestoredb/ai/chat.py b/singlestoredb/ai/chat.py index 428d121a..3526d5b3 100644 --- a/singlestoredb/ai/chat.py +++ b/singlestoredb/ai/chat.py @@ -205,180 +205,3 @@ def inject_auth_headers(request: httpx.Request) -> None: **openai_kwargs, **kwargs, ) - - -def SingleStoreChatFactoryDebug( - model_name: str, - api_key: Optional[Union[Optional[str], Callable[[], Optional[str]]]] = None, - streaming: bool = True, - http_client: Optional[httpx.Client] = None, - obo_token: Optional[Union[Optional[str], Callable[[], Optional[str]]]] = None, - obo_token_getter: Optional[Callable[[], Optional[str]]] = None, - base_url: Optional[str] = None, - hosting_platform: Optional[str] = None, - timeout: Optional[float] = None, - **kwargs: Any, -) -> Union[ChatOpenAI, ChatBedrockConverse]: - """Return a chat model instance (ChatOpenAI or ChatBedrockConverse). - """ - # Handle api_key and obo_token as callable functions - if callable(api_key): - api_key_getter_fn = api_key - else: - def api_key_getter_fn() -> Optional[str]: - if api_key is None: - return os.environ.get('SINGLESTOREDB_USER_TOKEN') - return api_key - - if obo_token_getter is not None: - obo_token_getter_fn = obo_token_getter - else: - if callable(obo_token): - obo_token_getter_fn = obo_token - else: - def obo_token_getter_fn() -> Optional[str]: - return obo_token - - # handle model info - if base_url is None: - base_url = os.environ.get('SINGLESTOREDB_INFERENCE_API_BASE_URL') - if hosting_platform is None: - hosting_platform = os.environ.get('SINGLESTOREDB_INFERENCE_API_HOSTING_PLATFORM') - if base_url is None or hosting_platform is None: - inference_api_manager = ( - manage_workspaces().organizations.current.inference_apis - ) - info = inference_api_manager.get(model_name=model_name) - else: - info = InferenceAPIInfo( - service_id='', - model_name=model_name, - name='', - connection_url=base_url, - project_id='', - hosting_platform=hosting_platform, - ) - if base_url is not None: - info.connection_url = base_url - if hosting_platform is not None: - info.hosting_platform = hosting_platform - - # Extract timeouts from http_client if provided - t = http_client.timeout if http_client is not None else None - connect_timeout = None - read_timeout = None - if t is not None: - if isinstance(t, httpx.Timeout): - if t.connect is not None: - connect_timeout = float(t.connect) - if t.read is not None: - read_timeout = float(t.read) - if connect_timeout is None and read_timeout is not None: - connect_timeout = read_timeout - if read_timeout is None and connect_timeout is not None: - read_timeout = connect_timeout - elif isinstance(t, (int, float)): - connect_timeout = float(t) - read_timeout = float(t) - if timeout is not None: - connect_timeout = timeout - read_timeout = timeout - t = httpx.Timeout(timeout) - - if info.hosting_platform == 'Amazon': - # Instantiate Bedrock client - cfg_kwargs = { - 'signature_version': UNSIGNED, - 'retries': {'max_attempts': 1, 'mode': 'standard'}, - } - if read_timeout is not None: - cfg_kwargs['read_timeout'] = read_timeout - if connect_timeout is not None: - cfg_kwargs['connect_timeout'] = connect_timeout - - cfg = Config(**cfg_kwargs) - client = boto3.client( - 'bedrock-runtime', - endpoint_url=info.connection_url, - region_name='us-east-1', - aws_access_key_id='placeholder', - aws_secret_access_key='placeholder', - config=cfg, - ) - - def _inject_headers(request: Any, **_ignored: Any) -> None: - """Inject dynamic auth/OBO headers prior to Bedrock sending.""" - if api_key_getter_fn is not None: - token_val = api_key_getter_fn() - if token_val: - request.headers['Authorization'] = f'Bearer {token_val}' - if obo_token_getter_fn is not None: - obo_val = obo_token_getter_fn() - if obo_val: - request.headers['X-S2-OBO'] = obo_val - request.headers.pop('X-Amz-Date', None) - request.headers.pop('X-Amz-Security-Token', None) - - emitter = client._endpoint._event_emitter - emitter.register_first( - 'before-send.bedrock-runtime.Converse', - _inject_headers, - ) - emitter.register_first( - 'before-send.bedrock-runtime.ConverseStream', - _inject_headers, - ) - emitter.register_first( - 'before-send.bedrock-runtime.InvokeModel', - _inject_headers, - ) - emitter.register_first( - 'before-send.bedrock-runtime.InvokeModelWithResponseStream', - _inject_headers, - ) - - return ChatBedrockConverse( - model_id=model_name, - endpoint_url=info.connection_url, - region_name='us-east-1', - aws_access_key_id='placeholder', - aws_secret_access_key='placeholder', - disable_streaming=not streaming, - client=client, - **kwargs, - ) - - def inject_auth_headers(request: httpx.Request) -> None: - """Inject dynamic auth/OBO headers before request is sent.""" - if api_key_getter_fn is not None: - token_val = api_key_getter_fn() - if token_val: - request.headers['Authorization'] = f'Bearer {token_val}' - if obo_token_getter_fn is not None: - obo_val = obo_token_getter_fn() - if obo_val: - request.headers['X-S2-OBO'] = obo_val - - if t is not None: - http_client = httpx.Client( - timeout=t, - event_hooks={'request': [inject_auth_headers]}, - ) - else: - http_client = httpx.Client( - timeout=httpx.Timeout(timeout=600, connect=5.0), # default OpenAI timeout - event_hooks={'request': [inject_auth_headers]}, - ) - - # OpenAI / Azure OpenAI path - openai_kwargs = dict( - base_url=info.connection_url, - api_key='placeholder', - model=model_name, - streaming=streaming, - http_client=http_client, - ) - return ChatOpenAI( - **openai_kwargs, - **kwargs, - ) diff --git a/singlestoredb/ai/debug.py b/singlestoredb/ai/debug.py new file mode 100644 index 00000000..5b9ab648 --- /dev/null +++ b/singlestoredb/ai/debug.py @@ -0,0 +1,207 @@ +import os +from typing import Any +from typing import Callable +from typing import Optional +from typing import Union + +import httpx + +from singlestoredb import manage_workspaces +from singlestoredb.management.inference_api import InferenceAPIInfo + +try: + from langchain_openai import ChatOpenAI +except ImportError: + raise ImportError( + 'Could not import langchain_openai python package. ' + 'Please install it with `pip install langchain_openai`.', + ) + +try: + from langchain_aws import ChatBedrockConverse +except ImportError: + raise ImportError( + 'Could not import langchain-aws python package. ' + 'Please install it with `pip install langchain-aws`.', + ) + +import boto3 +from botocore import UNSIGNED +from botocore.config import Config + + +def SingleStoreChatFactoryDebug( + model_name: str, + api_key: Optional[Union[Optional[str], Callable[[], Optional[str]]]] = None, + streaming: bool = True, + http_client: Optional[httpx.Client] = None, + obo_token: Optional[Union[Optional[str], Callable[[], Optional[str]]]] = None, + obo_token_getter: Optional[Callable[[], Optional[str]]] = None, + base_url: Optional[str] = None, + hosting_platform: Optional[str] = None, + timeout: Optional[float] = None, + **kwargs: Any, +) -> Union[ChatOpenAI, ChatBedrockConverse]: + """Return a chat model instance (ChatOpenAI or ChatBedrockConverse). + """ + # Handle api_key and obo_token as callable functions + if callable(api_key): + api_key_getter_fn = api_key + else: + def api_key_getter_fn() -> Optional[str]: + if api_key is None: + return os.environ.get('SINGLESTOREDB_USER_TOKEN') + return api_key + + if obo_token_getter is not None: + obo_token_getter_fn = obo_token_getter + else: + if callable(obo_token): + obo_token_getter_fn = obo_token + else: + def obo_token_getter_fn() -> Optional[str]: + return obo_token + + # handle model info + if base_url is None: + base_url = os.environ.get('SINGLESTOREDB_INFERENCE_API_BASE_URL') + if hosting_platform is None: + hosting_platform = os.environ.get('SINGLESTOREDB_INFERENCE_API_HOSTING_PLATFORM') + if base_url is None or hosting_platform is None: + inference_api_manager = ( + manage_workspaces().organizations.current.inference_apis + ) + info = inference_api_manager.get(model_name=model_name) + else: + info = InferenceAPIInfo( + service_id='', + model_name=model_name, + name='', + connection_url=base_url, + project_id='', + hosting_platform=hosting_platform, + ) + if base_url is not None: + info.connection_url = base_url + if hosting_platform is not None: + info.hosting_platform = hosting_platform + + # Extract timeouts from http_client if provided + t = http_client.timeout if http_client is not None else None + connect_timeout = None + read_timeout = None + if t is not None: + if isinstance(t, httpx.Timeout): + if t.connect is not None: + connect_timeout = float(t.connect) + if t.read is not None: + read_timeout = float(t.read) + if connect_timeout is None and read_timeout is not None: + connect_timeout = read_timeout + if read_timeout is None and connect_timeout is not None: + read_timeout = connect_timeout + elif isinstance(t, (int, float)): + connect_timeout = float(t) + read_timeout = float(t) + if timeout is not None: + connect_timeout = timeout + read_timeout = timeout + t = httpx.Timeout(timeout) + + if info.hosting_platform == 'Amazon': + # Instantiate Bedrock client + cfg_kwargs = { + 'signature_version': UNSIGNED, + 'retries': {'max_attempts': 1, 'mode': 'standard'}, + } + if read_timeout is not None: + cfg_kwargs['read_timeout'] = read_timeout + if connect_timeout is not None: + cfg_kwargs['connect_timeout'] = connect_timeout + + cfg = Config(**cfg_kwargs) + client = boto3.client( + 'bedrock-runtime', + endpoint_url=info.connection_url, + region_name='us-east-1', + aws_access_key_id='placeholder', + aws_secret_access_key='placeholder', + config=cfg, + ) + + def _inject_headers(request: Any, **_ignored: Any) -> None: + """Inject dynamic auth/OBO headers prior to Bedrock sending.""" + if api_key_getter_fn is not None: + token_val = api_key_getter_fn() + if token_val: + request.headers['Authorization'] = f'Bearer {token_val}' + if obo_token_getter_fn is not None: + obo_val = obo_token_getter_fn() + if obo_val: + request.headers['X-S2-OBO'] = obo_val + request.headers.pop('X-Amz-Date', None) + request.headers.pop('X-Amz-Security-Token', None) + + emitter = client._endpoint._event_emitter + emitter.register_first( + 'before-send.bedrock-runtime.Converse', + _inject_headers, + ) + emitter.register_first( + 'before-send.bedrock-runtime.ConverseStream', + _inject_headers, + ) + emitter.register_first( + 'before-send.bedrock-runtime.InvokeModel', + _inject_headers, + ) + emitter.register_first( + 'before-send.bedrock-runtime.InvokeModelWithResponseStream', + _inject_headers, + ) + + return ChatBedrockConverse( + model_id=model_name, + endpoint_url=info.connection_url, + region_name='us-east-1', + aws_access_key_id='placeholder', + aws_secret_access_key='placeholder', + disable_streaming=not streaming, + client=client, + **kwargs, + ) + + def inject_auth_headers(request: httpx.Request) -> None: + """Inject dynamic auth/OBO headers before request is sent.""" + if api_key_getter_fn is not None: + token_val = api_key_getter_fn() + if token_val: + request.headers['Authorization'] = f'Bearer {token_val}' + if obo_token_getter_fn is not None: + obo_val = obo_token_getter_fn() + if obo_val: + request.headers['X-S2-OBO'] = obo_val + + if t is not None: + http_client = httpx.Client( + timeout=t, + event_hooks={'request': [inject_auth_headers]}, + ) + else: + http_client = httpx.Client( + timeout=httpx.Timeout(timeout=600, connect=5.0), # default OpenAI timeout + event_hooks={'request': [inject_auth_headers]}, + ) + + # OpenAI / Azure OpenAI path + openai_kwargs = dict( + base_url=info.connection_url, + api_key='placeholder', + model=model_name, + streaming=streaming, + http_client=http_client, + ) + return ChatOpenAI( + **openai_kwargs, + **kwargs, + ) From 26abd17fef08bb2e3bc1e32084ba19a3ef0b7ea2 Mon Sep 17 00:00:00 2001 From: mgiannakopoulos Date: Tue, 2 Dec 2025 20:15:06 -0500 Subject: [PATCH 04/16] Pass the functions as internal class properties. --- singlestoredb/ai/chat.py | 36 ++++++++++++++++++++++++------------ singlestoredb/ai/debug.py | 36 ++++++++++++++++++++++++------------ 2 files changed, 48 insertions(+), 24 deletions(-) diff --git a/singlestoredb/ai/chat.py b/singlestoredb/ai/chat.py index 3526d5b3..b24a68a4 100644 --- a/singlestoredb/ai/chat.py +++ b/singlestoredb/ai/chat.py @@ -1,4 +1,5 @@ import os +from collections.abc import Generator from typing import Any from typing import Callable from typing import Optional @@ -171,26 +172,37 @@ def _inject_headers(request: Any, **_ignored: Any) -> None: **kwargs, ) - def inject_auth_headers(request: httpx.Request) -> None: - """Inject dynamic auth/OBO headers before request is sent.""" - if api_key_getter_fn is not None: - token_val = api_key_getter_fn() - if token_val: - request.headers['Authorization'] = f'Bearer {token_val}' - if obo_token_getter_fn is not None: - obo_val = obo_token_getter_fn() - if obo_val: - request.headers['X-S2-OBO'] = obo_val + class OpenAIAuth(httpx.Auth): + def __init__( + self, + api_key_getter: Optional[Callable[[], Optional[str]]] = None, + obo_token_getter: Optional[Callable[[], Optional[str]]] = None, + ) -> None: + self.api_key_getter = api_key_getter + self.obo_token_getter = obo_token_getter + + def auth_flow( + self, request: httpx.Request, + ) -> Generator[httpx.Request, None, None]: + if self.api_key_getter is not None: + token_val = self.api_key_getter() + if token_val: + request.headers['Authorization'] = f'Bearer {token_val}' + if self.obo_token_getter is not None: + obo_val = self.obo_token_getter() + if obo_val: + request.headers['X-S2-OBO'] = obo_val + yield request if t is not None: http_client = httpx.Client( timeout=t, - event_hooks={'request': [inject_auth_headers]}, + auth=OpenAIAuth(api_key_getter_fn, obo_token_getter_fn), ) else: http_client = httpx.Client( timeout=httpx.Timeout(timeout=600, connect=5.0), # default OpenAI timeout - event_hooks={'request': [inject_auth_headers]}, + auth=OpenAIAuth(api_key_getter_fn, obo_token_getter_fn), ) # OpenAI / Azure OpenAI path diff --git a/singlestoredb/ai/debug.py b/singlestoredb/ai/debug.py index 5b9ab648..926742e5 100644 --- a/singlestoredb/ai/debug.py +++ b/singlestoredb/ai/debug.py @@ -1,4 +1,5 @@ import os +from collections.abc import Generator from typing import Any from typing import Callable from typing import Optional @@ -171,26 +172,37 @@ def _inject_headers(request: Any, **_ignored: Any) -> None: **kwargs, ) - def inject_auth_headers(request: httpx.Request) -> None: - """Inject dynamic auth/OBO headers before request is sent.""" - if api_key_getter_fn is not None: - token_val = api_key_getter_fn() - if token_val: - request.headers['Authorization'] = f'Bearer {token_val}' - if obo_token_getter_fn is not None: - obo_val = obo_token_getter_fn() - if obo_val: - request.headers['X-S2-OBO'] = obo_val + class OpenAIAuth(httpx.Auth): + def __init__( + self, + api_key_getter: Optional[Callable[[], Optional[str]]] = None, + obo_token_getter: Optional[Callable[[], Optional[str]]] = None, + ) -> None: + self.api_key_getter = api_key_getter + self.obo_token_getter = obo_token_getter + + def auth_flow( + self, request: httpx.Request, + ) -> Generator[httpx.Request, None, None]: + if self.api_key_getter is not None: + token_val = self.api_key_getter() + if token_val: + request.headers['Authorization'] = f'Bearer {token_val}' + if self.obo_token_getter is not None: + obo_val = self.obo_token_getter() + if obo_val: + request.headers['X-S2-OBO'] = obo_val + yield request if t is not None: http_client = httpx.Client( timeout=t, - event_hooks={'request': [inject_auth_headers]}, + auth=OpenAIAuth(api_key_getter_fn, obo_token_getter_fn), ) else: http_client = httpx.Client( timeout=httpx.Timeout(timeout=600, connect=5.0), # default OpenAI timeout - event_hooks={'request': [inject_auth_headers]}, + auth=OpenAIAuth(api_key_getter_fn, obo_token_getter_fn), ) # OpenAI / Azure OpenAI path From 7b37eae2951760d30e9805ae55c0b3f6c43ffec3 Mon Sep 17 00:00:00 2001 From: mgiannakopoulos Date: Tue, 2 Dec 2025 20:31:12 -0500 Subject: [PATCH 05/16] Introduce debug messages. --- singlestoredb/ai/chat.py | 4 ++++ singlestoredb/ai/debug.py | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/singlestoredb/ai/chat.py b/singlestoredb/ai/chat.py index b24a68a4..21ae09b7 100644 --- a/singlestoredb/ai/chat.py +++ b/singlestoredb/ai/chat.py @@ -184,14 +184,18 @@ def __init__( def auth_flow( self, request: httpx.Request, ) -> Generator[httpx.Request, None, None]: + print(f'[DEBUG] auth_flow called for {request.method} {request.url}') if self.api_key_getter is not None: token_val = self.api_key_getter() + print(f"[DEBUG] api_key_getter: {token_val if token_val else 'None'}...") if token_val: request.headers['Authorization'] = f'Bearer {token_val}' if self.obo_token_getter is not None: obo_val = self.obo_token_getter() + print(f"[DEBUG] obo_token_getter: {obo_val if obo_val else 'None'}...") if obo_val: request.headers['X-S2-OBO'] = obo_val + print(f'[DEBUG] Final headers: {dict(request.headers)}') yield request if t is not None: diff --git a/singlestoredb/ai/debug.py b/singlestoredb/ai/debug.py index 926742e5..96394f1f 100644 --- a/singlestoredb/ai/debug.py +++ b/singlestoredb/ai/debug.py @@ -184,14 +184,18 @@ def __init__( def auth_flow( self, request: httpx.Request, ) -> Generator[httpx.Request, None, None]: + print(f'[DEBUG] auth_flow called for {request.method} {request.url}') if self.api_key_getter is not None: token_val = self.api_key_getter() + print(f"[DEBUG] api_key_getter: {token_val if token_val else 'None'}...") if token_val: request.headers['Authorization'] = f'Bearer {token_val}' if self.obo_token_getter is not None: obo_val = self.obo_token_getter() + print(f"[DEBUG] obo_token_getter: {obo_val if obo_val else 'None'}...") if obo_val: request.headers['X-S2-OBO'] = obo_val + print(f'[DEBUG] Final headers: {dict(request.headers)}') yield request if t is not None: From c6aada9164b5803a2c1bb498cc3a658718f7f715 Mon Sep 17 00:00:00 2001 From: mgiannakopoulos Date: Tue, 2 Dec 2025 20:38:19 -0500 Subject: [PATCH 06/16] Do not pass explicitly the api_key. --- singlestoredb/ai/chat.py | 2 +- singlestoredb/ai/debug.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/singlestoredb/ai/chat.py b/singlestoredb/ai/chat.py index 21ae09b7..ecd71545 100644 --- a/singlestoredb/ai/chat.py +++ b/singlestoredb/ai/chat.py @@ -212,7 +212,7 @@ def auth_flow( # OpenAI / Azure OpenAI path openai_kwargs = dict( base_url=info.connection_url, - api_key='placeholder', + # api_key='placeholder', model=model_name, streaming=streaming, http_client=http_client, diff --git a/singlestoredb/ai/debug.py b/singlestoredb/ai/debug.py index 96394f1f..a0317331 100644 --- a/singlestoredb/ai/debug.py +++ b/singlestoredb/ai/debug.py @@ -212,7 +212,7 @@ def auth_flow( # OpenAI / Azure OpenAI path openai_kwargs = dict( base_url=info.connection_url, - api_key='placeholder', + # api_key='placeholder', model=model_name, streaming=streaming, http_client=http_client, From a8bae856d99457bed412acd3b3e00e32490f50c4 Mon Sep 17 00:00:00 2001 From: mgiannakopoulos Date: Tue, 2 Dec 2025 20:57:37 -0500 Subject: [PATCH 07/16] Rename file and factory method. --- singlestoredb/ai/__init__.py | 2 +- singlestoredb/ai/{debug.py => debugv2.py} | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) rename singlestoredb/ai/{debug.py => debugv2.py} (99%) diff --git a/singlestoredb/ai/__init__.py b/singlestoredb/ai/__init__.py index 773941fe..cf04aeed 100644 --- a/singlestoredb/ai/__init__.py +++ b/singlestoredb/ai/__init__.py @@ -1,3 +1,3 @@ from .chat import SingleStoreChatFactory # noqa: F401 -from .debug import SingleStoreChatFactoryDebug # noqa: F401 +from .debugv2 import SingleStoreChatFactoryDebugV2 # noqa: F401 from .embeddings import SingleStoreEmbeddingsFactory # noqa: F401 diff --git a/singlestoredb/ai/debug.py b/singlestoredb/ai/debugv2.py similarity index 99% rename from singlestoredb/ai/debug.py rename to singlestoredb/ai/debugv2.py index a0317331..239d3a09 100644 --- a/singlestoredb/ai/debug.py +++ b/singlestoredb/ai/debugv2.py @@ -31,7 +31,7 @@ from botocore.config import Config -def SingleStoreChatFactoryDebug( +def SingleStoreChatFactoryDebugV2( model_name: str, api_key: Optional[Union[Optional[str], Callable[[], Optional[str]]]] = None, streaming: bool = True, @@ -212,7 +212,7 @@ def auth_flow( # OpenAI / Azure OpenAI path openai_kwargs = dict( base_url=info.connection_url, - # api_key='placeholder', + api_key='placeholder', model=model_name, streaming=streaming, http_client=http_client, From 82e37ac6fe8af6c3040c34fa05f81b1849af3214 Mon Sep 17 00:00:00 2001 From: mgiannakopoulos Date: Tue, 2 Dec 2025 21:17:18 -0500 Subject: [PATCH 08/16] Create separate class. --- singlestoredb/ai/chat.py | 32 +++----------------------------- singlestoredb/ai/debugv2.py | 32 +++----------------------------- singlestoredb/ai/utils.py | 32 ++++++++++++++++++++++++++++++++ 3 files changed, 38 insertions(+), 58 deletions(-) create mode 100644 singlestoredb/ai/utils.py diff --git a/singlestoredb/ai/chat.py b/singlestoredb/ai/chat.py index ecd71545..87922f30 100644 --- a/singlestoredb/ai/chat.py +++ b/singlestoredb/ai/chat.py @@ -1,11 +1,11 @@ import os -from collections.abc import Generator from typing import Any from typing import Callable from typing import Optional from typing import Union import httpx +from utils import SingleStoreOpenAIAuth from singlestoredb import manage_workspaces from singlestoredb.management.inference_api import InferenceAPIInfo @@ -172,41 +172,15 @@ def _inject_headers(request: Any, **_ignored: Any) -> None: **kwargs, ) - class OpenAIAuth(httpx.Auth): - def __init__( - self, - api_key_getter: Optional[Callable[[], Optional[str]]] = None, - obo_token_getter: Optional[Callable[[], Optional[str]]] = None, - ) -> None: - self.api_key_getter = api_key_getter - self.obo_token_getter = obo_token_getter - - def auth_flow( - self, request: httpx.Request, - ) -> Generator[httpx.Request, None, None]: - print(f'[DEBUG] auth_flow called for {request.method} {request.url}') - if self.api_key_getter is not None: - token_val = self.api_key_getter() - print(f"[DEBUG] api_key_getter: {token_val if token_val else 'None'}...") - if token_val: - request.headers['Authorization'] = f'Bearer {token_val}' - if self.obo_token_getter is not None: - obo_val = self.obo_token_getter() - print(f"[DEBUG] obo_token_getter: {obo_val if obo_val else 'None'}...") - if obo_val: - request.headers['X-S2-OBO'] = obo_val - print(f'[DEBUG] Final headers: {dict(request.headers)}') - yield request - if t is not None: http_client = httpx.Client( timeout=t, - auth=OpenAIAuth(api_key_getter_fn, obo_token_getter_fn), + auth=SingleStoreOpenAIAuth(api_key_getter_fn, obo_token_getter_fn), ) else: http_client = httpx.Client( timeout=httpx.Timeout(timeout=600, connect=5.0), # default OpenAI timeout - auth=OpenAIAuth(api_key_getter_fn, obo_token_getter_fn), + auth=SingleStoreOpenAIAuth(api_key_getter_fn, obo_token_getter_fn), ) # OpenAI / Azure OpenAI path diff --git a/singlestoredb/ai/debugv2.py b/singlestoredb/ai/debugv2.py index 239d3a09..3f22150d 100644 --- a/singlestoredb/ai/debugv2.py +++ b/singlestoredb/ai/debugv2.py @@ -1,11 +1,11 @@ import os -from collections.abc import Generator from typing import Any from typing import Callable from typing import Optional from typing import Union import httpx +from utils import SingleStoreOpenAIAuth from singlestoredb import manage_workspaces from singlestoredb.management.inference_api import InferenceAPIInfo @@ -172,41 +172,15 @@ def _inject_headers(request: Any, **_ignored: Any) -> None: **kwargs, ) - class OpenAIAuth(httpx.Auth): - def __init__( - self, - api_key_getter: Optional[Callable[[], Optional[str]]] = None, - obo_token_getter: Optional[Callable[[], Optional[str]]] = None, - ) -> None: - self.api_key_getter = api_key_getter - self.obo_token_getter = obo_token_getter - - def auth_flow( - self, request: httpx.Request, - ) -> Generator[httpx.Request, None, None]: - print(f'[DEBUG] auth_flow called for {request.method} {request.url}') - if self.api_key_getter is not None: - token_val = self.api_key_getter() - print(f"[DEBUG] api_key_getter: {token_val if token_val else 'None'}...") - if token_val: - request.headers['Authorization'] = f'Bearer {token_val}' - if self.obo_token_getter is not None: - obo_val = self.obo_token_getter() - print(f"[DEBUG] obo_token_getter: {obo_val if obo_val else 'None'}...") - if obo_val: - request.headers['X-S2-OBO'] = obo_val - print(f'[DEBUG] Final headers: {dict(request.headers)}') - yield request - if t is not None: http_client = httpx.Client( timeout=t, - auth=OpenAIAuth(api_key_getter_fn, obo_token_getter_fn), + auth=SingleStoreOpenAIAuth(api_key_getter_fn, obo_token_getter_fn), ) else: http_client = httpx.Client( timeout=httpx.Timeout(timeout=600, connect=5.0), # default OpenAI timeout - auth=OpenAIAuth(api_key_getter_fn, obo_token_getter_fn), + auth=SingleStoreOpenAIAuth(api_key_getter_fn, obo_token_getter_fn), ) # OpenAI / Azure OpenAI path diff --git a/singlestoredb/ai/utils.py b/singlestoredb/ai/utils.py new file mode 100644 index 00000000..41cc0cd3 --- /dev/null +++ b/singlestoredb/ai/utils.py @@ -0,0 +1,32 @@ +from collections.abc import Generator +from typing import Callable +from typing import Optional + +import httpx + + +class SingleStoreOpenAIAuth(httpx.Auth): + def __init__( + self, + api_key_getter: Optional[Callable[[], Optional[str]]] = None, + obo_token_getter: Optional[Callable[[], Optional[str]]] = None, + ) -> None: + self.api_key_getter = api_key_getter + self.obo_token_getter = obo_token_getter + + def auth_flow( + self, request: httpx.Request, + ) -> Generator[httpx.Request, None, None]: + print(f'[DEBUG] auth_flow called for {request.method} {request.url}') + if self.api_key_getter is not None: + token_val = self.api_key_getter() + print(f"[DEBUG] api_key_getter: {token_val if token_val else 'None'}...") + if token_val: + request.headers['Authorization'] = f'Bearer {token_val}' + if self.obo_token_getter is not None: + obo_val = self.obo_token_getter() + print(f"[DEBUG] obo_token_getter: {obo_val if obo_val else 'None'}...") + if obo_val: + request.headers['X-S2-OBO'] = obo_val + print(f'[DEBUG] Final headers: {dict(request.headers)}') + yield request From 977c65330833a56d8918551a558c3ca4db4fc75c Mon Sep 17 00:00:00 2001 From: mgiannakopoulos Date: Tue, 2 Dec 2025 21:18:24 -0500 Subject: [PATCH 09/16] Rename from debugv2 to debugv3. --- singlestoredb/ai/__init__.py | 2 +- singlestoredb/ai/{debugv2.py => debugv3.py} | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) rename singlestoredb/ai/{debugv2.py => debugv3.py} (99%) diff --git a/singlestoredb/ai/__init__.py b/singlestoredb/ai/__init__.py index cf04aeed..0b7fa728 100644 --- a/singlestoredb/ai/__init__.py +++ b/singlestoredb/ai/__init__.py @@ -1,3 +1,3 @@ from .chat import SingleStoreChatFactory # noqa: F401 -from .debugv2 import SingleStoreChatFactoryDebugV2 # noqa: F401 +from .debugv3 import SingleStoreChatFactoryDebugV3 # noqa: F401 from .embeddings import SingleStoreEmbeddingsFactory # noqa: F401 diff --git a/singlestoredb/ai/debugv2.py b/singlestoredb/ai/debugv3.py similarity index 99% rename from singlestoredb/ai/debugv2.py rename to singlestoredb/ai/debugv3.py index 3f22150d..f84b7ec3 100644 --- a/singlestoredb/ai/debugv2.py +++ b/singlestoredb/ai/debugv3.py @@ -31,7 +31,7 @@ from botocore.config import Config -def SingleStoreChatFactoryDebugV2( +def SingleStoreChatFactoryDebugV3( model_name: str, api_key: Optional[Union[Optional[str], Callable[[], Optional[str]]]] = None, streaming: bool = True, From 05768a189dcc65f6ad9af7bee87f08cd683e1a7a Mon Sep 17 00:00:00 2001 From: mgiannakopoulos Date: Tue, 2 Dec 2025 21:23:21 -0500 Subject: [PATCH 10/16] Fix imports. --- singlestoredb/ai/chat.py | 2 +- singlestoredb/ai/debugv3.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/singlestoredb/ai/chat.py b/singlestoredb/ai/chat.py index 87922f30..8d842b27 100644 --- a/singlestoredb/ai/chat.py +++ b/singlestoredb/ai/chat.py @@ -5,9 +5,9 @@ from typing import Union import httpx -from utils import SingleStoreOpenAIAuth from singlestoredb import manage_workspaces +from singlestoredb.ai.utils import SingleStoreOpenAIAuth from singlestoredb.management.inference_api import InferenceAPIInfo try: diff --git a/singlestoredb/ai/debugv3.py b/singlestoredb/ai/debugv3.py index f84b7ec3..cf233e11 100644 --- a/singlestoredb/ai/debugv3.py +++ b/singlestoredb/ai/debugv3.py @@ -5,9 +5,9 @@ from typing import Union import httpx -from utils import SingleStoreOpenAIAuth from singlestoredb import manage_workspaces +from singlestoredb.ai.utils import SingleStoreOpenAIAuth from singlestoredb.management.inference_api import InferenceAPIInfo try: From 0c65ac2ea50bbf5a9af17b7ba866b21a772fc18a Mon Sep 17 00:00:00 2001 From: mgiannakopoulos Date: Tue, 2 Dec 2025 21:42:23 -0500 Subject: [PATCH 11/16] Fix 'auth_flow' call. --- singlestoredb/ai/utils.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/singlestoredb/ai/utils.py b/singlestoredb/ai/utils.py index 41cc0cd3..82520fc9 100644 --- a/singlestoredb/ai/utils.py +++ b/singlestoredb/ai/utils.py @@ -10,13 +10,14 @@ def __init__( self, api_key_getter: Optional[Callable[[], Optional[str]]] = None, obo_token_getter: Optional[Callable[[], Optional[str]]] = None, - ) -> None: + ): self.api_key_getter = api_key_getter self.obo_token_getter = obo_token_getter def auth_flow( - self, request: httpx.Request, - ) -> Generator[httpx.Request, None, None]: + self, + request: httpx.Request, + ) -> Generator[httpx.Request, httpx.Response, None]: print(f'[DEBUG] auth_flow called for {request.method} {request.url}') if self.api_key_getter is not None: token_val = self.api_key_getter() From cf59203508ce16fb058690c5d9bd094885ab7361 Mon Sep 17 00:00:00 2001 From: mgiannakopoulos Date: Tue, 2 Dec 2025 22:03:50 -0500 Subject: [PATCH 12/16] Remove explicit types. --- singlestoredb/ai/chat.py | 2 +- singlestoredb/ai/utils.py | 20 +++++++------------- 2 files changed, 8 insertions(+), 14 deletions(-) diff --git a/singlestoredb/ai/chat.py b/singlestoredb/ai/chat.py index 8d842b27..aae28ab4 100644 --- a/singlestoredb/ai/chat.py +++ b/singlestoredb/ai/chat.py @@ -186,7 +186,7 @@ def _inject_headers(request: Any, **_ignored: Any) -> None: # OpenAI / Azure OpenAI path openai_kwargs = dict( base_url=info.connection_url, - # api_key='placeholder', + api_key='placeholder', model=model_name, streaming=streaming, http_client=http_client, diff --git a/singlestoredb/ai/utils.py b/singlestoredb/ai/utils.py index 82520fc9..505d0a93 100644 --- a/singlestoredb/ai/utils.py +++ b/singlestoredb/ai/utils.py @@ -1,23 +1,17 @@ -from collections.abc import Generator -from typing import Callable -from typing import Optional - import httpx class SingleStoreOpenAIAuth(httpx.Auth): - def __init__( - self, - api_key_getter: Optional[Callable[[], Optional[str]]] = None, - obo_token_getter: Optional[Callable[[], Optional[str]]] = None, - ): + def __init__(self, api_key_getter, obo_token_getter): # type: ignore self.api_key_getter = api_key_getter self.obo_token_getter = obo_token_getter - def auth_flow( - self, - request: httpx.Request, - ) -> Generator[httpx.Request, httpx.Response, None]: + # def auth_flow( + # self, + # request: httpx.Request, + # ) -> Generator[httpx.Request, httpx.Response, None]: + + def auth_flow(self, request): # type: ignore[no-untyped-def] print(f'[DEBUG] auth_flow called for {request.method} {request.url}') if self.api_key_getter is not None: token_val = self.api_key_getter() From 5452bc8de37f1429ac72465772a6505157c30def Mon Sep 17 00:00:00 2001 From: mgiannakopoulos Date: Tue, 2 Dec 2025 23:41:33 -0500 Subject: [PATCH 13/16] Populate OBO token ONLY dynamically. --- singlestoredb/ai/debugv3.py | 8 ++++---- singlestoredb/ai/utils.py | 14 +++++++++----- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/singlestoredb/ai/debugv3.py b/singlestoredb/ai/debugv3.py index cf233e11..8e793170 100644 --- a/singlestoredb/ai/debugv3.py +++ b/singlestoredb/ai/debugv3.py @@ -173,12 +173,12 @@ def _inject_headers(request: Any, **_ignored: Any) -> None: ) if t is not None: - http_client = httpx.Client( + http_client_internal = httpx.Client( timeout=t, auth=SingleStoreOpenAIAuth(api_key_getter_fn, obo_token_getter_fn), ) else: - http_client = httpx.Client( + http_client_internal = httpx.Client( timeout=httpx.Timeout(timeout=600, connect=5.0), # default OpenAI timeout auth=SingleStoreOpenAIAuth(api_key_getter_fn, obo_token_getter_fn), ) @@ -186,10 +186,10 @@ def _inject_headers(request: Any, **_ignored: Any) -> None: # OpenAI / Azure OpenAI path openai_kwargs = dict( base_url=info.connection_url, - api_key='placeholder', + api_key=api_key_getter_fn(), model=model_name, streaming=streaming, - http_client=http_client, + http_client=http_client_internal, ) return ChatOpenAI( **openai_kwargs, diff --git a/singlestoredb/ai/utils.py b/singlestoredb/ai/utils.py index 505d0a93..c3049d90 100644 --- a/singlestoredb/ai/utils.py +++ b/singlestoredb/ai/utils.py @@ -13,15 +13,19 @@ def __init__(self, api_key_getter, obo_token_getter): # type: ignore def auth_flow(self, request): # type: ignore[no-untyped-def] print(f'[DEBUG] auth_flow called for {request.method} {request.url}') - if self.api_key_getter is not None: - token_val = self.api_key_getter() - print(f"[DEBUG] api_key_getter: {token_val if token_val else 'None'}...") - if token_val: - request.headers['Authorization'] = f'Bearer {token_val}' + # if self.api_key_getter is not None: + # token_val = self.api_key_getter() + # print(f"[DEBUG] api_key_getter: {token_val if token_val else 'None'}...") + # if token_val: + # request.headers['Authorization'] = f'Bearer {token_val}' + # else: + # print('[DEBUG] api_key_getter is None') if self.obo_token_getter is not None: obo_val = self.obo_token_getter() print(f"[DEBUG] obo_token_getter: {obo_val if obo_val else 'None'}...") if obo_val: request.headers['X-S2-OBO'] = obo_val + else: + print('[DEBUG] obo_token_getter is None') print(f'[DEBUG] Final headers: {dict(request.headers)}') yield request From 1a717315b923f98962642202b02e95f0dec3826b Mon Sep 17 00:00:00 2001 From: mgiannakopoulos Date: Wed, 3 Dec 2025 00:16:44 -0500 Subject: [PATCH 14/16] Switch to old implementation for OpenAI client. --- singlestoredb/ai/chat.py | 19 +++++-------------- singlestoredb/ai/debugv3.py | 19 +++++-------------- 2 files changed, 10 insertions(+), 28 deletions(-) diff --git a/singlestoredb/ai/chat.py b/singlestoredb/ai/chat.py index aae28ab4..12829a1f 100644 --- a/singlestoredb/ai/chat.py +++ b/singlestoredb/ai/chat.py @@ -7,7 +7,6 @@ import httpx from singlestoredb import manage_workspaces -from singlestoredb.ai.utils import SingleStoreOpenAIAuth from singlestoredb.management.inference_api import InferenceAPIInfo try: @@ -172,25 +171,17 @@ def _inject_headers(request: Any, **_ignored: Any) -> None: **kwargs, ) - if t is not None: - http_client = httpx.Client( - timeout=t, - auth=SingleStoreOpenAIAuth(api_key_getter_fn, obo_token_getter_fn), - ) - else: - http_client = httpx.Client( - timeout=httpx.Timeout(timeout=600, connect=5.0), # default OpenAI timeout - auth=SingleStoreOpenAIAuth(api_key_getter_fn, obo_token_getter_fn), - ) - # OpenAI / Azure OpenAI path + token = api_key_getter_fn() + openai_kwargs = dict( base_url=info.connection_url, - api_key='placeholder', + api_key=token, model=model_name, streaming=streaming, - http_client=http_client, ) + if http_client is not None: + openai_kwargs['http_client'] = http_client return ChatOpenAI( **openai_kwargs, **kwargs, diff --git a/singlestoredb/ai/debugv3.py b/singlestoredb/ai/debugv3.py index 8e793170..6b84821f 100644 --- a/singlestoredb/ai/debugv3.py +++ b/singlestoredb/ai/debugv3.py @@ -7,7 +7,6 @@ import httpx from singlestoredb import manage_workspaces -from singlestoredb.ai.utils import SingleStoreOpenAIAuth from singlestoredb.management.inference_api import InferenceAPIInfo try: @@ -172,25 +171,17 @@ def _inject_headers(request: Any, **_ignored: Any) -> None: **kwargs, ) - if t is not None: - http_client_internal = httpx.Client( - timeout=t, - auth=SingleStoreOpenAIAuth(api_key_getter_fn, obo_token_getter_fn), - ) - else: - http_client_internal = httpx.Client( - timeout=httpx.Timeout(timeout=600, connect=5.0), # default OpenAI timeout - auth=SingleStoreOpenAIAuth(api_key_getter_fn, obo_token_getter_fn), - ) - # OpenAI / Azure OpenAI path + token = api_key_getter_fn() + openai_kwargs = dict( base_url=info.connection_url, - api_key=api_key_getter_fn(), + api_key=token, model=model_name, streaming=streaming, - http_client=http_client_internal, ) + if http_client is not None: + openai_kwargs['http_client'] = http_client return ChatOpenAI( **openai_kwargs, **kwargs, From 1e74c1867db157ff0f242815b436cc9a77407b6c Mon Sep 17 00:00:00 2001 From: mgiannakopoulos Date: Wed, 3 Dec 2025 00:44:20 -0500 Subject: [PATCH 15/16] Remove 'timeout', go to the previous way of initializing the OpenAI clients; 'http_client' should be configured outside of the factory method. --- singlestoredb/ai/__init__.py | 1 - singlestoredb/ai/chat.py | 5 - singlestoredb/ai/debugv3.py | 188 --------------------------------- singlestoredb/ai/embeddings.py | 38 +------ singlestoredb/ai/utils.py | 31 ------ 5 files changed, 5 insertions(+), 258 deletions(-) delete mode 100644 singlestoredb/ai/debugv3.py delete mode 100644 singlestoredb/ai/utils.py diff --git a/singlestoredb/ai/__init__.py b/singlestoredb/ai/__init__.py index 0b7fa728..3cc526d6 100644 --- a/singlestoredb/ai/__init__.py +++ b/singlestoredb/ai/__init__.py @@ -1,3 +1,2 @@ from .chat import SingleStoreChatFactory # noqa: F401 -from .debugv3 import SingleStoreChatFactoryDebugV3 # noqa: F401 from .embeddings import SingleStoreEmbeddingsFactory # noqa: F401 diff --git a/singlestoredb/ai/chat.py b/singlestoredb/ai/chat.py index 12829a1f..68cdc7b9 100644 --- a/singlestoredb/ai/chat.py +++ b/singlestoredb/ai/chat.py @@ -39,7 +39,6 @@ def SingleStoreChatFactory( obo_token_getter: Optional[Callable[[], Optional[str]]] = None, base_url: Optional[str] = None, hosting_platform: Optional[str] = None, - timeout: Optional[float] = None, **kwargs: Any, ) -> Union[ChatOpenAI, ChatBedrockConverse]: """Return a chat model instance (ChatOpenAI or ChatBedrockConverse). @@ -103,10 +102,6 @@ def obo_token_getter_fn() -> Optional[str]: elif isinstance(t, (int, float)): connect_timeout = float(t) read_timeout = float(t) - if timeout is not None: - connect_timeout = timeout - read_timeout = timeout - t = httpx.Timeout(timeout) if info.hosting_platform == 'Amazon': # Instantiate Bedrock client diff --git a/singlestoredb/ai/debugv3.py b/singlestoredb/ai/debugv3.py deleted file mode 100644 index 6b84821f..00000000 --- a/singlestoredb/ai/debugv3.py +++ /dev/null @@ -1,188 +0,0 @@ -import os -from typing import Any -from typing import Callable -from typing import Optional -from typing import Union - -import httpx - -from singlestoredb import manage_workspaces -from singlestoredb.management.inference_api import InferenceAPIInfo - -try: - from langchain_openai import ChatOpenAI -except ImportError: - raise ImportError( - 'Could not import langchain_openai python package. ' - 'Please install it with `pip install langchain_openai`.', - ) - -try: - from langchain_aws import ChatBedrockConverse -except ImportError: - raise ImportError( - 'Could not import langchain-aws python package. ' - 'Please install it with `pip install langchain-aws`.', - ) - -import boto3 -from botocore import UNSIGNED -from botocore.config import Config - - -def SingleStoreChatFactoryDebugV3( - model_name: str, - api_key: Optional[Union[Optional[str], Callable[[], Optional[str]]]] = None, - streaming: bool = True, - http_client: Optional[httpx.Client] = None, - obo_token: Optional[Union[Optional[str], Callable[[], Optional[str]]]] = None, - obo_token_getter: Optional[Callable[[], Optional[str]]] = None, - base_url: Optional[str] = None, - hosting_platform: Optional[str] = None, - timeout: Optional[float] = None, - **kwargs: Any, -) -> Union[ChatOpenAI, ChatBedrockConverse]: - """Return a chat model instance (ChatOpenAI or ChatBedrockConverse). - """ - # Handle api_key and obo_token as callable functions - if callable(api_key): - api_key_getter_fn = api_key - else: - def api_key_getter_fn() -> Optional[str]: - if api_key is None: - return os.environ.get('SINGLESTOREDB_USER_TOKEN') - return api_key - - if obo_token_getter is not None: - obo_token_getter_fn = obo_token_getter - else: - if callable(obo_token): - obo_token_getter_fn = obo_token - else: - def obo_token_getter_fn() -> Optional[str]: - return obo_token - - # handle model info - if base_url is None: - base_url = os.environ.get('SINGLESTOREDB_INFERENCE_API_BASE_URL') - if hosting_platform is None: - hosting_platform = os.environ.get('SINGLESTOREDB_INFERENCE_API_HOSTING_PLATFORM') - if base_url is None or hosting_platform is None: - inference_api_manager = ( - manage_workspaces().organizations.current.inference_apis - ) - info = inference_api_manager.get(model_name=model_name) - else: - info = InferenceAPIInfo( - service_id='', - model_name=model_name, - name='', - connection_url=base_url, - project_id='', - hosting_platform=hosting_platform, - ) - if base_url is not None: - info.connection_url = base_url - if hosting_platform is not None: - info.hosting_platform = hosting_platform - - # Extract timeouts from http_client if provided - t = http_client.timeout if http_client is not None else None - connect_timeout = None - read_timeout = None - if t is not None: - if isinstance(t, httpx.Timeout): - if t.connect is not None: - connect_timeout = float(t.connect) - if t.read is not None: - read_timeout = float(t.read) - if connect_timeout is None and read_timeout is not None: - connect_timeout = read_timeout - if read_timeout is None and connect_timeout is not None: - read_timeout = connect_timeout - elif isinstance(t, (int, float)): - connect_timeout = float(t) - read_timeout = float(t) - if timeout is not None: - connect_timeout = timeout - read_timeout = timeout - t = httpx.Timeout(timeout) - - if info.hosting_platform == 'Amazon': - # Instantiate Bedrock client - cfg_kwargs = { - 'signature_version': UNSIGNED, - 'retries': {'max_attempts': 1, 'mode': 'standard'}, - } - if read_timeout is not None: - cfg_kwargs['read_timeout'] = read_timeout - if connect_timeout is not None: - cfg_kwargs['connect_timeout'] = connect_timeout - - cfg = Config(**cfg_kwargs) - client = boto3.client( - 'bedrock-runtime', - endpoint_url=info.connection_url, - region_name='us-east-1', - aws_access_key_id='placeholder', - aws_secret_access_key='placeholder', - config=cfg, - ) - - def _inject_headers(request: Any, **_ignored: Any) -> None: - """Inject dynamic auth/OBO headers prior to Bedrock sending.""" - if api_key_getter_fn is not None: - token_val = api_key_getter_fn() - if token_val: - request.headers['Authorization'] = f'Bearer {token_val}' - if obo_token_getter_fn is not None: - obo_val = obo_token_getter_fn() - if obo_val: - request.headers['X-S2-OBO'] = obo_val - request.headers.pop('X-Amz-Date', None) - request.headers.pop('X-Amz-Security-Token', None) - - emitter = client._endpoint._event_emitter - emitter.register_first( - 'before-send.bedrock-runtime.Converse', - _inject_headers, - ) - emitter.register_first( - 'before-send.bedrock-runtime.ConverseStream', - _inject_headers, - ) - emitter.register_first( - 'before-send.bedrock-runtime.InvokeModel', - _inject_headers, - ) - emitter.register_first( - 'before-send.bedrock-runtime.InvokeModelWithResponseStream', - _inject_headers, - ) - - return ChatBedrockConverse( - model_id=model_name, - endpoint_url=info.connection_url, - region_name='us-east-1', - aws_access_key_id='placeholder', - aws_secret_access_key='placeholder', - disable_streaming=not streaming, - client=client, - **kwargs, - ) - - # OpenAI / Azure OpenAI path - token = api_key_getter_fn() - - openai_kwargs = dict( - base_url=info.connection_url, - api_key=token, - model=model_name, - streaming=streaming, - ) - if http_client is not None: - openai_kwargs['http_client'] = http_client - return ChatOpenAI( - **openai_kwargs, - **kwargs, - ) diff --git a/singlestoredb/ai/embeddings.py b/singlestoredb/ai/embeddings.py index e85f26a7..c5f9ed09 100644 --- a/singlestoredb/ai/embeddings.py +++ b/singlestoredb/ai/embeddings.py @@ -1,5 +1,4 @@ import os -from collections.abc import Generator from typing import Any from typing import Callable from typing import Optional @@ -39,7 +38,6 @@ def SingleStoreEmbeddingsFactory( obo_token_getter: Optional[Callable[[], Optional[str]]] = None, base_url: Optional[str] = None, hosting_platform: Optional[str] = None, - timeout: Optional[float] = None, **kwargs: Any, ) -> Union[OpenAIEmbeddings, BedrockEmbeddings]: """Return an embeddings model instance (OpenAIEmbeddings or BedrockEmbeddings). @@ -103,10 +101,6 @@ def obo_token_getter_fn() -> Optional[str]: elif isinstance(t, (int, float)): connect_timeout = float(t) read_timeout = float(t) - if timeout is not None: - connect_timeout = timeout - read_timeout = timeout - t = httpx.Timeout(timeout) if info.hosting_platform == 'Amazon': # Instantiate Bedrock client @@ -162,38 +156,16 @@ def _inject_headers(request: Any, **_ignored: Any) -> None: **kwargs, ) - class OpenAIAuth(httpx.Auth): - def auth_flow( - self, request: httpx.Request, - ) -> Generator[httpx.Request, None, None]: - if api_key_getter_fn is not None: - token_val = api_key_getter_fn() - if token_val: - request.headers['Authorization'] = f'Bearer {token_val}' - if obo_token_getter_fn is not None: - obo_val = obo_token_getter_fn() - if obo_val: - request.headers['X-S2-OBO'] = obo_val - yield request - - if t is not None: - http_client = httpx.Client( - timeout=t, - auth=OpenAIAuth(), - ) - else: - http_client = httpx.Client( - timeout=httpx.Timeout(timeout=600, connect=5.0), # default OpenAI timeout - auth=OpenAIAuth(), - ) - # OpenAI / Azure OpenAI path + token = api_key_getter_fn() + openai_kwargs = dict( base_url=info.connection_url, - api_key='placeholder', + api_key=token, model=model_name, ) - openai_kwargs['http_client'] = http_client + if http_client is not None: + openai_kwargs['http_client'] = http_client return OpenAIEmbeddings( **openai_kwargs, **kwargs, diff --git a/singlestoredb/ai/utils.py b/singlestoredb/ai/utils.py deleted file mode 100644 index c3049d90..00000000 --- a/singlestoredb/ai/utils.py +++ /dev/null @@ -1,31 +0,0 @@ -import httpx - - -class SingleStoreOpenAIAuth(httpx.Auth): - def __init__(self, api_key_getter, obo_token_getter): # type: ignore - self.api_key_getter = api_key_getter - self.obo_token_getter = obo_token_getter - - # def auth_flow( - # self, - # request: httpx.Request, - # ) -> Generator[httpx.Request, httpx.Response, None]: - - def auth_flow(self, request): # type: ignore[no-untyped-def] - print(f'[DEBUG] auth_flow called for {request.method} {request.url}') - # if self.api_key_getter is not None: - # token_val = self.api_key_getter() - # print(f"[DEBUG] api_key_getter: {token_val if token_val else 'None'}...") - # if token_val: - # request.headers['Authorization'] = f'Bearer {token_val}' - # else: - # print('[DEBUG] api_key_getter is None') - if self.obo_token_getter is not None: - obo_val = self.obo_token_getter() - print(f"[DEBUG] obo_token_getter: {obo_val if obo_val else 'None'}...") - if obo_val: - request.headers['X-S2-OBO'] = obo_val - else: - print('[DEBUG] obo_token_getter is None') - print(f'[DEBUG] Final headers: {dict(request.headers)}') - yield request From be07fd106f471583864dd89328c29f069db87ad7 Mon Sep 17 00:00:00 2001 From: mgiannakopoulos Date: Wed, 3 Dec 2025 12:07:33 -0500 Subject: [PATCH 16/16] Final fixes. --- singlestoredb/ai/chat.py | 36 +++++++++------------------------- singlestoredb/ai/embeddings.py | 36 +++++++++------------------------- 2 files changed, 18 insertions(+), 54 deletions(-) diff --git a/singlestoredb/ai/chat.py b/singlestoredb/ai/chat.py index 68cdc7b9..4878c96c 100644 --- a/singlestoredb/ai/chat.py +++ b/singlestoredb/ai/chat.py @@ -32,10 +32,9 @@ def SingleStoreChatFactory( model_name: str, - api_key: Optional[Union[Optional[str], Callable[[], Optional[str]]]] = None, + api_key: Optional[str] = None, streaming: bool = True, http_client: Optional[httpx.Client] = None, - obo_token: Optional[Union[Optional[str], Callable[[], Optional[str]]]] = None, obo_token_getter: Optional[Callable[[], Optional[str]]] = None, base_url: Optional[str] = None, hosting_platform: Optional[str] = None, @@ -43,24 +42,6 @@ def SingleStoreChatFactory( ) -> Union[ChatOpenAI, ChatBedrockConverse]: """Return a chat model instance (ChatOpenAI or ChatBedrockConverse). """ - # Handle api_key and obo_token as callable functions - if callable(api_key): - api_key_getter_fn = api_key - else: - def api_key_getter_fn() -> Optional[str]: - if api_key is None: - return os.environ.get('SINGLESTOREDB_USER_TOKEN') - return api_key - - if obo_token_getter is not None: - obo_token_getter_fn = obo_token_getter - else: - if callable(obo_token): - obo_token_getter_fn = obo_token - else: - def obo_token_getter_fn() -> Optional[str]: - return obo_token - # handle model info if base_url is None: base_url = os.environ.get('SINGLESTOREDB_INFERENCE_API_BASE_URL') @@ -126,12 +107,12 @@ def obo_token_getter_fn() -> Optional[str]: def _inject_headers(request: Any, **_ignored: Any) -> None: """Inject dynamic auth/OBO headers prior to Bedrock sending.""" - if api_key_getter_fn is not None: - token_val = api_key_getter_fn() - if token_val: - request.headers['Authorization'] = f'Bearer {token_val}' - if obo_token_getter_fn is not None: - obo_val = obo_token_getter_fn() + token_env_val = os.environ.get('SINGLESTOREDB_USER_TOKEN') + token_val = api_key if api_key is not None else token_env_val + if token_val: + request.headers['Authorization'] = f'Bearer {token_val}' + if obo_token_getter is not None: + obo_val = obo_token_getter() if obo_val: request.headers['X-S2-OBO'] = obo_val request.headers.pop('X-Amz-Date', None) @@ -167,7 +148,8 @@ def _inject_headers(request: Any, **_ignored: Any) -> None: ) # OpenAI / Azure OpenAI path - token = api_key_getter_fn() + token_env = os.environ.get('SINGLESTOREDB_USER_TOKEN') + token = api_key if api_key is not None else token_env openai_kwargs = dict( base_url=info.connection_url, diff --git a/singlestoredb/ai/embeddings.py b/singlestoredb/ai/embeddings.py index c5f9ed09..fe23331c 100644 --- a/singlestoredb/ai/embeddings.py +++ b/singlestoredb/ai/embeddings.py @@ -32,9 +32,8 @@ def SingleStoreEmbeddingsFactory( model_name: str, - api_key: Optional[Union[Optional[str], Callable[[], Optional[str]]]] = None, + api_key: Optional[str] = None, http_client: Optional[httpx.Client] = None, - obo_token: Optional[Union[Optional[str], Callable[[], Optional[str]]]] = None, obo_token_getter: Optional[Callable[[], Optional[str]]] = None, base_url: Optional[str] = None, hosting_platform: Optional[str] = None, @@ -42,24 +41,6 @@ def SingleStoreEmbeddingsFactory( ) -> Union[OpenAIEmbeddings, BedrockEmbeddings]: """Return an embeddings model instance (OpenAIEmbeddings or BedrockEmbeddings). """ - # Handle api_key and obo_token as callable functions - if callable(api_key): - api_key_getter_fn = api_key - else: - def api_key_getter_fn() -> Optional[str]: - if api_key is None: - return os.environ.get('SINGLESTOREDB_USER_TOKEN') - return api_key - - if obo_token_getter is not None: - obo_token_getter_fn = obo_token_getter - else: - if callable(obo_token): - obo_token_getter_fn = obo_token - else: - def obo_token_getter_fn() -> Optional[str]: - return obo_token - # handle model info if base_url is None: base_url = os.environ.get('SINGLESTOREDB_INFERENCE_API_BASE_URL') @@ -125,12 +106,12 @@ def obo_token_getter_fn() -> Optional[str]: def _inject_headers(request: Any, **_ignored: Any) -> None: """Inject dynamic auth/OBO headers prior to Bedrock sending.""" - if api_key_getter_fn is not None: - token_val = api_key_getter_fn() - if token_val: - request.headers['Authorization'] = f'Bearer {token_val}' - if obo_token_getter_fn is not None: - obo_val = obo_token_getter_fn() + token_env_val = os.environ.get('SINGLESTOREDB_USER_TOKEN') + token_val = api_key if api_key is not None else token_env_val + if token_val: + request.headers['Authorization'] = f'Bearer {token_val}' + if obo_token_getter is not None: + obo_val = obo_token_getter() if obo_val: request.headers['X-S2-OBO'] = obo_val request.headers.pop('X-Amz-Date', None) @@ -157,7 +138,8 @@ def _inject_headers(request: Any, **_ignored: Any) -> None: ) # OpenAI / Azure OpenAI path - token = api_key_getter_fn() + token_env = os.environ.get('SINGLESTOREDB_USER_TOKEN') + token = api_key if api_key is not None else token_env openai_kwargs = dict( base_url=info.connection_url,