From 13852f2b1942d4065a0bd87f02359423b13f1807 Mon Sep 17 00:00:00 2001 From: Hansehart <97880342+Hansehart@users.noreply.github.com> Date: Thu, 13 Nov 2025 22:16:29 +0100 Subject: [PATCH 1/2] add: langfuse log levels --- .../tracing/langfuse/tracer.py | 50 ++++++++++++++++-- integrations/langfuse/tests/test_tracer.py | 51 +++++++++++++++++++ 2 files changed, 96 insertions(+), 5 deletions(-) diff --git a/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py b/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py index 5c4fda0f6..d97247a61 100644 --- a/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py +++ b/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py @@ -4,6 +4,7 @@ import contextlib import os +import sys from abc import ABC, abstractmethod from collections import Counter from contextlib import AbstractContextManager @@ -298,7 +299,9 @@ def create_span(self, context: SpanContext) -> LangfuseSpan: ) # Create a new trace when there's no parent span span_context_manager = self.tracer.start_as_current_observation( - name=context.trace_name, version=tracing_ctx.get("version"), as_type=root_span_type + name=context.trace_name, + version=tracing_ctx.get("version"), + as_type=root_span_type, ) # Create LangfuseSpan which will handle entering the context manager @@ -433,7 +436,10 @@ def __init__( @contextlib.contextmanager def trace( - self, operation_name: str, tags: Optional[Dict[str, Any]] = None, parent_span: Optional[Span] = None + self, + operation_name: str, + tags: Optional[Dict[str, Any]] = None, + parent_span: Optional[Span] = None, ) -> Iterator[Span]: tags = tags or {} span_name = tags.get(_COMPONENT_NAME_KEY, operation_name) @@ -465,16 +471,50 @@ def trace( try: yield span - finally: - # Always clean up context, even if nested operations fail + except Exception: + # Exception occurred - capture exception info and pass to __exit__ + # This allows Langfuse/OpenTelemetry to properly mark the span with ERROR level + exc_info = sys.exc_info() try: # Process span data (may fail with nested pipeline exceptions) self._span_handler.handle(span, component_type) - # End span (may fail if span data is corrupted) + # End span with exception info (may fail if span data is corrupted) + raw_span = span.raw_span() + if span._context_manager is not None: + # Pass actual exception info to mark span as failed with ERROR level + span._context_manager.__exit__(*exc_info) + elif hasattr(raw_span, "end"): + # Only call end() if it's not a context manager + raw_span.end() + except Exception as cleanup_error: + # Log cleanup errors but don't let them corrupt context + logger.warning( + "Error during span cleanup for {operation_name}: {cleanup_error}", + operation_name=operation_name, + cleanup_error=cleanup_error, + ) + finally: + # Restore previous span stack using saved token - ensures proper cleanup + span_stack_var.reset(token) + + if self.enforce_flush: + self.flush() + + # Re-raise the original exception + raise + else: + # No exception - clean exit with success status + # This preserves any manually-set log levels (WARNING, DEBUG) + try: + # Process span data + self._span_handler.handle(span, component_type) + + # End span successfully raw_span = span.raw_span() # In v3, we need to properly exit context managers if span._context_manager is not None: + # No exception - pass None to indicate success span._context_manager.__exit__(None, None, None) elif hasattr(raw_span, "end"): # Only call end() if it's not a context manager diff --git a/integrations/langfuse/tests/test_tracer.py b/integrations/langfuse/tests/test_tracer.py index e09b73571..a0d764f4b 100644 --- a/integrations/langfuse/tests/test_tracer.py +++ b/integrations/langfuse/tests/test_tracer.py @@ -690,3 +690,54 @@ async def run_concurrent_traces(): assert task2_spans[1][2] == task2_inner # current_span during inner assert task2_spans[2][2] == task2_outer # current_span after inner assert task2_spans[3][2] is None # current_span after outer + + def test_trace_exception_handling(self): + """ + Test that exceptions are properly captured and passed to span __exit__. + + This verifies the new exception handling behavior where: + - Exception case: __exit__() receives (exc_type, exc_val, exc_tb) + - Success case: __exit__() receives (None, None, None) + """ + # Create a mock context manager that tracks how __exit__ was called + mock_exit_calls = [] + + class TrackingContextManager: + def __init__(self): + self._span = MockSpan() + + def __enter__(self): + return self._span + + def __exit__(self, exc_type, exc_val, exc_tb): + # Track what was passed to __exit__ + mock_exit_calls.append((exc_type, exc_val, exc_tb)) + return False # Don't suppress exceptions + + mock_client = MockLangfuseClient() + mock_client._mock_context_manager = TrackingContextManager() + + tracer = LangfuseTracer(tracer=mock_client, name="Test", public=False) + + # Test 1: Exception case - __exit__ should receive exception info + mock_exit_calls.clear() + error_msg = "test error" + with pytest.raises(ValueError, match="test error"): + with tracer.trace("test_operation"): + raise ValueError(error_msg) + + assert len(mock_exit_calls) == 1 + assert mock_exit_calls[0][0] is ValueError # exc_type + assert str(mock_exit_calls[0][1]) == error_msg # exc_val + assert mock_exit_calls[0][2] is not None # exc_tb (traceback) + + # Test 2: Success case - __exit__ should receive (None, None, None) + mock_exit_calls.clear() + with tracer.trace("test_operation"): + pass # No exception + + assert len(mock_exit_calls) == 1 + assert mock_exit_calls[0] == (None, None, None) + + # Test 3: Verify span stack is cleaned up after exception + assert tracer.current_span() is None From 4eec961a1055fce713b0c8f0de1e64d584f78acd Mon Sep 17 00:00:00 2001 From: Hansehart <97880342+Hansehart@users.noreply.github.com> Date: Tue, 18 Nov 2025 22:33:09 +0000 Subject: [PATCH 2/2] fix: dry finally block --- .../haystack_integrations/tracing/langfuse/tracer.py | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py b/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py index 6a75837ef..79ea97481 100644 --- a/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py +++ b/integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py @@ -492,12 +492,6 @@ def trace( operation_name=operation_name, cleanup_error=cleanup_error, ) - finally: - # Restore previous span stack using saved token - ensures proper cleanup - span_stack_var.reset(token) - - if self.enforce_flush: - self.flush() # Re-raise the original exception raise @@ -524,9 +518,9 @@ def trace( operation_name=operation_name, cleanup_error=cleanup_error, ) - finally: - # Restore previous span stack using saved token - ensures proper cleanup - span_stack_var.reset(token) + finally: + # Restore previous span stack using saved token + span_stack_var.reset(token) if self.enforce_flush: self.flush()