Skip to content

Commit 072ff86

Browse files
[azeventhubs] Allow ConsumerClient to receive from multiple partitions (Azure#18870)
The previous ConsumerClient only allowed receiving from a single partition, which meant multiple partitions also required multiple AMQP connections. This makes it so multiple partitions can be consumed by pushing ReceiveEvents down, into a new type (PartitionClient) which you can instantiate from ConsumerClient. This is similar to the model we have with azservicebus, creating Receivers and Senders.
1 parent b74e8f8 commit 072ff86

File tree

8 files changed

+517
-401
lines changed

8 files changed

+517
-401
lines changed

sdk/messaging/azeventhubs/consumer_client.go

Lines changed: 40 additions & 251 deletions
Original file line numberDiff line numberDiff line change
@@ -5,24 +5,15 @@ package azeventhubs
55
import (
66
"context"
77
"crypto/tls"
8-
"errors"
98
"fmt"
109
"net"
11-
"time"
1210

1311
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
14-
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
15-
"github.com/Azure/azure-sdk-for-go/sdk/internal/log"
1612
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal"
1713
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/amqpwrap"
18-
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/go-amqp"
1914
)
2015

21-
// DefaultConsumerGroup is the name of the default consumer group in the Event Hubs service.
22-
const DefaultConsumerGroup = "$Default"
23-
24-
// ConsumerClientOptions contains options for the `NewConsumerClient` and `NewConsumerClientFromConnectionString`
25-
// functions.
16+
// ConsumerClientOptions configures optional parameters for a ConsumerClient.
2617
type ConsumerClientOptions struct {
2718
// TLSConfig configures a client with a custom *tls.Config.
2819
TLSConfig *tls.Config
@@ -37,77 +28,31 @@ type ConsumerClientOptions struct {
3728
// RetryOptions controls how often operations are retried from this client and any
3829
// Receivers and Senders created from this client.
3930
RetryOptions RetryOptions
40-
41-
// StartPosition is the position we will start receiving events from,
42-
// either an offset (inclusive) with Offset, or receiving events received
43-
// after a specific time using EnqueuedTime.
44-
StartPosition StartPosition
45-
46-
// OwnerLevel is the priority for this consumer, also known as the 'epoch' level.
47-
// When used, a consumer with a higher OwnerLevel will take ownership of a partition
48-
// from consumers with a lower OwnerLevel.
49-
// Default is off.
50-
OwnerLevel *int64
51-
}
52-
53-
// StartPosition indicates the position to start receiving events within a partition.
54-
// The default position is Latest.
55-
type StartPosition struct {
56-
// Offset will start the consumer after the specified offset. Can be exclusive
57-
// or inclusive, based on the Inclusive property.
58-
// NOTE: offsets are not stable values, and might refer to different events over time
59-
// as the Event Hub events reach their age limit and are discarded.
60-
Offset *int64
61-
62-
// SequenceNumber will start the consumer after the specified sequence number. Can be exclusive
63-
// or inclusive, based on the Inclusive property.
64-
SequenceNumber *int64
65-
66-
// EnqueuedTime will start the consumer before events that were enqueued on or after EnqueuedTime.
67-
// Can be exclusive or inclusive, based on the Inclusive property.
68-
EnqueuedTime *time.Time
69-
70-
// Inclusive configures whether the events directly at Offset, SequenceNumber or EnqueuedTime will be included (true)
71-
// or excluded (false).
72-
Inclusive bool
73-
74-
// Earliest will start the consumer at the earliest event.
75-
Earliest *bool
76-
77-
// Latest will start the consumer after the last event.
78-
Latest *bool
7931
}
8032

81-
// ConsumerClient is used to receive events from an Event Hub partition.
33+
// ConsumerClient can create PartitionClient instances, which can read events from
34+
// a partition.
8235
type ConsumerClient struct {
36+
consumerGroup string
37+
eventHub string
8338
retryOptions RetryOptions
8439
namespace *internal.Namespace
85-
eventHub string
86-
consumerGroup string
87-
partitionID string
88-
ownerLevel *int64
89-
90-
offsetExpression string
91-
92-
links *internal.Links[amqpwrap.AMQPReceiverCloser]
40+
links *internal.Links[amqpwrap.AMQPReceiverCloser]
9341
}
9442

9543
// NewConsumerClient creates a ConsumerClient which uses an azcore.TokenCredential for authentication.
96-
// The consumerGroup is the consumer group for this consumer.
9744
// The fullyQualifiedNamespace is the Event Hubs namespace name (ex: myeventhub.servicebus.windows.net)
9845
// The credential is one of the credentials in the `github.com/Azure/azure-sdk-for-go/sdk/azidentity` package.
99-
func NewConsumerClient(fullyQualifiedNamespace string, eventHub string, partitionID string, consumerGroup string, credential azcore.TokenCredential, options *ConsumerClientOptions) (*ConsumerClient, error) {
100-
return newConsumerClientImpl(consumerClientArgs{
46+
func NewConsumerClient(fullyQualifiedNamespace string, eventHub string, consumerGroup string, credential azcore.TokenCredential, options *ConsumerClientOptions) (*ConsumerClient, error) {
47+
return newConsumerClient(consumerClientArgs{
48+
consumerGroup: consumerGroup,
10149
fullyQualifiedNamespace: fullyQualifiedNamespace,
102-
credential: credential,
10350
eventHub: eventHub,
104-
partitionID: partitionID,
105-
consumerGroup: consumerGroup,
51+
credential: credential,
10652
}, options)
10753
}
10854

10955
// NewConsumerClientFromConnectionString creates a ConsumerClient from a connection string.
110-
// The consumerGroup is the consumer group for this consumer.
11156
//
11257
// connectionString can be one of the following formats:
11358
//
@@ -116,75 +61,43 @@ func NewConsumerClient(fullyQualifiedNamespace string, eventHub string, partitio
11661
//
11762
// Connection string, has EntityPath. In this case eventHub must be empty.
11863
// ex: Endpoint=sb://<your-namespace>.servicebus.windows.net/;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<entity path>
119-
func NewConsumerClientFromConnectionString(connectionString string, eventHub string, partitionID string, consumerGroup string, options *ConsumerClientOptions) (*ConsumerClient, error) {
64+
func NewConsumerClientFromConnectionString(connectionString string, eventHub string, consumerGroup string, options *ConsumerClientOptions) (*ConsumerClient, error) {
12065
parsedConn, err := parseConn(connectionString, eventHub)
12166

12267
if err != nil {
12368
return nil, err
12469
}
12570

126-
return newConsumerClientImpl(consumerClientArgs{
71+
return newConsumerClient(consumerClientArgs{
72+
consumerGroup: consumerGroup,
12773
connectionString: connectionString,
12874
eventHub: parsedConn.HubName,
129-
partitionID: partitionID,
130-
consumerGroup: consumerGroup,
13175
}, options)
13276
}
13377

134-
// ReceiveEventsOptions contains optional parameters for the ReceiveEvents function
135-
type ReceiveEventsOptions struct {
136-
// For future expansion
137-
}
138-
139-
// ReceiveEvents receives events until the context has expired or been cancelled.
140-
func (cc *ConsumerClient) ReceiveEvents(ctx context.Context, count int, options *ReceiveEventsOptions) ([]*ReceivedEventData, error) {
141-
var events []*ReceivedEventData
142-
143-
err := cc.links.Retry(ctx, EventConsumer, "ReceiveEvents", cc.partitionID, cc.retryOptions, func(ctx context.Context, lwid internal.LinkWithID[amqpwrap.AMQPReceiverCloser]) error {
144-
events = nil
145-
146-
outstandingCredits := lwid.Link.Credits()
147-
148-
if count > int(outstandingCredits) {
149-
newCredits := uint32(count) - outstandingCredits
150-
151-
log.Writef(EventConsumer, "Have %d outstanding credit, only issuing %d credits", outstandingCredits, newCredits)
152-
153-
if err := lwid.Link.IssueCredit(newCredits); err != nil {
154-
return err
155-
}
156-
}
157-
158-
for {
159-
amqpMessage, err := lwid.Link.Receive(ctx)
160-
161-
if err != nil {
162-
prefetched := getAllPrefetched(lwid.Link, count-len(events))
163-
164-
for _, amqpMsg := range prefetched {
165-
events = append(events, newReceivedEventData(amqpMsg))
166-
}
167-
168-
// this lets cancel errors just return
169-
return err
170-
}
171-
172-
receivedEvent := newReceivedEventData(amqpMessage)
173-
events = append(events, receivedEvent)
174-
175-
if len(events) == count {
176-
return nil
177-
}
178-
}
179-
})
78+
// NewPartitionClientOptions provides options for the Subscribe function.
79+
type NewPartitionClientOptions struct {
80+
// StartPosition is the position we will start receiving events from,
81+
// either an offset (inclusive) with Offset, or receiving events received
82+
// after a specific time using EnqueuedTime.
83+
StartPosition StartPosition
18084

181-
if err != nil && len(events) == 0 {
182-
// TODO: if we get a "partition ownership lost" we need to think about whether that's retryable.
183-
return nil, internal.TransformError(err)
184-
}
85+
// OwnerLevel is the priority for this partition client, also known as the 'epoch' level.
86+
// When used, a partition client with a higher OwnerLevel will take ownership of a partition
87+
// from partition clients with a lower OwnerLevel.
88+
// Default is off.
89+
OwnerLevel *int64
90+
}
18591

186-
cc.offsetExpression = formatOffsetExpressionForSequence(">", events[len(events)-1].SequenceNumber)
187-
return events, nil
92+
// NewPartitionClient creates a client that can receive events from a partition.
93+
func (cc *ConsumerClient) NewPartitionClient(partitionID string, options *NewPartitionClientOptions) (*PartitionClient, error) {
94+
return newPartitionClient(partitionClientArgs{
95+
namespace: cc.namespace,
96+
eventHub: cc.eventHub,
97+
partitionID: partitionID,
98+
consumerGroup: cc.consumerGroup,
99+
retryOptions: cc.retryOptions,
100+
}, options)
188101
}
189102

190103
// GetEventHubProperties gets event hub properties, like the available partition IDs and when the Event Hub was created.
@@ -210,138 +123,30 @@ func (cc *ConsumerClient) GetPartitionProperties(ctx context.Context, partitionI
210123
return getPartitionProperties(ctx, cc.namespace, rpcLink.Link, cc.eventHub, partitionID, options)
211124
}
212125

213-
// Close closes the consumer's link and the underlying AMQP connection.
126+
// Close closes the connection for this client.
214127
func (cc *ConsumerClient) Close(ctx context.Context) error {
215-
if err := cc.links.Close(ctx); err != nil {
216-
log.Writef(EventConsumer, "Failed to close link (error might be cached): %s", err.Error())
217-
}
218128
return cc.namespace.Close(ctx, true)
219129
}
220130

221-
func getOffsetExpression(startPosition StartPosition) (string, error) {
222-
lt := ">"
223-
224-
if startPosition.Inclusive {
225-
lt = ">="
226-
}
227-
228-
var errMultipleFieldsSet = errors.New("only a single start point can be set: Earliest, EnqueuedTime, Latest, Offset, or SequenceNumber")
229-
230-
offsetExpr := ""
231-
232-
if startPosition.EnqueuedTime != nil {
233-
// time-based, non-inclusive
234-
offsetExpr = fmt.Sprintf("amqp.annotation.x-opt-enqueued-time %s '%d'", lt, startPosition.EnqueuedTime.UnixMilli())
235-
}
236-
237-
if startPosition.Offset != nil {
238-
// offset-based, non-inclusive
239-
// ex: amqp.annotation.x-opt-enqueued-time %s '165805323000'
240-
if offsetExpr != "" {
241-
return "", errMultipleFieldsSet
242-
}
243-
244-
offsetExpr = fmt.Sprintf("amqp.annotation.x-opt-offset %s '%d'", lt, *startPosition.Offset)
245-
}
246-
247-
if startPosition.Latest != nil && *startPosition.Latest {
248-
if offsetExpr != "" {
249-
return "", errMultipleFieldsSet
250-
}
251-
252-
offsetExpr = "amqp.annotation.x-opt-offset > '@latest'"
253-
}
254-
255-
if startPosition.SequenceNumber != nil {
256-
if offsetExpr != "" {
257-
return "", errMultipleFieldsSet
258-
}
259-
260-
offsetExpr = formatOffsetExpressionForSequence(lt, *startPosition.SequenceNumber)
261-
}
262-
263-
if startPosition.Earliest != nil && *startPosition.Earliest {
264-
if offsetExpr != "" {
265-
return "", errMultipleFieldsSet
266-
}
267-
268-
return "amqp.annotation.x-opt-offset > '-1'", nil
269-
}
270-
271-
if offsetExpr != "" {
272-
return offsetExpr, nil
273-
}
274-
275-
// default to the start
276-
return "amqp.annotation.x-opt-offset > '@latest'", nil
277-
}
278-
279-
func formatOffsetExpressionForSequence(op string, sequenceNumber int64) string {
280-
return fmt.Sprintf("amqp.annotation.x-opt-sequence-number %s '%d'", op, sequenceNumber)
281-
}
282-
283-
func (cc *ConsumerClient) getEntityPath(partitionID string) string {
284-
return fmt.Sprintf("%s/ConsumerGroups/%s/Partitions/%s", cc.eventHub, cc.consumerGroup, partitionID)
285-
}
286-
287-
const defaultLinkRxBuffer = 2048
288-
289-
func (cc *ConsumerClient) newEventHubConsumerLink(ctx context.Context, session amqpwrap.AMQPSession, entityPath string) (internal.AMQPReceiverCloser, error) {
290-
var receiverProps map[string]interface{}
291-
292-
if cc.ownerLevel != nil {
293-
receiverProps = map[string]interface{}{
294-
"com.microsoft:epoch": *cc.ownerLevel,
295-
}
296-
}
297-
298-
receiver, err := session.NewReceiver(ctx, entityPath, &amqp.ReceiverOptions{
299-
SettlementMode: to.Ptr(amqp.ModeFirst),
300-
ManualCredits: true,
301-
Credit: defaultLinkRxBuffer,
302-
Filters: []amqp.LinkFilter{
303-
amqp.LinkFilterSelector(cc.offsetExpression),
304-
},
305-
Properties: receiverProps,
306-
})
307-
308-
if err != nil {
309-
return nil, err
310-
}
311-
312-
return receiver, nil
313-
}
314-
315131
type consumerClientArgs struct {
316132
connectionString string
317133

318134
// the Event Hubs namespace name (ex: myservicebus.servicebus.windows.net)
319135
fullyQualifiedNamespace string
320136
credential azcore.TokenCredential
321137

322-
eventHub string
323-
partitionID string
324-
325138
consumerGroup string
139+
eventHub string
326140
}
327141

328-
func newConsumerClientImpl(args consumerClientArgs, options *ConsumerClientOptions) (*ConsumerClient, error) {
142+
func newConsumerClient(args consumerClientArgs, options *ConsumerClientOptions) (*ConsumerClient, error) {
329143
if options == nil {
330144
options = &ConsumerClientOptions{}
331145
}
332146

333-
offsetExpr, err := getOffsetExpression(options.StartPosition)
334-
335-
if err != nil {
336-
return nil, err
337-
}
338-
339147
client := &ConsumerClient{
340-
eventHub: args.eventHub,
341-
partitionID: args.partitionID,
342-
ownerLevel: options.OwnerLevel,
343-
consumerGroup: args.consumerGroup,
344-
offsetExpression: offsetExpr,
148+
consumerGroup: args.consumerGroup,
149+
eventHub: args.eventHub,
345150
}
346151

347152
var nsOptions []internal.NamespaceOption
@@ -379,23 +184,7 @@ func newConsumerClientImpl(args consumerClientArgs, options *ConsumerClientOptio
379184
}
380185

381186
client.namespace = tempNS
382-
client.links = internal.NewLinks(tempNS, fmt.Sprintf("%s/$management", client.eventHub), client.getEntityPath, client.newEventHubConsumerLink)
187+
client.links = internal.NewLinks[amqpwrap.AMQPReceiverCloser](tempNS, fmt.Sprintf("%s/$management", client.eventHub), nil, nil)
383188

384189
return client, nil
385190
}
386-
387-
func getAllPrefetched(receiver amqpwrap.AMQPReceiver, max int) []*amqp.Message {
388-
var messages []*amqp.Message
389-
390-
for i := 0; i < max; i++ {
391-
msg := receiver.Prefetched()
392-
393-
if msg == nil {
394-
break
395-
}
396-
397-
messages = append(messages, msg)
398-
}
399-
400-
return messages
401-
}

0 commit comments

Comments
 (0)