Skip to content

Commit 9cf49ee

Browse files
author
Liudmila Molkova
authored
Tracing: don't make http span current and ignore current if parent context was provided (Azure#33971)
* don't make http span current * don't use current as parent if invalid context is extracted * fail span on cancellation
1 parent 660c4c4 commit 9cf49ee

File tree

11 files changed

+404
-179
lines changed

11 files changed

+404
-179
lines changed

eng/code-quality-reports/src/main/resources/spotbugs/spotbugs-exclude.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -566,8 +566,8 @@
566566

567567
<!-- Incorrect flagging, the surrounding hasValue() checks that get() isn't null -->
568568
<Match>
569-
<Class name="com.azure.core.http.rest.RestProxy"/>
570-
<Method name="endTracingSpan"/>
569+
<Class name="com.azure.core.implementation.http.rest.AsyncRestProxy"/>
570+
<Method name="~(.*)\$endSpanWhenDone\$(.*)"/>
571571
<Bug pattern="NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE"/>
572572
</Match>
573573

sdk/core/azure-core-tracing-opentelemetry/src/main/java/com/azure/core/tracing/opentelemetry/OpenTelemetryTracer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ private SpanBuilder createSpanBuilder(String spanName,
185185
Context remoteParentContext = options.getRemoteParent();
186186
SpanContext remoteSpanContext = remoteParentContext == null ? null : getOrNull(remoteParentContext, SPAN_CONTEXT_KEY, SpanContext.class);
187187

188-
if (remoteSpanContext != null && remoteSpanContext.isValid()) {
188+
if (remoteSpanContext != null) {
189189
if (parentContext == null) {
190190
parentContext = io.opentelemetry.context.Context.root();
191191
}

sdk/core/azure-core-tracing-opentelemetry/src/test/java/com/azure/core/tracing/opentelemetry/OpenTelemetryHttpPolicyTests.java

Lines changed: 60 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.opentelemetry.api.trace.SpanKind;
2424
import io.opentelemetry.api.trace.StatusCode;
2525
import io.opentelemetry.api.trace.Tracer;
26+
import io.opentelemetry.context.Scope;
2627
import io.opentelemetry.sdk.OpenTelemetrySdk;
2728
import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter;
2829
import io.opentelemetry.sdk.trace.ReadableSpan;
@@ -43,6 +44,7 @@
4344
import reactor.core.publisher.Mono;
4445
import reactor.test.StepVerifier;
4546

47+
import java.time.Duration;
4648
import java.util.ArrayList;
4749
import java.util.Arrays;
4850
import java.util.HashMap;
@@ -62,6 +64,7 @@
6264
/**
6365
* Unit tests for {com.azure.core.implementation.http.policy.InstrumentationPolicy.
6466
*/
67+
@SuppressWarnings("try")
6568
public class OpenTelemetryHttpPolicyTests {
6669

6770
private static final String X_MS_REQUEST_ID_1 = "response id 1";
@@ -85,7 +88,7 @@ public void setUp(TestInfo testInfo) {
8588

8689
@Test
8790
public void openTelemetryHttpPolicyTest() {
88-
// Start user parent span.
91+
// Start parent span.
8992
Span parentSpan = tracer.spanBuilder(SPAN_NAME).startSpan();
9093

9194
// Add parent span to tracingContext
@@ -95,8 +98,10 @@ public void openTelemetryHttpPolicyTest() {
9598
// Act
9699
HttpRequest request = new HttpRequest(HttpMethod.POST, "https://httpbin.org/hello?there#otel");
97100
request.setHeader("User-Agent", "user-agent");
98-
HttpResponse response = createHttpPipeline(azTracer).send(request, tracingContext).block();
99101

102+
try (Scope scope = parentSpan.makeCurrent()) {
103+
createHttpPipeline(azTracer).send(request, tracingContext).block();
104+
}
100105
// Assert
101106
List<SpanData> exportedSpans = exporter.getFinishedSpanItems();
102107
// rest proxy span is not exported as global otel is not configured
@@ -150,10 +155,12 @@ public String getDescription() {
150155

151156
// Act
152157
HttpRequest request = new HttpRequest(HttpMethod.DELETE, "https://httpbin.org/hello?there#otel");
153-
HttpResponse response = createHttpPipeline(new OpenTelemetryTracer("test", null, null,
158+
try (Scope scope = tracer.spanBuilder("test").startSpan().makeCurrent()) {
159+
createHttpPipeline(new OpenTelemetryTracer("test", null, null,
154160
new OpenTelemetryTracingOptions().setProvider(providerWithSampler)))
155-
.send(request).block();
156-
161+
.send(request)
162+
.block();
163+
}
157164
// Assert
158165
List<SpanData> exportedSpans = exporter.getFinishedSpanItems();
159166
assertEquals(0, exportedSpans.size());
@@ -162,33 +169,33 @@ public String getDescription() {
162169

163170
@Test
164171
public void clientRequestIdIsStamped() {
165-
HttpRequest request = new HttpRequest(HttpMethod.PUT, "https://httpbin.org/hello?there#otel");
166-
HttpResponse response = createHttpPipeline(azTracer, new RequestIdPolicy()).send(request).block();
172+
try (Scope scope = tracer.spanBuilder("test").startSpan().makeCurrent()) {
173+
HttpRequest request = new HttpRequest(HttpMethod.PUT, "https://httpbin.org/hello?there#otel");
174+
HttpResponse response = createHttpPipeline(azTracer, new RequestIdPolicy()).send(request).block();
167175

168-
// Assert
169-
List<SpanData> exportedSpans = exporter.getFinishedSpanItems();
170-
assertEquals(1, exportedSpans.size());
176+
// Assert
177+
List<SpanData> exportedSpans = exporter.getFinishedSpanItems();
178+
assertEquals(1, exportedSpans.size());
171179

172-
assertEquals("HTTP PUT", exportedSpans.get(0).getName());
180+
assertEquals("HTTP PUT", exportedSpans.get(0).getName());
173181

174-
Map<String, Object> httpAttributes = getAttributes(exportedSpans.get(0));
175-
assertEquals(5, httpAttributes.size());
182+
Map<String, Object> httpAttributes = getAttributes(exportedSpans.get(0));
183+
assertEquals(5, httpAttributes.size());
176184

177-
assertEquals(response.getRequest().getHeaders().getValue("x-ms-client-request-id"), httpAttributes.get("az.client_request_id"));
178-
assertEquals(X_MS_REQUEST_ID_1, httpAttributes.get("az.service_request_id"));
185+
assertEquals(response.getRequest().getHeaders().getValue("x-ms-client-request-id"), httpAttributes.get("az.client_request_id"));
186+
assertEquals(X_MS_REQUEST_ID_1, httpAttributes.get("az.service_request_id"));
179187

180-
assertEquals("https://httpbin.org/hello?there#otel", httpAttributes.get("http.url"));
181-
assertEquals("PUT", httpAttributes.get("http.method"));
182-
assertEquals(Long.valueOf(RESPONSE_STATUS_CODE), httpAttributes.get("http.status_code"));
188+
assertEquals("https://httpbin.org/hello?there#otel", httpAttributes.get("http.url"));
189+
assertEquals("PUT", httpAttributes.get("http.method"));
190+
assertEquals(Long.valueOf(RESPONSE_STATUS_CODE), httpAttributes.get("http.status_code"));
191+
}
183192
}
184193

185194
@Test
186195
public void everyTryIsTraced() {
187196
AtomicInteger attemptCount = new AtomicInteger();
188197
AtomicReference<String> traceparentTry503 = new AtomicReference<>();
189198
AtomicReference<String> traceparentTry200 = new AtomicReference<>();
190-
AtomicReference<Span> currentSpanTry503 = new AtomicReference<>();
191-
AtomicReference<Span> currentSpanTry200 = new AtomicReference<>();
192199

193200
OpenTelemetryTracingOptions options = new OpenTelemetryTracingOptions().setProvider(tracerProvider);
194201

@@ -205,12 +212,10 @@ public void everyTryIsTraced() {
205212
int count = attemptCount.getAndIncrement();
206213
if (count == 0) {
207214
traceparentTry503.set(request.getHeaders().getValue("traceparent"));
208-
currentSpanTry503.set(Span.current());
209215
headers.set("x-ms-request-id", X_MS_REQUEST_ID_1);
210216
return Mono.just(new MockHttpResponse(request, 503, headers));
211217
} else if (count == 1) {
212218
traceparentTry200.set(request.getHeaders().getValue("traceparent"));
213-
currentSpanTry200.set(Span.current());
214219
headers.set("x-ms-request-id", X_MS_REQUEST_ID_2);
215220
return Mono.just(new MockHttpResponse(request, 200, headers));
216221
} else {
@@ -240,9 +245,6 @@ public void everyTryIsTraced() {
240245
assertEquals(traceparentTry503.get(), String.format("00-%s-%s-01", try503.getTraceId(), try503.getSpanId()));
241246
assertEquals(traceparentTry200.get(), String.format("00-%s-%s-01", try200.getTraceId(), try200.getSpanId()));
242247

243-
assertEquals(currentSpanTry503.get().getSpanContext().getSpanId(), try503.getSpanId());
244-
assertEquals(currentSpanTry200.get().getSpanContext().getSpanId(), try200.getSpanId());
245-
246248
assertEquals("HTTP GET", try503.getName());
247249
Map<String, Object> httpAttributes503 = getAttributes(try503);
248250
assertEquals(5, httpAttributes503.size());
@@ -370,6 +372,36 @@ public void timeoutIsTraced() {
370372
assertEquals(TimeoutException.class.getName(), events.get(0).getAttributes().get(AttributeKey.stringKey("exception.type")));
371373
}
372374

375+
@Test
376+
public void cancelIsTraced() {
377+
OpenTelemetryTracingOptions options = new OpenTelemetryTracingOptions().setProvider(tracerProvider);
378+
379+
com.azure.core.util.tracing.Tracer azTracer = new OpenTelemetryTracer("test", null, null, options);
380+
381+
List<HttpPipelinePolicy> policies = new ArrayList<>(Arrays.asList(new RetryPolicy()));
382+
HttpPolicyProviders.addAfterRetryPolicies(policies);
383+
384+
HttpPipeline pipeline = new HttpPipelineBuilder()
385+
.policies(policies.toArray(new HttpPipelinePolicy[0]))
386+
.httpClient(request ->
387+
Mono.delay(Duration.ofSeconds(10)).map(l -> new MockHttpResponse(request, 200)))
388+
.tracer(azTracer)
389+
.build();
390+
391+
pipeline.send(new HttpRequest(HttpMethod.GET, "http://localhost/hello"), Context.NONE)
392+
.toFuture()
393+
.cancel(true);
394+
395+
List<SpanData> exportedSpans = exporter.getFinishedSpanItems();
396+
assertEquals(1, exportedSpans.size());
397+
398+
SpanData cancelled = exportedSpans.get(0);
399+
Map<String, Object> httpAttributesTimeout = getAttributes(cancelled);
400+
assertNull(httpAttributesTimeout.get("http.status_code"));
401+
assertEquals(StatusCode.ERROR, cancelled.getStatus().getStatusCode());
402+
assertEquals("cancel", cancelled.getStatus().getDescription());
403+
}
404+
373405
private Map<String, Object> getAttributes(SpanData span) {
374406
Map<String, Object> attributes = new HashMap<>();
375407
span.getAttributes().forEach((k, v) -> attributes.put(k.getKey(), v));
@@ -411,13 +443,10 @@ public Mono<HttpResponse> send(HttpRequest request) {
411443
HttpHeaders headers = new HttpHeaders()
412444
.set("x-ms-request-id", X_MS_REQUEST_ID_1);
413445

446+
// parent span
414447
SpanContext currentContext = Span.current().getSpanContext();
415-
String expectedTraceparent = String.format("00-%s-%s-%s", currentContext.getTraceId(), currentContext.getSpanId(), currentContext.getTraceFlags().toString());
416-
if (currentContext.isValid()) {
417-
assertEquals(expectedTraceparent, request.getHeaders().getValue("traceparent"));
418-
} else {
419-
assertNull(request.getHeaders().getValue("traceparent"));
420-
}
448+
assertTrue(currentContext.isValid());
449+
assertEquals(currentContext.getTraceId(), request.getHeaders().getValue("traceparent").substring(3, 35));
421450

422451
return Mono.just(new MockHttpResponse(request, RESPONSE_STATUS_CODE, headers));
423452
}

sdk/core/azure-core-tracing-opentelemetry/src/test/java/com/azure/core/tracing/opentelemetry/OpenTelemetryTracerTest.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1143,9 +1143,7 @@ public void invalidRemoteParent() {
11431143
assertEquals(2, testExporter.getFinishedSpanItems().size());
11441144

11451145
SpanData innerSpan = testExporter.getFinishedSpanItems().get(0);
1146-
SpanData outerSpan = testExporter.getFinishedSpanItems().get(1);
1147-
assertEquals(innerSpan.getSpanContext().getTraceId(), outerSpan.getSpanContext().getTraceId());
1148-
assertEquals(innerSpan.getParentSpanId(), outerSpan.getSpanContext().getSpanId());
1146+
assertFalse(innerSpan.getParentSpanContext().isValid());
11491147
}
11501148

11511149
@Test

sdk/core/azure-core/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
--add-opens com.azure.core/com.azure.core.implementation.logging=ALL-UNNAMED
6969
--add-opens com.azure.core/com.azure.core.models=ALL-UNNAMED
7070
--add-opens com.azure.core/com.azure.core.util=ALL-UNNAMED
71+
--add-opens com.azure.core/com.azure.core.util.tracing=ALL-UNNAMED
7172
--add-opens com.azure.core/com.azure.core.util.logging=ALL-UNNAMED
7273
--add-opens com.azure.core/com.azure.core.util.polling=ALL-UNNAMED
7374
--add-opens com.azure.core/com.azure.core.util.serializer=ALL-UNNAMED

sdk/core/azure-core/src/main/java/com/azure/core/implementation/http/policy/InstrumentationPolicy.java

Lines changed: 13 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,7 @@
1616
import com.azure.core.util.tracing.SpanKind;
1717
import com.azure.core.util.tracing.StartSpanOptions;
1818
import com.azure.core.util.tracing.Tracer;
19-
import reactor.core.CoreSubscriber;
2019
import reactor.core.publisher.Mono;
21-
import reactor.core.publisher.Operators;
22-
import reactor.core.publisher.Signal;
2320

2421
import static com.azure.core.util.tracing.Tracer.DISABLE_TRACING_KEY;
2522

@@ -37,7 +34,6 @@ public class InstrumentationPolicy implements HttpPipelinePolicy {
3734
private static final ClientLogger LOGGER = new ClientLogger(InstrumentationPolicy.class);
3835

3936
private Tracer tracer;
40-
private ScalarPropagatingMono propagatingMono;
4137
private static boolean foundLegacyOTelPolicy;
4238

4339
static {
@@ -53,7 +49,6 @@ public InstrumentationPolicy() {
5349

5450
public void initialize(Tracer tracer) {
5551
this.tracer = tracer;
56-
this.propagatingMono = new ScalarPropagatingMono(tracer);
5752
}
5853

5954
@Override
@@ -65,10 +60,13 @@ public Mono<HttpResponse> process(HttpPipelineCallContext context, HttpPipelineN
6560
// OpenTelemetry reactor instrumentation needs a bit of help
6661
// to pick up Azure SDK context. While we're working on explicit
6762
// context propagation, ScalarPropagatingMono.INSTANCE is the workaround
68-
return propagatingMono
69-
.flatMap(ignored -> next.process())
70-
.doOnEach(this::handleResponse)
71-
.contextWrite(reactor.util.context.Context.of(REACTOR_HTTP_TRACE_CONTEXT_KEY, startSpan(context)));
63+
return Mono.defer(() -> {
64+
Context span = startSpan(context);
65+
return next.process()
66+
.doOnSuccess(response -> endSpan(response, span))
67+
.doOnCancel(() -> tracer.end("cancel", null, span))
68+
.doOnError(exception -> tracer.end(null, exception, span));
69+
});
7270
}
7371

7472
@SuppressWarnings("try")
@@ -79,19 +77,16 @@ public HttpResponse processSync(HttpPipelineCallContext context, HttpPipelineNex
7977
}
8078

8179
Context span = startSpan(context);
82-
Throwable exception = null;
83-
HttpResponse response = null;
8480
try (AutoCloseable scope = tracer.makeSpanCurrent(span)) {
85-
response = next.processSync();
81+
HttpResponse response = next.processSync();
82+
endSpan(response, span);
8683
return response;
8784
} catch (RuntimeException ex) {
88-
exception = ex;
85+
tracer.end(null, ex, span);
8986
throw ex;
9087
} catch (Exception ex) {
91-
exception = ex;
88+
tracer.end(null, ex, span);
9289
throw LOGGER.logExceptionAsWarning(new RuntimeException(ex));
93-
} finally {
94-
endSpan(response, exception, span);
9590
}
9691
}
9792

@@ -123,23 +118,7 @@ private void addPostSamplingAttributes(Context span, HttpRequest request) {
123118
}
124119
}
125120

126-
/**
127-
* Handles retrieving the information from the service response and ending the span.
128-
*
129-
* @param signal Reactive Stream signal fired by Reactor.
130-
*/
131-
private void handleResponse(Signal<? extends HttpResponse> signal) {
132-
// Ignore the on complete and on subscribe events, they don't contain the information needed to end the span.
133-
if (signal.isOnComplete() || signal.isOnSubscribe()) {
134-
return;
135-
}
136-
137-
// Get the context that was added to the mono, this will contain the information needed to end the span.
138-
Context span = signal.getContextView().getOrDefault(REACTOR_HTTP_TRACE_CONTEXT_KEY, null);
139-
endSpan(signal.get(), signal.getThrowable(), span);
140-
}
141-
142-
private void endSpan(HttpResponse response, Throwable error, Context span) {
121+
private void endSpan(HttpResponse response, Context span) {
143122
if (response != null) {
144123
int statusCode = response.getStatusCode();
145124
tracer.setAttribute(HTTP_STATUS_CODE, statusCode, span);
@@ -151,43 +130,11 @@ private void endSpan(HttpResponse response, Throwable error, Context span) {
151130
tracer.end((statusCode >= 400) ? "error" : null, null, span);
152131
}
153132

154-
tracer.end(null, error, span);
133+
tracer.end("", null, span);
155134
}
156135

157136
private boolean isTracingEnabled(HttpPipelineCallContext context) {
158137
return tracer != null && tracer.isEnabled() && !foundLegacyOTelPolicy
159138
&& !((boolean) context.getData(DISABLE_TRACING_KEY).orElse(false));
160139
}
161-
162-
/**
163-
* Helper class allowing to run Mono subscription and any hot path
164-
* in scope of trace context. This enables OpenTelemetry auto-collection
165-
* to pick it up and correlate lower levels of instrumentation and logs
166-
* to logical/HTTP spans.
167-
*
168-
* OpenTelemetry reactor auto-instrumentation will take care of the cold path.
169-
*/
170-
static final class ScalarPropagatingMono extends Mono<Object> {
171-
private final Object value = new Object();
172-
private final Tracer tracer;
173-
174-
private ScalarPropagatingMono(Tracer tracer) {
175-
this.tracer = tracer;
176-
}
177-
178-
@Override
179-
@SuppressWarnings("try")
180-
public void subscribe(CoreSubscriber<? super Object> actual) {
181-
Context traceContext = actual.currentContext().getOrDefault(REACTOR_HTTP_TRACE_CONTEXT_KEY, null);
182-
if (tracer.isEnabled() && traceContext != null) {
183-
try (AutoCloseable scope = tracer.makeSpanCurrent(traceContext)) {
184-
actual.onSubscribe(Operators.scalarSubscription(actual, value));
185-
} catch (Exception e) {
186-
LOGGER.verbose("Error closing scope", e);
187-
}
188-
} else {
189-
actual.onSubscribe(Operators.scalarSubscription(actual, value));
190-
}
191-
}
192-
}
193140
}

0 commit comments

Comments
 (0)