Skip to content

Commit d28b4ae

Browse files
Fixing it so we check to see if a created span is actually being recorded before we add it to the links for the overall span. If not, then we don't bother since it'll be the empty/null span and won't actually record anything of interest. (Azure#14040)
When we were instrumenting spans we wouldn't pay attention to whether we were currently recording or not, which could lead to us creating a bunch of empty/null spans and trying to add them as links to the span for our batches. Rather than do that I just check (when we instrument) if the span we created is recording and if not don't add it. Fixes Azure#13049
1 parent 849472a commit d28b4ae

File tree

4 files changed

+188
-89
lines changed

4 files changed

+188
-89
lines changed

sdk/servicebus/service-bus/src/diagnostics/instrumentServiceBusMessage.ts

Lines changed: 1 addition & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// Copyright (c) Microsoft Corporation.
22
// Licensed under the MIT license.
33

4-
import { extractSpanContextFromTraceParentHeader, getTraceParentHeader } from "@azure/core-tracing";
4+
import { extractSpanContextFromTraceParentHeader } from "@azure/core-tracing";
55
import { CanonicalCode, Link, Span, SpanContext, SpanKind } from "@opentelemetry/api";
66
import { ConnectionContext } from "../connectionContext";
77
import { OperationOptionsBase } from "../modelsToBeSharedWithEventHubs";
@@ -14,34 +14,6 @@ import { createServiceBusSpan } from "./tracing";
1414
*/
1515
export const TRACEPARENT_PROPERTY = "Diagnostic-Id";
1616

17-
/**
18-
* Populates the `ServiceBusMessage` with `SpanContext` info to support trace propagation.
19-
* Creates and returns a copy of the passed in `ServiceBusMessage` unless the `ServiceBusMessage`
20-
* has already been instrumented.
21-
* @param message - The `ServiceBusMessage` to instrument.
22-
* @param span - The `Span` containing the context to propagate tracing information.
23-
* @hidden
24-
* @internal
25-
*/
26-
export function instrumentServiceBusMessage(
27-
message: ServiceBusMessage,
28-
span: Span
29-
): ServiceBusMessage {
30-
if (message.applicationProperties && message.applicationProperties[TRACEPARENT_PROPERTY]) {
31-
return message;
32-
}
33-
34-
// create a copy so the original isn't modified
35-
message = { ...message, applicationProperties: { ...message.applicationProperties } };
36-
37-
const traceParent = getTraceParentHeader(span.context());
38-
if (traceParent) {
39-
message.applicationProperties![TRACEPARENT_PROPERTY] = traceParent;
40-
}
41-
42-
return message;
43-
}
44-
4517
/**
4618
* Extracts the `SpanContext` from an `ServiceBusMessage` if the context exists.
4719
* @param message - An individual `ServiceBusMessage` object.

sdk/servicebus/service-bus/src/diagnostics/tracing.ts

Lines changed: 106 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,14 @@
22
// Licensed under the MIT license.
33

44
import { OperationOptions, RestError } from "@azure/core-http";
5-
import { CanonicalCode, Span, SpanKind } from "@opentelemetry/api";
6-
import { createSpanFunction, SpanOptions } from "@azure/core-tracing";
5+
import { CanonicalCode, Span, SpanContext, SpanKind } from "@opentelemetry/api";
6+
import {
7+
createSpanFunction,
8+
extractSpanContextFromTraceParentHeader,
9+
getTraceParentHeader,
10+
SpanOptions
11+
} from "@azure/core-tracing";
12+
import { ServiceBusMessage } from "../serviceBusMessage";
713

814
/**
915
* Creates a span using the global tracer.
@@ -75,3 +81,101 @@ export function createServiceBusSpan(
7581
updatedOptions
7682
};
7783
}
84+
85+
/**
86+
* @internal
87+
*/
88+
export const TRACEPARENT_PROPERTY = "Diagnostic-Id";
89+
90+
/**
91+
* @hidden
92+
*/
93+
export interface InstrumentableMessage {
94+
/**
95+
* The application specific properties which can be
96+
* used for custom message metadata.
97+
*/
98+
applicationProperties?: { [key: string]: number | boolean | string | Date };
99+
}
100+
101+
/**
102+
* Instruments an AMQP message with a proper `Diagnostic-Id` for tracing.
103+
*
104+
* @hidden
105+
*/
106+
export function instrumentMessage<T extends InstrumentableMessage>(
107+
message: T,
108+
options: OperationOptions,
109+
entityPath: string,
110+
host: string
111+
): {
112+
/**
113+
* If instrumentation was done, a copy of the message with
114+
* message.applicationProperties['Diagnostic-Id'] filled
115+
* out appropriately.
116+
*/
117+
message: T;
118+
119+
/**
120+
* A valid SpanContext if this message should be linked to a parent span, or undefined otherwise.
121+
*/
122+
spanContext: SpanContext | undefined;
123+
} {
124+
const previouslyInstrumented = Boolean(
125+
message.applicationProperties && message.applicationProperties[TRACEPARENT_PROPERTY]
126+
);
127+
128+
if (previouslyInstrumented) {
129+
return {
130+
message,
131+
spanContext: undefined
132+
};
133+
}
134+
135+
const { span: messageSpan } = createMessageSpan(options, entityPath, host);
136+
137+
try {
138+
if (!messageSpan.isRecording()) {
139+
return {
140+
message,
141+
spanContext: undefined
142+
};
143+
}
144+
145+
const traceParent = getTraceParentHeader(messageSpan.context());
146+
147+
if (traceParent) {
148+
// create a copy so the original isn't modified
149+
message = {
150+
...message,
151+
applicationProperties: {
152+
...message.applicationProperties,
153+
[TRACEPARENT_PROPERTY]: traceParent
154+
}
155+
};
156+
}
157+
158+
return {
159+
message,
160+
spanContext: messageSpan.context()
161+
};
162+
} finally {
163+
messageSpan.end();
164+
}
165+
}
166+
167+
/**
168+
* Extracts the `SpanContext` from an `ServiceBusMessage` if the context exists.
169+
* @param message - An individual `ServiceBusMessage` object.
170+
* @internal
171+
*/
172+
export function extractSpanContextFromServiceBusMessage(
173+
message: ServiceBusMessage
174+
): SpanContext | undefined {
175+
if (!message.applicationProperties || !message.applicationProperties[TRACEPARENT_PROPERTY]) {
176+
return;
177+
}
178+
179+
const diagnosticId = message.applicationProperties[TRACEPARENT_PROPERTY] as string;
180+
return extractSpanContextFromTraceParentHeader(diagnosticId);
181+
}

sdk/servicebus/service-bus/src/serviceBusMessageBatch.ts

Lines changed: 10 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,7 @@ import {
1515
Message as RheaMessage
1616
} from "rhea-promise";
1717
import { SpanContext } from "@opentelemetry/api";
18-
import {
19-
instrumentServiceBusMessage,
20-
TRACEPARENT_PROPERTY
21-
} from "./diagnostics/instrumentServiceBusMessage";
22-
import { createMessageSpan } from "./diagnostics/tracing";
18+
import { instrumentMessage } from "./diagnostics/tracing";
2319
import { TryAddOptions } from "./modelsToBeSharedWithEventHubs";
2420
import { defaultDataTransformer } from "./dataTransformer";
2521

@@ -230,32 +226,22 @@ export class ServiceBusMessageBatchImpl implements ServiceBusMessageBatch {
230226
* **NOTE**: Always remember to check the return value of this method, before calling it again
231227
* for the next message.
232228
*
233-
* @param message - An individual service bus message.
229+
* @param originalMessage - An individual service bus message.
234230
* @returns A boolean value indicating if the message has been added to the batch or not.
235231
*/
236-
public tryAddMessage(message: ServiceBusMessage, options: TryAddOptions = {}): boolean {
237-
throwTypeErrorIfParameterMissing(this._context.connectionId, "message", message);
232+
public tryAddMessage(originalMessage: ServiceBusMessage, options: TryAddOptions = {}): boolean {
233+
throwTypeErrorIfParameterMissing(this._context.connectionId, "message", originalMessage);
238234
throwIfNotValidServiceBusMessage(
239-
message,
235+
originalMessage,
240236
"Provided value for 'message' must be of type ServiceBusMessage."
241237
);
242238

243-
// check if the event has already been instrumented
244-
const previouslyInstrumented = Boolean(
245-
message.applicationProperties && message.applicationProperties[TRACEPARENT_PROPERTY]
239+
const { message, spanContext } = instrumentMessage(
240+
originalMessage,
241+
options,
242+
this._context.config.entityPath!,
243+
this._context.config.host
246244
);
247-
let spanContext: SpanContext | undefined;
248-
if (!previouslyInstrumented) {
249-
const { span: messageSpan } = createMessageSpan(
250-
options,
251-
this._context.config.entityPath!,
252-
this._context.config.host
253-
);
254-
255-
message = instrumentServiceBusMessage(message, messageSpan);
256-
spanContext = messageSpan.context();
257-
messageSpan.end();
258-
}
259245

260246
// Convert ServiceBusMessage to AmqpMessage.
261247
const amqpMessage = toRheaMessage(message);

sdk/servicebus/service-bus/test/internal/unit/tracing.spec.ts

Lines changed: 71 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,10 @@
44
import chai from "chai";
55
import { getTracer, NoOpSpan, TestSpan, TestTracer } from "@azure/core-tracing";
66
import { CanonicalCode, SpanOptions } from "@opentelemetry/api";
7-
import {
8-
ServiceBusMessage,
9-
ServiceBusMessageImpl,
10-
ServiceBusReceivedMessage
11-
} from "../../../src/serviceBusMessage";
7+
import { ServiceBusMessageImpl, ServiceBusReceivedMessage } from "../../../src/serviceBusMessage";
128
import {
139
createAndEndProcessingSpan,
1410
createProcessingSpan,
15-
instrumentServiceBusMessage,
1611
TRACEPARENT_PROPERTY
1712
} from "../../../src/diagnostics/instrumentServiceBusMessage";
1813
import { OperationOptionsBase, trace } from "../../../src/modelsToBeSharedWithEventHubs";
@@ -26,6 +21,7 @@ import { ServiceBusReceiverImpl } from "../../../src/receivers/receiver";
2621
import { OnMessage } from "../../../src/core/messageReceiver";
2722
import { ServiceBusSessionReceiverImpl } from "../../../src/receivers/sessionReceiver";
2823
import { MessageSession } from "../../../src/session/messageSession";
24+
import { instrumentMessage } from "../../../src/diagnostics/tracing";
2925
const should = chai.should();
3026
const assert = chai.assert;
3127

@@ -291,7 +287,7 @@ describe("Tracing tests", () => {
291287
}
292288

293289
describe("telemetry", () => {
294-
const otherProperties = {
290+
const receiverProperties = {
295291
entityPath: "entityPath"
296292
};
297293

@@ -302,7 +298,7 @@ describe("Tracing tests", () => {
302298
it("basic span properties are set", async () => {
303299
const fakeParentSpanContext = new NoOpSpan().context();
304300

305-
createProcessingSpan([], otherProperties, connectionConfig, {
301+
createProcessingSpan([], receiverProperties, connectionConfig, {
306302
tracingOptions: {
307303
spanOptions: {
308304
parent: fakeParentSpanContext
@@ -320,46 +316,79 @@ describe("Tracing tests", () => {
320316

321317
attributes!.should.deep.equal({
322318
"az.namespace": "Microsoft.ServiceBus",
323-
"message_bus.destination": otherProperties.entityPath,
319+
"message_bus.destination": receiverProperties.entityPath,
324320
"peer.address": connectionConfig.host
325321
});
326322
});
327323

328-
it("received events are linked to this span using Diagnostic-Id", async () => {
324+
it("already instrumented messages are skipped", () => {
325+
const alreadyInstrumentedMessage = {
326+
body: "hello",
327+
enqueuedTimeUtc: new Date(),
328+
applicationProperties: {
329+
"Diagnostic-Id": "alreadyhasdiagnosticsid"
330+
}
331+
};
332+
333+
const { message, spanContext } = instrumentMessage(alreadyInstrumentedMessage, {}, "", "");
334+
335+
assert.equal(
336+
message,
337+
alreadyInstrumentedMessage,
338+
"Messages that are already instrumented do not get copied"
339+
);
340+
assert.isUndefined(
341+
spanContext,
342+
"Messages that are already instrumented do not get a new Span (or SpanContext)"
343+
);
344+
assert.isUndefined(
345+
tracer.spanOptions,
346+
"No spans should be created for already instrumented messages"
347+
);
348+
});
349+
350+
it("Round trip test - instrument message as it would be for sending and then receive and process them", async () => {
351+
// the sender side...
329352
const requiredMessageProperties = {
330353
body: "hello",
331-
enqueuedTimeUtc: new Date()
354+
enqueuedTimeUtc: new Date(),
355+
applicationProperties: undefined
332356
};
333357

334-
const firstEvent = tracer.startSpan("a");
335-
const thirdEvent = tracer.startSpan("c");
358+
const originalMessage = { ...requiredMessageProperties };
359+
360+
const { message, spanContext } = instrumentMessage(originalMessage, {}, "", "");
361+
assert.ok(
362+
spanContext,
363+
"A span context should be created when we instrument a message for the first time"
364+
);
365+
assert.notEqual(message, originalMessage, "Instrumenting a message should copy it");
366+
367+
assert.ok(tracer.spanOptions, "A span should be created when we instrumented the messsage");
368+
const spanContextFromSender = tracer.span?.context();
369+
assert.ok(spanContextFromSender);
370+
371+
tracer.clearTracingData();
336372

337-
const receivedMessages: ServiceBusMessage[] = [
338-
instrumentServiceBusMessage({ ...requiredMessageProperties }, firstEvent),
339-
{ applicationProperties: {}, ...requiredMessageProperties }, // no diagnostic ID means it gets skipped
340-
instrumentServiceBusMessage({ ...requiredMessageProperties }, thirdEvent)
341-
];
373+
// the receiver side...
342374

343-
createProcessingSpan(
344-
(receivedMessages as any) as ServiceBusReceivedMessage[],
345-
otherProperties,
375+
// messages have been received. Now we'll create a span and link the received spans to it.
376+
const processingSpan = createProcessingSpan(
377+
([message] as any) as ServiceBusReceivedMessage[],
378+
receiverProperties,
346379
connectionConfig,
347380
{}
348381
);
349382

350-
// middle event, since it has no trace information, doesn't get included
351-
// in the telemetry
352-
tracer.spanOptions!.links!.length.should.equal(3 - 1);
353-
// the test tracer just hands out a string integer that just gets
354-
// incremented
355-
tracer.spanOptions!.links![0]!.context.traceId.should.equal(firstEvent.context().traceId);
383+
assert.ok(processingSpan);
384+
385+
const processingSpanOptions = tracer.spanOptions;
386+
assert.ok(processingSpanOptions);
387+
388+
tracer.spanOptions!.links![0]!.context.traceId.should.equal(spanContextFromSender?.traceId);
356389
(tracer.spanOptions!.links![0]!.attributes!.enqueuedTime as number).should.equal(
357390
requiredMessageProperties.enqueuedTimeUtc.getTime()
358391
);
359-
tracer.spanOptions!.links![1]!.context.traceId.should.equal(thirdEvent.context().traceId);
360-
(tracer.spanOptions!.links![1]!.attributes!.enqueuedTime as number).should.equal(
361-
requiredMessageProperties.enqueuedTimeUtc.getTime()
362-
);
363392
});
364393

365394
it("trace - normal", async () => {
@@ -388,12 +417,20 @@ describe("Tracing tests", () => {
388417
});
389418

390419
class TestTracer2 extends TestTracer {
391-
public spanOptions: SpanOptions | undefined;
392-
public spanName: string | undefined;
420+
spanName: string | undefined;
421+
spanOptions: SpanOptions | undefined;
422+
span: TestSpan | undefined;
423+
424+
clearTracingData() {
425+
this.spanName = undefined;
426+
this.spanOptions = undefined;
427+
this.span = undefined;
428+
}
393429

394430
startSpan(nameArg: string, optionsArg?: SpanOptions): TestSpan {
395431
this.spanName = nameArg;
396432
this.spanOptions = optionsArg;
397-
return super.startSpan(nameArg, optionsArg);
433+
this.span = super.startSpan(nameArg, optionsArg);
434+
return this.span;
398435
}
399436
}

0 commit comments

Comments
 (0)