diff --git a/.gitignore b/.gitignore index 6463b520..063f6123 100644 --- a/.gitignore +++ b/.gitignore @@ -4,4 +4,4 @@ __pycache__/ poetry.toml .ruff_cache/ .vscode -.env \ No newline at end of file +.env diff --git a/pyproject.toml b/pyproject.toml index e559a681..2d113069 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,3 +1,6 @@ +[project] +name = "humanloop" + [tool.poetry] name = "humanloop" version = "0.8.22" diff --git a/src/humanloop/client.py b/src/humanloop/client.py index 1663e7ed..af2b1f38 100644 --- a/src/humanloop/client.py +++ b/src/humanloop/client.py @@ -10,16 +10,16 @@ from opentelemetry.trace import Tracer from humanloop.core.client_wrapper import SyncClientWrapper -from humanloop.decorators.types import DecoratorPromptKernelRequestParams +from humanloop.utilities.types import DecoratorPromptKernelRequestParams from humanloop.eval_utils.context import EVALUATION_CONTEXT_VARIABLE_NAME, EvaluationContext from humanloop.eval_utils import log_with_evaluation_context, run_eval from humanloop.eval_utils.types import Dataset, Evaluator, EvaluatorCheck, File from humanloop.base_client import AsyncBaseHumanloop, BaseHumanloop -from humanloop.decorators.flow import flow as flow_decorator_factory -from humanloop.decorators.prompt import prompt as prompt_decorator_factory -from humanloop.decorators.tool import tool as tool_decorator_factory +from humanloop.utilities.flow import flow as flow_decorator_factory +from humanloop.utilities.prompt import prompt as prompt_decorator_factory +from humanloop.utilities.tool import tool as tool_decorator_factory from humanloop.environment import HumanloopEnvironment from humanloop.evaluations.client import EvaluationsClient from humanloop.otel import instrument_provider @@ -49,7 +49,6 @@ def run( name: Optional[str], dataset: Dataset, evaluators: Optional[Sequence[Evaluator]] = None, - # logs: typing.Sequence[dict] | None = None, workers: int = 4, ) -> List[EvaluatorCheck]: """Evaluate your function for a given `Dataset` and set of `Evaluators`. diff --git a/src/humanloop/eval_utils/run.py b/src/humanloop/eval_utils/run.py index ff6dce61..3d1a5c9e 100644 --- a/src/humanloop/eval_utils/run.py +++ b/src/humanloop/eval_utils/run.py @@ -212,10 +212,6 @@ def increment(self): sys.stderr.write("\n") -# Module-level so it can be shared by threads. -_PROGRESS_BAR: Optional[_SimpleProgressBar] = None - - def run_eval( client: "BaseHumanloop", file: File, @@ -236,7 +232,6 @@ def run_eval( :param workers: the number of threads to process datapoints using your function concurrently. :return: per Evaluator checks. 
""" - global _PROGRESS_BAR if hasattr(file["callable"], "file"): # When the decorator inside `file` is a decorated function, diff --git a/src/humanloop/otel/__init__.py b/src/humanloop/otel/__init__.py index 0a1eab92..3442161e 100644 --- a/src/humanloop/otel/__init__.py +++ b/src/humanloop/otel/__init__.py @@ -1,7 +1,4 @@ -from typing import Optional, TypedDict - from opentelemetry.sdk.trace import TracerProvider -from typing_extensions import NotRequired from humanloop.otel.helpers import module_is_installed @@ -41,12 +38,3 @@ def instrument_provider(provider: TracerProvider): from opentelemetry.instrumentation.bedrock import BedrockInstrumentor BedrockInstrumentor().instrument(tracer_provider=provider) - - -class FlowContext(TypedDict): - trace_id: NotRequired[str] - trace_parent_id: NotRequired[Optional[int]] - is_flow_log: NotRequired[bool] - - -TRACE_FLOW_CONTEXT: dict[int, FlowContext] = {} diff --git a/src/humanloop/otel/constants.py b/src/humanloop/otel/constants.py index d28126a0..06de824d 100644 --- a/src/humanloop/otel/constants.py +++ b/src/humanloop/otel/constants.py @@ -4,3 +4,5 @@ HUMANLOOP_LOG_KEY = "humanloop.log" HUMANLOOP_FILE_TYPE_KEY = "humanloop.file.type" HUMANLOOP_PATH_KEY = "humanloop.file.path" +# Required for the exporter to know when to mark the Flow Log as complete +HUMANLOOP_FLOW_PREREQUISITES_KEY = "humanloop.flow.prerequisites" diff --git a/src/humanloop/otel/exporter.py b/src/humanloop/otel/exporter.py index 7e7de7e5..544d2e7b 100644 --- a/src/humanloop/otel/exporter.py +++ b/src/humanloop/otel/exporter.py @@ -1,7 +1,7 @@ import contextvars -import json import logging import threading +import time import typing from queue import Empty as EmptyQueue from queue import Queue @@ -14,16 +14,17 @@ from humanloop.core import ApiError as HumanloopApiError from humanloop.eval_utils.context import EVALUATION_CONTEXT_VARIABLE_NAME, EvaluationContext -from humanloop.otel import TRACE_FLOW_CONTEXT, FlowContext from humanloop.otel.constants import ( HUMANLOOP_FILE_KEY, HUMANLOOP_FILE_TYPE_KEY, + HUMANLOOP_FLOW_PREREQUISITES_KEY, HUMANLOOP_LOG_KEY, HUMANLOOP_PATH_KEY, ) from humanloop.otel.helpers import is_humanloop_span, read_from_opentelemetry_span from humanloop.requests.flow_kernel_request import FlowKernelRequestParams from humanloop.requests.prompt_kernel_request import PromptKernelRequestParams +from humanloop.requests.tool_kernel_request import ToolKernelRequestParams if typing.TYPE_CHECKING: from humanloop.client import Humanloop @@ -69,7 +70,8 @@ def __init__( for thread in self._threads: thread.start() logger.debug("Exporter Thread %s started", thread.ident) - self._flow_logs_to_complete: list[str] = [] + # Flow Log Span ID mapping to children Spans that must be uploaded first + self._flow_log_prerequisites: dict[int, set[int]] = {} def export(self, spans: trace.Sequence[ReadableSpan]) -> SpanExportResult: def is_evaluated_file( @@ -106,8 +108,9 @@ def is_evaluated_file( ), ) logger.debug( - "Span %s with EvaluationContext %s added to upload queue", - span.attributes, + "[HumanloopSpanExporter] Span %s %s with EvaluationContext %s added to upload queue", + span.context.span_id, + span.name, evaluation_context_copy, ) # Reset the EvaluationContext so run eval does not @@ -117,7 +120,7 @@ def is_evaluated_file( evaluation_context, ): logger.debug( - "EvaluationContext %s marked as exhausted for Log in Span %s", + "[HumanloopSpanExporter] EvaluationContext %s marked as exhausted for Log in Span %s", evaluation_context, spans[0].attributes, ) @@ -125,21 +128,16 @@ 
def is_evaluated_file( self._client.evaluation_context_variable.set(None) return SpanExportResult.SUCCESS else: - logger.warning("HumanloopSpanExporter is shutting down, not accepting new spans") + logger.warning("[HumanloopSpanExporter] Shutting down, not accepting new spans") return SpanExportResult.FAILURE def shutdown(self) -> None: self._shutdown = True for thread in self._threads: thread.join() - logger.debug("Exporter Thread %s joined", thread.ident) - for log_id in self._flow_logs_to_complete: - self._client.flows.update_log( - log_id=log_id, - trace_status="complete", - ) + logger.debug("[HumanloopSpanExporter] Exporter Thread %s joined", thread.ident) - def force_flush(self, timeout_millis: int = 3000) -> bool: + def force_flush(self, timeout_millis: int = 10000) -> bool: self._shutdown = True for thread in self._threads: thread.join(timeout=timeout_millis) @@ -178,42 +176,64 @@ def _do_work(self): self._client.evaluation_context_variable.set(evaluation_context) except EmptyQueue: continue - trace_metadata = TRACE_FLOW_CONTEXT.get(span_to_export.get_span_context().span_id) - if trace_metadata is None: + if span_to_export.parent is None: # Span is not part of a Flow Log self._export_span_dispatch(span_to_export) logger.debug( - "_do_work on Thread %s: Dispatched span %s with FlowContext %s which is not part of a Flow", + "[HumanloopSpanExporter] _do_work on Thread %s: Dispatching span %s %s", threading.get_ident(), - span_to_export.attributes, - trace_metadata, - ) - elif trace_metadata["trace_parent_id"] is None: - # Span is the head of a Flow Trace - self._export_span_dispatch(span_to_export) - logger.debug( - "Dispatched span %s which is a Flow Log with FlowContext %s", - span_to_export.attributes, - trace_metadata, + span_to_export.context.span_id, + span_to_export.name, ) - elif trace_metadata["trace_parent_id"] in self._span_id_to_uploaded_log_id: + elif span_to_export.parent.span_id in self._span_id_to_uploaded_log_id: # Span is part of a Flow and its parent has been uploaded self._export_span_dispatch(span_to_export) logger.debug( - "_do_work on Thread %s: Dispatched span %s after its parent %s with FlowContext %s", + "[HumanloopSpanExporter] _do_work on Thread %s: Dispatching span %s %s", threading.get_ident(), - span_to_export.attributes, - trace_metadata["trace_parent_id"], - trace_metadata, + span_to_export.context.span_id, + span_to_export.name, ) else: - # Requeue the Span to be uploaded later + # Requeue the Span and upload after its parent self._upload_queue.put((span_to_export, evaluation_context)) self._upload_queue.task_done() + def _mark_span_completed(self, span_id: int) -> None: + for flow_log_span_id, flow_children_span_ids in self._flow_log_prerequisites.items(): + if span_id in flow_children_span_ids: + flow_children_span_ids.remove(span_id) + if len(flow_children_span_ids) == 0: + # All logs in the Trace have been uploaded, mark the Flow Log as complete + flow_log_id = self._span_id_to_uploaded_log_id[flow_log_span_id] + if flow_log_id is None: + logger.error( + "[HumanloopSpanExporter] Cannot complete Flow log %s, log ID is None", + flow_log_span_id, + ) + else: + self._client.flows.update_log(log_id=flow_log_id, trace_status="complete") + break + def _export_span_dispatch(self, span: ReadableSpan) -> None: hl_file = read_from_opentelemetry_span(span, key=HUMANLOOP_FILE_KEY) file_type = span._attributes.get(HUMANLOOP_FILE_TYPE_KEY) # type: ignore + parent_span_id = span.parent.span_id if span.parent else None + + while parent_span_id and 
self._span_id_to_uploaded_log_id.get(parent_span_id) is None: + logger.debug( + "[HumanloopSpanExporter] _export_span_dispatch on Thread %s Span %s %s waiting for parent %s to be uploaded", + threading.get_ident(), + span.context.span_id, + span.name, + parent_span_id, + ) + # Sleep briefly so this loop does not busy-wait while another worker thread uploads the parent + time.sleep(0.1) + + logger.debug( + "[HumanloopSpanExporter] Exporting span %s with file type %s", + span, + file_type, + ) if file_type == "prompt": export_func = self._export_prompt @@ -242,25 +262,16 @@ def _export_prompt(self, span: ReadableSpan) -> None: log_object["messages"] = [] if "tools" not in file_object["prompt"]: file_object["prompt"]["tools"] = [] - trace_metadata = TRACE_FLOW_CONTEXT.get(span.get_span_context().span_id) - if trace_metadata and "trace_parent_id" in trace_metadata and trace_metadata["trace_parent_id"]: - trace_parent_id = self._span_id_to_uploaded_log_id[trace_metadata["trace_parent_id"]] - if trace_parent_id is None: - # Parent Log in Trace upload failed - file_path = read_from_opentelemetry_span(span, key=HUMANLOOP_PATH_KEY) - logger.error(f"Skipping log for {file_path}: parent Log upload failed") - return - else: - trace_parent_id = None - prompt: PromptKernelRequestParams = file_object["prompt"] + path: str = file_object["path"] - if "output" in log_object: - if not isinstance(log_object["output"], str): - # Output expected to be a string, if decorated function - # does not return one, jsonify it - log_object["output"] = json.dumps(log_object["output"]) + prompt: PromptKernelRequestParams = file_object["prompt"] + + span_parent_id = span.parent.span_id if span.parent else None + trace_parent_id = self._span_id_to_uploaded_log_id[span_parent_id] if span_parent_id else None + if "attributes" not in prompt or not prompt["attributes"]: prompt["attributes"] = {} + try: log_response = self._client.prompts.log( path=path, @@ -271,34 +282,32 @@ def _export_prompt(self, span: ReadableSpan) -> None: self._span_id_to_uploaded_log_id[span.context.span_id] = log_response.id except HumanloopApiError: self._span_id_to_uploaded_log_id[span.context.span_id] = None + self._mark_span_completed(span_id=span.context.span_id) def _export_tool(self, span: ReadableSpan) -> None: - file_object: dict[str, Any] = read_from_opentelemetry_span(span, key=HUMANLOOP_FILE_KEY) - log_object: dict[str, Any] = read_from_opentelemetry_span(span, key=HUMANLOOP_LOG_KEY) - trace_metadata: FlowContext = TRACE_FLOW_CONTEXT.get(span.get_span_context().span_id, {}) - if "trace_parent_id" in trace_metadata and trace_metadata["trace_parent_id"]: - trace_parent_id = self._span_id_to_uploaded_log_id.get( - trace_metadata["trace_parent_id"], - ) - if trace_parent_id is None: - # Parent Log in Trace upload failed - file_path = read_from_opentelemetry_span(span, key=HUMANLOOP_PATH_KEY) - logger.error(f"Skipping log for {file_path}: parent Log upload failed") - return - else: - trace_parent_id = None - tool = file_object["tool"] + file_object: dict[str, Any] = read_from_opentelemetry_span( + span, + key=HUMANLOOP_FILE_KEY, + ) + log_object: dict[str, Any] = read_from_opentelemetry_span( + span, + key=HUMANLOOP_LOG_KEY, + ) + + path: str = file_object["path"] + tool: ToolKernelRequestParams = file_object["tool"] + + span_parent_id = span.parent.span_id if span.parent else None + trace_parent_id = self._span_id_to_uploaded_log_id[span_parent_id] if span_parent_id else None + + # API expects an empty dictionary if user does not supply attributes if not tool.get("attributes"): tool["attributes"] = {} if not tool.get("setup_values"): tool["setup_values"] = 
{} - path: str = file_object["path"] if "parameters" in tool["function"] and "properties" not in tool["function"]["parameters"]: tool["function"]["parameters"]["properties"] = {} - if not isinstance(log_object["output"], str): - # Output expected to be a string, if decorated function - # does not return one, jsonify it - log_object["output"] = json.dumps(log_object["output"]) + try: log_response = self._client.tools.log( path=path, @@ -309,33 +318,37 @@ def _export_tool(self, span: ReadableSpan) -> None: self._span_id_to_uploaded_log_id[span.context.span_id] = log_response.id except HumanloopApiError: self._span_id_to_uploaded_log_id[span.context.span_id] = None + self._mark_span_completed(span_id=span.context.span_id) def _export_flow(self, span: ReadableSpan) -> None: - file_object: dict[str, Any] = read_from_opentelemetry_span(span, key=HUMANLOOP_FILE_KEY) - log_object: dict[str, Any] = read_from_opentelemetry_span(span, key=HUMANLOOP_LOG_KEY) - trace_metadata: FlowContext = TRACE_FLOW_CONTEXT.get( - span.get_span_context().span_id, - {}, + file_object: dict[str, Any] = read_from_opentelemetry_span( + span, + key=HUMANLOOP_FILE_KEY, ) - if "trace_parent_id" in trace_metadata: - trace_parent_id = self._span_id_to_uploaded_log_id.get( - trace_metadata["trace_parent_id"], # type: ignore + log_object: dict[str, Any] = read_from_opentelemetry_span( + span, + key=HUMANLOOP_LOG_KEY, + ) + # Spans that must be uploaded before the Flow Span is completed + try: + prerequisites: list[int] = read_from_opentelemetry_span( # type: ignore + span=span, + key=HUMANLOOP_FLOW_PREREQUISITES_KEY, ) - if trace_parent_id is None and trace_metadata["trace_id"] != span.get_span_context().span_id: - # Parent Log in Trace upload failed - # NOTE: Check if the trace_id metadata field points to the - # span itself. 
This signifies the span is the head of the Trace - file_path = read_from_opentelemetry_span(span, key=HUMANLOOP_PATH_KEY) - logger.error(f"Skipping log for {file_path}: parent Log upload failed") - return - else: - trace_parent_id = None + self._flow_log_prerequisites[span.context.span_id] = set(prerequisites) + except KeyError: + self._flow_log_prerequisites[span.context.span_id] = set() + + path: str = file_object["path"] flow: FlowKernelRequestParams if not file_object.get("flow"): flow = {"attributes": {}} else: flow = file_object["flow"] - path: str = file_object["path"] + + span_parent_id = span.parent.span_id if span.parent else None + trace_parent_id = self._span_id_to_uploaded_log_id[span_parent_id] if span_parent_id else None + if "output" not in log_object: log_object["output"] = None try: @@ -345,8 +358,8 @@ def _export_flow(self, span: ReadableSpan) -> None: **log_object, trace_parent_id=trace_parent_id, ) - self._flow_logs_to_complete.append(log_response.id) self._span_id_to_uploaded_log_id[span.get_span_context().span_id] = log_response.id except HumanloopApiError as e: logger.error(str(e)) self._span_id_to_uploaded_log_id[span.context.span_id] = None + self._mark_span_completed(span_id=span.context.span_id) diff --git a/src/humanloop/otel/helpers.py b/src/humanloop/otel/helpers.py index d25a5674..9e645144 100644 --- a/src/humanloop/otel/helpers.py +++ b/src/humanloop/otel/helpers.py @@ -1,13 +1,10 @@ import json -import uuid from typing import Any, Callable, Union from opentelemetry.sdk.trace import ReadableSpan from opentelemetry.trace import SpanKind from opentelemetry.util.types import AttributeValue -from humanloop.otel.constants import HUMANLOOP_FILE_KEY, HUMANLOOP_LOG_KEY - NestedDict = dict[str, Union["NestedDict", AttributeValue]] NestedList = list[Union["NestedList", NestedDict]] @@ -267,13 +264,7 @@ def is_llm_provider_call(span: ReadableSpan) -> bool: def is_humanloop_span(span: ReadableSpan) -> bool: """Check if the Span was created by the Humanloop SDK.""" - try: - # Valid spans will have keys with the HL_FILE_OT_KEY and HL_LOG_OT_KEY prefixes present - read_from_opentelemetry_span(span, key=HUMANLOOP_FILE_KEY) - read_from_opentelemetry_span(span, key=HUMANLOOP_LOG_KEY) - except KeyError: - return False - return True + return span.name.startswith("humanloop.") def module_is_installed(module_name: str) -> bool: @@ -288,10 +279,6 @@ def module_is_installed(module_name: str) -> bool: return True -def generate_span_id() -> str: - return str(uuid.uuid4()) - - def jsonify_if_not_string(func: Callable, output: Any) -> str: if not isinstance(output, str): try: diff --git a/src/humanloop/otel/processor.py b/src/humanloop/otel/processor.py index 3542c244..d027cb35 100644 --- a/src/humanloop/otel/processor.py +++ b/src/humanloop/otel/processor.py @@ -1,13 +1,19 @@ +from concurrent.futures import ThreadPoolExecutor import logging from collections import defaultdict -from typing import Any +import time +from typing import Any, TypedDict -# No typing stubs for parse from opentelemetry.sdk.trace import ReadableSpan from opentelemetry.sdk.trace.export import SimpleSpanProcessor, SpanExporter from pydantic import ValidationError as PydanticValidationError -from humanloop.otel.constants import HUMANLOOP_FILE_KEY, HUMANLOOP_FILE_TYPE_KEY, HUMANLOOP_LOG_KEY +from humanloop.otel.constants import ( + HUMANLOOP_FILE_KEY, + HUMANLOOP_FILE_TYPE_KEY, + HUMANLOOP_FLOW_PREREQUISITES_KEY, + HUMANLOOP_LOG_KEY, +) from humanloop.otel.helpers import ( is_humanloop_span, is_llm_provider_call, 
@@ -19,6 +25,11 @@ logger = logging.getLogger("humanloop.sdk") +class CompletableSpan(TypedDict): + span: ReadableSpan + complete: bool + + class HumanloopSpanProcessor(SimpleSpanProcessor): """Enrich Humanloop spans with data from their children spans. @@ -38,30 +49,92 @@ class HumanloopSpanProcessor(SimpleSpanProcessor): def __init__(self, exporter: SpanExporter) -> None: super().__init__(exporter) - # Span parent to Span children map - self._children: dict[int, list] = defaultdict(list) - - # NOTE: Could override on_start and process Flow spans ahead of time - # and PATCH the created Logs in on_end. A special type of ReadableSpan could be - # used for this + # span parent to span children map + self._children: dict[int, list[CompletableSpan]] = defaultdict(list) + # Maps the span ID of each Flow trace head to the span IDs contained in that trace + # They are passed to the Exporter as a span attribute + # so the Exporter knows when to complete a trace + self._prerequisites: dict[int, list[int]] = {} + self._executor = ThreadPoolExecutor(max_workers=4) + + def shutdown(self): + self._executor.shutdown() + return super().shutdown() + + def on_start(self, span, parent_context=None): + span_id = span.context.span_id + parent_span_id = span.parent.span_id if span.parent else None + if span.name == "humanloop.flow": + self._prerequisites[span_id] = [] + if parent_span_id and is_humanloop_span(span): + for trace_head, all_trace_nodes in self._prerequisites.items(): + if parent_span_id == trace_head or parent_span_id in all_trace_nodes: + all_trace_nodes.append(span_id) + break + # Handle the streaming case: when a Prompt-instrumented function calls a provider with stream=True, + # the instrumentor span will end only when the streamed response is fully consumed, which can happen + # after the span created by the Prompt utility finishes. To handle this, we register all instrumentor + # spans belonging to a Humanloop span, and their parent will wait for them to complete in on_end before + # exporting the Humanloop span.
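+        # For example, an OpenAI call made inside a function decorated with the prompt utility produces an + # instrumentor span that is registered below with complete=False; when the humanloop.prompt span ends, + # _wait_for_children (running on the executor) dispatches it only after that child span has also ended.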
+        if parent_span_id and _is_instrumentor_span(span): + if parent_span_id not in self._children: + self._children[parent_span_id] = [] + self._children[parent_span_id].append( + { + "span": span, + "complete": False, + } + ) def on_end(self, span: ReadableSpan) -> None: if is_humanloop_span(span=span): - _process_span_dispatch(span, self._children[span.context.span_id]) - # Release the reference to the Spans as they've already - # been sent to the Exporter - del self._children[span.context.span_id] + # Wait for children to complete asynchronously + self._executor.submit(self._wait_for_children, span=span) + elif span.parent is not None and _is_instrumentor_span(span): + # If this is one of the children spans waited upon, update its completion status + self._children[span.parent.span_id] = [ + child if child["span"].context.span_id != span.context.span_id else {"span": span, "complete": True} + for child in self._children[span.parent.span_id] + ] + + # Export the instrumentor span + self.span_exporter.export([span]) else: - if span.parent is not None and _is_instrumentor_span(span): - # Copy the Span and keep it until the Humanloop Span - # arrives in order to enrich it - self._children[span.parent.span_id].append(span) - # Pass the Span to the Exporter + # Unknown span, pass it to the Exporter + self.span_exporter.export([span]) + + def _wait_for_children(self, span: ReadableSpan): + """Wait for all children spans to complete before processing the Humanloop span.""" + span_id = span.context.span_id + while not all(child["complete"] for child in self._children[span_id]): + logger.debug( + "[HumanloopSpanProcessor] Span %s %s waiting for children to complete: %s", + span_id, + span.name, + self._children[span_id], + ) + # Sleep briefly instead of busy-waiting for the children spans to end + time.sleep(0.1) + # All instrumentor spans have arrived; we can process the + # Humanloop parent span owning them + if span.name == "humanloop.flow": + write_to_opentelemetry_span( + span=span, + key=HUMANLOOP_FLOW_PREREQUISITES_KEY, + value=self._prerequisites[span_id], + ) + del self._prerequisites[span_id] + logger.debug("[HumanloopSpanProcessor] Dispatching span %s %s", span_id, span.name) + _process_span_dispatch(span, [child["span"] for child in self._children[span_id]]) + # Release references + del self._children[span_id] + # Pass Humanloop span to Exporter + logger.debug("[HumanloopSpanProcessor] Sending span %s %s to exporter", span_id, span.name) self.span_exporter.export([span]) def _is_instrumentor_span(span: ReadableSpan) -> bool: - """Determine if the Span contains information of interest for Spans created by Humanloop decorators.""" + """Determine if the span contains information of interest for Spans created by Humanloop decorators.""" # At the moment we only enrich Spans created by the Prompt decorators # As we add Instrumentors for other libraries, this function must # be expanded @@ -87,7 +160,11 @@ def _process_span_dispatch(span: ReadableSpan, children_spans: list[ReadableSpan elif file_type == "flow": pass else: - logger.error("Unknown Humanloop File Span %s", span) + logger.error( + "[HumanloopSpanProcessor] Unknown Humanloop File span %s %s", + span.context.span_id, + span.name, + ) def _process_prompt(prompt_span: ReadableSpan, children_spans: list[ReadableSpan]): @@ -133,9 +210,14 @@ def _enrich_prompt_kernel(prompt_span: ReadableSpan, llm_provider_call_span: Rea # Validate the Prompt Kernel PromptKernelRequest.model_validate(obj=prompt) except PydanticValidationError as e: - logger.error("Could not validate Prompt Kernel extracted 
from Span: %s", e) - - # Write the enriched Prompt Kernel back to the Span + logger.error( + "[HumanloopSpanProcessor] Could not validate Prompt Kernel extracted from span: %s %s. Error: %s", + prompt_span.context.span_id, + prompt_span.name, + e, + ) + + # Write the enriched Prompt Kernel back to the span hl_file["prompt"] = prompt write_to_opentelemetry_span( span=prompt_span, diff --git a/src/humanloop/prompts/client.py b/src/humanloop/prompts/client.py index e3f402b8..48950fdc 100644 --- a/src/humanloop/prompts/client.py +++ b/src/humanloop/prompts/client.py @@ -1229,6 +1229,12 @@ def upsert( provider="openai", max_tokens=-1, temperature=0.7, + top_p=1.0, + presence_penalty=0.0, + frequency_penalty=0.0, + other={}, + tools=[], + linked_tools=[], commit_message="Initial commit", ) """ @@ -3094,116 +3100,125 @@ async def upsert( request_options: typing.Optional[RequestOptions] = None, ) -> PromptResponse: """ - Create a Prompt or update it with a new version if it already exists. + Create a Prompt or update it with a new version if it already exists. - Prompts are identified by the `ID` or their `path`. The parameters (i.e. the prompt template, temperature, model etc.) determine the versions of the Prompt. + Prompts are identified by the `ID` or their `path`. The parameters (i.e. the prompt template, temperature, model etc.) determine the versions of the Prompt. - If you provide a commit message, then the new version will be committed; - otherwise it will be uncommitted. If you try to commit an already committed version, - an exception will be raised. + If you provide a commit message, then the new version will be committed; + otherwise it will be uncommitted. If you try to commit an already committed version, + an exception will be raised. - Parameters - ---------- - model : str - The model instance used, e.g. `gpt-4`. See [supported models](https://humanloop.com/docs/reference/supported-models) + Parameters + ---------- + model : str + The model instance used, e.g. `gpt-4`. See [supported models](https://humanloop.com/docs/reference/supported-models) - path : typing.Optional[str] - Path of the Prompt, including the name. This locates the Prompt in the Humanloop filesystem and is used as as a unique identifier. For example: `folder/name` or just `name`. + path : typing.Optional[str] + Path of the Prompt, including the name. This locates the Prompt in the Humanloop filesystem and is used as as a unique identifier. For example: `folder/name` or just `name`. - id : typing.Optional[str] - ID for an existing Prompt. + id : typing.Optional[str] + ID for an existing Prompt. - endpoint : typing.Optional[ModelEndpoints] - The provider model endpoint used. + endpoint : typing.Optional[ModelEndpoints] + The provider model endpoint used. - template : typing.Optional[PromptRequestTemplateParams] - The template contains the main structure and instructions for the model, including input variables for dynamic values. + template : typing.Optional[PromptRequestTemplateParams] + The template contains the main structure and instructions for the model, including input variables for dynamic values. - For chat models, provide the template as a ChatTemplate (a list of messages), e.g. a system message, followed by a user message with an input variable. - For completion models, provide a prompt template as a string. + For chat models, provide the template as a ChatTemplate (a list of messages), e.g. a system message, followed by a user message with an input variable. 
+ For completion models, provide a prompt template as a string. - Input variables should be specified with double curly bracket syntax: `{{input_name}}`. + Input variables should be specified with double curly bracket syntax: `{{input_name}}`. - provider : typing.Optional[ModelProviders] - The company providing the underlying model service. + provider : typing.Optional[ModelProviders] + The company providing the underlying model service. - max_tokens : typing.Optional[int] - The maximum number of tokens to generate. Provide max_tokens=-1 to dynamically calculate the maximum number of tokens to generate given the length of the prompt + max_tokens : typing.Optional[int] + The maximum number of tokens to generate. Provide max_tokens=-1 to dynamically calculate the maximum number of tokens to generate given the length of the prompt - temperature : typing.Optional[float] - What sampling temperature to use when making a generation. Higher values means the model will be more creative. + temperature : typing.Optional[float] + What sampling temperature to use when making a generation. Higher values means the model will be more creative. - top_p : typing.Optional[float] - An alternative to sampling with temperature, called nucleus sampling, where the model considers the results of the tokens with top_p probability mass. + top_p : typing.Optional[float] + An alternative to sampling with temperature, called nucleus sampling, where the model considers the results of the tokens with top_p probability mass. - stop : typing.Optional[PromptRequestStopParams] - The string (or list of strings) after which the model will stop generating. The returned text will not contain the stop sequence. + stop : typing.Optional[PromptRequestStopParams] + The string (or list of strings) after which the model will stop generating. The returned text will not contain the stop sequence. - presence_penalty : typing.Optional[float] - Number between -2.0 and 2.0. Positive values penalize new tokens based on whether they appear in the generation so far. + presence_penalty : typing.Optional[float] + Number between -2.0 and 2.0. Positive values penalize new tokens based on whether they appear in the generation so far. - frequency_penalty : typing.Optional[float] - Number between -2.0 and 2.0. Positive values penalize new tokens based on how frequently they appear in the generation so far. + frequency_penalty : typing.Optional[float] + Number between -2.0 and 2.0. Positive values penalize new tokens based on how frequently they appear in the generation so far. - other : typing.Optional[typing.Dict[str, typing.Optional[typing.Any]]] - Other parameter values to be passed to the provider call. + other : typing.Optional[typing.Dict[str, typing.Optional[typing.Any]]] + Other parameter values to be passed to the provider call. - seed : typing.Optional[int] - If specified, model will make a best effort to sample deterministically, but it is not guaranteed. + seed : typing.Optional[int] + If specified, model will make a best effort to sample deterministically, but it is not guaranteed. - response_format : typing.Optional[ResponseFormatParams] - The format of the response. Only `{"type": "json_object"}` is currently supported for chat. + response_format : typing.Optional[ResponseFormatParams] + The format of the response. Only `{"type": "json_object"}` is currently supported for chat. - tools : typing.Optional[typing.Sequence[ToolFunctionParams]] - The tool specification that the model can choose to call if Tool calling is supported. 
+                tools : typing.Optional[typing.Sequence[ToolFunctionParams]] + The tool specification that the model can choose to call if Tool calling is supported. - linked_tools : typing.Optional[typing.Sequence[str]] - The IDs of the Tools in your organization that the model can choose to call if Tool calling is supported. The default deployed version of that tool is called. + linked_tools : typing.Optional[typing.Sequence[str]] + The IDs of the Tools in your organization that the model can choose to call if Tool calling is supported. The default deployed version of that tool is called. - attributes : typing.Optional[typing.Dict[str, typing.Optional[typing.Any]]] - Additional fields to describe the Prompt. Helpful to separate Prompt versions from each other with details on how they were created or used. + attributes : typing.Optional[typing.Dict[str, typing.Optional[typing.Any]]] + Additional fields to describe the Prompt. Helpful to separate Prompt versions from each other with details on how they were created or used. - commit_message : typing.Optional[str] - Message describing the changes made. + commit_message : typing.Optional[str] + Message describing the changes made. - request_options : typing.Optional[RequestOptions] - Request-specific configuration. + request_options : typing.Optional[RequestOptions] + Request-specific configuration. - Returns - ------- - PromptResponse - Successful Response + Returns + ------- + PromptResponse + Successful Response - Examples - -------- - import asyncio + Examples + -------- + import asyncio - from humanloop import AsyncHumanloop + from humanloop import AsyncHumanloop - client = AsyncHumanloop( - api_key="YOUR_API_KEY", - ) + client = AsyncHumanloop( + api_key="YOUR_API_KEY", + ) - async def main() -> None: - await client.prompts.upsert( - path="Personal Projects/Coding Assistant", - model="gpt-4o", - endpoint="chat", - template=[ - { - "content": "You are a helpful coding assistant specialising in {{language}}", - "role": "system", - } - ], - provider="openai", - max_tokens=-1, - temperature=0.7, - commit_message="Initial commit", - ) + async def main() -> None: + await client.prompts.upsert( + path="Personal Projects/Coding Assistant", + model="gpt-4o", + endpoint="chat", + template=[ + { + "content": "You are a helpful coding assistant specialising in {{language}}", + "role": "system", + } + ], + provider="openai", + max_tokens=-1, + temperature=0.7, + top_p=1.0, + presence_penalty=0.0, + frequency_penalty=0.0, + other={}, + tools=[], + linked_tools=[], + commit_message="Initial commit", + ) - asyncio.run(main()) + asyncio.run(main()) """ _response = await self._client_wrapper.httpx_client.request( "prompts", diff --git a/src/humanloop/decorators/__init__.py b/src/humanloop/utilities/__init__.py similarity index 100% rename from src/humanloop/decorators/__init__.py rename to src/humanloop/utilities/__init__.py diff --git a/src/humanloop/decorators/flow.py b/src/humanloop/utilities/flow.py similarity index 65% rename from src/humanloop/decorators/flow.py rename to src/humanloop/utilities/flow.py index 51f9c731..f63573ed 100644 --- a/src/humanloop/decorators/flow.py +++ b/src/humanloop/utilities/flow.py @@ -6,11 +6,15 @@ from opentelemetry.trace import Tracer from typing_extensions import Unpack -from humanloop.decorators.helpers import args_to_inputs +from humanloop.utilities.helpers import args_to_inputs from humanloop.eval_utils.types import File -from humanloop.otel import TRACE_FLOW_CONTEXT, 
FlowContext -from humanloop.otel.constants import HUMANLOOP_FILE_KEY, HUMANLOOP_FILE_TYPE_KEY, HUMANLOOP_LOG_KEY, HUMANLOOP_PATH_KEY -from humanloop.otel.helpers import generate_span_id, jsonify_if_not_string, write_to_opentelemetry_span +from humanloop.otel.constants import ( + HUMANLOOP_FILE_KEY, + HUMANLOOP_FILE_TYPE_KEY, + HUMANLOOP_LOG_KEY, + HUMANLOOP_PATH_KEY, +) +from humanloop.otel.helpers import jsonify_if_not_string, write_to_opentelemetry_span from humanloop.requests import FlowKernelRequestParams as FlowDict from humanloop.requests.flow_kernel_request import FlowKernelRequestParams @@ -28,29 +32,10 @@ def decorator(func: Callable): @wraps(func) def wrapper(*args: Sequence[Any], **kwargs: Mapping[str, Any]) -> Any: span: Span - with opentelemetry_tracer.start_as_current_span(generate_span_id()) as span: # type: ignore - span_id = span.get_span_context().span_id - if span.parent: - span_parent_id = span.parent.span_id - parent_trace_metadata = TRACE_FLOW_CONTEXT.get(span_parent_id) - if parent_trace_metadata: - TRACE_FLOW_CONTEXT[span_id] = FlowContext( - trace_id=span_id, - trace_parent_id=span_parent_id, - is_flow_log=True, - ) - - else: - # The Flow Log is not nested under another Flow Log - # Set the trace_id to the current span_id - TRACE_FLOW_CONTEXT[span_id] = FlowContext( - trace_id=span_id, - trace_parent_id=None, - is_flow_log=True, - ) - + with opentelemetry_tracer.start_as_current_span("humanloop.flow") as span: # type: ignore span.set_attribute(HUMANLOOP_PATH_KEY, path if path else func.__name__) span.set_attribute(HUMANLOOP_FILE_TYPE_KEY, "flow") + if flow_kernel: write_to_opentelemetry_span( span=span, @@ -58,8 +43,6 @@ def wrapper(*args: Sequence[Any], **kwargs: Mapping[str, Any]) -> Any: value=flow_kernel, # type: ignore ) - inputs = args_to_inputs(func, args, kwargs) - # Call the decorated function try: output = func(*args, **kwargs) @@ -78,7 +61,7 @@ def wrapper(*args: Sequence[Any], **kwargs: Mapping[str, Any]) -> Any: error = str(e) flow_log = { - "inputs": inputs, + "inputs": args_to_inputs(func, args, kwargs), "output": output_stringified, "error": error, } diff --git a/src/humanloop/decorators/helpers.py b/src/humanloop/utilities/helpers.py similarity index 100% rename from src/humanloop/decorators/helpers.py rename to src/humanloop/utilities/helpers.py diff --git a/src/humanloop/decorators/prompt.py b/src/humanloop/utilities/prompt.py similarity index 71% rename from src/humanloop/decorators/prompt.py rename to src/humanloop/utilities/prompt.py index c1f68a77..4e0f55f5 100644 --- a/src/humanloop/decorators/prompt.py +++ b/src/humanloop/utilities/prompt.py @@ -6,12 +6,16 @@ from opentelemetry.trace import Tracer from typing_extensions import Unpack -from humanloop.decorators.helpers import args_to_inputs -from humanloop.decorators.types import DecoratorPromptKernelRequestParams +from humanloop.utilities.helpers import args_to_inputs +from humanloop.utilities.types import DecoratorPromptKernelRequestParams from humanloop.eval_utils import File -from humanloop.otel import TRACE_FLOW_CONTEXT, FlowContext -from humanloop.otel.constants import HUMANLOOP_FILE_KEY, HUMANLOOP_FILE_TYPE_KEY, HUMANLOOP_LOG_KEY, HUMANLOOP_PATH_KEY -from humanloop.otel.helpers import generate_span_id, jsonify_if_not_string, write_to_opentelemetry_span +from humanloop.otel.constants import ( + HUMANLOOP_FILE_KEY, + HUMANLOOP_FILE_TYPE_KEY, + HUMANLOOP_LOG_KEY, + HUMANLOOP_PATH_KEY, +) +from humanloop.otel.helpers import jsonify_if_not_string, write_to_opentelemetry_span logger = 
logging.getLogger("humanloop.sdk") @@ -26,18 +30,7 @@ def decorator(func: Callable): @wraps(func) def wrapper(*args: Sequence[Any], **kwargs: Mapping[str, Any]) -> Any: span: Span - with opentelemetry_tracer.start_as_current_span(generate_span_id()) as span: # type: ignore - span_id = span.get_span_context().span_id - if span.parent: - span_parent_id = span.parent.span_id - parent_trace_metadata = TRACE_FLOW_CONTEXT.get(span_parent_id, {}) - if parent_trace_metadata: - TRACE_FLOW_CONTEXT[span_id] = FlowContext( - trace_id=parent_trace_metadata["trace_id"], - trace_parent_id=span_parent_id, - is_flow_log=False, - ) - + with opentelemetry_tracer.start_as_current_span("humanloop.prompt") as span: # type: ignore span.set_attribute(HUMANLOOP_PATH_KEY, path if path else func.__name__) span.set_attribute(HUMANLOOP_FILE_TYPE_KEY, "prompt") @@ -73,6 +66,7 @@ def wrapper(*args: Sequence[Any], **kwargs: Mapping[str, Any]) -> Any: "output": output_stringified, "error": error, } + write_to_opentelemetry_span( span=span, key=HUMANLOOP_LOG_KEY, diff --git a/src/humanloop/decorators/tool.py b/src/humanloop/utilities/tool.py similarity index 95% rename from src/humanloop/decorators/tool.py rename to src/humanloop/utilities/tool.py index 2752d017..c17903d1 100644 --- a/src/humanloop/decorators/tool.py +++ b/src/humanloop/utilities/tool.py @@ -12,16 +12,15 @@ from opentelemetry.trace import Tracer from typing_extensions import Unpack -from humanloop.decorators.helpers import args_to_inputs +from humanloop.utilities.helpers import args_to_inputs from humanloop.eval_utils import File -from humanloop.otel import TRACE_FLOW_CONTEXT, FlowContext from humanloop.otel.constants import ( HUMANLOOP_FILE_KEY, HUMANLOOP_FILE_TYPE_KEY, HUMANLOOP_LOG_KEY, HUMANLOOP_PATH_KEY, ) -from humanloop.otel.helpers import generate_span_id, jsonify_if_not_string, write_to_opentelemetry_span +from humanloop.otel.helpers import jsonify_if_not_string, write_to_opentelemetry_span from humanloop.requests.tool_function import ToolFunctionParams from humanloop.requests.tool_kernel_request import ToolKernelRequestParams @@ -49,20 +48,7 @@ def decorator(func: Callable): @wraps(func) def wrapper(*args, **kwargs): - with opentelemetry_tracer.start_as_current_span(generate_span_id()) as span: - span_id = span.get_span_context().span_id - if span.parent: - span_parent_id = span.parent.span_id - else: - span_parent_id = None - parent_trace_metadata = TRACE_FLOW_CONTEXT.get(span_parent_id) - if parent_trace_metadata: - TRACE_FLOW_CONTEXT[span_id] = FlowContext( - span_id=span_id, - trace_parent_id=span_parent_id, - is_flow_log=False, - ) - + with opentelemetry_tracer.start_as_current_span("humanloop.tool") as span: # Write the Tool Kernel to the Span on HL_FILE_OT_KEY span.set_attribute(HUMANLOOP_PATH_KEY, path if path else func.__name__) span.set_attribute(HUMANLOOP_FILE_TYPE_KEY, "tool") diff --git a/src/humanloop/decorators/types.py b/src/humanloop/utilities/types.py similarity index 100% rename from src/humanloop/decorators/types.py rename to src/humanloop/utilities/types.py diff --git a/tests/decorators/__init__.py b/tests/utilities/__init__.py similarity index 100% rename from tests/decorators/__init__.py rename to tests/utilities/__init__.py diff --git a/tests/decorators/test_flow_decorator.py b/tests/utilities/test_flow_decorator.py similarity index 57% rename from tests/decorators/test_flow_decorator.py rename to tests/utilities/test_flow_decorator.py index 09a769f6..da895ee0 100644 --- a/tests/decorators/test_flow_decorator.py +++ 
b/tests/utilities/test_flow_decorator.py @@ -2,20 +2,21 @@ import random import string import time -from unittest.mock import patch +from unittest.mock import patch import pytest -from humanloop.decorators.flow import flow -from humanloop.decorators.prompt import prompt -from humanloop.decorators.tool import tool -from humanloop.otel import TRACE_FLOW_CONTEXT -from humanloop.otel.constants import HUMANLOOP_FILE_KEY -from humanloop.otel.exporter import HumanloopSpanExporter -from humanloop.otel.helpers import read_from_opentelemetry_span from openai import OpenAI from openai.types.chat.chat_completion_message_param import ChatCompletionMessageParam from opentelemetry.sdk.trace import Tracer from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter +from opentelemetry.sdk.trace import ReadableSpan + +from humanloop.utilities.flow import flow +from humanloop.utilities.prompt import prompt +from humanloop.utilities.tool import tool +from humanloop.otel.constants import HUMANLOOP_FILE_KEY +from humanloop.otel.exporter import HumanloopSpanExporter +from humanloop.otel.helpers import read_from_opentelemetry_span def _test_scenario( @@ -85,23 +86,28 @@ def test_decorators_without_flow( ] ) # WHEN exporting the spans + # Wait for the prompt span to be exported; It was waiting + # on the OpenAI call span to finish first + time.sleep(1) spans = exporter.get_finished_spans() - # THEN 3 spans arrive at the exporter in the following order: - # 0. Intercepted OpenAI call, which is ignored by the exporter - # 1. Tool Span (called after the OpenAI call but before the Prompt Span finishes) - # 2. Prompt Span + + # THEN 3 spans arrive at the exporter assert len(spans) == 3 + + for i in range(3): + if spans[i].name == "humanloop.tool": + tool_span = spans[i] + elif spans[i].name == "humanloop.prompt": + prompt_span = spans[i] + assert read_from_opentelemetry_span( - span=spans[1], + span=tool_span, key=HUMANLOOP_FILE_KEY, )["tool"] assert read_from_opentelemetry_span( - span=spans[2], + span=prompt_span, key=HUMANLOOP_FILE_KEY, )["prompt"] - for span in spans: - # THEN no metadata related to trace is present on either of them - assert TRACE_FLOW_CONTEXT.get(span.get_span_context().span_id) is None def test_decorators_with_flow_decorator( @@ -125,28 +131,23 @@ def test_decorators_with_flow_decorator( }, ] ) - # THEN 4 spans arrive at the exporter in the following order: - # 0. Intercepted OpenAI call, which is ignored by the exporter - # 1. Tool Span (called after the OpenAI call but before the Prompt Span finishes) - # 2. Prompt Span - # 3. 
Flow Span + + # THEN 4 spans arrive at the exporter spans = exporter.get_finished_spans() assert len(spans) == 4 + + for i in range(4): + if spans[i].name == "humanloop.flow": + flow_span = spans[i] + elif spans[i].name == "humanloop.prompt": + prompt_span = spans[i] + elif spans[i].name == "humanloop.tool": + tool_span = spans[i] + # THEN the span are returned bottom to top - assert read_from_opentelemetry_span(span=spans[1], key=HUMANLOOP_FILE_KEY)["tool"] - assert read_from_opentelemetry_span(span=spans[2], key=HUMANLOOP_FILE_KEY)["prompt"] - # assert read_from_opentelemetry_span(span=spans[3], key=HL_FILE_OT_KEY)["flow"] - assert (tool_trace_metadata := TRACE_FLOW_CONTEXT.get(spans[1].get_span_context().span_id)) - assert (prompt_trace_metadata := TRACE_FLOW_CONTEXT.get(spans[2].get_span_context().span_id)) - assert (flow_trace_metadata := TRACE_FLOW_CONTEXT.get(spans[3].get_span_context().span_id)) - # THEN Tool span is a child of Prompt span - assert tool_trace_metadata["trace_parent_id"] == spans[2].context.span_id - assert tool_trace_metadata["is_flow_log"] is False - assert prompt_trace_metadata["trace_parent_id"] == spans[3].context.span_id - # THEN Prompt span is a child of Flow span - assert prompt_trace_metadata["is_flow_log"] is False - assert flow_trace_metadata["is_flow_log"] - assert flow_trace_metadata["trace_id"] == spans[3].context.span_id + assert read_from_opentelemetry_span(span=tool_span, key=HUMANLOOP_FILE_KEY)["tool"] + assert read_from_opentelemetry_span(span=prompt_span, key=HUMANLOOP_FILE_KEY)["prompt"] + assert read_from_opentelemetry_span(span=flow_span, key=HUMANLOOP_FILE_KEY)["flow"] def test_flow_decorator_flow_in_flow( @@ -161,38 +162,29 @@ def test_flow_decorator_flow_in_flow( # WHEN Calling the _test_flow_in_flow function with specific messages _flow_over_flow(call_llm_messages) - # THEN 5 spans are arrive at the exporter in the following order: - # 0. Intercepted OpenAI call, which is ignored by the exporter - # 1. Tool Span (called after the OpenAI call but before the Prompt Span finishes) - # 2. Prompt Span - # 3. Nested Flow Span - # 4. 
Flow Span + # Wait for the Prompt span to be exported; It was asynchronously waiting + # on the OpenAI call span to finish first + time.sleep(1) + + # THEN 5 spans arrive at the exporter spans = exporter.get_finished_spans() assert len(spans) == 5 - assert read_from_opentelemetry_span(span=spans[1], key=HUMANLOOP_FILE_KEY)["tool"] - assert read_from_opentelemetry_span(span=spans[2], key=HUMANLOOP_FILE_KEY)["prompt"] - assert read_from_opentelemetry_span(span=spans[3], key=HUMANLOOP_FILE_KEY)["flow"] != {} + + for i in range(5): + if spans[i].name == "humanloop.flow" and spans[i].parent is None: + flow_span = spans[i] + elif spans[i].name == "humanloop.flow" and spans[i].parent: + nested_flow_span = spans[i] + elif spans[i].name == "humanloop.prompt": + prompt_span = spans[i] + elif spans[i].name == "humanloop.tool": + tool_span = spans[i] + + assert read_from_opentelemetry_span(span=tool_span, key=HUMANLOOP_FILE_KEY)["tool"] + assert read_from_opentelemetry_span(span=prompt_span, key=HUMANLOOP_FILE_KEY)["prompt"] + assert read_from_opentelemetry_span(span=nested_flow_span, key=HUMANLOOP_FILE_KEY)["flow"] != {} with pytest.raises(KeyError): - read_from_opentelemetry_span(span=spans[4], key=HUMANLOOP_FILE_KEY)["flow"] != {} - - assert (tool_trace_metadata := TRACE_FLOW_CONTEXT.get(spans[1].get_span_context().span_id)) - assert (prompt_trace_metadata := TRACE_FLOW_CONTEXT.get(spans[2].get_span_context().span_id)) - assert (nested_flow_trace_metadata := TRACE_FLOW_CONTEXT.get(spans[3].get_span_context().span_id)) - assert (flow_trace_metadata := TRACE_FLOW_CONTEXT.get(spans[4].get_span_context().span_id)) - # THEN the parent of the Tool Log is the Prompt Log - assert tool_trace_metadata["trace_parent_id"] == spans[2].context.span_id - assert tool_trace_metadata["is_flow_log"] is False - # THEN the parent of the Prompt Log is the Flow Log - assert prompt_trace_metadata["trace_parent_id"] == spans[3].context.span_id - assert prompt_trace_metadata["is_flow_log"] is False - # THEN the nested Flow Log creates a new trace - assert nested_flow_trace_metadata["trace_id"] == spans[3].context.span_id - assert nested_flow_trace_metadata["is_flow_log"] - # THEN the parent of the nested Flow Log is the upper Flow Log - assert nested_flow_trace_metadata["trace_parent_id"] == spans[4].context.span_id - # THEN the parent Flow Log correctly points to itself - assert flow_trace_metadata["trace_id"] == spans[4].context.span_id - assert flow_trace_metadata["is_flow_log"] + read_from_opentelemetry_span(span=flow_span, key=HUMANLOOP_FILE_KEY)["flow"] != {} def test_flow_decorator_with_hl_exporter( @@ -212,17 +204,17 @@ def test_flow_decorator_with_hl_exporter( # Exporter is threaded, need to wait threads shutdown time.sleep(3) - # THEN 4 spans are arrive at the exporter in the following order: - # 0. Intercepted OpenAI call, which is ignored by the exporter - # 1. Tool Span (called after the OpenAI call but before the Prompt Span finishes) - # 2. Prompt Span - # 3. 
Flow Span assert len(mock_export_method.call_args_list) == 4 - tool_span = mock_export_method.call_args_list[1][0][0][0] - prompt_span = mock_export_method.call_args_list[2][0][0][0] - flow_span = mock_export_method.call_args_list[3][0][0][0] - # THEN the last uploaded span is the Flow + for i in range(4): + span = mock_export_method.call_args_list[i][0][0][0] + if span.name == "humanloop.flow": + flow_span = span + elif span.name == "humanloop.prompt": + prompt_span = span + elif span.name == "humanloop.tool": + tool_span = span + assert read_from_opentelemetry_span( span=flow_span, key=HUMANLOOP_FILE_KEY, @@ -241,8 +233,6 @@ def test_flow_decorator_with_hl_exporter( key=HUMANLOOP_FILE_KEY, ) - # NOTE: The type: ignore comments are caused by the MagicMock used to mock the HTTP client - # THEN the first Log uploaded is the Flow first_log = exporter._client.flows.log.call_args_list[0][1] # type: ignore assert "flow" in first_log @@ -280,19 +270,18 @@ def test_flow_decorator_hl_exporter_flow_inside_flow( # Exporter is threaded, need to wait threads shutdown time.sleep(3) - # THEN 5 spans are arrive at the exporter in the following order: - # 0. Intercepted OpenAI call, which is ignored by the exporter - # 1. Tool Span (called after the OpenAI call but before the Prompt Span finishes) - # 2. Prompt Span - # 3. Nested Flow Span - # 4. Flow Span + # THEN 5 spans are arrive at the exporter assert len(mock_export_method.call_args_list) == 5 - # THEN the last uploaded span is the larger Flow - # THEN the second to last uploaded span is the nested Flow - flow_span = mock_export_method.call_args_list[4][0][0][0] - nested_flow_span = mock_export_method.call_args_list[3][0][0][0] - assert (last_span_flow_metadata := TRACE_FLOW_CONTEXT.get(flow_span.get_span_context().span_id)) - assert (flow_span_flow_metadata := TRACE_FLOW_CONTEXT.get(nested_flow_span.get_span_context().span_id)) - assert flow_span_flow_metadata["trace_parent_id"] == flow_span.context.span_id - assert last_span_flow_metadata["is_flow_log"] - assert flow_span_flow_metadata["is_flow_log"] + + # THEN one of the flows is nested inside the other + spans: list[ReadableSpan] = [mock_export_method.call_args_list[i][0][0][0] for i in range(1, 5)] + counter = 0 + for span in spans: + if span.name == "humanloop.flow": + counter += 1 + if span.parent: + nested_flow_span = span + else: + flow_span = span + # We are certain span_id exists for these 2 spans + assert nested_flow_span.parent.span_id == flow_span.context.span_id # type: ignore diff --git a/tests/decorators/test_prompt_decorator.py b/tests/utilities/test_prompt_decorator.py similarity index 94% rename from tests/decorators/test_prompt_decorator.py rename to tests/utilities/test_prompt_decorator.py index 23c4fb64..96bffeda 100644 --- a/tests/decorators/test_prompt_decorator.py +++ b/tests/utilities/test_prompt_decorator.py @@ -1,4 +1,5 @@ import os +import time from typing import Optional import cohere @@ -11,7 +12,7 @@ from dotenv import load_dotenv from groq import Groq from groq import NotFoundError as GroqNotFoundError -from humanloop.decorators.prompt import prompt +from humanloop.utilities.prompt import prompt from humanloop.otel.constants import HUMANLOOP_FILE_KEY from humanloop.otel.helpers import is_humanloop_span, read_from_opentelemetry_span from humanloop.types.model_providers import ModelProviders @@ -158,6 +159,11 @@ def test_prompt_decorator( model=model, messages=call_llm_messages, ) + + # Wait for the Prompt span to be exported, it is waiting + # asynchronously for the 
LLM provider call span to finish + time.sleep(1) + # THEN two spans are created: one for the OpenAI LLM provider call and one for the Prompt spans = exporter.get_finished_spans() assert len(spans) == 2 @@ -189,7 +195,13 @@ def test_prompt_decorator_with_hl_processor( model=model, messages=call_llm_messages, ) + # THEN two spans are created: one for the OpenAI LLM provider call and one for the Prompt + + # Wait for the Prompt span to be exported, it is waiting + # asynchronously for the LLM provider call span to finish + time.sleep(1) + spans = exporter.get_finished_spans() assert len(spans) == 2 assert not is_humanloop_span(span=spans[0]) @@ -237,6 +249,11 @@ def test_prompt_decorator_with_defaults( model=model, messages=call_llm_messages, ) + + # Wait for the Prompt span to be exported, it is waiting + # asynchronously for the LLM provider call span to finish + time.sleep(1) + spans = exporter.get_finished_spans() # THEN the Prompt span is enhanced with information and forms a correct PromptKernel prompt = PromptKernelRequest.model_validate( @@ -289,6 +306,10 @@ def test_prompt_attributes( messages=call_llm_messages, ) + # Wait for the Prompt span to be exported, it is waiting + # asynchronously for the LLM provider call span to finish + time.sleep(1) + assert len(exporter.get_finished_spans()) == 2 prompt_kernel = PromptKernelRequest.model_validate( diff --git a/tests/decorators/test_tool_decorator.py b/tests/utilities/test_tool_decorator.py similarity index 99% rename from tests/decorators/test_tool_decorator.py rename to tests/utilities/test_tool_decorator.py index 2f4db209..983c93f6 100644 --- a/tests/decorators/test_tool_decorator.py +++ b/tests/utilities/test_tool_decorator.py @@ -1,8 +1,9 @@ import sys +import time from typing import Any, Optional, TypedDict, Union import pytest -from humanloop.decorators.tool import tool +from humanloop.utilities.tool import tool from humanloop.otel.constants import HUMANLOOP_FILE_KEY, HUMANLOOP_LOG_KEY from humanloop.otel.helpers import read_from_opentelemetry_span from jsonschema.protocols import Validator @@ -450,6 +451,9 @@ def calculator(operation: str, num1: float, num2: float) -> float: higher_order_fn_tool(operation="add", num1=1, num2=2) calculator(operation="add", num1=1, num2=2) + # Processor handles HL spans asynchronously, wait for them + time.sleep(1) + assert len(spans := exporter.get_finished_spans()) == 2 hl_file_higher_order_fn = read_from_opentelemetry_span(