From 56f6d8e03fa8b3e8122d5c1fbccb9b130f4e8c4e Mon Sep 17 00:00:00 2001 From: majanjua-amzn Date: Thu, 16 Oct 2025 09:54:25 -0700 Subject: [PATCH] feat: implement on ending in span processor --- CHANGELOG.md | 2 + .../src/opentelemetry/sdk/trace/__init__.py | 21 +++ .../sdk/trace/export/__init__.py | 6 + .../tests/trace/test_span_processor.py | 172 ++++++++++++++++++ opentelemetry-sdk/tests/trace/test_trace.py | 20 ++ 5 files changed, 221 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 15001c9e9ad..dce795b44e8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Add `opentelemetry-exporter-credential-provider-gcp` as an optional dependency to `opentelemetry-exporter-otlp-proto-grpc` and `opentelemetry-exporter-otlp-proto-http` ([#4760](https://github.com/open-telemetry/opentelemetry-python/pull/4760)) +- feat: implement on ending in span processor + ([#4775](https://github.com/open-telemetry/opentelemetry-python/pull/4775)) - semantic-conventions: Bump to 1.38.0 ([#4791](https://github.com/open-telemetry/opentelemetry-python/pull/4791)) - [BREAKING] Remove LogData and extend SDK LogRecord to have instrumentation scope diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py index a1c0576520e..0e7e1f6db3b 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py @@ -113,6 +113,16 @@ def on_start( parent_context: The parent context of the span that just started. """ + def _on_ending(self, span: "Span") -> None: + """Called when a :class:`opentelemetry.trace.Span` is ending. + + This method is called synchronously on the thread that ends the + span, therefore it should not block or throw an exception. + + Args: + span: The :class:`opentelemetry.trace.Span` that is ending. + """ + def on_end(self, span: "ReadableSpan") -> None: """Called when a :class:`opentelemetry.trace.Span` is ended. @@ -170,6 +180,11 @@ def on_start( for sp in self._span_processors: sp.on_start(span, parent_context=parent_context) + def _on_ending(self, span: "Span") -> None: + for sp in self._span_processors: + # pylint: disable=protected-access + sp._on_ending(span) + def on_end(self, span: "ReadableSpan") -> None: for sp in self._span_processors: sp.on_end(span) @@ -254,6 +269,10 @@ def on_start( lambda sp: sp.on_start, span, parent_context=parent_context ) + def _on_ending(self, span: "Span") -> None: + # pylint: disable=protected-access + self._submit_and_await(lambda sp: sp._on_ending, span) + def on_end(self, span: "ReadableSpan") -> None: self._submit_and_await(lambda sp: sp.on_end, span) @@ -945,6 +964,8 @@ def end(self, end_time: Optional[int] = None) -> None: self._end_time = end_time if end_time is not None else time_ns() + # pylint: disable=protected-access + self._span_processor._on_ending(self) self._span_processor.on_end(self._readable_span()) @_check_span_ended diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py index 3c28463afbb..a9108b7337a 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py @@ -101,6 +101,9 @@ def on_start( ) -> None: pass + def _on_ending(self, span: Span) -> None: + pass + def on_end(self, span: ReadableSpan) -> None: if not (span.context and span.context.trace_flags.sampled): return @@ -187,6 +190,9 @@ def on_start( ) -> None: pass + def _on_ending(self, span: Span) -> None: + pass + def on_end(self, span: ReadableSpan) -> None: if not (span.context and span.context.trace_flags.sampled): return diff --git a/opentelemetry-sdk/tests/trace/test_span_processor.py b/opentelemetry-sdk/tests/trace/test_span_processor.py index c672d4ce102..d1cf1e3df00 100644 --- a/opentelemetry-sdk/tests/trace/test_span_processor.py +++ b/opentelemetry-sdk/tests/trace/test_span_processor.py @@ -32,6 +32,10 @@ def span_event_start_fmt(span_processor_name, span_name): return span_processor_name + ":" + span_name + ":start" +def span_event_ending_fmt(span_processor_name, span_name): + return span_processor_name + ":" + span_name + ":ending" + + def span_event_end_fmt(span_processor_name, span_name): return span_processor_name + ":" + span_name + ":end" @@ -50,6 +54,11 @@ def on_end(self, span: "trace.Span") -> None: self.span_list.append(span_event_end_fmt(self.name, span.name)) +class MyExtendedSpanProcessor(MySpanProcessor): + def _on_ending(self, span: "trace.Span") -> None: + self.span_list.append(span_event_ending_fmt(self.name, span.name)) + + class TestSpanProcessor(unittest.TestCase): def test_span_processor(self): tracer_provider = trace.TracerProvider() @@ -120,6 +129,85 @@ def test_span_processor(self): # compare if two lists are the same self.assertListEqual(spans_calls_list, expected_list) + # pylint: disable=too-many-statements + def test_span_processor_with_on_ending(self): + tracer_provider = trace.TracerProvider() + tracer = tracer_provider.get_tracer(__name__) + + spans_calls_list = [] # filled by MySpanProcessor + expected_list = [] # filled by hand + + # Span processors are created but not added to the tracer yet + sp1 = MyExtendedSpanProcessor("SP1", spans_calls_list) + sp2 = MyExtendedSpanProcessor("SP2", spans_calls_list) + + with tracer.start_as_current_span("foo"): + with tracer.start_as_current_span("bar"): + with tracer.start_as_current_span("baz"): + pass + + # at this point lists must be empty + self.assertEqual(len(spans_calls_list), 0) + + # add single span processor + tracer_provider.add_span_processor(sp1) + + with tracer.start_as_current_span("foo"): + expected_list.append(span_event_start_fmt("SP1", "foo")) + + with tracer.start_as_current_span("bar"): + expected_list.append(span_event_start_fmt("SP1", "bar")) + + with tracer.start_as_current_span("baz"): + expected_list.append(span_event_start_fmt("SP1", "baz")) + + expected_list.append(span_event_ending_fmt("SP1", "baz")) + expected_list.append(span_event_end_fmt("SP1", "baz")) + + expected_list.append(span_event_ending_fmt("SP1", "bar")) + expected_list.append(span_event_end_fmt("SP1", "bar")) + + expected_list.append(span_event_ending_fmt("SP1", "foo")) + expected_list.append(span_event_end_fmt("SP1", "foo")) + + self.assertListEqual(spans_calls_list, expected_list) + + spans_calls_list.clear() + expected_list.clear() + + # go for multiple span processors + tracer_provider.add_span_processor(sp2) + + with tracer.start_as_current_span("foo"): + expected_list.append(span_event_start_fmt("SP1", "foo")) + expected_list.append(span_event_start_fmt("SP2", "foo")) + + with tracer.start_as_current_span("bar"): + expected_list.append(span_event_start_fmt("SP1", "bar")) + expected_list.append(span_event_start_fmt("SP2", "bar")) + + with tracer.start_as_current_span("baz"): + expected_list.append(span_event_start_fmt("SP1", "baz")) + expected_list.append(span_event_start_fmt("SP2", "baz")) + + expected_list.append(span_event_ending_fmt("SP1", "baz")) + expected_list.append(span_event_ending_fmt("SP2", "baz")) + expected_list.append(span_event_end_fmt("SP1", "baz")) + expected_list.append(span_event_end_fmt("SP2", "baz")) + + expected_list.append(span_event_ending_fmt("SP1", "bar")) + expected_list.append(span_event_ending_fmt("SP2", "bar")) + expected_list.append(span_event_end_fmt("SP1", "bar")) + expected_list.append(span_event_end_fmt("SP2", "bar")) + + expected_list.append(span_event_ending_fmt("SP1", "foo")) + expected_list.append(span_event_ending_fmt("SP2", "foo")) + expected_list.append(span_event_end_fmt("SP1", "foo")) + expected_list.append(span_event_end_fmt("SP2", "foo")) + + # compare if two lists are the same + self.assertListEqual(spans_calls_list, expected_list) + def test_add_span_processor_after_span_creation(self): tracer_provider = trace.TracerProvider() tracer = tracer_provider.get_tracer(__name__) @@ -144,6 +232,37 @@ def test_add_span_processor_after_span_creation(self): self.assertListEqual(spans_calls_list, expected_list) + def test_on_ending_not_implemented_does_not_raise(self): + tracer_provider = trace.TracerProvider() + tracer = tracer_provider.get_tracer(__name__) + + spans_calls_list = [] # filled by MySpanProcessor + expected_list = [] # filled by hand + + # Does not implement _on_ending + sp = MySpanProcessor("SP1", spans_calls_list) + tracer_provider.add_span_processor(sp) + + try: + with tracer.start_as_current_span("foo"): + expected_list.append(span_event_start_fmt("SP1", "foo")) + + with tracer.start_as_current_span("bar"): + expected_list.append(span_event_start_fmt("SP1", "bar")) + + with tracer.start_as_current_span("baz"): + expected_list.append( + span_event_start_fmt("SP1", "baz") + ) + + expected_list.append(span_event_end_fmt("SP1", "baz")) + expected_list.append(span_event_end_fmt("SP1", "bar")) + expected_list.append(span_event_end_fmt("SP1", "foo")) + except NotImplementedError: + self.fail("_on_ending() should not raise an exception") + + self.assertListEqual(spans_calls_list, expected_list) + class MultiSpanProcessorTestBase(abc.ABC): @abc.abstractmethod @@ -176,6 +295,22 @@ def test_on_start(self): ) multi_processor.shutdown() + def test_on_ending(self): + multi_processor = self.create_multi_span_processor() + + mocks = [mock.Mock(spec=trace.SpanProcessor) for _ in range(0, 5)] + for mock_processor in mocks: + multi_processor.add_span_processor(mock_processor) + + span = self.create_default_span() + # pylint: disable=protected-access + multi_processor._on_ending(span) + + for mock_processor in mocks: + # pylint: disable=protected-access + mock_processor._on_ending.assert_called_once_with(span) + multi_processor.shutdown() + def test_on_end(self): multi_processor = self.create_multi_span_processor() @@ -219,6 +354,43 @@ def test_force_flush(self): self.assertEqual(1, mock_processor.force_flush.call_count) multi_processor.shutdown() + def test_on_ending_not_implemented_does_not_raise(self): + tracer_provider = trace.TracerProvider() + tracer = tracer_provider.get_tracer(__name__) + + spans_calls_list = [] # filled by MySpanProcessor + expected_list = [] # filled by hand + + multi_processor = self.create_multi_span_processor() + # Does not implement _on_ending + multi_processor.add_span_processor( + MySpanProcessor("SP1", spans_calls_list) + ) + + tracer_provider.add_span_processor(multi_processor) + + try: + with tracer.start_as_current_span("foo"): + expected_list.append(span_event_start_fmt("SP1", "foo")) + + with tracer.start_as_current_span("bar"): + expected_list.append(span_event_start_fmt("SP1", "bar")) + + with tracer.start_as_current_span("baz"): + expected_list.append( + span_event_start_fmt("SP1", "baz") + ) + + expected_list.append(span_event_end_fmt("SP1", "baz")) + expected_list.append(span_event_end_fmt("SP1", "bar")) + expected_list.append(span_event_end_fmt("SP1", "foo")) + except NotImplementedError: + # pylint: disable=no-member + self.fail("_on_ending() should not raise an exception") + + # pylint: disable=no-member + self.assertListEqual(spans_calls_list, expected_list) + class TestSynchronousMultiSpanProcessor( MultiSpanProcessorTestBase, unittest.TestCase diff --git a/opentelemetry-sdk/tests/trace/test_trace.py b/opentelemetry-sdk/tests/trace/test_trace.py index 7b23c11fa1f..b83b000f4d1 100644 --- a/opentelemetry-sdk/tests/trace/test_trace.py +++ b/opentelemetry-sdk/tests/trace/test_trace.py @@ -1428,6 +1428,10 @@ def span_event_start_fmt(span_processor_name, span_name): return span_processor_name + ":" + span_name + ":start" +def span_event_ending_fmt(span_processor_name, span_name): + return span_processor_name + ":" + span_name + ":ending" + + def span_event_end_fmt(span_processor_name, span_name): return span_processor_name + ":" + span_name + ":end" @@ -1442,11 +1446,15 @@ def on_start( ) -> None: self.span_list.append(span_event_start_fmt(self.name, span.name)) + def _on_ending(self, span: "trace.ReadableSpan") -> None: + self.span_list.append(span_event_ending_fmt(self.name, span.name)) + def on_end(self, span: "trace.ReadableSpan") -> None: self.span_list.append(span_event_end_fmt(self.name, span.name)) class TestSpanProcessor(unittest.TestCase): + # pylint: disable=too-many-statements def test_span_processor(self): tracer_provider = trace.TracerProvider() tracer = tracer_provider.get_tracer(__name__) @@ -1478,10 +1486,13 @@ def test_span_processor(self): with tracer.start_as_current_span("baz"): expected_list.append(span_event_start_fmt("SP1", "baz")) + expected_list.append(span_event_ending_fmt("SP1", "baz")) expected_list.append(span_event_end_fmt("SP1", "baz")) + expected_list.append(span_event_ending_fmt("SP1", "bar")) expected_list.append(span_event_end_fmt("SP1", "bar")) + expected_list.append(span_event_ending_fmt("SP1", "foo")) expected_list.append(span_event_end_fmt("SP1", "foo")) self.assertListEqual(spans_calls_list, expected_list) @@ -1504,12 +1515,18 @@ def test_span_processor(self): expected_list.append(span_event_start_fmt("SP1", "baz")) expected_list.append(span_event_start_fmt("SP2", "baz")) + expected_list.append(span_event_ending_fmt("SP1", "baz")) + expected_list.append(span_event_ending_fmt("SP2", "baz")) expected_list.append(span_event_end_fmt("SP1", "baz")) expected_list.append(span_event_end_fmt("SP2", "baz")) + expected_list.append(span_event_ending_fmt("SP1", "bar")) + expected_list.append(span_event_ending_fmt("SP2", "bar")) expected_list.append(span_event_end_fmt("SP1", "bar")) expected_list.append(span_event_end_fmt("SP2", "bar")) + expected_list.append(span_event_ending_fmt("SP1", "foo")) + expected_list.append(span_event_ending_fmt("SP2", "foo")) expected_list.append(span_event_end_fmt("SP1", "foo")) expected_list.append(span_event_end_fmt("SP2", "foo")) @@ -1532,10 +1549,13 @@ def test_add_span_processor_after_span_creation(self): # add span processor after spans have been created tracer_provider.add_span_processor(sp) + expected_list.append(span_event_ending_fmt("SP1", "baz")) expected_list.append(span_event_end_fmt("SP1", "baz")) + expected_list.append(span_event_ending_fmt("SP1", "bar")) expected_list.append(span_event_end_fmt("SP1", "bar")) + expected_list.append(span_event_ending_fmt("SP1", "foo")) expected_list.append(span_event_end_fmt("SP1", "foo")) self.assertListEqual(spans_calls_list, expected_list)