Skip to content

Commit ad0fc49

Browse files
authored
Append and Upload with BinaryData for Data Lake (Azure#30366)
1 parent 5485b56 commit ad0fc49

File tree

12 files changed

+945
-2
lines changed

12 files changed

+945
-2
lines changed

sdk/storage/azure-storage-file-datalake/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
## 12.12.0-beta.1 (Unreleased)
44

55
### Features Added
6+
- Added upload and append methods on DataLakeFileClient and DataLakeFileAsyncClient that support BinaryData.
67

78
### Breaking Changes
89

sdk/storage/azure-storage-file-datalake/src/main/java/com/azure/storage/file/datalake/DataLakeFileAsyncClient.java

Lines changed: 153 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import com.azure.core.http.HttpResponse;
1212
import com.azure.core.http.rest.Response;
1313
import com.azure.core.http.rest.SimpleResponse;
14+
import com.azure.core.util.BinaryData;
1415
import com.azure.core.util.Context;
1516
import com.azure.core.util.Contexts;
1617
import com.azure.core.util.DateTimeRfc1123;
@@ -66,6 +67,7 @@
6667
import java.util.HashSet;
6768
import java.util.List;
6869
import java.util.Map;
70+
import java.util.Objects;
6971
import java.util.Set;
7072
import java.util.function.BiFunction;
7173
import java.util.function.Function;
@@ -332,6 +334,36 @@ public Mono<PathInfo> upload(Flux<ByteBuffer> data, ParallelTransferOptions para
332334
return upload(data, parallelTransferOptions, false);
333335
}
334336

337+
/**
338+
* Creates a new file and uploads content.
339+
*
340+
* <p><strong>Code Samples</strong></p>
341+
*
342+
* <!-- src_embed com.azure.storage.file.datalake.DataLakeFileAsyncClient.upload#BinaryData-ParallelTransferOptions -->
343+
* <pre>
344+
* Long blockSize = 100L * 1024L * 1024L; &#47;&#47; 100 MB;
345+
* ParallelTransferOptions pto = new ParallelTransferOptions&#40;&#41;
346+
* .setBlockSizeLong&#40;blockSize&#41;
347+
* .setProgressListener&#40;bytesTransferred -&gt; System.out.printf&#40;&quot;Upload progress: %s bytes sent&quot;, bytesTransferred&#41;&#41;;
348+
*
349+
* BinaryData.fromFlux&#40;data, length, false&#41;
350+
* .flatMap&#40;binaryData -&gt; client.upload&#40;binaryData, pto&#41;&#41;
351+
* .doOnError&#40;throwable -&gt; System.err.printf&#40;&quot;Failed to upload %s%n&quot;, throwable.getMessage&#40;&#41;&#41;&#41;
352+
* .subscribe&#40;completion -&gt; System.out.println&#40;&quot;Upload succeeded&quot;&#41;&#41;;
353+
* </pre>
354+
* <!-- end com.azure.storage.file.datalake.DataLakeFileAsyncClient.upload#BinaryData-ParallelTransferOptions -->
355+
*
356+
* @param data The {@link BinaryData} to write to the file. Unlike other upload methods, this method does not
357+
* require that the underlying content be replayable. In other words, it does not have to support multiple
358+
* subscribers and is not expected to produce the same values across subscriptions.
359+
* @param parallelTransferOptions {@link ParallelTransferOptions} used to configure buffered uploading.
360+
* @return A reactive response containing the information of the uploaded file.
361+
*/
362+
@ServiceMethod(returns = ReturnType.SINGLE)
363+
public Mono<PathInfo> upload(BinaryData data, ParallelTransferOptions parallelTransferOptions) {
364+
return upload(data, parallelTransferOptions, false);
365+
}
366+
335367
/**
336368
* Creates a new file and uploads content.
337369
*
@@ -376,6 +408,54 @@ public Mono<PathInfo> upload(Flux<ByteBuffer> data, ParallelTransferOptions para
376408
.flatMap(FluxUtil::toMono);
377409
}
378410

411+
/**
412+
* Creates a new file and uploads content.
413+
*
414+
* <p><strong>Code Samples</strong></p>
415+
*
416+
* <!-- src_embed com.azure.storage.file.datalake.DataLakeFileAsyncClient.upload#BinaryData-ParallelTransferOptions-boolean -->
417+
* <pre>
418+
* Long blockSize = 100L * 1024L * 1024L; &#47;&#47; 100 MB;
419+
* ParallelTransferOptions pto = new ParallelTransferOptions&#40;&#41;
420+
* .setBlockSizeLong&#40;blockSize&#41;
421+
* .setProgressListener&#40;bytesTransferred -&gt; System.out.printf&#40;&quot;Upload progress: %s bytes sent&quot;, bytesTransferred&#41;&#41;;
422+
*
423+
* BinaryData.fromFlux&#40;data, length, false&#41;
424+
* .flatMap&#40;binaryData -&gt; client.upload&#40;binaryData, pto, true&#41;&#41;
425+
* .doOnError&#40;throwable -&gt; System.err.printf&#40;&quot;Failed to upload %s%n&quot;, throwable.getMessage&#40;&#41;&#41;&#41;
426+
* .subscribe&#40;completion -&gt; System.out.println&#40;&quot;Upload succeeded&quot;&#41;&#41;;
427+
* </pre>
428+
* <!-- end com.azure.storage.file.datalake.DataLakeFileAsyncClient.upload#BinaryData-ParallelTransferOptions-boolean -->
429+
*
430+
* @param data The {@link BinaryData} to write to the file. Unlike other upload methods, this method does not
431+
* require that the underlying content be replayable. In other words, it does not have to support multiple
432+
* subscribers and is not expected to produce the same values across subscriptions.
433+
* @param parallelTransferOptions {@link ParallelTransferOptions} used to configure buffered uploading.
434+
* @param overwrite Whether to overwrite, should the file already exist.
435+
* @return A reactive response containing the information of the uploaded file.
436+
*/
437+
@ServiceMethod(returns = ReturnType.SINGLE)
438+
public Mono<PathInfo> upload(BinaryData data, ParallelTransferOptions parallelTransferOptions, boolean overwrite) {
439+
Mono<Void> overwriteCheck;
440+
DataLakeRequestConditions requestConditions;
441+
442+
if (overwrite) {
443+
overwriteCheck = Mono.empty();
444+
requestConditions = null;
445+
} else {
446+
overwriteCheck = exists().flatMap(exists -> exists
447+
? monoError(LOGGER, new IllegalArgumentException(Constants.BLOB_ALREADY_EXISTS))
448+
: Mono.empty());
449+
requestConditions = new DataLakeRequestConditions()
450+
.setIfNoneMatch(Constants.HeaderConstants.ETAG_WILDCARD);
451+
}
452+
453+
return overwriteCheck
454+
.then(uploadWithResponse(new FileParallelUploadOptions(data)
455+
.setParallelTransferOptions(parallelTransferOptions).setRequestConditions(requestConditions)))
456+
.flatMap(FluxUtil::toMono);
457+
}
458+
379459
/**
380460
* Creates a new file.
381461
* To avoid overwriting, pass "*" to {@link DataLakeRequestConditions#setIfNoneMatch(String)}.
@@ -526,7 +606,10 @@ public Mono<Response<PathInfo>> uploadWithResponse(FileParallelUploadOptions opt
526606
fileOffset, length, options.getHeaders(), validatedUploadRequestConditions,
527607
validatedParallelTransferOptions.getProgressListener());
528608

529-
Flux<ByteBuffer> data = options.getDataFlux();
609+
BinaryData binaryData = options.getData();
610+
611+
// if BinaryData is present, convert it to Flux Byte Buffer
612+
Flux<ByteBuffer> data = binaryData != null ? binaryData.toFluxByteBuffer() : options.getDataFlux();
530613
// no specified length: use azure.core's converter
531614
if (data == null && options.getOptionalLength() == null) {
532615
// We can only buffer up to max int due to restrictions in ByteBuffer.
@@ -843,6 +926,34 @@ public Mono<Void> append(Flux<ByteBuffer> data, long fileOffset, long length) {
843926
return appendWithResponse(data, fileOffset, length, null, null).flatMap(FluxUtil::toMono);
844927
}
845928

929+
/**
930+
* Appends data to the specified resource to later be flushed (written) by a call to flush
931+
*
932+
* <p><strong>Code Samples</strong></p>
933+
*
934+
* <!-- src_embed com.azure.storage.file.datalake.DataLakeFileAsyncClient.append#Flux-long-long -->
935+
* <pre>
936+
* client.append&#40;data, offset, length&#41;
937+
* .subscribe&#40;
938+
* response -&gt; System.out.println&#40;&quot;Append data completed&quot;&#41;,
939+
* error -&gt; System.out.printf&#40;&quot;Error when calling append data: %s&quot;, error&#41;&#41;;
940+
* </pre>
941+
* <!-- end com.azure.storage.file.datalake.DataLakeFileAsyncClient.append#Flux-long-long -->
942+
*
943+
* <p>For more information, see the
944+
* <a href="https://docs.microsoft.com/rest/api/storageservices/datalakestoragegen2/path/update">Azure
945+
* Docs</a></p>
946+
*
947+
* @param data The data to write to the file.
948+
* @param fileOffset The position where the data is to be appended.
949+
*
950+
* @return A reactive response signalling completion.
951+
*/
952+
@ServiceMethod(returns = ReturnType.SINGLE)
953+
public Mono<Void> append(BinaryData data, long fileOffset) {
954+
return appendWithResponse(data, fileOffset, null, null).flatMap(FluxUtil::toMono);
955+
}
956+
846957
/**
847958
* Appends data to the specified resource to later be flushed (written) by a call to flush
848959
*
@@ -884,6 +995,47 @@ public Mono<Response<Void>> appendWithResponse(Flux<ByteBuffer> data, long fileO
884995
}
885996
}
886997

998+
/**
999+
* Appends data to the specified resource to later be flushed (written) by a call to flush
1000+
*
1001+
* <p><strong>Code Samples</strong></p>
1002+
*
1003+
* <!-- src_embed com.azure.storage.file.datalake.DataLakeFileAsyncClient.appendWithResponse#Flux-long-long-byte-String -->
1004+
* <pre>
1005+
* FileRange range = new FileRange&#40;1024, 2048L&#41;;
1006+
* DownloadRetryOptions options = new DownloadRetryOptions&#40;&#41;.setMaxRetryRequests&#40;5&#41;;
1007+
* byte[] contentMd5 = new byte[0]; &#47;&#47; Replace with valid md5
1008+
*
1009+
* client.appendWithResponse&#40;data, offset, length, contentMd5, leaseId&#41;.subscribe&#40;response -&gt;
1010+
* System.out.printf&#40;&quot;Append data completed with status %d%n&quot;, response.getStatusCode&#40;&#41;&#41;&#41;;
1011+
* </pre>
1012+
* <!-- end com.azure.storage.file.datalake.DataLakeFileAsyncClient.appendWithResponse#Flux-long-long-byte-String -->
1013+
*
1014+
* <p>For more information, see the
1015+
* <a href="https://docs.microsoft.com/rest/api/storageservices/datalakestoragegen2/path/update">Azure
1016+
* Docs</a></p>
1017+
*
1018+
* @param data The data to write to the file.
1019+
* @param fileOffset The position where the data is to be appended.
1020+
* @param contentMd5 An MD5 hash of the content of the data. If specified, the service will calculate the MD5 of the
1021+
* received data and fail the request if it does not match the provided MD5.
1022+
* @param leaseId By setting lease id, requests will fail if the provided lease does not match the active lease on
1023+
* the file.
1024+
*
1025+
* @return A reactive response signalling completion.
1026+
*/
1027+
@ServiceMethod(returns = ReturnType.SINGLE)
1028+
public Mono<Response<Void>> appendWithResponse(BinaryData data, long fileOffset, byte[] contentMd5, String leaseId) {
1029+
try {
1030+
Objects.requireNonNull(data);
1031+
Flux<ByteBuffer> fluxData = data.toFluxByteBuffer();
1032+
long length = data.getLength();
1033+
return withContext(context -> appendWithResponse(fluxData, fileOffset, length, contentMd5, leaseId, context));
1034+
} catch (RuntimeException ex) {
1035+
return monoError(LOGGER, ex);
1036+
}
1037+
}
1038+
8871039
Mono<Response<Void>> appendWithResponse(Flux<ByteBuffer> data, long fileOffset, long length,
8881040
byte[] contentMd5, String leaseId, Context context) {
8891041

sdk/storage/azure-storage-file-datalake/src/main/java/com/azure/storage/file/datalake/DataLakeFileClient.java

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import com.azure.core.http.rest.Response;
1010
import com.azure.core.http.rest.ResponseBase;
1111
import com.azure.core.http.rest.SimpleResponse;
12+
import com.azure.core.util.BinaryData;
1213
import com.azure.core.util.Context;
1314
import com.azure.core.util.FluxUtil;
1415
import com.azure.core.util.logging.ClientLogger;
@@ -286,6 +287,32 @@ public PathInfo upload(InputStream data, long length) {
286287
return upload(data, length, false);
287288
}
288289

290+
/**
291+
* Creates a new file. By default, this method will not overwrite an existing file.
292+
*
293+
* <p><strong>Code Samples</strong></p>
294+
*
295+
* <!-- src_embed com.azure.storage.file.datalake.DataLakeFileClient.upload#BinaryData -->
296+
* <pre>
297+
* try &#123;
298+
* client.upload&#40;binaryData&#41;;
299+
* System.out.println&#40;&quot;Upload from file succeeded&quot;&#41;;
300+
* &#125; catch &#40;UncheckedIOException ex&#41; &#123;
301+
* System.err.printf&#40;&quot;Failed to upload from file %s%n&quot;, ex.getMessage&#40;&#41;&#41;;
302+
* &#125;
303+
* </pre>
304+
* <!-- end com.azure.storage.file.datalake.DataLakeFileClient.upload#BinaryData -->
305+
*
306+
* @param data The {@link BinaryData} to write to the file. The mark/reset requirement documented on the
307+
* {@link java.io.InputStream} overloads (wrapping in a {@link java.io.BufferedInputStream} to support retries)
308+
* concerns stream arguments; this overload accepts a {@code BinaryData} rather than a stream.
309+
* @return Information about the uploaded path.
310+
*/
311+
@ServiceMethod(returns = ReturnType.SINGLE)
312+
public PathInfo upload(BinaryData data) {
313+
return upload(data, false);
314+
}
315+
289316
/**
290317
* Creates a new file, or updates the content of an existing file.
291318
*
@@ -321,6 +348,39 @@ public PathInfo upload(InputStream data, long length, boolean overwrite) {
321348
null, Context.NONE).getValue();
322349
}
323350

351+
/**
352+
* Creates a new file, or updates the content of an existing file.
353+
*
354+
* <p><strong>Code Samples</strong></p>
355+
*
356+
* <!-- src_embed com.azure.storage.file.datalake.DataLakeFileClient.upload#BinaryData-boolean -->
357+
* <pre>
358+
* try &#123;
359+
* boolean overwrite = false;
360+
* client.upload&#40;binaryData, overwrite&#41;;
361+
* System.out.println&#40;&quot;Upload from file succeeded&quot;&#41;;
362+
* &#125; catch &#40;UncheckedIOException ex&#41; &#123;
363+
* System.err.printf&#40;&quot;Failed to upload from file %s%n&quot;, ex.getMessage&#40;&#41;&#41;;
364+
* &#125;
365+
* </pre>
366+
* <!-- end com.azure.storage.file.datalake.DataLakeFileClient.upload#BinaryData-boolean -->
367+
*
368+
* @param data The {@link BinaryData} to write to the file. The mark/reset requirement documented on the
369+
* {@link java.io.InputStream} overloads (wrapping in a {@link java.io.BufferedInputStream} to support retries)
370+
* concerns stream arguments; this overload accepts a {@code BinaryData} rather than a stream.
371+
* @param overwrite Whether to overwrite, should data exist on the file.
372+
* @return Information about the uploaded path.
373+
*/
374+
@ServiceMethod(returns = ReturnType.SINGLE)
375+
public PathInfo upload(BinaryData data, boolean overwrite) {
376+
DataLakeRequestConditions requestConditions = new DataLakeRequestConditions();
377+
if (!overwrite) {
378+
requestConditions.setIfNoneMatch(Constants.HeaderConstants.ETAG_WILDCARD);
379+
}
380+
return uploadWithResponse(new FileParallelUploadOptions(data).setRequestConditions(requestConditions),
381+
null, Context.NONE).getValue();
382+
}
383+
324384
/**
325385
* Creates a new file.
326386
* To avoid overwriting, pass "*" to {@link DataLakeRequestConditions#setIfNoneMatch(String)}.
@@ -513,6 +573,30 @@ public void append(InputStream data, long fileOffset, long length) {
513573
appendWithResponse(data, fileOffset, length, null, null, null, Context.NONE);
514574
}
515575

576+
/**
577+
* Appends data to the specified resource to later be flushed (written) by a call to flush
578+
*
579+
* <p><strong>Code Samples</strong></p>
580+
*
581+
* <!-- src_embed com.azure.storage.file.datalake.DataLakeFileClient.append#BinaryData-long -->
582+
* <pre>
583+
* client.append&#40;binaryData, offset&#41;;
584+
* System.out.println&#40;&quot;Append data completed&quot;&#41;;
585+
* </pre>
586+
* <!-- end com.azure.storage.file.datalake.DataLakeFileClient.append#BinaryData-long -->
587+
*
588+
* <p>For more information, see the
589+
* <a href="https://docs.microsoft.com/rest/api/storageservices/datalakestoragegen2/path/update">Azure
590+
* Docs</a></p>
591+
*
592+
* @param data The data to write to the file.
593+
* @param fileOffset The position where the data is to be appended.
594+
*/
595+
@ServiceMethod(returns = ReturnType.SINGLE)
596+
public void append(BinaryData data, long fileOffset) {
597+
appendWithResponse(data, fileOffset, null, null, null, Context.NONE);
598+
}
599+
516600
/**
517601
* Appends data to the specified resource to later be flushed (written) by a call to flush
518602
*
@@ -563,6 +647,54 @@ public Response<Void> appendWithResponse(InputStream data, long fileOffset, long
563647
}
564648
}
565649

650+
/**
651+
* Appends data to the specified resource to later be flushed (written) by a call to flush
652+
*
653+
* <p><strong>Code Samples</strong></p>
654+
*
655+
* <!-- src_embed com.azure.storage.file.datalake.DataLakeFileClient.appendWithResponse#BinaryData-long-byte-String-Duration-Context -->
656+
* <pre>
657+
* FileRange range = new FileRange&#40;1024, 2048L&#41;;
658+
* DownloadRetryOptions options = new DownloadRetryOptions&#40;&#41;.setMaxRetryRequests&#40;5&#41;;
659+
* byte[] contentMd5 = new byte[0]; &#47;&#47; Replace with valid md5
660+
*
661+
* Response&lt;Void&gt; response = client.appendWithResponse&#40;binaryData, offset, contentMd5, leaseId, timeout,
662+
* new Context&#40;key1, value1&#41;&#41;;
663+
* System.out.printf&#40;&quot;Append data completed with status %d%n&quot;, response.getStatusCode&#40;&#41;&#41;;
664+
* </pre>
665+
* <!-- end com.azure.storage.file.datalake.DataLakeFileClient.appendWithResponse#BinaryData-long-byte-String-Duration-Context -->
666+
*
667+
* <p>For more information, see the
668+
* <a href="https://docs.microsoft.com/rest/api/storageservices/datalakestoragegen2/path/update">Azure
669+
* Docs</a></p>
670+
*
671+
* @param data The data to write to the file.
672+
* @param fileOffset The position where the data is to be appended.
673+
* @param contentMd5 An MD5 hash of the content of the data. If specified, the service will calculate the MD5 of the
674+
* received data and fail the request if it does not match the provided MD5.
675+
* @param leaseId By setting lease id, requests will fail if the provided lease does not match the active lease on
676+
* the file.
677+
* @param timeout An optional timeout value beyond which a {@link RuntimeException} will be raised.
678+
* @param context Additional context that is passed through the Http pipeline during the service call.
679+
*
680+
* @return A response signalling completion.
681+
*/
682+
@ServiceMethod(returns = ReturnType.SINGLE)
683+
public Response<Void> appendWithResponse(BinaryData data, long fileOffset, byte[] contentMd5, String leaseId,
684+
Duration timeout, Context context) {
685+
686+
Objects.requireNonNull(data);
687+
Flux<ByteBuffer> fluxData = data.toFluxByteBuffer();
688+
Mono<Response<Void>> response = dataLakeFileAsyncClient.appendWithResponse(
689+
fluxData.subscribeOn(Schedulers.boundedElastic()), fileOffset, data.getLength(), contentMd5, leaseId, context);
690+
691+
try {
692+
return StorageImplUtils.blockWithOptionalTimeout(response, timeout);
693+
} catch (UncheckedIOException e) {
694+
throw LOGGER.logExceptionAsError(e);
695+
}
696+
}
697+
566698
/**
567699
* Flushes (writes) data previously appended to the file through a call to append.
568700
* The previously uploaded data must be contiguous.

0 commit comments

Comments
 (0)