Skip to content

Commit f841097

Browse files
[azservicebus] Add support for DefaultRule when creating a Subscription (Azure#20888)
Add `DefaultRule` field to `SubscriptionProperties`, allowing for a rule to take immediate effect when the subscription is created. Contributed by @StrawbrryFlurry
1 parent 8a375ae commit f841097

File tree

6 files changed

+322
-111
lines changed

6 files changed

+322
-111
lines changed

sdk/messaging/azservicebus/CHANGELOG.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
# Release History
22

3-
## 1.3.1 (2023-06-06)
3+
## 1.4.0 (2023-06-06)
44

55
### Features Added
66

7-
- TBD, pending customer PR.
7+
- `admin.SubscriptionProperties` now allow for a `DefaultRule` to be set. This allows Subscriptions to be created with an immediate filter/action.
8+
Contributed by @StrawbrryFlurry. (PR#20888)
89

910
## 1.3.0 (2023-05-09)
1011

sdk/messaging/azservicebus/admin/admin_client_rules.go

Lines changed: 120 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ func (ac *Client) GetRule(ctx context.Context, topicName string, subscriptionNam
208208
return mapATOMError[GetRuleResponse](err)
209209
}
210210

211-
props, err := ac.newRuleProperties(ruleEnv)
211+
props, err := ac.newRulePropertiesFromEnvelope(ruleEnv)
212212

213213
if err != nil {
214214
return nil, err
@@ -240,7 +240,7 @@ func (ac *Client) NewListRulesPager(topicName string, subscriptionName string, o
240240
}
241241

242242
ep := &entityPager[atom.RuleFeed, atom.RuleEnvelope, RuleProperties]{
243-
convertFn: ac.newRuleProperties,
243+
convertFn: ac.newRulePropertiesFromEnvelope,
244244
baseFragment: fmt.Sprintf("/%s/Subscriptions/%s/Rules/", topicName, subscriptionName),
245245
maxPageSize: pageSize,
246246
em: ac.em,
@@ -304,107 +304,25 @@ func (ac *Client) DeleteRule(ctx context.Context, topicName string, subscription
304304
}
305305

306306
func (ac *Client) createOrUpdateRule(ctx context.Context, topicName string, subscriptionName string, putProps RuleProperties, creating bool) (*RuleProperties, *http.Response, error) {
307-
ruleDesc := atom.RuleDescription{}
308-
309-
theirFilter := putProps.Filter
310-
311-
if theirFilter != nil {
312-
switch actualFilter := theirFilter.(type) {
313-
case *FalseFilter:
314-
ruleDesc.Filter = &atom.FilterDescription{
315-
Type: "FalseFilter",
316-
SQLExpression: to.Ptr("1=0"),
317-
}
318-
case *TrueFilter:
319-
ruleDesc.Filter = &atom.FilterDescription{
320-
Type: "TrueFilter",
321-
SQLExpression: to.Ptr("1=1"),
322-
}
323-
case *SQLFilter:
324-
params, err := publicSQLParametersToInternal(actualFilter.Parameters)
325-
326-
if err != nil {
327-
return nil, nil, err
328-
}
329-
330-
ruleDesc.Filter = &atom.FilterDescription{
331-
Type: "SqlFilter",
332-
SQLExpression: &actualFilter.Expression,
333-
Parameters: params,
334-
}
335-
case *CorrelationFilter:
336-
appProps, err := publicSQLParametersToInternal(actualFilter.ApplicationProperties)
337-
338-
if err != nil {
339-
return nil, nil, err
340-
}
341-
342-
ruleDesc.Filter = &atom.FilterDescription{
343-
Type: "CorrelationFilter",
344-
CorrelationFilter: atom.CorrelationFilter{
345-
ContentType: actualFilter.ContentType,
346-
CorrelationID: actualFilter.CorrelationID,
347-
MessageID: actualFilter.MessageID,
348-
ReplyTo: actualFilter.ReplyTo,
349-
ReplyToSessionID: actualFilter.ReplyToSessionID,
350-
SessionID: actualFilter.SessionID,
351-
Label: actualFilter.Subject,
352-
To: actualFilter.To,
353-
Properties: appProps,
354-
},
355-
}
356-
case *UnknownRuleFilter:
357-
fd, err := convertUnknownRuleFilterToFilterDescription(actualFilter)
358-
359-
if err != nil {
360-
return nil, nil, err
361-
}
362-
363-
ruleDesc.Filter = fd
364-
default:
365-
return nil, nil, fmt.Errorf("invalid type ('%T') for Rule.Filter", theirFilter)
366-
}
367-
} else {
368-
ruleDesc.Filter = &atom.FilterDescription{
369-
Type: "TrueFilter",
370-
SQLExpression: to.Ptr("1=1"),
371-
}
307+
ruleDesc := atom.RuleDescription{
308+
Name: makeRuleNameForProperties(&putProps),
372309
}
373310

374-
theirAction := putProps.Action
375-
376-
if theirAction != nil {
377-
switch actualAction := theirAction.(type) {
378-
case *SQLAction:
379-
params, err := publicSQLParametersToInternal(actualAction.Parameters)
311+
filter, err := convertRuleFilterToFilterDescription(&putProps.Filter)
380312

381-
if err != nil {
382-
return nil, nil, err
383-
}
313+
if err != nil {
314+
return nil, nil, err
315+
}
384316

385-
ruleDesc.Action = &atom.ActionDescription{
386-
Type: "SqlRuleAction",
387-
SQLExpression: actualAction.Expression,
388-
Parameters: params,
389-
}
390-
case *UnknownRuleAction:
391-
ad, err := convertUnknownRuleActionToActionDescription(actualAction)
317+
ruleDesc.Filter = filter
392318

393-
if err != nil {
394-
return nil, nil, err
395-
}
319+
action, err := convertRuleActionToActionDescription(&putProps.Action)
396320

397-
ruleDesc.Action = ad
398-
default:
399-
return nil, nil, fmt.Errorf("invalid type ('%T') for Rule.Action", theirAction)
400-
}
321+
if err != nil {
322+
return nil, nil, err
401323
}
402324

403-
ruleDesc.Name = "$Default"
404-
405-
if putProps.Name != "" {
406-
ruleDesc.Name = putProps.Name
407-
}
325+
ruleDesc.Action = action
408326

409327
if !creating {
410328
ctx = runtime.WithHTTPHeader(ctx, http.Header{
@@ -422,16 +340,120 @@ func (ac *Client) createOrUpdateRule(ctx context.Context, topicName string, subs
422340
return nil, nil, err
423341
}
424342

425-
respProps, err := ac.newRuleProperties(respEnv)
343+
respProps, err := ac.newRulePropertiesFromEnvelope(respEnv)
426344

427345
return respProps, httpResp, err
428346
}
429347

430-
func (ac *Client) newRuleProperties(env *atom.RuleEnvelope) (*RuleProperties, error) {
348+
func makeRuleNameForProperties(properties *RuleProperties) string {
349+
if properties.Name != "" {
350+
return properties.Name
351+
}
352+
353+
return "$Default"
354+
}
355+
356+
func convertRuleFilterToFilterDescription(filter *RuleFilter) (*atom.FilterDescription, error) {
357+
if *filter == nil {
358+
return &atom.FilterDescription{
359+
Type: "TrueFilter",
360+
SQLExpression: to.Ptr("1=1"),
361+
}, nil
362+
}
363+
364+
switch actualFilter := (*filter).(type) {
365+
case *FalseFilter:
366+
return &atom.FilterDescription{
367+
Type: "FalseFilter",
368+
SQLExpression: to.Ptr("1=0"),
369+
}, nil
370+
case *TrueFilter:
371+
return &atom.FilterDescription{
372+
Type: "TrueFilter",
373+
SQLExpression: to.Ptr("1=1"),
374+
}, nil
375+
case *SQLFilter:
376+
params, err := publicSQLParametersToInternal(actualFilter.Parameters)
377+
378+
if err != nil {
379+
return nil, err
380+
}
381+
382+
return &atom.FilterDescription{
383+
Type: "SqlFilter",
384+
SQLExpression: &actualFilter.Expression,
385+
Parameters: params,
386+
}, nil
387+
case *CorrelationFilter:
388+
appProps, err := publicSQLParametersToInternal(actualFilter.ApplicationProperties)
389+
390+
if err != nil {
391+
return nil, err
392+
}
393+
394+
return &atom.FilterDescription{
395+
Type: "CorrelationFilter",
396+
CorrelationFilter: atom.CorrelationFilter{
397+
ContentType: actualFilter.ContentType,
398+
CorrelationID: actualFilter.CorrelationID,
399+
MessageID: actualFilter.MessageID,
400+
ReplyTo: actualFilter.ReplyTo,
401+
ReplyToSessionID: actualFilter.ReplyToSessionID,
402+
SessionID: actualFilter.SessionID,
403+
Label: actualFilter.Subject,
404+
To: actualFilter.To,
405+
Properties: appProps,
406+
},
407+
}, nil
408+
case *UnknownRuleFilter:
409+
filterDefinition, err := convertUnknownRuleFilterToFilterDescription(actualFilter)
410+
411+
if err != nil {
412+
return nil, err
413+
}
414+
415+
return filterDefinition, nil
416+
default:
417+
return nil, fmt.Errorf("invalid type ('%T') for Rule.Filter", filter)
418+
}
419+
}
420+
421+
func convertRuleActionToActionDescription(action *RuleAction) (*atom.ActionDescription, error) {
422+
if *action == nil {
423+
return nil, nil
424+
}
425+
426+
switch actualAction := (*action).(type) {
427+
case *SQLAction:
428+
params, err := publicSQLParametersToInternal(actualAction.Parameters)
429+
430+
if err != nil {
431+
return nil, err
432+
}
433+
434+
return &atom.ActionDescription{
435+
Type: "SqlRuleAction",
436+
SQLExpression: actualAction.Expression,
437+
Parameters: params,
438+
}, nil
439+
case *UnknownRuleAction:
440+
ad, err := convertUnknownRuleActionToActionDescription(actualAction)
441+
442+
if err != nil {
443+
return nil, err
444+
}
445+
446+
return ad, nil
447+
default:
448+
return nil, fmt.Errorf("invalid type ('%T') for Rule.Action", &action)
449+
}
450+
}
451+
452+
func (ac *Client) newRulePropertiesFromEnvelope(env *atom.RuleEnvelope) (*RuleProperties, error) {
431453
desc := env.Content.RuleDescription
432454

433455
props := RuleProperties{
434-
Name: env.Title,
456+
Name: desc.Name,
435457
}
436458

437459
switch desc.Filter.Type {

sdk/messaging/azservicebus/admin/admin_client_subscription.go

Lines changed: 46 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,9 @@ type SubscriptionProperties struct {
6262

6363
// UserMetadata is custom metadata that user can associate with the subscription.
6464
UserMetadata *string
65+
66+
// DefaultRule is a rule that is added to the subscription as soon as it is created.
67+
DefaultRule *RuleProperties
6568
}
6669

6770
// SubscriptionRuntimeProperties represent dynamic properties of a subscription, such as the ActiveMessageCount.
@@ -338,7 +341,11 @@ func (ac *Client) createOrUpdateSubscriptionImpl(ctx context.Context, topicName
338341
props = &SubscriptionProperties{}
339342
}
340343

341-
env := newSubscriptionEnvelope(props, ac.em.TokenProvider())
344+
env, err := newSubscriptionEnvelope(props, ac.em.TokenProvider())
345+
346+
if err != nil {
347+
return nil, nil, err
348+
}
342349

343350
if !creating {
344351
ctx = runtime.WithHTTPHeader(ctx, http.Header{
@@ -367,7 +374,13 @@ func (ac *Client) createOrUpdateSubscriptionImpl(ctx context.Context, topicName
367374
return &item.SubscriptionProperties, resp, nil
368375
}
369376

370-
func newSubscriptionEnvelope(props *SubscriptionProperties, tokenProvider auth.TokenProvider) *atom.SubscriptionEnvelope {
377+
func newSubscriptionEnvelope(props *SubscriptionProperties, tokenProvider auth.TokenProvider) (*atom.SubscriptionEnvelope, error) {
378+
defaultRuleDescription, err := newDefaultRuleDescription(props.DefaultRule)
379+
380+
if err != nil {
381+
return nil, err
382+
}
383+
371384
desc := &atom.SubscriptionDescription{
372385
DefaultMessageTimeToLive: props.DefaultMessageTimeToLive,
373386
LockDuration: props.LockDuration,
@@ -380,12 +393,39 @@ func newSubscriptionEnvelope(props *SubscriptionProperties, tokenProvider auth.T
380393
UserMetadata: props.UserMetadata,
381394
EnableBatchedOperations: props.EnableBatchedOperations,
382395
AutoDeleteOnIdle: props.AutoDeleteOnIdle,
383-
// TODO: when we get rule serialization in place.
384-
// DefaultRuleDescription: props.DefaultRuleDescription,
385-
// are these attributes just not valid anymore?
396+
DefaultRuleDescription: defaultRuleDescription,
397+
}
398+
399+
return atom.WrapWithSubscriptionEnvelope(desc), nil
400+
}
401+
402+
func newDefaultRuleDescription(properties *RuleProperties) (*atom.DefaultRuleDescription, error) {
403+
if properties == nil {
404+
return nil, nil
386405
}
387406

388-
return atom.WrapWithSubscriptionEnvelope(desc)
407+
ruleDescription := atom.DefaultRuleDescription{
408+
Name: makeRuleNameForProperties(properties),
409+
}
410+
411+
filter, err := convertRuleFilterToFilterDescription(&properties.Filter)
412+
413+
if err != nil {
414+
return nil, err
415+
}
416+
417+
// Filter can never be nil because it's default is TrueFilter
418+
ruleDescription.Filter = filter
419+
420+
action, err := convertRuleActionToActionDescription(&properties.Action)
421+
422+
if err != nil {
423+
return nil, err
424+
}
425+
426+
ruleDescription.Action = action
427+
428+
return &ruleDescription, nil
389429
}
390430

391431
func newSubscriptionItem(env *atom.SubscriptionEnvelope, topicName string) (*SubscriptionPropertiesItem, error) {

0 commit comments

Comments
 (0)