diff --git a/singlestoredb/ai/__init__.py b/singlestoredb/ai/__init__.py
index 6f97c99f4..31a4db2eb 100644
--- a/singlestoredb/ai/__init__.py
+++ b/singlestoredb/ai/__init__.py
@@ -1,2 +1,5 @@
+from .chat import SingleStoreChat  # noqa: F401
+from .chat import SingleStoreChatFactory  # noqa: F401
 from .chat import SingleStoreChatOpenAI  # noqa: F401
 from .embeddings import SingleStoreEmbeddings  # noqa: F401
+from .embeddings import SingleStoreEmbeddingsFactory  # noqa: F401
diff --git a/singlestoredb/ai/chat.py b/singlestoredb/ai/chat.py
index c637d390d..88481760e 100644
--- a/singlestoredb/ai/chat.py
+++ b/singlestoredb/ai/chat.py
@@ -1,7 +1,12 @@
 import os
 from typing import Any
+from typing import Callable
+from typing import Optional
+from typing import Union
 
-from singlestoredb.fusion.handlers.utils import get_workspace_manager
+import httpx
+
+from singlestoredb import manage_workspaces
 
 try:
     from langchain_openai import ChatOpenAI
@@ -11,30 +16,144 @@
         'Please install it with `pip install langchain_openai`.',
     )
 
+try:
+    from langchain_aws import ChatBedrockConverse
+except ImportError:
+    raise ImportError(
+        'Could not import langchain-aws python package. '
+        'Please install it with `pip install langchain-aws`.',
+    )
+
+import boto3
+from botocore import UNSIGNED
+from botocore.config import Config
+
 
 class SingleStoreChatOpenAI(ChatOpenAI):
-    def __init__(self, model_name: str, **kwargs: Any):
+    def __init__(self, model_name: str, api_key: Optional[str] = None, **kwargs: Any):
         inference_api_manger = (
-            get_workspace_manager().organizations.current.inference_apis
+            manage_workspaces().organizations.current.inference_apis
         )
         info = inference_api_manger.get(model_name=model_name)
+        token = (
+            api_key
+            if api_key is not None
+            else os.environ.get('SINGLESTOREDB_USER_TOKEN')
+        )
         super().__init__(
             base_url=info.connection_url,
-            api_key=os.environ.get('SINGLESTOREDB_USER_TOKEN'),
+            api_key=token,
             model=model_name,
             **kwargs,
         )
 
 
 class SingleStoreChat(ChatOpenAI):
-    def __init__(self, model_name: str, **kwargs: Any):
+    def __init__(self, model_name: str, api_key: Optional[str] = None, **kwargs: Any):
         inference_api_manger = (
-            get_workspace_manager().organizations.current.inference_apis
+            manage_workspaces().organizations.current.inference_apis
         )
         info = inference_api_manger.get(model_name=model_name)
+        token = (
+            api_key
+            if api_key is not None
+            else os.environ.get('SINGLESTOREDB_USER_TOKEN')
+        )
         super().__init__(
             base_url=info.connection_url,
-            api_key=os.environ.get('SINGLESTOREDB_USER_TOKEN'),
+            api_key=token,
             model=model_name,
             **kwargs,
         )
+
+
+def SingleStoreChatFactory(
+    model_name: str,
+    api_key: Optional[str] = None,
+    streaming: bool = True,
+    http_client: Optional[httpx.Client] = None,
+    obo_token_getter: Optional[Callable[[], Optional[str]]] = None,
+    **kwargs: Any,
+) -> Union[ChatOpenAI, ChatBedrockConverse]:
+    """Return a chat model instance (ChatOpenAI or ChatBedrockConverse).
+    """
+    inference_api_manager = (
+        manage_workspaces().organizations.current.inference_apis
+    )
+    info = inference_api_manager.get(model_name=model_name)
+    token_env = os.environ.get('SINGLESTOREDB_USER_TOKEN')
+    token = api_key if api_key is not None else token_env
+
+    if info.hosting_platform == 'Amazon':
+        # Instantiate Bedrock client
+        cfg_kwargs = {
+            'signature_version': UNSIGNED,
+            'retries': {'max_attempts': 1, 'mode': 'standard'},
+        }
+        if http_client is not None and http_client.timeout is not None:
+            cfg_kwargs['read_timeout'] = http_client.timeout
+            cfg_kwargs['connect_timeout'] = http_client.timeout
+
+        cfg = Config(**cfg_kwargs)
+        client = boto3.client(
+            'bedrock-runtime',
+            endpoint_url=info.connection_url,
+            region_name='us-east-1',
+            aws_access_key_id='placeholder',
+            aws_secret_access_key='placeholder',
+            config=cfg,
+        )
+
+        def _inject_headers(request: Any, **_ignored: Any) -> None:
+            """Inject dynamic auth/OBO headers prior to Bedrock sending."""
+            if obo_token_getter is not None:
+                obo_val = obo_token_getter()
+                if obo_val:
+                    request.headers['X-S2-OBO'] = obo_val
+            if token:
+                request.headers['Authorization'] = f'Bearer {token}'
+            request.headers.pop('X-Amz-Date', None)
+            request.headers.pop('X-Amz-Security-Token', None)
+
+        emitter = client._endpoint._event_emitter
+        emitter.register_first(
+            'before-send.bedrock-runtime.Converse',
+            _inject_headers,
+        )
+        emitter.register_first(
+            'before-send.bedrock-runtime.ConverseStream',
+            _inject_headers,
+        )
+        emitter.register_first(
+            'before-send.bedrock-runtime.InvokeModel',
+            _inject_headers,
+        )
+        emitter.register_first(
+            'before-send.bedrock-runtime.InvokeModelWithResponseStream',
+            _inject_headers,
+        )
+
+        return ChatBedrockConverse(
+            model_id=model_name,
+            endpoint_url=info.connection_url,
+            region_name='us-east-1',
+            aws_access_key_id='placeholder',
+            aws_secret_access_key='placeholder',
+            disable_streaming=not streaming,
+            client=client,
+            **kwargs,
+        )
+
+    # OpenAI / Azure OpenAI path
+    openai_kwargs = dict(
+        base_url=info.connection_url,
+        api_key=token,
+        model=model_name,
+        streaming=streaming,
+    )
+    if http_client is not None:
+        openai_kwargs['http_client'] = http_client
+    return ChatOpenAI(
+        **openai_kwargs,
+        **kwargs,
+    )
diff --git a/singlestoredb/ai/embeddings.py b/singlestoredb/ai/embeddings.py
index 106b66328..bd6c81ef0 100644
--- a/singlestoredb/ai/embeddings.py
+++ b/singlestoredb/ai/embeddings.py
@@ -1,7 +1,12 @@
 import os
 from typing import Any
+from typing import Callable
+from typing import Optional
+from typing import Union
 
-from singlestoredb.fusion.handlers.utils import get_workspace_manager
+import httpx
+
+from singlestoredb import manage_workspaces
 
 try:
     from langchain_openai import OpenAIEmbeddings
@@ -11,12 +16,24 @@
         'Please install it with `pip install langchain_openai`.',
     )
 
+try:
+    from langchain_aws import BedrockEmbeddings
+except ImportError:
+    raise ImportError(
+        'Could not import langchain-aws python package. '
+        'Please install it with `pip install langchain-aws`.',
+    )
+
+import boto3
+from botocore import UNSIGNED
+from botocore.config import Config
+
 
 
 class SingleStoreEmbeddings(OpenAIEmbeddings):
     def __init__(self, model_name: str, **kwargs: Any):
         inference_api_manger = (
-            get_workspace_manager().organizations.current.inference_apis
+            manage_workspaces().organizations.current.inference_apis
         )
         info = inference_api_manger.get(model_name=model_name)
         super().__init__(
@@ -25,3 +42,84 @@ def __init__(self, model_name: str, **kwargs: Any):
             model=model_name,
             **kwargs,
         )
+
+
+def SingleStoreEmbeddingsFactory(
+    model_name: str,
+    api_key: Optional[str] = None,
+    http_client: Optional[httpx.Client] = None,
+    obo_token_getter: Optional[Callable[[], Optional[str]]] = None,
+    **kwargs: Any,
+) -> Union[OpenAIEmbeddings, BedrockEmbeddings]:
+    """Return an embeddings model instance (OpenAIEmbeddings or BedrockEmbeddings).
+    """
+    inference_api_manager = (
+        manage_workspaces().organizations.current.inference_apis
+    )
+    info = inference_api_manager.get(model_name=model_name)
+    token_env = os.environ.get('SINGLESTOREDB_USER_TOKEN')
+    token = api_key if api_key is not None else token_env
+
+    if info.hosting_platform == 'Amazon':
+        # Instantiate Bedrock client
+        cfg_kwargs = {
+            'signature_version': UNSIGNED,
+            'retries': {'max_attempts': 1, 'mode': 'standard'},
+        }
+        if http_client is not None and http_client.timeout is not None:
+            cfg_kwargs['read_timeout'] = http_client.timeout
+            cfg_kwargs['connect_timeout'] = http_client.timeout
+
+        cfg = Config(**cfg_kwargs)
+        client = boto3.client(
+            'bedrock-runtime',
+            endpoint_url=info.connection_url,
+            region_name='us-east-1',
+            aws_access_key_id='placeholder',
+            aws_secret_access_key='placeholder',
+            config=cfg,
+        )
+
+        def _inject_headers(request: Any, **_ignored: Any) -> None:
+            """Inject dynamic auth/OBO headers prior to Bedrock sending."""
+            if obo_token_getter is not None:
+                obo_val = obo_token_getter()
+                if obo_val:
+                    request.headers['X-S2-OBO'] = obo_val
+            if token:
+                request.headers['Authorization'] = f'Bearer {token}'
+            request.headers.pop('X-Amz-Date', None)
+            request.headers.pop('X-Amz-Security-Token', None)
+
+        emitter = client._endpoint._event_emitter
+        emitter.register_first(
+            'before-send.bedrock-runtime.InvokeModel',
+            _inject_headers,
+        )
+        emitter.register_first(
+            'before-send.bedrock-runtime.InvokeModelWithResponseStream',
+            _inject_headers,
+        )
+
+        return BedrockEmbeddings(
+            model_id=model_name,
+            endpoint_url=info.connection_url,
+            region_name='us-east-1',
+            aws_access_key_id='placeholder',
+            aws_secret_access_key='placeholder',
+            client=client,
+            **kwargs,
+        )
+
+    # OpenAI / Azure OpenAI path
+    openai_kwargs = dict(
+        base_url=info.connection_url,
+        api_key=token,
+        model=model_name,
+    )
+    if http_client is not None:
+        openai_kwargs['http_client'] = http_client
+    return OpenAIEmbeddings(
+        **openai_kwargs,
+        **kwargs,
+    )
diff --git a/singlestoredb/management/inference_api.py b/singlestoredb/management/inference_api.py
index f017339d4..13b6413c2 100644
--- a/singlestoredb/management/inference_api.py
+++ b/singlestoredb/management/inference_api.py
@@ -23,6 +23,7 @@ class InferenceAPIInfo(object):
     name: str
     connection_url: str
     project_id: str
+    hosting_platform: str
 
     def __init__(
         self,
@@ -31,12 +32,14 @@ def __init__(
         name: str,
         connection_url: str,
         project_id: str,
+        hosting_platform: str,
     ):
         self.service_id = service_id
         self.connection_url = connection_url
         self.model_name = model_name
         self.name = name
         self.project_id = project_id
+        self.hosting_platform = hosting_platform
 
     @classmethod
     def from_dict(
@@ -62,6 +65,7 @@ def from_dict(
             model_name=obj['modelName'],
             name=obj['name'],
             connection_url=obj['connectionURL'],
+            hosting_platform=obj['hostingPlatform'],
         )
         return out
 