Skip to content

Commit 851796e

Browse files
authored
Adds AsyncCloseable (Azure#21991)
* Adding AsyncCloseable with codesnippet. * Implementing AsyncCloseable and deleting AsyncAutoCloseable. * Add CHANGELOG entry. * Removing azure-core as an explicit dependency. * Fix use in AmqpReceiveLinkProcessor.
1 parent b26abbb commit 851796e

File tree

15 files changed

+133
-39
lines changed

15 files changed

+133
-39
lines changed

sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AmqpChannelProcessor.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import com.azure.core.amqp.AmqpEndpointState;
77
import com.azure.core.amqp.AmqpRetryPolicy;
88
import com.azure.core.amqp.exception.AmqpException;
9+
import com.azure.core.util.AsyncCloseable;
910
import com.azure.core.util.logging.ClientLogger;
1011
import org.reactivestreams.Processor;
1112
import org.reactivestreams.Subscription;
@@ -298,8 +299,8 @@ private void setAndClearChannel() {
298299
}
299300

300301
private void close(T channel) {
301-
if (channel instanceof AsyncAutoCloseable) {
302-
((AsyncAutoCloseable) channel).closeAsync().subscribe();
302+
if (channel instanceof AsyncCloseable) {
303+
((AsyncCloseable) channel).closeAsync().subscribe();
303304
} else if (channel instanceof AutoCloseable) {
304305
try {
305306
((AutoCloseable) channel).close();

sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AsyncAutoCloseable.java

Lines changed: 0 additions & 19 deletions
This file was deleted.

sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorReceiver.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import com.azure.core.amqp.AmqpRetryOptions;
99
import com.azure.core.amqp.exception.AmqpErrorCondition;
1010
import com.azure.core.amqp.implementation.handler.ReceiveLinkHandler;
11+
import com.azure.core.util.AsyncCloseable;
1112
import com.azure.core.util.logging.ClientLogger;
1213
import org.apache.qpid.proton.Proton;
1314
import org.apache.qpid.proton.amqp.Symbol;
@@ -37,7 +38,7 @@
3738
/**
3839
* Handles receiving events from Event Hubs service and translating them to proton-j messages.
3940
*/
40-
public class ReactorReceiver implements AmqpReceiveLink, AsyncAutoCloseable, AutoCloseable {
41+
public class ReactorReceiver implements AmqpReceiveLink, AsyncCloseable, AutoCloseable {
4142
private final String entityPath;
4243
private final Receiver receiver;
4344
private final ReceiveLinkHandler handler;

sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import com.azure.core.amqp.exception.AmqpException;
1313
import com.azure.core.amqp.exception.OperationCancelledException;
1414
import com.azure.core.amqp.implementation.handler.SendLinkHandler;
15+
import com.azure.core.util.AsyncCloseable;
1516
import com.azure.core.util.logging.ClientLogger;
1617
import org.apache.qpid.proton.Proton;
1718
import org.apache.qpid.proton.amqp.Binary;
@@ -62,7 +63,7 @@
6263
/**
6364
* Handles scheduling and transmitting events through proton-j to Event Hubs service.
6465
*/
65-
class ReactorSender implements AmqpSendLink, AsyncAutoCloseable, AutoCloseable {
66+
class ReactorSender implements AmqpSendLink, AsyncCloseable, AutoCloseable {
6667
private final String entityPath;
6768
private final Sender sender;
6869
private final SendLinkHandler handler;

sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RequestResponseChannel.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import com.azure.core.amqp.exception.AmqpErrorContext;
1010
import com.azure.core.amqp.implementation.handler.ReceiveLinkHandler;
1111
import com.azure.core.amqp.implementation.handler.SendLinkHandler;
12+
import com.azure.core.util.AsyncCloseable;
1213
import com.azure.core.util.logging.ClientLogger;
1314
import org.apache.qpid.proton.Proton;
1415
import org.apache.qpid.proton.amqp.UnsignedLong;
@@ -50,7 +51,7 @@
5051
* Represents a bidirectional link between the message broker and the client. Allows client to send a request to the
5152
* broker and receive the associated response.
5253
*/
53-
public class RequestResponseChannel implements AsyncAutoCloseable {
54+
public class RequestResponseChannel implements AsyncCloseable {
5455
private final ConcurrentSkipListMap<UnsignedLong, MonoSink<Message>> unconfirmedSends =
5556
new ConcurrentSkipListMap<>();
5657
private final AtomicBoolean hasError = new AtomicBoolean();

sdk/core/azure-core/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22

33
## 1.17.0-beta.1 (Unreleased)
44

5+
### Features Added
6+
7+
- Added `AsyncCloseable`
58

69
## 1.16.0 (2021-05-07)
710

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.core.util;
5+
6+
import reactor.core.publisher.Mono;
7+
8+
/**
9+
* Interface for close operations that are asynchronous.
10+
*
11+
* <p><strong>Asynchronously closing a class</strong></p>
12+
* <p>In the snippet below, we have a long-lived {@code NetworkResource} class. There are some operations such
13+
* as closing {@literal I/O}. Instead of returning a sync {@code close()}, we use {@code closeAsync()} so users'
14+
* programs don't block waiting for this operation to complete.</p>
15+
*
16+
* {@codesnippet com.azure.core.util.AsyncCloseable.closeAsync}
17+
*/
18+
public interface AsyncCloseable {
19+
/**
20+
* Begins the close operation. If one is in progress, will return that existing close operation. If the close
21+
* operation is unsuccessful, the Mono completes with an error.
22+
*
23+
* @return A Mono representing the close operation. If the close operation is unsuccessful, the Mono completes with
24+
* an error.
25+
*/
26+
Mono<Void> closeAsync();
27+
}
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.core.util;
5+
6+
import reactor.core.publisher.Flux;
7+
import reactor.core.publisher.Mono;
8+
import reactor.core.publisher.Sinks;
9+
10+
import java.io.IOException;
11+
import java.nio.ByteBuffer;
12+
import java.nio.charset.StandardCharsets;
13+
import java.time.Duration;
14+
import java.util.concurrent.atomic.AtomicBoolean;
15+
import java.util.stream.IntStream;
16+
17+
/**
18+
* Code snippets for {@link AsyncCloseable}.
19+
*/
20+
public class AsyncCloseableJavaDocCodeSnippet {
21+
public void asyncResource() throws IOException {
22+
// BEGIN: com.azure.core.util.AsyncCloseable.closeAsync
23+
NetworkResource resource = new NetworkResource();
24+
resource.longRunningDownload("https://longdownload.com")
25+
.subscribe(
26+
byteBuffer -> System.out.println("Buffer received: " + byteBuffer),
27+
error -> System.err.printf("Error occurred while downloading: %s%n", error),
28+
() -> System.out.println("Completed download operation."));
29+
30+
System.out.println("Press enter to stop downloading.");
31+
System.in.read();
32+
33+
// We block here because it is the end of the main Program function. A real-life program may chain this
34+
// with some other close operations like save download/program state, etc.
35+
resource.closeAsync().block();
36+
// END: com.azure.core.util.AsyncCloseable.closeAsync
37+
}
38+
39+
/**
40+
* A long lived network resource.
41+
*/
42+
static class NetworkResource implements AsyncCloseable {
43+
private final AtomicBoolean isClosed = new AtomicBoolean();
44+
private final Sinks.Empty<Void> closeMono = Sinks.empty();
45+
46+
/**
47+
* Downloads a resource.
48+
*
49+
* @param url URL for the download.
50+
*
51+
* @return A stream of bytes.
52+
*/
53+
Flux<ByteBuffer> longRunningDownload(String url) {
54+
final byte[] bytes = url.getBytes(StandardCharsets.UTF_8);
55+
56+
// Does nothing real but it represents taking from this possibly infinite Flux until
57+
// the closeMono emits a signal.
58+
return Flux.fromStream(IntStream.range(0, bytes.length)
59+
.mapToObj(index -> ByteBuffer.wrap(bytes)))
60+
.takeUntilOther(closeMono.asMono());
61+
}
62+
63+
@Override
64+
public Mono<Void> closeAsync() {
65+
// If the close operation has started, then
66+
if (isClosed.getAndSet(true)) {
67+
return closeMono.asMono();
68+
}
69+
70+
return startAsyncClose().then(closeMono.asMono());
71+
}
72+
73+
private Mono<Void> startAsyncClose() {
74+
return Mono.delay(Duration.ofSeconds(10)).then()
75+
.doOnError(error -> closeMono.emitError(error, Sinks.EmitFailureHandler.FAIL_FAST))
76+
.doOnSuccess(unused -> closeMono.emitEmpty(Sinks.EmitFailureHandler.FAIL_FAST));
77+
}
78+
}
79+
}

sdk/eventhubs/azure-messaging-eventhubs/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
<dependency>
3838
<groupId>com.azure</groupId>
3939
<artifactId>azure-core</artifactId>
40-
<version>1.16.0</version> <!-- {x-version-update;com.azure:azure-core;dependency} -->
40+
<version>1.17.0-beta.1</version> <!-- {x-version-update;unreleased_com.azure:azure-core;dependency} -->
4141
</dependency>
4242
<dependency>
4343
<groupId>com.azure</groupId>

sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/AmqpReceiveLinkProcessor.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
import com.azure.core.amqp.exception.AmqpException;
1010
import com.azure.core.amqp.exception.LinkErrorContext;
1111
import com.azure.core.amqp.implementation.AmqpReceiveLink;
12-
import com.azure.core.amqp.implementation.AsyncAutoCloseable;
12+
import com.azure.core.util.AsyncCloseable;
1313
import com.azure.core.util.logging.ClientLogger;
1414
import org.apache.qpid.proton.message.Message;
1515
import org.reactivestreams.Subscription;
@@ -599,8 +599,8 @@ private void disposeReceiver(AmqpReceiveLink link) {
599599
}
600600

601601
try {
602-
if (link instanceof AsyncAutoCloseable) {
603-
((AsyncAutoCloseable) link).closeAsync().subscribe();
602+
if (link instanceof AsyncCloseable) {
603+
((AsyncCloseable) link).closeAsync().subscribe();
604604
} else {
605605
link.dispose();
606606
}

0 commit comments

Comments
 (0)