Skip to content

Commit 123fc38

Browse files
authored
Fix uploads larger than 256MB (Azure#19658)
* Fix uploads larger than 256MB. Uploads larger than 256MB could fail depending on their size; refactored some internal code to follow convention. * add another test * fix comment * refine changelog entry
1 parent 028cadc commit 123fc38

File tree

6 files changed

+95
-10
lines changed

6 files changed

+95
-10
lines changed

sdk/storage/azblob/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
* Corrected signing of User Delegation SAS. Fixes [#19372](https://github.com/Azure/azure-sdk-for-go/issues/19372) and [#19454](https://github.com/Azure/azure-sdk-for-go/issues/19454)
2121
* Added formatting of start and expiry time in [SetAccessPolicy](https://learn.microsoft.com/rest/api/storageservices/set-container-acl#request-body). Fixes [#18712](https://github.com/Azure/azure-sdk-for-go/issues/18712)
22+
* Uploading block blobs larger than 256MB can fail in some cases with error `net/http: HTTP/1.x transport connection broken`.
2223

2324
### Other Changes
2425

sdk/storage/azblob/blob/client.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -336,8 +336,7 @@ func (b *Client) download(ctx context.Context, writer io.WriterAt, o downloadOpt
336336
TransferSize: count,
337337
ChunkSize: o.BlockSize,
338338
Concurrency: o.Concurrency,
339-
Operation: func(chunkStart int64, count int64, ctx context.Context) error {
340-
339+
Operation: func(ctx context.Context, chunkStart int64, count int64) error {
341340
downloadBlobOptions := o.getDownloadBlobOptions(HTTPRange{
342341
Offset: chunkStart + o.Range.Offset,
343342
Count: count,

sdk/storage/azblob/blockblob/client.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -362,7 +362,8 @@ func (bb *Client) CopyFromURL(ctx context.Context, copySource string, o *blob.Co
362362
// Concurrent Upload Functions -----------------------------------------------------------------------------------------
363363

364364
// uploadFromReader uploads a buffer in blocks to a block blob.
365-
func (bb *Client) uploadFromReader(ctx context.Context, reader io.ReaderAt, readerSize int64, o *uploadFromReaderOptions) (uploadFromReaderResponse, error) {
365+
func (bb *Client) uploadFromReader(ctx context.Context, reader io.ReaderAt, actualSize int64, o *uploadFromReaderOptions) (uploadFromReaderResponse, error) {
366+
readerSize := actualSize
366367
if o.BlockSize == 0 {
367368
// If bufferSize > (MaxStageBlockBytes * MaxBlocks), then error
368369
if readerSize > MaxStageBlockBytes*MaxBlocks {
@@ -412,11 +413,17 @@ func (bb *Client) uploadFromReader(ctx context.Context, reader io.ReaderAt, read
412413
TransferSize: readerSize,
413414
ChunkSize: o.BlockSize,
414415
Concurrency: o.Concurrency,
415-
Operation: func(offset int64, count int64, ctx context.Context) error {
416+
Operation: func(ctx context.Context, offset int64, chunkSize int64) error {
416417
// This function is called once per block.
417418
// It is passed this block's offset within the buffer and its count of bytes
418419
// Prepare to read the proper block/section of the buffer
419-
var body io.ReadSeeker = io.NewSectionReader(reader, offset, count)
420+
if chunkSize < o.BlockSize {
421+
// this is the last block. its actual size might be less
422+
// than the calculated size due to rounding up of the payload
423+
// size to fit in a whole number of blocks.
424+
chunkSize = (actualSize - offset)
425+
}
426+
var body io.ReadSeeker = io.NewSectionReader(reader, offset, chunkSize)
420427
blockNum := offset / o.BlockSize
421428
if o.Progress != nil {
422429
blockProgress := int64(0)

sdk/storage/azblob/blockblob/client_test.go

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"context"
1212
"crypto/md5"
1313
"encoding/base64"
14+
"errors"
1415
"fmt"
1516
"io"
1617
"net/http"
@@ -20,6 +21,7 @@ import (
2021
"time"
2122

2223
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
24+
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
2325
"github.com/Azure/azure-sdk-for-go/sdk/azcore/streaming"
2426
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
2527
"github.com/Azure/azure-sdk-for-go/sdk/internal/recording"
@@ -3843,3 +3845,80 @@ func (s *BlockBlobUnrecordedTestsSuite) TestLargeBlockBufferedUploadInParallel()
38433845
_require.Equal(*(committed[0].Size), largeBlockSize)
38443846
_require.Equal(*(committed[1].Size), largeBlockSize)
38453847
}
3848+
3849+
type fakeBlockBlob struct {
3850+
totalStaged int64
3851+
}
3852+
3853+
func (f *fakeBlockBlob) Do(req *http.Request) (*http.Response, error) {
3854+
// verify that the number of bytes read matches what was specified
3855+
data := make([]byte, req.ContentLength)
3856+
read, err := req.Body.Read(data)
3857+
if err != nil && !errors.Is(err, io.EOF) {
3858+
return nil, err
3859+
} else if int64(read) < req.ContentLength {
3860+
return nil, fmt.Errorf("expected %d bytes, read %d", req.ContentLength, read)
3861+
}
3862+
qp := req.URL.Query()
3863+
if comp := qp.Get("comp"); comp == "block" {
3864+
// staging a block, record its size
3865+
f.totalStaged += int64(read)
3866+
}
3867+
return &http.Response{
3868+
Request: req,
3869+
Status: "Created",
3870+
StatusCode: http.StatusCreated,
3871+
Header: http.Header{},
3872+
Body: http.NoBody,
3873+
}, nil
3874+
}
3875+
3876+
func TestUploadBufferUnevenBlockSize(t *testing.T) {
3877+
fbb := &fakeBlockBlob{}
3878+
client, err := blockblob.NewClientWithNoCredential("https://fake/blob/path", &blockblob.ClientOptions{
3879+
ClientOptions: policy.ClientOptions{
3880+
Transport: fbb,
3881+
},
3882+
})
3883+
require.NoError(t, err)
3884+
require.NotNil(t, client)
3885+
3886+
// create fake source that's not evenly divisible by 50000 (max number of blocks)
3887+
// and greater than MaxUploadBlobBytes (256MB) so that it doesn't fit into a single upload.
3888+
3889+
buffer := make([]byte, 263*1024*1024)
3890+
for i := 0; i < len(buffer); i++ {
3891+
buffer[i] = 1
3892+
}
3893+
3894+
_, err = client.UploadBuffer(context.Background(), buffer, &blockblob.UploadBufferOptions{
3895+
Concurrency: 1,
3896+
})
3897+
require.NoError(t, err)
3898+
require.Equal(t, int64(len(buffer)), fbb.totalStaged)
3899+
}
3900+
3901+
func TestUploadBufferEvenBlockSize(t *testing.T) {
3902+
fbb := &fakeBlockBlob{}
3903+
client, err := blockblob.NewClientWithNoCredential("https://fake/blob/path", &blockblob.ClientOptions{
3904+
ClientOptions: policy.ClientOptions{
3905+
Transport: fbb,
3906+
},
3907+
})
3908+
require.NoError(t, err)
3909+
require.NotNil(t, client)
3910+
3911+
// create fake source that's evenly divisible by 50000 (max number of blocks)
3912+
// and greater than MaxUploadBlobBytes (256MB) so that it doesn't fit into a single upload.
3913+
3914+
buffer := make([]byte, 270000000)
3915+
for i := 0; i < len(buffer); i++ {
3916+
buffer[i] = 1
3917+
}
3918+
3919+
_, err = client.UploadBuffer(context.Background(), buffer, &blockblob.UploadBufferOptions{
3920+
Concurrency: 1,
3921+
})
3922+
require.NoError(t, err)
3923+
require.Equal(t, int64(len(buffer)), fbb.totalStaged)
3924+
}

sdk/storage/azblob/client_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -512,7 +512,7 @@ func (s *AZBlobUnrecordedTestsSuite) TestBasicDoBatchTransfer() {
512512
TransferSize: test.transferSize,
513513
ChunkSize: test.chunkSize,
514514
Concurrency: test.concurrency,
515-
Operation: func(offset int64, chunkSize int64, ctx context.Context) error {
515+
Operation: func(ctx context.Context, offset int64, chunkSize int64) error {
516516
atomic.AddInt64(&totalSizeCount, chunkSize)
517517
atomic.AddInt64(&runCount, 1)
518518
return nil
@@ -554,7 +554,7 @@ func (s *AZBlobUnrecordedTestsSuite) TestDoBatchTransferWithError() {
554554
TransferSize: 5,
555555
ChunkSize: 1,
556556
Concurrency: 5,
557-
Operation: func(offset int64, chunkSize int64, ctx context.Context) error {
557+
Operation: func(ctx context.Context, offset int64, chunkSize int64) error {
558558
// simulate doing some work (HTTP call in real scenarios)
559559
// later chunks take longer to finish
560560
time.Sleep(time.Second * time.Duration(offset))

sdk/storage/azblob/internal/shared/batch_transfer.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ type BatchTransferOptions struct {
1616
TransferSize int64
1717
ChunkSize int64
1818
Concurrency uint16
19-
Operation func(offset int64, chunkSize int64, ctx context.Context) error
19+
Operation func(ctx context.Context, offset int64, chunkSize int64) error
2020
OperationName string
2121
}
2222

@@ -57,9 +57,8 @@ func DoBatchTransfer(ctx context.Context, o *BatchTransferOptions) error {
5757
curChunkSize = o.TransferSize - (int64(chunkNum) * o.ChunkSize) // Remove size of all transferred chunks from total
5858
}
5959
offset := int64(chunkNum) * o.ChunkSize
60-
6160
operationChannel <- func() error {
62-
return o.Operation(offset, curChunkSize, ctx)
61+
return o.Operation(ctx, offset, curChunkSize)
6362
}
6463
}
6564
close(operationChannel)

0 commit comments

Comments
 (0)