Skip to content

Commit 8854529

Browse files
Refactor transmitter (Azure#27061)
* draft * remove transmitfromstorage
1 parent b664682 commit 8854529

File tree

4 files changed

+68
-69
lines changed

4 files changed

+68
-69
lines changed

sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/AzureMonitorTransmitter.cs

Lines changed: 59 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
using Azure.Monitor.OpenTelemetry.Exporter.ConnectionString;
1313
using Azure.Monitor.OpenTelemetry.Exporter.Models;
14+
using OpenTelemetry;
1415
using OpenTelemetry.Contrib.Extensions.PersistentStorage;
1516

1617
namespace Azure.Monitor.OpenTelemetry.Exporter
@@ -43,11 +44,9 @@ public AzureMonitorTransmitter(AzureMonitorExporterOptions options)
4344
_applicationInsightsRestClient = new ApplicationInsightsRestClient(new ClientDiagnostics(options), HttpPipelineBuilder.Build(options), host: ingestionEndpoint);
4445
}
4546

46-
public async ValueTask<int> TrackAsync(IEnumerable<TelemetryItem> telemetryItems, bool async, CancellationToken cancellationToken)
47+
public async ValueTask<ExportResult> TrackAsync(IEnumerable<TelemetryItem> telemetryItems, bool async, CancellationToken cancellationToken)
4748
{
48-
// TODO
49-
// Change return type of this function to ExportResult
50-
int result = 0;
49+
ExportResult result = ExportResult.Failure;
5150
if (cancellationToken.IsCancellationRequested)
5251
{
5352
return result;
@@ -56,20 +55,14 @@ public async ValueTask<int> TrackAsync(IEnumerable<TelemetryItem> telemetryItems
5655
try
5756
{
5857
using var httpMessage = async ?
59-
await this._applicationInsightsRestClient.InternalTrackAsync(telemetryItems, cancellationToken).ConfigureAwait(false) :
60-
this._applicationInsightsRestClient.InternalTrackAsync(telemetryItems, cancellationToken).Result;
58+
await _applicationInsightsRestClient.InternalTrackAsync(telemetryItems, cancellationToken).ConfigureAwait(false) :
59+
_applicationInsightsRestClient.InternalTrackAsync(telemetryItems, cancellationToken).Result;
6160

62-
if (httpMessage != null)
61+
result = IsSuccess(httpMessage);
62+
63+
if (result == ExportResult.Failure && _storage != null)
6364
{
64-
if (_storage == null && httpMessage.HasResponse && httpMessage.Response.Status == ResponseStatusCodes.Success)
65-
{
66-
result = 1;
67-
}
68-
else
69-
{
70-
HandleFailures(httpMessage);
71-
result = 1;
72-
}
65+
result = HandleFailures(httpMessage);
7366
}
7467
}
7568
catch (Exception ex)
@@ -80,61 +73,67 @@ await this._applicationInsightsRestClient.InternalTrackAsync(telemetryItems, can
8073
return result;
8174
}
8275

83-
private void HandleFailures(HttpMessage httpMessage)
76+
private static ExportResult IsSuccess(HttpMessage httpMessage)
8477
{
85-
if (httpMessage.HasResponse)
86-
{
87-
HandleFailureResponseCodes(httpMessage);
88-
}
89-
else
78+
if (httpMessage.HasResponse && httpMessage.Response.Status == ResponseStatusCodes.Success)
9079
{
91-
// HttpRequestException
92-
var content = HttpPipelineHelper.GetRequestContent(httpMessage.Request.Content);
93-
_storage.SaveTelemetry(content, HttpPipelineHelper.MinimumRetryInterval);
80+
return ExportResult.Success;
9481
}
82+
83+
return ExportResult.Failure;
9584
}
9685

97-
private void HandleFailureResponseCodes(HttpMessage httpMessage)
86+
private ExportResult HandleFailures(HttpMessage httpMessage)
9887
{
88+
ExportResult result = ExportResult.Failure;
9989
byte[] content;
10090
int retryInterval;
101-
switch (httpMessage.Response.Status)
91+
92+
if (!httpMessage.HasResponse)
10293
{
103-
case ResponseStatusCodes.Success:
104-
// log successful message
105-
break;
106-
case ResponseStatusCodes.PartialSuccess:
107-
// Parse retry-after header
108-
// Send Failed Messages To Storage
109-
TrackResponse trackResponse = HttpPipelineHelper.GetTrackResponse(httpMessage);
110-
content = HttpPipelineHelper.GetPartialContentForRetry(trackResponse, httpMessage.Request.Content);
111-
if (content != null)
112-
{
94+
// HttpRequestException
95+
content = HttpPipelineHelper.GetRequestContent(httpMessage.Request.Content);
96+
result = _storage.SaveTelemetry(content, HttpPipelineHelper.MinimumRetryInterval);
97+
}
98+
else
99+
{
100+
switch (httpMessage.Response.Status)
101+
{
102+
case ResponseStatusCodes.PartialSuccess:
103+
// Parse retry-after header
104+
// Send Failed Messages To Storage
105+
TrackResponse trackResponse = HttpPipelineHelper.GetTrackResponse(httpMessage);
106+
content = HttpPipelineHelper.GetPartialContentForRetry(trackResponse, httpMessage.Request.Content);
107+
if (content != null)
108+
{
109+
retryInterval = HttpPipelineHelper.GetRetryInterval(httpMessage.Response);
110+
result = _storage.SaveTelemetry(content, retryInterval);
111+
}
112+
break;
113+
case ResponseStatusCodes.RequestTimeout:
114+
case ResponseStatusCodes.ResponseCodeTooManyRequests:
115+
case ResponseStatusCodes.ResponseCodeTooManyRequestsAndRefreshCache:
116+
// Parse retry-after header
117+
// Send Messages To Storage
118+
content = HttpPipelineHelper.GetRequestContent(httpMessage.Request.Content);
113119
retryInterval = HttpPipelineHelper.GetRetryInterval(httpMessage.Response);
114-
_storage.SaveTelemetry(content, retryInterval);
115-
}
116-
break;
117-
case ResponseStatusCodes.RequestTimeout:
118-
case ResponseStatusCodes.ResponseCodeTooManyRequests:
119-
case ResponseStatusCodes.ResponseCodeTooManyRequestsAndRefreshCache:
120-
// Parse retry-after header
121-
// Send Messages To Storage
122-
content = HttpPipelineHelper.GetRequestContent(httpMessage.Request.Content);
123-
retryInterval = HttpPipelineHelper.GetRetryInterval(httpMessage.Response);
124-
_storage.SaveTelemetry(content, retryInterval);
125-
break;
126-
case ResponseStatusCodes.InternalServerError:
127-
case ResponseStatusCodes.BadGateway:
128-
case ResponseStatusCodes.ServiceUnavailable:
129-
case ResponseStatusCodes.GatewayTimeout:
130-
// Send Messages To Storage
131-
content = HttpPipelineHelper.GetRequestContent(httpMessage.Request.Content);
132-
_storage.SaveTelemetry(content, HttpPipelineHelper.MinimumRetryInterval);
133-
break;
134-
default:
135-
// Log Non-Retriable Status and don't retry or store;
136-
break;
120+
result = _storage.SaveTelemetry(content, retryInterval);
121+
break;
122+
case ResponseStatusCodes.InternalServerError:
123+
case ResponseStatusCodes.BadGateway:
124+
case ResponseStatusCodes.ServiceUnavailable:
125+
case ResponseStatusCodes.GatewayTimeout:
126+
// Send Messages To Storage
127+
content = HttpPipelineHelper.GetRequestContent(httpMessage.Request.Content);
128+
result =_storage.SaveTelemetry(content, HttpPipelineHelper.MinimumRetryInterval);
129+
break;
130+
default:
131+
// Log Non-Retriable Status and don't retry or store;
132+
break;
133+
}
137134
}
135+
136+
return result;
138137
}
139138
}
140139
}

sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/src/ITransmitter.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
using System.Threading.Tasks;
77

88
using Azure.Monitor.OpenTelemetry.Exporter.Models;
9+
using OpenTelemetry;
910

1011
namespace Azure.Monitor.OpenTelemetry.Exporter
1112
{
@@ -14,6 +15,6 @@ internal interface ITransmitter
1415
/// <summary>
1516
/// Sent telemetry and return the number of items Accepted.
1617
/// </summary>
17-
ValueTask<int> TrackAsync(IEnumerable<TelemetryItem> telemetryItems, bool async, CancellationToken cancellationToken);
18+
ValueTask<ExportResult> TrackAsync(IEnumerable<TelemetryItem> telemetryItems, bool async, CancellationToken cancellationToken);
1819
}
1920
}
Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,18 @@
11
// Copyright (c) Microsoft Corporation. All rights reserved.
22
// Licensed under the MIT License.
33

4+
using OpenTelemetry;
45
using OpenTelemetry.Contrib.Extensions.PersistentStorage;
56

67
namespace Azure.Monitor.OpenTelemetry.Exporter
78
{
89
internal static class PersistentStorageExtensions
910
{
10-
internal static void SaveTelemetry(this IPersistentStorage storage, byte[] content, int leaseTime)
11+
internal static ExportResult SaveTelemetry(this IPersistentStorage storage, byte[] content, int leaseTime)
1112
{
1213
var blob = storage.CreateBlob(content, leaseTime);
13-
if (blob != null)
14-
{
15-
// log telemetry saved offline.
16-
// unsuccessfull message will be logged by persistent storage.
17-
}
14+
15+
return blob == null ? ExportResult.Failure : ExportResult.Success;
1816
}
1917
}
2018
}

sdk/monitor/Azure.Monitor.OpenTelemetry.Exporter/tests/Integration.Tests/TestFramework/MockTransmitter.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,21 +8,22 @@
88
using System.Threading.Tasks;
99

1010
using Azure.Monitor.OpenTelemetry.Exporter.Models;
11+
using OpenTelemetry;
1112

1213
namespace Azure.Monitor.OpenTelemetry.Exporter.Integration.Tests.TestFramework
1314
{
1415
internal class MockTransmitter : ITransmitter
1516
{
1617
public ConcurrentBag<TelemetryItem> TelemetryItems = new ConcurrentBag<TelemetryItem>();
1718

18-
public ValueTask<int> TrackAsync(IEnumerable<TelemetryItem> telemetryItems, bool async, CancellationToken cancellationToken)
19+
public ValueTask<ExportResult> TrackAsync(IEnumerable<TelemetryItem> telemetryItems, bool async, CancellationToken cancellationToken)
1920
{
2021
foreach (var telemetryItem in telemetryItems)
2122
{
2223
this.TelemetryItems.Add(telemetryItem);
2324
}
2425

25-
return new ValueTask<int>(Task.FromResult(telemetryItems.Count()));
26+
return new ValueTask<ExportResult>(Task.FromResult(ExportResult.Success));
2627
}
2728
}
2829
}

0 commit comments

Comments
 (0)