Skip to content

Commit ad75273

Browse files
authored
Exposes management node in azure-core-amqp (Azure#22095)
* Update AmqpConnection to have a getManagementNode. * Adding AmqpManagementNode. * Update AmqpConnection, AmqpManagementNode, AmqpSession to use AsyncCloseable. * Adding AsyncCloseable to AmqpLink. * ClaimsBasedSecurityNode.java uses AsyncCloseable. * Implements CbsNode's closeAsync() and adds tests. * ReactorSession implements closeAsync() * ReactorConnection uses closeAsync(). Renames dispose() to closeAsync(). Fixes errors where some close operations were not subscribed to. * RequestResponseChannel. Remove close operation with message. * Adding DeliveryOutcome models and DeliveryState enum. * Add authorization scope to connection options. * Add MessageUtils to serialize and deserialize AmqpAnnotatedMessage * Update AmqpManagementNode to expose delivery outcomes because they can be associated with messages. * Adding MessageUtil support for converting DeliveryOutcome and Outcomes. * Fixing build breaks from ConnectionOptions. * Adding management channel class. * Adding management channel into ReactorConnection. * Update ExceptionUtil to return instead of throwing on unknown amqp error codes. * Moving ManagementChannel formatting. * Add javadocs to ReceivedDeliveryOutcome. * Add tests for ManagementChannel * Adding tests for message utils. * Fix javadoc on ModifiedDeliveryOutcome * ReactorConnection: Hook up dispose method. * EventHubs: Fixing instances of ConnectionOptions. * ServiceBus: Fix build errors using ConnectionOptions. * Adding MessageUtilsTests. * Updating CHANGELOG.
1 parent 366a95d commit ad75273

File tree

47 files changed

+2910
-135
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+2910
-135
lines changed

sdk/core/azure-core-amqp/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@
44

55
### New Features
66
- Exposing CbsAuthorizationType.
7+
- Exposing ManagementNode that can perform management and metadata operations on an AMQP message broker.
8+
- AmqpConnection, AmqpSession, AmqpSendLink, and AmqpReceiveLink extend from AsyncCloseable.
9+
- Delivery outcomes and delivery states are added.
710

811
### Bug Fixes
912
- Fixed a bug where connection and sessions would not be disposed when their endpoint closed.

sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/AmqpConnection.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package com.azure.core.amqp;
55

66
import com.azure.core.amqp.exception.AmqpException;
7+
import com.azure.core.util.AsyncCloseable;
78
import reactor.core.Disposable;
89
import reactor.core.publisher.Flux;
910
import reactor.core.publisher.Mono;
@@ -13,7 +14,7 @@
1314
/**
1415
* Represents a TCP connection between the client and a service that uses the AMQP protocol.
1516
*/
16-
public interface AmqpConnection extends Disposable {
17+
public interface AmqpConnection extends Disposable, AsyncCloseable {
1718
/**
1819
* Gets the connection identifier.
1920
*
@@ -53,6 +54,7 @@ public interface AmqpConnection extends Disposable {
5354
* Creates a new session with the given session name.
5455
*
5556
* @param sessionName Name of the session.
57+
*
5658
* @return The AMQP session that was created.
5759
*/
5860
Mono<AmqpSession> createSession(String sessionName);
@@ -61,6 +63,7 @@ public interface AmqpConnection extends Disposable {
6163
* Removes a session with the {@code sessionName} from the AMQP connection.
6264
*
6365
* @param sessionName Name of the session to remove.
66+
*
6467
* @return {@code true} if a session with the name was removed; {@code false} otherwise.
6568
*/
6669
boolean removeSession(String sessionName);
@@ -79,4 +82,26 @@ public interface AmqpConnection extends Disposable {
7982
* @return A stream of shutdown signals that occur in the AMQP endpoint.
8083
*/
8184
Flux<AmqpShutdownSignal> getShutdownSignals();
85+
86+
/**
87+
* Gets or creates the management node.
88+
*
89+
* @param entityPath Entity for which to get the management node of.
90+
*
91+
* @return A Mono that completes with the management node.
92+
*
93+
* @throws UnsupportedOperationException if there is no implementation of fetching a management node.
94+
*/
95+
default Mono<AmqpManagementNode> getManagementNode(String entityPath) {
96+
return Mono.error(new UnsupportedOperationException("This has not been implemented."));
97+
}
98+
99+
/**
100+
* Disposes of the AMQP connection.
101+
*
102+
* @return Mono that completes when the close operation is complete.
103+
*/
104+
default Mono<Void> closeAsync() {
105+
return Mono.fromRunnable(this::dispose);
106+
}
82107
}

sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/AmqpLink.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,16 @@
44
package com.azure.core.amqp;
55

66
import com.azure.core.amqp.exception.AmqpException;
7+
import com.azure.core.util.AsyncCloseable;
78
import reactor.core.Disposable;
89
import reactor.core.publisher.Flux;
10+
import reactor.core.publisher.Mono;
911

1012
/**
1113
* Represents a unidirectional AMQP link.
1214
*/
13-
public interface AmqpLink extends Disposable {
15+
public interface AmqpLink extends Disposable, AsyncCloseable {
16+
1417
/**
1518
* Gets the name of the link.
1619
*
@@ -39,4 +42,13 @@ public interface AmqpLink extends Disposable {
3942
* @return A stream of endpoint states for the AMQP link.
4043
*/
4144
Flux<AmqpEndpointState> getEndpointStates();
45+
46+
/**
47+
* Disposes of the AMQP link.
48+
*
49+
* @return A mono that completes when the link is disposed.
50+
*/
51+
default Mono<Void> closeAsync() {
52+
return Mono.fromRunnable(() -> dispose());
53+
}
4254
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.core.amqp;
5+
6+
import com.azure.core.amqp.models.AmqpAnnotatedMessage;
7+
import com.azure.core.amqp.models.DeliveryOutcome;
8+
import com.azure.core.util.AsyncCloseable;
9+
import reactor.core.publisher.Mono;
10+
11+
/**
12+
* An AMQP endpoint that allows users to perform management and metadata operations on it.
13+
*/
14+
public interface AmqpManagementNode extends AsyncCloseable {
15+
/**
16+
* Sends a message to the management node.
17+
*
18+
* @param message Message to send.
19+
*
20+
* @return Response from management node.
21+
*/
22+
Mono<AmqpAnnotatedMessage> send(AmqpAnnotatedMessage message);
23+
24+
/**
25+
* Sends a message to the management node and associates the {@code deliveryOutcome} with that message.
26+
*
27+
* @param message Message to send.
28+
* @param deliveryOutcome Delivery outcome to associate with the message.
29+
*
30+
* @return Response from management node.
31+
*/
32+
Mono<AmqpAnnotatedMessage> send(AmqpAnnotatedMessage message, DeliveryOutcome deliveryOutcome);
33+
}

sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/AmqpSession.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package com.azure.core.amqp;
55

66
import com.azure.core.amqp.exception.AmqpException;
7+
import com.azure.core.util.AsyncCloseable;
78
import reactor.core.Disposable;
89
import reactor.core.publisher.Flux;
910
import reactor.core.publisher.Mono;
@@ -13,7 +14,7 @@
1314
/**
1415
* An AMQP session representing bidirectional communication that supports multiple {@link AmqpLink AMQP links}.
1516
*/
16-
public interface AmqpSession extends Disposable {
17+
public interface AmqpSession extends Disposable, AsyncCloseable {
1718
/**
1819
* Gets the name for this AMQP session.
1920
*
@@ -91,4 +92,9 @@ public interface AmqpSession extends Disposable {
9192
* @return A completable mono.
9293
*/
9394
Mono<Void> rollbackTransaction(AmqpTransaction transaction);
95+
96+
@Override
97+
default Mono<Void> closeAsync() {
98+
return Mono.fromRunnable(() -> dispose());
99+
}
94100
}

sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/ClaimsBasedSecurityNode.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package com.azure.core.amqp;
55

66
import com.azure.core.credential.TokenCredential;
7+
import com.azure.core.util.AsyncCloseable;
78
import reactor.core.publisher.Mono;
89

910
import java.time.OffsetDateTime;
@@ -14,7 +15,7 @@
1415
* @see <a href="https://www.oasis-open.org/committees/download.php/62097/amqp-cbs-v1.0-wd05.doc">
1516
* AMPQ Claims-based Security v1.0</a>
1617
*/
17-
public interface ClaimsBasedSecurityNode extends AutoCloseable {
18+
public interface ClaimsBasedSecurityNode extends AutoCloseable, AsyncCloseable {
1819
/**
1920
* Authorizes the caller with the CBS node to access resources for the {@code audience}.
2021
*
@@ -31,4 +32,9 @@ public interface ClaimsBasedSecurityNode extends AutoCloseable {
3132
*/
3233
@Override
3334
void close();
35+
36+
@Override
37+
default Mono<Void> closeAsync() {
38+
return Mono.fromRunnable(() -> close());
39+
}
3440
}

sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ClaimsBasedSecurityChannel.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
import com.azure.core.amqp.models.CbsAuthorizationType;
1111
import com.azure.core.credential.TokenCredential;
1212
import com.azure.core.credential.TokenRequestContext;
13-
import com.azure.core.util.logging.ClientLogger;
1413
import org.apache.qpid.proton.Proton;
1514
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
1615
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
@@ -35,7 +34,6 @@ public class ClaimsBasedSecurityChannel implements ClaimsBasedSecurityNode {
3534
private static final String PUT_TOKEN_OPERATION = "operation";
3635
private static final String PUT_TOKEN_OPERATION_VALUE = "put-token";
3736

38-
private final ClientLogger logger = new ClientLogger(ClaimsBasedSecurityChannel.class);
3937
private final TokenCredential credential;
4038
private final Mono<RequestResponseChannel> cbsChannelMono;
4139
private final CbsAuthorizationType authorizationType;
@@ -87,9 +85,11 @@ public Mono<OffsetDateTime> authorize(String tokenAudience, String scopes) {
8785

8886
@Override
8987
public void close() {
90-
final RequestResponseChannel channel = cbsChannelMono.block(retryOptions.getTryTimeout());
91-
if (channel != null) {
92-
channel.closeAsync().block();
93-
}
88+
closeAsync().block(retryOptions.getTryTimeout());
89+
}
90+
91+
@Override
92+
public Mono<Void> closeAsync() {
93+
return cbsChannelMono.flatMap(channel -> channel.closeAsync());
9494
}
9595
}

sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ConnectionOptions.java

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,18 +23,14 @@
2323
*/
2424
@Immutable
2525
public class ConnectionOptions {
26-
// These name version keys are used in our properties files to specify client product and version information.
27-
static final String NAME_KEY = "name";
28-
static final String VERSION_KEY = "version";
29-
static final String UNKNOWN = "UNKNOWN";
30-
3126
private final TokenCredential tokenCredential;
3227
private final AmqpTransportType transport;
3328
private final AmqpRetryOptions retryOptions;
3429
private final ProxyOptions proxyOptions;
3530
private final Scheduler scheduler;
3631
private final String fullyQualifiedNamespace;
3732
private final CbsAuthorizationType authorizationType;
33+
private final String authorizationScope;
3834
private final ClientOptions clientOptions;
3935
private final String product;
4036
private final String clientVersion;
@@ -62,10 +58,10 @@ public class ConnectionOptions {
6258
* {@code proxyOptions} or {@code verifyMode} is null.
6359
*/
6460
public ConnectionOptions(String fullyQualifiedNamespace, TokenCredential tokenCredential,
65-
CbsAuthorizationType authorizationType, AmqpTransportType transport, AmqpRetryOptions retryOptions,
66-
ProxyOptions proxyOptions, Scheduler scheduler, ClientOptions clientOptions,
61+
CbsAuthorizationType authorizationType, String authorizationScope, AmqpTransportType transport,
62+
AmqpRetryOptions retryOptions, ProxyOptions proxyOptions, Scheduler scheduler, ClientOptions clientOptions,
6763
SslDomain.VerifyMode verifyMode, String product, String clientVersion) {
68-
this(fullyQualifiedNamespace, tokenCredential, authorizationType, transport, retryOptions,
64+
this(fullyQualifiedNamespace, tokenCredential, authorizationType, authorizationScope, transport, retryOptions,
6965
proxyOptions, scheduler, clientOptions, verifyMode, product, clientVersion, fullyQualifiedNamespace,
7066
getPort(transport));
7167
}
@@ -94,14 +90,15 @@ public ConnectionOptions(String fullyQualifiedNamespace, TokenCredential tokenCr
9490
* {@code clientOptions}, {@code hostname}, or {@code verifyMode} is null.
9591
*/
9692
public ConnectionOptions(String fullyQualifiedNamespace, TokenCredential tokenCredential,
97-
CbsAuthorizationType authorizationType, AmqpTransportType transport, AmqpRetryOptions retryOptions,
98-
ProxyOptions proxyOptions, Scheduler scheduler, ClientOptions clientOptions,
93+
CbsAuthorizationType authorizationType, String authorizationScope, AmqpTransportType transport,
94+
AmqpRetryOptions retryOptions, ProxyOptions proxyOptions, Scheduler scheduler, ClientOptions clientOptions,
9995
SslDomain.VerifyMode verifyMode, String product, String clientVersion, String hostname, int port) {
10096

10197
this.fullyQualifiedNamespace = Objects.requireNonNull(fullyQualifiedNamespace,
10298
"'fullyQualifiedNamespace' is required.");
10399
this.tokenCredential = Objects.requireNonNull(tokenCredential, "'tokenCredential' is required.");
104100
this.authorizationType = Objects.requireNonNull(authorizationType, "'authorizationType' is required.");
101+
this.authorizationScope = Objects.requireNonNull(authorizationScope, "'authorizationScope' is required.");
105102
this.transport = Objects.requireNonNull(transport, "'transport' is required.");
106103
this.retryOptions = Objects.requireNonNull(retryOptions, "'retryOptions' is required.");
107104
this.scheduler = Objects.requireNonNull(scheduler, "'scheduler' is required.");
@@ -115,6 +112,15 @@ public ConnectionOptions(String fullyQualifiedNamespace, TokenCredential tokenCr
115112
this.clientVersion = Objects.requireNonNull(clientVersion, "'clientVersion' cannot be null.");
116113
}
117114

115+
/**
116+
* Gets the scope to use when authorizing.
117+
*
118+
* @return The scope to use when authorizing.
119+
*/
120+
public String getAuthorizationScope() {
121+
return authorizationScope;
122+
}
123+
118124
/**
119125
* Gets the authorisation type for the CBS node.
120126
*

sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ExceptionUtil.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
import com.azure.core.amqp.exception.AmqpException;
99
import com.azure.core.amqp.exception.AmqpResponseCode;
1010

11-
import java.util.Locale;
1211
import java.util.regex.Matcher;
1312
import java.util.regex.Pattern;
1413

@@ -78,8 +77,9 @@ public static Exception toException(String errorCondition, String description, A
7877
case NOT_FOUND:
7978
return distinguishNotFound(description, errorContext);
8079
default:
81-
throw new IllegalArgumentException(String.format(Locale.ROOT, "This condition '%s' is not known.",
82-
condition));
80+
return new AmqpException(false, condition, String.format("errorCondition[%s]. description[%s] "
81+
+ "Condition could not be mapped to a transient condition.",
82+
errorCondition, description), errorContext);
8383
}
8484

8585
return new AmqpException(isTransient, condition, description, errorContext);

0 commit comments

Comments
 (0)