Skip to content

Commit 0307c7c

Browse files
[BUG FIX] Client-side encryption respects StorageTransferOptions.InitialTransferSize (Azure#25392)
* encryption respects storage transfer options * more tests * resolved internal access compilation conflict * fixed test * changelog Co-authored-by: jschrepp-MSFT <41338290+jschrepp-MSFT@users.noreply.github.com>
1 parent b5f249b commit 0307c7c

File tree

12 files changed

+215
-23
lines changed

12 files changed

+215
-23
lines changed

sdk/storage/Azure.Storage.Blobs/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
- Added support for blob names container invalid XML characters.
77
- Added support for updating the key encryption key on a client-side encrypted blob.
88
- Fixed a bug where BlobClient.Upload() and UploadAsync() when using client-side encryption would modify the Dictionary instance passed by the caller for blob metadata.
9+
- Fixed a bug where BlobClient.Upload() and UploadAsync() when using client-side encryption would not respect StorageTransferOptions.InitialTransferSize.
910

1011
## 12.11.0-beta.1 (2021-11-03)
1112
- Added support for service version 2020-12-06.

sdk/storage/Azure.Storage.Blobs/src/BlobClient.cs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1622,14 +1622,20 @@ internal async Task<Response<BlobContentInfo>> StagedUploadInternal(
16221622
bool async = true,
16231623
CancellationToken cancellationToken = default)
16241624
{
1625+
long? expectedContentLength = null;
16251626
if (UsingClientSideEncryption)
16261627
{
16271628
if (UsingClientSideEncryption && options.TransactionalHashingOptions != default)
16281629
{
16291630
throw Errors.TransactionalHashingNotSupportedWithClientSideEncryption();
16301631
}
16311632

1632-
// content is now unseekable, so PartitionedUploader will be forced to do a buffered multipart upload
1633+
// if content length was known, we retain that for dividing REST requests appropriately
1634+
expectedContentLength = content.GetLengthOrDefault();
1635+
if (expectedContentLength.HasValue)
1636+
{
1637+
expectedContentLength = ClientSideEncryptor.ExpectedCiphertextLength(expectedContentLength.Value);
1638+
}
16331639
(content, options.Metadata) = await new BlobClientSideEncryptor(new ClientSideEncryptor(ClientSideEncryption))
16341640
.ClientSideEncryptInternal(content, options.Metadata, async, cancellationToken).ConfigureAwait(false);
16351641
}
@@ -1641,6 +1647,7 @@ internal async Task<Response<BlobContentInfo>> StagedUploadInternal(
16411647

16421648
return await uploader.UploadInternal(
16431649
content,
1650+
expectedContentLength,
16441651
options,
16451652
options?.ProgressHandler,
16461653
async,

sdk/storage/Azure.Storage.Blobs/src/BlockBlobClient.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -533,6 +533,7 @@ public virtual Response<BlobContentInfo> Upload(
533533

534534
return uploader.UploadInternal(
535535
content,
536+
expectedContentLength: default,
536537
options,
537538
options.ProgressHandler,
538539
async: false,
@@ -589,6 +590,7 @@ public virtual async Task<Response<BlobContentInfo>> UploadAsync(
589590

590591
return await uploader.UploadInternal(
591592
content,
593+
expectedContentLength: default,
592594
options,
593595
options.ProgressHandler,
594596
async: true,
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
using Azure.Storage.Blobs.Specialized;
5+
using BlobsClientBuilder = Azure.Storage.Test.Shared.ClientBuilder<
6+
Azure.Storage.Blobs.BlobServiceClient,
7+
Azure.Storage.Blobs.BlobClientOptions>;
8+
9+
/**
10+
* Azure.Storage.Blobs.Batch (and any other sub-package of blobs) finds itself in the situation
11+
* where it cannot have access to Azure.Storage.Blobs internals. It requires access to
12+
* Azure.Storage.Blobs.Batch internals, which shares compile-includes with Azure.Storage.Blobs.
13+
* Some client builder extensions require access to the internals of blobs, but batch needs
14+
* general access to client builder extensions. This file contains extensions that require access
15+
* to blobs internals, and would cause compile conflicts were other packages to have access to those
16+
* internals.
17+
*/
18+
namespace Azure.Storage.Blobs.Tests
19+
{
20+
public static partial class ClientBuilderExtensions
21+
{
22+
public static BlockBlobClient ToBlockBlobClient(
23+
this BlobsClientBuilder clientBuilder,
24+
BlobBaseClient client)
25+
=> clientBuilder.AzureCoreRecordedTestBase.InstrumentClient(new BlockBlobClient(client.Uri, client.ClientConfiguration));
26+
}
27+
}

sdk/storage/Azure.Storage.Blobs/tests/ClientBuilderExtensions.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,15 @@
55
using System.Collections.Generic;
66
using System.Threading.Tasks;
77
using Azure.Storage.Blobs.Models;
8+
using Azure.Storage.Blobs.Specialized;
89
using Azure.Storage.Test.Shared;
910
using BlobsClientBuilder = Azure.Storage.Test.Shared.ClientBuilder<
1011
Azure.Storage.Blobs.BlobServiceClient,
1112
Azure.Storage.Blobs.BlobClientOptions>;
1213

1314
namespace Azure.Storage.Blobs.Tests
1415
{
15-
public static class ClientBuilderExtensions
16+
public static partial class ClientBuilderExtensions
1617
{
1718
/// <summary>
1819
/// Creates a new <see cref="ClientBuilder{TServiceClient, TServiceClientOptions}"/>

sdk/storage/Azure.Storage.Blobs/tests/ClientSideEncryptionTests.cs

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1058,6 +1058,61 @@ await encryptedBlobClient.UploadAsync(
10581058
}
10591059
}
10601060

1061+
[Test]
1062+
[LiveOnly]
1063+
/// <summary>
1064+
/// Crypto transform streams are unseekable and have no <see cref="Stream.Length"/>.
1065+
/// When length is unknown, <see cref="PartitionedUploader{TServiceSpecificArgs, TCompleteUploadReturn}"/>
1066+
/// doesn't even attempt a one-shot upload.
1067+
/// This tests if we correctly inform the uploader of an expected stream length so it
1068+
/// can respect the given <see cref="StorageTransferOptions"/>.
1069+
/// </summary>
1070+
public async Task PutBlobPutBlockSwitch([Values(true, false)] bool oneshot)
1071+
{
1072+
const int dataSize = 1 * Constants.KB;
1073+
1074+
// Arrange
1075+
byte[] data = GetRandomBuffer(dataSize);
1076+
int transferSize = oneshot
1077+
? 2 * dataSize // big enough for put blob even after AES-CBC PKCS7 padding
1078+
: dataSize / 2;
1079+
StorageTransferOptions transferOptions = new StorageTransferOptions
1080+
{
1081+
InitialTransferSize = transferSize,
1082+
MaximumTransferSize = transferSize
1083+
};
1084+
1085+
IKeyEncryptionKey key = GetIKeyEncryptionKey().Object;
1086+
await using var disposable = await GetTestContainerEncryptionAsync(
1087+
new ClientSideEncryptionOptions(ClientSideEncryptionVersion.V1_0)
1088+
{
1089+
KeyEncryptionKey = key,
1090+
KeyWrapAlgorithm = s_algorithmName
1091+
});
1092+
var blob = disposable.Container.GetBlobClient(GetNewBlobName());
1093+
1094+
// Act
1095+
await blob.UploadAsync(
1096+
new MemoryStream(data),
1097+
new BlobUploadOptions { TransferOptions = transferOptions },
1098+
cancellationToken: s_cancellationToken);
1099+
1100+
// Assert
1101+
Assert.IsTrue(await blob.ExistsAsync());
1102+
Assert.Greater((await blob.GetPropertiesAsync()).Value.ContentLength, 0);
1103+
// block list will return empty when putblob was used
1104+
BlockList blockList = await BlobsClientBuilder.ToBlockBlobClient(blob).GetBlockListAsync();
1105+
Assert.IsEmpty(blockList.UncommittedBlocks);
1106+
if (oneshot)
1107+
{
1108+
Assert.IsEmpty(blockList.CommittedBlocks);
1109+
}
1110+
else
1111+
{
1112+
Assert.IsNotEmpty(blockList.CommittedBlocks);
1113+
}
1114+
}
1115+
10611116
[RecordedTest]
10621117
public void CanGenerateSas_WithClientSideEncryptionOptions_True()
10631118
{

sdk/storage/Azure.Storage.Blobs/tests/PartitionedUploaderTests.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,7 @@ private async Task<Response<BlobContentInfo>> InvokeUploadAsync(PartitionedUploa
318318
{
319319
return await uploader.UploadInternal(
320320
content,
321+
expectedContentLength: default,
321322
new BlobUploadOptions
322323
{
323324
HttpHeaders = s_blobHttpHeaders,

sdk/storage/Azure.Storage.Common/src/Shared/ClientsideEncryption/ClientSideEncryptor.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,14 @@ public ClientSideEncryptor(ClientSideEncryptionOptions options)
2121
_keyWrapAlgorithm = options.KeyWrapAlgorithm;
2222
}
2323

24+
public static long ExpectedCiphertextLength(long plaintextLength)
25+
{
26+
const int aesBlockSizeBytes = 16;
27+
28+
// pkcs7 padding output length algorithm
29+
return plaintextLength + (aesBlockSizeBytes - (plaintextLength % aesBlockSizeBytes));
30+
}
31+
2432
/// <summary>
2533
/// Wraps the given read-stream in a CryptoStream and provides the metadata used to create
2634
/// that stream.

sdk/storage/Azure.Storage.Common/src/Shared/PartitionedUploader.cs

Lines changed: 42 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
using Azure.Core.Pipeline;
1212
using Azure.Storage.Shared;
1313

14+
#pragma warning disable SA1402 // File may only contain a single type
15+
1416
namespace Azure.Storage
1517
{
1618
internal class PartitionedUploader<TServiceSpecificArgs, TCompleteUploadReturn>
@@ -162,6 +164,7 @@ public PartitionedUploader(
162164

163165
public async Task<Response<TCompleteUploadReturn>> UploadInternal(
164166
Stream content,
167+
long? expectedContentLength,
165168
TServiceSpecificArgs args,
166169
IProgress<long> progressHandler,
167170
bool async,
@@ -184,11 +187,27 @@ public async Task<Response<TCompleteUploadReturn>> UploadInternal(
184187
// some strategies are unavailable if we don't know the stream length, and some can still work
185188
// we may introduce separately provided stream lengths in the future for unseekable streams with
186189
// an expected length
187-
long? length = GetLengthOrDefault(content);
190+
long? length = expectedContentLength ?? content.GetLengthOrDefault();
188191

189192
// If we know the length and it's small enough
190193
if (length < _singleUploadThreshold)
191194
{
195+
// may not be seekable. buffer if that's the case
196+
if (!content.CanSeek)
197+
{
198+
content = await PooledMemoryStream.BufferStreamPartitionInternal(
199+
content,
200+
// we've passed a comparison on length; we know there is a value
201+
length.Value,
202+
length.Value,
203+
// for the purposes of a one-shot, absolutePosition is always zero
204+
absolutePosition: 0,
205+
_arrayPool,
206+
maxArrayPoolRentalSize: default,
207+
async,
208+
cancellationToken).ConfigureAwait(false);
209+
}
210+
192211
// Upload it in a single request
193212
return await _singleUploadInternal(
194213
content,
@@ -472,25 +491,6 @@ await _uploadPartitionInternal(
472491
}
473492
}
474493

475-
/// <summary>
476-
/// Some streams will throw if you try to access their length so we wrap
477-
/// the check in a TryGet helper.
478-
/// </summary>
479-
private static long? GetLengthOrDefault(Stream content)
480-
{
481-
try
482-
{
483-
if (content.CanSeek)
484-
{
485-
return content.Length - content.Position;
486-
}
487-
}
488-
catch (NotSupportedException)
489-
{
490-
}
491-
return default;
492-
}
493-
494494
#region Stream Splitters
495495
/// <summary>
496496
/// Partition a stream into a series of blocks buffered as needed by an array pool.
@@ -630,4 +630,26 @@ private static Task<SlicedStream> GetStreamedPartitionInternal(
630630
=> Task.FromResult((SlicedStream)WindowStream.GetWindow(stream, maxCount, absolutePosition));
631631
#endregion
632632
}
633+
634+
internal static partial class StreamExtensions
635+
{
636+
/// <summary>
637+
/// Some streams will throw if you try to access their length so we wrap
638+
/// the check in a TryGet helper.
639+
/// </summary>
640+
public static long? GetLengthOrDefault(this Stream content)
641+
{
642+
try
643+
{
644+
if (content.CanSeek)
645+
{
646+
return content.Length - content.Position;
647+
}
648+
}
649+
catch (NotSupportedException)
650+
{
651+
}
652+
return default;
653+
}
654+
}
633655
}

sdk/storage/Azure.Storage.Common/tests/PartitionedUploaderTests.cs

Lines changed: 67 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
using System.Threading;
99
using System.Threading.Tasks;
1010
using Azure.Core.Pipeline;
11+
using Azure.Core.TestFramework;
1112
using Azure.Storage.Test;
1213
using Moq;
1314
using NUnit.Framework;
@@ -134,10 +135,75 @@ public async Task AlwaysUploadSeekableStreamsSequentialUploads(int streamSize, i
134135
s_hashingOptions,
135136
operationName: s_operationName);
136137

137-
Response<object> result = await partitionedUploader.UploadInternal(stream.Object, s_objectArgs, s_progress, IsAsync, s_cancellation).ConfigureAwait(false);
138+
Response<object> result = await partitionedUploader.UploadInternal(stream.Object, default, s_objectArgs, s_progress, IsAsync, s_cancellation).ConfigureAwait(false);
138139

139140
// assert streams were actually sent to delegates; the delegates themselves threw if conditions weren't met
140141
Assert.Greater(singleUpload.Invocations.Count + uploadPartition.Invocations.Count, 0);
141142
}
143+
144+
[Test]
145+
public async Task InterpretsLengthNonSeekableStream([Values(true, false)] bool oneshot)
146+
{
147+
// Arrange
148+
const int dataSize = Constants.KB;
149+
const int numPartitions = 2;
150+
var data = TestHelper.GetRandomBuffer(dataSize);
151+
int blockSize = oneshot ? dataSize * 2 : dataSize / numPartitions;
152+
153+
var stream = new Mock<MemoryStream>(MockBehavior.Loose, data);
154+
stream.CallBase = true;
155+
156+
// make stream unseekable (can't get length from stream)
157+
stream.SetupGet(s => s.CanSeek).Returns(false);
158+
stream.SetupGet(s => s.Position).Throws(new NotSupportedException());
159+
stream.SetupSet(s => s.Position = default).Throws(new NotSupportedException());
160+
stream.Setup(s => s.Seek(It.IsAny<long>(), It.IsAny<SeekOrigin>())).Throws(new NotSupportedException());
161+
stream.SetupGet(s => s.Length).Throws(new NotSupportedException());
162+
stream.Setup(s => s.SetLength(It.IsAny<long>())).Throws(new NotSupportedException());
163+
164+
// confirm our stream cannot give a length
165+
Assert.Throws<NotSupportedException>(() => _ = stream.Object.Length);
166+
167+
var createScope = GetMockCreateScope();
168+
var initializeDestination = GetMockInitializeDestinationInternal();
169+
var singleUpload = GetMockSingleUploadInternal(dataSize);
170+
var uploadPartition = GetMockUploadPartitionInternal(blockSize);
171+
var commitPartitions = GetMockCommitPartitionedUploadInternal();
172+
var partitionedUploader = new PartitionedUploader<object, object>(
173+
new PartitionedUploader<object, object>.Behaviors
174+
{
175+
Scope = createScope.Object,
176+
InitializeDestination = initializeDestination.Object,
177+
SingleUpload = singleUpload.Object,
178+
UploadPartition = uploadPartition.Object,
179+
CommitPartitionedUpload = commitPartitions.Object
180+
},
181+
new StorageTransferOptions()
182+
{
183+
InitialTransferSize = blockSize,
184+
MaximumTransferSize = blockSize,
185+
MaximumConcurrency = 1
186+
},
187+
s_hashingOptions,
188+
operationName: s_operationName);
189+
190+
// Act
191+
// give uploader an expected content length for unseekable stream
192+
Response<object> result = await partitionedUploader.UploadInternal(stream.Object, dataSize, s_objectArgs, s_progress, IsAsync, s_cancellation);
193+
194+
// Assert
195+
if (oneshot)
196+
{
197+
Assert.AreEqual(1, singleUpload.Invocations.Count);
198+
Assert.IsEmpty(uploadPartition.Invocations);
199+
Assert.IsEmpty(commitPartitions.Invocations);
200+
}
201+
else
202+
{
203+
Assert.IsEmpty(singleUpload.Invocations);
204+
Assert.AreEqual(numPartitions, uploadPartition.Invocations.Count);
205+
Assert.AreEqual(1, commitPartitions.Invocations.Count);
206+
}
207+
}
142208
}
143209
}

0 commit comments

Comments
 (0)