22// Licensed under the MIT license.
33
44import { diag } from "@opentelemetry/api" ;
5- import { ExportResult , ExportResultCode } from "@opentelemetry/core" ;
6- import { RestError } from "@azure/core-rest-pipeline" ;
75import { ConnectionStringParser } from "../utils/connectionStringParser" ;
8- import { HttpSender , FileSystemPersist } from "../platform" ;
96import { AzureMonitorExporterOptions } from "../config" ;
10- import { PersistentStorage , Sender } from "../types" ;
11- import { isRetriable , BreezeResponse } from "../utils/breezeUtils" ;
127import {
138 DEFAULT_BREEZE_ENDPOINT ,
149 ENV_CONNECTION_STRING ,
1510 ENV_DISABLE_STATSBEAT ,
1611} from "../Declarations/Constants" ;
17- import { TelemetryItem as Envelope } from "../generated" ;
18- import { NetworkStatsbeatMetrics } from "./statsbeat/networkStatsbeatMetrics" ;
19- import { MAX_STATSBEAT_FAILURES } from "./statsbeat/types" ;
20- import { getInstance } from "./statsbeat/longIntervalStatsbeatMetrics" ;
2112
22- const DEFAULT_BATCH_SEND_RETRY_INTERVAL_MS = 60_000 ;
2313/**
2414 * Azure Monitor OpenTelemetry Trace Exporter.
2515 */
2616export abstract class AzureMonitorBaseExporter {
2717 /**
2818 * Instrumentation key to be used for exported envelopes
2919 */
30- protected _instrumentationKey : string = "" ;
31- private _endpointUrl : string = "" ;
32- private readonly _persister : PersistentStorage ;
33- private readonly _sender : Sender ;
34- private _numConsecutiveRedirects : number ;
35- private _retryTimer : NodeJS . Timer | null ;
36- private _networkStatsbeatMetrics : NetworkStatsbeatMetrics | undefined ;
37- private _longIntervalStatsbeatMetrics ;
20+ protected instrumentationKey : string = "" ;
21+ /**
22+ * Ingestion Endpoint URL
23+ */
24+ protected endpointUrl : string = "" ;
25+ /**
26+ *Flag to determine if exporter will generate Statsbeat data
27+ */
28+ protected trackStatsbeat : boolean = false ;
3829 private _isStatsbeatExporter : boolean ;
39- private _statsbeatFailureCount : number = 0 ;
40- private _batchSendRetryIntervalMs : number = DEFAULT_BATCH_SEND_RETRY_INTERVAL_MS ;
30+
4131 /**
4232 * Exporter internal configuration
4333 */
@@ -49,216 +39,27 @@ export abstract class AzureMonitorBaseExporter {
4939 */
5040 constructor ( options : AzureMonitorExporterOptions = { } , isStatsbeatExporter ?: boolean ) {
5141 this . _options = options ;
52- this . _numConsecutiveRedirects = 0 ;
53- this . _instrumentationKey = "" ;
54- this . _endpointUrl = DEFAULT_BREEZE_ENDPOINT ;
42+ this . instrumentationKey = "" ;
43+ this . endpointUrl = DEFAULT_BREEZE_ENDPOINT ;
5544 const connectionString = this . _options . connectionString || process . env [ ENV_CONNECTION_STRING ] ;
5645 this . _isStatsbeatExporter = isStatsbeatExporter ? isStatsbeatExporter : false ;
5746
5847 if ( connectionString ) {
5948 const parsedConnectionString = ConnectionStringParser . parse ( connectionString ) ;
60- this . _instrumentationKey =
61- parsedConnectionString . instrumentationkey || this . _instrumentationKey ;
62- this . _endpointUrl = parsedConnectionString . ingestionendpoint ?. trim ( ) || this . _endpointUrl ;
49+ this . instrumentationKey =
50+ parsedConnectionString . instrumentationkey || this . instrumentationKey ;
51+ this . endpointUrl = parsedConnectionString . ingestionendpoint ?. trim ( ) || this . endpointUrl ;
6352 }
6453
6554 // Instrumentation key is required
66- if ( ! this . _instrumentationKey ) {
55+ if ( ! this . instrumentationKey ) {
6756 const message =
6857 "No instrumentation key or connection string was provided to the Azure Monitor Exporter" ;
6958 diag . error ( message ) ;
7059 throw new Error ( message ) ;
7160 }
72- this . _sender = new HttpSender ( this . _endpointUrl , this . _options ) ;
73- this . _persister = new FileSystemPersist ( this . _instrumentationKey , this . _options ) ;
61+ this . trackStatsbeat = ! this . _isStatsbeatExporter && ! process . env [ ENV_DISABLE_STATSBEAT ] ;
7462
75- if ( ! this . _isStatsbeatExporter && ! process . env [ ENV_DISABLE_STATSBEAT ] ) {
76- // Initialize statsbeatMetrics
77- this . _networkStatsbeatMetrics = new NetworkStatsbeatMetrics ( {
78- instrumentationKey : this . _instrumentationKey ,
79- endpointUrl : this . _endpointUrl ,
80- } ) ;
81- this . _longIntervalStatsbeatMetrics = getInstance ( {
82- instrumentationKey : this . _instrumentationKey ,
83- endpointUrl : this . _endpointUrl ,
84- } ) ;
85- }
86- this . _retryTimer = null ;
8763 diag . debug ( "AzureMonitorExporter was successfully setup" ) ;
8864 }
89-
90- /**
91- * Persist envelopes to disk
92- */
93- private async _persist ( envelopes : unknown [ ] ) : Promise < ExportResult > {
94- try {
95- const success = await this . _persister . push ( envelopes ) ;
96- return success
97- ? { code : ExportResultCode . SUCCESS }
98- : {
99- code : ExportResultCode . FAILED ,
100- error : new Error ( "Failed to persist envelope in disk." ) ,
101- } ;
102- } catch ( ex : any ) {
103- return { code : ExportResultCode . FAILED , error : ex } ;
104- }
105- }
106-
107- /**
108- * Shutdown exporter
109- */
110- protected async _shutdown ( ) : Promise < void > {
111- return this . _sender . shutdown ( ) ;
112- }
113-
114- /**
115- * Export envelopes
116- */
117- protected async _exportEnvelopes ( envelopes : Envelope [ ] ) : Promise < ExportResult > {
118- diag . info ( `Exporting ${ envelopes . length } envelope(s)` ) ;
119-
120- if ( envelopes . length < 1 ) {
121- return { code : ExportResultCode . SUCCESS } ;
122- }
123-
124- try {
125- const startTime = new Date ( ) . getTime ( ) ;
126- const { result, statusCode } = await this . _sender . send ( envelopes ) ;
127- const endTime = new Date ( ) . getTime ( ) ;
128- const duration = endTime - startTime ;
129- this . _numConsecutiveRedirects = 0 ;
130-
131- if ( statusCode === 200 ) {
132- // Success -- @todo: start retry timer
133- if ( ! this . _retryTimer ) {
134- this . _retryTimer = setTimeout ( ( ) => {
135- this . _retryTimer = null ;
136- this . _sendFirstPersistedFile ( ) ;
137- } , this . _batchSendRetryIntervalMs ) ;
138- this . _retryTimer . unref ( ) ;
139- }
140- // If we are not exportings statsbeat and statsbeat is not disabled -- count success
141- this . _networkStatsbeatMetrics ?. countSuccess ( duration ) ;
142- return { code : ExportResultCode . SUCCESS } ;
143- } else if ( statusCode && isRetriable ( statusCode ) ) {
144- // Failed -- persist failed data
145- if ( statusCode === 429 || statusCode === 439 ) {
146- this . _networkStatsbeatMetrics ?. countThrottle ( statusCode ) ;
147- }
148- if ( result ) {
149- diag . info ( result ) ;
150- const breezeResponse = JSON . parse ( result ) as BreezeResponse ;
151- const filteredEnvelopes : Envelope [ ] = [ ] ;
152- if ( breezeResponse . errors ) {
153- breezeResponse . errors . forEach ( ( error ) => {
154- if ( error . statusCode && isRetriable ( error . statusCode ) ) {
155- filteredEnvelopes . push ( envelopes [ error . index ] ) ;
156- }
157- } ) ;
158- }
159- if ( filteredEnvelopes . length > 0 ) {
160- this . _networkStatsbeatMetrics ?. countRetry ( statusCode ) ;
161- // calls resultCallback(ExportResult) based on result of persister.push
162- return await this . _persist ( filteredEnvelopes ) ;
163- }
164- // Failed -- not retriable
165- this . _networkStatsbeatMetrics ?. countFailure ( duration , statusCode ) ;
166- return {
167- code : ExportResultCode . FAILED ,
168- } ;
169- } else {
170- // calls resultCallback(ExportResult) based on result of persister.push
171- this . _networkStatsbeatMetrics ?. countRetry ( statusCode ) ;
172- return await this . _persist ( envelopes ) ;
173- }
174- } else {
175- // Failed -- not retriable
176- if ( this . _networkStatsbeatMetrics ) {
177- if ( statusCode ) {
178- this . _networkStatsbeatMetrics . countFailure ( duration , statusCode ) ;
179- }
180- } else {
181- this . _incrementStatsbeatFailure ( ) ;
182- }
183- return {
184- code : ExportResultCode . FAILED ,
185- } ;
186- }
187- } catch ( error : any ) {
188- const restError = error as RestError ;
189- if (
190- restError . statusCode &&
191- ( restError . statusCode === 307 || // Temporary redirect
192- restError . statusCode === 308 )
193- ) {
194- // Permanent redirect
195- this . _numConsecutiveRedirects ++ ;
196- // To prevent circular redirects
197- if ( this . _numConsecutiveRedirects < 10 ) {
198- if ( restError . response && restError . response . headers ) {
199- const location = restError . response . headers . get ( "location" ) ;
200- if ( location ) {
201- // Update sender URL
202- this . _sender . handlePermanentRedirect ( location ) ;
203- // Send to redirect endpoint as HTTPs library doesn't handle redirect automatically
204- return this . _exportEnvelopes ( envelopes ) ;
205- }
206- }
207- } else {
208- let redirectError = new Error ( "Circular redirect" ) ;
209- this . _networkStatsbeatMetrics ?. countException ( redirectError ) ;
210- return { code : ExportResultCode . FAILED , error : redirectError } ;
211- }
212- } else if ( restError . statusCode && isRetriable ( restError . statusCode ) ) {
213- this . _networkStatsbeatMetrics ?. countRetry ( restError . statusCode ) ;
214- return await this . _persist ( envelopes ) ;
215- }
216- if ( this . _isNetworkError ( restError ) ) {
217- if ( restError . statusCode ) {
218- this . _networkStatsbeatMetrics ?. countRetry ( restError . statusCode ) ;
219- }
220- diag . error (
221- "Retrying due to transient client side error. Error message:" ,
222- restError . message
223- ) ;
224- return await this . _persist ( envelopes ) ;
225- }
226- this . _networkStatsbeatMetrics ?. countException ( restError ) ;
227- diag . error (
228- "Envelopes could not be exported and are not retriable. Error message:" ,
229- restError . message
230- ) ;
231- return { code : ExportResultCode . FAILED , error : restError } ;
232- }
233- }
234-
235- // Disable collection of statsbeat metrics after max failures
236- private _incrementStatsbeatFailure ( ) {
237- this . _statsbeatFailureCount ++ ;
238- if ( this . _statsbeatFailureCount > MAX_STATSBEAT_FAILURES ) {
239- this . _isStatsbeatExporter = false ;
240- this . _networkStatsbeatMetrics ?. shutdown ( ) ;
241- this . _longIntervalStatsbeatMetrics ?. shutdown ( ) ;
242- this . _networkStatsbeatMetrics = undefined ;
243- this . _statsbeatFailureCount = 0 ;
244- }
245- }
246-
247- private async _sendFirstPersistedFile ( ) : Promise < void > {
248- try {
249- const envelopes = ( await this . _persister . shift ( ) ) as Envelope [ ] | null ;
250- if ( envelopes ) {
251- await this . _sender . send ( envelopes ) ;
252- }
253- } catch ( err : any ) {
254- diag . warn ( `Failed to fetch persisted file` , err ) ;
255- }
256- }
257-
258- private _isNetworkError ( error : RestError ) : boolean {
259- if ( error && error . code && error . code === "REQUEST_SEND_ERROR" ) {
260- return true ;
261- }
262- return false ;
263- }
26465}
0 commit comments