Skip to content

Commit adb14ea

Browse files
[azeventhubs] Adding a section to the migration guide that shows how to migrate from older checkpoints to a newer checkpoint store Azure#20297
Adding in a migration guide section that shows how to migrate We want to make it easy for customers using the previous `azure-event-hubs-go` package to use this package (`github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs`). Fixes Azure#19849
1 parent 988beb5 commit adb14ea

File tree

4 files changed

+156
-3
lines changed

4 files changed

+156
-3
lines changed
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package azeventhubs_test
5+
6+
import (
7+
"context"
8+
"encoding/json"
9+
"fmt"
10+
"os"
11+
"strconv"
12+
13+
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
14+
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
15+
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
16+
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
17+
)
18+
19+
type LegacyCheckpoint struct {
20+
PartitionID string `json:"partitionID"`
21+
Epoch int `json:"epoch"`
22+
Owner string `json:"owner"`
23+
Checkpoint struct {
24+
Offset string `json:"offset"`
25+
SequenceNumber int64 `json:"sequenceNumber"`
26+
EnqueueTime string `json:"enqueueTime"` // ": "0001-01-01T00:00:00Z"
27+
} `json:"checkpoint"`
28+
}
29+
30+
// Shows how to migrate from the older `github.com/Azure/azure-event-hubs-go` checkpointer to to
31+
// the format used by this package, `github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints/BlobStore`
32+
//
33+
// NOTE: This example is not safe to run while either the old or new checkpoint store is in-use as it doesn't
34+
// respect locking or ownership.
35+
func Example_migrateCheckpoints() {
36+
// Azure Event Hubs connection string. You can get this from the Azure Portal.
37+
// For example: youreventhub.servicebus.windows.net
38+
var EventHubNamespace = os.Getenv("EVENTHUB_NAMESPACE")
39+
40+
// Name of your Event Hub that these checkpoints reference.
41+
var EventHubName = os.Getenv("EVENTHUB_NAME")
42+
43+
// Name of your Event Hub consumer group
44+
// Example: $Default
45+
var EventHubConsumerGroup = os.Getenv("EVENTHUB_CONSUMER_GROUP")
46+
47+
// Azure Storage account connection string. You can get this from the Azure Portal.
48+
// For example: DefaultEndpointsProtocol=https;AccountName=accountname;AccountKey=account-key;EndpointSuffix=core.windows.net
49+
var StorageConnectionString = os.Getenv("STORAGE_CONNECTION_STRING")
50+
51+
// Optional: If you used `eventhub.WithPrefixInBlobPath()` configuration option for your Event Processor Host
52+
// then you'll need to set this value.
53+
//
54+
// NOTE: This is no longer needed with the new checkpoint store as it automatically makes the path unique
55+
// for each combination of eventhub + hubname + consumergroup + partition.
56+
var BlobPrefix = os.Getenv("OLD_STORAGE_BLOB_PREFIX")
57+
58+
// Name of the checkpoint store's Azure Storage container.
59+
var OldStorageContainerName = os.Getenv("OLD_STORAGE_CONTAINER_NAME")
60+
61+
// Name of the Azure Storage container to place new checkpoints in.
62+
var NewStorageContainerName = os.Getenv("NEW_STORAGE_CONTAINER_NAME")
63+
64+
if EventHubNamespace == "" || EventHubName == "" || EventHubConsumerGroup == "" ||
65+
StorageConnectionString == "" || OldStorageContainerName == "" || NewStorageContainerName == "" {
66+
fmt.Printf("Skipping migration, missing parameters\n")
67+
return
68+
}
69+
70+
blobClient, err := azblob.NewClientFromConnectionString(StorageConnectionString, nil)
71+
72+
if err != nil {
73+
panic(err)
74+
}
75+
76+
oldCheckpoints, err := loadOldCheckpoints(blobClient, OldStorageContainerName, BlobPrefix)
77+
78+
if err != nil {
79+
panic(err)
80+
}
81+
82+
newCheckpointStore, err := checkpoints.NewBlobStore(blobClient.ServiceClient().NewContainerClient(NewStorageContainerName), nil)
83+
84+
if err != nil {
85+
panic(err)
86+
}
87+
88+
for _, oldCheckpoint := range oldCheckpoints {
89+
newCheckpoint := azeventhubs.Checkpoint{
90+
ConsumerGroup: EventHubConsumerGroup,
91+
EventHubName: EventHubName,
92+
FullyQualifiedNamespace: EventHubNamespace,
93+
PartitionID: oldCheckpoint.PartitionID,
94+
}
95+
96+
offset, err := strconv.ParseInt(oldCheckpoint.Checkpoint.Offset, 10, 64)
97+
98+
if err != nil {
99+
panic(err)
100+
}
101+
102+
newCheckpoint.Offset = &offset
103+
newCheckpoint.SequenceNumber = &oldCheckpoint.Checkpoint.SequenceNumber
104+
105+
if err := newCheckpointStore.UpdateCheckpoint(context.Background(), newCheckpoint, nil); err != nil {
106+
panic(err)
107+
}
108+
}
109+
}
110+
111+
func loadOldCheckpoints(blobClient *azblob.Client, containerName string, customBlobPrefix string) ([]*LegacyCheckpoint, error) {
112+
blobPrefix := &customBlobPrefix
113+
114+
if customBlobPrefix == "" {
115+
blobPrefix = nil
116+
}
117+
118+
pager := blobClient.NewListBlobsFlatPager(containerName, &container.ListBlobsFlatOptions{
119+
Prefix: blobPrefix,
120+
})
121+
122+
var checkpoints []*LegacyCheckpoint
123+
124+
for pager.More() {
125+
page, err := pager.NextPage(context.Background())
126+
127+
if err != nil {
128+
return nil, err
129+
}
130+
131+
for _, item := range page.Segment.BlobItems {
132+
buff := [4000]byte{}
133+
134+
len, err := blobClient.DownloadBuffer(context.Background(), containerName, *item.Name, buff[:], nil)
135+
136+
if err != nil {
137+
return nil, err
138+
}
139+
140+
var legacyCheckpoint *LegacyCheckpoint
141+
142+
if err := json.Unmarshal(buff[0:len], &legacyCheckpoint); err != nil {
143+
return nil, err
144+
}
145+
146+
checkpoints = append(checkpoints, legacyCheckpoint)
147+
}
148+
}
149+
150+
return checkpoints, nil
151+
}

sdk/messaging/azeventhubs/example_consuming_events_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,7 @@ import (
1616
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
1717
)
1818

19-
// Example_consumingEventsUsingConsumerClient shows how to start consuming events in partitions
20-
// in an Event Hub.
19+
// Shows how to start consuming events in partitions in an Event Hub.
2120
//
2221
// If you have an Azure Storage account you can use the [Processor] type instead, which will handle
2322
// distributing partitions between multiple consumers. See example_processor_test.go for usage of

sdk/messaging/azeventhubs/example_processor_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import (
1616
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
1717
)
1818

19-
// Example_consumingEventsUsingProcessor shows how to use the [Processor] type.
19+
// Shows how to use the [Processor] type.
2020
//
2121
// The Processor type acts as a load balancer, ensuring that partitions are divided up amongst
2222
// active Processor instances. You provide it with a [ConsumerClient] as well as a [CheckpointStore].

sdk/messaging/azeventhubs/migrationguide.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,3 +101,6 @@ producerClient.GetEventHubProperties(context.TODO(), nil)
101101
producerClient.GetPartitionProperties(context.TODO(), "partition-id", nil)
102102
```
103103

104+
## Migrating from a previous checkpoint store
105+
106+
See here for an example: [link](https://github.com/Azure/azure-sdk-for-go/blob/main/sdk/messaging/azeventhubs/example_checkpoint_migration_test.go)

0 commit comments

Comments
 (0)