Skip to content

Commit 9b365d0

Browse files
authored
Add ability to manage rules with the ServiceBusReceiver (Azure#30271)
* Create rule manager
1 parent a3c1d35 commit 9b365d0

File tree

18 files changed

+1438
-3
lines changed

18 files changed

+1438
-3
lines changed

sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
## 7.13.0-beta.1 (Unreleased)
44

55
### Features Added
6+
- Added rule manager client to manage rules for ServiceBus subscription with listen claims. ([#27711](https://github.com/Azure/azure-sdk-for-java/issues/27711))
67

78
### Breaking Changes
89

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -649,6 +649,15 @@ public ServiceBusSessionProcessorClientBuilder sessionProcessor() {
649649
return new ServiceBusSessionProcessorClientBuilder();
650650
}
651651

652+
/**
653+
* A new instance of {@link ServiceBusRuleManagerBuilder} used to configure a Service Bus rule manager instance.
654+
*
655+
* @return A new instance of {@link ServiceBusRuleManagerBuilder}.
656+
*/
657+
public ServiceBusRuleManagerBuilder ruleManager() {
658+
return new ServiceBusRuleManagerBuilder();
659+
}
660+
652661
/**
653662
* Called when a child client is closed. Disposes of the shared connection if there are no more clients.
654663
*/
@@ -2021,6 +2030,77 @@ ServiceBusReceiverAsyncClient buildAsyncClient(boolean isAutoCompleteAllowed, bo
20212030
}
20222031
}
20232032

2033+
/**
2034+
* Builder for creating {@link ServiceBusRuleManagerAsyncClient} to manage Service Bus subscription rules.
2035+
*
2036+
* @see ServiceBusRuleManagerAsyncClient
2037+
*/
2038+
@ServiceClientBuilder(serviceClients = {ServiceBusRuleManagerAsyncClient.class})
2039+
public final class ServiceBusRuleManagerBuilder {
2040+
private String subscriptionName;
2041+
private String topicName;
2042+
2043+
private ServiceBusRuleManagerBuilder() {
2044+
}
2045+
2046+
/**
2047+
* Sets the name of the topic. <b>{@link #subscriptionName(String)} must also be set.</b>
2048+
*
2049+
* @param topicName Name of the topic.
2050+
*
2051+
* @return The modified {@link ServiceBusRuleManagerBuilder} object.
2052+
* @see #subscriptionName A subscription name should be set as well.
2053+
*/
2054+
public ServiceBusRuleManagerBuilder topicName(String topicName) {
2055+
this.topicName = topicName;
2056+
return this;
2057+
}
2058+
2059+
/**
2060+
* Sets the name of the subscription in the topic to manage its rules. <b>{@link #topicName(String)} must also be set.
2061+
* </b>
2062+
* @param subscriptionName Name of the subscription.
2063+
*
2064+
* @return The modified {@link ServiceBusRuleManagerBuilder} object.
2065+
* @see #topicName A topic name should be set as well.
2066+
*/
2067+
public ServiceBusRuleManagerBuilder subscriptionName(String subscriptionName) {
2068+
this.subscriptionName = subscriptionName;
2069+
return this;
2070+
}
2071+
2072+
/**
2073+
* Creates an <b>asynchronous</b> {@link ServiceBusRuleManagerAsyncClient} for managing rules of the specific subscription.
2074+
*
2075+
* @return A new {@link ServiceBusRuleManagerAsyncClient} that manages rules for specific subscription.
2076+
* @throws IllegalStateException if {@code topicName} or {@code subscriptionName} is null or empty. It is also
2077+
* thrown if the Service Bus {@link #connectionString(String) connectionString} contains an {@code EntityPath}
2078+
* that does not match one set in {@link #topicName(String) topicName}.
2079+
*/
2080+
public ServiceBusRuleManagerAsyncClient buildAsyncClient() {
2081+
final MessagingEntityType entityType = validateEntityPaths(connectionStringEntityName, topicName,
2082+
null);
2083+
final String entityPath = getEntityPath(entityType, null, topicName, subscriptionName,
2084+
null);
2085+
final ServiceBusConnectionProcessor connectionProcessor = getOrCreateConnectionProcessor(messageSerializer);
2086+
2087+
return new ServiceBusRuleManagerAsyncClient(entityPath, entityType, connectionProcessor,
2088+
ServiceBusClientBuilder.this::onClientClose);
2089+
}
2090+
2091+
/**
2092+
* Creates a <b>synchronous</b> {@link ServiceBusRuleManagerClient} for managing rules of the specific subscription.
2093+
*
2094+
* @return A new {@link ServiceBusRuleManagerClient} that manages rules for specific subscription.
2095+
* @throws IllegalStateException if {@code topicName} or {@code subscriptionName} is null or empty. It is also
2096+
* thrown if the Service Bus {@link #connectionString(String) connectionString} contains an {@code EntityPath}
2097+
* that does not match one set in {@link #topicName(String) topicName}.
2098+
*/
2099+
public ServiceBusRuleManagerClient buildClient() {
2100+
return new ServiceBusRuleManagerClient(buildAsyncClient(), MessageUtils.getTotalTimeout(retryOptions));
2101+
}
2102+
}
2103+
20242104
private void validateAndThrow(int prefetchCount) {
20252105
if (prefetchCount < 0) {
20262106
throw LOGGER.logExceptionAsError(new IllegalArgumentException(String.format(
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,243 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.messaging.servicebus;
5+
6+
import com.azure.core.annotation.ServiceClient;
7+
import com.azure.core.util.logging.ClientLogger;
8+
import com.azure.messaging.servicebus.administration.ServiceBusAdministrationAsyncClient;
9+
import com.azure.messaging.servicebus.administration.models.CorrelationRuleFilter;
10+
import com.azure.messaging.servicebus.administration.models.CreateRuleOptions;
11+
import com.azure.messaging.servicebus.administration.models.RuleFilter;
12+
import com.azure.messaging.servicebus.administration.models.RuleProperties;
13+
import com.azure.messaging.servicebus.administration.models.SqlRuleAction;
14+
import com.azure.messaging.servicebus.administration.models.SqlRuleFilter;
15+
import com.azure.messaging.servicebus.implementation.MessagingEntityType;
16+
import com.azure.messaging.servicebus.implementation.ServiceBusConnectionProcessor;
17+
import com.azure.messaging.servicebus.implementation.ServiceBusManagementNode;
18+
import reactor.core.publisher.Flux;
19+
import reactor.core.publisher.Mono;
20+
21+
import static com.azure.core.util.FluxUtil.fluxError;
22+
import static com.azure.core.util.FluxUtil.monoError;
23+
import static com.azure.messaging.servicebus.implementation.Messages.INVALID_OPERATION_DISPOSED_RULE_MANAGER;
24+
25+
import java.util.Objects;
26+
import java.util.concurrent.atomic.AtomicBoolean;
27+
28+
/**
29+
* An <b>asynchronous</b> rule manager responsible for managing rules for a specific topic subscription. The rule manager
30+
* requires only Listen claims, whereas the {@link ServiceBusAdministrationAsyncClient} requires Manage claims.
31+
*
32+
* <p><strong>Create an instance of rule manager</strong></p>
33+
* <!-- src_embed com.azure.messaging.servicebus.servicebusrulemanagerasyncclient.instantiation -->
34+
* <pre>
35+
* &#47;&#47; The required parameters is connectionString, a way to authenticate with Service Bus using credentials.
36+
* &#47;&#47; The connectionString&#47;queueName must be set by the application. The 'connectionString' format is shown below.
37+
* &#47;&#47; &quot;Endpoint=&#123;fully-qualified-namespace&#125;;SharedAccessKeyName=&#123;policy-name&#125;;SharedAccessKey=&#123;key&#125;&quot;
38+
*
39+
* ServiceBusRuleManagerAsyncClient ruleManager = new ServiceBusClientBuilder&#40;&#41;
40+
* .connectionString&#40;connectionString&#41;
41+
* .ruleManager&#40;&#41;
42+
* .topicName&#40;topicName&#41;
43+
* .subscriptionName&#40;subscriptionName&#41;
44+
* .buildAsyncClient&#40;&#41;;
45+
* </pre>
46+
* <!-- end com.azure.messaging.servicebus.servicebusrulemanagerasyncclient.instantiation -->
47+
*
48+
* <p><strong>Create a rule to a Service Bus subscription</strong></p>
49+
* <!-- src_embed com.azure.messaging.servicebus.servicebusrulemanagerasyncclient.createRule -->
50+
* <pre>
51+
* RuleFilter trueRuleFilter = new TrueRuleFilter&#40;&#41;;
52+
* CreateRuleOptions options = new CreateRuleOptions&#40;trueRuleFilter&#41;;
53+
* ruleManager.createRule&#40;&quot;new-rule&quot;, options&#41;.subscribe&#40;
54+
* unused -&gt; &#123; &#125;,
55+
* err -&gt; System.err.println&#40;&quot;Error occurred when create a rule, err: &quot; + err&#41;,
56+
* &#40;&#41; -&gt; System.out.println&#40;&quot;Create complete.&quot;&#41;
57+
* &#41;;
58+
* </pre>
59+
* <!-- end com.azure.messaging.servicebus.servicebusrulemanagerasyncclient.createRule -->
60+
*
61+
* <p><strong>Fetch all rules.</strong></p>
62+
* <!-- src_embed com.azure.messaging.servicebus.servicebusrulemanagerasyncclient.getRules -->
63+
* <pre>
64+
* ruleManager.getRules&#40;&#41;.subscribe&#40;ruleProperties -&gt; System.out.println&#40;ruleProperties.getName&#40;&#41;&#41;&#41;;
65+
* </pre>
66+
* <!-- end com.azure.messaging.servicebus.servicebusrulemanagerasyncclient.getRules -->
67+
*
68+
* <p><strong>Delete a rule.</strong></p>
69+
* <!-- src_embed com.azure.messaging.servicebus.servicebusrulemanagerasyncclient.deleteRule -->
70+
* <pre>
71+
* ruleManager.deleteRule&#40;&quot;exist-rule&quot;&#41;.subscribe&#40;
72+
* unused -&gt; &#123; &#125;,
73+
* err -&gt; System.err.println&#40;&quot;Error occurred when delete rule, err: &quot; + err&#41;,
74+
* &#40;&#41; -&gt; System.out.println&#40;&quot;Delete complete.&quot;&#41;
75+
* &#41;;
76+
* </pre>
77+
* <!-- end com.azure.messaging.servicebus.servicebusrulemanagerasyncclient.deleteRule -->
78+
* @see ServiceBusClientBuilder
79+
*/
80+
@ServiceClient(builder = ServiceBusClientBuilder.class, isAsync = true)
81+
public class ServiceBusRuleManagerAsyncClient implements AutoCloseable {
82+
private static final ClientLogger LOGGER = new ClientLogger(ServiceBusRuleManagerAsyncClient.class);
83+
84+
private final String entityPath;
85+
private final MessagingEntityType entityType;
86+
private final ServiceBusConnectionProcessor connectionProcessor;
87+
private final Runnable onClientClose;
88+
private final AtomicBoolean isDisposed = new AtomicBoolean();
89+
90+
/**
91+
* Creates a rule manager that manages rules for a Service Bus subscription.
92+
*
93+
* @param entityPath The name of the topic and subscription.
94+
* @param entityType The type of the Service Bus resource.
95+
* @param connectionProcessor The AMQP connection to the Service Bus resource.
96+
* @param onClientClose Operation to run when the client completes.
97+
*/
98+
ServiceBusRuleManagerAsyncClient(String entityPath, MessagingEntityType entityType,
99+
ServiceBusConnectionProcessor connectionProcessor, Runnable onClientClose) {
100+
this.entityPath = Objects.requireNonNull(entityPath, "'entityPath' cannot be null.");
101+
this.entityType = Objects.requireNonNull(entityType, "'entityType' cannot be null.");
102+
this.connectionProcessor = Objects.requireNonNull(connectionProcessor,
103+
"'connectionProcessor' cannot be null.");
104+
this.onClientClose = onClientClose;
105+
}
106+
107+
/**
108+
* Gets the fully qualified namespace.
109+
*
110+
* @return The fully qualified namespace.
111+
*/
112+
public String getFullyQualifiedNamespace() {
113+
return connectionProcessor.getFullyQualifiedNamespace();
114+
}
115+
116+
/**
117+
* Gets the name of the Service Bus resource.
118+
*
119+
* @return The name of the Service Bus resource.
120+
*/
121+
public String getEntityPath() {
122+
return entityPath;
123+
}
124+
125+
/**
126+
* Creates a rule to the current subscription to filter the messages reaching from topic to the subscription.
127+
*
128+
* @param ruleName Name of rule.
129+
* @param options The options for the rule to add.
130+
* @return A Mono that completes when the rule is created.
131+
*
132+
* @throws NullPointerException if {@code options}, {@code ruleName} is null.
133+
* @throws IllegalStateException if client is disposed.
134+
* @throws IllegalArgumentException if {@code ruleName} is empty string, action of {@code options} is not null and not
135+
* instanceof {@link SqlRuleAction}, filter of {@code options} is not instanceof {@link SqlRuleFilter} or
136+
* {@link CorrelationRuleFilter}.
137+
* @throws ServiceBusException if filter matches {@code ruleName} is already created in subscription.
138+
*/
139+
public Mono<Void> createRule(String ruleName, CreateRuleOptions options) {
140+
if (Objects.isNull(options)) {
141+
return monoError(LOGGER, new NullPointerException("'options' cannot be null."));
142+
}
143+
return createRuleInternal(ruleName, options);
144+
}
145+
146+
/**
147+
* Creates a rule to the current subscription to filter the messages reaching from topic to the subscription.
148+
*
149+
* @param ruleName Name of rule.
150+
* @param filter The filter expression against which messages will be matched.
151+
* @return A Mono that completes when the rule is created.
152+
*
153+
* @throws NullPointerException if {@code filter}, {@code ruleName} is null.
154+
* @throws IllegalStateException if client is disposed.
155+
* @throws IllegalArgumentException if ruleName is empty string, {@code filter} is not instanceof {@link SqlRuleFilter} or
156+
* {@link CorrelationRuleFilter}.
157+
* @throws ServiceBusException if filter matches {@code ruleName} is already created in subscription.
158+
*/
159+
public Mono<Void> createRule(String ruleName, RuleFilter filter) {
160+
CreateRuleOptions options = new CreateRuleOptions(filter);
161+
return createRuleInternal(ruleName, options);
162+
}
163+
164+
/**
165+
* Fetches all rules associated with the topic and subscription.
166+
*
167+
* @return A list of rules associated with the topic and subscription.
168+
*
169+
* @throws IllegalStateException if client is disposed.
170+
* @throws UnsupportedOperationException if client cannot support filter with descriptor in message body.
171+
*/
172+
public Flux<RuleProperties> getRules() {
173+
if (isDisposed.get()) {
174+
return fluxError(LOGGER, new IllegalStateException(
175+
String.format(INVALID_OPERATION_DISPOSED_RULE_MANAGER, "getRules")
176+
));
177+
}
178+
179+
return connectionProcessor
180+
.flatMap(connection -> connection.getManagementNode(entityPath, entityType))
181+
.flatMapMany(ServiceBusManagementNode::getRules);
182+
}
183+
184+
/**
185+
* Removes the rule on the subscription identified by {@code ruleName}.
186+
*
187+
* @param ruleName Name of rule to delete.
188+
* @return A Mono that completes when the rule is deleted.
189+
*
190+
* @throws NullPointerException if {@code ruleName} is null.
191+
* @throws IllegalStateException if client is disposed.
192+
* @throws IllegalArgumentException if {@code ruleName} is empty string.
193+
* @throws ServiceBusException if cannot find filter matches {@code ruleName} in subscription.
194+
*/
195+
public Mono<Void> deleteRule(String ruleName) {
196+
if (isDisposed.get()) {
197+
return monoError(LOGGER, new IllegalStateException(
198+
String.format(INVALID_OPERATION_DISPOSED_RULE_MANAGER, "deleteRule")
199+
));
200+
}
201+
202+
if (ruleName == null) {
203+
return monoError(LOGGER, new NullPointerException("'ruleName' cannot be null."));
204+
} else if (ruleName.isEmpty()) {
205+
return monoError(LOGGER, new IllegalArgumentException("'ruleName' cannot be an empty string."));
206+
}
207+
208+
return connectionProcessor
209+
.flatMap(connection -> connection.getManagementNode(entityPath, entityType))
210+
.flatMap(managementNode -> managementNode.deleteRule(ruleName));
211+
}
212+
213+
/**
214+
* Disposes of the {@link ServiceBusRuleManagerAsyncClient}. If the client has a dedicated connection, the underlying
215+
* connection is also closed.
216+
*/
217+
@Override
218+
public void close() {
219+
if (isDisposed.getAndSet(true)) {
220+
return;
221+
}
222+
223+
onClientClose.run();
224+
}
225+
226+
private Mono<Void> createRuleInternal(String ruleName, CreateRuleOptions options) {
227+
if (isDisposed.get()) {
228+
return monoError(LOGGER, new IllegalStateException(
229+
String.format(INVALID_OPERATION_DISPOSED_RULE_MANAGER, "createRule")
230+
));
231+
}
232+
233+
if (ruleName == null) {
234+
return monoError(LOGGER, new NullPointerException("'ruleName' cannot be null."));
235+
} else if (ruleName.isEmpty()) {
236+
return monoError(LOGGER, new IllegalArgumentException("'ruleName' cannot be an empty string."));
237+
}
238+
239+
return connectionProcessor
240+
.flatMap(connection -> connection.getManagementNode(entityPath, entityType))
241+
.flatMap(managementNode -> managementNode.createRule(ruleName, options));
242+
}
243+
}

0 commit comments

Comments
 (0)