From a9b661157d1a7f8a469f90566545ad8386547f70 Mon Sep 17 00:00:00 2001 From: mgiannakopoulos Date: Wed, 17 Sep 2025 05:56:41 -0400 Subject: [PATCH 01/21] feat: Introduce SingleStoreChat wrapper that uses interchangeably OpenAI or AmazonBedrockConverse protocol. --- singlestoredb/ai/chat.py | 302 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 297 insertions(+), 5 deletions(-) diff --git a/singlestoredb/ai/chat.py b/singlestoredb/ai/chat.py index c637d390d..e5dd5c112 100644 --- a/singlestoredb/ai/chat.py +++ b/singlestoredb/ai/chat.py @@ -1,5 +1,10 @@ import os +import uuid from typing import Any +from typing import AsyncIterator +from typing import Optional + +import httpx from singlestoredb.fusion.handlers.utils import get_workspace_manager @@ -8,33 +13,320 @@ except ImportError: raise ImportError( 'Could not import langchain_openai python package. ' - 'Please install it with `pip install langchain_openai`.', + 'Please install it with `pip install langchain-openai`.', + ) + +try: + from langchain_aws import ChatBedrockConverse +except ImportError: + raise ImportError( + 'Could not import langchain-aws python package. 
' + 'Please install it with `pip install langchain-aws`.', ) class SingleStoreChatOpenAI(ChatOpenAI): - def __init__(self, model_name: str, **kwargs: Any): + def __init__(self, model_name: str, api_key: Optional[str] = None, **kwargs: Any): inference_api_manger = ( get_workspace_manager().organizations.current.inference_apis ) info = inference_api_manger.get(model_name=model_name) + token = ( + api_key + if api_key is not None + else os.environ.get('SINGLESTOREDB_USER_TOKEN') + ) super().__init__( base_url=info.connection_url, - api_key=os.environ.get('SINGLESTOREDB_USER_TOKEN'), + api_key=token, model=model_name, **kwargs, ) class SingleStoreChat(ChatOpenAI): - def __init__(self, model_name: str, **kwargs: Any): + def __init__(self, model_name: str, api_key: Optional[str] = None, **kwargs: Any): inference_api_manger = ( get_workspace_manager().organizations.current.inference_apis ) info = inference_api_manger.get(model_name=model_name) + token = ( + api_key + if api_key is not None + else os.environ.get('SINGLESTOREDB_USER_TOKEN') + ) super().__init__( base_url=info.connection_url, - api_key=os.environ.get('SINGLESTOREDB_USER_TOKEN'), + api_key=token, model=model_name, **kwargs, ) + + +class SingleStoreExperimentalChat: + """Experimental unified chat interface (prefix-based two-part identifier). + + Input model name MUST (for dynamic selection) be of the form: + . + where is one of: + * ``aura`` -> OpenAI style (ChatOpenAI backend) + * ``aura-azr`` -> Azure OpenAI style (still ChatOpenAI backend) + * ``aura-amz`` -> Amazon Bedrock (ChatBedrockConverse backend) + + If no delimiter (".", ":" or "/") is present, or prefix is unrecognized, + the entire string is treated as an OpenAI-style model (ChatOpenAI). + + Only the prefix ``aura-amz`` triggers Bedrock usage; in that case the + *second* component (after the first delimiter) is passed as the model + name to the Bedrock client. 
For other prefixes the second component is + passed to ChatOpenAI with the SingleStore Fusion-provided base_url. + + This class uses composition and delegates attribute access to the chosen + backend client for near drop-in behavior. + """ + + _VALID_PREFIXES = {'aura', 'aura-azr', 'aura-amz'} + + def __init__( + self, + model_name: str, + http_client: Optional[httpx.Client] = None, + api_key: Optional[str] = None, + **kwargs: Any, + ) -> None: + prefix, actual_model = self._parse_identifier(model_name) + + inference_api_manager = ( + get_workspace_manager().organizations.current.inference_apis + ) + # Use the raw identifier for Fusion lookup (keeps gateway mapping + # logic server-side). + info = inference_api_manager.get(model_name=actual_model) + if prefix == 'aura-amz': + backend_type = 'bedrock' + elif prefix == 'aura-azr': + backend_type = 'azure-openai' + else: + backend_type = 'openai' + + # Extract headers from provided http_client (if any) for possible reuse. + provided_headers: dict[str, str] = {} + if http_client is not None and hasattr(http_client, 'headers'): + try: + provided_headers = dict(http_client.headers) # make a copy + except Exception: + provided_headers = {} + + if backend_type == 'bedrock': + self._removed_aws_env: dict[str, str] = {} + for _v in ( + 'AWS_ACCESS_KEY_ID', + 'AWS_SECRET_ACCESS_KEY', + 'AWS_SESSION_TOKEN', + ): + if _v in os.environ: + self._removed_aws_env[_v] = os.environ.pop(_v) + + token_env = os.environ.get('SINGLESTOREDB_USER_TOKEN') + token = api_key if api_key is not None else token_env + # Generate a per-instance client ID for tracing Bedrock calls. + self._client_id = str(uuid.uuid4()) + self._client = ChatBedrockConverse( + base_url=info.connection_url, + model=actual_model, + **kwargs, + ) + + # Attempt to inject Authorization header for downstream HTTP layers. + # Not all implementations expose a direct header map; we add a + # lightweight wrapper if needed. 
+ self._auth_header = None + merged_headers: dict[str, str] = {} + if provided_headers: + merged_headers.update({k: v for k, v in provided_headers.items()}) + if token: + merged_headers.setdefault('Authorization', f'Bearer {token}') + # Always include X-ClientID for Bedrock path + merged_headers.setdefault('X-ClientID', self._client_id) + if merged_headers: + # Try to set directly if backend exposes default_headers + if ( + hasattr(self._client, 'default_headers') + and isinstance( + getattr(self._client, 'default_headers'), + dict, + ) + ): + getattr(self._client, 'default_headers').update( + { + k: v + for k, v in merged_headers.items() + if k + not in getattr( + self._client, 'default_headers', + ) + }, + ) + else: + self._auth_header = merged_headers # fallback for invoke/stream + else: + # Pass through http_client if ChatOpenAI supports it; if not, + # include in kwargs only when present. + token_env = os.environ.get('SINGLESTOREDB_USER_TOKEN') + token = api_key if api_key is not None else token_env + openai_kwargs = dict( + base_url=info.connection_url, + api_key=token, + model=actual_model, + ) + if http_client is not None: + # Some versions accept 'http_client' parameter for custom transport. 
+ openai_kwargs['http_client'] = http_client + self._client = ChatOpenAI( + **openai_kwargs, + **kwargs, + ) + + self._backend_type = backend_type + self.model_name = model_name # external identifier provided by caller + self.actual_model = actual_model # model portion after prefix + self.prefix = prefix # normalized prefix + self.connection_url = info.connection_url + + @classmethod + def _parse_identifier(cls, identifier: str) -> tuple[str, str]: + for sep in ('.', ':', '/'): + if sep in identifier: + head, tail = identifier.split(sep, 1) + prefix = head.strip().lower() + model = tail.strip() + if prefix in cls._VALID_PREFIXES: + return prefix, model + return 'aura', identifier.strip() # treat whole string as model + return 'aura', identifier.strip() + + # --------------------------------------------------------------------- + # Delegation layer + # --------------------------------------------------------------------- + def __getattr__(self, item: str) -> Any: + return getattr(self._client, item) + + # ------------------------------------------------------------------ + # Internal helpers + # ------------------------------------------------------------------ + def _maybe_inject_headers(self, kwargs: dict[str, Any]) -> None: + """Inject Bedrock auth headers into kwargs if we only have a fallback. + + If the Bedrock client accepted headers via its own internal + `default_headers` we don't need to do anything here. When we had + to stash headers into `_auth_header` we add them for each outbound + call that allows a `headers` kwarg and has not already provided + its own. + """ + if ( + self._backend_type == 'bedrock' + and hasattr(self, '_auth_header') + and getattr(self, '_auth_header') + and 'headers' not in kwargs + ): + kwargs['headers'] = getattr( + self, + '_auth_header', + ) + + def as_base(self) -> Any: + """Return the underlying backend client instance. 
+ + This gives callers direct access to provider specific methods or + configuration that aren't surfaced by the experimental wrapper. + """ + return self._client + + def invoke(self, *args: Any, **kwargs: Any) -> Any: + self._maybe_inject_headers(kwargs) + return self._client.invoke(*args, **kwargs) + + async def ainvoke(self, *args: Any, **kwargs: Any) -> Any: + self._maybe_inject_headers(kwargs) + return await self._client.ainvoke(*args, **kwargs) + + def stream(self, *args: Any, **kwargs: Any) -> Any: + self._maybe_inject_headers(kwargs) + return self._client.stream(*args, **kwargs) + + async def astream( + self, + *args: Any, + **kwargs: Any, + ) -> AsyncIterator[Any]: + self._maybe_inject_headers(kwargs) + async for chunk in self._client.astream(*args, **kwargs): + yield chunk + + # ------------------------------------------------------------------ + # Extended delegation for additional common chat model surface area. + # Each method simply injects headers (if needed) then forwards. 
+ # ------------------------------------------------------------------ + def generate(self, *args: Any, **kwargs: Any) -> Any: + self._maybe_inject_headers(kwargs) + return self._client.generate(*args, **kwargs) + + async def agenerate(self, *args: Any, **kwargs: Any) -> Any: + self._maybe_inject_headers(kwargs) + return await self._client.agenerate(*args, **kwargs) + + def predict(self, *args: Any, **kwargs: Any) -> Any: + self._maybe_inject_headers(kwargs) + return self._client.predict(*args, **kwargs) + + async def apredict( + self, + *args: Any, + **kwargs: Any, + ) -> Any: + self._maybe_inject_headers(kwargs) + return await self._client.apredict(*args, **kwargs) + + def predict_messages( + self, + *args: Any, + **kwargs: Any, + ) -> Any: + self._maybe_inject_headers(kwargs) + return self._client.predict_messages(*args, **kwargs) + + async def apredict_messages( + self, + *args: Any, + **kwargs: Any, + ) -> Any: + self._maybe_inject_headers(kwargs) + return await self._client.apredict_messages(*args, **kwargs) + + def batch(self, *args: Any, **kwargs: Any) -> Any: + self._maybe_inject_headers(kwargs) + return self._client.batch(*args, **kwargs) + + async def abatch(self, *args: Any, **kwargs: Any) -> Any: + self._maybe_inject_headers(kwargs) + return await self._client.abatch(*args, **kwargs) + + def apply(self, *args: Any, **kwargs: Any) -> Any: + self._maybe_inject_headers(kwargs) + return self._client.apply(*args, **kwargs) + + async def aapply( + self, + *args: Any, + **kwargs: Any, + ) -> Any: + self._maybe_inject_headers(kwargs) + return await self._client.aapply(*args, **kwargs) + + def __repr__(self) -> str: + return ( + 'SingleStoreExperimentalChat(' + f'identifier={self.model_name!r}, ' + f'actual_model={self.actual_model!r}, ' + f'prefix={self.prefix}, backend={self._backend_type})' + ) From 0e69c154c7db3443085705d634b1abf1a0e0075e Mon Sep 17 00:00:00 2001 From: mgiannakopoulos Date: Wed, 17 Sep 2025 07:36:59 -0400 Subject: [PATCH 02/21] Provide 
option for getting 'X-S2-OBO' token for every request. --- singlestoredb/ai/chat.py | 39 ++++++++++++++++++++++++++++++++------- 1 file changed, 32 insertions(+), 7 deletions(-) diff --git a/singlestoredb/ai/chat.py b/singlestoredb/ai/chat.py index e5dd5c112..64d039223 100644 --- a/singlestoredb/ai/chat.py +++ b/singlestoredb/ai/chat.py @@ -2,6 +2,7 @@ import uuid from typing import Any from typing import AsyncIterator +from typing import Callable from typing import Optional import httpx @@ -92,6 +93,7 @@ def __init__( model_name: str, http_client: Optional[httpx.Client] = None, api_key: Optional[str] = None, + obo_token_getter: Optional[Callable[[], Optional[str]]] = None, **kwargs: Any, ) -> None: prefix, actual_model = self._parse_identifier(model_name) @@ -192,6 +194,10 @@ def __init__( self.actual_model = actual_model # model portion after prefix self.prefix = prefix # normalized prefix self.connection_url = info.connection_url + # Optional callable returning a fresh OBO token each request (Bedrock only). + # If supplied, a new token will be fetched and injected into the + # 'X-S2-OBO' header for every Bedrock request made via this wrapper. + self._obo_token_getter = obo_token_getter @classmethod def _parse_identifier(cls, identifier: str) -> tuple[str, str]: @@ -223,16 +229,35 @@ def _maybe_inject_headers(self, kwargs: dict[str, Any]) -> None: call that allows a `headers` kwarg and has not already provided its own. """ - if ( - self._backend_type == 'bedrock' - and hasattr(self, '_auth_header') + if self._backend_type != 'bedrock': + return + + # Start from existing headers in the call. + # Copy to avoid mutating caller-provided dict in-place. 
+ call_headers: dict[str, str] = {} + if 'headers' in kwargs and isinstance(kwargs['headers'], dict): + call_headers = dict(kwargs['headers']) + elif ( + hasattr(self, '_auth_header') and getattr(self, '_auth_header') and 'headers' not in kwargs ): - kwargs['headers'] = getattr( - self, - '_auth_header', - ) + # Use fallback auth header if user did not pass any. + call_headers = dict(getattr(self, '_auth_header')) + + # Dynamic OBO token injection (always fresh per request if getter provided) + getter = getattr(self, '_obo_token_getter', None) + if getter is not None: + try: + obo_token = getter() + except Exception: + obo_token = None + if obo_token: + # Overwrite any stale value. + call_headers['X-S2-OBO'] = obo_token + + if call_headers: + kwargs['headers'] = call_headers def as_base(self) -> Any: """Return the underlying backend client instance. From 6e96e96f65c8f3d5434f1ecbf09992a01fa0e10c Mon Sep 17 00:00:00 2001 From: mgiannakopoulos Date: Wed, 17 Sep 2025 12:40:54 -0400 Subject: [PATCH 03/21] Provide headers that indicate passthrough Amazon bedrock requests. --- singlestoredb/ai/chat.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/singlestoredb/ai/chat.py b/singlestoredb/ai/chat.py index 64d039223..8b3d7d6db 100644 --- a/singlestoredb/ai/chat.py +++ b/singlestoredb/ai/chat.py @@ -1,5 +1,4 @@ import os -import uuid from typing import Any from typing import AsyncIterator from typing import Callable @@ -94,6 +93,7 @@ def __init__( http_client: Optional[httpx.Client] = None, api_key: Optional[str] = None, obo_token_getter: Optional[Callable[[], Optional[str]]] = None, + streaming: bool = False, **kwargs: Any, ) -> None: prefix, actual_model = self._parse_identifier(model_name) @@ -131,11 +131,10 @@ def __init__( token_env = os.environ.get('SINGLESTOREDB_USER_TOKEN') token = api_key if api_key is not None else token_env - # Generate a per-instance client ID for tracing Bedrock calls. 
- self._client_id = str(uuid.uuid4()) self._client = ChatBedrockConverse( base_url=info.connection_url, model=actual_model, + streaming=streaming, **kwargs, ) @@ -148,8 +147,11 @@ def __init__( merged_headers.update({k: v for k, v in provided_headers.items()}) if token: merged_headers.setdefault('Authorization', f'Bearer {token}') - # Always include X-ClientID for Bedrock path - merged_headers.setdefault('X-ClientID', self._client_id) + # Add Bedrock converse headers based on streaming flag + if streaming: + merged_headers.setdefault('X-BEDROCK-CONVERSE-STREAMING', 'true') + else: + merged_headers.setdefault('X-BEDROCK-CONVERSE', 'true') if merged_headers: # Try to set directly if backend exposes default_headers if ( @@ -180,6 +182,7 @@ def __init__( base_url=info.connection_url, api_key=token, model=actual_model, + streaming=streaming, ) if http_client is not None: # Some versions accept 'http_client' parameter for custom transport. @@ -198,6 +201,7 @@ def __init__( # If supplied, a new token will be fetched and injected into the # 'X-S2-OBO' header for every Bedrock request made via this wrapper. self._obo_token_getter = obo_token_getter + self._streaming = streaming @classmethod def _parse_identifier(cls, identifier: str) -> tuple[str, str]: From c23f3fa3bc9d6ebf7e8c50b0dbe56250e6352a91 Mon Sep 17 00:00:00 2001 From: mgiannakopoulos Date: Wed, 17 Sep 2025 14:15:12 -0400 Subject: [PATCH 04/21] Hardcode dummy credentials and region info for ChatBedrockConverse client. 
--- singlestoredb/ai/chat.py | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/singlestoredb/ai/chat.py b/singlestoredb/ai/chat.py index 8b3d7d6db..9ee11e2f9 100644 --- a/singlestoredb/ai/chat.py +++ b/singlestoredb/ai/chat.py @@ -120,21 +120,16 @@ def __init__( provided_headers = {} if backend_type == 'bedrock': - self._removed_aws_env: dict[str, str] = {} - for _v in ( - 'AWS_ACCESS_KEY_ID', - 'AWS_SECRET_ACCESS_KEY', - 'AWS_SESSION_TOKEN', - ): - if _v in os.environ: - self._removed_aws_env[_v] = os.environ.pop(_v) - token_env = os.environ.get('SINGLESTOREDB_USER_TOKEN') token = api_key if api_key is not None else token_env self._client = ChatBedrockConverse( base_url=info.connection_url, model=actual_model, streaming=streaming, + region='us-east-1', # dummy value; UMG does not use this + aws_access_key_id='placeholder', # dummy value; UMG does not use this + aws_secret_access_key='placeholder', # dummy value; UMG does not use this + cache=True, **kwargs, ) From f2ccfbf5741d10cac6c163faf74bb483f23f8304 Mon Sep 17 00:00:00 2001 From: mgiannakopoulos Date: Wed, 17 Sep 2025 14:25:47 -0400 Subject: [PATCH 05/21] Rename 'region' parameter to 'region_name'. 
--- singlestoredb/ai/chat.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/singlestoredb/ai/chat.py b/singlestoredb/ai/chat.py index 9ee11e2f9..ff55d572c 100644 --- a/singlestoredb/ai/chat.py +++ b/singlestoredb/ai/chat.py @@ -126,7 +126,7 @@ def __init__( base_url=info.connection_url, model=actual_model, streaming=streaming, - region='us-east-1', # dummy value; UMG does not use this + region_name='us-east-1', # dummy value; UMG does not use this aws_access_key_id='placeholder', # dummy value; UMG does not use this aws_secret_access_key='placeholder', # dummy value; UMG does not use this cache=True, From a2728dc1fba822258f80c11b43f74fa453c1e0a9 Mon Sep 17 00:00:00 2001 From: mgiannakopoulos Date: Wed, 17 Sep 2025 15:23:39 -0400 Subject: [PATCH 06/21] Expose 'streaming' parameter setting its opposite value to 'disable_streaming' ChatBedrockConverse client. --- singlestoredb/ai/chat.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/singlestoredb/ai/chat.py b/singlestoredb/ai/chat.py index ff55d572c..f94f3c218 100644 --- a/singlestoredb/ai/chat.py +++ b/singlestoredb/ai/chat.py @@ -125,7 +125,7 @@ def __init__( self._client = ChatBedrockConverse( base_url=info.connection_url, model=actual_model, - streaming=streaming, + disable_streaming=not streaming, region_name='us-east-1', # dummy value; UMG does not use this aws_access_key_id='placeholder', # dummy value; UMG does not use this aws_secret_access_key='placeholder', # dummy value; UMG does not use this From a0a519735bfa4aa3a87d34b30103f384382ace26 Mon Sep 17 00:00:00 2001 From: mgiannakopoulos Date: Wed, 17 Sep 2025 15:25:10 -0400 Subject: [PATCH 07/21] Set the default value for 'streaming' parameter to True. 
--- singlestoredb/ai/chat.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/singlestoredb/ai/chat.py b/singlestoredb/ai/chat.py index f94f3c218..5f8a419f9 100644 --- a/singlestoredb/ai/chat.py +++ b/singlestoredb/ai/chat.py @@ -93,7 +93,7 @@ def __init__( http_client: Optional[httpx.Client] = None, api_key: Optional[str] = None, obo_token_getter: Optional[Callable[[], Optional[str]]] = None, - streaming: bool = False, + streaming: bool = True, **kwargs: Any, ) -> None: prefix, actual_model = self._parse_identifier(model_name) From 3ea94005d39789a8260fe36bf382016c5a837680 Mon Sep 17 00:00:00 2001 From: mgiannakopoulos Date: Wed, 17 Sep 2025 17:30:13 -0400 Subject: [PATCH 08/21] Remove the cache option. --- singlestoredb/ai/chat.py | 1 - 1 file changed, 1 deletion(-) diff --git a/singlestoredb/ai/chat.py b/singlestoredb/ai/chat.py index 5f8a419f9..b3eb3ead7 100644 --- a/singlestoredb/ai/chat.py +++ b/singlestoredb/ai/chat.py @@ -129,7 +129,6 @@ def __init__( region_name='us-east-1', # dummy value; UMG does not use this aws_access_key_id='placeholder', # dummy value; UMG does not use this aws_secret_access_key='placeholder', # dummy value; UMG does not use this - cache=True, **kwargs, ) From 321fd9b0b2b151723bff3cd1ca89c2e3a6ce2e65 Mon Sep 17 00:00:00 2001 From: mgiannakopoulos Date: Wed, 17 Sep 2025 18:22:11 -0400 Subject: [PATCH 09/21] Remove unsupported kwargs from Bedrock calls. 
--- singlestoredb/ai/chat.py | 39 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/singlestoredb/ai/chat.py b/singlestoredb/ai/chat.py index b3eb3ead7..53d6c873f 100644 --- a/singlestoredb/ai/chat.py +++ b/singlestoredb/ai/chat.py @@ -257,6 +257,31 @@ def _maybe_inject_headers(self, kwargs: dict[str, Any]) -> None: if call_headers: kwargs['headers'] = call_headers + # ------------------------------------------------------------------ + # Bedrock kwargs sanitation + # ------------------------------------------------------------------ + def _sanitize_bedrock_kwargs(self, kwargs: dict[str, Any]) -> None: + """Remove or adapt kwargs not supported by ChatBedrockConverse. + + Currently strips keys that would raise TypeError in + ChatBedrockConverse._converse_params (e.g. 'parallelToolCalls'). + This guards against passing OpenAI/other provider specific + parameters straight through to Bedrock. + """ + if self._backend_type != 'bedrock': # only relevant for bedrock backend + return + unsupported = {'parallelToolCalls', 'parallel_tool_calls'} + # Direct kwargs + for key in list(kwargs.keys()): + if key in unsupported: + kwargs.pop(key) + # Nested model_kwargs if present + mk = kwargs.get('model_kwargs') + if isinstance(mk, dict): + for key in list(mk.keys()): + if key in unsupported: + mk.pop(key) + def as_base(self) -> Any: """Return the underlying backend client instance. 
@@ -267,14 +292,17 @@ def as_base(self) -> Any: def invoke(self, *args: Any, **kwargs: Any) -> Any: self._maybe_inject_headers(kwargs) + self._sanitize_bedrock_kwargs(kwargs) return self._client.invoke(*args, **kwargs) async def ainvoke(self, *args: Any, **kwargs: Any) -> Any: self._maybe_inject_headers(kwargs) + self._sanitize_bedrock_kwargs(kwargs) return await self._client.ainvoke(*args, **kwargs) def stream(self, *args: Any, **kwargs: Any) -> Any: self._maybe_inject_headers(kwargs) + self._sanitize_bedrock_kwargs(kwargs) return self._client.stream(*args, **kwargs) async def astream( @@ -283,6 +311,7 @@ async def astream( **kwargs: Any, ) -> AsyncIterator[Any]: self._maybe_inject_headers(kwargs) + self._sanitize_bedrock_kwargs(kwargs) async for chunk in self._client.astream(*args, **kwargs): yield chunk @@ -292,14 +321,17 @@ async def astream( # ------------------------------------------------------------------ def generate(self, *args: Any, **kwargs: Any) -> Any: self._maybe_inject_headers(kwargs) + self._sanitize_bedrock_kwargs(kwargs) return self._client.generate(*args, **kwargs) async def agenerate(self, *args: Any, **kwargs: Any) -> Any: self._maybe_inject_headers(kwargs) + self._sanitize_bedrock_kwargs(kwargs) return await self._client.agenerate(*args, **kwargs) def predict(self, *args: Any, **kwargs: Any) -> Any: self._maybe_inject_headers(kwargs) + self._sanitize_bedrock_kwargs(kwargs) return self._client.predict(*args, **kwargs) async def apredict( @@ -308,6 +340,7 @@ async def apredict( **kwargs: Any, ) -> Any: self._maybe_inject_headers(kwargs) + self._sanitize_bedrock_kwargs(kwargs) return await self._client.apredict(*args, **kwargs) def predict_messages( @@ -316,6 +349,7 @@ def predict_messages( **kwargs: Any, ) -> Any: self._maybe_inject_headers(kwargs) + self._sanitize_bedrock_kwargs(kwargs) return self._client.predict_messages(*args, **kwargs) async def apredict_messages( @@ -324,18 +358,22 @@ async def apredict_messages( **kwargs: Any, ) -> Any: 
self._maybe_inject_headers(kwargs) + self._sanitize_bedrock_kwargs(kwargs) return await self._client.apredict_messages(*args, **kwargs) def batch(self, *args: Any, **kwargs: Any) -> Any: self._maybe_inject_headers(kwargs) + self._sanitize_bedrock_kwargs(kwargs) return self._client.batch(*args, **kwargs) async def abatch(self, *args: Any, **kwargs: Any) -> Any: self._maybe_inject_headers(kwargs) + self._sanitize_bedrock_kwargs(kwargs) return await self._client.abatch(*args, **kwargs) def apply(self, *args: Any, **kwargs: Any) -> Any: self._maybe_inject_headers(kwargs) + self._sanitize_bedrock_kwargs(kwargs) return self._client.apply(*args, **kwargs) async def aapply( @@ -344,6 +382,7 @@ async def aapply( **kwargs: Any, ) -> Any: self._maybe_inject_headers(kwargs) + self._sanitize_bedrock_kwargs(kwargs) return await self._client.aapply(*args, **kwargs) def __repr__(self) -> str: From 8ff4dfec0051ed01563dbe64b8caee7e3b38e5c0 Mon Sep 17 00:00:00 2001 From: mgiannakopoulos Date: Thu, 18 Sep 2025 06:04:29 -0400 Subject: [PATCH 10/21] Replace composition wrapper with a factory method. --- singlestoredb/ai/chat.py | 419 +++++++++------------------------------ 1 file changed, 96 insertions(+), 323 deletions(-) diff --git a/singlestoredb/ai/chat.py b/singlestoredb/ai/chat.py index 53d6c873f..7ea4b5ae5 100644 --- a/singlestoredb/ai/chat.py +++ b/singlestoredb/ai/chat.py @@ -1,6 +1,5 @@ import os from typing import Any -from typing import AsyncIterator from typing import Callable from typing import Optional @@ -24,6 +23,9 @@ 'Please install it with `pip install langchain-aws`.', ) +import boto3 +from botocore.config import Config + class SingleStoreChatOpenAI(ChatOpenAI): def __init__(self, model_name: str, api_key: Optional[str] = None, **kwargs: Any): @@ -63,332 +65,103 @@ def __init__(self, model_name: str, api_key: Optional[str] = None, **kwargs: Any ) -class SingleStoreExperimentalChat: - """Experimental unified chat interface (prefix-based two-part identifier). 
- - Input model name MUST (for dynamic selection) be of the form: - . - where is one of: - * ``aura`` -> OpenAI style (ChatOpenAI backend) - * ``aura-azr`` -> Azure OpenAI style (still ChatOpenAI backend) - * ``aura-amz`` -> Amazon Bedrock (ChatBedrockConverse backend) - - If no delimiter (".", ":" or "/") is present, or prefix is unrecognized, - the entire string is treated as an OpenAI-style model (ChatOpenAI). - - Only the prefix ``aura-amz`` triggers Bedrock usage; in that case the - *second* component (after the first delimiter) is passed as the model - name to the Bedrock client. For other prefixes the second component is - passed to ChatOpenAI with the SingleStore Fusion-provided base_url. - - This class uses composition and delegates attribute access to the chosen - backend client for near drop-in behavior. +def SingleStoreChatFactory( + model_name: str, + api_key: Optional[str] = None, + streaming: bool = True, + http_client: Optional[httpx.Client] = None, + obo_token_getter: Optional[Callable[[], Optional[str]]] = None, + **kwargs: Any, +) -> ChatOpenAI | ChatBedrockConverse: + """Return a chat model instance (ChatOpenAI or ChatBedrockConverse) based on prefix. + + The fully-qualified model name is expected to contain a prefix followed by + a delimiter (one of '.', ':', '/'). Supported prefixes: + * aura -> OpenAI style (ChatOpenAI backend) + * aura-azr -> Azure OpenAI style (still ChatOpenAI backend) + * aura-amz -> Amazon Bedrock (ChatBedrockConverse backend) + + If no supported prefix is detected the entire value is treated as an + OpenAI-style model routed through the SingleStore Fusion gateway. 
""" + # Parse identifier + prefix = 'aura' + actual_model = model_name + for sep in ('.', ':', '/'): + if sep in model_name: + head, tail = model_name.split(sep, 1) + candidate = head.strip().lower() + if candidate in {'aura', 'aura-azr', 'aura-amz'}: + prefix = candidate + actual_model = tail.strip() + else: + # Unsupported prefix; treat whole string as model for OpenAI path + actual_model = model_name + break - _VALID_PREFIXES = {'aura', 'aura-azr', 'aura-amz'} - - def __init__( - self, - model_name: str, - http_client: Optional[httpx.Client] = None, - api_key: Optional[str] = None, - obo_token_getter: Optional[Callable[[], Optional[str]]] = None, - streaming: bool = True, - **kwargs: Any, - ) -> None: - prefix, actual_model = self._parse_identifier(model_name) - - inference_api_manager = ( - get_workspace_manager().organizations.current.inference_apis + inference_api_manager = ( + get_workspace_manager().organizations.current.inference_apis + ) + info = inference_api_manager.get(model_name=actual_model) + token_env = os.environ.get('SINGLESTOREDB_USER_TOKEN') + token = api_key if api_key is not None else token_env + + if prefix == 'aura-amz': + # Instantiate Bedrock client + cfg = Config() + if http_client is not None and http_client.timeout is not None: + cfg.timeout = http_client.timeout + client = boto3.client( + 'bedrock-runtime', + endpoint_url=info.connection_url, # redirect requests to UMG + region_name='us-east-1', # dummy value; UMG does not use this + aws_access_key_id='placeholder', # dummy value; UMG does not use this + aws_secret_access_key='placeholder', # dummy value; UMG does not use this + config=cfg, ) - # Use the raw identifier for Fusion lookup (keeps gateway mapping - # logic server-side). 
- info = inference_api_manager.get(model_name=actual_model) - if prefix == 'aura-amz': - backend_type = 'bedrock' - elif prefix == 'aura-azr': - backend_type = 'azure-openai' - else: - backend_type = 'openai' - - # Extract headers from provided http_client (if any) for possible reuse. - provided_headers: dict[str, str] = {} - if http_client is not None and hasattr(http_client, 'headers'): - try: - provided_headers = dict(http_client.headers) # make a copy - except Exception: - provided_headers = {} - - if backend_type == 'bedrock': - token_env = os.environ.get('SINGLESTOREDB_USER_TOKEN') - token = api_key if api_key is not None else token_env - self._client = ChatBedrockConverse( - base_url=info.connection_url, - model=actual_model, - disable_streaming=not streaming, - region_name='us-east-1', # dummy value; UMG does not use this - aws_access_key_id='placeholder', # dummy value; UMG does not use this - aws_secret_access_key='placeholder', # dummy value; UMG does not use this - **kwargs, + if obo_token_getter is not None: + def _inject_headers(request: Any, **_ignored: Any) -> None: + """Inject dynamic auth/OBO headers prior to Bedrock signing.""" + obo_val = obo_token_getter() + if obo_val: + request.headers['X-S2-OBO'] = obo_val + if token: + request.headers['Authorization'] = f'Bearer {token}' + + emitter = client._endpoint._event_emitter + emitter.register_first( + 'before-sign.bedrock-runtime.Converse', + _inject_headers, ) - - # Attempt to inject Authorization header for downstream HTTP layers. - # Not all implementations expose a direct header map; we add a - # lightweight wrapper if needed. 
- self._auth_header = None - merged_headers: dict[str, str] = {} - if provided_headers: - merged_headers.update({k: v for k, v in provided_headers.items()}) - if token: - merged_headers.setdefault('Authorization', f'Bearer {token}') - # Add Bedrock converse headers based on streaming flag - if streaming: - merged_headers.setdefault('X-BEDROCK-CONVERSE-STREAMING', 'true') - else: - merged_headers.setdefault('X-BEDROCK-CONVERSE', 'true') - if merged_headers: - # Try to set directly if backend exposes default_headers - if ( - hasattr(self._client, 'default_headers') - and isinstance( - getattr(self._client, 'default_headers'), - dict, - ) - ): - getattr(self._client, 'default_headers').update( - { - k: v - for k, v in merged_headers.items() - if k - not in getattr( - self._client, 'default_headers', - ) - }, - ) - else: - self._auth_header = merged_headers # fallback for invoke/stream - else: - # Pass through http_client if ChatOpenAI supports it; if not, - # include in kwargs only when present. - token_env = os.environ.get('SINGLESTOREDB_USER_TOKEN') - token = api_key if api_key is not None else token_env - openai_kwargs = dict( - base_url=info.connection_url, - api_key=token, - model=actual_model, - streaming=streaming, + emitter.register_first( + 'before-sign.bedrock-runtime.ConverseStream', + _inject_headers, ) - if http_client is not None: - # Some versions accept 'http_client' parameter for custom transport. - openai_kwargs['http_client'] = http_client - self._client = ChatOpenAI( - **openai_kwargs, - **kwargs, + emitter.register_first( + 'before-sign.bedrock-runtime.InvokeModel', + _inject_headers, ) - - self._backend_type = backend_type - self.model_name = model_name # external identifier provided by caller - self.actual_model = actual_model # model portion after prefix - self.prefix = prefix # normalized prefix - self.connection_url = info.connection_url - # Optional callable returning a fresh OBO token each request (Bedrock only). 
- # If supplied, a new token will be fetched and injected into the - # 'X-S2-OBO' header for every Bedrock request made via this wrapper. - self._obo_token_getter = obo_token_getter - self._streaming = streaming - - @classmethod - def _parse_identifier(cls, identifier: str) -> tuple[str, str]: - for sep in ('.', ':', '/'): - if sep in identifier: - head, tail = identifier.split(sep, 1) - prefix = head.strip().lower() - model = tail.strip() - if prefix in cls._VALID_PREFIXES: - return prefix, model - return 'aura', identifier.strip() # treat whole string as model - return 'aura', identifier.strip() - - # --------------------------------------------------------------------- - # Delegation layer - # --------------------------------------------------------------------- - def __getattr__(self, item: str) -> Any: - return getattr(self._client, item) - - # ------------------------------------------------------------------ - # Internal helpers - # ------------------------------------------------------------------ - def _maybe_inject_headers(self, kwargs: dict[str, Any]) -> None: - """Inject Bedrock auth headers into kwargs if we only have a fallback. - - If the Bedrock client accepted headers via its own internal - `default_headers` we don't need to do anything here. When we had - to stash headers into `_auth_header` we add them for each outbound - call that allows a `headers` kwarg and has not already provided - its own. - """ - if self._backend_type != 'bedrock': - return - - # Start from existing headers in the call. - # Copy to avoid mutating caller-provided dict in-place. - call_headers: dict[str, str] = {} - if 'headers' in kwargs and isinstance(kwargs['headers'], dict): - call_headers = dict(kwargs['headers']) - elif ( - hasattr(self, '_auth_header') - and getattr(self, '_auth_header') - and 'headers' not in kwargs - ): - # Use fallback auth header if user did not pass any. 
- call_headers = dict(getattr(self, '_auth_header')) - - # Dynamic OBO token injection (always fresh per request if getter provided) - getter = getattr(self, '_obo_token_getter', None) - if getter is not None: - try: - obo_token = getter() - except Exception: - obo_token = None - if obo_token: - # Overwrite any stale value. - call_headers['X-S2-OBO'] = obo_token - - if call_headers: - kwargs['headers'] = call_headers - - # ------------------------------------------------------------------ - # Bedrock kwargs sanitation - # ------------------------------------------------------------------ - def _sanitize_bedrock_kwargs(self, kwargs: dict[str, Any]) -> None: - """Remove or adapt kwargs not supported by ChatBedrockConverse. - - Currently strips keys that would raise TypeError in - ChatBedrockConverse._converse_params (e.g. 'parallelToolCalls'). - This guards against passing OpenAI/other provider specific - parameters straight through to Bedrock. - """ - if self._backend_type != 'bedrock': # only relevant for bedrock backend - return - unsupported = {'parallelToolCalls', 'parallel_tool_calls'} - # Direct kwargs - for key in list(kwargs.keys()): - if key in unsupported: - kwargs.pop(key) - # Nested model_kwargs if present - mk = kwargs.get('model_kwargs') - if isinstance(mk, dict): - for key in list(mk.keys()): - if key in unsupported: - mk.pop(key) - - def as_base(self) -> Any: - """Return the underlying backend client instance. - - This gives callers direct access to provider specific methods or - configuration that aren't surfaced by the experimental wrapper. 
- """ - return self._client - - def invoke(self, *args: Any, **kwargs: Any) -> Any: - self._maybe_inject_headers(kwargs) - self._sanitize_bedrock_kwargs(kwargs) - return self._client.invoke(*args, **kwargs) - - async def ainvoke(self, *args: Any, **kwargs: Any) -> Any: - self._maybe_inject_headers(kwargs) - self._sanitize_bedrock_kwargs(kwargs) - return await self._client.ainvoke(*args, **kwargs) - - def stream(self, *args: Any, **kwargs: Any) -> Any: - self._maybe_inject_headers(kwargs) - self._sanitize_bedrock_kwargs(kwargs) - return self._client.stream(*args, **kwargs) - - async def astream( - self, - *args: Any, - **kwargs: Any, - ) -> AsyncIterator[Any]: - self._maybe_inject_headers(kwargs) - self._sanitize_bedrock_kwargs(kwargs) - async for chunk in self._client.astream(*args, **kwargs): - yield chunk - - # ------------------------------------------------------------------ - # Extended delegation for additional common chat model surface area. - # Each method simply injects headers (if needed) then forwards. 
- # ------------------------------------------------------------------ - def generate(self, *args: Any, **kwargs: Any) -> Any: - self._maybe_inject_headers(kwargs) - self._sanitize_bedrock_kwargs(kwargs) - return self._client.generate(*args, **kwargs) - - async def agenerate(self, *args: Any, **kwargs: Any) -> Any: - self._maybe_inject_headers(kwargs) - self._sanitize_bedrock_kwargs(kwargs) - return await self._client.agenerate(*args, **kwargs) - - def predict(self, *args: Any, **kwargs: Any) -> Any: - self._maybe_inject_headers(kwargs) - self._sanitize_bedrock_kwargs(kwargs) - return self._client.predict(*args, **kwargs) - - async def apredict( - self, - *args: Any, - **kwargs: Any, - ) -> Any: - self._maybe_inject_headers(kwargs) - self._sanitize_bedrock_kwargs(kwargs) - return await self._client.apredict(*args, **kwargs) - - def predict_messages( - self, - *args: Any, - **kwargs: Any, - ) -> Any: - self._maybe_inject_headers(kwargs) - self._sanitize_bedrock_kwargs(kwargs) - return self._client.predict_messages(*args, **kwargs) - - async def apredict_messages( - self, - *args: Any, - **kwargs: Any, - ) -> Any: - self._maybe_inject_headers(kwargs) - self._sanitize_bedrock_kwargs(kwargs) - return await self._client.apredict_messages(*args, **kwargs) - - def batch(self, *args: Any, **kwargs: Any) -> Any: - self._maybe_inject_headers(kwargs) - self._sanitize_bedrock_kwargs(kwargs) - return self._client.batch(*args, **kwargs) - - async def abatch(self, *args: Any, **kwargs: Any) -> Any: - self._maybe_inject_headers(kwargs) - self._sanitize_bedrock_kwargs(kwargs) - return await self._client.abatch(*args, **kwargs) - - def apply(self, *args: Any, **kwargs: Any) -> Any: - self._maybe_inject_headers(kwargs) - self._sanitize_bedrock_kwargs(kwargs) - return self._client.apply(*args, **kwargs) - - async def aapply( - self, - *args: Any, - **kwargs: Any, - ) -> Any: - self._maybe_inject_headers(kwargs) - self._sanitize_bedrock_kwargs(kwargs) - return await 
self._client.aapply(*args, **kwargs) - - def __repr__(self) -> str: - return ( - 'SingleStoreExperimentalChat(' - f'identifier={self.model_name!r}, ' - f'actual_model={self.actual_model!r}, ' - f'prefix={self.prefix}, backend={self._backend_type})' + emitter.register_first( + 'before-sign.bedrock-runtime.InvokeModelWithResponseStream', + _inject_headers, + ) + return ChatBedrockConverse( + model=actual_model, + disable_streaming=not streaming, + bedrock_runtime_client=client, + **kwargs, ) + + # OpenAI / Azure OpenAI path + openai_kwargs = dict( + base_url=info.connection_url, + api_key=token, + model=actual_model, + streaming=streaming, + ) + if http_client is not None: + openai_kwargs['http_client'] = http_client + return ChatOpenAI( + **openai_kwargs, + **kwargs, + ) From f37614115a821bd8610f21725415b17fae6a71f7 Mon Sep 17 00:00:00 2001 From: mgiannakopoulos Date: Thu, 18 Sep 2025 06:28:30 -0400 Subject: [PATCH 11/21] Pass bedrock runtime client as client parameter. --- singlestoredb/ai/chat.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/singlestoredb/ai/chat.py b/singlestoredb/ai/chat.py index 7ea4b5ae5..8e79314fb 100644 --- a/singlestoredb/ai/chat.py +++ b/singlestoredb/ai/chat.py @@ -147,8 +147,12 @@ def _inject_headers(request: Any, **_ignored: Any) -> None: ) return ChatBedrockConverse( model=actual_model, + endpoint_url=info.connection_url, # redirect requests to UMG + region_name='us-east-1', # dummy value; UMG does not use this + aws_access_key_id='placeholder', # dummy value; UMG does not use this + aws_secret_access_key='placeholder', # dummy value; UMG does not use this disable_streaming=not streaming, - bedrock_runtime_client=client, + client=client, **kwargs, ) From 84ad037fa12fa1e3633b3d0e337899135eb044b4 Mon Sep 17 00:00:00 2001 From: mgiannakopoulos Date: Thu, 18 Sep 2025 06:40:35 -0400 Subject: [PATCH 12/21] Pass also the 'X-BEDROCK-CONVERSE' headers that indicate that the requets should be handled as passthrough from 
UMG. --- singlestoredb/ai/chat.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/singlestoredb/ai/chat.py b/singlestoredb/ai/chat.py index 8e79314fb..963bd95df 100644 --- a/singlestoredb/ai/chat.py +++ b/singlestoredb/ai/chat.py @@ -111,6 +111,7 @@ def SingleStoreChatFactory( cfg = Config() if http_client is not None and http_client.timeout is not None: cfg.timeout = http_client.timeout + cfg.connect_timeout = http_client.timeout client = boto3.client( 'bedrock-runtime', endpoint_url=info.connection_url, # redirect requests to UMG @@ -127,6 +128,10 @@ def _inject_headers(request: Any, **_ignored: Any) -> None: request.headers['X-S2-OBO'] = obo_val if token: request.headers['Authorization'] = f'Bearer {token}' + if streaming: + request.headers['X-BEDROCK-CONVERSE-STREAMING'] = 'true' + else: + request.headers['X-BEDROCK-CONVERSE'] = 'true' emitter = client._endpoint._event_emitter emitter.register_first( From 40de94e42492589a0b534df73ce7e15578940637 Mon Sep 17 00:00:00 2001 From: mgiannakopoulos Date: Thu, 18 Sep 2025 10:51:03 -0400 Subject: [PATCH 13/21] Remove some amazon specific headers, along with validation, remove X-BEDROCK headers as well. 
--- singlestoredb/ai/chat.py | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/singlestoredb/ai/chat.py b/singlestoredb/ai/chat.py index 963bd95df..10dbcf02d 100644 --- a/singlestoredb/ai/chat.py +++ b/singlestoredb/ai/chat.py @@ -24,6 +24,7 @@ ) import boto3 +from botocore import UNSIGNED from botocore.config import Config @@ -108,7 +109,7 @@ def SingleStoreChatFactory( if prefix == 'aura-amz': # Instantiate Bedrock client - cfg = Config() + cfg = Config(signature_version=UNSIGNED) if http_client is not None and http_client.timeout is not None: cfg.timeout = http_client.timeout cfg.connect_timeout = http_client.timeout @@ -128,26 +129,30 @@ def _inject_headers(request: Any, **_ignored: Any) -> None: request.headers['X-S2-OBO'] = obo_val if token: request.headers['Authorization'] = f'Bearer {token}' - if streaming: - request.headers['X-BEDROCK-CONVERSE-STREAMING'] = 'true' - else: - request.headers['X-BEDROCK-CONVERSE'] = 'true' + # if streaming: + # request.headers['X-BEDROCK-CONVERSE-STREAMING'] = 'true' + # else: + # request.headers['X-BEDROCK-CONVERSE'] = 'true' + request.headers.pop('X-Amz-Date', None) + request.headers.pop('X-Amz-Security-Token', None) + # request.headers.pop('Amz-Sdk-Request', None) + # request.headers.pop('Amz-Sdk-Invocation-Id', None) emitter = client._endpoint._event_emitter emitter.register_first( - 'before-sign.bedrock-runtime.Converse', + 'before-send.bedrock-runtime.Converse', _inject_headers, ) emitter.register_first( - 'before-sign.bedrock-runtime.ConverseStream', + 'before-send.bedrock-runtime.ConverseStream', _inject_headers, ) emitter.register_first( - 'before-sign.bedrock-runtime.InvokeModel', + 'before-send.bedrock-runtime.InvokeModel', _inject_headers, ) emitter.register_first( - 'before-sign.bedrock-runtime.InvokeModelWithResponseStream', + 'before-send.bedrock-runtime.InvokeModelWithResponseStream', _inject_headers, ) return ChatBedrockConverse( From 
48b79d7b67ef76e6e026d75a5a6f28c83664d3d1 Mon Sep 17 00:00:00 2001 From: mgiannakopoulos Date: Fri, 19 Sep 2025 02:41:27 -0400 Subject: [PATCH 14/21] Remove commented out code; set max retries to 1 for Amazon Bedrock models. --- singlestoredb/ai/chat.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/singlestoredb/ai/chat.py b/singlestoredb/ai/chat.py index 10dbcf02d..25f2ccef0 100644 --- a/singlestoredb/ai/chat.py +++ b/singlestoredb/ai/chat.py @@ -109,7 +109,13 @@ def SingleStoreChatFactory( if prefix == 'aura-amz': # Instantiate Bedrock client - cfg = Config(signature_version=UNSIGNED) + cfg = Config( + signature_version=UNSIGNED, + retries={ + 'max_attempts': 1, + 'mode': 'standard', + }, + ) if http_client is not None and http_client.timeout is not None: cfg.timeout = http_client.timeout cfg.connect_timeout = http_client.timeout @@ -129,14 +135,8 @@ def _inject_headers(request: Any, **_ignored: Any) -> None: request.headers['X-S2-OBO'] = obo_val if token: request.headers['Authorization'] = f'Bearer {token}' - # if streaming: - # request.headers['X-BEDROCK-CONVERSE-STREAMING'] = 'true' - # else: - # request.headers['X-BEDROCK-CONVERSE'] = 'true' request.headers.pop('X-Amz-Date', None) request.headers.pop('X-Amz-Security-Token', None) - # request.headers.pop('Amz-Sdk-Request', None) - # request.headers.pop('Amz-Sdk-Invocation-Id', None) emitter = client._endpoint._event_emitter emitter.register_first( From 02b1f4759eaeed0360253bbc8ab0220d254f5015 Mon Sep 17 00:00:00 2001 From: mgiannakopoulos Date: Fri, 19 Sep 2025 03:10:01 -0400 Subject: [PATCH 15/21] Use 'Union' return type to satisfy pre-commit checks for python version 3.9. 
--- singlestoredb/ai/chat.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/singlestoredb/ai/chat.py b/singlestoredb/ai/chat.py index 25f2ccef0..edc277e9c 100644 --- a/singlestoredb/ai/chat.py +++ b/singlestoredb/ai/chat.py @@ -2,6 +2,7 @@ from typing import Any from typing import Callable from typing import Optional +from typing import Union import httpx @@ -73,7 +74,7 @@ def SingleStoreChatFactory( http_client: Optional[httpx.Client] = None, obo_token_getter: Optional[Callable[[], Optional[str]]] = None, **kwargs: Any, -) -> ChatOpenAI | ChatBedrockConverse: +) -> Union[ChatOpenAI, ChatBedrockConverse]: """Return a chat model instance (ChatOpenAI or ChatBedrockConverse) based on prefix. The fully-qualified model name is expected to contain a prefix followed by From 5db3fbff09374972687475b25a2a9dd0f90ce485 Mon Sep 17 00:00:00 2001 From: mgiannakopoulos Date: Fri, 19 Sep 2025 05:38:04 -0400 Subject: [PATCH 16/21] Expose also the hostingPlatform for InferenceAPIInfo. 
--- singlestoredb/management/inference_api.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/singlestoredb/management/inference_api.py b/singlestoredb/management/inference_api.py index f017339d4..13b6413c2 100644 --- a/singlestoredb/management/inference_api.py +++ b/singlestoredb/management/inference_api.py @@ -23,6 +23,7 @@ class InferenceAPIInfo(object): name: str connection_url: str project_id: str + hosting_platform: str def __init__( self, @@ -31,12 +32,14 @@ def __init__( name: str, connection_url: str, project_id: str, + hosting_platform: str, ): self.service_id = service_id self.connection_url = connection_url self.model_name = model_name self.name = name self.project_id = project_id + self.hosting_platform = hosting_platform @classmethod def from_dict( @@ -62,6 +65,7 @@ def from_dict( model_name=obj['modelName'], name=obj['name'], connection_url=obj['connectionURL'], + hosting_platform=obj['hostingPlatform'], ) return out From 2fdd49ba0637f5ffe3db755e4a28a9a0a1b3048b Mon Sep 17 00:00:00 2001 From: mgiannakopoulos Date: Fri, 19 Sep 2025 20:08:11 -0400 Subject: [PATCH 17/21] Do not use model prefix, rely on hosting platform. --- singlestoredb/ai/chat.py | 34 +++++----------------------------- 1 file changed, 5 insertions(+), 29 deletions(-) diff --git a/singlestoredb/ai/chat.py b/singlestoredb/ai/chat.py index edc277e9c..dedbed481 100644 --- a/singlestoredb/ai/chat.py +++ b/singlestoredb/ai/chat.py @@ -75,40 +75,16 @@ def SingleStoreChatFactory( obo_token_getter: Optional[Callable[[], Optional[str]]] = None, **kwargs: Any, ) -> Union[ChatOpenAI, ChatBedrockConverse]: - """Return a chat model instance (ChatOpenAI or ChatBedrockConverse) based on prefix. - - The fully-qualified model name is expected to contain a prefix followed by - a delimiter (one of '.', ':', '/'). 
Supported prefixes: - * aura -> OpenAI style (ChatOpenAI backend) - * aura-azr -> Azure OpenAI style (still ChatOpenAI backend) - * aura-amz -> Amazon Bedrock (ChatBedrockConverse backend) - - If no supported prefix is detected the entire value is treated as an - OpenAI-style model routed through the SingleStore Fusion gateway. + """Return a chat model instance (ChatOpenAI or ChatBedrockConverse). """ - # Parse identifier - prefix = 'aura' - actual_model = model_name - for sep in ('.', ':', '/'): - if sep in model_name: - head, tail = model_name.split(sep, 1) - candidate = head.strip().lower() - if candidate in {'aura', 'aura-azr', 'aura-amz'}: - prefix = candidate - actual_model = tail.strip() - else: - # Unsupported prefix; treat whole string as model for OpenAI path - actual_model = model_name - break - inference_api_manager = ( get_workspace_manager().organizations.current.inference_apis ) - info = inference_api_manager.get(model_name=actual_model) + info = inference_api_manager.get(model_name=model_name) token_env = os.environ.get('SINGLESTOREDB_USER_TOKEN') token = api_key if api_key is not None else token_env - if prefix == 'aura-amz': + if info.hosting_platform == 'Amazon': # Instantiate Bedrock client cfg = Config( signature_version=UNSIGNED, @@ -157,7 +133,7 @@ def _inject_headers(request: Any, **_ignored: Any) -> None: _inject_headers, ) return ChatBedrockConverse( - model=actual_model, + model=model_name, endpoint_url=info.connection_url, # redirect requests to UMG region_name='us-east-1', # dummy value; UMG does not use this aws_access_key_id='placeholder', # dummy value; UMG does not use this @@ -171,7 +147,7 @@ def _inject_headers(request: Any, **_ignored: Any) -> None: openai_kwargs = dict( base_url=info.connection_url, api_key=token, - model=actual_model, + model=model_name, streaming=streaming, ) if http_client is not None: From 054c8e2bbaaaa786a9678679d0423d1e1487635a Mon Sep 17 00:00:00 2001 From: mgiannakopoulos Date: Sat, 20 Sep 2025 09:01:11 
-0400 Subject: [PATCH 18/21] Introduce SingleStoreEmbeddingsFactory; small fixes. --- singlestoredb/ai/chat.py | 71 ++++++++++++------------ singlestoredb/ai/embeddings.py | 98 ++++++++++++++++++++++++++++++++++ 2 files changed, 134 insertions(+), 35 deletions(-) diff --git a/singlestoredb/ai/chat.py b/singlestoredb/ai/chat.py index dedbed481..b8e655cbe 100644 --- a/singlestoredb/ai/chat.py +++ b/singlestoredb/ai/chat.py @@ -86,16 +86,15 @@ def SingleStoreChatFactory( if info.hosting_platform == 'Amazon': # Instantiate Bedrock client - cfg = Config( - signature_version=UNSIGNED, - retries={ - 'max_attempts': 1, - 'mode': 'standard', - }, - ) + cfg_kwargs = { + 'signature_version': UNSIGNED, + 'retries': {'max_attempts': 1, 'mode': 'standard'}, + } if http_client is not None and http_client.timeout is not None: - cfg.timeout = http_client.timeout - cfg.connect_timeout = http_client.timeout + cfg_kwargs['read_timeout'] = http_client.timeout + cfg_kwargs['connect_timeout'] = http_client.timeout + + cfg = Config(**cfg_kwargs) client = boto3.client( 'bedrock-runtime', endpoint_url=info.connection_url, # redirect requests to UMG @@ -104,36 +103,38 @@ def SingleStoreChatFactory( aws_secret_access_key='placeholder', # dummy value; UMG does not use this config=cfg, ) - if obo_token_getter is not None: - def _inject_headers(request: Any, **_ignored: Any) -> None: - """Inject dynamic auth/OBO headers prior to Bedrock signing.""" + + def _inject_headers(request: Any, **_ignored: Any) -> None: + """Inject dynamic auth/OBO headers prior to Bedrock sending.""" + if obo_token_getter is not None: obo_val = obo_token_getter() if obo_val: request.headers['X-S2-OBO'] = obo_val - if token: - request.headers['Authorization'] = f'Bearer {token}' - request.headers.pop('X-Amz-Date', None) - request.headers.pop('X-Amz-Security-Token', None) - - emitter = client._endpoint._event_emitter - emitter.register_first( - 'before-send.bedrock-runtime.Converse', - _inject_headers, - ) - 
emitter.register_first( - 'before-send.bedrock-runtime.ConverseStream', - _inject_headers, - ) - emitter.register_first( - 'before-send.bedrock-runtime.InvokeModel', - _inject_headers, - ) - emitter.register_first( - 'before-send.bedrock-runtime.InvokeModelWithResponseStream', - _inject_headers, - ) + if token: + request.headers['Authorization'] = f'Bearer {token}' + request.headers.pop('X-Amz-Date', None) + request.headers.pop('X-Amz-Security-Token', None) + + emitter = client._endpoint._event_emitter + emitter.register_first( + 'before-send.bedrock-runtime.Converse', + _inject_headers, + ) + emitter.register_first( + 'before-send.bedrock-runtime.ConverseStream', + _inject_headers, + ) + emitter.register_first( + 'before-send.bedrock-runtime.InvokeModel', + _inject_headers, + ) + emitter.register_first( + 'before-send.bedrock-runtime.InvokeModelWithResponseStream', + _inject_headers, + ) + return ChatBedrockConverse( - model=model_name, + model_id=model_name, endpoint_url=info.connection_url, # redirect requests to UMG region_name='us-east-1', # dummy value; UMG does not use this aws_access_key_id='placeholder', # dummy value; UMG does not use this diff --git a/singlestoredb/ai/embeddings.py b/singlestoredb/ai/embeddings.py index 106b66328..a26cc6383 100644 --- a/singlestoredb/ai/embeddings.py +++ b/singlestoredb/ai/embeddings.py @@ -1,5 +1,10 @@ import os from typing import Any +from typing import Callable +from typing import Optional +from typing import Union + +import httpx from singlestoredb.fusion.handlers.utils import get_workspace_manager @@ -11,6 +16,18 @@ 'Please install it with `pip install langchain_openai`.', ) +try: + from langchain_aws import BedrockEmbeddings +except ImportError: + raise ImportError( + 'Could not import langchain-aws python package. 
' + 'Please install it with `pip install langchain-aws`.', + ) + +import boto3 +from botocore import UNSIGNED +from botocore.config import Config + class SingleStoreEmbeddings(OpenAIEmbeddings): @@ -25,3 +42,84 @@ def __init__(self, model_name: str, **kwargs: Any): model=model_name, **kwargs, ) + + +def SingleStoreEmbeddingsFactory( + model_name: str, + api_key: Optional[str] = None, + http_client: Optional[httpx.Client] = None, + obo_token_getter: Optional[Callable[[], Optional[str]]] = None, + **kwargs: Any, +) -> Union[OpenAIEmbeddings, BedrockEmbeddings]: + """Return an embeddings model instance (OpenAIEmbeddings or BedrockEmbeddings). + """ + inference_api_manager = ( + get_workspace_manager().organizations.current.inference_apis + ) + info = inference_api_manager.get(model_name=model_name) + token_env = os.environ.get('SINGLESTOREDB_USER_TOKEN') + token = api_key if api_key is not None else token_env + + if info.hosting_platform == 'Amazon': + # Instantiate Bedrock client + cfg_kwargs = { + 'signature_version': UNSIGNED, + 'retries': {'max_attempts': 1, 'mode': 'standard'}, + } + if http_client is not None and http_client.timeout is not None: + cfg_kwargs['read_timeout'] = http_client.timeout + cfg_kwargs['connect_timeout'] = http_client.timeout + + cfg = Config(**cfg_kwargs) + client = boto3.client( + 'bedrock-runtime', + endpoint_url=info.connection_url, # redirect requests to UMG + region_name='us-east-1', # dummy value; UMG does not use this + aws_access_key_id='placeholder', # dummy value; UMG does not use this + aws_secret_access_key='placeholder', # dummy value; UMG does not use this + config=cfg, + ) + + def _inject_headers(request: Any, **_ignored: Any) -> None: + """Inject dynamic auth/OBO headers prior to Bedrock sending.""" + if obo_token_getter is not None: + obo_val = obo_token_getter() + if obo_val: + request.headers['X-S2-OBO'] = obo_val + if token: + request.headers['Authorization'] = f'Bearer {token}' + request.headers.pop('X-Amz-Date', 
None) + request.headers.pop('X-Amz-Security-Token', None) + + emitter = client._endpoint._event_emitter + emitter.register_first( + 'before-send.bedrock-runtime.InvokeModel', + _inject_headers, + ) + emitter.register_first( + 'before-send.bedrock-runtime.InvokeModelWithResponseStream', + _inject_headers, + ) + + return BedrockEmbeddings( + model_id=model_name, + endpoint_url=info.connection_url, # redirect requests to UMG + region_name='us-east-1', # dummy value; UMG does not use this + aws_access_key_id='placeholder', # dummy value; UMG does not use this + aws_secret_access_key='placeholder', # dummy value; UMG does not use this + client=client, + **kwargs, + ) + + # OpenAI / Azure OpenAI path + openai_kwargs = dict( + base_url=info.connection_url, + api_key=token, + model=model_name, + ) + if http_client is not None: + openai_kwargs['http_client'] = http_client + return OpenAIEmbeddings( + **openai_kwargs, + **kwargs, + ) From 540100d55b48380872d00049649f99a4bbbfeb13 Mon Sep 17 00:00:00 2001 From: mgiannakopoulos Date: Mon, 22 Sep 2025 11:58:51 -0400 Subject: [PATCH 19/21] Fix openai langchain library. --- singlestoredb/ai/chat.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/singlestoredb/ai/chat.py b/singlestoredb/ai/chat.py index b8e655cbe..3b2344dd5 100644 --- a/singlestoredb/ai/chat.py +++ b/singlestoredb/ai/chat.py @@ -13,7 +13,7 @@ except ImportError: raise ImportError( 'Could not import langchain_openai python package. ' - 'Please install it with `pip install langchain-openai`.', + 'Please install it with `pip install langchain_openai`.', ) try: From 80b487be9c2b6cef22e0fe1c65d7b9d2b468ba28 Mon Sep 17 00:00:00 2001 From: mgiannakopoulos Date: Mon, 22 Sep 2025 13:49:18 -0400 Subject: [PATCH 20/21] Remove any comments that expose internal implementation details. 
--- singlestoredb/ai/chat.py | 16 ++++++++-------- singlestoredb/ai/embeddings.py | 16 ++++++++-------- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/singlestoredb/ai/chat.py b/singlestoredb/ai/chat.py index 3b2344dd5..5a796aaa7 100644 --- a/singlestoredb/ai/chat.py +++ b/singlestoredb/ai/chat.py @@ -97,10 +97,10 @@ def SingleStoreChatFactory( cfg = Config(**cfg_kwargs) client = boto3.client( 'bedrock-runtime', - endpoint_url=info.connection_url, # redirect requests to UMG - region_name='us-east-1', # dummy value; UMG does not use this - aws_access_key_id='placeholder', # dummy value; UMG does not use this - aws_secret_access_key='placeholder', # dummy value; UMG does not use this + endpoint_url=info.connection_url, + region_name='us-east-1', + aws_access_key_id='placeholder', + aws_secret_access_key='placeholder', config=cfg, ) @@ -135,10 +135,10 @@ def _inject_headers(request: Any, **_ignored: Any) -> None: return ChatBedrockConverse( model_id=model_name, - endpoint_url=info.connection_url, # redirect requests to UMG - region_name='us-east-1', # dummy value; UMG does not use this - aws_access_key_id='placeholder', # dummy value; UMG does not use this - aws_secret_access_key='placeholder', # dummy value; UMG does not use this + endpoint_url=info.connection_url, + region_name='us-east-1', + aws_access_key_id='placeholder', + aws_secret_access_key='placeholder', disable_streaming=not streaming, client=client, **kwargs, diff --git a/singlestoredb/ai/embeddings.py b/singlestoredb/ai/embeddings.py index a26cc6383..0f455d8e4 100644 --- a/singlestoredb/ai/embeddings.py +++ b/singlestoredb/ai/embeddings.py @@ -73,10 +73,10 @@ def SingleStoreEmbeddingsFactory( cfg = Config(**cfg_kwargs) client = boto3.client( 'bedrock-runtime', - endpoint_url=info.connection_url, # redirect requests to UMG - region_name='us-east-1', # dummy value; UMG does not use this - aws_access_key_id='placeholder', # dummy value; UMG does not use this - 
aws_secret_access_key='placeholder', # dummy value; UMG does not use this + endpoint_url=info.connection_url, + region_name='us-east-1', + aws_access_key_id='placeholder', + aws_secret_access_key='placeholder', config=cfg, ) @@ -103,10 +103,10 @@ def _inject_headers(request: Any, **_ignored: Any) -> None: return BedrockEmbeddings( model_id=model_name, - endpoint_url=info.connection_url, # redirect requests to UMG - region_name='us-east-1', # dummy value; UMG does not use this - aws_access_key_id='placeholder', # dummy value; UMG does not use this - aws_secret_access_key='placeholder', # dummy value; UMG does not use this + endpoint_url=info.connection_url, + region_name='us-east-1', + aws_access_key_id='placeholder', + aws_secret_access_key='placeholder', client=client, **kwargs, ) From ca059a34630ff31491ee1815f3d6d20454c19f27 Mon Sep 17 00:00:00 2001 From: mgiannakopoulos Date: Mon, 22 Sep 2025 15:15:06 -0400 Subject: [PATCH 21/21] Minor fixes. --- singlestoredb/ai/__init__.py | 3 +++ singlestoredb/ai/chat.py | 8 ++++---- singlestoredb/ai/embeddings.py | 6 +++--- 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/singlestoredb/ai/__init__.py b/singlestoredb/ai/__init__.py index 6f97c99f4..31a4db2eb 100644 --- a/singlestoredb/ai/__init__.py +++ b/singlestoredb/ai/__init__.py @@ -1,2 +1,5 @@ +from .chat import SingleStoreChat # noqa: F401 +from .chat import SingleStoreChatFactory # noqa: F401 from .chat import SingleStoreChatOpenAI # noqa: F401 from .embeddings import SingleStoreEmbeddings # noqa: F401 +from .embeddings import SingleStoreEmbeddingsFactory # noqa: F401 diff --git a/singlestoredb/ai/chat.py b/singlestoredb/ai/chat.py index 5a796aaa7..88481760e 100644 --- a/singlestoredb/ai/chat.py +++ b/singlestoredb/ai/chat.py @@ -6,7 +6,7 @@ import httpx -from singlestoredb.fusion.handlers.utils import get_workspace_manager +from singlestoredb import manage_workspaces try: from langchain_openai import ChatOpenAI @@ -32,7 +32,7 @@ class 
SingleStoreChatOpenAI(ChatOpenAI): def __init__(self, model_name: str, api_key: Optional[str] = None, **kwargs: Any): inference_api_manger = ( - get_workspace_manager().organizations.current.inference_apis + manage_workspaces().organizations.current.inference_apis ) info = inference_api_manger.get(model_name=model_name) token = ( @@ -51,7 +51,7 @@ def __init__(self, model_name: str, api_key: Optional[str] = None, **kwargs: Any class SingleStoreChat(ChatOpenAI): def __init__(self, model_name: str, api_key: Optional[str] = None, **kwargs: Any): inference_api_manger = ( - get_workspace_manager().organizations.current.inference_apis + manage_workspaces().organizations.current.inference_apis ) info = inference_api_manger.get(model_name=model_name) token = ( @@ -78,7 +78,7 @@ def SingleStoreChatFactory( """Return a chat model instance (ChatOpenAI or ChatBedrockConverse). """ inference_api_manager = ( - get_workspace_manager().organizations.current.inference_apis + manage_workspaces().organizations.current.inference_apis ) info = inference_api_manager.get(model_name=model_name) token_env = os.environ.get('SINGLESTOREDB_USER_TOKEN') diff --git a/singlestoredb/ai/embeddings.py b/singlestoredb/ai/embeddings.py index 0f455d8e4..bd6c81ef0 100644 --- a/singlestoredb/ai/embeddings.py +++ b/singlestoredb/ai/embeddings.py @@ -6,7 +6,7 @@ import httpx -from singlestoredb.fusion.handlers.utils import get_workspace_manager +from singlestoredb import manage_workspaces try: from langchain_openai import OpenAIEmbeddings @@ -33,7 +33,7 @@ class SingleStoreEmbeddings(OpenAIEmbeddings): def __init__(self, model_name: str, **kwargs: Any): inference_api_manger = ( - get_workspace_manager().organizations.current.inference_apis + manage_workspaces().organizations.current.inference_apis ) info = inference_api_manger.get(model_name=model_name) super().__init__( @@ -54,7 +54,7 @@ def SingleStoreEmbeddingsFactory( """Return an embeddings model instance (OpenAIEmbeddings or BedrockEmbeddings). 
""" inference_api_manager = ( - get_workspace_manager().organizations.current.inference_apis + manage_workspaces().organizations.current.inference_apis ) info = inference_api_manager.get(model_name=model_name) token_env = os.environ.get('SINGLESTOREDB_USER_TOKEN')