|
| 1 | +// Copyright (c) Microsoft Corporation. All rights reserved. |
| 2 | +// Licensed under the MIT License. |
| 3 | +package azeventhubs |
| 4 | + |
| 5 | +import ( |
| 6 | + "context" |
| 7 | + "crypto/tls" |
| 8 | + "fmt" |
| 9 | + "net" |
| 10 | + "time" |
| 11 | + |
| 12 | + "github.com/Azure/azure-sdk-for-go/sdk/azcore" |
| 13 | + "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" |
| 14 | + "github.com/Azure/azure-sdk-for-go/sdk/internal/log" |
| 15 | + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal" |
| 16 | + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/amqpwrap" |
| 17 | + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/go-amqp" |
| 18 | +) |
| 19 | + |
| 20 | +// DefaultConsumerGroup is the name of the default consumer group in the Event Hubs service. |
| 21 | +const DefaultConsumerGroup = "$Default" |
| 22 | + |
| 23 | +// ConsumerClientOptions contains options for the `NewConsumerClient` and `NewConsumerClientFromConnectionString` |
| 24 | +// functions. |
| 25 | +type ConsumerClientOptions struct { |
| 26 | + // TLSConfig configures a client with a custom *tls.Config. |
| 27 | + TLSConfig *tls.Config |
| 28 | + |
| 29 | + // Application ID that will be passed to the namespace. |
| 30 | + ApplicationID string |
| 31 | + |
| 32 | + // NewWebSocketConn is a function that can create a net.Conn for use with websockets. |
| 33 | + // For an example, see ExampleNewClient_usingWebsockets() function in example_client_test.go. |
| 34 | + NewWebSocketConn func(ctx context.Context, args NewWebSocketConnArgs) (net.Conn, error) |
| 35 | + |
| 36 | + // RetryOptions controls how often operations are retried from this client and any |
| 37 | + // Receivers and Senders created from this client. |
| 38 | + RetryOptions RetryOptions |
| 39 | + |
| 40 | + // StartPosition is the position we will start receiving events from, |
| 41 | + // either an offset (inclusive) with Offset, or receiving events received |
| 42 | + // after a specific time using EnqueuedTime. |
| 43 | + StartPosition StartPosition |
| 44 | + |
| 45 | + // OwnerLevel is the priority for this consumer, also known as the 'epoch' level. |
| 46 | + // When used, a consumer with a higher OwnerLevel will take ownership of a partition |
| 47 | + // from consumers with a lower OwnerLevel. |
| 48 | + // Default is off. |
| 49 | + OwnerLevel *uint64 |
| 50 | +} |
| 51 | + |
| 52 | +// StartPosition indicates the position to start receiving events within a partition. |
| 53 | +type StartPosition struct { |
| 54 | + // Offset will start the consumer after the specified offset. Can be exclusive |
| 55 | + // or inclusive, based on the Inclusive property. |
| 56 | + // NOTE: offsets are not stable values, and might refer to different events over time |
| 57 | + // as the Event Hub events reach their age limit and are discarded. |
| 58 | + Offset *uint64 |
| 59 | + |
| 60 | + // SequenceNumber will start the consumer after the specified sequence number. Can be exclusive |
| 61 | + // or inclusive, based on the Inclusive property. |
| 62 | + SequenceNumber *int64 |
| 63 | + |
| 64 | + // EnqueuedTime will start the consumer before events that were enqueued on or after EnqueuedTime. |
| 65 | + // Can be exclusive or inclusive, based on the Inclusive property. |
| 66 | + EnqueuedTime *time.Time |
| 67 | + |
| 68 | + // Inclusive configures whether the events directly at Offset, SequenceNumber or EnqueuedTime will be included (true) |
| 69 | + // or excluded (false). |
| 70 | + Inclusive bool |
| 71 | + |
| 72 | + // Earliest will start the consumer at the earliest event. |
| 73 | + Earliest *bool |
| 74 | + |
| 75 | + // Latest will start the consumer after the last event. |
| 76 | + Latest *bool |
| 77 | +} |
| 78 | + |
| 79 | +// ConsumerClient is used to receive events from an Event Hub partition. |
| 80 | +type ConsumerClient struct { |
| 81 | + retryOptions RetryOptions |
| 82 | + namespace *internal.Namespace |
| 83 | + eventHub string |
| 84 | + consumerGroup string |
| 85 | + partitionID string |
| 86 | + ownerLevel *uint64 |
| 87 | + |
| 88 | + offsetExpression string |
| 89 | + |
| 90 | + links *internal.Links[amqpwrap.AMQPReceiverCloser] |
| 91 | +} |
| 92 | + |
| 93 | +// NewConsumerClient creates a ConsumerClient which uses an azcore.TokenCredential for authentication. |
| 94 | +// The consumerGroup is the consumer group for this consumer. |
| 95 | +// The fullyQualifiedNamespace is the Event Hubs namespace name (ex: myeventhub.servicebus.windows.net) |
| 96 | +// The credential is one of the credentials in the `github.com/Azure/azure-sdk-for-go/sdk/azidentity` package. |
| 97 | +func NewConsumerClient(fullyQualifiedNamespace string, eventHub string, partitionID string, consumerGroup string, credential azcore.TokenCredential, options *ConsumerClientOptions) (*ConsumerClient, error) { |
| 98 | + return newConsumerClientImpl(consumerClientArgs{ |
| 99 | + fullyQualifiedNamespace: fullyQualifiedNamespace, |
| 100 | + credential: credential, |
| 101 | + eventHub: eventHub, |
| 102 | + partitionID: partitionID, |
| 103 | + consumerGroup: consumerGroup, |
| 104 | + }, options) |
| 105 | +} |
| 106 | + |
| 107 | +// NewConsumerClientFromConnectionString creates a ConsumerClient from a connection string. |
| 108 | +// The consumerGroup is the consumer group for this consumer. |
| 109 | +// |
| 110 | +// connectionString can be one of the following formats: |
| 111 | +// |
| 112 | +// Connection string, no EntityPath. In this case eventHub cannot be empty. |
| 113 | +// ex: Endpoint=sb://<your-namespace>.servicebus.windows.net/;SharedAccessKeyName=<key-name>;SharedAccessKey=<key> |
| 114 | +// |
| 115 | +// Connection string, has EntityPath. In this case eventHub must be empty. |
| 116 | +// ex: Endpoint=sb://<your-namespace>.servicebus.windows.net/;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<entity path> |
| 117 | +func NewConsumerClientFromConnectionString(connectionString string, eventHub string, partitionID string, consumerGroup string, options *ConsumerClientOptions) (*ConsumerClient, error) { |
| 118 | + parsedConn, err := parseConn(connectionString, eventHub) |
| 119 | + |
| 120 | + if err != nil { |
| 121 | + return nil, err |
| 122 | + } |
| 123 | + |
| 124 | + return newConsumerClientImpl(consumerClientArgs{ |
| 125 | + connectionString: connectionString, |
| 126 | + eventHub: parsedConn.HubName, |
| 127 | + partitionID: partitionID, |
| 128 | + consumerGroup: consumerGroup, |
| 129 | + }, options) |
| 130 | +} |
| 131 | + |
| 132 | +// ReceiveEventsOptions contains optional parameters for the ReceiveEvents function |
| 133 | +type ReceiveEventsOptions struct { |
| 134 | + // For future expansion |
| 135 | +} |
| 136 | + |
| 137 | +// ReceiveEvents receives events until the context has expired or been cancelled. |
| 138 | +func (cc *ConsumerClient) ReceiveEvents(ctx context.Context, count int, options *ReceiveEventsOptions) ([]*ReceivedEventData, error) { |
| 139 | + var events []*ReceivedEventData |
| 140 | + |
| 141 | + err := cc.links.Retry(ctx, EventConsumer, "ReceiveEvents", cc.partitionID, cc.retryOptions, func(ctx context.Context, lwid internal.LinkWithID[amqpwrap.AMQPReceiverCloser]) error { |
| 142 | + events = nil |
| 143 | + |
| 144 | + outstandingCredits := lwid.Link.Credits() |
| 145 | + |
| 146 | + if count > int(outstandingCredits) { |
| 147 | + newCredits := uint32(count) - outstandingCredits |
| 148 | + |
| 149 | + log.Writef(EventConsumer, "Have %d outstanding credit, only issuing %d credits", outstandingCredits, newCredits) |
| 150 | + |
| 151 | + if err := lwid.Link.IssueCredit(newCredits); err != nil { |
| 152 | + return err |
| 153 | + } |
| 154 | + } |
| 155 | + |
| 156 | + for { |
| 157 | + amqpMessage, err := lwid.Link.Receive(ctx) |
| 158 | + |
| 159 | + if err != nil { |
| 160 | + prefetched := getAllPrefetched(lwid.Link, count-len(events)) |
| 161 | + |
| 162 | + for _, amqpMsg := range prefetched { |
| 163 | + events = append(events, newReceivedEventData(amqpMsg)) |
| 164 | + } |
| 165 | + |
| 166 | + // this lets cancel errors just return |
| 167 | + return err |
| 168 | + } |
| 169 | + |
| 170 | + receivedEvent := newReceivedEventData(amqpMessage) |
| 171 | + events = append(events, receivedEvent) |
| 172 | + |
| 173 | + if len(events) == count { |
| 174 | + return nil |
| 175 | + } |
| 176 | + } |
| 177 | + }) |
| 178 | + |
| 179 | + if err != nil && len(events) == 0 { |
| 180 | + // TODO: if we get a "partition ownership lost" we need to think about whether that's retryable. |
| 181 | + return nil, internal.TransformError(err) |
| 182 | + } |
| 183 | + |
| 184 | + cc.offsetExpression = getOffsetExpression(StartPosition{ |
| 185 | + SequenceNumber: to.Ptr(events[len(events)-1].SequenceNumber), |
| 186 | + Inclusive: false, |
| 187 | + }) |
| 188 | + |
| 189 | + return events, nil |
| 190 | +} |
| 191 | + |
| 192 | +// GetEventHubProperties gets event hub properties, like the available partition IDs and when the Event Hub was created. |
| 193 | +func (cc *ConsumerClient) GetEventHubProperties(ctx context.Context, options *GetEventHubPropertiesOptions) (EventHubProperties, error) { |
| 194 | + rpcLink, err := cc.links.GetManagementLink(ctx) |
| 195 | + |
| 196 | + if err != nil { |
| 197 | + return EventHubProperties{}, err |
| 198 | + } |
| 199 | + |
| 200 | + return getEventHubProperties(ctx, cc.namespace, rpcLink.Link, cc.eventHub, options) |
| 201 | +} |
| 202 | + |
| 203 | +// GetPartitionProperties gets properties for a specific partition. This includes data like the last enqueued sequence number, the first sequence |
| 204 | +// number and when an event was last enqueued to the partition. |
| 205 | +func (cc *ConsumerClient) GetPartitionProperties(ctx context.Context, partitionID string, options *GetPartitionPropertiesOptions) (PartitionProperties, error) { |
| 206 | + rpcLink, err := cc.links.GetManagementLink(ctx) |
| 207 | + |
| 208 | + if err != nil { |
| 209 | + return PartitionProperties{}, err |
| 210 | + } |
| 211 | + |
| 212 | + return getPartitionProperties(ctx, cc.namespace, rpcLink.Link, cc.eventHub, partitionID, options) |
| 213 | +} |
| 214 | + |
| 215 | +// Close closes the consumer's link and the underlying AMQP connection. |
| 216 | +func (cc *ConsumerClient) Close(ctx context.Context) error { |
| 217 | + if err := cc.links.Close(ctx); err != nil { |
| 218 | + log.Writef(EventConsumer, "Failed to close link (error might be cached): %s", err.Error()) |
| 219 | + } |
| 220 | + return cc.namespace.Close(ctx, true) |
| 221 | +} |
| 222 | + |
| 223 | +func getOffsetExpression(startPosition StartPosition) string { |
| 224 | + lt := ">" |
| 225 | + |
| 226 | + if startPosition.Inclusive { |
| 227 | + lt = ">=" |
| 228 | + } |
| 229 | + |
| 230 | + if startPosition.EnqueuedTime != nil { |
| 231 | + // time-based, non-inclusive |
| 232 | + return fmt.Sprintf("amqp.annotation.x-opt-enqueued-time %s '%v'", lt, startPosition.EnqueuedTime.UnixNano()/int64(time.Millisecond)) |
| 233 | + } |
| 234 | + |
| 235 | + if startPosition.Offset != nil { |
| 236 | + // offset-based, non-inclusive |
| 237 | + // ex: amqp.annotation.x-opt-enqueued-time %s '165805323000' |
| 238 | + return fmt.Sprintf("amqp.annotation.x-opt-offset %s '%v'", lt, startPosition.Offset) |
| 239 | + } |
| 240 | + |
| 241 | + if startPosition.Latest != nil && *startPosition.Latest { |
| 242 | + return "amqp.annotation.x-opt-offset > '@latest'" |
| 243 | + } |
| 244 | + |
| 245 | + if startPosition.SequenceNumber != nil { |
| 246 | + return fmt.Sprintf("amqp.annotation.x-opt-sequence-number %s '%d", lt, *startPosition.SequenceNumber) |
| 247 | + } |
| 248 | + |
| 249 | + // default to the start |
| 250 | + return "amqp.annotation.x-opt-offset > '-1'" |
| 251 | +} |
| 252 | + |
| 253 | +func (cc *ConsumerClient) getEntityPath(partitionID string) string { |
| 254 | + if cc.ownerLevel == nil { |
| 255 | + return fmt.Sprintf("%s/ConsumerGroups/%s/Partitions/%s", cc.eventHub, cc.consumerGroup, partitionID) |
| 256 | + } else { |
| 257 | + return fmt.Sprintf("%s/ConsumerGroups/%s/Partitions/%s/epoch/%d", cc.eventHub, cc.consumerGroup, partitionID, *cc.ownerLevel) |
| 258 | + } |
| 259 | +} |
| 260 | + |
| 261 | +const defaultLinkRxBuffer = 2048 |
| 262 | + |
| 263 | +func (cc *ConsumerClient) newEventHubConsumerLink(ctx context.Context, session amqpwrap.AMQPSession, entityPath string) (internal.AMQPReceiverCloser, error) { |
| 264 | + receiver, err := session.NewReceiver(ctx, entityPath, &amqp.ReceiverOptions{ |
| 265 | + SettlementMode: to.Ptr(amqp.ModeFirst), |
| 266 | + ManualCredits: true, |
| 267 | + Credit: defaultLinkRxBuffer, |
| 268 | + Filters: []amqp.LinkFilter{ |
| 269 | + amqp.LinkFilterSelector(cc.offsetExpression), |
| 270 | + }, |
| 271 | + }) |
| 272 | + |
| 273 | + if err != nil { |
| 274 | + return nil, err |
| 275 | + } |
| 276 | + |
| 277 | + return receiver, nil |
| 278 | +} |
| 279 | + |
| 280 | +type consumerClientArgs struct { |
| 281 | + connectionString string |
| 282 | + |
| 283 | + // the Event Hubs namespace name (ex: myservicebus.servicebus.windows.net) |
| 284 | + fullyQualifiedNamespace string |
| 285 | + credential azcore.TokenCredential |
| 286 | + |
| 287 | + eventHub string |
| 288 | + partitionID string |
| 289 | + |
| 290 | + consumerGroup string |
| 291 | +} |
| 292 | + |
| 293 | +func newConsumerClientImpl(args consumerClientArgs, options *ConsumerClientOptions) (*ConsumerClient, error) { |
| 294 | + if options == nil { |
| 295 | + options = &ConsumerClientOptions{} |
| 296 | + } |
| 297 | + |
| 298 | + client := &ConsumerClient{ |
| 299 | + eventHub: args.eventHub, |
| 300 | + partitionID: args.partitionID, |
| 301 | + ownerLevel: options.OwnerLevel, |
| 302 | + consumerGroup: args.consumerGroup, |
| 303 | + offsetExpression: getOffsetExpression(options.StartPosition), |
| 304 | + } |
| 305 | + |
| 306 | + var err error |
| 307 | + var nsOptions []internal.NamespaceOption |
| 308 | + |
| 309 | + if args.connectionString != "" { |
| 310 | + nsOptions = append(nsOptions, internal.NamespaceWithConnectionString(args.connectionString)) |
| 311 | + } else if args.credential != nil { |
| 312 | + option := internal.NamespaceWithTokenCredential( |
| 313 | + args.fullyQualifiedNamespace, |
| 314 | + args.credential) |
| 315 | + |
| 316 | + nsOptions = append(nsOptions, option) |
| 317 | + } |
| 318 | + |
| 319 | + client.retryOptions = options.RetryOptions |
| 320 | + |
| 321 | + if options.TLSConfig != nil { |
| 322 | + nsOptions = append(nsOptions, internal.NamespaceWithTLSConfig(options.TLSConfig)) |
| 323 | + } |
| 324 | + |
| 325 | + if options.NewWebSocketConn != nil { |
| 326 | + nsOptions = append(nsOptions, internal.NamespaceWithWebSocket(options.NewWebSocketConn)) |
| 327 | + } |
| 328 | + |
| 329 | + if options.ApplicationID != "" { |
| 330 | + nsOptions = append(nsOptions, internal.NamespaceWithUserAgent(options.ApplicationID)) |
| 331 | + } |
| 332 | + |
| 333 | + nsOptions = append(nsOptions, internal.NamespaceWithRetryOptions(options.RetryOptions)) |
| 334 | + |
| 335 | + tempNS, err := internal.NewNamespace(nsOptions...) |
| 336 | + |
| 337 | + if err != nil { |
| 338 | + return nil, err |
| 339 | + } |
| 340 | + |
| 341 | + client.namespace = tempNS |
| 342 | + client.links = internal.NewLinks(tempNS, fmt.Sprintf("%s/$management", client.eventHub), client.getEntityPath, client.newEventHubConsumerLink) |
| 343 | + |
| 344 | + return client, nil |
| 345 | +} |
| 346 | + |
| 347 | +func getAllPrefetched(receiver amqpwrap.AMQPReceiver, max int) []*amqp.Message { |
| 348 | + var messages []*amqp.Message |
| 349 | + |
| 350 | + for i := 0; i < max; i++ { |
| 351 | + msg := receiver.Prefetched() |
| 352 | + |
| 353 | + if msg == nil { |
| 354 | + break |
| 355 | + } |
| 356 | + |
| 357 | + messages = append(messages, msg) |
| 358 | + } |
| 359 | + |
| 360 | + return messages |
| 361 | +} |
0 commit comments