Skip to content

Commit e04ece0

Browse files
authored
Stream response - transfer to byte channels. (Azure#30026)
* construct StreamResponse with source response. * testing. * more testing.
1 parent 410fb9c commit e04ece0

File tree

12 files changed

+570
-31
lines changed

12 files changed

+570
-31
lines changed

sdk/core/azure-core-test/src/main/java/com/azure/core/test/http/MockHttpClient.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import com.azure.core.test.implementation.entities.HttpBinFormDataJSON.Form;
1313
import com.azure.core.test.implementation.entities.HttpBinFormDataJSON.PizzaSize;
1414
import com.azure.core.test.implementation.entities.HttpBinJSON;
15+
import com.azure.core.test.utils.MessageDigestUtils;
1516
import com.azure.core.util.Base64Url;
1617
import com.azure.core.util.DateTimeRfc1123;
1718
import com.azure.core.util.FluxUtil;
@@ -27,6 +28,7 @@
2728
import java.util.HashMap;
2829
import java.util.List;
2930
import java.util.Map;
31+
import java.util.Random;
3032

3133
/**
3234
* This HttpClient attempts to mimic the behavior of http://httpbin.org without ever making a network call.
@@ -39,6 +41,7 @@ public class MockHttpClient extends NoOpHttpClient {
3941
.set("X-Processed-Time", "1.0")
4042
.set("Access-Control-Allow-Credentials", "true")
4143
.set("Content-Type", "application/json");
44+
private static final Random RANDOM = new Random();
4245

4346
@Override
4447
public Mono<HttpResponse> send(HttpRequest request) {
@@ -66,7 +69,15 @@ public Mono<HttpResponse> send(HttpRequest request) {
6669
HttpHeaders newHeaders = new HttpHeaders(RESPONSE_HEADERS)
6770
.set("Content-Type", ContentType.APPLICATION_OCTET_STREAM)
6871
.set("Content-Length", Integer.toString(byteCount));
69-
response = new MockHttpResponse(request, 200, newHeaders, byteCount == 0 ? null : new byte[byteCount]);
72+
byte[] content;
73+
if (byteCount > 0) {
74+
content = new byte[byteCount];
75+
RANDOM.nextBytes(content);
76+
newHeaders = newHeaders.set("ETag", MessageDigestUtils.md5(content));
77+
} else {
78+
content = null;
79+
}
80+
response = new MockHttpResponse(request, 200, newHeaders, content);
7081
} else if (requestPathLower.startsWith("/base64urlbytes/")) {
7182
final String byteCountString = requestPath.substring("/base64urlbytes/".length());
7283
final int byteCount = Integer.parseInt(byteCountString);

sdk/core/azure-core-test/src/main/java/com/azure/core/test/implementation/RestProxyTests.java

Lines changed: 124 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,17 +41,30 @@
4141
import com.azure.core.test.implementation.entities.HttpBinFormDataJSON.PizzaSize;
4242
import com.azure.core.test.implementation.entities.HttpBinHeaders;
4343
import com.azure.core.test.implementation.entities.HttpBinJSON;
44+
import com.azure.core.test.utils.MessageDigestUtils;
4445
import com.azure.core.util.BinaryData;
46+
import com.azure.core.util.Context;
4547
import com.azure.core.util.FluxUtil;
48+
import com.azure.core.util.IOUtils;
4649
import org.junit.jupiter.api.Assertions;
50+
import org.junit.jupiter.api.Named;
4751
import org.junit.jupiter.api.Test;
52+
import org.junit.jupiter.params.ParameterizedTest;
53+
import org.junit.jupiter.params.provider.Arguments;
54+
import org.junit.jupiter.params.provider.MethodSource;
55+
import reactor.core.Exceptions;
4856
import reactor.core.publisher.Flux;
4957
import reactor.core.publisher.Mono;
58+
import reactor.core.scheduler.Schedulers;
5059
import reactor.test.StepVerifier;
60+
import reactor.util.function.Tuples;
5161

5262
import java.io.ByteArrayInputStream;
63+
import java.io.ByteArrayOutputStream;
64+
import java.io.IOException;
5365
import java.nio.ByteBuffer;
5466
import java.nio.channels.AsynchronousFileChannel;
67+
import java.nio.channels.Channels;
5568
import java.nio.charset.StandardCharsets;
5669
import java.nio.file.Files;
5770
import java.nio.file.Path;
@@ -62,6 +75,7 @@
6275
import java.util.LinkedHashMap;
6376
import java.util.List;
6477
import java.util.Map;
78+
import java.util.stream.Stream;
6579

6680
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
6781
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@@ -74,6 +88,7 @@
7488
import static org.junit.jupiter.api.Assertions.fail;
7589

7690
public abstract class RestProxyTests {
91+
private static final String HTTP_REST_PROXY_SYNC_PROXY_ENABLE = "com.azure.core.http.restproxy.syncproxy.enable";
7792

7893
/**
7994
* Get the HTTP client that will be used for each test. This will be called once per test.
@@ -1619,20 +1634,125 @@ public void service21GetBytes100() {
16191634
@Host("http://localhost")
16201635
@ServiceInterface(name = "DownloadService")
16211636
interface DownloadService {
1637+
16221638
@Get("/bytes/30720")
1623-
StreamResponse getBytes();
1639+
StreamResponse getBytes(Context context);
1640+
1641+
@Get("/bytes/30720")
1642+
Mono<StreamResponse> getBytesAsync(Context context);
16241643

16251644
@Get("/bytes/30720")
16261645
Flux<ByteBuffer> getBytesFlux();
16271646
}
16281647

1629-
@Test
1630-
public void simpleDownloadTest() {
1631-
StepVerifier.create(Flux.using(() -> createService(DownloadService.class).getBytes(),
1648+
@ParameterizedTest
1649+
@MethodSource("downloadTestArgumentProvider")
1650+
public void simpleDownloadTest(Context context) {
1651+
StepVerifier.create(Flux.using(() -> createService(DownloadService.class).getBytes(context),
16321652
response -> response.getValue().map(ByteBuffer::remaining).reduce(0, Integer::sum),
16331653
StreamResponse::close))
16341654
.assertNext(count -> assertEquals(30720, count))
16351655
.verifyComplete();
1656+
1657+
StepVerifier.create(Flux.using(() -> createService(DownloadService.class).getBytes(context),
1658+
response -> Mono.zip(MessageDigestUtils.md5(response.getValue()), Mono.just(response.getHeaders().getValue("ETag"))),
1659+
StreamResponse::close))
1660+
.assertNext(hashTuple -> assertEquals(hashTuple.getT2(), hashTuple.getT1()))
1661+
.verifyComplete();
1662+
}
1663+
1664+
@ParameterizedTest
1665+
@MethodSource("downloadTestArgumentProvider")
1666+
public void simpleDownloadTestAsync(Context context) {
1667+
StepVerifier.create(createService(DownloadService.class).getBytesAsync(context)
1668+
.flatMap(response -> response.getValue().map(ByteBuffer::remaining)
1669+
.reduce(0, Integer::sum)
1670+
.doFinally(ignore -> response.close())))
1671+
.assertNext(count -> assertEquals(30720, count))
1672+
.verifyComplete();
1673+
1674+
StepVerifier.create(createService(DownloadService.class).getBytesAsync(context)
1675+
.flatMap(response -> Mono.zip(MessageDigestUtils.md5(response.getValue()), Mono.just(response.getHeaders().getValue("ETag")))
1676+
.doFinally(ignore -> response.close())))
1677+
.assertNext(hashTuple -> assertEquals(hashTuple.getT2(), hashTuple.getT1()))
1678+
.verifyComplete();
1679+
}
1680+
1681+
@ParameterizedTest
1682+
@MethodSource("downloadTestArgumentProvider")
1683+
public void streamResponseCanTransferBody(Context context) throws IOException {
1684+
try (StreamResponse streamResponse = createService(DownloadService.class).getBytes(context)) {
1685+
ByteArrayOutputStream bos = new ByteArrayOutputStream();
1686+
streamResponse.transferValueTo(Channels.newChannel(bos));
1687+
assertEquals(streamResponse.getHeaders().getValue("ETag"), MessageDigestUtils.md5(bos.toByteArray()));
1688+
}
1689+
1690+
Path tempFile = Files.createTempFile("streamResponseCanTransferBody", null);
1691+
tempFile.toFile().deleteOnExit();
1692+
try (StreamResponse streamResponse = createService(DownloadService.class).getBytes(context)) {
1693+
StepVerifier.create(Mono.using(
1694+
() -> IOUtils.toAsynchronousByteChannel(AsynchronousFileChannel.open(tempFile, StandardOpenOption.WRITE), 0),
1695+
streamResponse::transferValueToAsync,
1696+
channel -> {
1697+
try {
1698+
channel.close();
1699+
} catch (IOException e) {
1700+
throw Exceptions.propagate(e);
1701+
}
1702+
}).then(Mono.fromCallable(() -> MessageDigestUtils.md5(Files.readAllBytes(tempFile)))))
1703+
.assertNext(hash -> assertEquals(streamResponse.getHeaders().getValue("ETag"), hash))
1704+
.verifyComplete();
1705+
}
1706+
}
1707+
1708+
@ParameterizedTest
1709+
@MethodSource("downloadTestArgumentProvider")
1710+
public void streamResponseCanTransferBodyAsync(Context context) throws IOException {
1711+
StepVerifier.create(createService(DownloadService.class).getBytesAsync(context)
1712+
.publishOn(Schedulers.boundedElastic())
1713+
.map(streamResponse -> {
1714+
ByteArrayOutputStream bos = new ByteArrayOutputStream();
1715+
try {
1716+
streamResponse.transferValueTo(Channels.newChannel(bos));
1717+
} catch (IOException e) {
1718+
throw Exceptions.propagate(e);
1719+
} finally {
1720+
streamResponse.close();
1721+
}
1722+
return Tuples.of(streamResponse.getHeaders().getValue("Etag"), MessageDigestUtils.md5(bos.toByteArray()));
1723+
}))
1724+
.assertNext(hashTuple -> assertEquals(hashTuple.getT1(), hashTuple.getT2()))
1725+
.verifyComplete();
1726+
1727+
Path tempFile = Files.createTempFile("streamResponseCanTransferBody", null);
1728+
tempFile.toFile().deleteOnExit();
1729+
StepVerifier.create(createService(DownloadService.class).getBytesAsync(context)
1730+
.flatMap(streamResponse -> Mono.using(
1731+
() -> IOUtils.toAsynchronousByteChannel(AsynchronousFileChannel.open(tempFile, StandardOpenOption.WRITE), 0),
1732+
streamResponse::transferValueToAsync,
1733+
channel -> {
1734+
try {
1735+
channel.close();
1736+
} catch (IOException e) {
1737+
throw Exceptions.propagate(e);
1738+
}
1739+
}).doFinally(ignored -> streamResponse.close())
1740+
.then(Mono.just(streamResponse.getHeaders().getValue("ETag")))))
1741+
.assertNext(hash -> {
1742+
try {
1743+
assertEquals(hash, MessageDigestUtils.md5(Files.readAllBytes(tempFile)));
1744+
} catch (IOException e) {
1745+
Exceptions.propagate(e);
1746+
}
1747+
})
1748+
.verifyComplete();
1749+
}
1750+
1751+
public static Stream<Arguments> downloadTestArgumentProvider() {
1752+
return Stream.of(
1753+
Arguments.of(Named.named("default", Context.NONE)),
1754+
Arguments.of(Named.named("sync proxy enabled", Context.NONE
1755+
.addData(HTTP_REST_PROXY_SYNC_PROXY_ENABLE, true))));
16361756
}
16371757

16381758
@Test
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.core.test.utils;
5+
6+
import reactor.core.Exceptions;
7+
import reactor.core.publisher.Flux;
8+
import reactor.core.publisher.Mono;
9+
10+
import java.nio.ByteBuffer;
11+
import java.security.MessageDigest;
12+
import java.security.NoSuchAlgorithmException;
13+
import java.util.Base64;
14+
15+
/**
16+
Utilities to compute hashes in tests.
17+
*/
18+
public final class MessageDigestUtils {
19+
20+
private MessageDigestUtils() {
21+
}
22+
23+
/**
24+
* Returns base64 encoded MD5 of bytes.
25+
* @param bytes bytes.
26+
* @return base64 encoded MD5 of bytes.
27+
* @throws RuntimeException if md5 is not found.
28+
*/
29+
public static String md5(byte[] bytes) {
30+
try {
31+
MessageDigest digest = MessageDigest.getInstance("MD5");
32+
digest.update(bytes);
33+
return Base64.getEncoder().encodeToString(digest.digest());
34+
} catch (NoSuchAlgorithmException e) {
35+
throw new RuntimeException(e);
36+
}
37+
}
38+
39+
/**
40+
* Returns base64 encoded MD5 of flux of byte buffers.
41+
* @param bufferFlux flux of byte buffers.
42+
* @return Mono that emits base64 encoded MD5 of bytes.
43+
*/
44+
public static Mono<String> md5(Flux<ByteBuffer> bufferFlux) {
45+
return bufferFlux.reduceWith(() -> {
46+
try {
47+
return MessageDigest.getInstance("MD5");
48+
} catch (NoSuchAlgorithmException e) {
49+
throw Exceptions.propagate(e);
50+
}
51+
}, (digest, buffer) -> {
52+
digest.update(buffer);
53+
return digest;
54+
}).map(digest -> Base64.getEncoder().encodeToString(digest.digest()));
55+
}
56+
}

sdk/core/azure-core-test/src/test/java/com/azure/core/test/RestProxyTestsWireMockServer.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import com.azure.core.http.ContentType;
77
import com.azure.core.test.implementation.entities.HttpBinFormDataJSON;
88
import com.azure.core.test.implementation.entities.HttpBinJSON;
9+
import com.azure.core.test.utils.MessageDigestUtils;
910
import com.azure.core.util.DateTimeRfc1123;
1011
import com.azure.core.util.serializer.JacksonAdapter;
1112
import com.azure.core.util.serializer.SerializerEncoding;
@@ -23,13 +24,13 @@
2324

2425
import java.io.IOException;
2526
import java.net.URL;
26-
import java.security.SecureRandom;
2727
import java.time.OffsetDateTime;
2828
import java.time.ZoneOffset;
2929
import java.util.ArrayList;
3030
import java.util.HashMap;
3131
import java.util.List;
3232
import java.util.Map;
33+
import java.util.Random;
3334
import java.util.stream.Collectors;
3435

3536
import static com.github.tomakehurst.wiremock.client.WireMock.delete;
@@ -42,6 +43,7 @@
4243

4344
public final class RestProxyTestsWireMockServer {
4445
private static final JacksonAdapter JACKSON_ADAPTER = new JacksonAdapter();
46+
private static final Random RANDOM = new Random();
4547

4648
public static WireMockServer getRestProxyTestsServer() {
4749
WireMockServer server = new WireMockServer(WireMockConfiguration.options()
@@ -106,7 +108,9 @@ private static ResponseDefinition createBytesResponse(String urlPath) {
106108
rawHeaders.put("Content-Length", String.valueOf(bodySize));
107109

108110
byte[] body = new byte[bodySize];
109-
new SecureRandom().nextBytes(body);
111+
RANDOM.nextBytes(body);
112+
113+
rawHeaders.put("ETag", MessageDigestUtils.md5(body));
110114

111115
return new ResponseDefinitionBuilder().withStatus(200)
112116
.withHeaders(toWireMockHeaders(rawHeaders))

sdk/core/azure-core/src/main/java/com/azure/core/http/rest/RestProxy.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,11 +81,10 @@ public Object invoke(Object proxy, final Method method, Object[] args) {
8181
RequestOptions options = methodParser.setRequestOptions(args);
8282
Context context = methodParser.setContext(args);
8383
boolean isReactive = methodParser.isReactive();
84-
boolean isStreamResponseType = methodParser.isStreamResponse();
8584
boolean syncRestProxyEnabled = (boolean) context.getData(HTTP_REST_PROXY_SYNC_PROXY_ENABLE).orElse(false);
8685

8786

88-
if (isReactive || isStreamResponseType || !syncRestProxyEnabled) {
87+
if (isReactive || !syncRestProxyEnabled) {
8988
return asyncRestProxy.invoke(proxy, method, options, options != null ? options.getErrorOptions() : null,
9089
options != null ? options.getRequestCallback() : null, methodParser, isReactive, args);
9190
} else {

0 commit comments

Comments
 (0)