Skip to content

Commit 9e629da

Browse files
authored
Rearrange span processors to avoid repeating scrubbing and other tweaking (#658)
1 parent 7e6b140 commit 9e629da

File tree

6 files changed

+172
-119
lines changed

6 files changed

+172
-119
lines changed

logfire/_internal/config.py

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444
from opentelemetry.sdk.metrics.export import AggregationTemporality, MetricReader, PeriodicExportingMetricReader
4545
from opentelemetry.sdk.metrics.view import ExponentialBucketHistogramAggregation, View
4646
from opentelemetry.sdk.resources import Resource
47-
from opentelemetry.sdk.trace import SpanProcessor, TracerProvider as SDKTracerProvider
47+
from opentelemetry.sdk.trace import SpanProcessor, SynchronousMultiSpanProcessor, TracerProvider as SDKTracerProvider
4848
from opentelemetry.sdk.trace.export import BatchSpanProcessor, SimpleSpanProcessor
4949
from opentelemetry.sdk.trace.id_generator import IdGenerator
5050
from opentelemetry.sdk.trace.sampling import ParentBasedTraceIdRatio, Sampler
@@ -79,7 +79,7 @@
7979
from .exporters.fallback import FallbackSpanExporter
8080
from .exporters.file import FileSpanExporter
8181
from .exporters.otlp import OTLPExporterHttpSession, RetryFewerSpansSpanExporter
82-
from .exporters.processor_wrapper import MainSpanProcessorWrapper
82+
from .exporters.processor_wrapper import CheckSuppressInstrumentationProcessorWrapper, MainSpanProcessorWrapper
8383
from .exporters.quiet_metrics import QuietMetricExporter
8484
from .exporters.remove_pending import RemovePendingSpansExporter
8585
from .exporters.test import TestExporter
@@ -764,20 +764,22 @@ def _initialize(self) -> None:
764764
self._tracer_provider.set_provider(tracer_provider) # do we need to shut down the existing one???
765765

766766
processors_with_pending_spans: list[SpanProcessor] = []
767+
root_processor = main_multiprocessor = SynchronousMultiSpanProcessor()
768+
if self.sampling.tail:
769+
root_processor = TailSamplingProcessor(root_processor, self.sampling.tail)
770+
tracer_provider.add_span_processor(
771+
CheckSuppressInstrumentationProcessorWrapper(
772+
MainSpanProcessorWrapper(root_processor, self.scrubber),
773+
)
774+
)
767775

768776
def add_span_processor(span_processor: SpanProcessor) -> None:
769-
# Some span processors added to the tracer provider should also be recorded in
770-
# `processors_with_pending_spans` so that they can be used by the final pending span processor.
771-
# This means that `tracer_provider.add_span_processor` should only appear in two places.
777+
main_multiprocessor.add_span_processor(span_processor)
778+
772779
has_pending = isinstance(
773780
getattr(span_processor, 'span_exporter', None),
774781
(TestExporter, RemovePendingSpansExporter, SimpleConsoleSpanExporter),
775782
)
776-
777-
if self.sampling.tail:
778-
span_processor = TailSamplingProcessor(span_processor, self.sampling.tail)
779-
span_processor = MainSpanProcessorWrapper(span_processor, self.scrubber)
780-
tracer_provider.add_span_processor(span_processor)
781783
if has_pending:
782784
processors_with_pending_spans.append(span_processor)
783785

@@ -877,8 +879,14 @@ def check_token():
877879
]
878880

879881
if processors_with_pending_spans:
880-
tracer_provider.add_span_processor(
881-
PendingSpanProcessor(self.advanced.id_generator, tuple(processors_with_pending_spans))
882+
pending_multiprocessor = SynchronousMultiSpanProcessor()
883+
for processor in processors_with_pending_spans:
884+
pending_multiprocessor.add_span_processor(processor)
885+
886+
main_multiprocessor.add_span_processor(
887+
PendingSpanProcessor(
888+
self.advanced.id_generator, MainSpanProcessorWrapper(pending_multiprocessor, self.scrubber)
889+
)
882890
)
883891

884892
otlp_endpoint = os.getenv(OTEL_EXPORTER_OTLP_ENDPOINT)

logfire/_internal/exporters/processor_wrapper.py

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
from __future__ import annotations
22

3+
from dataclasses import dataclass
34
from urllib.parse import parse_qs, urlparse
45

56
from opentelemetry import context
6-
from opentelemetry.sdk.trace import ReadableSpan, Span, SpanProcessor
7+
from opentelemetry.sdk.trace import ReadableSpan, Span
78
from opentelemetry.sdk.util.instrumentation import InstrumentationScope
89
from opentelemetry.semconv.trace import SpanAttributes
910
from opentelemetry.trace import Status, StatusCode
@@ -31,31 +32,44 @@
3132
from .wrapper import WrapperSpanProcessor
3233

3334

35+
class CheckSuppressInstrumentationProcessorWrapper(WrapperSpanProcessor):
36+
"""Checks if instrumentation is suppressed, then suppresses instrumentation itself.
37+
38+
Placed at the root of the tree of processors.
39+
"""
40+
41+
def on_start(self, span: Span, parent_context: context.Context | None = None) -> None:
42+
if is_instrumentation_suppressed():
43+
return
44+
with logfire.suppress_instrumentation():
45+
super().on_start(span, parent_context)
46+
47+
def on_end(self, span: ReadableSpan) -> None:
48+
if is_instrumentation_suppressed():
49+
return
50+
with logfire.suppress_instrumentation():
51+
super().on_end(span)
52+
53+
54+
@dataclass
3455
class MainSpanProcessorWrapper(WrapperSpanProcessor):
3556
"""Wrapper around other processors to intercept starting and ending spans with our own global logic.
3657
3758
Suppresses starting/ending if the current context has a `suppress_instrumentation` value.
3859
Tweaks the send/receive span names generated by the ASGI middleware.
3960
"""
4061

41-
def __init__(self, processor: SpanProcessor, scrubber: BaseScrubber) -> None:
42-
super().__init__(processor)
43-
self.scrubber = scrubber
62+
scrubber: BaseScrubber
4463

4564
def on_start(
4665
self,
4766
span: Span,
4867
parent_context: context.Context | None = None,
4968
) -> None:
50-
if is_instrumentation_suppressed():
51-
return
5269
_set_log_level_on_asgi_send_receive_spans(span)
53-
with logfire.suppress_instrumentation():
54-
super().on_start(span, parent_context)
70+
super().on_start(span, parent_context)
5571

5672
def on_end(self, span: ReadableSpan) -> None:
57-
if is_instrumentation_suppressed():
58-
return
5973
span_dict = span_to_dict(span)
6074
_tweak_asgi_send_receive_spans(span_dict)
6175
_tweak_sqlalchemy_connect_spans(span_dict)
@@ -64,8 +78,7 @@ def on_end(self, span: ReadableSpan) -> None:
6478
_set_error_level_and_status(span_dict)
6579
self.scrubber.scrub_span(span_dict)
6680
span = ReadableSpan(**span_dict)
67-
with logfire.suppress_instrumentation():
68-
super().on_end(span)
81+
super().on_end(span)
6982

7083

7184
def _set_error_level_and_status(span: ReadableSpanDict) -> None:

logfire/_internal/exporters/wrapper.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from __future__ import annotations
22

3+
from dataclasses import dataclass
34
from typing import Any, Sequence
45

56
from opentelemetry import context
@@ -49,11 +50,11 @@ def shutdown(self, timeout_millis: float = 30_000, **kwargs: Any) -> None:
4950
self.wrapped_exporter.shutdown(timeout_millis, **kwargs) # type: ignore
5051

5152

53+
@dataclass
5254
class WrapperSpanProcessor(SpanProcessor):
5355
"""A base class for SpanProcessors that wrap another processor."""
5456

55-
def __init__(self, processor: SpanProcessor) -> None:
56-
self.processor = processor
57+
processor: SpanProcessor
5758

5859
def on_start(self, span: Span, parent_context: context.Context | None = None) -> None:
5960
self.processor.on_start(span, parent_context)

logfire/_internal/tracer.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -237,11 +237,14 @@ def start_span(self, name: str, context: Context | None = None, *args: Any, **kw
237237
class PendingSpanProcessor(SpanProcessor):
238238
"""Span processor that emits an extra pending span for each span as it starts.
239239
240-
The pending span is emitted by calling `on_end` on all other processors.
240+
The pending span is emitted by calling `on_end` on the inner `processor`.
241+
This is intentionally not a `WrapperSpanProcessor` to avoid the default implementations of `on_end`
242+
and `shutdown`. This processor is expected to contain processors which are already included
243+
elsewhere in the pipeline where `on_end` and `shutdown` are called normally.
241244
"""
242245

243246
id_generator: IdGenerator
244-
other_processors: tuple[SpanProcessor, ...]
247+
processor: SpanProcessor
245248

246249
def on_start(
247250
self,
@@ -250,7 +253,7 @@ def on_start(
250253
) -> None:
251254
assert isinstance(span, ReadableSpan) and isinstance(span, Span)
252255
if not span.is_recording(): # pragma: no cover
253-
# Span was sampled out
256+
# Span was sampled out, or has finished already (happens with tail sampling)
254257
return
255258

256259
attributes = span.attributes
@@ -295,8 +298,7 @@ def on_start(
295298
end_time=start_and_end_time,
296299
instrumentation_scope=span.instrumentation_scope,
297300
)
298-
for processor in self.other_processors:
299-
processor.on_end(pending_span)
301+
self.processor.on_end(pending_span)
300302

301303

302304
def should_sample(span_context: SpanContext, attributes: Mapping[str, otel_types.AttributeValue]) -> bool:

tests/test_configure.py

Lines changed: 34 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
from opentelemetry.metrics import NoOpMeterProvider, get_meter_provider
2222
from opentelemetry.sdk.metrics._internal.export import PeriodicExportingMetricReader
2323
from opentelemetry.sdk.metrics.export import InMemoryMetricReader
24-
from opentelemetry.sdk.trace import ReadableSpan, SpanProcessor
24+
from opentelemetry.sdk.trace import ReadableSpan, SpanProcessor, SynchronousMultiSpanProcessor
2525
from opentelemetry.sdk.trace.export import (
2626
BatchSpanProcessor,
2727
ConsoleSpanExporter,
@@ -46,7 +46,10 @@
4646
from logfire._internal.exporters.console import ShowParentsConsoleSpanExporter
4747
from logfire._internal.exporters.fallback import FallbackSpanExporter
4848
from logfire._internal.exporters.file import WritingFallbackWarning
49-
from logfire._internal.exporters.processor_wrapper import MainSpanProcessorWrapper
49+
from logfire._internal.exporters.processor_wrapper import (
50+
CheckSuppressInstrumentationProcessorWrapper,
51+
MainSpanProcessorWrapper,
52+
)
5053
from logfire._internal.exporters.quiet_metrics import QuietMetricExporter
5154
from logfire._internal.exporters.remove_pending import RemovePendingSpansExporter
5255
from logfire._internal.exporters.wrapper import WrapperSpanExporter
@@ -570,11 +573,9 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
570573
)
571574
wait_for_check_token_thread()
572575

573-
send_to_logfire_processor, *_ = get_span_processors()
576+
batch_span_processor, *_ = get_span_processors()
574577
# It's OK if these processor/exporter types change.
575578
# We just need access to the FallbackSpanExporter either way to swap out its underlying exporter.
576-
assert isinstance(send_to_logfire_processor, MainSpanProcessorWrapper)
577-
batch_span_processor = send_to_logfire_processor.processor
578579
assert isinstance(batch_span_processor, BatchSpanProcessor)
579580
exporter = batch_span_processor.span_exporter
580581
assert isinstance(exporter, WrapperSpanExporter)
@@ -622,9 +623,7 @@ def configure_tracking_exporter():
622623
)
623624
wait_for_check_token_thread()
624625

625-
send_to_logfire_processor, *_ = get_span_processors()
626-
assert isinstance(send_to_logfire_processor, MainSpanProcessorWrapper)
627-
batch_span_processor = send_to_logfire_processor.processor
626+
batch_span_processor, *_ = get_span_processors()
628627
assert isinstance(batch_span_processor, BatchSpanProcessor)
629628

630629
batch_span_processor.span_exporter = TrackingExporter()
@@ -1460,16 +1459,19 @@ def test_default_exporters(monkeypatch: pytest.MonkeyPatch):
14601459

14611460
[console_processor, send_to_logfire_processor, pending_span_processor] = get_span_processors()
14621461

1463-
assert isinstance(console_processor, MainSpanProcessorWrapper)
1464-
assert isinstance(console_processor.processor, SimpleSpanProcessor)
1465-
assert isinstance(console_processor.processor.span_exporter, ShowParentsConsoleSpanExporter)
1462+
assert isinstance(console_processor, SimpleSpanProcessor)
1463+
assert isinstance(console_processor.span_exporter, ShowParentsConsoleSpanExporter)
14661464

1467-
assert isinstance(send_to_logfire_processor, MainSpanProcessorWrapper)
1468-
assert isinstance(send_to_logfire_processor.processor, BatchSpanProcessor)
1469-
assert isinstance(send_to_logfire_processor.processor.span_exporter, RemovePendingSpansExporter)
1465+
assert isinstance(send_to_logfire_processor, BatchSpanProcessor)
1466+
assert isinstance(send_to_logfire_processor.span_exporter, RemovePendingSpansExporter)
14701467

14711468
assert isinstance(pending_span_processor, PendingSpanProcessor)
1472-
assert pending_span_processor.other_processors == (console_processor, send_to_logfire_processor)
1469+
assert isinstance(pending_span_processor.processor, MainSpanProcessorWrapper)
1470+
assert isinstance(pending_span_processor.processor.processor, SynchronousMultiSpanProcessor)
1471+
assert pending_span_processor.processor.processor._span_processors == ( # type: ignore
1472+
console_processor,
1473+
send_to_logfire_processor,
1474+
)
14731475

14741476
[logfire_metric_reader] = get_metric_readers()
14751477
assert isinstance(logfire_metric_reader, PeriodicExportingMetricReader)
@@ -1487,9 +1489,8 @@ def test_custom_exporters():
14871489
metrics=logfire.MetricsOptions(additional_readers=[custom_metric_reader]),
14881490
)
14891491

1490-
[custom_processor_wrapper] = get_span_processors()
1491-
assert isinstance(custom_processor_wrapper, MainSpanProcessorWrapper)
1492-
assert custom_processor_wrapper.processor is custom_span_processor
1492+
[custom_span_processor2] = get_span_processors()
1493+
assert custom_span_processor2 is custom_span_processor
14931494

14941495
[custom_metric_reader2] = get_metric_readers()
14951496
assert custom_metric_reader2 is custom_metric_reader
@@ -1501,10 +1502,9 @@ def test_otel_exporter_otlp_endpoint_env_var():
15011502
logfire.configure(send_to_logfire=False, console=False)
15021503

15031504
[otel_processor] = get_span_processors()
1504-
assert isinstance(otel_processor, MainSpanProcessorWrapper)
1505-
assert isinstance(otel_processor.processor, BatchSpanProcessor)
1506-
assert isinstance(otel_processor.processor.span_exporter, OTLPSpanExporter)
1507-
assert otel_processor.processor.span_exporter._endpoint == 'otel_endpoint/v1/traces' # type: ignore
1505+
assert isinstance(otel_processor, BatchSpanProcessor)
1506+
assert isinstance(otel_processor.span_exporter, OTLPSpanExporter)
1507+
assert otel_processor.span_exporter._endpoint == 'otel_endpoint/v1/traces' # type: ignore
15081508

15091509
[otel_metric_reader] = get_metric_readers()
15101510
assert isinstance(otel_metric_reader, PeriodicExportingMetricReader)
@@ -1531,10 +1531,9 @@ def test_otel_metrics_exporter_env_var():
15311531
logfire.configure(send_to_logfire=False, console=False)
15321532

15331533
[otel_processor] = get_span_processors()
1534-
assert isinstance(otel_processor, MainSpanProcessorWrapper)
1535-
assert isinstance(otel_processor.processor, BatchSpanProcessor)
1536-
assert isinstance(otel_processor.processor.span_exporter, OTLPSpanExporter)
1537-
assert otel_processor.processor.span_exporter._endpoint == 'otel_endpoint3/v1/traces' # type: ignore
1534+
assert isinstance(otel_processor, BatchSpanProcessor)
1535+
assert isinstance(otel_processor.span_exporter, OTLPSpanExporter)
1536+
assert otel_processor.span_exporter._endpoint == 'otel_endpoint3/v1/traces' # type: ignore
15381537

15391538
assert len(list(get_metric_readers())) == 0
15401539

@@ -1545,10 +1544,9 @@ def test_otel_exporter_otlp_traces_endpoint_env_var():
15451544
logfire.configure(send_to_logfire=False, console=False)
15461545

15471546
[otel_processor] = get_span_processors()
1548-
assert isinstance(otel_processor, MainSpanProcessorWrapper)
1549-
assert isinstance(otel_processor.processor, BatchSpanProcessor)
1550-
assert isinstance(otel_processor.processor.span_exporter, OTLPSpanExporter)
1551-
assert otel_processor.processor.span_exporter._endpoint == 'otel_traces_endpoint' # type: ignore
1547+
assert isinstance(otel_processor, BatchSpanProcessor)
1548+
assert isinstance(otel_processor.span_exporter, OTLPSpanExporter)
1549+
assert otel_processor.span_exporter._endpoint == 'otel_traces_endpoint' # type: ignore
15521550

15531551
assert len(list(get_metric_readers())) == 0
15541552

@@ -1576,7 +1574,12 @@ def test_metrics_false(monkeypatch: pytest.MonkeyPatch):
15761574

15771575

15781576
def get_span_processors() -> Iterable[SpanProcessor]:
1579-
return get_tracer_provider().provider._active_span_processor._span_processors # type: ignore
1577+
[first, *rest] = get_tracer_provider().provider._active_span_processor._span_processors # type: ignore
1578+
assert isinstance(first, CheckSuppressInstrumentationProcessorWrapper)
1579+
assert isinstance(first.processor, MainSpanProcessorWrapper)
1580+
assert isinstance(first.processor.processor, SynchronousMultiSpanProcessor)
1581+
1582+
return [*first.processor.processor._span_processors, *rest] # type: ignore
15801583

15811584

15821585
def get_metric_readers() -> Iterable[SpanProcessor]:

0 commit comments

Comments
 (0)