Skip to content

Commit 37bf37c

Browse files
nisha-bhatiascbeddJoshLove-msft
authored
[Azure.Monitor.Ingestion] EventHandler (Azure#33490)
* wip * wip * wip * Update LogsIngestionClient.cs * wip * wip * wip * wip * wip * add tests * add generic type back to upload methods * wip * wip * wip * wip * Update LogsIngestionClient.cs * Update LogsIngestionClient.cs * wip * wip * wip * Update LogsIngestionClient.cs * update data props to test-proxy with additional debug info * Debug changes * set to record * newest proxy version * Update LogsOptions MaxConcurrency error statement and constructor * Update Upload method in LogsIngestionClient to create new Options for MaxConcurrency * new proxy version that includes upstream request dump * wip * restore TestProxy * Update MonitorIngestionLiveTest.cs * wip * Update Packages.Data.props * wip * Remove cast to List in Upload code * Update LogsIngestionClient.cs * Update LogsIngestionClient.cs * revert snippets * wip --------- Co-authored-by: scbedd <45376673+scbedd@users.noreply.github.com> Co-authored-by: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com>
1 parent 6978b92 commit 37bf37c

25 files changed

+1536
-1832
lines changed

sdk/monitor/Azure.Monitor.Ingestion/api/Azure.Monitor.Ingestion.net461.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,18 @@ public enum ServiceVersion
1919
V2021_11_01_Preview = 1,
2020
}
2121
}
22+
public partial class UploadFailedEventArgs : Azure.SyncAsyncEventArgs
23+
{
24+
public UploadFailedEventArgs(System.Collections.Generic.List<object> failedLogs, System.Exception exception, bool isRunningSynchronously, System.Threading.CancellationToken cancellationToken) : base (default(bool), default(System.Threading.CancellationToken)) { }
25+
public System.Exception Exception { get { throw null; } }
26+
public System.Collections.Generic.IReadOnlyList<object> FailedLogs { get { throw null; } }
27+
}
2228
public partial class UploadLogsOptions
2329
{
2430
public UploadLogsOptions() { }
2531
public int MaxConcurrency { get { throw null; } set { } }
2632
public Azure.Core.Serialization.ObjectSerializer Serializer { get { throw null; } set { } }
33+
public event Azure.Core.SyncAsyncEventHandler<Azure.Monitor.Ingestion.UploadFailedEventArgs> UploadFailedEventHandler { add { } remove { } }
2734
}
2835
}
2936
namespace Microsoft.Extensions.Azure

sdk/monitor/Azure.Monitor.Ingestion/api/Azure.Monitor.Ingestion.netstandard2.0.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,18 @@ public enum ServiceVersion
1919
V2021_11_01_Preview = 1,
2020
}
2121
}
22+
public partial class UploadFailedEventArgs : Azure.SyncAsyncEventArgs
23+
{
24+
public UploadFailedEventArgs(System.Collections.Generic.List<object> failedLogs, System.Exception exception, bool isRunningSynchronously, System.Threading.CancellationToken cancellationToken) : base (default(bool), default(System.Threading.CancellationToken)) { }
25+
public System.Exception Exception { get { throw null; } }
26+
public System.Collections.Generic.IReadOnlyList<object> FailedLogs { get { throw null; } }
27+
}
2228
public partial class UploadLogsOptions
2329
{
2430
public UploadLogsOptions() { }
2531
public int MaxConcurrency { get { throw null; } set { } }
2632
public Azure.Core.Serialization.ObjectSerializer Serializer { get { throw null; } set { } }
33+
public event Azure.Core.SyncAsyncEventHandler<Azure.Monitor.Ingestion.UploadFailedEventArgs> UploadFailedEventHandler { add { } remove { } }
2734
}
2835
}
2936
namespace Microsoft.Extensions.Azure

sdk/monitor/Azure.Monitor.Ingestion/src/Azure.Monitor.Ingestion.csproj

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
<Project Sdk="Microsoft.NET.Sdk">
1+
<Project Sdk="Microsoft.NET.Sdk">
22
<PropertyGroup>
33
<Description>A library for ingesting data to Azure Monitor.</Description>
44
<AssemblyTitle>Azure Monitor Ingestion client library</AssemblyTitle>
@@ -15,6 +15,7 @@
1515
<Compile Include="$(AzureCoreSharedSources)AzureResourceProviderNamespaceAttribute.cs" LinkBase="Shared" />
1616
<Compile Include="$(AzureCoreSharedSources)AzureKeyCredentialPolicy.cs" LinkBase="Shared" />
1717
<Compile Include="$(AzureCoreSharedSources)GZipUtf8JsonRequestContent.cs" LinkBase="Shared" />
18+
<Compile Include="$(AzureCoreSharedSources)SyncAsyncEventHandlerExtensions.cs" LinkBase="Shared" />
1819
</ItemGroup>
1920
<ItemGroup>
2021
<PackageReference Include="Azure.Core" />

sdk/monitor/Azure.Monitor.Ingestion/src/LogsIngestionClient.cs

Lines changed: 109 additions & 39 deletions
Large diffs are not rendered by default.
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
using System;
5+
using System.Collections.Generic;
6+
using System.Threading;
7+
using Azure.Core.Pipeline;
8+
using Azure.Core.Serialization;
9+
10+
namespace Azure.Monitor.Ingestion
11+
{
12+
/// <summary>
13+
/// The event argument models configured with the EventHandler to upload logs to Azure Monitor.
14+
/// </summary>
15+
public class UploadFailedEventArgs : SyncAsyncEventArgs
16+
{
17+
/// <summary>
18+
/// Initializes a new instance of the <see cref="UploadFailedEventArgs"/> class.
19+
/// </summary>
20+
/// <param name="failedLogs"></param>
21+
/// <param name="exception"></param>
22+
/// <param name="isRunningSynchronously"></param>
23+
/// <param name="clientDiagnostics"></param>
24+
/// <param name="cancellationToken"></param>
25+
internal UploadFailedEventArgs(List<object> failedLogs, Exception exception, bool isRunningSynchronously, ClientDiagnostics clientDiagnostics, CancellationToken cancellationToken) : this (failedLogs, exception, isRunningSynchronously, cancellationToken)
26+
{
27+
ClientDiagnostics = clientDiagnostics;
28+
}
29+
30+
/// <summary>
31+
/// Initializes a new instance of the <see cref="UploadFailedEventArgs"/> class.
32+
/// </summary>
33+
/// <param name="failedLogs"></param>
34+
/// <param name="exception"></param>
35+
/// <param name="isRunningSynchronously"></param>
36+
/// <param name="cancellationToken"></param>
37+
public UploadFailedEventArgs(List<object> failedLogs, Exception exception, bool isRunningSynchronously, CancellationToken cancellationToken) : base(isRunningSynchronously, cancellationToken)
38+
{
39+
FailedLogs = failedLogs;
40+
Exception = exception;
41+
}
42+
43+
/// <summary>
44+
/// The list of logs in the batch that failed to upload.
45+
/// </summary>
46+
public IReadOnlyList<object> FailedLogs { get; }
47+
/// <summary>
48+
/// The exception from the batch that failed to upload.
49+
/// </summary>
50+
public Exception Exception { get; }
51+
52+
internal ClientDiagnostics ClientDiagnostics;
53+
}
54+
}

sdk/monitor/Azure.Monitor.Ingestion/src/Models/UploadLogsOptions.cs

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,13 @@
11
// Copyright (c) Microsoft Corporation. All rights reserved.
22
// Licensed under the MIT License.
33

4+
using System;
5+
using System.Collections.Generic;
6+
using System.Reflection;
7+
using System.Threading;
8+
using System.Threading.Tasks;
9+
using Azure.Core;
10+
using Azure.Core.Pipeline;
411
using Azure.Core.Serialization;
512

613
namespace Azure.Monitor.Ingestion
@@ -20,6 +27,58 @@ public class UploadLogsOptions
2027
/// The max concurrent requests to send to the Azure Monitor service when uploading logs.
2128
/// <remarks> In the Upload method, this parameter is not used as the batches are uploaded in sequence. For parallel uploads, if this value is not set the default concurrency will be 5. </remarks>
2229
/// </summary>
23-
public int MaxConcurrency { get; set; }
30+
public int MaxConcurrency
31+
{
32+
get { return _maxConcurrency; }
33+
set { _maxConcurrency = AssertNotNegative(value, "MaxConcurrency"); }
34+
}
35+
36+
private int _maxConcurrency = 5;
37+
38+
/// <summary>
39+
/// An optional EventHandler that provides the list of failed logs and the corresponding exception.
40+
/// </summary>
41+
public event SyncAsyncEventHandler<UploadFailedEventArgs> UploadFailedEventHandler;
42+
43+
internal virtual async Task InvokeEvent(UploadFailedEventArgs uploadFailedArgs)
44+
{
45+
await UploadFailedEventHandler.RaiseAsync(uploadFailedArgs, nameof(LogsIngestionClient), "Upload", uploadFailedArgs.ClientDiagnostics).ConfigureAwait(false);
46+
}
47+
48+
internal virtual async Task<Exception> OnUploadFailedAsync(UploadFailedEventArgs eventArgs)
49+
{
50+
try
51+
{
52+
if (eventArgs.IsRunningSynchronously)
53+
{
54+
#pragma warning disable AZC0103 // Do not wait synchronously in asynchronous scope.
55+
// for customer code so async not ran over sync
56+
InvokeEvent(eventArgs).GetAwaiter().GetResult();
57+
#pragma warning restore AZC0103 // Do not wait synchronously in asynchronous scope.
58+
}
59+
else
60+
{
61+
await InvokeEvent(eventArgs).ConfigureAwait(false);
62+
}
63+
return null;
64+
}
65+
catch (Exception ex)
66+
{
67+
// return exception to caller and caller should check exception to abort processing and rethrow this exception
68+
return ex;
69+
}
70+
}
71+
72+
internal static int AssertNotNegative(int argumentValue, string argumentName)
73+
{
74+
if (argumentValue <= 0)
75+
{
76+
throw new ArgumentOutOfRangeException(argumentName, $"Argument {argumentName} must be a non-negative concurrency (integer) value. The provided value was {argumentValue}.");
77+
}
78+
else
79+
return argumentValue;
80+
}
81+
82+
internal bool HasHandler => UploadFailedEventHandler != null;
2483
}
2584
}

sdk/monitor/Azure.Monitor.Ingestion/tests/BatchingTest.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,9 @@ public void ValidateBatchingOneChunkNoGzip()
2626
}
2727
});
2828
}
29-
IEnumerable<LogsIngestionClient.BatchedLogs<IEnumerable>> x = LogsIngestionClient.Batch(entries);
29+
IEnumerable<LogsIngestionClient.BatchedLogs> x = LogsIngestionClient.Batch(entries);
3030
Assert.AreEqual(1, x.Count());
31-
Assert.AreEqual(10, x.First().LogsCount);
31+
Assert.AreEqual(10, x.First().Logs.Count);
3232
}
3333

3434
[Test]
@@ -45,12 +45,12 @@ public void ValidateBatchingMultiChunkNoGzip()
4545
}
4646
});
4747
}
48-
IEnumerable<LogsIngestionClient.BatchedLogs<IEnumerable>> x = LogsIngestionClient.Batch(entries);
48+
IEnumerable<LogsIngestionClient.BatchedLogs> x = LogsIngestionClient.Batch(entries);
4949
int count = x.Count();
5050
Assert.Greater(count, 1); //ideally should be 2 batches
5151
Assert.Less(count, 3);
52-
Assert.Greater(x.First().LogsCount, 10000);
53-
Assert.Less(x.ToList()[1].LogsCount, 10000);
52+
Assert.Greater(x.First().Logs.Count, 10000);
53+
Assert.Less(x.ToList()[1].Logs.Count, 10000);
5454
}
5555
}
5656
}

sdk/monitor/Azure.Monitor.Ingestion/tests/ErrorTest.cs

Lines changed: 79 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33

44
using System;
55
using System.Collections.Generic;
6+
using System.Threading;
7+
using System.Threading.Tasks;
68
using Azure.Core;
79
using Azure.Core.Pipeline;
810
using Azure.Core.TestFramework;
@@ -51,7 +53,8 @@ private static List<Object> GenerateEntries(int numEntries, DateTime recordingNo
5153
public void OneFailure()
5254
{
5355
LogsIngestionClient client = CreateClient();
54-
LogsIngestionClient.Compression = null;
56+
// set compression to gzip so SDK does not gzip data (assumes already gzipped)
57+
LogsIngestionClient.Compression = "gzip";
5558
var entries = GenerateEntries(800, Recording.Now.DateTime);
5659
entries.Add(new object[] {
5760
new {
@@ -77,9 +80,9 @@ public void OneFailure()
7780
public void TwoFailures()
7881
{
7982
LogsIngestionClient client = CreateClient();
80-
LogsIngestionClient.Compression = null;
83+
// set compression to gzip so SDK does not gzip data (assumes already gzipped)
84+
LogsIngestionClient.Compression = "gzip";
8185
var entries = GenerateEntries(800, Recording.Now.DateTime);
82-
8386
// Add 2 entries that are going to fail in 2 batches
8487
entries.Add(new object[] {
8588
new {
@@ -107,5 +110,78 @@ public void TwoFailures()
107110
Assert.AreEqual(413, exception.Status);
108111
}
109112
}
113+
114+
[Test]
115+
public async Task OneFailureWithEventHandler()
116+
{
117+
LogsIngestionClient client = CreateClient();
118+
// set compression to gzip so SDK does not gzip data (assumes already gzipped)
119+
LogsIngestionClient.Compression = "gzip";
120+
var entries = GenerateEntries(800, Recording.Now.DateTime);
121+
entries.Add(new object[] {
122+
new {
123+
Time = Recording.Now.DateTime,
124+
Computer = "Computer" + new string('*', Mb),
125+
AdditionalContext = 1
126+
}
127+
});
128+
129+
// Make the request
130+
UploadLogsOptions options = new UploadLogsOptions();
131+
var cts = new CancellationTokenSource();
132+
bool isTriggered = false;
133+
options.UploadFailedEventHandler += Options_UploadFailed;
134+
//await client.UploadAsync(TestEnvironment.DCRImmutableId, TestEnvironment.StreamName, entries, options, cts.Token).ConfigureAwait(false);
135+
await client.UploadAsync(TestEnvironment.DCRImmutableId, TestEnvironment.StreamName, entries, options).ConfigureAwait(false);
136+
Assert.IsTrue(isTriggered);
137+
Task Options_UploadFailed(UploadFailedEventArgs e)
138+
{
139+
isTriggered = true;
140+
Assert.IsInstanceOf<RequestFailedException>(e.Exception);
141+
Assert.AreEqual("ContentLengthLimitExceeded", ((RequestFailedException)(e.Exception)).ErrorCode);
142+
Assert.IsNull(((RequestFailedException)(e.Exception)).InnerException);
143+
Assert.AreEqual(413, ((RequestFailedException)(e.Exception)).Status);
144+
return Task.CompletedTask;
145+
}
146+
}
147+
148+
[Test]
149+
public async Task TwoFailuresWithEventHandler()
150+
{
151+
LogsIngestionClient client = CreateClient();
152+
// set compression to gzip so SDK does not gzip data (assumes already gzipped)
153+
LogsIngestionClient.Compression = "gzip";
154+
var entries = GenerateEntries(800, Recording.Now.DateTime);
155+
entries.Add(new object[] {
156+
new {
157+
Time = Recording.Now.DateTime,
158+
Computer = "Computer" + new string('*', Mb),
159+
AdditionalContext = 1
160+
}
161+
});
162+
entries.Add(new object[] {
163+
new {
164+
Time = Recording.Now.DateTime,
165+
Computer = "Computer" + new string('!', Mb),
166+
AdditionalContext = 1
167+
}
168+
});
169+
170+
// Make the request
171+
UploadLogsOptions options = new UploadLogsOptions();
172+
bool isTriggered = false;
173+
options.UploadFailedEventHandler += Options_UploadFailed;
174+
await client.UploadAsync(TestEnvironment.DCRImmutableId, TestEnvironment.StreamName, entries, options).ConfigureAwait(false);
175+
Assert.IsTrue(isTriggered);
176+
Task Options_UploadFailed(UploadFailedEventArgs e)
177+
{
178+
isTriggered = true;
179+
Assert.IsInstanceOf<RequestFailedException>(e.Exception);
180+
Assert.AreEqual("ContentLengthLimitExceeded", ((RequestFailedException)(e.Exception)).ErrorCode);
181+
Assert.IsNull(((RequestFailedException)(e.Exception)).InnerException);
182+
Assert.AreEqual(413, ((RequestFailedException)(e.Exception)).Status);
183+
return Task.CompletedTask;
184+
}
185+
}
110186
}
111187
}

sdk/monitor/Azure.Monitor.Ingestion/tests/MonitorIngestionLiveTest.cs

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,30 @@ namespace Azure.Monitor.Ingestion.Tests
1616
public class MonitorIngestionLiveTest : RecordedTestBase<MonitorIngestionTestEnvironment>
1717
{
1818
private const int Mb = 1024 * 1024;
19+
private const int Kb = 1024;
20+
1921
public MonitorIngestionLiveTest(bool isAsync) : base(isAsync)
2022
{
2123
}
2224

2325
/* please refer to https://github.com/Azure/azure-sdk-for-net/blob/main/sdk/template/Azure.Template/tests/TemplateClientLiveTests.cs to write tests. */
2426

27+
[OneTimeSetUp]
28+
public void SetUp()
29+
{
30+
// make batch size smaller for Uploads for test recording size
31+
if (Mode == RecordedTestMode.Record || Mode == RecordedTestMode.Playback)
32+
LogsIngestionClient.SingleUploadThreshold = Kb;
33+
else
34+
LogsIngestionClient.SingleUploadThreshold = Mb;
35+
}
36+
37+
[OneTimeTearDown]
38+
public void CleanUp()
39+
{
40+
LogsIngestionClient.SingleUploadThreshold = Mb;
41+
}
42+
2543
private LogsIngestionClient CreateClient(HttpPipelinePolicy policy = null)
2644
{
2745
var options = new LogsIngestionClientOptions();
@@ -119,7 +137,6 @@ public async Task ValidInputFromArrayAsJsonWithSingleBatchWithGzip()
119137
public async Task ValidInputFromArrayAsJsonWithMultiBatchWithGzip()
120138
{
121139
LogsIngestionClient client = CreateClient();
122-
LogsIngestionClient.SingleUploadThreshold = 500; // make batch size smaller for Uploads for test recording size
123140

124141
// Make the request
125142
var response = await client.UploadAsync(TestEnvironment.DCRImmutableId, TestEnvironment.StreamName, GenerateEntries(1000, Recording.Now.DateTime)).ConfigureAwait(false);
@@ -136,8 +153,6 @@ public async Task ConcurrencyMultiThread()
136153
{
137154
var policy = new ConcurrencyCounterPolicy(10);
138155
LogsIngestionClient client = CreateClient(policy);
139-
// make batch size smaller for Uploads for test recording size
140-
LogsIngestionClient.SingleUploadThreshold = 100;
141156

142157
// Make the request
143158
UploadLogsOptions options = new UploadLogsOptions();
@@ -156,7 +171,6 @@ public void ConcurrencySingleThread()
156171
var policy = new ConcurrencyCounterPolicy(10);
157172
LogsIngestionClient client = CreateClient(policy);
158173

159-
LogsIngestionClient.SingleUploadThreshold = 100; // make batch size smaller for Uploads for test recording size
160174
var response = client.Upload(TestEnvironment.DCRImmutableId, TestEnvironment.StreamName, GenerateEntries(50, Recording.Now.DateTime));
161175

162176
// Check the response
@@ -220,5 +234,24 @@ public async Task ValidInputAlreadyGzipped()
220234
Assert.AreEqual(204, response.Status);
221235
Assert.IsFalse(response.IsError);
222236
}
237+
238+
[Test]
239+
public async Task ValidInputWithEventHandler()
240+
{
241+
LogsIngestionClient client = CreateClient();
242+
var entries = GenerateEntries(200, Recording.Now.DateTime);
243+
244+
// Make the request
245+
UploadLogsOptions options = new UploadLogsOptions();
246+
bool isTriggered = false;
247+
options.UploadFailedEventHandler += Options_UploadFailed;
248+
await client.UploadAsync(TestEnvironment.DCRImmutableId, TestEnvironment.StreamName, entries, options).ConfigureAwait(false);
249+
Assert.IsFalse(isTriggered);
250+
Task Options_UploadFailed(UploadFailedEventArgs e)
251+
{
252+
isTriggered = true;
253+
return Task.CompletedTask;
254+
}
255+
}
223256
}
224257
}

0 commit comments

Comments
 (0)