Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#3936](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3936))
- `opentelemetry-instrumentation-aiohttp-client`: Update instrumentor to respect suppressing http instrumentation
([#3957](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3957))
- `opentelemetry-instrumentation-asyncpg`: Hydrate span attributes before creation so samplers can filter on database details
([#3643](https://github.com/open-telemetry/opentelemetry-python-contrib/issues/3643))

## Version 1.38.0/0.59b0 (2025-10-16)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,18 +178,16 @@ async def _do_execute(self, func, instance, args, kwargs):
except IndexError:
name = ""

# Hydrate attributes before span creation to enable filtering
span_attributes = _hydrate_span_from_args(
instance,
args[0],
args[1:] if self.capture_parameters else None,
)

with self._tracer.start_as_current_span(
name, kind=SpanKind.CLIENT
name, kind=SpanKind.CLIENT, attributes=span_attributes
) as span:
if span.is_recording():
span_attributes = _hydrate_span_from_args(
instance,
args[0],
args[1:] if self.capture_parameters else None,
)
for attribute, value in span_attributes.items():
span.set_attribute(attribute, value)

try:
result = await func(*args, **kwargs)
except Exception as exc: # pylint: disable=W0703
Expand Down Expand Up @@ -217,20 +215,19 @@ async def _do_cursor_execute(self, func, instance, args, kwargs):
except IndexError:
name = ""

# Hydrate attributes before span creation to enable filtering
span_attributes = _hydrate_span_from_args(
instance._connection,
instance._query,
instance._args if self.capture_parameters else None,
)

stop = False
with self._tracer.start_as_current_span(
f"CURSOR: {name}",
kind=SpanKind.CLIENT,
attributes=span_attributes,
) as span:
if span.is_recording():
span_attributes = _hydrate_span_from_args(
instance._connection,
instance._query,
instance._args if self.capture_parameters else None,
)
for attribute, value in span_attributes.items():
span.set_attribute(attribute, value)

try:
result = await func(*args, **kwargs)
except StopAsyncIteration:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import asyncio
import os
import unittest
from collections import namedtuple
from unittest.mock import patch

import asyncpg

from opentelemetry import trace
from opentelemetry.instrumentation.asyncpg import AsyncPGInstrumentor
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.test.test_base import TestBase
Expand Down Expand Up @@ -425,3 +427,126 @@ def test_instrumented_fetch_method_broken_asyncpg(self, *_, **__):
self.assertIs(StatusCode.UNSET, spans[0].status.status_code)
self.assertEqual(spans[0].name, "postgresql")
self.assertEqual(spans[0].attributes[SpanAttributes.DB_STATEMENT], "")


class _NonRecordingSpan:
def __init__(self, recording=False):
self._recording = recording
self.set_status_calls = 0

def __enter__(self):
return self

def __exit__(self, exc_type, exc, tb):
return False

def is_recording(self):
return self._recording

def set_status(self, status):
self.set_status_calls += 1
self.status = status


class _CapturingTracer:
def __init__(self, span):
self.span = span
self.started_spans = []

def start_as_current_span(self, name, kind=None, attributes=None):
self.started_spans.append(
{"name": name, "kind": kind, "attributes": attributes}
)
return self.span


class _FakeParams:
def __init__(self, database="testdb", user="dbuser"):
self.database = database
self.user = user


class _FakeConnection:
def __init__(self):
self._params = _FakeParams()
self._addr = ("db.example.com", 5432)


class _FakeCursor:
def __init__(self, connection, query, args):
self._connection = connection
self._query = query
self._args = args


class TestAsyncPGSamplingAttributes(unittest.TestCase):
def setUp(self):
self.connection = _FakeConnection()
self.span = _NonRecordingSpan(recording=False)
self.tracer = _CapturingTracer(self.span)
self.instrumentor = AsyncPGInstrumentor(capture_parameters=True)
self.instrumentor._tracer = self.tracer

def test_attributes_available_when_span_not_recording(self):
async def _fake_execute(*args, **kwargs):
return "ok"

async_call(
self.instrumentor._do_execute(
_fake_execute,
self.connection,
("SELECT $1", "42"),
{},
)
)

self.assertEqual(len(self.tracer.started_spans), 1)
span_info = self.tracer.started_spans[0]
self.assertEqual(
span_info["attributes"],
{
SpanAttributes.DB_SYSTEM: "postgresql",
SpanAttributes.DB_NAME: "testdb",
SpanAttributes.DB_USER: "dbuser",
SpanAttributes.NET_PEER_NAME: "db.example.com",
SpanAttributes.NET_PEER_PORT: 5432,
SpanAttributes.NET_TRANSPORT: "ip_tcp",
SpanAttributes.DB_STATEMENT: "SELECT $1",
"db.statement.parameters": "('42',)",
},
)
self.assertEqual(span_info["kind"], trace.SpanKind.CLIENT)
self.assertEqual(span_info["name"], "SELECT")

def test_cursor_attributes_available_when_span_not_recording(self):
async def _fake_cursor_execute(*args, **kwargs):
return "ok"

cursor = _FakeCursor(self.connection, "SELECT $1", ("99",))

async_call(
self.instrumentor._do_cursor_execute(
_fake_cursor_execute,
cursor,
(),
{},
)
)

self.assertEqual(len(self.tracer.started_spans), 1)
span_info = self.tracer.started_spans[0]
self.assertEqual(
span_info["attributes"],
{
SpanAttributes.DB_SYSTEM: "postgresql",
SpanAttributes.DB_NAME: "testdb",
SpanAttributes.DB_USER: "dbuser",
SpanAttributes.NET_PEER_NAME: "db.example.com",
SpanAttributes.NET_PEER_PORT: 5432,
SpanAttributes.NET_TRANSPORT: "ip_tcp",
SpanAttributes.DB_STATEMENT: "SELECT $1",
"db.statement.parameters": "('99',)",
},
)
self.assertEqual(span_info["kind"], trace.SpanKind.CLIENT)
self.assertEqual(span_info["name"], "CURSOR: SELECT")