From 881bc76c26ae2fbd638560516a093f4195178509 Mon Sep 17 00:00:00 2001 From: Andrey Karabanov Date: Mon, 13 Oct 2025 21:20:22 +0300 Subject: [PATCH 1/6] feature: hydrate AsyncPG span attributes at creation (#3643) --- .../instrumentation/asyncpg/__init__.py | 35 +++++++++---------- 1 file changed, 16 insertions(+), 19 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-asyncpg/src/opentelemetry/instrumentation/asyncpg/__init__.py b/instrumentation/opentelemetry-instrumentation-asyncpg/src/opentelemetry/instrumentation/asyncpg/__init__.py index daba570d6e..c8aba9bbf3 100644 --- a/instrumentation/opentelemetry-instrumentation-asyncpg/src/opentelemetry/instrumentation/asyncpg/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-asyncpg/src/opentelemetry/instrumentation/asyncpg/__init__.py @@ -178,18 +178,16 @@ async def _do_execute(self, func, instance, args, kwargs): except IndexError: name = "" + # Hydrate attributes before span creation to enable filtering + span_attributes = _hydrate_span_from_args( + instance, + args[0], + args[1:] if self.capture_parameters else None, + ) + with self._tracer.start_as_current_span( - name, kind=SpanKind.CLIENT + name, kind=SpanKind.CLIENT, attributes=span_attributes ) as span: - if span.is_recording(): - span_attributes = _hydrate_span_from_args( - instance, - args[0], - args[1:] if self.capture_parameters else None, - ) - for attribute, value in span_attributes.items(): - span.set_attribute(attribute, value) - try: result = await func(*args, **kwargs) except Exception as exc: # pylint: disable=W0703 @@ -217,20 +215,19 @@ async def _do_cursor_execute(self, func, instance, args, kwargs): except IndexError: name = "" + # Hydrate attributes before span creation to enable filtering + span_attributes = _hydrate_span_from_args( + instance._connection, + instance._query, + instance._args if self.capture_parameters else None, + ) + stop = False with self._tracer.start_as_current_span( f"CURSOR: {name}", kind=SpanKind.CLIENT, + attributes=span_attributes, ) as span: - if span.is_recording(): - span_attributes = _hydrate_span_from_args( - instance._connection, - instance._query, - instance._args if self.capture_parameters else None, - ) - for attribute, value in span_attributes.items(): - span.set_attribute(attribute, value) - try: result = await func(*args, **kwargs) except StopAsyncIteration: From 6cadc668f86379a47d270aa6a4573b0c99ca013b Mon Sep 17 00:00:00 2001 From: Andrey Date: Sun, 7 Dec 2025 22:56:24 +0300 Subject: [PATCH 2/6] opentelemetry-instrumentation-asyncpg: ensure sampler sees span attributes --- CHANGELOG.md | 2 + .../tests/asyncpg/test_asyncpg_functional.py | 124 ++++++++++++++++++ 2 files changed, 126 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9cbbe7aa3b..e9038db9d7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -70,6 +70,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#3936](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3936)) - `opentelemetry-instrumentation-aiohttp-client`: Update instrumentor to respect suppressing http instrumentation ([#3957](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3957)) +- `opentelemetry-instrumentation-asyncpg`: Hydrate span attributes before creation so samplers can filter on database details + ([#3643](https://github.com/open-telemetry/opentelemetry-python-contrib/issues/3643)) ## Version 1.38.0/0.59b0 (2025-10-16) diff --git a/tests/opentelemetry-docker-tests/tests/asyncpg/test_asyncpg_functional.py b/tests/opentelemetry-docker-tests/tests/asyncpg/test_asyncpg_functional.py index b89880e6eb..cac5d0a094 100644 --- a/tests/opentelemetry-docker-tests/tests/asyncpg/test_asyncpg_functional.py +++ b/tests/opentelemetry-docker-tests/tests/asyncpg/test_asyncpg_functional.py @@ -1,10 +1,13 @@ import asyncio import os +import unittest from collections import namedtuple from unittest.mock import patch import asyncpg +from opentelemetry import trace + from opentelemetry.instrumentation.asyncpg import AsyncPGInstrumentor from opentelemetry.semconv.trace import SpanAttributes from opentelemetry.test.test_base import TestBase @@ -425,3 +428,124 @@ def test_instrumented_fetch_method_broken_asyncpg(self, *_, **__): self.assertIs(StatusCode.UNSET, spans[0].status.status_code) self.assertEqual(spans[0].name, "postgresql") self.assertEqual(spans[0].attributes[SpanAttributes.DB_STATEMENT], "") + + +class _NonRecordingSpan: + def __init__(self, recording=False): + self._recording = recording + self.set_status_calls = 0 + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + def is_recording(self): + return self._recording + + def set_status(self, status): + self.set_status_calls += 1 + self.status = status + + +class _CapturingTracer: + def __init__(self, span): + self.span = span + self.started_spans = [] + + def start_as_current_span(self, name, kind=None, attributes=None): + self.started_spans.append( + {"name": name, "kind": kind, "attributes": attributes} + ) + return self.span + + +class _FakeParams: + def __init__(self, database="testdb", user="dbuser"): + self.database = database + self.user = user + + +class _FakeConnection: + def __init__(self): + self._params = _FakeParams() + self._addr = ("db.example.com", 5432) + + +class _FakeCursor: + def __init__(self, connection, query, args): + self._connection = connection + self._query = query + self._args = args + + +class TestAsyncPGSamplingAttributes(unittest.TestCase): + def setUp(self): + self.connection = _FakeConnection() + self.span = _NonRecordingSpan(recording=False) + self.tracer = _CapturingTracer(self.span) + self.instrumentor = AsyncPGInstrumentor(capture_parameters=True) + self.instrumentor._tracer = self.tracer + + def test_attributes_available_when_span_not_recording(self): + async def _fake_execute(*args, **kwargs): + return "ok" + + async_call( + self.instrumentor._do_execute( + _fake_execute, + self.connection, + ("SELECT $1", "42"), + {}, + ) + ) + + self.assertEqual(len(self.tracer.started_spans), 1) + span_info = self.tracer.started_spans[0] + self.assertEqual( + span_info["attributes"], + { + SpanAttributes.DB_SYSTEM: "postgresql", + SpanAttributes.DB_NAME: "testdb", + SpanAttributes.DB_USER: "dbuser", + SpanAttributes.NET_PEER_NAME: "db.example.com", + SpanAttributes.NET_PEER_PORT: 5432, + SpanAttributes.DB_STATEMENT: "SELECT $1", + "db.statement.parameters": "('42',)", + }, + ) + self.assertEqual(span_info["kind"], trace.SpanKind.CLIENT) + self.assertEqual(span_info["name"], "SELECT") + + def test_cursor_attributes_available_when_span_not_recording(self): + async def _fake_cursor_execute(*args, **kwargs): + return "ok" + + cursor = _FakeCursor(self.connection, "SELECT $1", ("99",)) + + async_call( + self.instrumentor._do_cursor_execute( + _fake_cursor_execute, + cursor, + (), + {}, + ) + ) + + self.assertEqual(len(self.tracer.started_spans), 1) + span_info = self.tracer.started_spans[0] + self.assertEqual( + span_info["attributes"], + { + SpanAttributes.DB_SYSTEM: "postgresql", + SpanAttributes.DB_NAME: "testdb", + SpanAttributes.DB_USER: "dbuser", + SpanAttributes.NET_PEER_NAME: "db.example.com", + SpanAttributes.NET_PEER_PORT: 5432, + SpanAttributes.DB_STATEMENT: "SELECT $1", + "db.statement.parameters": "('99',)", + }, + ) + self.assertEqual(span_info["kind"], trace.SpanKind.CLIENT) + self.assertEqual(span_info["name"], "CURSOR: SELECT") From e99879c4de492f791f82ad0f009b31d6ce3d2f92 Mon Sep 17 00:00:00 2001 From: Andrey Date: Tue, 9 Dec 2025 10:18:21 +0300 Subject: [PATCH 3/6] tests: fix import order in asyncpg docker tests --- .../tests/asyncpg/test_asyncpg_functional.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/opentelemetry-docker-tests/tests/asyncpg/test_asyncpg_functional.py b/tests/opentelemetry-docker-tests/tests/asyncpg/test_asyncpg_functional.py index cac5d0a094..7a72de5776 100644 --- a/tests/opentelemetry-docker-tests/tests/asyncpg/test_asyncpg_functional.py +++ b/tests/opentelemetry-docker-tests/tests/asyncpg/test_asyncpg_functional.py @@ -7,7 +7,6 @@ import asyncpg from opentelemetry import trace - from opentelemetry.instrumentation.asyncpg import AsyncPGInstrumentor from opentelemetry.semconv.trace import SpanAttributes from opentelemetry.test.test_base import TestBase From 32136cf4de0d04ef873699afba62c7e94117347b Mon Sep 17 00:00:00 2001 From: Andrey Date: Tue, 9 Dec 2025 10:25:54 +0300 Subject: [PATCH 4/6] tests: expect net.transport in asyncpg sampler attributes --- .../tests/asyncpg/test_asyncpg_functional.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/opentelemetry-docker-tests/tests/asyncpg/test_asyncpg_functional.py b/tests/opentelemetry-docker-tests/tests/asyncpg/test_asyncpg_functional.py index 7a72de5776..700d11637e 100644 --- a/tests/opentelemetry-docker-tests/tests/asyncpg/test_asyncpg_functional.py +++ b/tests/opentelemetry-docker-tests/tests/asyncpg/test_asyncpg_functional.py @@ -510,6 +510,7 @@ async def _fake_execute(*args, **kwargs): SpanAttributes.DB_USER: "dbuser", SpanAttributes.NET_PEER_NAME: "db.example.com", SpanAttributes.NET_PEER_PORT: 5432, + SpanAttributes.NET_TRANSPORT: "ip_tcp", SpanAttributes.DB_STATEMENT: "SELECT $1", "db.statement.parameters": "('42',)", }, @@ -542,6 +543,7 @@ async def _fake_cursor_execute(*args, **kwargs): SpanAttributes.DB_USER: "dbuser", SpanAttributes.NET_PEER_NAME: "db.example.com", SpanAttributes.NET_PEER_PORT: 5432, + SpanAttributes.NET_TRANSPORT: "ip_tcp", SpanAttributes.DB_STATEMENT: "SELECT $1", "db.statement.parameters": "('99',)", }, From 2fc1b821503227d2ad69571241787044476e88ef Mon Sep 17 00:00:00 2001 From: Andrey Date: Tue, 9 Dec 2025 22:30:51 +0300 Subject: [PATCH 5/6] tests: use sdk tracer in asyncpg sampling checks --- .../tests/asyncpg/test_asyncpg_functional.py | 68 +++++++------------ 1 file changed, 23 insertions(+), 45 deletions(-) diff --git a/tests/opentelemetry-docker-tests/tests/asyncpg/test_asyncpg_functional.py b/tests/opentelemetry-docker-tests/tests/asyncpg/test_asyncpg_functional.py index 700d11637e..e14ba07271 100644 --- a/tests/opentelemetry-docker-tests/tests/asyncpg/test_asyncpg_functional.py +++ b/tests/opentelemetry-docker-tests/tests/asyncpg/test_asyncpg_functional.py @@ -1,6 +1,5 @@ import asyncio import os -import unittest from collections import namedtuple from unittest.mock import patch @@ -8,6 +7,11 @@ from opentelemetry import trace from opentelemetry.instrumentation.asyncpg import AsyncPGInstrumentor +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( + InMemorySpanExporter, +) from opentelemetry.semconv.trace import SpanAttributes from opentelemetry.test.test_base import TestBase from opentelemetry.trace import StatusCode @@ -429,37 +433,6 @@ def test_instrumented_fetch_method_broken_asyncpg(self, *_, **__): self.assertEqual(spans[0].attributes[SpanAttributes.DB_STATEMENT], "") -class _NonRecordingSpan: - def __init__(self, recording=False): - self._recording = recording - self.set_status_calls = 0 - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc, tb): - return False - - def is_recording(self): - return self._recording - - def set_status(self, status): - self.set_status_calls += 1 - self.status = status - - -class _CapturingTracer: - def __init__(self, span): - self.span = span - self.started_spans = [] - - def start_as_current_span(self, name, kind=None, attributes=None): - self.started_spans.append( - {"name": name, "kind": kind, "attributes": attributes} - ) - return self.span - - class _FakeParams: def __init__(self, database="testdb", user="dbuser"): self.database = database @@ -479,11 +452,14 @@ def __init__(self, connection, query, args): self._args = args -class TestAsyncPGSamplingAttributes(unittest.TestCase): +class TestAsyncPGSamplingAttributes(TestBase): def setUp(self): + super().setUp() self.connection = _FakeConnection() - self.span = _NonRecordingSpan(recording=False) - self.tracer = _CapturingTracer(self.span) + self.exporter = InMemorySpanExporter() + tracer_provider = TracerProvider() + tracer_provider.add_span_processor(SimpleSpanProcessor(self.exporter)) + self.tracer = tracer_provider.get_tracer(__name__) self.instrumentor = AsyncPGInstrumentor(capture_parameters=True) self.instrumentor._tracer = self.tracer @@ -500,10 +476,11 @@ async def _fake_execute(*args, **kwargs): ) ) - self.assertEqual(len(self.tracer.started_spans), 1) - span_info = self.tracer.started_spans[0] + spans = self.exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] self.assertEqual( - span_info["attributes"], + span.attributes, { SpanAttributes.DB_SYSTEM: "postgresql", SpanAttributes.DB_NAME: "testdb", @@ -515,8 +492,8 @@ async def _fake_execute(*args, **kwargs): "db.statement.parameters": "('42',)", }, ) - self.assertEqual(span_info["kind"], trace.SpanKind.CLIENT) - self.assertEqual(span_info["name"], "SELECT") + self.assertEqual(span.kind, trace.SpanKind.CLIENT) + self.assertEqual(span.name, "SELECT") def test_cursor_attributes_available_when_span_not_recording(self): async def _fake_cursor_execute(*args, **kwargs): @@ -533,10 +510,11 @@ async def _fake_cursor_execute(*args, **kwargs): ) ) - self.assertEqual(len(self.tracer.started_spans), 1) - span_info = self.tracer.started_spans[0] + spans = self.exporter.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] self.assertEqual( - span_info["attributes"], + span.attributes, { SpanAttributes.DB_SYSTEM: "postgresql", SpanAttributes.DB_NAME: "testdb", @@ -548,5 +526,5 @@ async def _fake_cursor_execute(*args, **kwargs): "db.statement.parameters": "('99',)", }, ) - self.assertEqual(span_info["kind"], trace.SpanKind.CLIENT) - self.assertEqual(span_info["name"], "CURSOR: SELECT") + self.assertEqual(span.kind, trace.SpanKind.CLIENT) + self.assertEqual(span.name, "CURSOR: SELECT") From 8d8eee35f6ce7007879089ffafcac8b1556be8d6 Mon Sep 17 00:00:00 2001 From: Andrey Date: Wed, 10 Dec 2025 00:24:21 +0300 Subject: [PATCH 6/6] changelog: move asyncpg fix to Unreleased --- CHANGELOG.md | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e5064cb6ba..c72d3d842e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +### Fixed + +- `opentelemetry-instrumentation-asyncpg`: Hydrate span attributes before creation so samplers can filter on database details + ([#3643](https://github.com/open-telemetry/opentelemetry-python-contrib/issues/3643)) + ## Version 1.39.0/0.60b0 (2025-12-03) ### Added @@ -73,9 +78,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#3936](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3936)) - `opentelemetry-instrumentation-aiohttp-client`: Update instrumentor to respect suppressing http instrumentation ([#3957](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3957)) -- `opentelemetry-instrumentation-asyncpg`: Hydrate span attributes before creation so samplers can filter on database details - ([#3643](https://github.com/open-telemetry/opentelemetry-python-contrib/issues/3643)) - ## Version 1.38.0/0.59b0 (2025-10-16) ### Fixed