Skip to content

Commit aa31fa0

Browse files
Added storage event sample (Azure#22946)
* Added storage event sample * Update sdk/storage/azure-storage-blob/src/samples/java/com/azure/storage/blob/StorageEventExample.java Co-authored-by: Alan Zimmer <48699787+alzimmermsft@users.noreply.github.com> * ci fixes Co-authored-by: Alan Zimmer <48699787+alzimmermsft@users.noreply.github.com>
1 parent d11329c commit aa31fa0

File tree

2 files changed

+151
-1
lines changed

2 files changed

+151
-1
lines changed
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.storage.blob;
5+
6+
import com.azure.core.http.HttpPipelineCallContext;
7+
import com.azure.core.http.HttpPipelineNextPolicy;
8+
import com.azure.core.http.HttpPipelinePosition;
9+
import com.azure.core.http.HttpRequest;
10+
import com.azure.core.http.HttpResponse;
11+
import com.azure.core.http.policy.HttpLoggingPolicy;
12+
import com.azure.core.http.policy.HttpPipelinePolicy;
13+
import reactor.core.publisher.Mono;
14+
15+
import java.io.IOException;
16+
import java.util.Optional;
17+
import java.util.function.BiConsumer;
18+
import java.util.function.Consumer;
19+
20+
/**
21+
* This examples shows how to approximate the StorageEvent behavior from the track 1 SDK. It is a general translation to
22+
* achieve roughly the same results, but it is not an identical implementation. It may be modified to suit the use case.
23+
*
24+
* The general pattern is to create an {@link HttpPipelinePolicy} that will call the events at appropriate times. All
25+
* requests pass through a pipeline, so any pipeline which contains an instance of this policy will be able to invoke
26+
* the callbacks. Once the policy is defined, it must be set on the builder when constructing the clients.
27+
*
28+
* This sample can be run as is to demonstrate usage. Expected output is a console statement indicating the request
29+
* being sent, the response being received, and (in the async case) the request completing. To demonstrate the retry and
30+
* error handlers, uncomment the line {@code //.addPolicy(new ErrorPolicy())} when configuring the builder and run
31+
* again. This will print out several retries and errors. The async segment will not run in this case as the eventual
32+
* exception will terminate the program.
33+
*
34+
* The main areas of divergence from the original feature are:
35+
* - It is only possible to use the sendingRequestHandler on an async client.
36+
* - The callbacks do not all accept the same type as they did in the track 1 sdk and there is no StorageEvent type.
37+
* - The SendingRequest events come after the signature here. In track 1, they came before the signature. It would be
38+
* possible to create a second, near identical policy to put before the signature specifically for the SendingRequest
39+
* event if that behavior is desirable. The policy shown here should then be modified to not duplicate that event.
40+
* - Global handlers are not demonstrated here. They could be implemented in a very similar fashion by having some
41+
* static fields on the policy object that are called alongside the instance fields.
42+
*/
43+
public class StorageEventExample {
44+
45+
public static void main(String[] args) {
46+
// Define the event handlers
47+
Consumer<HttpRequest> sendingRequestHandler =
48+
request -> System.out.println("Sending request " + request.getUrl());
49+
BiConsumer<HttpRequest, Integer> retryRequestHandler =
50+
(request, retryNumber) -> System.out.println("Retrying request. " + request.getUrl() + " Attempt number "
51+
+ retryNumber);
52+
BiConsumer<HttpRequest, HttpResponse> responseReceivedHandler =
53+
(request, response) -> System.out.println("Received response. Request " + request.getUrl() + "\nResponse "
54+
+ "status" + response.getStatusCode());
55+
BiConsumer<HttpRequest, Throwable> errorHandler =
56+
(request, t) -> System.out.println("Error. Request " + request.getUrl() + "\n " + t.getMessage());
57+
/*
58+
If actions specific to the request type must be taken, the consumer type parameter should correspond to the type
59+
returned by the api.
60+
*/
61+
Consumer<Object> requestCompleteHandler =
62+
obj -> System.out.println("Request complete");
63+
64+
// Instantiate the policy that will invoke the handlers at the proper time
65+
EventHandlerPolicy eventHandlerPolicy = new EventHandlerPolicy(sendingRequestHandler, retryRequestHandler,
66+
responseReceivedHandler, errorHandler);
67+
68+
// Create clients whose pipeline contains the new policy
69+
BlobClientBuilder builder = new BlobClientBuilder()
70+
.connectionString("<connection-string>")
71+
.addPolicy(eventHandlerPolicy)
72+
.addPolicy(new ErrorPolicy())
73+
.containerName("<container-name>")
74+
.blobName("<blob-name>");
75+
BlobClient bc = builder.buildClient();
76+
BlobAsyncClient bac = builder.buildAsyncClient();
77+
78+
// Use the client as usual, the handlers will now be automatically invoked at the proper times
79+
bc.downloadContent();
80+
/*
81+
The only way to use a requestCompleteHandler is to use the async client and set a side effect operator on the
82+
return value.
83+
*/
84+
System.out.println("Async");
85+
bac.downloadWithResponse(null, null, null, false)
86+
.doOnNext(requestCompleteHandler)
87+
.block();
88+
}
89+
90+
static class EventHandlerPolicy implements HttpPipelinePolicy {
91+
92+
private final Consumer<HttpRequest> sendingRequestEvent;
93+
private final BiConsumer<HttpRequest, Integer> retryRequestEvent;
94+
private final BiConsumer<HttpRequest, HttpResponse> responseReceivedEvent;
95+
private final BiConsumer<HttpRequest, Throwable> errorResponseEvent;
96+
97+
EventHandlerPolicy(Consumer<HttpRequest> sendingRequestEvent,
98+
BiConsumer<HttpRequest, Integer> retryRequestEvent,
99+
BiConsumer<HttpRequest, HttpResponse> responseReceivedEvent,
100+
BiConsumer<HttpRequest, Throwable> errorResponseEvent) {
101+
this.sendingRequestEvent = sendingRequestEvent;
102+
this.retryRequestEvent = retryRequestEvent;
103+
this.responseReceivedEvent = responseReceivedEvent;
104+
this.errorResponseEvent = errorResponseEvent;
105+
}
106+
107+
@Override
108+
public Mono<HttpResponse> process(HttpPipelineCallContext httpPipelineCallContext, HttpPipelineNextPolicy httpPipelineNextPolicy) {
109+
HttpRequest request = httpPipelineCallContext.getHttpRequest();
110+
/*
111+
Check how many retries have gone out. Send an initial sendingRequest event or a retryRequest event as
112+
appropriate.
113+
This value is updated automatically by the retry policy before the request gets here
114+
*/
115+
Optional<Object> retryOptional = httpPipelineCallContext.getData(HttpLoggingPolicy.RETRY_COUNT_CONTEXT);
116+
Integer retryCount = retryOptional.map(o -> (Integer) o).orElse(0);
117+
if (retryCount <= 1) {
118+
this.sendingRequestEvent.accept(request);
119+
} else {
120+
this.retryRequestEvent.accept(request, retryCount);
121+
}
122+
123+
// Set side-effect call backs to process the event without affecting the normal request-response flow
124+
return httpPipelineNextPolicy.process()
125+
.doOnNext(response -> this.responseReceivedEvent.accept(request, response))
126+
.doOnError(throwable -> this.errorResponseEvent.accept(request, throwable));
127+
}
128+
129+
@Override
130+
public HttpPipelinePosition getPipelinePosition() {
131+
// This policy must be in a position to see each retry
132+
return HttpPipelinePosition.PER_RETRY;
133+
}
134+
}
135+
136+
/**
137+
* A simple policy that always returns a retryable error to demonstrate retry and error event handlers
138+
*/
139+
static class ErrorPolicy implements HttpPipelinePolicy {
140+
@Override
141+
public Mono<HttpResponse> process(HttpPipelineCallContext httpPipelineCallContext, HttpPipelineNextPolicy httpPipelineNextPolicy) {
142+
return Mono.error(new IOException("Dummy error"));
143+
}
144+
145+
@Override
146+
public HttpPipelinePosition getPipelinePosition() {
147+
return HttpPipelinePosition.PER_RETRY;
148+
}
149+
}
150+
}

sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/policy/RequestRetryPolicy.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ stream, the buffers that were emitted will have already been consumed (their pos
110110
/*
111111
Update the RETRY_COUNT_CONTEXT to log retries.
112112
*/
113-
context.setData(HttpLoggingPolicy.RETRY_COUNT_CONTEXT, attempt + 1);
113+
context.setData(HttpLoggingPolicy.RETRY_COUNT_CONTEXT, attempt);
114114

115115
/*
116116
We want to send the request with a given timeout, but we don't want to kickoff that timeout-bound operation

0 commit comments

Comments
 (0)