Skip to content

Commit d21bd89

Browse files
committed
Merge branch 'resubscribe-on-token-expiration' into 3.x
Signed-off-by: Ben Hale <bhale@pivotal.io>
2 parents 5f69c68 + 052baff commit d21bd89

File tree

8 files changed

+97
-80
lines changed

8 files changed

+97
-80
lines changed

.idea/encodings.xml

Lines changed: 7 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/uaa/AbstractUaaOperations.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,17 @@ protected final <T> Mono<T> post(Object requestPayload, Class<T> responseType, F
108108
.parseBody(responseType));
109109
}
110110

111+
protected final <T> Mono<T> post(Object requestPayload, Class<T> responseType, Function<UriComponentsBuilder, UriComponentsBuilder> uriTransformer, Consumer<HttpHeaders> headersTransformer,
112+
Function<HttpHeaders, Mono<? extends HttpHeaders>> headersWhenTransformer) {
113+
return createOperator()
114+
.flatMap(operator -> operator.headers(headers -> addHeaders(headers, requestPayload, headersTransformer)).headersWhen(headersWhenTransformer)
115+
.post()
116+
.uri(queryTransformer(requestPayload).andThen(uriTransformer))
117+
.send(requestPayload)
118+
.response()
119+
.parseBody(responseType));
120+
}
121+
111122
protected final <T> Mono<T> post(Object requestPayload, Class<T> responseType, Function<UriComponentsBuilder, UriComponentsBuilder> uriTransformer) {
112123
return createOperator()
113124
.flatMap(operator -> operator.headers(headers -> addHeaders(headers, requestPayload))

cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/uaa/tokens/ReactorTokens.java

Lines changed: 21 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,13 @@ public ReactorTokens(ConnectionContext connectionContext, Mono<String> root, Tok
7070

7171
@Override
7272
public Mono<CheckTokenResponse> check(CheckTokenRequest request) {
73-
return post(request, CheckTokenResponse.class, builder -> builder.pathSegment("check_token"), outbound -> {
74-
String encoded = Base64.getEncoder().encodeToString(new AsciiString(request.getClientId()).concat(":").concat(request.getClientSecret()).toByteArray());
75-
outbound.set(AUTHORIZATION, BASIC_PREAMBLE + encoded);
76-
})
73+
return post(request, CheckTokenResponse.class, builder -> builder.pathSegment("check_token"),
74+
outbound -> {
75+
},
76+
outbound -> {
77+
String encoded = Base64.getEncoder().encodeToString(new AsciiString(request.getClientId()).concat(":").concat(request.getClientSecret()).toByteArray());
78+
return Mono.just(outbound.set(AUTHORIZATION, BASIC_PREAMBLE + encoded));
79+
})
7780
.checkpoint();
7881
}
7982

@@ -82,10 +85,8 @@ public Mono<GetTokenByAuthorizationCodeResponse> getByAuthorizationCode(GetToken
8285
return post(request, GetTokenByAuthorizationCodeResponse.class, builder -> builder.pathSegment("oauth", "token")
8386
.queryParam("grant_type", AUTHORIZATION_CODE)
8487
.queryParam("response_type", ResponseType.TOKEN),
85-
outbound -> {
86-
ReactorTokens.removeAuthorization(outbound);
87-
ReactorTokens.setUrlEncoded(outbound);
88-
})
88+
ReactorTokens::setUrlEncoded,
89+
ReactorTokens::removeAuthorization)
8990
.checkpoint();
9091
}
9192

@@ -94,10 +95,8 @@ public Mono<GetTokenByClientCredentialsResponse> getByClientCredentials(GetToken
9495
return post(request, GetTokenByClientCredentialsResponse.class, builder -> builder.pathSegment("oauth", "token")
9596
.queryParam("grant_type", CLIENT_CREDENTIALS)
9697
.queryParam("response_type", ResponseType.TOKEN),
97-
outbound -> {
98-
ReactorTokens.removeAuthorization(outbound);
99-
ReactorTokens.setUrlEncoded(outbound);
100-
})
98+
ReactorTokens::setUrlEncoded,
99+
ReactorTokens::removeAuthorization)
101100
.checkpoint();
102101
}
103102

@@ -106,10 +105,8 @@ public Mono<GetTokenByOneTimePasscodeResponse> getByOneTimePasscode(GetTokenByOn
106105
return post(request, GetTokenByOneTimePasscodeResponse.class, builder -> builder.pathSegment("oauth", "token")
107106
.queryParam("grant_type", PASSWORD)
108107
.queryParam("response_type", ResponseType.TOKEN),
109-
outbound -> {
110-
ReactorTokens.removeAuthorization(outbound);
111-
ReactorTokens.setUrlEncoded(outbound);
112-
})
108+
ReactorTokens::setUrlEncoded,
109+
ReactorTokens::removeAuthorization)
113110
.checkpoint();
114111
}
115112

@@ -118,10 +115,8 @@ public Mono<GetTokenByOpenIdResponse> getByOpenId(GetTokenByOpenIdRequest reques
118115
return post(request, GetTokenByOpenIdResponse.class, builder -> builder.pathSegment("oauth", "token")
119116
.queryParam("grant_type", AUTHORIZATION_CODE)
120117
.queryParam("response_type", ResponseType.ID_TOKEN),
121-
outbound -> {
122-
ReactorTokens.removeAuthorization(outbound);
123-
ReactorTokens.setUrlEncoded(outbound);
124-
})
118+
ReactorTokens::setUrlEncoded,
119+
ReactorTokens::removeAuthorization)
125120
.checkpoint();
126121
}
127122

@@ -130,10 +125,8 @@ public Mono<GetTokenByPasswordResponse> getByPassword(GetTokenByPasswordRequest
130125
return post(request, GetTokenByPasswordResponse.class, builder -> builder.pathSegment("oauth", "token")
131126
.queryParam("grant_type", PASSWORD)
132127
.queryParam("response_type", ResponseType.TOKEN),
133-
outbound -> {
134-
ReactorTokens.removeAuthorization(outbound);
135-
ReactorTokens.setUrlEncoded(outbound);
136-
})
128+
ReactorTokens::setUrlEncoded,
129+
ReactorTokens::removeAuthorization)
137130
.checkpoint();
138131
}
139132

@@ -153,15 +146,13 @@ public Mono<ListTokenKeysResponse> listKeys(ListTokenKeysRequest request) {
153146
public Mono<RefreshTokenResponse> refresh(RefreshTokenRequest request) {
154147
return post(request, RefreshTokenResponse.class, builder -> builder.pathSegment("oauth", "token")
155148
.queryParam("grant_type", REFRESH_TOKEN),
156-
outbound -> {
157-
ReactorTokens.removeAuthorization(outbound);
158-
ReactorTokens.setUrlEncoded(outbound);
159-
})
149+
ReactorTokens::setUrlEncoded,
150+
ReactorTokens::removeAuthorization)
160151
.checkpoint();
161152
}
162153

163-
private static void removeAuthorization(HttpHeaders request) {
164-
request.remove(AUTHORIZATION);
154+
private static Mono<HttpHeaders> removeAuthorization(HttpHeaders request) {
155+
return Mono.just(request.remove(AUTHORIZATION));
165156
}
166157

167158
private static void setUrlEncoded(HttpHeaders request) {

cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/util/AbstractReactorOperations.java

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.cloudfoundry.reactor.util;
1818

19+
import io.netty.handler.codec.http.HttpHeaders;
1920
import org.cloudfoundry.reactor.ConnectionContext;
2021
import org.cloudfoundry.reactor.TokenProvider;
2122
import reactor.core.publisher.Mono;
@@ -44,8 +45,18 @@ protected Mono<Operator> createOperator() {
4445

4546
return this.root.map(this::buildOperatorContext)
4647
.map(context -> new Operator(context, httpClient))
47-
.flatMap(operator -> this.tokenProvider.getToken(this.connectionContext)
48-
.map(token -> setHeaders(operator, token)));
48+
.map(operator -> operator.headers(this::addHeaders))
49+
.map(operator -> operator.headersWhen(this::addHeadersWhen));
50+
}
51+
52+
private void addHeaders(HttpHeaders httpHeaders) {
53+
UserAgent.setUserAgent(httpHeaders);
54+
JsonCodec.setDecodeHeaders(httpHeaders);
55+
}
56+
57+
private Mono<? extends HttpHeaders> addHeadersWhen(HttpHeaders httpHeaders) {
58+
return this.tokenProvider.getToken(this.connectionContext)
59+
.map(token -> httpHeaders.set(AUTHORIZATION, token));
4960
}
5061

5162
private OperatorContext buildOperatorContext(String root) {
@@ -56,12 +67,4 @@ private OperatorContext buildOperatorContext(String root) {
5667
.build();
5768
}
5869

59-
private Operator setHeaders(Operator operator, String token) {
60-
return operator.headers(httpHeaders -> {
61-
httpHeaders.set(AUTHORIZATION, token);
62-
UserAgent.setUserAgent(httpHeaders);
63-
JsonCodec.setDecodeHeaders(httpHeaders);
64-
});
65-
}
66-
6770
}

cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/util/ErrorPayloadMapper.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,11 @@
1616

1717
package org.cloudfoundry.reactor.util;
1818

19-
import java.util.function.Function;
20-
2119
import org.cloudfoundry.reactor.HttpClientResponseWithBody;
20+
import reactor.core.publisher.Flux;
2221

23-
import reactor.core.publisher.Mono;
22+
import java.util.function.Function;
2423

25-
public interface ErrorPayloadMapper extends Function<Mono<HttpClientResponseWithBody>, Mono<HttpClientResponseWithBody>> {
24+
public interface ErrorPayloadMapper extends Function<Flux<HttpClientResponseWithBody>, Flux<HttpClientResponseWithBody>> {
2625

2726
}

cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/util/JsonCodec.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import reactor.netty.ByteBufFlux;
3030
import reactor.netty.NettyOutbound;
3131
import reactor.netty.http.client.HttpClientRequest;
32-
import reactor.netty.http.client.HttpClientResponse;
3332

3433
import java.nio.charset.Charset;
3534
import java.util.function.BiFunction;

cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/util/Operator.java

Lines changed: 30 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@ public Operator headers(Consumer<HttpHeaders> headersTransformer) {
6868
return new Operator(this.context, this.httpClient.headers(headersTransformer));
6969
}
7070

71+
public Operator headersWhen(Function<HttpHeaders, Mono<? extends HttpHeaders>> headersWhenTransformer) {
72+
return new Operator(this.context, this.httpClient.headersWhen(headersWhenTransformer));
73+
}
74+
7175
public UriConfiguration patch() {
7276
return request(HttpMethod.PATCH);
7377
}
@@ -148,8 +152,9 @@ public ResponseReceiver addChannelHandler(Function<HttpClientResponse, ChannelHa
148152
}
149153

150154
public Mono<HttpClientResponse> get() {
151-
return this.responseReceiver.response((response, body) -> processResponse(response, body)
152-
.map(HttpClientResponseWithBody::getResponse))
155+
return this.responseReceiver.response((resp, body) -> Mono.just(HttpClientResponseWithBody.of(body, resp)))
156+
.transform(this::processResponse)
157+
.map(HttpClientResponseWithBody::getResponse)
153158
.singleOrEmpty();
154159
}
155160

@@ -163,9 +168,10 @@ public <T> Flux<T> parseBodyToFlux(Function<HttpClientResponseWithBody, Publishe
163168
attachChannelHandlers(response, connection);
164169
ByteBufFlux body = connection.inbound().receive();
165170

166-
return processResponse(response, body).flatMapMany(responseTransformer)
167-
.doFinally(signalType -> connection.dispose());
168-
});
171+
return Mono.just(HttpClientResponseWithBody.of(body, response));
172+
})
173+
.transform(this::processResponse)
174+
.flatMap(responseTransformer);
169175
}
170176

171177
public <T> Mono<T> parseBodyToMono(Function<HttpClientResponseWithBody, Publisher<T>> responseTransformer) {
@@ -186,28 +192,37 @@ private <T> Mono<T> deserialized(ByteBufFlux body, Class<T> bodyType) {
186192
return JsonCodec.decode(this.context.getConnectionContext().getObjectMapper(), body, bodyType);
187193
}
188194

189-
private Mono<HttpClientResponseWithBody> invalidateToken(Mono<HttpClientResponseWithBody> inbound) {
195+
private Flux<HttpClientResponseWithBody> invalidateToken(Flux<HttpClientResponseWithBody> inbound) {
190196
return inbound
191-
.flatMap(response -> {
197+
.doOnNext(response -> {
192198
if (isUnauthorized(response)) {
193199
this.context.getTokenProvider().ifPresent(tokenProvider -> tokenProvider.invalidate(this.context.getConnectionContext()));
194-
return inbound
195-
.transform(this::invalidateToken);
196-
} else {
197-
return Mono.just(response);
200+
throw new InvalidTokenException();
198201
}
199202
});
200203
}
201204

202-
private Mono<HttpClientResponseWithBody> processResponse(HttpClientResponse response, ByteBufFlux body) {
203-
HttpClientResponseWithBody responseWithBody = HttpClientResponseWithBody.of(body, response);
204-
205-
return Mono.just(responseWithBody)
205+
private Flux<HttpClientResponseWithBody> processResponse(Flux<HttpClientResponseWithBody> inbound) {
206+
return inbound
206207
.transform(this::invalidateToken)
208+
.retry(t -> t instanceof InvalidTokenException)
207209
.transform(this.context.getErrorPayloadMapper()
208210
.orElse(ErrorPayloadMappers.fallback()));
209211
}
210212

213+
private static final class InvalidTokenException extends RuntimeException {
214+
215+
private static final long serialVersionUID = -3114034909507471614L;
216+
217+
private InvalidTokenException() {
218+
}
219+
220+
@Override
221+
public synchronized Throwable fillInStackTrace() {
222+
return null;
223+
}
224+
}
225+
211226
}
212227

213228
public static class ResponseReceiverConstructor extends OperatorContextAware {

0 commit comments

Comments
 (0)