Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Add `opentelemetry-exporter-credential-provider-gcp` as an optional dependency to `opentelemetry-exporter-otlp-proto-grpc`
and `opentelemetry-exporter-otlp-proto-http`
([#4760](https://github.com/open-telemetry/opentelemetry-python/pull/4760))
- feat: implement on ending in span processor
([#4775](https://github.com/open-telemetry/opentelemetry-python/pull/4775))
Comment on lines +24 to +25
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: this could go to bottom of Unreleased list for chronological order

- semantic-conventions: Bump to 1.38.0
([#4791](https://github.com/open-telemetry/opentelemetry-python/pull/4791))
- [BREAKING] Remove LogData and extend SDK LogRecord to have instrumentation scope
Expand Down
21 changes: 21 additions & 0 deletions opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,16 @@ def on_start(
parent_context: The parent context of the span that just started.
"""

def _on_ending(self, span: "Span") -> None:
"""Called when a :class:`opentelemetry.trace.Span` is ending.

This method is called synchronously on the thread that ends the
span, therefore it should not block or throw an exception.

Args:
span: The :class:`opentelemetry.trace.Span` that is ending.
"""

def on_end(self, span: "ReadableSpan") -> None:
"""Called when a :class:`opentelemetry.trace.Span` is ended.

Expand Down Expand Up @@ -170,6 +180,11 @@ def on_start(
for sp in self._span_processors:
sp.on_start(span, parent_context=parent_context)

def _on_ending(self, span: "Span") -> None:
for sp in self._span_processors:
# pylint: disable=protected-access
sp._on_ending(span)

def on_end(self, span: "ReadableSpan") -> None:
for sp in self._span_processors:
sp.on_end(span)
Expand Down Expand Up @@ -254,6 +269,10 @@ def on_start(
lambda sp: sp.on_start, span, parent_context=parent_context
)

def _on_ending(self, span: "Span") -> None:
# pylint: disable=protected-access
self._submit_and_await(lambda sp: sp._on_ending, span)

def on_end(self, span: "ReadableSpan") -> None:
self._submit_and_await(lambda sp: sp.on_end, span)

Expand Down Expand Up @@ -945,6 +964,8 @@ def end(self, end_time: Optional[int] = None) -> None:

self._end_time = end_time if end_time is not None else time_ns()

# pylint: disable=protected-access
self._span_processor._on_ending(self)
self._span_processor.on_end(self._readable_span())

@_check_span_ended
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ def on_start(
) -> None:
pass

def _on_ending(self, span: Span) -> None:
pass

def on_end(self, span: ReadableSpan) -> None:
if not (span.context and span.context.trace_flags.sampled):
return
Expand Down Expand Up @@ -187,6 +190,9 @@ def on_start(
) -> None:
pass

def _on_ending(self, span: Span) -> None:
pass

def on_end(self, span: ReadableSpan) -> None:
if not (span.context and span.context.trace_flags.sampled):
return
Expand Down
172 changes: 172 additions & 0 deletions opentelemetry-sdk/tests/trace/test_span_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ def span_event_start_fmt(span_processor_name, span_name):
return span_processor_name + ":" + span_name + ":start"


def span_event_ending_fmt(span_processor_name, span_name):
return span_processor_name + ":" + span_name + ":ending"


def span_event_end_fmt(span_processor_name, span_name):
return span_processor_name + ":" + span_name + ":end"

Expand All @@ -50,6 +54,11 @@ def on_end(self, span: "trace.Span") -> None:
self.span_list.append(span_event_end_fmt(self.name, span.name))


class MyExtendedSpanProcessor(MySpanProcessor):
def _on_ending(self, span: "trace.Span") -> None:
self.span_list.append(span_event_ending_fmt(self.name, span.name))


class TestSpanProcessor(unittest.TestCase):
def test_span_processor(self):
tracer_provider = trace.TracerProvider()
Expand Down Expand Up @@ -120,6 +129,85 @@ def test_span_processor(self):
# compare if two lists are the same
self.assertListEqual(spans_calls_list, expected_list)

# pylint: disable=too-many-statements
def test_span_processor_with_on_ending(self):
tracer_provider = trace.TracerProvider()
tracer = tracer_provider.get_tracer(__name__)

spans_calls_list = [] # filled by MySpanProcessor
expected_list = [] # filled by hand

# Span processors are created but not added to the tracer yet
sp1 = MyExtendedSpanProcessor("SP1", spans_calls_list)
sp2 = MyExtendedSpanProcessor("SP2", spans_calls_list)

with tracer.start_as_current_span("foo"):
with tracer.start_as_current_span("bar"):
with tracer.start_as_current_span("baz"):
pass

# at this point lists must be empty
self.assertEqual(len(spans_calls_list), 0)

# add single span processor
tracer_provider.add_span_processor(sp1)

with tracer.start_as_current_span("foo"):
expected_list.append(span_event_start_fmt("SP1", "foo"))

with tracer.start_as_current_span("bar"):
expected_list.append(span_event_start_fmt("SP1", "bar"))

with tracer.start_as_current_span("baz"):
expected_list.append(span_event_start_fmt("SP1", "baz"))

expected_list.append(span_event_ending_fmt("SP1", "baz"))
expected_list.append(span_event_end_fmt("SP1", "baz"))

expected_list.append(span_event_ending_fmt("SP1", "bar"))
expected_list.append(span_event_end_fmt("SP1", "bar"))

expected_list.append(span_event_ending_fmt("SP1", "foo"))
expected_list.append(span_event_end_fmt("SP1", "foo"))

self.assertListEqual(spans_calls_list, expected_list)

spans_calls_list.clear()
expected_list.clear()

# go for multiple span processors
tracer_provider.add_span_processor(sp2)

with tracer.start_as_current_span("foo"):
expected_list.append(span_event_start_fmt("SP1", "foo"))
expected_list.append(span_event_start_fmt("SP2", "foo"))

with tracer.start_as_current_span("bar"):
expected_list.append(span_event_start_fmt("SP1", "bar"))
expected_list.append(span_event_start_fmt("SP2", "bar"))

with tracer.start_as_current_span("baz"):
expected_list.append(span_event_start_fmt("SP1", "baz"))
expected_list.append(span_event_start_fmt("SP2", "baz"))

expected_list.append(span_event_ending_fmt("SP1", "baz"))
expected_list.append(span_event_ending_fmt("SP2", "baz"))
expected_list.append(span_event_end_fmt("SP1", "baz"))
expected_list.append(span_event_end_fmt("SP2", "baz"))

expected_list.append(span_event_ending_fmt("SP1", "bar"))
expected_list.append(span_event_ending_fmt("SP2", "bar"))
expected_list.append(span_event_end_fmt("SP1", "bar"))
expected_list.append(span_event_end_fmt("SP2", "bar"))

expected_list.append(span_event_ending_fmt("SP1", "foo"))
expected_list.append(span_event_ending_fmt("SP2", "foo"))
expected_list.append(span_event_end_fmt("SP1", "foo"))
expected_list.append(span_event_end_fmt("SP2", "foo"))

# compare if two lists are the same
self.assertListEqual(spans_calls_list, expected_list)

def test_add_span_processor_after_span_creation(self):
tracer_provider = trace.TracerProvider()
tracer = tracer_provider.get_tracer(__name__)
Expand All @@ -144,6 +232,37 @@ def test_add_span_processor_after_span_creation(self):

self.assertListEqual(spans_calls_list, expected_list)

def test_on_ending_not_implemented_does_not_raise(self):
tracer_provider = trace.TracerProvider()
tracer = tracer_provider.get_tracer(__name__)

spans_calls_list = [] # filled by MySpanProcessor
expected_list = [] # filled by hand

# Does not implement _on_ending
sp = MySpanProcessor("SP1", spans_calls_list)
tracer_provider.add_span_processor(sp)

try:
with tracer.start_as_current_span("foo"):
expected_list.append(span_event_start_fmt("SP1", "foo"))

with tracer.start_as_current_span("bar"):
expected_list.append(span_event_start_fmt("SP1", "bar"))

with tracer.start_as_current_span("baz"):
expected_list.append(
span_event_start_fmt("SP1", "baz")
)

expected_list.append(span_event_end_fmt("SP1", "baz"))
expected_list.append(span_event_end_fmt("SP1", "bar"))
expected_list.append(span_event_end_fmt("SP1", "foo"))
except NotImplementedError:
self.fail("_on_ending() should not raise an exception")

self.assertListEqual(spans_calls_list, expected_list)


class MultiSpanProcessorTestBase(abc.ABC):
@abc.abstractmethod
Expand Down Expand Up @@ -176,6 +295,22 @@ def test_on_start(self):
)
multi_processor.shutdown()

def test_on_ending(self):
multi_processor = self.create_multi_span_processor()

mocks = [mock.Mock(spec=trace.SpanProcessor) for _ in range(0, 5)]
for mock_processor in mocks:
multi_processor.add_span_processor(mock_processor)

span = self.create_default_span()
# pylint: disable=protected-access
multi_processor._on_ending(span)

for mock_processor in mocks:
# pylint: disable=protected-access
mock_processor._on_ending.assert_called_once_with(span)
multi_processor.shutdown()

def test_on_end(self):
multi_processor = self.create_multi_span_processor()

Expand Down Expand Up @@ -219,6 +354,43 @@ def test_force_flush(self):
self.assertEqual(1, mock_processor.force_flush.call_count)
multi_processor.shutdown()

def test_on_ending_not_implemented_does_not_raise(self):
tracer_provider = trace.TracerProvider()
tracer = tracer_provider.get_tracer(__name__)

spans_calls_list = [] # filled by MySpanProcessor
expected_list = [] # filled by hand

multi_processor = self.create_multi_span_processor()
# Does not implement _on_ending
multi_processor.add_span_processor(
MySpanProcessor("SP1", spans_calls_list)
)

tracer_provider.add_span_processor(multi_processor)

try:
with tracer.start_as_current_span("foo"):
expected_list.append(span_event_start_fmt("SP1", "foo"))

with tracer.start_as_current_span("bar"):
expected_list.append(span_event_start_fmt("SP1", "bar"))

with tracer.start_as_current_span("baz"):
expected_list.append(
span_event_start_fmt("SP1", "baz")
)

expected_list.append(span_event_end_fmt("SP1", "baz"))
expected_list.append(span_event_end_fmt("SP1", "bar"))
expected_list.append(span_event_end_fmt("SP1", "foo"))
except NotImplementedError:
# pylint: disable=no-member
self.fail("_on_ending() should not raise an exception")

# pylint: disable=no-member
self.assertListEqual(spans_calls_list, expected_list)


class TestSynchronousMultiSpanProcessor(
MultiSpanProcessorTestBase, unittest.TestCase
Expand Down
20 changes: 20 additions & 0 deletions opentelemetry-sdk/tests/trace/test_trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -1428,6 +1428,10 @@ def span_event_start_fmt(span_processor_name, span_name):
return span_processor_name + ":" + span_name + ":start"


def span_event_ending_fmt(span_processor_name, span_name):
return span_processor_name + ":" + span_name + ":ending"


def span_event_end_fmt(span_processor_name, span_name):
return span_processor_name + ":" + span_name + ":end"

Expand All @@ -1442,11 +1446,15 @@ def on_start(
) -> None:
self.span_list.append(span_event_start_fmt(self.name, span.name))

def _on_ending(self, span: "trace.ReadableSpan") -> None:
self.span_list.append(span_event_ending_fmt(self.name, span.name))

def on_end(self, span: "trace.ReadableSpan") -> None:
self.span_list.append(span_event_end_fmt(self.name, span.name))


class TestSpanProcessor(unittest.TestCase):
# pylint: disable=too-many-statements
def test_span_processor(self):
tracer_provider = trace.TracerProvider()
tracer = tracer_provider.get_tracer(__name__)
Expand Down Expand Up @@ -1478,10 +1486,13 @@ def test_span_processor(self):
with tracer.start_as_current_span("baz"):
expected_list.append(span_event_start_fmt("SP1", "baz"))

expected_list.append(span_event_ending_fmt("SP1", "baz"))
expected_list.append(span_event_end_fmt("SP1", "baz"))

expected_list.append(span_event_ending_fmt("SP1", "bar"))
expected_list.append(span_event_end_fmt("SP1", "bar"))

expected_list.append(span_event_ending_fmt("SP1", "foo"))
expected_list.append(span_event_end_fmt("SP1", "foo"))

self.assertListEqual(spans_calls_list, expected_list)
Expand All @@ -1504,12 +1515,18 @@ def test_span_processor(self):
expected_list.append(span_event_start_fmt("SP1", "baz"))
expected_list.append(span_event_start_fmt("SP2", "baz"))

expected_list.append(span_event_ending_fmt("SP1", "baz"))
expected_list.append(span_event_ending_fmt("SP2", "baz"))
expected_list.append(span_event_end_fmt("SP1", "baz"))
expected_list.append(span_event_end_fmt("SP2", "baz"))

expected_list.append(span_event_ending_fmt("SP1", "bar"))
expected_list.append(span_event_ending_fmt("SP2", "bar"))
expected_list.append(span_event_end_fmt("SP1", "bar"))
expected_list.append(span_event_end_fmt("SP2", "bar"))

expected_list.append(span_event_ending_fmt("SP1", "foo"))
expected_list.append(span_event_ending_fmt("SP2", "foo"))
expected_list.append(span_event_end_fmt("SP1", "foo"))
expected_list.append(span_event_end_fmt("SP2", "foo"))

Expand All @@ -1532,10 +1549,13 @@ def test_add_span_processor_after_span_creation(self):
# add span processor after spans have been created
tracer_provider.add_span_processor(sp)

expected_list.append(span_event_ending_fmt("SP1", "baz"))
expected_list.append(span_event_end_fmt("SP1", "baz"))

expected_list.append(span_event_ending_fmt("SP1", "bar"))
expected_list.append(span_event_end_fmt("SP1", "bar"))

expected_list.append(span_event_ending_fmt("SP1", "foo"))
expected_list.append(span_event_end_fmt("SP1", "foo"))

self.assertListEqual(spans_calls_list, expected_list)
Expand Down
Loading