Skip to content

Commit 8e9b225

Browse files
authored
HttpPipeline sync refactor (Azure#29049)
1 parent daa5f1c commit 8e9b225

File tree

14 files changed

+582
-36
lines changed

14 files changed

+582
-36
lines changed

eng/code-quality-reports/src/main/resources/spotbugs/spotbugs-exclude.xml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -385,7 +385,9 @@
385385
<Match>
386386
<Or>
387387
<Class name="com.azure.core.http.HttpPipelineNextPolicy"/>
388+
<Class name="com.azure.core.http.HttpPipelineNextSyncPolicy"/>
388389
<Class name="com.azure.core.http.HttpRequest"/>
390+
<Class name="com.azure.core.implementation.http.HttpPipelineCallState"/>
389391
</Or>
390392
<Bug pattern="CN_IMPLEMENTS_CLONE_BUT_NOT_CLONEABLE"/>
391393
</Match>

sdk/core/azure-core/CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,12 @@
44

55
### Features Added
66

7+
- Added support for sending synchronous requests using `sendSync` in `HttpPipeline`:
8+
- Added `HttpPipelinePolicy.processSync(HttpPipelineCallContext context, HttpPipelineNextSyncPolicy next)` to allow processing policies synchronously.
9+
- Added `HttpPipelineSyncPolicy` to represent synchronous `HttpPipelinePolicy`.
10+
- Added `HttpPipelineNextSyncPolicy` to invoke the next synchronous policy in pipeline. to process synchronous policy pipeline.
11+
- Added `HttpPipelineCallState` to maintain request specific pipeline and contextual data.
12+
713
### Breaking Changes
814

915
### Bugs Fixed

sdk/core/azure-core/src/main/java/com/azure/core/http/HttpClient.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,17 @@ default Mono<HttpResponse> send(HttpRequest request, Context context) {
3131
return send(request);
3232
}
3333

34+
/**
35+
* Sends the provided request synchronously with contextual information.
36+
*
37+
* @param request The HTTP request to send.
38+
* @param context Contextual information about the request.
39+
* @return The response.
40+
*/
41+
default HttpResponse sendSync(HttpRequest request, Context context) {
42+
return send(request, context).block();
43+
}
44+
3445
/**
3546
* Creates a new {@link HttpClient} instance.
3647
*

sdk/core/azure-core/src/main/java/com/azure/core/http/HttpPipeline.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package com.azure.core.http;
55

66
import com.azure.core.http.policy.HttpPipelinePolicy;
7+
import com.azure.core.implementation.http.HttpPipelineCallState;
78
import com.azure.core.util.Context;
89
import reactor.core.publisher.Mono;
910

@@ -89,6 +90,7 @@ public Mono<HttpResponse> send(HttpRequest request, Context data) {
8990
return this.send(new HttpPipelineCallContext(request, data));
9091
}
9192

93+
9294
/**
9395
* Sends the context (containing an HTTP request) through pipeline.
9496
*
@@ -99,8 +101,23 @@ public Mono<HttpResponse> send(HttpRequest request, Context data) {
99101
public Mono<HttpResponse> send(HttpPipelineCallContext context) {
100102
// Return deferred to mono for complete lazy behaviour.
101103
return Mono.defer(() -> {
102-
HttpPipelineNextPolicy next = new HttpPipelineNextPolicy(this, context);
104+
HttpPipelineNextPolicy next =
105+
new HttpPipelineNextPolicy(new HttpPipelineCallState(this, context));
103106
return next.process();
104107
});
105108
}
109+
110+
/**
111+
* Wraps the request in a context with additional metadata and sends it through the pipeline.
112+
*
113+
* @param request THe HTTP request to send.
114+
* @param data Additional metadata to pass along with the request.
115+
* @return A publisher upon subscription flows the context through policies, sends the request, and emits response
116+
* upon completion.
117+
*/
118+
public HttpResponse sendSync(HttpRequest request, Context data) {
119+
HttpPipelineNextSyncPolicy next = new HttpPipelineNextSyncPolicy(
120+
new HttpPipelineCallState(this, new HttpPipelineCallContext(request, data)));
121+
return next.processSync();
122+
}
106123
}

sdk/core/azure-core/src/main/java/com/azure/core/http/HttpPipelineNextPolicy.java

Lines changed: 41 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -4,28 +4,41 @@
44
package com.azure.core.http;
55

66
import com.azure.core.http.policy.HttpPipelinePolicy;
7+
import com.azure.core.implementation.http.HttpPipelineCallState;
8+
import com.azure.core.util.logging.ClientLogger;
79
import reactor.core.publisher.Mono;
10+
import reactor.core.scheduler.Schedulers;
811

912
/**
1013
* A type that invokes next policy in the pipeline.
1114
*/
1215
public class HttpPipelineNextPolicy {
13-
private final HttpPipeline pipeline;
14-
private final HttpPipelineCallContext context;
15-
private int currentPolicyIndex;
16+
private static final ClientLogger LOGGER = new ClientLogger(HttpPipelineNextPolicy.class);
17+
private final HttpPipelineCallState state;
18+
private final boolean originatedFromSyncPolicy;
1619

1720
/**
1821
* Package Private ctr.
1922
*
2023
* Creates HttpPipelineNextPolicy.
2124
*
22-
* @param pipeline the pipeline
23-
* @param context the request-response context
25+
* @param state the pipeline call state.
2426
*/
25-
HttpPipelineNextPolicy(final HttpPipeline pipeline, HttpPipelineCallContext context) {
26-
this.pipeline = pipeline;
27-
this.context = context;
28-
this.currentPolicyIndex = -1;
27+
HttpPipelineNextPolicy(HttpPipelineCallState state) {
28+
this.state = state;
29+
this.originatedFromSyncPolicy = false;
30+
}
31+
32+
/**
33+
* Package Private ctr.
34+
* Creates HttpPipelineNextPolicy.
35+
*
36+
* @param state the pipeline call state.
37+
* @param originatedFromSyncPolicy boolean to indicate if the next policy originated from sync call stack.
38+
*/
39+
HttpPipelineNextPolicy(HttpPipelineCallState state, boolean originatedFromSyncPolicy) {
40+
this.state = state;
41+
this.originatedFromSyncPolicy = originatedFromSyncPolicy;
2942
}
3043

3144
/**
@@ -34,16 +47,25 @@ public class HttpPipelineNextPolicy {
3447
* @return A publisher which upon subscription invokes next policy and emits response from the policy.
3548
*/
3649
public Mono<HttpResponse> process() {
37-
final int size = this.pipeline.getPolicyCount();
38-
if (this.currentPolicyIndex > size) {
39-
return Mono.error(new IllegalStateException("There is no more policies to execute."));
40-
}
41-
42-
this.currentPolicyIndex++;
43-
if (this.currentPolicyIndex == size) {
44-
return this.pipeline.getHttpClient().send(this.context.getHttpRequest(), this.context.getContext());
50+
if (originatedFromSyncPolicy && !Schedulers.isInNonBlockingThread()) {
51+
// Pipeline executes in synchronous style. We most likely got here via default implementation in the
52+
// HttpPipelinePolicy.processSynchronously so go back to sync style here.
53+
// Don't do this on non-blocking threads.
54+
return Mono.fromCallable(() -> new HttpPipelineNextSyncPolicy(state).processSync());
4555
} else {
46-
return this.pipeline.getPolicy(this.currentPolicyIndex).process(this.context, this);
56+
if (originatedFromSyncPolicy) {
57+
LOGGER.warning("The pipeline switched from synchronous to asynchronous."
58+
+ "Check if {} does not override HttpPipelinePolicy.processSync",
59+
this.state.getCurrentPolicy().getClass().getSimpleName());
60+
}
61+
62+
HttpPipelinePolicy nextPolicy = state.getNextPolicy();
63+
if (nextPolicy == null) {
64+
return this.state.getPipeline().getHttpClient().send(
65+
this.state.getCallContext().getHttpRequest(), this.state.getCallContext().getContext());
66+
} else {
67+
return nextPolicy.process(this.state.getCallContext(), this);
68+
}
4769
}
4870
}
4971

@@ -54,8 +76,6 @@ public Mono<HttpResponse> process() {
5476
*/
5577
@Override
5678
public HttpPipelineNextPolicy clone() {
57-
HttpPipelineNextPolicy cloned = new HttpPipelineNextPolicy(this.pipeline, this.context);
58-
cloned.currentPolicyIndex = this.currentPolicyIndex;
59-
return cloned;
79+
return new HttpPipelineNextPolicy(this.state.clone(), this.originatedFromSyncPolicy);
6080
}
6181
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.core.http;
5+
6+
import com.azure.core.http.policy.HttpPipelinePolicy;
7+
import com.azure.core.implementation.http.HttpPipelineCallState;
8+
import com.azure.core.implementation.http.HttpPipelineNextSyncPolicyHelper;
9+
10+
/**
11+
* A type that invokes next policy in the pipeline.
12+
*/
13+
public class HttpPipelineNextSyncPolicy {
14+
private final HttpPipelineCallState state;
15+
16+
static {
17+
HttpPipelineNextSyncPolicyHelper.setAccessor(HttpPipelineNextSyncPolicy::toAsyncPolicy);
18+
}
19+
20+
/**
21+
* Package Private ctr.
22+
* Creates HttpPipelineNextPolicy.
23+
*
24+
* @param state the pipeline call state.
25+
*/
26+
HttpPipelineNextSyncPolicy(HttpPipelineCallState state) {
27+
this.state = state;
28+
}
29+
30+
/**
31+
* Invokes the next {@link HttpPipelinePolicy}.
32+
*
33+
* @return The response.
34+
*/
35+
public HttpResponse processSync() {
36+
HttpPipelinePolicy nextPolicy = state.getNextPolicy();
37+
if (nextPolicy == null) {
38+
return this.state.getPipeline().getHttpClient().sendSync(
39+
this.state.getCallContext().getHttpRequest(), this.state.getCallContext().getContext());
40+
} else {
41+
return nextPolicy.processSync(this.state.getCallContext(), this);
42+
}
43+
}
44+
45+
/**
46+
* Creates a new instance of this instance.
47+
*
48+
* @return A new instance of this next pipeline sync policy.
49+
*/
50+
@Override
51+
public HttpPipelineNextSyncPolicy clone() {
52+
return new HttpPipelineNextSyncPolicy(this.state.clone());
53+
}
54+
55+
/**
56+
* Method to convert a {@link HttpPipelineNextSyncPolicy} to a {@link HttpPipelineNextPolicy} for supporting the
57+
* default implementation of
58+
* {@link HttpPipelinePolicy#processSync(HttpPipelineCallContext, HttpPipelineNextSyncPolicy)}.
59+
*
60+
* @return the converted {@link HttpPipelineNextSyncPolicy}.
61+
*/
62+
HttpPipelineNextPolicy toAsyncPolicy() {
63+
return new HttpPipelineNextPolicy(this.state, true);
64+
}
65+
}

sdk/core/azure-core/src/main/java/com/azure/core/http/policy/HttpPipelinePolicy.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@
66
import com.azure.core.http.HttpPipeline;
77
import com.azure.core.http.HttpPipelineCallContext;
88
import com.azure.core.http.HttpPipelineNextPolicy;
9+
import com.azure.core.http.HttpPipelineNextSyncPolicy;
910
import com.azure.core.http.HttpPipelinePosition;
1011
import com.azure.core.http.HttpResponse;
12+
import com.azure.core.implementation.http.HttpPipelineNextSyncPolicyHelper;
1113
import reactor.core.publisher.Mono;
1214

1315
/**
@@ -26,6 +28,17 @@ public interface HttpPipelinePolicy {
2628
*/
2729
Mono<HttpResponse> process(HttpPipelineCallContext context, HttpPipelineNextPolicy next);
2830

31+
/**
32+
* Processes provided request context and invokes the next policy synchronously.
33+
*
34+
* @param context The request context.
35+
* @param next The next policy to invoke.
36+
* @return A publisher that initiates the request upon subscription and emits a response on completion.
37+
*/
38+
default HttpResponse processSync(HttpPipelineCallContext context, HttpPipelineNextSyncPolicy next) {
39+
return process(context, HttpPipelineNextSyncPolicyHelper.toAsyncPolicy(next)).block();
40+
}
41+
2942
/**
3043
* Gets the position to place the policy.
3144
* <p>
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.core.http.policy;
5+
6+
import com.azure.core.http.HttpPipelineCallContext;
7+
import com.azure.core.http.HttpPipelineNextPolicy;
8+
import com.azure.core.http.HttpPipelineNextSyncPolicy;
9+
import com.azure.core.http.HttpResponse;
10+
import reactor.core.publisher.Mono;
11+
12+
/**
13+
* Represents a {@link HttpPipelinePolicy} that doesn't do any asynchronous or synchronously blocking operations.
14+
*/
15+
public class HttpPipelineSyncPolicy implements HttpPipelinePolicy {
16+
17+
/**
18+
* {@inheritDoc}
19+
*/
20+
@Override
21+
public final Mono<HttpResponse> process(HttpPipelineCallContext context, HttpPipelineNextPolicy next) {
22+
return Mono.fromCallable(
23+
() -> {
24+
beforeSendingRequest(context);
25+
return next;
26+
})
27+
.flatMap(ignored -> next.process())
28+
.map(response -> afterReceivedResponse(context, response));
29+
}
30+
31+
/**
32+
* {@inheritDoc}
33+
*/
34+
@Override
35+
public final HttpResponse processSync(HttpPipelineCallContext context, HttpPipelineNextSyncPolicy next) {
36+
beforeSendingRequest(context);
37+
HttpResponse response = next.processSync();
38+
return afterReceivedResponse(context, response);
39+
}
40+
41+
/**
42+
* Method is invoked before the request is sent.
43+
* @param context The request context.
44+
*/
45+
protected void beforeSendingRequest(HttpPipelineCallContext context) {
46+
// empty by default
47+
}
48+
49+
/**
50+
* Method is invoked after the response is received.
51+
* @param context The request context.
52+
* @param response The response received.
53+
* @return The transformed response.
54+
*/
55+
protected HttpResponse afterReceivedResponse(HttpPipelineCallContext context, HttpResponse response) {
56+
// empty by default
57+
return response;
58+
}
59+
}

0 commit comments

Comments
 (0)