99using System . Threading . Tasks ;
1010using Azure . Core ;
1111using Azure . Core . Pipeline ;
12- using Azure . Core . Serialization ;
1312using Azure . Messaging . EventGrid . Models ;
1413
1514namespace Azure . Messaging . EventGrid
@@ -19,15 +18,9 @@ namespace Azure.Messaging.EventGrid
1918 /// </summary>
2019 public class EventGridPublisherClient
2120 {
22- private readonly EventGridRestClient _serviceRestClient ;
2321 private readonly ClientDiagnostics _clientDiagnostics ;
24- private string _hostName => _endpoint . Authority ;
25- private readonly Uri _endpoint ;
26- private readonly AzureKeyCredential _key ;
22+ private readonly RequestUriBuilder _uriBuilder ;
2723 private readonly HttpPipeline _pipeline ;
28- private readonly string _apiVersion ;
29-
30- private static readonly JsonObjectSerializer s_jsonSerializer = new JsonObjectSerializer ( ) ;
3124
3225 /// <summary>Initalizes a new instance of the <see cref="EventGridPublisherClient"/> class for mocking.</summary>
3326 protected EventGridPublisherClient ( )
@@ -50,11 +43,10 @@ public EventGridPublisherClient(Uri endpoint, AzureKeyCredential credential, Eve
5043 {
5144 Argument . AssertNotNull ( credential , nameof ( credential ) ) ;
5245 options ??= new EventGridPublisherClientOptions ( ) ;
53- _apiVersion = options . Version . GetVersionString ( ) ;
54- _endpoint = endpoint ;
55- _key = credential ;
56- _pipeline = HttpPipelineBuilder . Build ( options , new EventGridKeyCredentialPolicy ( credential , Constants . SasKeyName ) ) ;
57- _serviceRestClient = new EventGridRestClient ( new ClientDiagnostics ( options ) , _pipeline , options . Version . GetVersionString ( ) ) ;
46+ _uriBuilder = new RequestUriBuilder ( ) ;
47+ _uriBuilder . Reset ( endpoint ) ;
48+ _uriBuilder . AppendQuery ( "api-version" , options . Version . GetVersionString ( ) , true ) ;
49+ _pipeline = HttpPipelineBuilder . Build ( options , new EventGridKeyCredentialPolicy ( credential ) ) ;
5850 _clientDiagnostics = new ClientDiagnostics ( options ) ;
5951 }
6052
@@ -69,9 +61,10 @@ public EventGridPublisherClient(Uri endpoint, AzureSasCredential credential, Eve
6961 {
7062 Argument . AssertNotNull ( credential , nameof ( credential ) ) ;
7163 options ??= new EventGridPublisherClientOptions ( ) ;
72- _endpoint = endpoint ;
73- HttpPipeline pipeline = HttpPipelineBuilder . Build ( options , new EventGridSharedAccessSignatureCredentialPolicy ( credential ) ) ;
74- _serviceRestClient = new EventGridRestClient ( new ClientDiagnostics ( options ) , pipeline , options . Version . GetVersionString ( ) ) ;
64+ _uriBuilder = new RequestUriBuilder ( ) ;
65+ _uriBuilder . Reset ( endpoint ) ;
66+ _uriBuilder . AppendQuery ( "api-version" , options . Version . GetVersionString ( ) , true ) ;
67+ _pipeline = HttpPipelineBuilder . Build ( options , new EventGridSharedAccessSignatureCredentialPolicy ( credential ) ) ;
7568 _clientDiagnostics = new ClientDiagnostics ( options ) ;
7669 }
7770
@@ -123,20 +116,6 @@ private async Task<Response> SendCloudNativeCloudEventsInternalAsync(ReadOnlyMem
123116 }
124117 }
125118
126- private Request CreateEventRequest( HttpMessage message , string contentType )
127- {
128- Request request = message. Request ;
129- request. Method = RequestMethod . Post ;
130- var uri = new RawRequestUriBuilder ( ) ;
131- uri . AppendRaw ( "https://" , false ) ;
132- uri . AppendRaw ( _hostName , false ) ;
133- uri . AppendPath ( "/api/events" , false ) ;
134- uri . AppendQuery ( "api-version" , _apiVersion , true ) ;
135- request . Uri = uri ;
136- request . Headers . Add ( "Content-Type" , contentType ) ;
137- return request ;
138- }
139-
140119 /// <summary> Publishes a set of EventGridEvents to an Event Grid topic. </summary>
141120 /// <param name="eventGridEvent"> The event to be published to Event Grid. </param>
142121 /// <param name="cancellationToken"> An optional cancellation token instance to signal the request to cancel the operation.</param>
@@ -177,13 +156,13 @@ private async Task<Response> SendEventsInternal(IEnumerable<EventGridEvent> even
177156 // List of events cannot be null
178157 Argument. AssertNotNull ( events , nameof ( events ) ) ;
179158
180- List< EventGridEventInternal > eventsWithSerializedPayloads = new List < EventGridEventInternal > ( ) ;
159+ using HttpMessage message = _pipeline. CreateMessage ( ) ;
160+ Request request = CreateEventRequest( message , "application/json" ) ;
161+ var content = new Utf8JsonRequestContent( ) ;
162+ content. JsonWriter . WriteStartArray ( ) ;
181163 foreach ( EventGridEvent egEvent in events)
182164 {
183- // Individual events cannot be null
184- Argument . AssertNotNull ( egEvent , nameof ( egEvent ) ) ;
185165 JsonDocument data = JsonDocument. Parse ( egEvent . Data . ToStream ( ) ) ;
186-
187166 EventGridEventInternal newEGEvent = new EventGridEventInternal(
188167 egEvent . Id ,
189168 egEvent . Subject ,
@@ -194,24 +173,27 @@ private async Task<Response> SendEventsInternal(IEnumerable<EventGridEvent> even
194173 {
195174 Topic = egEvent. Topic
196175 } ;
197-
198- eventsWithSerializedPayloads. Add ( newEGEvent ) ;
176+ content. JsonWriter . WriteObjectValue ( newEGEvent ) ;
199177 }
178+
179+ content. JsonWriter . WriteEndArray ( ) ;
180+ request. Content = content ;
181+
200182 if ( async)
201183 {
202- // Publish asynchronously if called via an async path
203- return await _serviceRestClient. PublishEventsAsync (
204- _hostName ,
205- eventsWithSerializedPayloads ,
206- cancellationToken ) . ConfigureAwait ( false ) ;
184+ await _pipeline . SendAsync ( message , cancellationToken ) . ConfigureAwait ( false ) ;
207185 }
208186 else
209187 {
210- return _serviceRestClient. PublishEvents (
211- _hostName ,
212- eventsWithSerializedPayloads ,
213- cancellationToken ) ;
188+ _pipeline . Send ( message , cancellationToken ) ;
214189 }
190+ return message . Response . Status switch
191+ {
192+ 200 => message . Response ,
193+ _ => async ?
194+ throw await _clientDiagnostics . CreateRequestFailedExceptionAsync ( message . Response ) . ConfigureAwait ( false ) :
195+ throw _clientDiagnostics. CreateRequestFailedException ( message . Response )
196+ } ;
215197 }
216198 catch ( Exception e )
217199 {
@@ -313,44 +295,6 @@ public virtual async Task<Response> SendEventsAsync(IEnumerable<BinaryData> cust
313295 public virtual Response SendEvents( IEnumerable < BinaryData > customEvents , CancellationToken cancellationToken = default )
314296 => PublishCustomEventsInternal( customEvents , false /*async*/ , cancellationToken ) . EnsureCompleted ( ) ;
315297
316- private async Task< Response > PublishCustomEventsInternal ( IEnumerable < object > events , bool async , CancellationToken cancellationToken = default )
317- {
318- using DiagnosticScope scope = _clientDiagnostics. CreateScope ( $ "{ nameof ( EventGridPublisherClient ) } .{ nameof ( SendEvents ) } ") ;
319- scope. Start ( ) ;
320-
321- try
322- {
323- List< CustomModelSerializer > serializedEvents = new List < CustomModelSerializer > ( ) ;
324- foreach ( object customEvent in events)
325- {
326- serializedEvents. Add (
327- new CustomModelSerializer (
328- customEvent ,
329- s_jsonSerializer ,
330- cancellationToken ) ) ;
331- }
332- if ( async)
333- {
334- return await _serviceRestClient. PublishCustomEventEventsAsync (
335- _hostName ,
336- serializedEvents ,
337- cancellationToken ) . ConfigureAwait ( false ) ;
338- }
339- else
340- {
341- return _serviceRestClient. PublishCustomEventEvents (
342- _hostName ,
343- serializedEvents ,
344- cancellationToken ) ;
345- }
346- }
347- catch ( Exception e )
348- {
349- scope. Failed ( e ) ;
350- throw ;
351- }
352- }
353-
354298 private async Task< Response > PublishCustomEventsInternal ( IEnumerable < BinaryData > events , bool async , CancellationToken cancellationToken = default )
355299 {
356300 using DiagnosticScope scope = _clientDiagnostics. CreateScope ( $ "{ nameof ( EventGridPublisherClient ) } .{ nameof ( SendEvents ) } ") ;
@@ -394,5 +338,14 @@ private async Task<Response> PublishCustomEventsInternal(IEnumerable<BinaryData>
394338 throw ;
395339 }
396340 }
341+
342+ private Request CreateEventRequest( HttpMessage message , string contentType )
343+ {
344+ Request request = message. Request ;
345+ request. Method = RequestMethod . Post ;
346+ request. Uri = _uriBuilder ;
347+ request. Headers . Add ( "Content-Type" , contentType ) ;
348+ return request;
349+ }
397350 }
398351}
0 commit comments