Skip to content

Commit ace19e8

Browse files
Blob Batch API Surface (Azure#19896)
* Adding methods for blob batch operations
1 parent b43109e commit ace19e8

24 files changed

+2705
-26
lines changed

sdk/storage/azblob/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
### Features Added
66

7+
* Added [Blob Batch API](https://learn.microsoft.com/rest/api/storageservices/blob-batch).
8+
79
### Breaking Changes
810

911
### Bugs Fixed

sdk/storage/azblob/blob/client.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ func NewClient(blobURL string, cred azcore.TokenCredential, options *ClientOptio
4242
conOptions.PerRetryPolicies = append(conOptions.PerRetryPolicies, authPolicy)
4343
pl := runtime.NewPipeline(exported.ModuleName, exported.ModuleVersion, runtime.PipelineOptions{}, &conOptions.ClientOptions)
4444

45-
return (*Client)(base.NewBlobClient(blobURL, pl, nil)), nil
45+
return (*Client)(base.NewBlobClient(blobURL, pl, &cred)), nil
4646
}
4747

4848
// NewClientWithNoCredential creates an instance of Client with the specified values.
@@ -100,6 +100,10 @@ func (b *Client) sharedKey() *SharedKeyCredential {
100100
return base.SharedKey((*base.Client[generated.BlobClient])(b))
101101
}
102102

103+
func (b *Client) credential() any {
104+
return base.Credential((*base.Client[generated.BlobClient])(b))
105+
}
106+
103107
// URL returns the URL endpoint used by the Client object.
104108
func (b *Client) URL() string {
105109
return b.generated().Endpoint()
@@ -114,7 +118,7 @@ func (b *Client) WithSnapshot(snapshot string) (*Client, error) {
114118
}
115119
p.Snapshot = snapshot
116120

117-
return (*Client)(base.NewBlobClient(p.String(), b.generated().Pipeline(), b.sharedKey())), nil
121+
return (*Client)(base.NewBlobClient(p.String(), b.generated().Pipeline(), b.credential())), nil
118122
}
119123

120124
// WithVersionID creates a new AppendBlobURL object identical to the source but with the specified version id.
@@ -126,7 +130,7 @@ func (b *Client) WithVersionID(versionID string) (*Client, error) {
126130
}
127131
p.VersionID = versionID
128132

129-
return (*Client)(base.NewBlobClient(p.String(), b.generated().Pipeline(), b.sharedKey())), nil
133+
return (*Client)(base.NewBlobClient(p.String(), b.generated().Pipeline(), b.credential())), nil
130134
}
131135

132136
// Delete marks the specified blob or snapshot for deletion. The blob is later deleted during garbage collection.
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
//go:build go1.18
2+
// +build go1.18
3+
4+
// Copyright (c) Microsoft Corporation. All rights reserved.
5+
// Licensed under the MIT License. See License.txt in the project root for license information.
6+
7+
package container
8+
9+
import (
10+
"context"
11+
"fmt"
12+
"net/url"
13+
14+
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
15+
"github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime"
16+
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
17+
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/internal/exported"
18+
)
19+
20+
// BatchBuilder is used for creating the batch operations list. It contains the list of either delete or set tier sub-requests.
21+
// NOTE: All sub-requests in the batch must be of the same type, either delete or set tier.
22+
type BatchBuilder struct {
23+
endpoint string
24+
authPolicy policy.Policy
25+
subRequests []*policy.Request
26+
operationType *exported.BlobBatchOperationType
27+
}
28+
29+
func (bb *BatchBuilder) checkOperationType(operationType exported.BlobBatchOperationType) error {
30+
if bb.operationType == nil {
31+
bb.operationType = &operationType
32+
return nil
33+
}
34+
if *bb.operationType != operationType {
35+
return fmt.Errorf("BlobBatch only supports one operation type per batch and is already being used for %s operations", *bb.operationType)
36+
}
37+
return nil
38+
}
39+
40+
// Delete operation is used to add delete sub-request to the batch builder.
41+
func (bb *BatchBuilder) Delete(blobName string, options *BatchDeleteOptions) error {
42+
err := bb.checkOperationType(exported.BatchDeleteOperationType)
43+
if err != nil {
44+
return err
45+
}
46+
47+
blobName = url.PathEscape(blobName)
48+
blobURL := runtime.JoinPaths(bb.endpoint, blobName)
49+
50+
blobClient, err := blob.NewClientWithNoCredential(blobURL, nil)
51+
if err != nil {
52+
return err
53+
}
54+
55+
deleteOptions, leaseInfo, accessConditions := options.format()
56+
req, err := getGeneratedBlobClient(blobClient).DeleteCreateRequest(context.TODO(), deleteOptions, leaseInfo, accessConditions)
57+
if err != nil {
58+
return err
59+
}
60+
61+
// remove x-ms-version header
62+
exported.UpdateSubRequestHeaders(req)
63+
64+
bb.subRequests = append(bb.subRequests, req)
65+
return nil
66+
}
67+
68+
// SetTier operation is used to add set tier sub-request to the batch builder.
69+
func (bb *BatchBuilder) SetTier(blobName string, accessTier blob.AccessTier, options *BatchSetTierOptions) error {
70+
err := bb.checkOperationType(exported.BatchSetTierOperationType)
71+
if err != nil {
72+
return err
73+
}
74+
75+
blobName = url.PathEscape(blobName)
76+
blobURL := runtime.JoinPaths(bb.endpoint, blobName)
77+
78+
blobClient, err := blob.NewClientWithNoCredential(blobURL, nil)
79+
if err != nil {
80+
return err
81+
}
82+
83+
setTierOptions, leaseInfo, accessConditions := options.format()
84+
req, err := getGeneratedBlobClient(blobClient).SetTierCreateRequest(context.TODO(), accessTier, setTierOptions, leaseInfo, accessConditions)
85+
if err != nil {
86+
return err
87+
}
88+
89+
// remove x-ms-version header
90+
exported.UpdateSubRequestHeaders(req)
91+
92+
bb.subRequests = append(bb.subRequests, req)
93+
return nil
94+
}

sdk/storage/azblob/container/client.go

Lines changed: 79 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,11 @@
77
package container
88

99
import (
10+
"bytes"
1011
"context"
12+
"errors"
13+
"fmt"
14+
"github.com/Azure/azure-sdk-for-go/sdk/azcore/streaming"
1115
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
1216
"net/http"
1317
"net/url"
@@ -45,7 +49,7 @@ func NewClient(containerURL string, cred azcore.TokenCredential, options *Client
4549
conOptions.PerRetryPolicies = append(conOptions.PerRetryPolicies, authPolicy)
4650
pl := runtime.NewPipeline(exported.ModuleName, exported.ModuleVersion, runtime.PipelineOptions{}, &conOptions.ClientOptions)
4751

48-
return (*Client)(base.NewContainerClient(containerURL, pl, nil)), nil
52+
return (*Client)(base.NewContainerClient(containerURL, pl, &cred)), nil
4953
}
5054

5155
// NewClientWithNoCredential creates an instance of Client with the specified values.
@@ -102,6 +106,15 @@ func (c *Client) sharedKey() *SharedKeyCredential {
102106
return base.SharedKey((*base.Client[generated.ContainerClient])(c))
103107
}
104108

109+
func (c *Client) credential() any {
110+
return base.Credential((*base.Client[generated.ContainerClient])(c))
111+
}
112+
113+
// helper method to return the generated.BlobClient which is used for creating the sub-requests
114+
func getGeneratedBlobClient(b *blob.Client) *generated.BlobClient {
115+
return base.InnerClient((*base.Client[generated.BlobClient])(b))
116+
}
117+
105118
// URL returns the URL endpoint used by the Client object.
106119
func (c *Client) URL() string {
107120
return c.generated().Endpoint()
@@ -113,7 +126,7 @@ func (c *Client) URL() string {
113126
func (c *Client) NewBlobClient(blobName string) *blob.Client {
114127
blobName = url.PathEscape(blobName)
115128
blobURL := runtime.JoinPaths(c.URL(), blobName)
116-
return (*blob.Client)(base.NewBlobClient(blobURL, c.generated().Pipeline(), c.sharedKey()))
129+
return (*blob.Client)(base.NewBlobClient(blobURL, c.generated().Pipeline(), c.credential()))
117130
}
118131

119132
// NewAppendBlobClient creates a new appendblob.Client object by concatenating blobName to the end of
@@ -329,3 +342,67 @@ func (c *Client) GetSASURL(permissions sas.ContainerPermissions, expiry time.Tim
329342

330343
return endpoint, nil
331344
}
345+
346+
// NewBatchBuilder creates an instance of BatchBuilder using the same auth policy as the client.
347+
// BatchBuilder is used to build the batch consisting of either delete or set tier sub-requests.
348+
// All sub-requests in the batch must be of the same type, either delete or set tier.
349+
func (c *Client) NewBatchBuilder() (*BatchBuilder, error) {
350+
var authPolicy policy.Policy
351+
352+
switch cred := c.credential().(type) {
353+
case *azcore.TokenCredential:
354+
authPolicy = runtime.NewBearerTokenPolicy(*cred, []string{shared.TokenScope}, nil)
355+
case *SharedKeyCredential:
356+
authPolicy = exported.NewSharedKeyCredPolicy(cred)
357+
case nil:
358+
// for authentication using SAS
359+
authPolicy = nil
360+
default:
361+
return nil, fmt.Errorf("unrecognised authentication type %T", cred)
362+
}
363+
364+
return &BatchBuilder{
365+
endpoint: c.URL(),
366+
authPolicy: authPolicy,
367+
}, nil
368+
}
369+
370+
// SubmitBatch operation allows multiple API calls to be embedded into a single HTTP request.
371+
// It builds the request body using the BatchBuilder object passed.
372+
// BatchBuilder contains the list of operations to be submitted. It supports up to 256 sub-requests in a single batch.
373+
// For more information, see https://docs.microsoft.com/rest/api/storageservices/blob-batch.
374+
func (c *Client) SubmitBatch(ctx context.Context, bb *BatchBuilder, options *SubmitBatchOptions) (SubmitBatchResponse, error) {
375+
if bb == nil || len(bb.subRequests) == 0 {
376+
return SubmitBatchResponse{}, errors.New("batch builder is empty")
377+
}
378+
379+
// create the request body
380+
batchReq, batchID, err := exported.CreateBatchRequest(&exported.BlobBatchBuilder{
381+
AuthPolicy: bb.authPolicy,
382+
SubRequests: bb.subRequests,
383+
})
384+
if err != nil {
385+
return SubmitBatchResponse{}, err
386+
}
387+
388+
reader := bytes.NewReader(batchReq)
389+
rsc := streaming.NopCloser(reader)
390+
multipartContentType := "multipart/mixed; boundary=" + batchID
391+
392+
resp, err := c.generated().SubmitBatch(ctx, int64(len(batchReq)), multipartContentType, rsc, options.format())
393+
if err != nil {
394+
return SubmitBatchResponse{}, err
395+
}
396+
397+
batchResponses, err := exported.ParseBlobBatchResponse(resp.Body, resp.ContentType, bb.subRequests)
398+
if err != nil {
399+
return SubmitBatchResponse{}, err
400+
}
401+
402+
return SubmitBatchResponse{
403+
Responses: batchResponses,
404+
ContentType: resp.ContentType,
405+
RequestID: resp.RequestID,
406+
Version: resp.Version,
407+
}, nil
408+
}

0 commit comments

Comments
 (0)