Skip to content

Commit bb676d8

Browse files
[Storage][DataMovement] Refactor some internal transfer logic (Azure#36367)
1 parent fb11613 commit bb676d8

File tree

6 files changed

+211
-86
lines changed

6 files changed

+211
-86
lines changed

sdk/storage/Azure.Storage.DataMovement/src/JobPartInternal.cs

Lines changed: 5 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -254,16 +254,6 @@ await QueueChunk(
254254
/// <returns>The task to wait until the cancellation has been triggered.</returns>
255255
internal async Task TriggerCancellationAsync()
256256
{
257-
// If stop on failure specified, cancel entire job.
258-
if (_errorHandling == ErrorHandlingOptions.StopOnAllFailures)
259-
{
260-
if (!_cancellationToken.IsCancellationRequested)
261-
{
262-
_dataTransfer._state.TriggerCancellation();
263-
}
264-
_dataTransfer._state.ResetTransferredBytes();
265-
}
266-
267257
// Set the status to Pause/CancellationInProgress
268258
if (StorageTransferStatus.PauseInProgress == _dataTransfer.TransferStatus)
269259
{
@@ -276,6 +266,7 @@ internal async Task TriggerCancellationAsync()
276266
// It's a cancellation if a pause wasn't called.
277267
await OnTransferStatusChanged(StorageTransferStatus.CancellationInProgress).ConfigureAwait(false);
278268
}
269+
await CleanupAbortedJobPartAsync().ConfigureAwait(false);
279270
}
280271

281272
/// <summary>
@@ -300,11 +291,7 @@ internal async Task OnTransferStatusChanged(StorageTransferStatus transferStatus
300291
{
301292
await InvokeSingleCompletedArg().ConfigureAwait(false);
302293
}
303-
if (JobPartStatus == StorageTransferStatus.Paused ||
304-
JobPartStatus == StorageTransferStatus.CompletedWithFailedTransfers)
305-
{
306-
await CleanupAbortedJobPartAsync().ConfigureAwait(false);
307-
}
294+
308295
// Set the status in the checkpointer
309296
await SetCheckpointerStatus(transferStatus).ConfigureAwait(false);
310297

@@ -510,7 +497,7 @@ internal static long ParseRangeTotalLength(string range)
510497
long absolutePosition = blockSize;
511498
long blockLength = acceptableBlockSize;
512499

513-
// TODO: divide up paritions based on how much array pool is left
500+
// TODO: divide up partitions based on how much array pool is left
514501
while (absolutePosition < streamLength)
515502
{
516503
// Return based on the size of the stream divided up by the acceptable blocksize.
@@ -526,11 +513,11 @@ internal async Task CheckAndUpdateCancellationStatusAsync()
526513
{
527514
if (_chunkTasks.All((Task task) => (task.IsCompleted)))
528515
{
529-
if (_dataTransfer.TransferStatus == StorageTransferStatus.PauseInProgress)
516+
if (JobPartStatus == StorageTransferStatus.PauseInProgress)
530517
{
531518
await OnTransferStatusChanged(StorageTransferStatus.Paused).ConfigureAwait(false);
532519
}
533-
else if (_dataTransfer.TransferStatus == StorageTransferStatus.CancellationInProgress)
520+
else if (JobPartStatus == StorageTransferStatus.CancellationInProgress)
534521
{
535522
await OnTransferStatusChanged(StorageTransferStatus.CompletedWithFailedTransfers).ConfigureAwait(false);
536523
}

sdk/storage/Azure.Storage.DataMovement/src/TransferJobInternal.cs

Lines changed: 33 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -260,21 +260,14 @@ public void DisposeHandlers()
260260
/// <returns>The task to wait until the cancellation has been triggered.</returns>
261261
public async Task TriggerJobCancellationAsync()
262262
{
263-
DisposeHandlers();
264263
if (!_dataTransfer._state.CancellationTokenSource.IsCancellationRequested)
265264
{
266265
_dataTransfer._state.TriggerCancellation();
267266
}
268-
// Set the status to Pause/CancellationInProgress
269-
if (StorageTransferStatus.PauseInProgress == _dataTransfer.TransferStatus)
270-
{
271-
// It's possible that the status hasn't propagated down to the job part
272-
// status yet here since we pause from the data transfer object.
273-
await OnJobStatusChangedAsync(StorageTransferStatus.PauseInProgress).ConfigureAwait(false);
274-
}
275-
else
267+
268+
// Set CancellationInProgress if we are not already pausing
269+
if (StorageTransferStatus.PauseInProgress != _dataTransfer.TransferStatus)
276270
{
277-
// It's a cancellation if a pause wasn't called.
278271
await OnJobStatusChangedAsync(StorageTransferStatus.CancellationInProgress).ConfigureAwait(false);
279272
}
280273
}
@@ -315,30 +308,33 @@ await TransferFailedEventHandler.Invoke(new TransferFailedEventArgs(
315308
/// </summary>
316309
public async Task JobPartEvent(TransferStatusEventArgs args)
317310
{
318-
StorageTransferStatus status = _dataTransfer._state.GetTransferStatus();
319-
if ((args.StorageTransferStatus == StorageTransferStatus.Paused ||
320-
args.StorageTransferStatus == StorageTransferStatus.Completed ||
321-
args.StorageTransferStatus == StorageTransferStatus.CompletedWithSkippedTransfers ||
322-
args.StorageTransferStatus == StorageTransferStatus.CompletedWithFailedTransfers)
323-
&& (status == StorageTransferStatus.Queued ||
324-
status == StorageTransferStatus.InProgress ||
325-
status == StorageTransferStatus.PauseInProgress ||
326-
status == StorageTransferStatus.CancellationInProgress))
311+
StorageTransferStatus jobPartStatus = args.StorageTransferStatus;
312+
StorageTransferStatus jobStatus = _dataTransfer._state.GetTransferStatus();
313+
314+
// Cancel the entire job if one job part fails and StopOnFailure is set
315+
if (_errorHandling == ErrorHandlingOptions.StopOnAllFailures &&
316+
jobPartStatus == StorageTransferStatus.CompletedWithFailedTransfers &&
317+
jobStatus != StorageTransferStatus.CompletedWithFailedTransfers &&
318+
jobStatus != StorageTransferStatus.CompletedWithSkippedTransfers &&
319+
jobStatus != StorageTransferStatus.Completed)
320+
{
321+
await TriggerJobCancellationAsync().ConfigureAwait(false);
322+
}
323+
324+
if ((jobPartStatus == StorageTransferStatus.Paused ||
325+
jobPartStatus == StorageTransferStatus.Completed ||
326+
jobPartStatus == StorageTransferStatus.CompletedWithSkippedTransfers ||
327+
jobPartStatus == StorageTransferStatus.CompletedWithFailedTransfers)
328+
&& (jobStatus == StorageTransferStatus.Queued ||
329+
jobStatus == StorageTransferStatus.InProgress ||
330+
jobStatus == StorageTransferStatus.PauseInProgress ||
331+
jobStatus == StorageTransferStatus.CancellationInProgress))
327332
{
328333
if (_enumerationComplete)
329334
{
330335
await CheckAndUpdateStatusAsync().ConfigureAwait(false);
331336
}
332337
}
333-
else if (args.StorageTransferStatus == StorageTransferStatus.Paused &&
334-
status == StorageTransferStatus.Paused)
335-
{
336-
await OnJobStatusChangedAsync(StorageTransferStatus.Paused).ConfigureAwait(false);
337-
}
338-
else if (args.StorageTransferStatus > status)
339-
{
340-
await OnJobStatusChangedAsync(args.StorageTransferStatus).ConfigureAwait(false);
341-
}
342338
}
343339

344340
public async Task OnJobStatusChangedAsync(StorageTransferStatus status)
@@ -350,6 +346,15 @@ public async Task OnJobStatusChangedAsync(StorageTransferStatus status)
350346
}
351347
if (statusChanged)
352348
{
349+
// If we are in a final state, dispose the JobPartEvent handlers
350+
if (status == StorageTransferStatus.Completed ||
351+
status == StorageTransferStatus.CompletedWithSkippedTransfers ||
352+
status == StorageTransferStatus.CompletedWithFailedTransfers ||
353+
status == StorageTransferStatus.Paused)
354+
{
355+
DisposeHandlers();
356+
}
357+
353358
if (TransferStatusEventHandler != null)
354359
{
355360
await TransferStatusEventHandler.Invoke(

sdk/storage/Azure.Storage.DataMovement/tests/PauseResumeTransferTests.cs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -447,7 +447,7 @@ public async Task PauseThenResumeTransferAsync(TransferType transferType)
447447
{
448448
ResumeFromCheckpointId = transfer.Id
449449
};
450-
TestEventsRaised resumeFailureHolder = new TestEventsRaised(resumeOptions);
450+
TestEventsRaised testEventRaised2 = new TestEventsRaised(resumeOptions);
451451
DataTransfer resumeTransfer = await CreateSingleLongTransferAsync(
452452
manager: transferManager,
453453
sourceResource: sourceResource,
@@ -458,7 +458,7 @@ public async Task PauseThenResumeTransferAsync(TransferType transferType)
458458
await resumeTransfer.AwaitCompletion(waitTransferCompletion.Token);
459459

460460
// Assert
461-
resumeFailureHolder.AssertPausedCheck();
461+
testEventRaised2.AssertContainerCompletedCheck(1);
462462
Assert.AreEqual(StorageTransferStatus.Completed, resumeTransfer.TransferStatus);
463463
Assert.IsTrue(resumeTransfer.HasCompleted);
464464

@@ -809,18 +809,19 @@ public async Task PauseThenResumeTransferAsync_Directory(TransferType transferTy
809809
{
810810
ResumeFromCheckpointId = transfer.Id
811811
};
812-
TestEventsRaised resumeFailureHolder = new TestEventsRaised(resumeOptions);
812+
TestEventsRaised testEventsRaised2 = new TestEventsRaised(resumeOptions);
813813
DataTransfer resumeTransfer = await CreateDirectoryLongTransferAsync(
814814
manager: transferManager,
815815
sourceResource: sourceResource,
816816
destinationResource: destinationResource,
817-
transferOptions: resumeOptions);
817+
transferOptions: resumeOptions,
818+
transferCount: 100);
818819

819820
CancellationTokenSource waitTransferCompletion = new CancellationTokenSource(TimeSpan.FromSeconds(600));
820821
await resumeTransfer.AwaitCompletion(waitTransferCompletion.Token);
821822

822823
// Assert
823-
resumeFailureHolder.AssertPausedCheck();
824+
testEventsRaised2.AssertContainerCompletedCheck(100);
824825
Assert.AreEqual(StorageTransferStatus.Completed, resumeTransfer.TransferStatus);
825826
Assert.IsTrue(resumeTransfer.HasCompleted);
826827

sdk/storage/Azure.Storage.DataMovement/tests/StartTransferDownloadDirectoryTests.cs

Lines changed: 79 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,79 @@ await DownloadBlobDirectoryAndVerify(
329329
destinationFolder).ConfigureAwait(false);
330330
}
331331

332+
[Test]
333+
[LiveOnly] // https://github.com/Azure/azure-sdk-for-net/issues/33082
334+
public async Task DownloadDirectoryAsync_SmallChunks_ManyFiles()
335+
{
336+
// Arrange
337+
int blobSize = 2 * Constants.KB;
338+
await using DisposingBlobContainer test = await GetTestContainerAsync();
339+
340+
using DisposingLocalDirectory testDirectory = DisposingLocalDirectory.GetTestDirectory();
341+
string tempFolder = CreateRandomDirectory(testDirectory.DirectoryPath);
342+
string blobDirectoryName = "foo";
343+
string fullSourceFolderPath = CreateRandomDirectory(tempFolder, blobDirectoryName);
344+
345+
List<string> blobNames = new List<string>();
346+
string blobName = Path.Combine(blobDirectoryName, GetNewBlobName());
347+
await CreateBlockBlobAndSourceFile(test.Container, tempFolder, blobName, blobSize);
348+
blobNames.Add(blobName);
349+
blobName = Path.Combine(blobDirectoryName, GetNewBlobName());
350+
await CreateBlockBlobAndSourceFile(test.Container, tempFolder, blobName, blobSize);
351+
blobNames.Add(blobName);
352+
blobName = Path.Combine(blobDirectoryName, GetNewBlobName());
353+
await CreateBlockBlobAndSourceFile(test.Container, tempFolder, blobName, blobSize);
354+
blobNames.Add(blobName);
355+
blobName = Path.Combine(blobDirectoryName, GetNewBlobName());
356+
await CreateBlockBlobAndSourceFile(test.Container, tempFolder, blobName, blobSize);
357+
blobNames.Add(blobName);
358+
blobName = Path.Combine(blobDirectoryName, GetNewBlobName());
359+
await CreateBlockBlobAndSourceFile(test.Container, tempFolder, blobName, blobSize);
360+
blobNames.Add(blobName);
361+
362+
string subDir1 = CreateRandomDirectory(fullSourceFolderPath, "bar").Substring(fullSourceFolderPath.Length + 1);
363+
blobName = Path.Combine(blobDirectoryName, subDir1, GetNewBlobName());
364+
await CreateBlockBlobAndSourceFile(test.Container, tempFolder, blobName, blobSize);
365+
blobNames.Add(blobName);
366+
blobName = Path.Combine(blobDirectoryName, subDir1, GetNewBlobName());
367+
await CreateBlockBlobAndSourceFile(test.Container, tempFolder, blobName, blobSize);
368+
blobNames.Add(blobName);
369+
blobName = Path.Combine(blobDirectoryName, subDir1, GetNewBlobName());
370+
await CreateBlockBlobAndSourceFile(test.Container, tempFolder, blobName, blobSize);
371+
blobNames.Add(blobName);
372+
string subDir2 = CreateRandomDirectory(fullSourceFolderPath, "rul").Substring(fullSourceFolderPath.Length + 1);
373+
blobName = Path.Combine(blobDirectoryName, subDir2, GetNewBlobName());
374+
await CreateBlockBlobAndSourceFile(test.Container, tempFolder, blobName, blobSize);
375+
blobNames.Add(blobName);
376+
blobName = Path.Combine(blobDirectoryName, subDir2, GetNewBlobName());
377+
await CreateBlockBlobAndSourceFile(test.Container, tempFolder, blobName, blobSize);
378+
blobNames.Add(blobName);
379+
380+
using DisposingLocalDirectory destinationFolder = DisposingLocalDirectory.GetTestDirectory();
381+
string sourceBlobPrefix = fullSourceFolderPath.Substring(tempFolder.Length + 1);
382+
383+
TransferManagerOptions transferManagerOptions = new TransferManagerOptions()
384+
{
385+
ErrorHandling = ErrorHandlingOptions.StopOnAllFailures,
386+
MaximumConcurrency = 3
387+
};
388+
TransferOptions options = new TransferOptions()
389+
{
390+
InitialTransferSize = 512,
391+
MaximumTransferChunkSize = 512
392+
};
393+
394+
// Act / Assert
395+
await DownloadBlobDirectoryAndVerify(
396+
sourceContainer: test.Container,
397+
sourceBlobPrefix: sourceBlobPrefix,
398+
sourceFilePrefix: fullSourceFolderPath,
399+
blobNames,
400+
destinationFolder.DirectoryPath,
401+
transferManagerOptions: transferManagerOptions,
402+
options: options).ConfigureAwait(false);
403+
}
404+
332405
[Test]
333406
[LiveOnly] // https://github.com/Azure/azure-sdk-for-net/issues/33082
334407
public async Task DownloadDirectoryAsync_Root()
@@ -436,7 +509,7 @@ public async Task StartTransfer_AwaitCompletion()
436509
options: options);
437510

438511
// Act
439-
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(5));
512+
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(10));
440513
await transfer.AwaitCompletion(cancellationTokenSource.Token).ConfigureAwait(false);
441514

442515
// Assert
@@ -446,7 +519,6 @@ public async Task StartTransfer_AwaitCompletion()
446519
testEventsRaised.AssertContainerCompletedCheck(4);
447520
}
448521

449-
//[Ignore("https://github.com/Azure/azure-sdk-for-net/issues/35209")]
450522
[Test]
451523
[LiveOnly] // https://github.com/Azure/azure-sdk-for-net/issues/33082
452524
public async Task StartTransfer_AwaitCompletion_Failed()
@@ -473,7 +545,7 @@ public async Task StartTransfer_AwaitCompletion_Failed()
473545
options: options);
474546

475547
// Act
476-
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(5));
548+
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(10));
477549
await transfer.AwaitCompletion(cancellationTokenSource.Token).ConfigureAwait(false);
478550

479551
// Assert
@@ -484,7 +556,6 @@ public async Task StartTransfer_AwaitCompletion_Failed()
484556
testEventsRaised.AssertContainerCompletedWithFailedCheck(1);
485557
}
486558

487-
//[Ignore("https://github.com/Azure/azure-sdk-for-net/issues/35209")]
488559
[Test]
489560
[LiveOnly] // https://github.com/Azure/azure-sdk-for-net/issues/33082
490561
public async Task StartTransfer_AwaitCompletion_Skipped()
@@ -512,7 +583,7 @@ public async Task StartTransfer_AwaitCompletion_Skipped()
512583
options: options);
513584

514585
// Act
515-
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(5));
586+
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(10));
516587
await transfer.AwaitCompletion(cancellationTokenSource.Token).ConfigureAwait(false);
517588

518589
// Assert
@@ -542,7 +613,7 @@ public async Task StartTransfer_EnsureCompleted()
542613
options: options);
543614

544615
// Act
545-
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(5));
616+
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(10));
546617
transfer.EnsureCompleted(cancellationTokenSource.Token);
547618

548619
// Assert
@@ -552,7 +623,6 @@ public async Task StartTransfer_EnsureCompleted()
552623
testEventsRaised.AssertContainerCompletedCheck(4);
553624
}
554625

555-
//[Ignore("https://github.com/Azure/azure-sdk-for-net/issues/35209")]
556626
[Test]
557627
[LiveOnly] // https://github.com/Azure/azure-sdk-for-net/issues/33082
558628
public async Task StartTransfer_EnsureCompleted_Failed()
@@ -579,7 +649,7 @@ public async Task StartTransfer_EnsureCompleted_Failed()
579649
options: options);
580650

581651
// Act
582-
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(5));
652+
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(10));
583653
transfer.EnsureCompleted(cancellationTokenSource.Token);
584654

585655
// Assert
@@ -590,7 +660,6 @@ public async Task StartTransfer_EnsureCompleted_Failed()
590660
testEventsRaised.AssertContainerCompletedWithFailedCheck(1);
591661
}
592662

593-
//[Ignore("https://github.com/Azure/azure-sdk-for-net/issues/35209")]
594663
[Test]
595664
[LiveOnly] // https://github.com/Azure/azure-sdk-for-net/issues/33082
596665
public async Task StartTransfer_EnsureCompleted_Skipped()
@@ -618,7 +687,7 @@ public async Task StartTransfer_EnsureCompleted_Skipped()
618687
options: options);
619688

620689
// Act
621-
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(5));
690+
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(10));
622691
transfer.EnsureCompleted(cancellationTokenSource.Token);
623692

624693
// Assert

0 commit comments

Comments
 (0)